Skip to content

Commit

Permalink
Extract snapshot serialization into an external class
Browse files Browse the repository at this point in the history
  • Loading branch information
halgari committed Jan 12, 2024
1 parent 1873c81 commit f25ceaa
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 80 deletions.
110 changes: 110 additions & 0 deletions src/NexusMods.EventSourcing/AEventStore.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
using System;
using System.Buffers.Binary;
using System.Collections.Generic;
using NexusMods.EventSourcing.Abstractions;
using NexusMods.EventSourcing.Abstractions.Serialization;
using Reloaded.Memory.Extensions;

namespace NexusMods.EventSourcing;

public abstract class AEventStore : IEventStore
{
private readonly IVariableSizeSerializer<string> _stringSerializer;
private readonly IFixedSizeSerializer<EntityDefinition> _entityDefinitionSerializer;
private readonly ISerializationRegistry _serializationRegistry;
private readonly PooledMemoryBufferWriter _writer;

public AEventStore(ISerializationRegistry serializationRegistry)
{
_stringSerializer = (serializationRegistry.GetSerializer(typeof(string)) as IVariableSizeSerializer<string>)!;
_entityDefinitionSerializer = (serializationRegistry.GetSerializer(typeof(EntityDefinition)) as IFixedSizeSerializer<EntityDefinition>)!;
_serializationRegistry = serializationRegistry;
_writer = new PooledMemoryBufferWriter();

}

protected TransactionId DeserializeSnapshot(out IAccumulator loadedDefinition,
out (IAttribute Attribute, IAccumulator Accumulator)[] loadedAttributes, ReadOnlySpan<byte> snapshot,
KeyValuePair<TransactionId, byte[]> startPoint)
{
var entityDefinition = _entityDefinitionSerializer.Deserialize(snapshot.SliceFast(0, 18));

var typeAccumulator = IEntity.TypeAttribute.CreateAccumulator();
typeAccumulator.ReadFrom(ref snapshot, _serializationRegistry);


var appDefinition = EntityStructureRegistry.GetDefinitionByUUID(entityDefinition.UUID);

if (entityDefinition.Revision != appDefinition.Revision)
{
loadedAttributes = Array.Empty<(IAttribute, IAccumulator)>();
loadedDefinition = default!;
return default;
}

snapshot = snapshot.SliceFast(18);

var numberOfAttrs = BinaryPrimitives.ReadUInt16BigEndian(snapshot);
snapshot = snapshot.SliceFast(sizeof(ushort));

var results = GC.AllocateUninitializedArray<(IAttribute, IAccumulator)>(numberOfAttrs);

if (!EntityStructureRegistry.TryGetAttributes(entityDefinition.Type, out var attributes))
throw new Exception("Entity definition does not match the current structure registry.");

for (var i = 0; i < numberOfAttrs; i++)
{
var read = _stringSerializer.Deserialize(snapshot, out var attributeName);
snapshot = snapshot.SliceFast(read);

if (!attributes.TryGetValue(attributeName, out var attribute))
throw new Exception("Entity definition does not match the current structure registry.");

var accumulator = attribute.CreateAccumulator();

accumulator.ReadFrom(ref snapshot, _serializationRegistry);
snapshot = snapshot.SliceFast(read);

results[i] = (attribute, accumulator);
}

loadedAttributes = results;
loadedDefinition = typeAccumulator;
return startPoint.Key;
}

protected ReadOnlySpan<byte> SerializeSnapshot(EntityId id, IDictionary<IAttribute, IAccumulator> attributes)
{
_writer.Reset();

// Snapshot starts with the type attribute value
var typeAccumulator = attributes[IEntity.TypeAttribute];
typeAccumulator.WriteTo(_writer, _serializationRegistry);

var sizeSpan = _writer.GetSpan(sizeof(ushort));
BinaryPrimitives.WriteUInt16BigEndian(sizeSpan, (ushort) (attributes.Count - 1));
_writer.Advance(sizeof(ushort));


// And then each attribute in any order
foreach (var (attribute, accumulator) in attributes)
{
if (attribute == IEntity.TypeAttribute) continue;

var attributeName = attribute.Name;
_stringSerializer.Serialize(attributeName, _writer);
accumulator.WriteTo(_writer, _serializationRegistry);
}

var span = _writer.GetWrittenSpan();
return span;
}

public abstract TransactionId Add<T>(T eventEntity) where T : IEvent;
public abstract void EventsForEntity<TIngester>(EntityId entityId, TIngester ingester, TransactionId fromId, TransactionId toId) where TIngester : IEventIngester;

public abstract TransactionId GetSnapshot(TransactionId asOf, EntityId entityId, out IAccumulator loadedDefinition,
out (IAttribute Attribute, IAccumulator Accumulator)[] loadedAttributes);

public abstract void SetSnapshot(TransactionId txId, EntityId id, IDictionary<IAttribute, IAccumulator> attributes);
}
88 changes: 8 additions & 80 deletions tests/NexusMods.EventSourcing.TestModel/InMemoryEventStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,20 @@

namespace NexusMods.EventSourcing.TestModel;

public class InMemoryEventStore<TSerializer> : IEventStore
public class InMemoryEventStore<TSerializer> : AEventStore
where TSerializer : IEventSerializer
{
private TransactionId _tx = TransactionId.From(0);
private readonly Dictionary<EntityId,IList<(TransactionId TxId, byte[] Data)>> _events = new();
private readonly Dictionary<EntityId, SortedDictionary<TransactionId, byte[]>> _snapshots = new();
private TSerializer _serializer;
private readonly IVariableSizeSerializer<string> _stringSerializer;
private readonly PooledMemoryBufferWriter _writer;
private readonly ISerializationRegistry _serializationRegistry;
private readonly IFixedSizeSerializer<EntityDefinition> _entityDefinitionSerializer;

public InMemoryEventStore(TSerializer serializer, ISerializationRegistry serializationRegistry)
public InMemoryEventStore(TSerializer serializer, ISerializationRegistry serializationRegistry) : base(serializationRegistry)
{
_serializer = serializer;
_stringSerializer = (serializationRegistry.GetSerializer(typeof(string)) as IVariableSizeSerializer<string>)!;
_entityDefinitionSerializer = (serializationRegistry.GetSerializer(typeof(EntityDefinition)) as IFixedSizeSerializer<EntityDefinition>)!;
_serializationRegistry = serializationRegistry;
_writer = new PooledMemoryBufferWriter();
}

public TransactionId Add<T>(T entity) where T : IEvent
public override TransactionId Add<T>(T entity)
{
lock (this)
{
Expand All @@ -51,8 +43,7 @@ public TransactionId Add<T>(T entity) where T : IEvent
}


public void EventsForEntity<TIngester>(EntityId entityId, TIngester ingester, TransactionId fromId, TransactionId toId)
where TIngester : IEventIngester
public override void EventsForEntity<TIngester>(EntityId entityId, TIngester ingester, TransactionId fromId, TransactionId toId)
{
if (!_events.TryGetValue(entityId, out var events))
return;
Expand All @@ -67,7 +58,7 @@ public void EventsForEntity<TIngester>(EntityId entityId, TIngester ingester, Tr
}
}

public TransactionId GetSnapshot(TransactionId asOf, EntityId entityId, out IAccumulator loadedDefinition,
public override TransactionId GetSnapshot(TransactionId asOf, EntityId entityId, out IAccumulator loadedDefinition,
out (IAttribute Attribute, IAccumulator Accumulator)[] loadedAttributes)
{
if (!_snapshots.TryGetValue(entityId, out var snapshots))
Expand All @@ -87,76 +78,13 @@ public TransactionId GetSnapshot(TransactionId asOf, EntityId entityId, out IAcc
}

var snapshot = (ReadOnlySpan<byte>)startPoint.Value.AsSpanFast();
var entityDefinition = _entityDefinitionSerializer.Deserialize(snapshot.SliceFast(0, 18));

var typeAccumulator = IEntity.TypeAttribute.CreateAccumulator();
typeAccumulator.ReadFrom(ref snapshot, _serializationRegistry);


var appDefinition = EntityStructureRegistry.GetDefinitionByUUID(entityDefinition.UUID);

if (entityDefinition.Revision != appDefinition.Revision)
{
loadedAttributes = Array.Empty<(IAttribute, IAccumulator)>();
loadedDefinition = default!;
return default;
}

snapshot = snapshot.SliceFast(18);

var numberOfAttrs = BinaryPrimitives.ReadUInt16BigEndian(snapshot);
snapshot = snapshot.SliceFast(sizeof(ushort));

var results = GC.AllocateUninitializedArray<(IAttribute, IAccumulator)>(numberOfAttrs);

if (!EntityStructureRegistry.TryGetAttributes(entityDefinition.Type, out var attributes))
throw new Exception("Entity definition does not match the current structure registry.");

for (var i = 0; i < numberOfAttrs; i++)
{
var read = _stringSerializer.Deserialize(snapshot, out var attributeName);
snapshot = snapshot.SliceFast(read);

if (!attributes.TryGetValue(attributeName, out var attribute))
throw new Exception("Entity definition does not match the current structure registry.");

var accumulator = attribute.CreateAccumulator();

accumulator.ReadFrom(ref snapshot, _serializationRegistry);
snapshot = snapshot.SliceFast(read);

results[i] = (attribute, accumulator);
}

loadedAttributes = results;
loadedDefinition = typeAccumulator;
return startPoint.Key;
return DeserializeSnapshot(out loadedDefinition, out loadedAttributes, snapshot, startPoint);
}

public void SetSnapshot(TransactionId txId, EntityId id, IDictionary<IAttribute, IAccumulator> attributes)
public override void SetSnapshot(TransactionId txId, EntityId id, IDictionary<IAttribute, IAccumulator> attributes)
{
_writer.Reset();

// Snapshot starts with the type attribute value
var typeAccumulator = attributes[IEntity.TypeAttribute];
typeAccumulator.WriteTo(_writer, _serializationRegistry);

var sizeSpan = _writer.GetSpan(sizeof(ushort));
BinaryPrimitives.WriteUInt16BigEndian(sizeSpan, (ushort) (attributes.Count - 1));
_writer.Advance(sizeof(ushort));


// And then each attribute in any order
foreach (var (attribute, accumulator) in attributes)
{
if (attribute == IEntity.TypeAttribute) continue;

var attributeName = attribute.Name;
_stringSerializer.Serialize(attributeName, _writer);
accumulator.WriteTo(_writer, _serializationRegistry);
}

var span = _writer.GetWrittenSpan();
var span = SerializeSnapshot(id, attributes);

if (!_snapshots.TryGetValue(id, out var snapshots))
{
Expand Down

0 comments on commit f25ceaa

Please sign in to comment.