# app-MAIL-temp/app/pgp_utils.py
#
# 114 lines
# 3.3 KiB
# Python

import os
from io import BytesIO
from typing import Union
import gnupg
import pgpy
from memory_profiler import memory_usage
from pgpy import PGPMessage
from app.config import GNUPGHOME, PGP_SENDER_PRIVATE_KEY
from app.log import LOG
from app.models import Mailbox, Contact
# Module-wide python-gnupg handle, created with GNUPGHOME (from app.config)
# as its GnuPG home directory. Every gpg-based helper below goes through it.
gpg = gnupg.GPG(gnupghome=GNUPGHOME)
# Set the wrapper's text encoding to UTF-8.
gpg.encoding = "utf-8"
class PGPException(Exception):
    """Error raised by the PGP helpers when a key cannot be loaded or data cannot be encrypted."""
def load_public_key(public_key: str) -> str:
    """Import ``public_key`` into the keyring and return its fingerprint.

    Args:
        public_key: the key material handed to ``gpg.import_keys``.

    Returns:
        The first fingerprint reported by the import.

    Raises:
        PGPException: if no fingerprint can be read from the import result.
    """
    imported = gpg.import_keys(public_key)
    try:
        first_fingerprint = imported.fingerprints[0]
    except Exception as err:
        # Typically an empty fingerprint list: nothing was imported.
        raise PGPException("Cannot load key") from err
    return first_fingerprint
def load_public_key_and_check(public_key: str) -> str:
    """Load a public key and verify that it can actually be used to encrypt.

    The key is imported exactly as ``load_public_key`` does, then a small
    dummy payload is encrypted with it. When that trial encryption fails,
    the freshly imported key is deleted from the keyring again.

    Args:
        public_key: the key material to import.

    Returns:
        The fingerprint of the imported key.

    Raises:
        PGPException: if the key cannot be loaded, or if the trial
            encryption with it fails.
    """
    fingerprint = load_public_key(public_key)

    trial = gpg.encrypt_file(BytesIO(b"test"), fingerprint)
    if trial.ok:
        return fingerprint

    # The key is unusable for encryption: do not leave it in the keyring.
    gpg.delete_keys([fingerprint])
    raise PGPException("Encryption fails with the key")
def hard_exit():
    """Log a warning, then kill the current process with signal 9."""
    current_pid = os.getpid()
    LOG.warning("kill pid %s", current_pid)
    # Signal number 9 is SIGKILL on POSIX systems: the process gets no chance to clean up.
    os.kill(current_pid, 9)
def encrypt_file(data: BytesIO, fingerprint: str) -> str:
    """Encrypt ``data`` for the key identified by ``fingerprint``.

    If the first attempt fails, the key may simply be missing from this
    host's keyring. In that case the public key is looked up on a mailbox
    (with PGP not disabled) and on a contact carrying that fingerprint,
    re-imported, and the encryption is retried once.

    Args:
        data: stream holding the content to encrypt; it is rewound before a retry.
        fingerprint: fingerprint of the recipient key.

    Returns:
        The string form of the gpg encryption result.

    Raises:
        PGPException: if the data still cannot be encrypted, or if a key
            reload fails.
    """
    LOG.d("encrypt for %s", fingerprint)
    current_memory = memory_usage(-1, interval=1, timeout=1)[0]
    LOG.d("mem_usage %s", current_memory)

    result = gpg.encrypt_file(data, fingerprint, always_trust=True)
    if result.ok:
        return str(result)

    # maybe the fingerprint is not loaded on this host, try to load it
    key_reloaded = False

    # searching for the key in mailbox
    mailbox = Mailbox.get_by(pgp_finger_print=fingerprint, disable_pgp=False)
    if mailbox:
        LOG.d("(re-)load public key for %s", mailbox)
        load_public_key(mailbox.pgp_public_key)
        key_reloaded = True

    # searching for the key in contact
    contact = Contact.get_by(pgp_finger_print=fingerprint)
    if contact:
        LOG.d("(re-)load public key for %s", contact)
        load_public_key(contact.pgp_public_key)
        key_reloaded = True

    if key_reloaded:
        LOG.d("retry to encrypt")
        # The first attempt consumed the stream; rewind before encrypting again.
        data.seek(0)
        result = gpg.encrypt_file(data, fingerprint, always_trust=True)

    # Reached only after a failed first attempt: either nothing could be
    # reloaded, or the retry result decides.
    if not result.ok:
        raise PGPException(f"Cannot encrypt, status: {result.status}")

    return str(result)
def encrypt_file_with_pgpy(data: bytes, public_key: str) -> PGPMessage:
    """Encrypt ``data`` with ``public_key`` using PGPy instead of gpg.

    Args:
        data: the content to wrap in a new PGP message.
        public_key: the recipient key, parsed into a ``pgpy.PGPKey``.

    Returns:
        The encrypted message produced by ``PGPKey.encrypt``.
    """
    recipient_key = pgpy.PGPKey()
    recipient_key.parse(public_key)

    plain_message = PGPMessage.new(data, encoding="utf-8")
    return recipient_key.encrypt(plain_message)
# Import the sender's private key into the keyring when this module is loaded
# and keep the first reported fingerprint; sign_data passes it as ``keyid``.
# NOTE(review): when PGP_SENDER_PRIVATE_KEY is falsy, _SIGN_KEY_ID is never
# defined, so calling sign_data would raise NameError -- confirm callers guard this.
if PGP_SENDER_PRIVATE_KEY:
    _SIGN_KEY_ID = gpg.import_keys(PGP_SENDER_PRIVATE_KEY).fingerprints[0]
def sign_data(data: Union[str, bytes]) -> str:
    """Return a detached gpg signature of ``data`` as a string.

    The signature is made with the key whose id is ``_SIGN_KEY_ID``.
    """
    detached_signature = gpg.sign(data, keyid=_SIGN_KEY_ID, detach=True)
    return str(detached_signature)
def sign_data_with_pgpy(data: Union[str, bytes]) -> str:
    """Sign ``data`` with PGPy and return the signature as a string.

    The signing key is parsed from ``PGP_SENDER_PRIVATE_KEY`` on every call.
    """
    signing_key = pgpy.PGPKey()
    signing_key.parse(PGP_SENDER_PRIVATE_KEY)
    return str(signing_key.sign(data))