Skip to content

Commit

Permalink
Fix email zone (#8073)
Browse files Browse the repository at this point in the history
  • Loading branch information
isoos authored Sep 25, 2024
1 parent ab43e2d commit 227bbf1
Show file tree
Hide file tree
Showing 3 changed files with 314 additions and 99 deletions.
261 changes: 162 additions & 99 deletions app/lib/frontend/email_sender.dart
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,96 @@ abstract class EmailSender {
Future<void> sendMessage(EmailMessage message);
}

abstract class EmailSenderConnection {
Future<void> send(EmailMessage message);
Future<void> close();
}

abstract class EmailSenderBase implements EmailSender {
/// We want to send only one email at a time.
final _pool = Pool(1);

/// The current zone when this [EmailSender] instance was created.
final Zone _parentZone;

final _connectionsBySender = <String, Future<_ZonedConnection>>{};
final _forceReconnectSenders = <String>{};
DateTime _backoffUntil = DateTime(0);

EmailSenderBase() : _parentZone = Zone.current;

Future<EmailSenderConnection> connect(String senderEmail);
void invalidateCredentials();

@override
bool get shouldBackoff => clock.now().isBefore(_backoffUntil);

@override
Future<void> sendMessage(EmailMessage message) async {
// One attempt at a time.
await _pool.withResource(() async {
return await _sendMessage(message);
});
}

Future<void> _sendMessage(EmailMessage message) async {
_logger.info('Sending email: ${message.debugInfo}...');
final sender = message.from.email;
try {
await retry(
() async {
final c = await _getConnection(sender);
await c.send(message);
},
retryIf: (e) =>
e is TimeoutException ||
e is IOException ||
e is SmtpClientCommunicationException ||
e is SmtpNoGreetingException,
delayFactor: Duration(seconds: 2),
maxAttempts: 2,
onRetry: (_) {
_forceReconnectSenders.add(sender);
},
);
} on SmtpMessageValidationException catch (e, st) {
_logger.info('Sending email failed: ${message.debugInfo}.', e, st);
throw EmailSenderException.invalid();
} on SmtpClientAuthenticationException catch (e, st) {
_logger.shout('Sending email failed due to invalid auth: $e', e, st);
_forceReconnectSenders.add(sender);
invalidateCredentials();
_backoffUntil = clock.now().add(Duration(minutes: 2));
throw EmailSenderException.failed();
} on MailerException catch (e, st) {
_logger.warning('Sending email failed: ${message.debugInfo}.', e, st);
throw EmailSenderException.failed();
}
}

Future<_ZonedConnection> _getConnection(String sender) async {
final connectionFuture = _connectionsBySender[sender];
final old = connectionFuture == null ? null : await connectionFuture;
final forceReconnect = _forceReconnectSenders.remove(sender);
if (!forceReconnect && old != null && !old.isExpired) {
return old;
}
final newConnectionFuture = Future.microtask(() async {
// closing the old connection if there was any, ignoring errors
await old?.close();

// PersistentConnection needs to be created in its designated zone, as its
// internal message subscription starts inside the constructor.
final connectionZone = _CatchAllZone(_parentZone);
final connection =
await connectionZone._zone.run(() async => connect(sender));
return _ZonedConnection(connectionZone, connection);
});
_connectionsBySender[sender] = newConnectionFuture;
return newConnectionFuture;
}
}

EmailSender createGmailRelaySender(
String serviceAccountEmail,
http.Client authClient,
Expand All @@ -46,22 +136,32 @@ EmailSender createGmailRelaySender(
authClient,
);

class _LoggingEmailSender implements EmailSender {
class _LoggingEmailSender extends EmailSenderBase {
@override
bool get shouldBackoff => false;
Future<EmailSenderConnection> connect(String senderEmail) async {
return _LoggingEmailSenderConnection();
}

@override
Future<void> sendMessage(EmailMessage message) async {
final debugHeader = '(${message.subject}) '
'from ${message.from} '
'to ${message.recipients.join(', ')}';
void invalidateCredentials() {
// ignore
}
}

class _LoggingEmailSenderConnection extends EmailSenderConnection {
@override
Future<void> send(EmailMessage message) async {
final urls = _simpleUrlRegExp
.allMatches(message.bodyText)
.map((e) => e.group(0))
.toList();
_logger.info('Not sending email (SMTP not configured): '
'$debugHeader${urls.map((e) => '\n$e').join('')}');
'${message.debugInfo}\n${urls.map((e) => '\n$e').join('')}');
}

@override
Future<void> close() async {
// ignored
}
}

Expand Down Expand Up @@ -119,102 +219,33 @@ Address? _toAddress(EmailAddress? input) =>
/// [2]: https://developers.google.com/gmail/imap/xoauth2-protocol#the_sasl_xoauth2_mechanism
/// [3]: https://developers.google.com/identity/protocols/oauth2/service-account
/// [4]: https://cloud.google.com/iam/docs/reference/credentials/rest/v1/projects.serviceAccounts/signJwt
class _GmailSmtpRelay implements EmailSender {
class _GmailSmtpRelay extends EmailSenderBase {
static final _googleOauth2TokenUrl =
Uri.parse('https://oauth2.googleapis.com/token');
static const _scopes = ['https://mail.google.com/'];

final String _serviceAccountEmail;
final http.Client _authClient;

final _connectionsBySender = <String, Future<_GmailConnection>>{};
final _forceReconnectSenders = <String>{};

/// The current zone when this [_GmailSmtpRelay] instance was created.
final Zone _parentZone;
final _pool = Pool(1);

DateTime _accessTokenRefreshed = DateTime(0);
DateTime _backoffUntil = DateTime(0);
Future<String>? _accessToken;

_GmailSmtpRelay(
this._serviceAccountEmail,
this._authClient,
) : _parentZone = Zone.current;

@override
bool get shouldBackoff => clock.now().isBefore(_backoffUntil);
);

@override
Future<void> sendMessage(EmailMessage message) async {
// One attempt at a time.
await _pool.withResource(() async => _sendMessage(message));
Future<EmailSenderConnection> connect(String senderEmail) async {
return _GmailConnection(PersistentConnection(
await _getSmtpServer(senderEmail),
timeout: Duration(seconds: 15),
));
}

Future<void> _sendMessage(EmailMessage message) async {
final debugHeader = 'Message-ID:${message.localMessageId}@pub.dev '
'(${message.subject}) '
'from ${message.from} '
'to ${message.recipients.join(', ')}';
_logger.info('Sending email: $debugHeader...');
final sender = message.from.email;
try {
await retry(
() async {
final c = await _getConnection(sender);
await c.send(_toMessage(message));
},
retryIf: (e) =>
e is TimeoutException ||
e is IOException ||
e is SmtpClientCommunicationException ||
e is SmtpNoGreetingException,
delayFactor: Duration(seconds: 2),
maxAttempts: 2,
onRetry: (_) {
_forceReconnectSenders.add(sender);
},
);
} on SmtpMessageValidationException catch (e, st) {
_logger.info('Sending email failed: $debugHeader.', e, st);
throw EmailSenderException.invalid();
} on SmtpClientAuthenticationException catch (e, st) {
_logger.shout('Sending email failed due to invalid auth: $e', e, st);
_backoffUntil = clock.now().add(Duration(minutes: 2));
_forceReconnectSenders.add(sender);
_accessToken = null;
throw EmailSenderException.failed();
} on MailerException catch (e, st) {
_logger.warning('Sending email failed: $debugHeader.', e, st);
throw EmailSenderException.failed();
}
}

Future<_GmailConnection> _getConnection(String sender) async {
final connectionFuture = _connectionsBySender[sender];
final old = connectionFuture == null ? null : await connectionFuture;
final forceReconnect = _forceReconnectSenders.remove(sender);
if (!forceReconnect && old != null && !old.isExpired) {
return old;
}
final newConnectionFuture = Future.microtask(() async {
// closing the old connection if there was any, ignoring errors
await old?.close();

// PersistentConnection needs to be created in its designated zone, as its
// internal message subscription starts inside the constructor.
final connectionZone = _ConnectionZone(_parentZone);
final connection = await connectionZone._zone.run(
() async => PersistentConnection(
await _getSmtpServer(sender),
timeout: Duration(seconds: 15),
),
);
return _GmailConnection(connectionZone, connection);
});
_connectionsBySender[sender] = newConnectionFuture;
return newConnectionFuture;
@override
void invalidateCredentials() {
_accessToken = null;
}

Future<SmtpServer> _getSmtpServer(String sender) async {
Expand Down Expand Up @@ -284,34 +315,54 @@ class _GmailSmtpRelay implements EmailSender {
}
}

class _ConnectionZone {
class _CatchAllZone {
final Zone _parentZone;
Object? _uncaughtError;

_ConnectionZone(this._parentZone);
_CatchAllZone(this._parentZone);

bool get hasUncaughtError => _uncaughtError != null;

late final _zone = _parentZone.fork(specification: ZoneSpecification(
handleUncaughtError: (self, parent, zone, error, stackTrace) {
_uncaughtError = error;
_logger.severe('Uncaught error while sending email', error, stackTrace);
},
));

Future<R> runAsync<R>(Future<R> Function() fn) async {
final completer = Completer<R>();
_zone.scheduleMicrotask(() async {
try {
final r = await fn();
completer.complete(r);
} catch (e, st) {
completer.completeError(e, st);
}
});
return await completer.future;
}
}

class _GmailConnection {
/// Wraps the physical connection within a [Zone], where the send operation should
/// be wraped, tracking otherwise uncaught exceptions.
///
/// It also tracks age, usage and expiration.
class _ZonedConnection {
final DateTime created;
final PersistentConnection _connection;
final _ConnectionZone _connectionZone;
final _CatchAllZone _zone;
final EmailSenderConnection _connection;

DateTime _lastUsed;
var _sentCount = 0;

_GmailConnection(this._connectionZone, this._connection)
_ZonedConnection(this._zone, this._connection)
: created = clock.now(),
_lastUsed = clock.now();

bool get isExpired {
// The connection is in an unknown state, better not use it.
if (_connectionZone._uncaughtError != null) {
if (_zone.hasUncaughtError) {
return true;
}
// There is a 100-recipient limit per SMTP transaction for smtp-relay.gmail.com.
Expand All @@ -331,27 +382,39 @@ class _GmailConnection {
return false;
}

Future<SendReport> send(Message message) async {
Future<void> send(EmailMessage message) async {
_sentCount += message.recipients.length + message.ccRecipients.length;
try {
final r = await _connectionZone._zone
.run(() async => await _connection.send(message));
if (_connectionZone._uncaughtError != null) {
if (_zone.hasUncaughtError) {
throw EmailSenderException.failed();
}
return r;
await _zone.runAsync(() async {
await _connection.send(message);
});
} finally {
_lastUsed = clock.now();
}
}

Future<void> close() async {
try {
await _connectionZone._zone.run(() async {
await _zone.runAsync(() async {
await _connection.close();
});
} catch (e, st) {
_logger.warning('Unable to close SMTP connection.', e, st);
}
}
}

class _GmailConnection extends EmailSenderConnection {
final PersistentConnection _connection;
_GmailConnection(this._connection);

@override
Future<void> send(EmailMessage message) =>
_connection.send(_toMessage(message));

@override
Future<void> close() => _connection.close();
}
8 changes: 8 additions & 0 deletions app/lib/shared/email.dart
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,14 @@ class EmailMessage {
'bodyText': bodyText,
};
}

late final debugInfo = [
'Message-ID:$localMessageId@pub.dev',
'($subject)',
'from $from',
'to ${recipients.join(', ')}',
if (ccRecipients.isNotEmpty) 'cc ${ccRecipients.join(', ')}',
].join(' ');
}

/// Parses the body text and splits the [input] paragraphs to [lineLength]
Expand Down
Loading

0 comments on commit 227bbf1

Please sign in to comment.