Skip to content

Commit 727ec43

Browse files
committed
Added ElasticSearch left fold projections
Cleaned EventStoreDB README links Minor updates in other places
1 parent e624f4b commit 727ec43

File tree

21 files changed

+172
-71
lines changed

21 files changed

+172
-71
lines changed

Core.ElasticSearch/Config.cs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
using System;
2+
using Microsoft.Extensions.Configuration;
3+
using Microsoft.Extensions.DependencyInjection;
4+
using Nest;
5+
6+
namespace Core.ElasticSearch
7+
{
8+
public class ElasticSearchConfig
9+
{
10+
public string Url { get; set; } = default!;
11+
public string DefaultIndex { get; set; } = default!;
12+
}
13+
14+
public static class ElasticSearchConfigExtensions
15+
{
16+
private const string DefaultConfigKey = "ElasticSearch";
17+
public static void AddElasticsearch(
18+
this IServiceCollection services, IConfiguration configuration, Action<ConnectionSettings>? config = null)
19+
{
20+
var elasticSearchConfig = configuration.GetSection(DefaultConfigKey).Get<ElasticSearchConfig>();
21+
22+
var settings = new ConnectionSettings(new Uri(elasticSearchConfig.Url))
23+
.DefaultIndex(elasticSearchConfig.DefaultIndex);
24+
25+
config?.Invoke(settings);
26+
27+
var client = new ElasticClient(settings);
28+
29+
services.AddSingleton<IElasticClient>(client);
30+
}
31+
}
32+
}

Core.ElasticSearch/Core.ElasticSearch.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
<ItemGroup>
1111
<PackageReference Include="NEST" Version="7.13.0" />
1212
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="5.0.0" />
13+
<PackageReference Include="Microsoft.Extensions.Configuration.Binder" Version="5.0.0" />
1314
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="5.0.1" />
1415
<PackageReference Include="Newtonsoft.Json" Version="13.0.1" />
1516
</ItemGroup>

Core.ElasticSearch/ElasticSearchConfig.cs

Lines changed: 0 additions & 26 deletions
This file was deleted.
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
using System;
2+
using System.Collections.Concurrent;
3+
using System.Linq;
4+
5+
namespace Core.ElasticSearch.Indices
6+
{
7+
public class IndexNameMapper
8+
{
9+
private static readonly IndexNameMapper Instance = new();
10+
11+
private readonly ConcurrentDictionary<Type, string> typeNameMap = new();
12+
13+
public static void AddCustomMap<TStream>(string mappedStreamName) =>
14+
AddCustomMap(typeof(TStream), mappedStreamName);
15+
16+
public static void AddCustomMap(Type streamType, string mappedStreamName)
17+
{
18+
Instance.typeNameMap.AddOrUpdate(streamType, mappedStreamName, (_, _) => mappedStreamName);
19+
}
20+
21+
public static string ToIndexPrefix<TStream>() => ToIndexPrefix(typeof(TStream));
22+
23+
public static string ToIndexPrefix(Type streamType) => Instance.typeNameMap.GetOrAdd(streamType, (_) =>
24+
{
25+
var modulePrefix = streamType.Namespace!.Split(".").First();
26+
return $"{modulePrefix}-{streamType.Name}".ToLower();
27+
});
28+
29+
public static string ToIndexName<TStream>(object? tenantId = null) =>
30+
ToIndexName(typeof(TStream));
31+
32+
public static string ToIndexName(Type streamType, object? tenantId = null)
33+
{
34+
var tenantPrefix = tenantId != null ? $"{tenantId}-" : "";
35+
36+
return $"{tenantPrefix}{ToIndexPrefix(streamType)}".ToLower();
37+
}
38+
39+
}
40+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
using System;
2+
using System.Threading;
3+
using System.Threading.Tasks;
4+
using Core.ElasticSearch.Indices;
5+
using Core.Events;
6+
using Core.Projections;
7+
using MediatR;
8+
using Microsoft.Extensions.DependencyInjection;
9+
using Nest;
10+
11+
namespace Core.ElasticSearch.Projections
12+
{
13+
public class ElasticSearchProjection<TEvent, TView> : IEventHandler<TEvent>
14+
where TView : class, IProjection
15+
where TEvent : IEvent
16+
{
17+
private readonly IElasticClient elasticClient;
18+
private readonly Func<TEvent, string> getId;
19+
20+
public ElasticSearchProjection(
21+
IElasticClient elasticClient,
22+
Func<TEvent, string> getId
23+
)
24+
{
25+
this.elasticClient = elasticClient ?? throw new ArgumentNullException(nameof(elasticClient));
26+
this.getId = getId ?? throw new ArgumentNullException(nameof(getId));
27+
}
28+
29+
public async Task Handle(TEvent @event, CancellationToken ct)
30+
{
31+
string id = getId(@event);
32+
33+
var entity = (await elasticClient.GetAsync<TView>(id, ct: ct))?.Source
34+
?? (TView) Activator.CreateInstance(typeof(TView), true)!;
35+
36+
entity.When(@event);
37+
38+
var result = await elasticClient.UpdateAsync<TView>(id,
39+
u => u.Doc(entity).Upsert(entity).Index(IndexNameMapper.ToIndexName<TView>()),
40+
ct
41+
);
42+
}
43+
}
44+
45+
public static class ElasticSearchProjectionConfig
46+
{
47+
public static IServiceCollection Project<TEvent, TView>(this IServiceCollection services,
48+
Func<TEvent, string> getId)
49+
where TView : class, IProjection
50+
where TEvent : IEvent
51+
{
52+
services.AddTransient<INotificationHandler<TEvent>>(sp =>
53+
{
54+
var session = sp.GetRequiredService<IElasticClient>();
55+
56+
return new ElasticSearchProjection<TEvent, TView>(session, getId);
57+
});
58+
59+
return services;
60+
}
61+
}
62+
}

Core.ElasticSearch/Repository/ElasticSearchRepository.cs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,20 @@
11
using System;
22
using System.Threading;
33
using System.Threading.Tasks;
4-
using Core.Aggregates;
4+
using Core.ElasticSearch.Indices;
55
using Core.Events;
6-
using Core.Repositories;
6+
using Nest;
7+
using IAggregate = Core.Aggregates.IAggregate;
78

89
namespace Core.ElasticSearch.Repository
910
{
10-
public class ElasticSearchRepository<T>: IRepository<T> where T : class, IAggregate, new()
11+
public class ElasticSearchRepository<T>: Repositories.IRepository<T> where T : class, IAggregate, new()
1112
{
12-
private readonly Nest.IElasticClient elasticClient;
13+
private readonly IElasticClient elasticClient;
1314
private readonly IEventBus eventBus;
1415

1516
public ElasticSearchRepository(
16-
Nest.IElasticClient elasticClient,
17+
IElasticClient elasticClient,
1718
IEventBus eventBus
1819
)
1920
{
@@ -29,12 +30,12 @@ IEventBus eventBus
2930

3031
public Task Add(T aggregate, CancellationToken cancellationToken)
3132
{
32-
return elasticClient.IndexAsync(aggregate, i => i.Id(aggregate.Id), cancellationToken);
33+
return elasticClient.IndexAsync(aggregate, i => i.Id(aggregate.Id).Index(IndexNameMapper.ToIndexName<T>()), cancellationToken);
3334
}
3435

3536
public Task Update(T aggregate, CancellationToken cancellationToken)
3637
{
37-
return elasticClient.UpdateAsync<T>(aggregate.Id, i => i.Doc(aggregate), cancellationToken);
38+
return elasticClient.UpdateAsync<T>(aggregate.Id, i => i.Doc(aggregate).Index(IndexNameMapper.ToIndexName<T>()), cancellationToken);
3839
}
3940

4041
public Task Delete(T aggregate, CancellationToken cancellationToken)

Core.EventStoreDB/Config.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
using System;
22
using Core.EventStoreDB.Subscriptions;
3-
using Core.Subscriptions;
43
using EventStore.Client;
54
using Microsoft.Extensions.DependencyInjection;
65
using Microsoft.Extensions.Configuration;

Core.EventStoreDB/Serialization/EventStoreDBSerializer.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
using System.Text;
22
using Core.Events;
3-
using Core.Reflection;
43
using EventStore.Client;
54
using Newtonsoft.Json;
65

Core.EventStoreDB/Subscriptions/EventStoreDBSubscriptionCheckpointRepository.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
using System.Threading.Tasks;
55
using Core.Events;
66
using Core.EventStoreDB.Serialization;
7-
using Core.Subscriptions;
87
using EventStore.Client;
98

109
namespace Core.EventStoreDB.Subscriptions

Core/Subscriptions/ISubscriptionCheckpointRepository.cs renamed to Core.EventStoreDB/Subscriptions/ISubscriptionCheckpointRepository.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
using System.Threading;
22
using System.Threading.Tasks;
33

4-
namespace Core.Subscriptions
4+
namespace Core.EventStoreDB.Subscriptions
55
{
66
public interface ISubscriptionCheckpointRepository
77
{

0 commit comments

Comments
 (0)