Skip to content

Commit 2e404ba

Browse files
author
Ashish Khanal
committed
Add packages and setup for using Schema registry
1 parent f87849a commit 2e404ba

File tree

2 files changed

+27
-3
lines changed

2 files changed

+27
-3
lines changed

Producer/Producer.csproj

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010

1111
<ItemGroup>
1212
<PackageReference Include="Confluent.Kafka" Version="2.3.0" />
13+
<PackageReference Include="Confluent.SchemaRegistry" Version="2.3.0" />
14+
<PackageReference Include="Confluent.SchemaRegistry.Serdes.Json" Version="2.3.0" />
1315
<PackageReference Include="Microsoft.AspNetCore.OpenApi" Version="8.0.0"/>
1416
<PackageReference Include="Swashbuckle.AspNetCore" Version="6.4.0"/>
1517
</ItemGroup>

Producer/Program.cs

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
using Confluent.Kafka;
2+
using Confluent.SchemaRegistry;
3+
using Confluent.SchemaRegistry.Serdes; //Serdes means Serializer and Deserializer
4+
using Microsoft.Extensions.Options;
25

36
var builder = WebApplication.CreateBuilder(args);
47

@@ -7,9 +10,28 @@
710
builder.Services.AddEndpointsApiExplorer();
811
builder.Services.AddSwaggerGen();
912

10-
const string biometricsImportedTopicName = "RawBiometricsImported";
11-
var producerConfig = builder.Configuration.GetSection("KafkaProducer").Get<ProducerConfig>();
12-
builder.Services.AddSingleton(new ProducerBuilder<string, string>(producerConfig).Build());
13+
// Populate config objects
14+
builder.Services.Configure<ProducerConfig>(builder.Configuration.GetSection("KafkaProducer")); // OR var producerConfig = builder.Configuration.GetSection("KafkaProducer").Get<ProducerConfig>();
15+
builder.Services.Configure<SchemaRegistryConfig>(builder.Configuration.GetSection("KafkaSchemaRegistry"));
16+
17+
const string biometricsImportedTopicName = "BiometricsImported";
18+
19+
// Producer
20+
builder.Services.AddSingleton<IProducer<string, Biometrics>>(sp =>
21+
{
22+
var config = sp.GetRequiredService<IOptions<ProducerConfig>>();
23+
var schemaRegistryClient = sp.GetRequiredService<ISchemaRegistryClient>();
24+
return new ProducerBuilder<string, Biometrics>(config.Value)
25+
.SetValueSerializer(new JsonSerializer<Biometrics>(schemaRegistryClient))
26+
.Build();
27+
});
28+
29+
// Schema Registry
30+
builder.Services.AddSingleton<ISchemaRegistryClient>(sp =>
31+
{
32+
var config = sp.GetRequiredService<IOptions<SchemaRegistryConfig>>();
33+
return new CachedSchemaRegistryClient(config.Value);
34+
});
1335

1436
var app = builder.Build();
1537

0 commit comments

Comments
 (0)