Skip to content

Commit 35ad82e

Browse files
committed
Optimized topic name resolution
1 parent 0d1d197 commit 35ad82e

File tree

1 file changed

+50
-54
lines changed
  • modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.kafka

1 file changed

+50
-54
lines changed

modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.kafka/Sink.scala

Lines changed: 50 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,13 @@ package com.snowplowanalytics.snowplow.enrich.kafka
1616
import cats.Parallel
1717
import cats.effect.{Blocker, Concurrent, ConcurrentEffect, ContextShift, Resource, Timer}
1818
import cats.implicits._
19-
import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
2019
import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.Output
2120
import com.snowplowanalytics.snowplow.enrich.common.fs2.{AttributedByteSink, AttributedData, ByteSink}
2221
import fs2.kafka._
2322
import io.circe.Json
2423
import io.circe.parser.parse
2524

2625
import java.util.UUID
27-
import scala.util.Try
2826

2927
object Sink {
3028

@@ -82,61 +80,59 @@ object Sink {
8280
}
8381

8482
private def resolveTopicName(data: Array[Byte], mapping: Map[String, String]): Option[String] = {
83+
if (mapping.isEmpty) return None
84+
8585
val rawEnrichedEvent = new String(data)
86-
val hostOpt = if (mapping.values.exists(_.endsWith("enriched-bad"))) Try {
87-
for {
88-
json <- parse(rawEnrichedEvent).toOption
89-
_ <- json.hcursor
90-
.downField("schema")
91-
.as[String]
92-
.toOption
93-
.filter(_.contains("com.snowplowanalytics.snowplow.badrows"))
94-
payloadCursor = json.hcursor.downField("data").downField("payload")
95-
headers <- payloadCursor
96-
.downField("raw")
97-
.downField("headers")
98-
.as[List[String]]
99-
.toOption
100-
.orElse(payloadCursor.downField("headers").as[List[String]].toOption)
101-
hostHeader <- headers.find(_.toLowerCase.startsWith("host:"))
102-
hostValue = hostHeader.drop("host:".length).trim
103-
if hostValue.nonEmpty
104-
} yield hostValue
86+
87+
if (!rawEnrichedEvent.startsWith("{")) {
88+
extractHostFromGoodEvent(rawEnrichedEvent).flatMap(mapping.get)
89+
} else {
90+
extractHostFromBadRow(rawEnrichedEvent).flatMap(mapping.get)
10591
}
106-
else
107-
Try {
108-
Event
109-
.parse(rawEnrichedEvent)
110-
.toOption
111-
.flatMap { event =>
112-
val rawContexts = event.derived_contexts.data
113-
val headerContexts = rawContexts.filter(_.schema.toSchemaUri == "iglu:org.ietf/http_header/jsonschema/1-0-0")
114-
val contextData = headerContexts.map(_.data).flatMap(_.asObject)
115-
val hostValues = contextData
116-
.flatMap { obj =>
117-
for {
118-
name <- obj("name").flatMap(_.asString)
119-
value <- obj("value").flatMap(_.asString)
120-
} yield (name.toLowerCase, value)
121-
}
122-
.filter(_._1 == "host")
123-
.map(_._2)
92+
}
12493

125-
hostValues.headOption
126-
}
127-
.orElse {
128-
for {
129-
json <- parse(rawEnrichedEvent).toOption
130-
cursor = json.hcursor
131-
contexts <- cursor.downField("contexts_org_ietf_http_header_1").as[List[Json]].toOption
132-
hostValue <- contexts.collectFirst {
133-
case obj if obj.hcursor.downField("name").as[String].toOption.exists(_.equalsIgnoreCase("host")) =>
134-
obj.hcursor.downField("value").as[String].toOption
135-
}.flatten
136-
} yield hostValue
137-
}
138-
}
94+
private def extractHostFromBadRow(message: String): Option[String] = {
95+
if (!message.contains("badrows")) return None
96+
97+
parse(message).toOption.flatMap { json =>
98+
json.hcursor
99+
.downField("payload")
100+
.downField("headers")
101+
.as[List[String]]
102+
.toOption
103+
.flatMap { headers =>
104+
headers
105+
.find(_.startsWith("Host: "))
106+
.map(_.stripPrefix("Host: "))
107+
}
108+
}
109+
}
110+
111+
private def extractHostFromGoodEvent(message: String): Option[String] = {
112+
if (!message.contains("http_header")) return None
113+
114+
val fields = message.split("\t", -1)
115+
if (fields.length < 123) return None
116+
117+
val derivedContexts = fields(122)
118+
if (derivedContexts.isEmpty) return None
139119

140-
hostOpt.toOption.flatten.flatMap(mapping.get)
120+
parse(derivedContexts).toOption.flatMap { json =>
121+
json.hcursor
122+
.downField("data")
123+
.as[List[Json]]
124+
.toOption
125+
.flatMap { contexts =>
126+
contexts
127+
.find(_.hcursor.downField("schema").as[String].toOption.exists(_.contains("http_header")))
128+
.flatMap { httpHeaderContext =>
129+
httpHeaderContext.hcursor
130+
.downField("data")
131+
.downField("Host")
132+
.as[String]
133+
.toOption
134+
}
135+
}
136+
}
141137
}
142138
}

0 commit comments

Comments
 (0)