diff --git a/src/Akka.Persistence.Migration/Actors/PersistenceWriterActor.cs b/src/Akka.Persistence.Migration/Actors/PersistenceWriterActor.cs index 1d255a7..153c2be 100644 --- a/src/Akka.Persistence.Migration/Actors/PersistenceWriterActor.cs +++ b/src/Akka.Persistence.Migration/Actors/PersistenceWriterActor.cs @@ -7,13 +7,9 @@ namespace Akka.Persistence.Migration.Actors; -public class PersistenceWriterActor: ReceivePersistentActor, IWithTimers +public class PersistenceWriterActor: ReceivePersistenceWithRetryActor { - private readonly ILoggingAdapter _log; - private readonly List _exceptions = []; - private readonly int _maxRetries; - private readonly TimeSpan _retryInterval; - private PersistenceWriterProtocol.IOperation? _currentOperation; + private IActorRef? _sender; public PersistenceWriterActor(MigrationOptions options, string persistenceId) { @@ -21,15 +17,14 @@ public PersistenceWriterActor(MigrationOptions options, string persistenceId) JournalPluginId = options.ToJournalId; SnapshotPluginId = options.ToSnapshotStoreId; - _maxRetries = options.MaxRetries; - _retryInterval = options.RetryInterval; - _log = Context.GetLogger(); - + MaxRetries = options.MaxRetries; + RetryInterval = options.RetryInterval; Become(Idle); } public override string PersistenceId { get; } - public ITimerScheduler Timers { get; set; } = null!; + protected override int MaxRetries { get; } + protected override TimeSpan RetryInterval { get; } private void Idle() { @@ -43,109 +38,64 @@ private void Idle() { HandleSaveSnapshot(snapshot.Snapshot, Sender); }); + + CommandRetry(); + CommandAny(HandleUnhandled); } private void Writing() { - Command(fail => HandleFailure(fail.Cause)); - Command(_ => Self.Tell(PersistenceWriterProtocol.SnapshotWriteCompleted.Instance)); - Command(fail => HandleFailure(fail.Cause)); - Command(_ => HandleRetry()); - Command(HandleComplete); + CommandRetry(); + CommandAny(HandleUnhandled); } private void HandlePersist(object evt, IActorRef sender) { - _currentOperation ??= new PersistenceWriterProtocol.PersistOperation(evt, sender); + _sender = sender; Become(Writing); - _log.Debug($"Migrating event {evt}"); - Persist(evt, _ => + Log.Debug($"Migrating event {evt}"); + PersistWithRetry(evt, _ => { - _log.Debug($"Event {evt} migrated"); - Self.Tell(PersistenceWriterProtocol.PersistWriteCompleted.Instance); + Log.Debug($"Event {evt} migrated"); + _sender.Tell(PersistenceWriterProtocol.PersistWriteCompleted.Instance); + _sender = null; + + Become(Idle); }); } private void HandleSaveSnapshot(object snapshot, IActorRef sender) { - _currentOperation ??= new PersistenceWriterProtocol.SnapshotOperation(snapshot, sender); + _sender = sender; Become(Writing); - SaveSnapshot(snapshot); + SaveSnapshotWithRetry(snapshot); } - private void HandleComplete(PersistenceWriterProtocol.IWriteSucceeded message) + private void HandleUnhandled(object msg) { - _exceptions.Clear(); - _currentOperation!.ReplyTo.Tell(message); - _currentOperation = null; - - Become(Idle); + Log.Error($"Illegal out of band command detected: {msg}"); + Unhandled(msg); } - private void HandleRetry() + protected override void OnPersistFailure(Exception cause) { - switch (_currentOperation) - { - case PersistenceWriterProtocol.PersistOperation p: - HandlePersist(p.Message, p.ReplyTo); - break; - case PersistenceWriterProtocol.SnapshotOperation s: - HandleSaveSnapshot(s.Message, s.ReplyTo); - break; - default: - throw new AggregateException($"Unknown migration operation: {_currentOperation!.GetType()}", _exceptions); - } + _sender.Tell(new PersistenceWriterProtocol.PersistFailed(cause)); } - private void HandleFailure(Exception cause) + protected override void OnSaveSnapshotSuccess(SaveSnapshotSuccess success) { - if (_currentOperation is null) - throw new NullReferenceException("_currentOperation should not be null"); + _sender.Tell(PersistenceWriterProtocol.SnapshotWriteCompleted.Instance); + _sender = null; - _exceptions.Add(cause); - if (_exceptions.Count < _maxRetries) - { - _log.Info("{0} write operation failed ({1}/{2}), retrying in {3} seconds...", - _currentOperation.Name, _exceptions.Count, _maxRetries, _retryInterval.TotalSeconds); - - Timers.StartSingleTimer(PersistenceWriterProtocol.Retry.Instance, PersistenceWriterProtocol.Retry.Instance, _retryInterval); - return; - } - - try - { - throw new AggregateException( - message: string.Format(_currentOperation.ErrorMessage, _maxRetries), - innerExceptions: _exceptions); - } - catch (AggregateException ex) - { - _log.Error(ex, _currentOperation.ErrorMessage, _maxRetries); - _currentOperation.ReplyTo.Tell(_currentOperation.FailedMessage(ex)); - } - } - - private void HandleUnhandled(object msg) - { - _log.Error($"Illegal out of band command detected: {msg}"); - Unhandled(msg); - } - - protected override void OnPersistFailure(Exception cause, object @event, long sequenceNr) - { - Log.Warning(cause, "Rejected to persist event type [{0}] with sequence number [{1}] for persistenceId [{2}] due to [{3}].", - @event.GetType(), sequenceNr, PersistenceId, cause.Message); - _currentOperation?.ReplyTo.Tell(new PersistenceWriterProtocol.PersistFailed(cause)); + Become(Idle); } - - protected override void OnPersistRejected(Exception cause, object @event, long sequenceNr) + + protected override void OnSaveSnapshotFailure(Exception cause) { - Log.Warning(cause, "Rejected to persist event type [{0}] with sequence number [{1}] for persistenceId [{2}] due to [{3}].", - @event.GetType(), sequenceNr, PersistenceId, cause.Message); - Self.Tell(new PersistenceWriterProtocol.PersistFailed(cause)); + _sender.Tell(new PersistenceWriterProtocol.SnapshotFailed(cause)); } }