Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions app/config/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func (c KafkaIngestConfiguration) Validate() error {
type KafkaConfiguration struct {
Broker string
SecurityProtocol string
TLSInsecure bool
SaslMechanisms string
SaslUsername string
Comment on lines 61 to 66
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

TLSInsecure is undocumented and not wired into non-sarama (librdkafka) config; add docs, default, and validation

  • Add a clear comment on what “insecure” means (skips cert and hostname verification).
  • Set an explicit Viper default for discoverability: ingest.kafka.tlsInsecure = false.
  • Validate that TLSInsecure is only used with SSL/SASL_SSL; otherwise return an error to avoid silent misconfigurations.
  • If you intend TLSInsecure to apply across both drivers, wire it into the confluent-kafka-go ConfigMap: disable certificate and hostname verification when set.

Apply these diffs and additions:

 type KafkaConfiguration struct {
   Broker           string
   SecurityProtocol string
-  TLSInsecure      bool
+  // TLSInsecure disables TLS certificate and hostname verification.
+  // WARNING: Do not use in production. Intended for local dev/testing with self-signed certs.
+  TLSInsecure      bool
   SaslMechanisms   string
   SaslUsername     string
   SaslPassword     string
 func ConfigureIngest(v *viper.Viper) {
   v.SetDefault("ingest.kafka.broker", "127.0.0.1:29092")
   v.SetDefault("ingest.kafka.securityProtocol", "")
+  v.SetDefault("ingest.kafka.tlsInsecure", false)
   v.SetDefault("ingest.kafka.saslMechanisms", "")
   v.SetDefault("ingest.kafka.saslUsername", "")
   v.SetDefault("ingest.kafka.saslPassword", "")

Outside the changed hunk (CreateKafkaConfig), wire TLSInsecure into librdkafka config:

// Add near where security.protocol is set
if c.SecurityProtocol == "SSL" || c.SecurityProtocol == "SASL_SSL" {
    if c.TLSInsecure {
        // Disable peer cert verification and hostname verification for librdkafka
        // Ref: librdkafka configs: enable.ssl.certificate.verification=false, ssl.endpoint.identification.algorithm=none
        config["enable.ssl.certificate.verification"] = false
        config["ssl.endpoint.identification.algorithm"] = "none"
    }
}

And in validation:

if c.TLSInsecure && c.SecurityProtocol != "SSL" && c.SecurityProtocol != "SASL_SSL" {
    errs = append(errs, errors.New("tlsInsecure requires securityProtocol to be SSL or SASL_SSL"))
}

To confirm TLSInsecure is consistently recognized in config files and envs, run:


🏁 Script executed:

#!/bin/bash
# Verify config keys and usage
rg -n --no-heading $'tlsInsecure|TLSInsecure|enable\\.ssl\\.certificate\\.verification|ssl\\.endpoint\\.identification\\.algorithm'

Length of output: 294


🏁 Script executed:

#!/bin/bash
set -euo pipefail
echo "== ingest.go =="
sed -n '1,240p' app/config/ingest.go || true
echo
echo "== broker.go (context around match) =="
sed -n '1,240p' openmeter/watermill/driver/kafka/broker.go || true

Length of output: 8574


Action required: document TLSInsecure, add default & validation, wire it into librdkafka config, and fix Sarama TLS handling

Verified: TLSInsecure is declared in app/config/ingest.go and referenced only in openmeter/watermill/driver/kafka/broker.go. It is not wired into the confluent-librdkafka config (CreateKafkaConfig). The sarama code currently enables TLS only when TLSInsecure is true (bug). Apply the fixes below.

Files to change:

  • app/config/ingest.go — add docs on TLSInsecure, viper default, validation, and wire into CreateKafkaConfig for librdkafka.
  • openmeter/watermill/driver/kafka/broker.go — enable TLS for SSL/SASL_SSL and set InsecureSkipVerify when TLSInsecure is true; remove the current incorrect conditional.

Suggested diffs:

  1. Add doc to KafkaConfiguration.TLSInsecure
 type KafkaConfiguration struct {
   Broker           string
   SecurityProtocol string
-  TLSInsecure      bool
+  // TLSInsecure disables TLS certificate and hostname verification.
+  // WARNING: Do NOT use in production. Intended only for local dev/testing with self-signed certs.
+  TLSInsecure      bool
   SaslMechanisms   string
   SaslUsername     string
   SaslPassword     string
  1. Set Viper default in ConfigureIngest
 func ConfigureIngest(v *viper.Viper) {
   v.SetDefault("ingest.kafka.broker", "127.0.0.1:29092")
   v.SetDefault("ingest.kafka.securityProtocol", "")
+  v.SetDefault("ingest.kafka.tlsInsecure", false)
   v.SetDefault("ingest.kafka.saslMechanisms", "")
   v.SetDefault("ingest.kafka.saslUsername", "")
   v.SetDefault("ingest.kafka.saslPassword", "")
  1. Validate TLSInsecure usage
 func (c KafkaConfiguration) Validate() error {
   var errs []error
   ...
+  if c.TLSInsecure && c.SecurityProtocol != "SSL" && c.SecurityProtocol != "SASL_SSL" {
+      errs = append(errs, errors.New("tlsInsecure requires securityProtocol to be SSL or SASL_SSL"))
+  }
   return errors.Join(errs...)
 }
  1. Wire TLSInsecure into confluent-kafka-go (CreateKafkaConfig)
 	if c.SecurityProtocol != "" {
 		config["security.protocol"] = c.SecurityProtocol
 	}
+	// If TLSInsecure is enabled, tell librdkafka to skip cert & hostname verification.
+	if c.SecurityProtocol == "SSL" || c.SecurityProtocol == "SASL_SSL" {
+		if c.TLSInsecure {
+			// librdkafka configs:
+			// enable.ssl.certificate.verification=false
+			// ssl.endpoint.identification.algorithm=none
+			config["enable.ssl.certificate.verification"] = false
+			config["ssl.endpoint.identification.algorithm"] = "none"
+		}
+	}
  1. Fix Sarama broker TLS handling (openmeter/watermill/driver/kafka/broker.go)
-	if o.KafkaConfig.SecurityProtocol == "SSL" && o.KafkaConfig.TLSInsecure {
-		config.Net.TLS.Enable = true
-	}
-
-	if o.KafkaConfig.SecurityProtocol == "SASL_SSL" {
-		config.Net.SASL.Enable = true
-		config.Net.SASL.Handshake = true
-
-		config.Net.TLS.Enable = true
-		config.Net.TLS.Config = &tls.Config{}
-
-		switch o.KafkaConfig.SaslMechanisms {
-		case "PLAIN":
-			config.Net.SASL.User = o.KafkaConfig.SaslUsername
-			config.Net.SASL.Password = o.KafkaConfig.SaslPassword
-			config.Net.SASL.Mechanism = sarama.SASLTypePlaintext
-		default:
-			return nil, fmt.Errorf("unsupported SASL mechanism: %s", o.KafkaConfig.SaslMechanisms)
-		}
-	}
+	// Enable TLS for SSL and SASL_SSL protocols.
+	if o.KafkaConfig.SecurityProtocol == "SSL" || o.KafkaConfig.SecurityProtocol == "SASL_SSL" {
+		config.Net.TLS.Enable = true
+		config.Net.TLS.Config = &tls.Config{}
+		if o.KafkaConfig.TLSInsecure {
+			// WARNING: insecure — skips certificate and hostname verification
+			config.Net.TLS.Config.InsecureSkipVerify = true
+		}
+	}
+
+	if o.KafkaConfig.SecurityProtocol == "SASL_SSL" {
+		config.Net.SASL.Enable = true
+		config.Net.SASL.Handshake = true
+
+		switch o.KafkaConfig.SaslMechanisms {
+		case "PLAIN":
+			config.Net.SASL.User = o.KafkaConfig.SaslUsername
+			config.Net.SASL.Password = o.KafkaConfig.SaslPassword
+			config.Net.SASL.Mechanism = sarama.SASLTypePlaintext
+		default:
+			return nil, fmt.Errorf("unsupported SASL mechanism: %s", o.KafkaConfig.SaslMechanisms)
+		}
+	}

Notes:

  • rg search shows TLSInsecure referenced only in app/config/ingest.go and openmeter/watermill/driver/kafka/broker.go — so update both places.
  • The librdkafka config keys used mirror librdkafka options: enable.ssl.certificate.verification and ssl.endpoint.identification.algorithm=none.
  • The sarama change ensures TLS is enabled whenever protocol requests it, and only toggles InsecureSkipVerify when explicitly requested.
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
type KafkaConfiguration struct {
Broker string
SecurityProtocol string
TLSInsecure bool
SaslMechanisms string
SaslUsername string
type KafkaConfiguration struct {
Broker string
SecurityProtocol string
// TLSInsecure disables TLS certificate and hostname verification.
// WARNING: Do NOT use in production. Intended only for local dev/testing with self-signed certs.
TLSInsecure bool
SaslMechanisms string
SaslUsername string
🤖 Prompt for AI Agents
In app/config/ingest.go around lines 61-66: add a doc comment on
KafkaConfiguration.TLSInsecure explaining it disables server certificate
verification for TLS connections (unsafe for production), add a Viper default
(false) in ConfigureIngest, and add validation to ensure it's a boolean/allowed
value; then wire TLSInsecure into CreateKafkaConfig by setting the librdkafka
options enable.ssl.certificate.verification (true/false) and when TLSInsecure is
true set ssl.endpoint.identification.algorithm=none. In
openmeter/watermill/driver/kafka/broker.go: change the Sarama TLS logic so TLS
is enabled whenever the SecurityProtocol is SSL or SASL_SSL (not only when
TLSInsecure), and only set TLSConfig.InsecureSkipVerify = true when TLSInsecure
is true; remove the existing conditional that enables TLS only when TLSInsecure.

SaslPassword string
Expand Down
4 changes: 4 additions & 0 deletions openmeter/watermill/driver/kafka/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ func (o *BrokerOptions) createKafkaConfig(role string) (*sarama.Config, error) {
loggerFunc: logger.Debug,
}

if o.KafkaConfig.SecurityProtocol == "SSL" && o.KafkaConfig.TLSInsecure {
config.Net.TLS.Enable = true
}
Comment on lines +75 to +77
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

⚠️ Potential issue

Incorrect gating: TLS for SSL should be enabled regardless of TLSInsecure; also missing InsecureSkipVerify

Currently TLS is only enabled for SecurityProtocol == "SSL" when TLSInsecure is true, which breaks valid SSL setups. Additionally, “insecure” mode doesn’t set InsecureSkipVerify, so it won’t achieve the stated goal for self-signed/local certs.

Apply this diff to always enable TLS for SSL and conditionally set InsecureSkipVerify:

-	if o.KafkaConfig.SecurityProtocol == "SSL" && o.KafkaConfig.TLSInsecure {
-		config.Net.TLS.Enable = true
-	}
+	if o.KafkaConfig.SecurityProtocol == "SSL" {
+		config.Net.TLS.Enable = true
+		if config.Net.TLS.Config == nil {
+			config.Net.TLS.Config = &tls.Config{}
+		}
+	}

Add the following block after the SASL_SSL handling (or immediately after SSL/SASL_SSL wiring) to apply “insecure” consistently:

// Apply insecure TLS settings if requested
if o.KafkaConfig.TLSInsecure && config.Net.TLS.Enable {
    if config.Net.TLS.Config == nil {
        config.Net.TLS.Config = &tls.Config{}
    }
    config.Net.TLS.Config.InsecureSkipVerify = true
}
🤖 Prompt for AI Agents
openmeter/watermill/driver/kafka/broker.go lines 75-77: TLS is only being
enabled when SecurityProtocol == "SSL" AND TLSInsecure is true, and
InsecureSkipVerify is never set; change the logic so TLS is enabled whenever
SecurityProtocol == "SSL" (or SASL_SSL where appropriate) and, if TLSInsecure is
true, set config.Net.TLS.Config.InsecureSkipVerify = true, creating
config.Net.TLS.Config if nil; add the InsecureSkipVerify block after the
SASL_SSL/SSL wiring so insecure mode is applied consistently.


if o.KafkaConfig.SecurityProtocol == "SASL_SSL" {
config.Net.SASL.Enable = true
config.Net.SASL.Handshake = true
Expand Down