Apache Kafka is an event streaming platform used to collect, store, and process real-time data streams at scale.
It has numerous use cases, including distributed logging, stream processing, and pub-sub messaging.

<img width="700" alt="image" src="https://github.com/akhanalcs/dotnet-kafka/assets/30603497/19978d3c-3f9c-4474-9a3b-995b5ea437ed">

## Helpful Links
It will work as a simple REST endpoint that accepts data from a fitness tracker.

In the long run, this may be dangerous because it could allow a malfunctioning device to push invalid data into our stream. We probably want to perform a minimal amount of validation prior to pushing the data. We'll do that later.

Register an instance of `IProducer<string, string>`.
We use a singleton because the producer maintains connections that we want to reuse.
```cs
// Bind the "KafkaProducer" configuration section to ProducerConfig.
var producerConfig = builder.Configuration.GetSection("KafkaProducer").Get<ProducerConfig>();
// Build a single producer and reuse it for the lifetime of the app.
builder.Services.AddSingleton(new ProducerBuilder<string, string>(producerConfig).Build());
```
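
To see how the registered producer might be used, here is a minimal sketch of a POST endpoint that injects `IProducer<string, string>` and forwards the raw request body to a topic. The route, the topic name `biometrics`, and the overall shape are illustrative assumptions, not this repo's actual code.
```cs
// Sketch only: route and topic name are hypothetical placeholders.
app.MapPost("/biometrics", async (HttpRequest request, IProducer<string, string> producer) =>
{
    using var reader = new StreamReader(request.Body);
    var payload = await reader.ReadToEndAsync(); // raw JSON sent by the fitness tracker

    // No validation yet: the payload is forwarded as-is (validation comes later).
    await producer.ProduceAsync("biometrics", new Message<string, string> { Value = payload });

    return Results.Accepted();
});
```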

### Test it
#### Start the app
<img width="200" alt="image" src="https://github.com/akhanalcs/dotnet-kafka/assets/30603497/f7250701-7d04-4477-bead-6c9c5f83db7e">

#### Send a message to the endpoint through Swagger
<img width="550" alt="image" src="https://github.com/akhanalcs/dotnet-kafka/assets/30603497/a83c9991-e5b9-4256-863a-a9461490aec6">

#### Verify it in the cluster
Home -> Environments -> kafka-with-dotnet -> cluster_0 -> Topics

<img width="550" alt="image" src="https://github.com/akhanalcs/dotnet-kafka/assets/30603497/974b3b19-fa64-4402-8e40-8b2b3b48cfc1">

## Serialization & Deserialization
The message producer is created by providing two types.
```cs
new ProducerBuilder<TKey, TValue>(config)
```
The first type represents the key of the message, while the second represents the value.
These types can be simple types, such as strings or integers, but they can also be more complex types like a custom class or struct.

However, Kafka itself doesn't directly understand these types; it just operates on blobs of data in byte form. This means we need to convert our data into an array of bytes before sending it to Kafka if it is a complex object.

<img width="650" alt="image" src="https://github.com/akhanalcs/dotnet-kafka/assets/30603497/528a11e9-a910-4ffa-b6ae-afe2e4997195">

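To make the byte conversion concrete, here is a rough sketch (not code from this repo) of what a hand-rolled Confluent.Kafka `ISerializer<T>` could look like, assuming `System.Text.Json` does the actual conversion:
```cs
using System.Text.Json;
using Confluent.Kafka;

// Illustrative only: turns any T into UTF-8 JSON bytes, which is all Kafka ever sees.
public class SimpleJsonSerializer<T> : ISerializer<T>
{
    public byte[] Serialize(T data, SerializationContext context)
        => JsonSerializer.SerializeToUtf8Bytes(data);
}
```
In practice we lean on the serializers that ship with Confluent's packages instead of writing our own, as shown next.
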
To register a serializer for the message **value**, we use the `SetValueSerializer` method on the `ProducerBuilder`.
For example:
```cs
new ProducerBuilder<string, Biometrics>(producerConfig)
    .SetValueSerializer(new JsonSerializer<Biometrics>(schemaRegistry))
    .Build();
```

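The same idea applies in reverse on the consuming side. As a sketch (assuming the schema-registry-aware `JsonDeserializer<T>` from `Confluent.SchemaRegistry.Serdes` and a `consumerConfig` defined elsewhere), a consumer could register a value deserializer like this:
```cs
using Confluent.Kafka;
using Confluent.Kafka.SyncOverAsync;
using Confluent.SchemaRegistry.Serdes;

// Sketch: JsonDeserializer<T> is async-only, so AsSyncOverAsync() adapts it to the
// synchronous IDeserializer<T> that SetValueDeserializer expects.
using var consumer = new ConsumerBuilder<string, Biometrics>(consumerConfig)
    .SetValueDeserializer(new JsonDeserializer<Biometrics>().AsSyncOverAsync())
    .Build();
```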
- Truth: That comports to reality.