Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
halgari committed Jan 11, 2024
1 parent 85200db commit 91d56d3
Show file tree
Hide file tree
Showing 17 changed files with 351 additions and 87 deletions.
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
using System;
using System.Buffers;
using System.Buffers.Binary;
using System.Diagnostics;
using NexusMods.EventSourcing.Abstractions.Serialization;

namespace NexusMods.EventSourcing.Abstractions.AttributeDefinitions;

/// <summary>
/// An attribute for the type of an entity.
/// </summary>
/// <param name="attrName"></param>
public class TypeAttributeDefinition : IAttribute<ScalarAccumulator<Type>>
public class TypeAttributeDefinition : IAttribute<ScalarAccumulator<EntityDefinition>>
{
/// <inheritdoc />
public ScalarAccumulator<Type> CreateAccumulator()
public ScalarAccumulator<EntityDefinition> CreateAccumulator()
{
return new ScalarAccumulator<Type>();
return new ScalarAccumulator<EntityDefinition>();
}

/// <inheritdoc />
Expand All @@ -31,9 +34,9 @@ public ScalarAccumulator<Type> CreateAccumulator()
public Type Get<TCtx>(TCtx context, EntityId owner) where TCtx : IEntityContext
{
EntityStructureRegistry.Register(this);
if (context.GetReadOnlyAccumulator<IEntity, TypeAttributeDefinition, ScalarAccumulator<Type>>(
if (context.GetReadOnlyAccumulator<IEntity, TypeAttributeDefinition, ScalarAccumulator<EntityDefinition>>(
new EntityId<IEntity>(owner), this, out var accumulator))
return accumulator.Value;
return accumulator.Value.Type;
// TODO, make this a custom exception and extract it to another method
throw new InvalidOperationException("No type attribute found for entity");
}
Expand All @@ -50,7 +53,8 @@ public void New<TEventCtx, TType>(TEventCtx context, EntityId<TType> id)
where TEventCtx : IEventContext
where TType : IEntity
{
if (context.GetAccumulator<IEntity, TypeAttributeDefinition, ScalarAccumulator<Type>>(EntityId<IEntity>.From(id.Value.Value), this, out var accumulator))
accumulator.Value = typeof(TType);
var definition = EntityStructureRegistry.GetDefinition<TType>();
if (context.GetAccumulator<IEntity, TypeAttributeDefinition, ScalarAccumulator<EntityDefinition>>(EntityId<IEntity>.From(id.Value.Value), IEntity.TypeAttribute, out var accumulator))
accumulator.Value = definition;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,25 @@ public static IServiceCollection AddEvent<T>(this IServiceCollection collection)
return collection;
}


/// <summary>
/// Registers an entity with the service collection, this is required for loading snapshots and properly
/// tracking entity revisions in the application
/// </summary>
/// <param name="collection"></param>
/// <typeparam name="T"></typeparam>
/// <returns></returns>
/// <exception cref="ArgumentException"></exception>
public static IServiceCollection AddEntity<T>(this IServiceCollection collection) where T : class, IEntity
{
var type = typeof(T);
var attribute = type.GetCustomAttribute<EntityAttribute>();
if (attribute is null)
{
throw new ArgumentException($"Entity type {type.Name} does not have an EntityAttribute.");
}
EntityStructureRegistry.Register(new EntityDefinition(type, attribute.UUID, attribute.Revision));
return collection;
}

}
33 changes: 33 additions & 0 deletions src/NexusMods.EventSourcing.Abstractions/EntityAttribute.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
using System;
using System.Buffers.Binary;

namespace NexusMods.EventSourcing.Abstractions;

[AttributeUsage(AttributeTargets.Class | AttributeTargets.Struct)]
public class EntityAttribute : Attribute
{
/// <summary>
/// Defines the UID of the entity, and what revision it is. Incrementing
/// the revision will cause the entity snapshots to be discarded, and regenerated
/// during the next load.
/// </summary>
/// <param name="uid"></param>
/// <param name="revision"></param>
public EntityAttribute(string guid, ushort revision)
{
Span<byte> span = stackalloc byte[16];
Guid.Parse(guid).TryWriteBytes(span);
UUID = BinaryPrimitives.ReadUInt128BigEndian(span);
Revision = revision;
}

/// <summary>
/// The revision of the entity *Type*.
/// </summary>
public ushort Revision { get; }

/// <summary>
/// The unique identifier of the entity *Type*.
/// </summary>
public UInt128 UUID { get; }
}
11 changes: 11 additions & 0 deletions src/NexusMods.EventSourcing.Abstractions/EntityDefinition.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using System;

namespace NexusMods.EventSourcing.Abstractions;

/// <summary>
/// Records the entity type, UUID and revision, for use in the DI container.
/// </summary>
/// <param name="Type"></param>
/// <param name="UUID"></param>
/// <param name="Revision"></param>
public record EntityDefinition(Type Type, UInt128 UUID, ushort Revision);
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ public static class EntityStructureRegistry
{
private static readonly ConcurrentDictionary<Type, ConcurrentDictionary<string, IAttribute>> _entityStructures = new();


private static readonly ConcurrentDictionary<Type, EntityDefinition> _entityDefinitionsByType = new();
private static readonly ConcurrentDictionary<UInt128, EntityDefinition> _entityDefinitionsByUUID = new();

/// <summary>
/// Register an attribute in the global registry.
/// </summary>
Expand All @@ -34,6 +38,18 @@ public static void Register(IAttribute attribute)
}
}

/// <summary>
/// Registers an entity type in the global registry.
/// </summary>
/// <param name="type"></param>
/// <param name="guid"></param>
/// <param name="revison"></param>
public static void Register(EntityDefinition definition)
{
_entityDefinitionsByType.TryAdd(definition.Type, definition);
_entityDefinitionsByUUID.TryAdd(definition.UUID, definition);
}

/// <summary>
/// Returns all attributes for the given entity type.
/// </summary>
Expand All @@ -51,4 +67,35 @@ public static bool TryGetAttributes(Type owner, [NotNullWhen(true)] out Concurre
return false;
}

/// <summary>
/// Gets the entity definition for the given C# type.
/// </summary>
/// <typeparam name="TType"></typeparam>
/// <returns></returns>
/// <exception cref="NotImplementedException"></exception>
public static EntityDefinition GetDefinition<TType>() where TType : IEntity
{
if (_entityDefinitionsByType.TryGetValue(typeof(TType), out var result))
{
return result;
}

throw new InvalidOperationException($"No entity definition found for type {typeof(TType).Name}");
}

/// <summary>
/// Gets the entity definition for the given UUID.
/// </summary>
/// <param name="uuid"></param>
/// <returns></returns>
/// <exception cref="InvalidOperationException"></exception>
public static EntityDefinition GetDefinitionByUUID(UInt128 uuid)
{
if (_entityDefinitionsByUUID.TryGetValue(uuid, out var result))
{
return result;
}

throw new InvalidOperationException($"No entity definition found for UUID {uuid}");
}
}
21 changes: 20 additions & 1 deletion src/NexusMods.EventSourcing.Abstractions/IEventStore.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System.Collections.Generic;
using System.Threading.Tasks;

namespace NexusMods.EventSourcing.Abstractions;
Expand All @@ -22,6 +23,24 @@ public interface IEventStore
/// <param name="ingester">The ingester to handle the events</param>
/// <param name="reverse">If true, plays the events in reverse</param>
/// <typeparam name="TIngester"></typeparam>
public void EventsForEntity<TIngester>(EntityId entityId, TIngester ingester, bool reverse = false)
public void EventsForEntity<TIngester>(EntityId entityId, TIngester ingester)
where TIngester : IEventIngester;

/// <summary>
/// Replays the most recent snapshot for the given entity id, if one exists, then
/// replays every event.
/// </summary>
/// <param name="entityId"></param>
/// <param name="ingester"></param>
/// <typeparam name="TIngester"></typeparam>
public void EventsAndSnapshotForEntity<TIngester>(EntityId entityId, TIngester ingester)
where TIngester : ISnapshotEventIngester;

/// <summary>
/// Sets the snapshot for the given entity id and transaction id.
/// </summary>
/// <param name="txId"></param>
/// <param name="id"></param>
/// <param name="attributes"></param>
public void SetSnapshot(TransactionId txId, EntityId id, IEnumerable<(string AttributeName, IAttribute attribute)> attributes);
}
14 changes: 14 additions & 0 deletions src/NexusMods.EventSourcing.Abstractions/ISnapshotEventIngester.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
namespace NexusMods.EventSourcing.Abstractions;

/// <summary>
/// An event ingester that supports entity snapshots
/// </summary>
public interface ISnapshotEventIngester : IEventIngester
{
/// <summary>
/// This method will be called for each attribute snapshotted, before the normal event ingestion is called
/// </summary>
/// <param name="attributeName"></param>
/// <param name="attribute"></param>
public void IngestSnapshotAttribute(string attributeName, IAttribute attribute);
}
45 changes: 11 additions & 34 deletions src/NexusMods.EventSourcing.RocksDB/RocksDBEventStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public TransactionId Add<T>(T eventValue) where T : IEvent
}
}

public void EventsForEntity<TIngester>(EntityId entityId, TIngester ingester, bool reverse = false) where TIngester : IEventIngester
public void EventsForEntity<TIngester>(EntityId entityId, TIngester ingester) where TIngester : IEventIngester
{
Span<byte> startKey = stackalloc byte[24];
entityId.TryWriteBytes(startKey);
Expand All @@ -81,42 +81,19 @@ public void EventsForEntity<TIngester>(EntityId entityId, TIngester ingester, bo
{
fixed (byte* endKeyPtr = endKey)
{
if (!reverse)
{

options.SetIterateUpperBound(endKeyPtr, 24);
options.SetIterateLowerBound(startKeyPtr, 24);
using var iterator = _db.NewIterator(_entityIndexColumn, options);
options.SetIterateUpperBound(endKeyPtr, 24);
options.SetIterateLowerBound(startKeyPtr, 24);
using var iterator = _db.NewIterator(_entityIndexColumn, options);

iterator.SeekToFirst();
while (iterator.Valid())
{
var key = iterator.GetKeySpan();
var txId = TransactionId.From(key);
var evt = _db.Get(key[16..], _deserializer, _eventsColumn);
if (!ingester.Ingest(txId, evt)) break;
iterator.Next();
}
}
else
iterator.SeekToFirst();
while (iterator.Valid())
{
options.SetIterateUpperBound(endKeyPtr, 24);
options.SetIterateLowerBound(startKeyPtr, 24);
using var iterator = _db.NewIterator(_entityIndexColumn, options);

iterator.SeekToLast();
while (iterator.Valid())
{
var key = iterator.GetKeySpan();
var keySpan = key.SliceFast(16);
var txId = TransactionId.From(keySpan);
var evt = _db.Get(keySpan, _deserializer, _eventsColumn);
if (!ingester.Ingest(txId, evt)) break;
iterator.Prev();
}

var key = iterator.GetKeySpan();
var txId = TransactionId.From(key);
var evt = _db.Get(key[16..], _deserializer, _eventsColumn);
if (!ingester.Ingest(txId, evt)) break;
iterator.Next();
}

}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
using System;
using System.Buffers.Binary;
using NexusMods.EventSourcing.Abstractions;
using NexusMods.EventSourcing.Abstractions.Serialization;
using Reloaded.Memory.Extensions;

namespace NexusMods.EventSourcing.Serialization;

/// <summary>
/// Serializes an entity definition, while intentionally not serializing the type of the entity.
/// </summary>
public sealed class EntityDefinitionSerializer : IFixedSizeSerializer<EntityDefinition>
{
/// <inheritdoc />
public bool CanSerialize(Type valueType)
{
return valueType == typeof(EntityDefinition);
}

/// <inheritdoc />
public bool TryGetFixedSize(Type valueType, out int size)
{
size = 16 + sizeof(ushort);
return true;
}

/// <inheritdoc />
public void Serialize(EntityDefinition value, Span<byte> output)
{
BinaryPrimitives.WriteUInt128BigEndian(output, value.UUID);
BinaryPrimitives.WriteUInt16BigEndian(output.SliceFast(16), value.Revision);
}

/// <inheritdoc />
public EntityDefinition Deserialize(ReadOnlySpan<byte> from)
{
var uuid = BinaryPrimitives.ReadUInt128BigEndian(from);
var revision = BinaryPrimitives.ReadUInt16BigEndian(from.SliceFast(16));

var existing = EntityStructureRegistry.GetDefinitionByUUID(uuid);

if (existing.Revision != revision)
return new EntityDefinition(existing.Type, uuid, revision);

return existing;
}
}
43 changes: 43 additions & 0 deletions src/NexusMods.EventSourcing/Serialization/SerializationRegistry.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
using System;
using System.Buffers;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using NexusMods.EventSourcing.Abstractions.Serialization;
using Reloaded.Memory.Extensions;

namespace NexusMods.EventSourcing.Serialization;

Expand Down Expand Up @@ -79,4 +81,45 @@ public void RegisterSerializer(Type serializedType, ISerializer serializer)
{
_cachedSerializers.TryAdd(serializedType, serializer);
}


/// <inheritdoc />
public void Serialize<TVal>(IBufferWriter<byte> writer, TVal value)
{
var serializer = GetSerializer(typeof(TVal));
if (serializer.TryGetFixedSize(typeof(TVal), out var size) && serializer is IFixedSizeSerializer<TVal> fixedSizeSerializer)
{
var span = writer.GetSpan(size);
fixedSizeSerializer.Serialize(value, span);
writer.Advance(size);
}
else if (serializer is IVariableSizeSerializer<TVal> variableSizeSerializer)
{
variableSizeSerializer.Serialize(value, writer);
}
else
{
throw new Exception($"Unknown serializer type {serializer.GetType().Name}");
}

}

/// <inheritdoc />
public int Deserialize<TVal>(ReadOnlySpan<byte> bytes, out TVal value)
{
var serializer = GetSerializer(typeof(TVal));
if (serializer.TryGetFixedSize(typeof(TVal), out var size) && serializer is IFixedSizeSerializer<TVal> fixedSizeSerializer)
{
value = fixedSizeSerializer.Deserialize(bytes.SliceFast(0, size));
return size;
}
else if (serializer is IVariableSizeSerializer<TVal> variableSizeSerializer)
{
return variableSizeSerializer.Deserialize(bytes, out value);
}
else
{
throw new Exception($"Unknown serializer type {serializer.GetType().Name}");
}
}
}
Loading

0 comments on commit 91d56d3

Please sign in to comment.