Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,8 @@ protected override Maybe<string> GetBaseMolecule(MoleculeSyntax molecule)
{
var aggregate = (AggregateSyntax)molecule.Parent;

var id = aggregate.GetIdTypeName(SymbolTable);

return base.GetBaseMolecule(molecule)
.Or(String.Format("AggregateSnapshot<{0}>", id));
.Or("AggregateSnapshot");
}

protected override bool ShouldBeEquatable(MoleculeSyntax molecule)
Expand Down
51 changes: 25 additions & 26 deletions src/iSynaptic.Core.Persistence/EventStoreAggregateRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,35 +21,26 @@
// THE SOFTWARE.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

using EventStore.ClientAPI;
using EventStore.ClientAPI.Exceptions;
using Newtonsoft.Json;
using iSynaptic.Commons;
using iSynaptic.Commons.Collections.Generic;
using iSynaptic.Commons.Linq;
using iSynaptic.Modeling;
using iSynaptic.Modeling.Domain;
using iSynaptic.Serialization;
using Newtonsoft.Json;

namespace iSynaptic.Core.Persistence
{
public class EventStoreAggregateRepository<TAggregate, TIdentifier> : AggregateRepository<TAggregate, TIdentifier>
where TAggregate : class, IAggregate<TIdentifier>
where TIdentifier : IEquatable<TIdentifier>
public class EventStoreAggregateRepository : AggregateRepository
{
private static readonly Guid _offsetEventId = new Guid(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1);
private static readonly EventData _offsetEvent = new EventData(_offsetEventId, "streamOffset", true, Encoding.Default.GetBytes("{}"), null);

private readonly ILogicalTypeRegistry _logicalTypeRegistry;

private readonly JsonSerializer _dataSerializer;
private readonly JsonSerializer _metadataSerializer;

private readonly Func<IEventStoreConnection> _connectionFactory;

public EventStoreAggregateRepository(ILogicalTypeRegistry logicalTypeRegistry, Func<IEventStoreConnection> connectionFactory)
Expand All @@ -61,11 +52,9 @@ public EventStoreAggregateRepository(ILogicalTypeRegistry logicalTypeRegistry, F

var metadataSerializerSettings = JsonSerializerBuilder.BuildSettings(logicalTypeRegistry);
metadataSerializerSettings.TypeNameHandling = TypeNameHandling.None;

_metadataSerializer = JsonSerializer.Create(metadataSerializerSettings);
}

protected override async Task<AggregateEventsLoadFrame<TIdentifier>> GetEvents(TIdentifier id, int minVersion, int maxVersion)
protected override async Task<AggregateEventsLoadFrame> GetEvents(object id, int minVersion, int maxVersion)
{
var maxCount = (maxVersion - minVersion) + 1;

Expand Down Expand Up @@ -102,16 +91,16 @@ protected override async Task<AggregateEventsLoadFrame<TIdentifier>> GetEvents(T
var events = resolvedEvents
.Select(x => x.Event.Data)
.Select(Encoding.Default.GetString)
.Select(x => _dataSerializer.Deserialize<IAggregateEvent<TIdentifier>>(x));
.Select(x => _dataSerializer.Deserialize<IAggregateEvent>(x));

return new AggregateEventsLoadFrame<TIdentifier>(aggregateType, id, events);
return new AggregateEventsLoadFrame(aggregateType, id, events);
}

return null;
}
}

protected async override Task SaveEvents(AggregateEventsSaveFrame<TIdentifier> frame)
protected async override Task SaveEvents(AggregateEventsSaveFrame frame)
{
var aggregateType = frame.AggregateType;
var id = frame.Id;
Expand Down Expand Up @@ -156,7 +145,7 @@ await cn.AppendToStreamAsync(
}
}

protected override async Task<AggregateSnapshotLoadFrame<TIdentifier>> GetSnapshot(TIdentifier id, int maxVersion)
protected override async Task<AggregateSnapshotLoadFrame> GetSnapshot(object id, int maxVersion)
{
using (var cn = _connectionFactory())
{
Expand All @@ -174,7 +163,7 @@ protected override async Task<AggregateSnapshotLoadFrame<TIdentifier>> GetSnapsh
{
throw new InvalidOperationException("Aggregate type is not specified in event stream metadata.");
}

var resolvedEvent = (await cn.ReadStreamEventsForwardAsync(snapshotStreamId, 0, int.MaxValue, false).ConfigureAwait(false))
.ToMaybe()
.Where(x => x.Status == SliceReadStatus.Success)
Expand All @@ -184,21 +173,21 @@ protected override async Task<AggregateSnapshotLoadFrame<TIdentifier>> GetSnapsh
var snapshot = resolvedEvent
.Select(x => x.Event.Data)
.Select(Encoding.Default.GetString)
.Select(x => _dataSerializer.Deserialize<IAggregateSnapshot<TIdentifier>>(x))
.Select(x => _dataSerializer.Deserialize<IAggregateSnapshot>(x))
.Where(x => x.Version <= maxVersion);

if (snapshot.HasValue)
{
Type aggregateType = _logicalTypeRegistry.LookupActualType(LogicalType.Parse(aggregateTypeString));

return new AggregateSnapshotLoadFrame<TIdentifier>(aggregateType, id, snapshot.Value);
return new AggregateSnapshotLoadFrame(aggregateType, id, snapshot.Value);
}

return null;
}
}

protected async override Task SaveSnapshot(AggregateSnapshotSaveFrame<TIdentifier> frame)
protected async override Task SaveSnapshot(AggregateSnapshotSaveFrame frame)
{
using (var cn = _connectionFactory())
{
Expand Down Expand Up @@ -235,7 +224,7 @@ await cn.AppendToStreamAsync(
snapshot.SnapshotId,
snapshot,
aggregateType)
).ConfigureAwait(false);
).ConfigureAwait(false);
}
}
}
Expand Down Expand Up @@ -264,14 +253,24 @@ protected EventData BuildEventData(Guid id, object data, Type aggregateType)
null);
}

protected virtual String BuildStreamIdentifier(TIdentifier id)
protected virtual String BuildStreamIdentifier(object id)
{
return _dataSerializer.Serialize(id);
}

protected virtual String BuildSnapshotStreamIdentifier(TIdentifier id)
protected virtual String BuildSnapshotStreamIdentifier(object id)
{
return String.Format("{0}-snapshot", BuildStreamIdentifier(id));
}
}
}

public class EventStoreAggregateRepository<TAggregate, TIdentifier> : AggregateRepository<TAggregate, TIdentifier>
where TAggregate : class, IAggregate<TIdentifier>
where TIdentifier : IEquatable<TIdentifier>
{
public EventStoreAggregateRepository(ILogicalTypeRegistry logicalTypeRegistry, Func<IEventStoreConnection> connectionFactory)
:base(new EventStoreAggregateRepository(logicalTypeRegistry, connectionFactory))
{
}
}
}
28 changes: 17 additions & 11 deletions src/iSynaptic.Core.Persistence/InMemoryAggregateRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,42 +27,48 @@
using iSynaptic.Commons;
using iSynaptic.Commons.Collections.Generic;
using iSynaptic.Modeling.Domain;
using iSynaptic.Serialization;

namespace iSynaptic.Core.Persistence
{
public class InMemoryAggregateRepository<TAggregate, TIdentifier> : MementoBasedAggregateRepository<TAggregate, TIdentifier>
where TAggregate : class, IAggregate<TIdentifier>
where TIdentifier : IEquatable<TIdentifier>
public class InMemoryAggregateRepository : MementoBasedAggregateRepository
{
private readonly Dictionary<TIdentifier, AggregateMemento<TIdentifier>> _state =
new Dictionary<TIdentifier, AggregateMemento<TIdentifier>>();
private readonly Dictionary<object, AggregateMemento> _state = new Dictionary<object, AggregateMemento>();

protected override Task<Maybe<AggregateMemento<TIdentifier>>> TryLoadMemento(TIdentifier id)
protected override Task<Maybe<AggregateMemento>> TryLoadMemento(object id)
{
lock (_state)
{
return Task.FromResult(_state.TryGetValue(id));
}
}

protected override async Task StoreMemento(Func<Task<KeyValuePair<TIdentifier, AggregateMemento<TIdentifier>>>> mementoFactory)
protected override async Task StoreMemento(Func<Task<KeyValuePair<object, AggregateMemento>>> mementoFactory)
{
bool lockTaken = false;

try
{
Monitor.Enter(_state, ref lockTaken);

var memento = await mementoFactory();
_state[memento.Key] = memento.Value;

}
finally
{
if(lockTaken)
if (lockTaken)
Monitor.Exit(_state);
}
}
}

public class InMemoryAggregateRepository<TAggregate, TIdentifier> : AggregateRepository<TAggregate, TIdentifier>
where TAggregate : class, IAggregate<TIdentifier>
where TIdentifier : IEquatable<TIdentifier>
{
public InMemoryAggregateRepository()
: base(new InMemoryAggregateRepository())
{
}
}
}
30 changes: 18 additions & 12 deletions src/iSynaptic.Core.Persistence/InMemoryJsonAggregateRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,14 @@
using Newtonsoft.Json;
using iSynaptic.Commons;
using iSynaptic.Commons.Collections.Generic;
using iSynaptic.Modeling;
using iSynaptic.Modeling.Domain;
using iSynaptic.Serialization;

namespace iSynaptic.Core.Persistence
{
public class InMemoryJsonAggregateRepository<TAggregate, TIdentifier> : MementoBasedAggregateRepository<TAggregate, TIdentifier>
where TAggregate : class, IAggregate<TIdentifier>
where TIdentifier : IEquatable<TIdentifier>
public class InMemoryJsonAggregateRepository : MementoBasedAggregateRepository
{
private readonly Dictionary<String, String> _state =
new Dictionary<String, String>();
private readonly Dictionary<String, String> _state = new Dictionary<String, String>();

private readonly JsonSerializer _serializer;

Expand All @@ -47,27 +43,37 @@ public InMemoryJsonAggregateRepository(JsonSerializer serializer)
_serializer = Guard.NotNull(serializer, "serializer");
}

protected override Task<Maybe<AggregateMemento<TIdentifier>>> TryLoadMemento(TIdentifier id)
protected override Task<Maybe<AggregateMemento>> TryLoadMemento(object id)
{
return Task.FromResult(_state.TryGetValue(_serializer.Serialize(id))
.Select(json => _serializer.Deserialize<AggregateMemento<TIdentifier>>(json)));
.Select(json => _serializer.Deserialize<AggregateMemento>(json)));
}

protected override async Task StoreMemento(Func<Task<KeyValuePair<TIdentifier, AggregateMemento<TIdentifier>>>> mementoFactory)
protected override async Task StoreMemento(Func<Task<KeyValuePair<object, AggregateMemento>>> mementoFactory)
{
bool lockTaken = false;
try
{
Monitor.Enter(_state, ref lockTaken);

var memento = await mementoFactory();
_state[_serializer.Serialize(memento.Key)] = _serializer.Serialize(memento.Value);
}
finally
{
if(lockTaken)
if (lockTaken)
Monitor.Exit(_state);
}
}
}
}

public class InMemoryJsonAggregateRepository<TAggregate, TIdentifier> : AggregateRepository<TAggregate, TIdentifier>
where TAggregate : class, IAggregate<TIdentifier>
where TIdentifier : IEquatable<TIdentifier>
{
public InMemoryJsonAggregateRepository(JsonSerializer serializer)
: base(new InMemoryJsonAggregateRepository(serializer))
{
}
}
}
40 changes: 19 additions & 21 deletions src/iSynaptic.Core.Persistence/MementoBasedAggregateRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,34 +31,32 @@

namespace iSynaptic.Core.Persistence
{
public abstract class MementoBasedAggregateRepository<TAggregate, TIdentifier> : AggregateRepository<TAggregate, TIdentifier>
where TAggregate : class, IAggregate<TIdentifier>
where TIdentifier : IEquatable<TIdentifier>
public abstract class MementoBasedAggregateRepository : AggregateRepository
{
protected abstract Task<Maybe<AggregateMemento<TIdentifier>>> TryLoadMemento(TIdentifier id);
protected abstract Task StoreMemento(Func<Task<KeyValuePair<TIdentifier, AggregateMemento<TIdentifier>>>> mementoFactory);
protected abstract Task<Maybe<AggregateMemento>> TryLoadMemento(object id);
protected abstract Task StoreMemento(Func<Task<KeyValuePair<object, AggregateMemento>>> mementoFactory);

protected override async Task<AggregateSnapshotLoadFrame<TIdentifier>> GetSnapshot(TIdentifier id, int maxVersion)
protected override async Task<AggregateSnapshotLoadFrame> GetSnapshot(object id, int maxVersion)
{
return (await TryLoadMemento(id))
.Where(x => x.Snapshot.Select(y => y.Version <= maxVersion).ValueOrDefault())
.Select(x => new AggregateSnapshotLoadFrame<TIdentifier>(x.AggregateType, id, x.Snapshot.Value))
.Select(x => new AggregateSnapshotLoadFrame(x.AggregateType, id, x.Snapshot.Value))
.ValueOrDefault();
}

protected override async Task<AggregateEventsLoadFrame<TIdentifier>> GetEvents(TIdentifier id, int minVersion, int maxVersion)
protected override async Task<AggregateEventsLoadFrame> GetEvents(object id, int minVersion, int maxVersion)
{
return (await TryLoadMemento(id))
.Select(x => new AggregateEventsLoadFrame<TIdentifier>(
x.AggregateType,
id,
x.Events
.SkipWhile(y => y.Version < minVersion)
.TakeWhile(y => y.Version <= maxVersion)))
.Select(x => new AggregateEventsLoadFrame(
x.AggregateType,
id,
x.Events
.SkipWhile(y => y.Version < minVersion)
.TakeWhile(y => y.Version <= maxVersion)))
.ValueOrDefault();
}

protected override Task SaveSnapshot(AggregateSnapshotSaveFrame<TIdentifier> frame)
protected override Task SaveSnapshot(AggregateSnapshotSaveFrame frame)
{
return StoreMemento(async () =>
{
Expand All @@ -67,32 +65,32 @@ protected override Task SaveSnapshot(AggregateSnapshotSaveFrame<TIdentifier> fra

var state = (await TryLoadMemento(snapshot.Id)).ValueOrDefault();

return KeyValuePair.Create(snapshot.Id, new AggregateMemento<TIdentifier>(aggregateType, snapshot.ToMaybe(), state != null ? state.Events : null));
return KeyValuePair.Create(snapshot.Id, new AggregateMemento(aggregateType, snapshot.ToMaybe(), state != null ? state.Events : null));
});
}

protected override Task SaveEvents(AggregateEventsSaveFrame<TIdentifier> frame)
protected override Task SaveEvents(AggregateEventsSaveFrame frame)
{
var aggregateType = frame.AggregateType;
var id = frame.Id;
var events = frame.Events.ToArray();

return StoreMemento(async () =>
return StoreMemento(async () =>
KeyValuePair.Create(id, (await TryLoadMemento(id))
.Select(x =>
{
var lastEvent = x.Events.TryLast();
var actualVersion = lastEvent.Select(y => y.Version).ValueOrDefault();

if(actualVersion != frame.ExpectedVersion)
if (actualVersion != frame.ExpectedVersion)
throw new AggregateConcurrencyException();

return new AggregateMemento<TIdentifier>(
return new AggregateMemento(
aggregateType,
x.Snapshot,
x.Events.Concat(events.SkipWhile(y => y.Version <= lastEvent.Select(z => z.Version).ValueOrDefault())));
})
.ValueOrDefault(() => new AggregateMemento<TIdentifier>(aggregateType, Maybe<IAggregateSnapshot<TIdentifier>>.NoValue, events))));
.ValueOrDefault(() => new AggregateMemento(aggregateType, Maybe<IAggregateSnapshot>.NoValue, events))));
}
}
}
Loading