diff --git a/src/iSynaptic.Core.CodeGeneration/Modeling/Domain/AggregateSnapshotCodeAuthoringVisitor.cs b/src/iSynaptic.Core.CodeGeneration/Modeling/Domain/AggregateSnapshotCodeAuthoringVisitor.cs index cf7d696..d0b27aa 100644 --- a/src/iSynaptic.Core.CodeGeneration/Modeling/Domain/AggregateSnapshotCodeAuthoringVisitor.cs +++ b/src/iSynaptic.Core.CodeGeneration/Modeling/Domain/AggregateSnapshotCodeAuthoringVisitor.cs @@ -60,10 +60,8 @@ protected override Maybe GetBaseMolecule(MoleculeSyntax molecule) { var aggregate = (AggregateSyntax)molecule.Parent; - var id = aggregate.GetIdTypeName(SymbolTable); - return base.GetBaseMolecule(molecule) - .Or(String.Format("AggregateSnapshot<{0}>", id)); + .Or("AggregateSnapshot"); } protected override bool ShouldBeEquatable(MoleculeSyntax molecule) diff --git a/src/iSynaptic.Core.Persistence/EventStoreAggregateRepository.cs b/src/iSynaptic.Core.Persistence/EventStoreAggregateRepository.cs index 918b370..6660f50 100644 --- a/src/iSynaptic.Core.Persistence/EventStoreAggregateRepository.cs +++ b/src/iSynaptic.Core.Persistence/EventStoreAggregateRepository.cs @@ -21,35 +21,26 @@ // THE SOFTWARE. using System; -using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; - using EventStore.ClientAPI; using EventStore.ClientAPI.Exceptions; -using Newtonsoft.Json; using iSynaptic.Commons; -using iSynaptic.Commons.Collections.Generic; using iSynaptic.Commons.Linq; -using iSynaptic.Modeling; using iSynaptic.Modeling.Domain; using iSynaptic.Serialization; +using Newtonsoft.Json; namespace iSynaptic.Core.Persistence { - public class EventStoreAggregateRepository : AggregateRepository - where TAggregate : class, IAggregate - where TIdentifier : IEquatable + public class EventStoreAggregateRepository : AggregateRepository { private static readonly Guid _offsetEventId = new Guid(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1); private static readonly EventData _offsetEvent = new EventData(_offsetEventId, "streamOffset", true, Encoding.Default.GetBytes("{}"), null); private readonly ILogicalTypeRegistry _logicalTypeRegistry; - private readonly JsonSerializer _dataSerializer; - private readonly JsonSerializer _metadataSerializer; - private readonly Func _connectionFactory; public EventStoreAggregateRepository(ILogicalTypeRegistry logicalTypeRegistry, Func connectionFactory) @@ -61,11 +52,9 @@ public EventStoreAggregateRepository(ILogicalTypeRegistry logicalTypeRegistry, F var metadataSerializerSettings = JsonSerializerBuilder.BuildSettings(logicalTypeRegistry); metadataSerializerSettings.TypeNameHandling = TypeNameHandling.None; - - _metadataSerializer = JsonSerializer.Create(metadataSerializerSettings); } - protected override async Task> GetEvents(TIdentifier id, int minVersion, int maxVersion) + protected override async Task GetEvents(object id, int minVersion, int maxVersion) { var maxCount = (maxVersion - minVersion) + 1; @@ -102,16 +91,16 @@ protected override async Task> GetEvents(T var events = resolvedEvents .Select(x => x.Event.Data) .Select(Encoding.Default.GetString) - .Select(x => _dataSerializer.Deserialize>(x)); + .Select(x => _dataSerializer.Deserialize(x)); - return new AggregateEventsLoadFrame(aggregateType, id, events); + return new AggregateEventsLoadFrame(aggregateType, id, events); } return null; } } - protected async override Task SaveEvents(AggregateEventsSaveFrame frame) + protected async override Task SaveEvents(AggregateEventsSaveFrame frame) { var aggregateType = frame.AggregateType; var id = frame.Id; @@ -156,7 +145,7 @@ await cn.AppendToStreamAsync( } } - protected override async Task> GetSnapshot(TIdentifier id, int maxVersion) + protected override async Task GetSnapshot(object id, int maxVersion) { using (var cn = _connectionFactory()) { @@ -174,7 +163,7 @@ protected override async Task> GetSnapsh { throw new InvalidOperationException("Aggregate type is not specified in event stream metadata."); } - + var resolvedEvent = (await cn.ReadStreamEventsForwardAsync(snapshotStreamId, 0, int.MaxValue, false).ConfigureAwait(false)) .ToMaybe() .Where(x => x.Status == SliceReadStatus.Success) @@ -184,21 +173,21 @@ protected override async Task> GetSnapsh var snapshot = resolvedEvent .Select(x => x.Event.Data) .Select(Encoding.Default.GetString) - .Select(x => _dataSerializer.Deserialize>(x)) + .Select(x => _dataSerializer.Deserialize(x)) .Where(x => x.Version <= maxVersion); if (snapshot.HasValue) { Type aggregateType = _logicalTypeRegistry.LookupActualType(LogicalType.Parse(aggregateTypeString)); - return new AggregateSnapshotLoadFrame(aggregateType, id, snapshot.Value); + return new AggregateSnapshotLoadFrame(aggregateType, id, snapshot.Value); } return null; } } - protected async override Task SaveSnapshot(AggregateSnapshotSaveFrame frame) + protected async override Task SaveSnapshot(AggregateSnapshotSaveFrame frame) { using (var cn = _connectionFactory()) { @@ -235,7 +224,7 @@ await cn.AppendToStreamAsync( snapshot.SnapshotId, snapshot, aggregateType) - ).ConfigureAwait(false); + ).ConfigureAwait(false); } } } @@ -264,14 +253,24 @@ protected EventData BuildEventData(Guid id, object data, Type aggregateType) null); } - protected virtual String BuildStreamIdentifier(TIdentifier id) + protected virtual String BuildStreamIdentifier(object id) { return _dataSerializer.Serialize(id); } - protected virtual String BuildSnapshotStreamIdentifier(TIdentifier id) + protected virtual String BuildSnapshotStreamIdentifier(object id) { return String.Format("{0}-snapshot", BuildStreamIdentifier(id)); } } -} + + public class EventStoreAggregateRepository : AggregateRepository + where TAggregate : class, IAggregate + where TIdentifier : IEquatable + { + public EventStoreAggregateRepository(ILogicalTypeRegistry logicalTypeRegistry, Func connectionFactory) + :base(new EventStoreAggregateRepository(logicalTypeRegistry, connectionFactory)) + { + } + } +} \ No newline at end of file diff --git a/src/iSynaptic.Core.Persistence/InMemoryAggregateRepository.cs b/src/iSynaptic.Core.Persistence/InMemoryAggregateRepository.cs index 4d1b07b..431dd9c 100644 --- a/src/iSynaptic.Core.Persistence/InMemoryAggregateRepository.cs +++ b/src/iSynaptic.Core.Persistence/InMemoryAggregateRepository.cs @@ -27,18 +27,14 @@ using iSynaptic.Commons; using iSynaptic.Commons.Collections.Generic; using iSynaptic.Modeling.Domain; -using iSynaptic.Serialization; namespace iSynaptic.Core.Persistence { - public class InMemoryAggregateRepository : MementoBasedAggregateRepository - where TAggregate : class, IAggregate - where TIdentifier : IEquatable + public class InMemoryAggregateRepository : MementoBasedAggregateRepository { - private readonly Dictionary> _state = - new Dictionary>(); + private readonly Dictionary _state = new Dictionary(); - protected override Task>> TryLoadMemento(TIdentifier id) + protected override Task> TryLoadMemento(object id) { lock (_state) { @@ -46,23 +42,33 @@ protected override Task>> TryLoadMemento(TId } } - protected override async Task StoreMemento(Func>>> mementoFactory) + protected override async Task StoreMemento(Func>> mementoFactory) { bool lockTaken = false; - + try { Monitor.Enter(_state, ref lockTaken); var memento = await mementoFactory(); _state[memento.Key] = memento.Value; - + } finally { - if(lockTaken) + if (lockTaken) Monitor.Exit(_state); } } } + + public class InMemoryAggregateRepository : AggregateRepository + where TAggregate : class, IAggregate + where TIdentifier : IEquatable + { + public InMemoryAggregateRepository() + : base(new InMemoryAggregateRepository()) + { + } + } } \ No newline at end of file diff --git a/src/iSynaptic.Core.Persistence/InMemoryJsonAggregateRepository.cs b/src/iSynaptic.Core.Persistence/InMemoryJsonAggregateRepository.cs index 3dacd07..6c427ed 100644 --- a/src/iSynaptic.Core.Persistence/InMemoryJsonAggregateRepository.cs +++ b/src/iSynaptic.Core.Persistence/InMemoryJsonAggregateRepository.cs @@ -27,18 +27,14 @@ using Newtonsoft.Json; using iSynaptic.Commons; using iSynaptic.Commons.Collections.Generic; -using iSynaptic.Modeling; using iSynaptic.Modeling.Domain; using iSynaptic.Serialization; namespace iSynaptic.Core.Persistence { - public class InMemoryJsonAggregateRepository : MementoBasedAggregateRepository - where TAggregate : class, IAggregate - where TIdentifier : IEquatable + public class InMemoryJsonAggregateRepository : MementoBasedAggregateRepository { - private readonly Dictionary _state = - new Dictionary(); + private readonly Dictionary _state = new Dictionary(); private readonly JsonSerializer _serializer; @@ -47,27 +43,37 @@ public InMemoryJsonAggregateRepository(JsonSerializer serializer) _serializer = Guard.NotNull(serializer, "serializer"); } - protected override Task>> TryLoadMemento(TIdentifier id) + protected override Task> TryLoadMemento(object id) { return Task.FromResult(_state.TryGetValue(_serializer.Serialize(id)) - .Select(json => _serializer.Deserialize>(json))); + .Select(json => _serializer.Deserialize(json))); } - protected override async Task StoreMemento(Func>>> mementoFactory) + protected override async Task StoreMemento(Func>> mementoFactory) { bool lockTaken = false; try { Monitor.Enter(_state, ref lockTaken); - + var memento = await mementoFactory(); _state[_serializer.Serialize(memento.Key)] = _serializer.Serialize(memento.Value); } finally { - if(lockTaken) + if (lockTaken) Monitor.Exit(_state); } } } -} + + public class InMemoryJsonAggregateRepository : AggregateRepository + where TAggregate : class, IAggregate + where TIdentifier : IEquatable + { + public InMemoryJsonAggregateRepository(JsonSerializer serializer) + : base(new InMemoryJsonAggregateRepository(serializer)) + { + } + } +} \ No newline at end of file diff --git a/src/iSynaptic.Core.Persistence/MementoBasedAggregateRepository.cs b/src/iSynaptic.Core.Persistence/MementoBasedAggregateRepository.cs index a772b7e..b6f13b5 100644 --- a/src/iSynaptic.Core.Persistence/MementoBasedAggregateRepository.cs +++ b/src/iSynaptic.Core.Persistence/MementoBasedAggregateRepository.cs @@ -31,34 +31,32 @@ namespace iSynaptic.Core.Persistence { - public abstract class MementoBasedAggregateRepository : AggregateRepository - where TAggregate : class, IAggregate - where TIdentifier : IEquatable + public abstract class MementoBasedAggregateRepository : AggregateRepository { - protected abstract Task>> TryLoadMemento(TIdentifier id); - protected abstract Task StoreMemento(Func>>> mementoFactory); + protected abstract Task> TryLoadMemento(object id); + protected abstract Task StoreMemento(Func>> mementoFactory); - protected override async Task> GetSnapshot(TIdentifier id, int maxVersion) + protected override async Task GetSnapshot(object id, int maxVersion) { return (await TryLoadMemento(id)) .Where(x => x.Snapshot.Select(y => y.Version <= maxVersion).ValueOrDefault()) - .Select(x => new AggregateSnapshotLoadFrame(x.AggregateType, id, x.Snapshot.Value)) + .Select(x => new AggregateSnapshotLoadFrame(x.AggregateType, id, x.Snapshot.Value)) .ValueOrDefault(); } - protected override async Task> GetEvents(TIdentifier id, int minVersion, int maxVersion) + protected override async Task GetEvents(object id, int minVersion, int maxVersion) { return (await TryLoadMemento(id)) - .Select(x => new AggregateEventsLoadFrame( - x.AggregateType, - id, - x.Events - .SkipWhile(y => y.Version < minVersion) - .TakeWhile(y => y.Version <= maxVersion))) + .Select(x => new AggregateEventsLoadFrame( + x.AggregateType, + id, + x.Events + .SkipWhile(y => y.Version < minVersion) + .TakeWhile(y => y.Version <= maxVersion))) .ValueOrDefault(); } - protected override Task SaveSnapshot(AggregateSnapshotSaveFrame frame) + protected override Task SaveSnapshot(AggregateSnapshotSaveFrame frame) { return StoreMemento(async () => { @@ -67,32 +65,32 @@ protected override Task SaveSnapshot(AggregateSnapshotSaveFrame fra var state = (await TryLoadMemento(snapshot.Id)).ValueOrDefault(); - return KeyValuePair.Create(snapshot.Id, new AggregateMemento(aggregateType, snapshot.ToMaybe(), state != null ? state.Events : null)); + return KeyValuePair.Create(snapshot.Id, new AggregateMemento(aggregateType, snapshot.ToMaybe(), state != null ? state.Events : null)); }); } - protected override Task SaveEvents(AggregateEventsSaveFrame frame) + protected override Task SaveEvents(AggregateEventsSaveFrame frame) { var aggregateType = frame.AggregateType; var id = frame.Id; var events = frame.Events.ToArray(); - return StoreMemento(async () => + return StoreMemento(async () => KeyValuePair.Create(id, (await TryLoadMemento(id)) .Select(x => { var lastEvent = x.Events.TryLast(); var actualVersion = lastEvent.Select(y => y.Version).ValueOrDefault(); - if(actualVersion != frame.ExpectedVersion) + if (actualVersion != frame.ExpectedVersion) throw new AggregateConcurrencyException(); - return new AggregateMemento( + return new AggregateMemento( aggregateType, x.Snapshot, x.Events.Concat(events.SkipWhile(y => y.Version <= lastEvent.Select(z => z.Version).ValueOrDefault()))); }) - .ValueOrDefault(() => new AggregateMemento(aggregateType, Maybe>.NoValue, events)))); + .ValueOrDefault(() => new AggregateMemento(aggregateType, Maybe.NoValue, events)))); } } } \ No newline at end of file diff --git a/src/iSynaptic.Core.Persistence/SqlServerAggregateRepository.cs b/src/iSynaptic.Core.Persistence/SqlServerAggregateRepository.cs index 2356559..f25d1d3 100644 --- a/src/iSynaptic.Core.Persistence/SqlServerAggregateRepository.cs +++ b/src/iSynaptic.Core.Persistence/SqlServerAggregateRepository.cs @@ -36,7 +36,7 @@ namespace iSynaptic.Core.Persistence { - public static class SqlServerAggregateRepository + public class SqlServerAggregateRepository : AggregateRepository { private static readonly Regex _scriptRegex = new Regex(@"(?