Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
halgari committed Jan 12, 2024
1 parent f25ceaa commit 0b38b82
Showing 1 changed file with 18 additions and 16 deletions.
34 changes: 18 additions & 16 deletions src/NexusMods.EventSourcing.RocksDB/RocksDBEventStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
using System.Buffers.Binary;
using System.Collections.Generic;
using NexusMods.EventSourcing.Abstractions;
using NexusMods.EventSourcing.Serialization;
using Reloaded.Memory.Extensions;
using RocksDbSharp;

namespace NexusMods.EventSourcing.RocksDB;

public class RocksDBEventStore<TSerializer> : IEventStore
public sealed class RocksDBEventStore<TSerializer> : AEventStore
where TSerializer : IEventSerializer
{
private readonly ColumnFamilies _families;
Expand All @@ -18,8 +19,9 @@ public class RocksDBEventStore<TSerializer> : IEventStore
private readonly ColumnFamilyHandle _entityIndexColumn;
private readonly TSerializer _serializer;
private readonly SpanDeserializer<TSerializer> _deserializer;
private readonly ColumnFamilyHandle _snapshotColumn;

public RocksDBEventStore(TSerializer serializer, Settings settings)
public RocksDBEventStore(TSerializer serializer, Settings settings, SerializationRegistry serializationRegistry) : base(serializationRegistry)
{
_serializer = serializer;
_families = new ColumnFamilies();
Expand All @@ -31,27 +33,28 @@ public RocksDBEventStore(TSerializer serializer, Settings settings)
settings.StorageLocation.ToString(), new ColumnFamilies());
_eventsColumn = _db.CreateColumnFamily(new ColumnFamilyOptions(), "events");
_entityIndexColumn = _db.CreateColumnFamily(new ColumnFamilyOptions(), "entityIndex");
//_eventsColumn = _db.GetColumnFamily("events");
//_entityIndexColumn = _db.GetColumnFamily("entityIndex");
_snapshotColumn = _db.CreateColumnFamily(new ColumnFamilyOptions(), "snapshots");
_tx = TransactionId.From(0);

_deserializer = new SpanDeserializer<TSerializer>(serializer);
}


public TransactionId Add<T>(T eventValue) where T : IEvent
public override TransactionId Add<T>(T eventValue)
{
lock (this)
{
_tx = _tx.Next();

// Write the event itself
{
Span<byte> keySpan = stackalloc byte[8];
_tx.WriteTo(keySpan);
var span = _serializer.Serialize(eventValue);
_db.Put(keySpan, span, _eventsColumn);
}

// Update the entity indexes to mark them as having this event
{
var ingester = new ModifiedEntitiesIngester();
eventValue.Apply(ingester);
Expand All @@ -67,20 +70,14 @@ public TransactionId Add<T>(T eventValue) where T : IEvent
}
}

public void EventsForEntity<TIngester>(EntityId entityId, TIngester ingester, TransactionId fromId, TransactionId toId) where TIngester : IEventIngester
{
throw new NotImplementedException();
}



public void EventsForEntity<TIngester>(EntityId entityId, TIngester ingester) where TIngester : IEventIngester
public override void EventsForEntity<TIngester>(EntityId entityId, TIngester ingester, TransactionId fromId, TransactionId toId)
{
Span<byte> startKey = stackalloc byte[24];
entityId.TryWriteBytes(startKey);
BinaryPrimitives.WriteUInt64BigEndian(startKey.SliceFast(16), fromId.Value);
Span<byte> endKey = stackalloc byte[24];
entityId.TryWriteBytes(endKey);
BinaryPrimitives.WriteUInt64BigEndian(endKey[16..], ulong.MaxValue);
BinaryPrimitives.WriteUInt64BigEndian(endKey.SliceFast(16), toId.Value);

var options = new ReadOptions();
unsafe
Expand Down Expand Up @@ -113,8 +110,13 @@ public TransactionId GetSnapshot(TransactionId asOf, EntityId entityId, out IAcc
throw new NotImplementedException();
}

public void SetSnapshot(TransactionId txId, EntityId id, IDictionary<IAttribute, IAccumulator> attributes)
public override void SetSnapshot(TransactionId txId, EntityId id, IDictionary<IAttribute, IAccumulator> attributes)
{
throw new NotImplementedException();
var span = SerializeSnapshot(id, attributes);

Span<byte> keySpan = stackalloc byte[24];
id.TryWriteBytes(keySpan);
BinaryPrimitives.WriteUInt64BigEndian(keySpan.SliceFast(16), txId.Value);
_db.Put(keySpan, span, _snapshotColumn);
}
}

0 comments on commit 0b38b82

Please sign in to comment.