File tree Expand file tree Collapse file tree 2 files changed +13
-3
lines changed
main/scala/com.snowplowanalytics.snowplow.enrich.kafka
test/scala/com/snowplowanalytics/snowplow/enrich/kafka Expand file tree Collapse file tree 2 files changed +13
-3
lines changed Original file line number Diff line number Diff line change @@ -123,8 +123,10 @@ object Sink {
123123 .as[List [Json ]]
124124 .toOption
125125 .flatMap { contexts =>
126- contexts.collectFirst {
127- case ctx if ctx.hcursor.downField(" schema" ).as[String ].toOption.exists(_.contains(" http_header" )) =>
126+ contexts
127+ .iterator
128+ .filter(ctx => ctx.hcursor.downField(" schema" ).as[String ].toOption.exists(_.contains(" http_header" )))
129+ .map { ctx =>
128130 val dataCursor = ctx.hcursor.downField(" data" )
129131 val nameOpt = dataCursor.downField(" name" ).as[String ].toOption
130132 val valueOpt = dataCursor.downField(" value" ).as[String ].toOption
@@ -133,7 +135,8 @@ object Sink {
133135 case (Some (" Host" ), Some (hostValue)) => Some (hostValue)
134136 case _ => None
135137 }
136- }.flatten
138+ }
139+ .collectFirst { case Some (host) => host }
137140 }
138141 }
139142 }
Original file line number Diff line number Diff line change @@ -54,6 +54,13 @@ class SinkSpec extends Specification {
5454 "deviceClass": "Unknown"
5555 }
5656 },
57+ {
58+ "schema": "iglu:org.ietf/http_header/jsonschema/1-0-0",
59+ "data": {
60+ "name": "X-Forwarded-For",
61+ "value": "203.0.113.42"
62+ }
63+ },
5764 {
5865 "schema": "iglu:org.ietf/http_header/jsonschema/1-0-0",
5966 "data": {
You can’t perform that action at this time.
0 commit comments