2020-06-08 13:53:27 +02:00
|
|
|
import os
|
2020-04-02 21:29:16 +02:00
|
|
|
from io import BytesIO
|
2020-11-03 13:30:13 +01:00
|
|
|
from typing import Union
|
2020-04-02 21:29:16 +02:00
|
|
|
|
2020-03-08 12:51:33 +01:00
|
|
|
import gnupg
|
2020-10-28 12:21:42 +01:00
|
|
|
import pgpy
|
2020-06-07 09:47:35 +02:00
|
|
|
from memory_profiler import memory_usage
|
2020-10-28 11:50:14 +01:00
|
|
|
from pgpy import PGPMessage
|
2020-03-08 12:51:33 +01:00
|
|
|
|
2020-11-02 19:09:57 +01:00
|
|
|
from app.config import GNUPGHOME, PGP_SENDER_PRIVATE_KEY
|
2020-06-05 10:10:21 +02:00
|
|
|
from app.log import LOG
|
2020-08-08 10:26:24 +02:00
|
|
|
from app.models import Mailbox, Contact
|
2020-03-08 12:51:33 +01:00
|
|
|
|
|
|
|
# Module-wide GnuPG wrapper; its keyring lives in the directory configured by
# GNUPGHOME. Every gnupg-based helper in this module goes through this object.
gpg = gnupg.GPG(gnupghome=GNUPGHOME)
# Text passed to / read from the gpg wrapper is handled as UTF-8.
gpg.encoding = "utf-8"
|
2020-03-08 12:51:33 +01:00
|
|
|
|
2020-03-08 23:07:23 +01:00
|
|
|
|
2020-03-08 12:51:33 +01:00
|
|
|
class PGPException(Exception):
    """Error raised by the PGP helpers when a key cannot be loaded or data cannot be encrypted."""
|
|
|
|
|
2020-03-08 23:07:23 +01:00
|
|
|
|
2020-03-08 12:51:33 +01:00
|
|
|
def load_public_key(public_key: str) -> str:
    """Import ``public_key`` into the keyring and return its fingerprint.

    The fingerprint returned is the first one reported by the import.

    Raises:
        PGPException: if the import raises or the first fingerprint cannot
            be read (e.g. the import reported no fingerprint). The original
            error is chained as the cause.
    """
    try:
        imported = gpg.import_keys(public_key)
        first_fingerprint = imported.fingerprints[0]
    except Exception as err:
        raise PGPException("Cannot load key") from err

    return first_fingerprint
|
|
|
|
|
|
|
|
|
2020-10-27 20:27:34 +01:00
|
|
|
def load_public_key_and_check(public_key: str) -> str:
    """Import ``public_key`` and verify it can actually be used to encrypt.

    Works like ``load_public_key`` but additionally runs a trial encryption
    of a small dummy payload with the freshly imported key. If that trial
    fails, the imported key is deleted from the keyring again.

    Returns:
        The fingerprint of the imported key.

    Raises:
        PGPException: if the key cannot be imported, or if the trial
            encryption fails. The original error is chained as the cause.
    """
    try:
        fingerprint = gpg.import_keys(public_key).fingerprints[0]
    except Exception as import_error:
        raise PGPException("Cannot load key") from import_error

    # Trial run: a key that imports fine may still be unusable for encryption.
    probe = BytesIO(b"test")
    try:
        encrypt_file(probe, fingerprint)
    except Exception as encrypt_error:
        LOG.w(
            "Cannot encrypt using the imported key %s %s", fingerprint, public_key
        )
        # Do not keep an unusable key in the keyring.
        gpg.delete_keys([fingerprint])
        raise PGPException("Encryption fails with the key") from encrypt_error

    return fingerprint
|
|
|
|
|
|
|
|
|
2020-06-08 13:53:27 +02:00
|
|
|
def hard_exit():
    """Kill the current process by sending signal 9 to its own pid.

    The pid is logged at warning level before the signal is sent.
    """
    own_pid = os.getpid()
    LOG.w("kill pid %s", own_pid)
    # Signal number 9 is SIGKILL on POSIX systems: the process is terminated
    # without running any cleanup handlers.
    os.kill(own_pid, 9)
|
|
|
|
|
|
|
|
|
2020-04-02 21:29:16 +02:00
|
|
|
def encrypt_file(data: BytesIO, fingerprint: str) -> str:
    """Encrypt ``data`` for the key identified by ``fingerprint``.

    If the first attempt fails, the key may simply be missing from the local
    keyring: the public key is looked up on a ``Mailbox`` (with PGP enabled)
    and on a ``Contact`` carrying that fingerprint, re-imported from whichever
    of them exist, and the encryption is retried once.

    Args:
        data: stream holding the content to encrypt. It is rewound to the
            start before a retry.
        fingerprint: fingerprint of the recipient key.

    Returns:
        The encryption result converted with ``str()``.

    Raises:
        PGPException: if the encryption is still not successful, or if
            re-importing a stored public key fails.
    """
    LOG.d("encrypt for %s", fingerprint)
    # Record a memory reading before encrypting, for debugging purposes.
    mem_usage = memory_usage(-1, interval=1, timeout=1)[0]
    LOG.d("mem_usage %s", mem_usage)

    result = gpg.encrypt_file(data, fingerprint, always_trust=True)
    if result.ok:
        return str(result)

    # The fingerprint may not be loaded on this host: try to (re-)load the
    # key from the database, first from a mailbox, then from a contact.
    key_reloaded = False

    mailbox = Mailbox.get_by(pgp_finger_print=fingerprint, disable_pgp=False)
    if mailbox:
        LOG.d("(re-)load public key for %s", mailbox)
        load_public_key(mailbox.pgp_public_key)
        key_reloaded = True

    contact = Contact.get_by(pgp_finger_print=fingerprint)
    if contact:
        LOG.d("(re-)load public key for %s", contact)
        load_public_key(contact.pgp_public_key)
        key_reloaded = True

    if key_reloaded:
        LOG.d("retry to encrypt")
        # The first attempt consumed the stream; rewind before trying again.
        data.seek(0)
        result = gpg.encrypt_file(data, fingerprint, always_trust=True)

    if not result.ok:
        raise PGPException(f"Cannot encrypt, status: {result.status}")

    return str(result)
|
2020-10-28 11:50:14 +01:00
|
|
|
|
|
|
|
|
|
|
|
def encrypt_file_with_pgpy(data: bytes, public_key: str) -> PGPMessage:
    """Encrypt ``data`` with ``public_key`` using the pgpy library.

    Args:
        data: content to encrypt; wrapped in a new PGP message created with
            ``encoding="utf-8"``.
        public_key: key material parsed into a ``pgpy.PGPKey``.

    Returns:
        The value returned by ``PGPKey.encrypt`` for the message.
    """
    recipient_key = pgpy.PGPKey()
    recipient_key.parse(public_key)

    plain_message = pgpy.PGPMessage.new(data, encoding="utf-8")
    return recipient_key.encrypt(plain_message)
|
2020-11-02 19:09:57 +01:00
|
|
|
|
|
|
|
|
|
|
|
# When a sender private key is configured, import it into the keyring once at
# module load and remember its fingerprint for signing.
# NOTE: _SIGN_KEY_ID is left undefined when PGP_SENDER_PRIVATE_KEY is not set.
if PGP_SENDER_PRIVATE_KEY:
    _SIGN_KEY_ID = gpg.import_keys(PGP_SENDER_PRIVATE_KEY).fingerprints[0]
|
|
|
|
|
|
|
|
|
2020-11-03 13:30:13 +01:00
|
|
|
def sign_data(data: Union[str, bytes]) -> str:
    """Return a detached signature of ``data`` made with the sender key.

    The signing key is selected through the module-level ``_SIGN_KEY_ID``,
    which is only assigned when ``PGP_SENDER_PRIVATE_KEY`` is set; calling
    this function without that configuration raises ``NameError``.

    Returns:
        The signing result converted with ``str()``.
    """
    signing_result = gpg.sign(data, keyid=_SIGN_KEY_ID, detach=True)
    return str(signing_result)
|
|
|
|
|
|
|
|
|
2020-11-03 13:30:13 +01:00
|
|
|
def sign_data_with_pgpy(data: Union[str, bytes]) -> str:
    """Sign ``data`` with ``PGP_SENDER_PRIVATE_KEY`` using the pgpy library.

    The private key is parsed from the configuration on every call.

    Returns:
        The signature produced by ``PGPKey.sign``, converted with ``str()``.
    """
    sender_key = pgpy.PGPKey()
    sender_key.parse(PGP_SENDER_PRIVATE_KEY)
    return str(sender_key.sign(data))
|