Skip to content

Commit

Permalink
Add TxId and a early termination flag to IEventIngester.cs
Browse files Browse the repository at this point in the history
  • Loading branch information
halgari committed Jan 10, 2024
1 parent 09c492c commit 1a5283e
Show file tree
Hide file tree
Showing 9 changed files with 40 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,10 @@ private struct EventCounter : IEventIngester
{
public int Count { get; private set; }

public void Ingest(IEvent @event)
public bool Ingest(TransactionId id, IEvent @event)
{
Count++;
return true;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,10 @@ public void ReadEvents()
private class Counter : IEventIngester
{
public int Count { get; private set; }
public void Ingest(IEvent @event)
public bool Ingest(TransactionId _, IEvent @event)
{
Count++;
return true;
}
}
}
7 changes: 4 additions & 3 deletions src/NexusMods.EventSourcing.Abstractions/IEventIngester.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ public interface IEventIngester
/// <summary>
/// Ingests the given event into the context.
/// </summary>
/// <param name="event"></param>
/// <returns></returns>
public void Ingest(IEvent @event);
/// <param name="id">The transaction id of the event</param>
/// <param name="event">The event</param>
/// <returns>false if playback of events should be stopped, true if the next event should be read</returns>
public bool Ingest(TransactionId id, IEvent @event);
}
21 changes: 21 additions & 0 deletions src/NexusMods.EventSourcing.Abstractions/TransactionId.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
using System;
using System.Buffers.Binary;
using TransparentValueObjects;

namespace NexusMods.EventSourcing.Abstractions;
Expand All @@ -16,4 +18,23 @@ public readonly partial struct TransactionId
/// </summary>
/// <returns></returns>
public TransactionId Next() => new(Value + 1);

/// <summary>
/// Write the transaction id to the given span.
/// </summary>
/// <param name="span"></param>
public void WriteTo(Span<byte> span)
{
BinaryPrimitives.WriteUInt64BigEndian(span, Value);
}

/// <summary>
/// Read a transaction id from the given span.
/// </summary>
/// <param name="span"></param>
/// <returns></returns>
public static TransactionId From(ReadOnlySpan<byte> span)
{
return new(BinaryPrimitives.ReadUInt64BigEndian(span));
}
}
3 changes: 2 additions & 1 deletion src/NexusMods.EventSourcing.RocksDB/RocksDBEventStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,9 @@ public void EventsForEntity<TIngester>(EntityId entityId, TIngester ingester) wh
while (iterator.Valid())
{
var key = iterator.GetKeySpan();
var txId = TransactionId.From(key);
var evt = _db.Get(key[16..], _deserializer, _eventsColumn);
ingester.Ingest(evt);
if (!ingester.Ingest(txId, evt)) break;
iterator.Next();
}

Expand Down
3 changes: 2 additions & 1 deletion src/NexusMods.EventSourcing/EntityContextIngester.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ namespace NexusMods.EventSourcing;
public struct EntityContextIngester(Dictionary<IAttribute, IAccumulator> values, EntityId id) : IEventContext, IEventIngester
{
/// <inheritdoc />
public void Ingest(IEvent @event)
public bool Ingest(TransactionId _, IEvent @event)
{
@event.Apply(this);
return true;
}

/// <inheritdoc />
Expand Down
3 changes: 0 additions & 3 deletions src/NexusMods.EventSourcing/NexusMods.EventSourcing.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@
<ItemGroup>
<ProjectReference Include="..\NexusMods.EventSourcing.Abstractions\NexusMods.EventSourcing.Abstractions.csproj" />
</ItemGroup>
<ItemGroup>
<Folder Include="Extensions\" />
</ItemGroup>
<ItemGroup>
<None Update="xunit.runner.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
Expand Down
10 changes: 5 additions & 5 deletions tests/NexusMods.EventSourcing.TestModel/InMemoryEventStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ public class InMemoryEventStore<TSerializer>(TSerializer serializer) : IEventSto
where TSerializer : IEventSerializer
{
private TransactionId _tx = TransactionId.From(0);
private readonly Dictionary<EntityId,IList<byte[]>> _events = new();
private readonly Dictionary<EntityId,IList<(TransactionId TxId, byte[] Data)>> _events = new();

public TransactionId Add<T>(T entity) where T : IEvent
{
Expand All @@ -21,11 +21,11 @@ public TransactionId Add<T>(T entity) where T : IEvent
{
if (!_events.TryGetValue(id, out var value))
{
value = new List<byte[]>();
value = new List<(TransactionId, byte[])>();
_events.Add(id, value);
}

value.Add(data.ToArray());
value.Add((_tx, data.ToArray()));
}

return _tx;
Expand All @@ -40,8 +40,8 @@ public void EventsForEntity<TIngester>(EntityId entityId, TIngester ingester)
return;
foreach (var data in events)
{
var @event = serializer.Deserialize(data)!;
ingester.Ingest(@event);
var @event = serializer.Deserialize(data.Data)!;
if (!ingester.Ingest(data.TxId, @event)) break;
}
}
}
3 changes: 2 additions & 1 deletion tests/NexusMods.EventSourcing.Tests/AEventStoreTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ public void CanGetAndReturnEvents()
private class EventAccumulator : IEventIngester
{
public List<IEvent> Events { get; } = new();
public void Ingest(IEvent @event)
public bool Ingest(TransactionId _, IEvent @event)
{
Events.Add(@event);
return true;
}
}

Expand Down

0 comments on commit 1a5283e

Please sign in to comment.