From a9ed27ee029d69ca4877be0114645704f2e0997a Mon Sep 17 00:00:00 2001 From: Chris Caron Date: Mon, 30 Sep 2024 22:51:53 -0400 Subject: [PATCH] PGP Encryption working --- apprise/plugins/email.py | 68 ++++++++++++++++++++++++++++++--------- test/test_plugin_email.py | 8 ++--- 2 files changed, 57 insertions(+), 19 deletions(-) diff --git a/apprise/plugins/email.py b/apprise/plugins/email.py index 7b8af50ca..b1fbbd5c0 100644 --- a/apprise/plugins/email.py +++ b/apprise/plugins/email.py @@ -34,6 +34,7 @@ from email.mime.text import MIMEText from email.mime.application import MIMEApplication from email.mime.multipart import MIMEMultipart +from email.mime.base import MIMEBase from email.utils import formataddr, make_msgid from email.header import Header from email import charset @@ -787,6 +788,10 @@ def send(self, body, title='', notify_type=NotifyType.INFO, attach=None, 'There are no Email recipients to notify') return False + elif self.use_pgp and not PGP_SUPPORT: + self.logger.warning('PGP Support unavailable') + return False + messages: t.List[EmailMessage] = [] # Create a copy of the targets list @@ -887,10 +892,33 @@ def send(self, body, title='', notify_type=NotifyType.INFO, attach=None, if self.use_pgp: self.logger.debug("Securing email with PGP Encryption") + # Set our header information to include in the encryption + base['From'] = formataddr( + (None, self.from_addr[1]), charset='utf-8') + base['To'] = formataddr((None, to_addr), charset='utf-8') + base['Subject'] = Header(title, self._get_charset(title)) + # Apply our encryption encrypted_content = self.pgp_encrypt_message(base.as_string()) if encrypted_content: - base = MIMEText(encrypted_content, "plain") + # prepare our messsage + base = MIMEMultipart( + "encrypted", protocol="application/pgp-encrypted") + + # Store Autocrypt header (DeltaChat Support) + base.add_header( + "Autocrypt", + "addr=%s; prefer-encrypt=mutual" % formataddr( + (False, to_addr), charset='utf-8')) + + # Set Encryption Info Part + enc_payload = MIMEText("Version: 1", "plain") + enc_payload.set_type("application/pgp-encrypted") + base.attach(enc_payload) + + enc_payload = MIMEBase("application", "octet-stream") + enc_payload.set_payload(encrypted_content) + base.attach(enc_payload) # Apply any provided custom headers for k, v in self.headers.items(): @@ -984,7 +1012,13 @@ def submit(self, messages: t.List[EmailMessage]): finally: # Gracefully terminate the connection with the server if socket is not None: # pragma: no branch - socket.quit() + try: + socket.quit() + + except (SocketError, smtplib.SMTPException): + # No need to make this a bigger issue as we were exiting + # anyway + pass # Reduce our dictionary (eliminate expired keys if any) self.pgp_public_keys = { @@ -1079,8 +1113,7 @@ def pgp_generate_keys(self, path=None): os.path.basename(pub_path)) return True - @property - def pgp_pubkey(self): + def pgp_pubkey(self, email=None): """ Returns a list of filenames worth scanning for """ @@ -1107,17 +1140,22 @@ def pgp_pubkey(self): 'pub.asc', ] + emails = [] + if email: + emails.append(email) + # Prepare our key files: - email = self.from_addr[1] + emails.append(self.from_addr[1]) - _entry = email.split('@')[0].lower() - if _entry not in fnames: - fnames.insert(0, f'{_entry}-pub.asc') + for email in emails: + _entry = email.split('@')[0].lower() + if _entry not in fnames: + fnames.insert(0, f'{_entry}-pub.asc') - # Lowercase email (Highest Priority) - _entry = email.lower() - if _entry not in fnames: - fnames.insert(0, f'{_entry}-pub.asc') + # Lowercase email (Highest Priority) + _entry = email.lower() + if _entry not in fnames: + fnames.insert(0, f'{_entry}-pub.asc') return next( (os.path.join(self.store.path, fname) @@ -1125,17 +1163,17 @@ def pgp_pubkey(self): if os.path.isfile(os.path.join(self.store.path, fname))), None) - def pgp_public_key(self, path=None): + def pgp_public_key(self, path=None, email=None): """ Opens a spcified pgp public file and returns the key from it which is used to encrypt the message """ if path is None: - path = self.pgp_pubkey + path = self.pgp_pubkey(email=email) if not path: if self.pgp_generate_keys(path=self.store.path): - path = self.pgp_pubkey + path = self.pgp_pubkey(email=email) if path: # We should get a hit now return self.pgp_public_key(path=path) diff --git a/test/test_plugin_email.py b/test/test_plugin_email.py index e520614b7..fce5eb985 100644 --- a/test/test_plugin_email.py +++ b/test/test_plugin_email.py @@ -2069,7 +2069,7 @@ def test_plugin_email_pgp(mock_smtp, mock_smtpssl, tmpdir): obj = Apprise.instantiate('mailto://user:pass@nuxref.com?pgp=yes') # Nothing to lookup - assert obj.pgp_pubkey is None + assert obj.pgp_pubkey() is None assert obj.pgp_public_key() is None assert obj.pgp_encrypt_message("message") is False # Keys can not be generated in memory mode @@ -2090,15 +2090,15 @@ def test_plugin_email_pgp(mock_smtp, mock_smtpssl, tmpdir): assert obj.store.mode == PersistentStoreMode.FLUSH # Still no public key - assert obj.pgp_pubkey is None + assert obj.pgp_pubkey() is None assert obj.pgp_generate_keys() is True # Now we'll have a public key - assert isinstance(obj.pgp_pubkey, str) + assert isinstance(obj.pgp_pubkey(), str) # Prepare PGP obj = Apprise.instantiate( - f'mailto://pgp:pass@nuxref.com?pgp=yes&pgpkey={obj.pgp_pubkey}', + 'mailto://pgp:pass@nuxref.com?pgp=yes&pgpkey=%s' % obj.pgp_pubkey(), asset=asset) # We will find our key