Feat: Re-deliver mails (#1394)
* Feat: Send undelivered emails * Add cron job * Added to the crontab Co-authored-by: Adrià Casajús <adria.casajus@proton.ch>
This commit is contained in:
parent
3bc976c322
commit
90d60217a4
|
@ -22,6 +22,9 @@ from app.message_utils import message_to_bytes
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class SendRequest:
|
class SendRequest:
|
||||||
|
|
||||||
|
SAVE_EXTENSION = "sendrequest"
|
||||||
|
|
||||||
envelope_from: str
|
envelope_from: str
|
||||||
envelope_to: str
|
envelope_to: str
|
||||||
msg: Message
|
msg: Message
|
||||||
|
@ -169,7 +172,7 @@ class MailSender:
|
||||||
self._save_request_to_unsent_dir(send_request)
|
self._save_request_to_unsent_dir(send_request)
|
||||||
|
|
||||||
def _save_request_to_unsent_dir(self, send_request: SendRequest):
|
def _save_request_to_unsent_dir(self, send_request: SendRequest):
|
||||||
file_name = f"DeliveryFail-{int(time.time())}-{uuid.uuid4()}.eml"
|
file_name = f"DeliveryFail-{int(time.time())}-{uuid.uuid4()}.{SendRequest.SAVE_EXTENSION}"
|
||||||
file_path = os.path.join(config.SAVE_UNSENT_DIR, file_name)
|
file_path = os.path.join(config.SAVE_UNSENT_DIR, file_name)
|
||||||
file_contents = send_request.to_bytes()
|
file_contents = send_request.to_bytes()
|
||||||
with open(file_path, "wb") as fd:
|
with open(file_path, "wb") as fd:
|
||||||
|
@ -180,6 +183,27 @@ class MailSender:
|
||||||
mail_sender = MailSender()
|
mail_sender = MailSender()
|
||||||
|
|
||||||
|
|
||||||
|
def load_unsent_mails_from_fs_and_resend():
|
||||||
|
if not config.SAVE_UNSENT_DIR:
|
||||||
|
return
|
||||||
|
for filename in os.listdir(config.SAVE_UNSENT_DIR):
|
||||||
|
(_, extension) = os.path.splitext(filename)
|
||||||
|
if extension[1:] != SendRequest.SAVE_EXTENSION:
|
||||||
|
LOG.i(f"Skipping {filename} does not have the proper extension")
|
||||||
|
continue
|
||||||
|
full_file_path = os.path.join(config.SAVE_UNSENT_DIR, filename)
|
||||||
|
if not os.path.isfile(full_file_path):
|
||||||
|
LOG.i(f"Skipping {filename} as it's not a file")
|
||||||
|
continue
|
||||||
|
LOG.i(f"Trying to re-deliver email {filename}")
|
||||||
|
try:
|
||||||
|
send_request = SendRequest.load_from_file(full_file_path)
|
||||||
|
except Exception as e:
|
||||||
|
LOG.error(f"Could not load file {filename}: {e}")
|
||||||
|
continue
|
||||||
|
mail_sender.send(send_request, 2)
|
||||||
|
|
||||||
|
|
||||||
def sl_sendmail(
|
def sl_sendmail(
|
||||||
envelope_from: str,
|
envelope_from: str,
|
||||||
envelope_to: str,
|
envelope_to: str,
|
||||||
|
|
44
cron.py
44
cron.py
|
@ -11,20 +11,9 @@ from sqlalchemy.orm import joinedload
|
||||||
from sqlalchemy.orm.exc import ObjectDeletedError
|
from sqlalchemy.orm.exc import ObjectDeletedError
|
||||||
from sqlalchemy.sql import Insert
|
from sqlalchemy.sql import Insert
|
||||||
|
|
||||||
from app import s3
|
from app import s3, config
|
||||||
from app.alias_utils import nb_email_log_for_mailbox
|
from app.alias_utils import nb_email_log_for_mailbox
|
||||||
from app.api.views.apple import verify_receipt
|
from app.api.views.apple import verify_receipt
|
||||||
from app.config import (
|
|
||||||
ADMIN_EMAIL,
|
|
||||||
MACAPP_APPLE_API_SECRET,
|
|
||||||
APPLE_API_SECRET,
|
|
||||||
EMAIL_SERVERS_WITH_PRIORITY,
|
|
||||||
URL,
|
|
||||||
AlERT_WRONG_MX_RECORD_CUSTOM_DOMAIN,
|
|
||||||
HIBP_API_KEYS,
|
|
||||||
HIBP_SCAN_INTERVAL_DAYS,
|
|
||||||
MONITORING_EMAIL,
|
|
||||||
)
|
|
||||||
from app.db import Session
|
from app.db import Session
|
||||||
from app.dns_utils import get_mx_domains, is_mx_equivalent
|
from app.dns_utils import get_mx_domains, is_mx_equivalent
|
||||||
from app.email_utils import (
|
from app.email_utils import (
|
||||||
|
@ -39,6 +28,7 @@ from app.email_utils import (
|
||||||
)
|
)
|
||||||
from app.errors import ProtonPartnerNotSetUp
|
from app.errors import ProtonPartnerNotSetUp
|
||||||
from app.log import LOG
|
from app.log import LOG
|
||||||
|
from app.mail_sender import load_unsent_mails_from_fs_and_resend
|
||||||
from app.models import (
|
from app.models import (
|
||||||
Subscription,
|
Subscription,
|
||||||
User,
|
User,
|
||||||
|
@ -219,7 +209,7 @@ def notify_manual_sub_end():
|
||||||
retries=3,
|
retries=3,
|
||||||
)
|
)
|
||||||
|
|
||||||
extend_subscription_url = URL + "/dashboard/coinbase_checkout"
|
extend_subscription_url = config.URL + "/dashboard/coinbase_checkout"
|
||||||
for coinbase_subscription in CoinbaseSubscription.all():
|
for coinbase_subscription in CoinbaseSubscription.all():
|
||||||
need_reminder = False
|
need_reminder = False
|
||||||
if (
|
if (
|
||||||
|
@ -270,9 +260,9 @@ def poll_apple_subscription():
|
||||||
|
|
||||||
user = apple_sub.user
|
user = apple_sub.user
|
||||||
if "io.simplelogin.macapp.subscription" in apple_sub.product_id:
|
if "io.simplelogin.macapp.subscription" in apple_sub.product_id:
|
||||||
verify_receipt(apple_sub.receipt_data, user, MACAPP_APPLE_API_SECRET)
|
verify_receipt(apple_sub.receipt_data, user, config.MACAPP_APPLE_API_SECRET)
|
||||||
else:
|
else:
|
||||||
verify_receipt(apple_sub.receipt_data, user, APPLE_API_SECRET)
|
verify_receipt(apple_sub.receipt_data, user, config.APPLE_API_SECRET)
|
||||||
|
|
||||||
LOG.d("Finish poll_apple_subscription")
|
LOG.d("Finish poll_apple_subscription")
|
||||||
|
|
||||||
|
@ -508,7 +498,7 @@ def alias_creation_report() -> List[Tuple[str, int]]:
|
||||||
|
|
||||||
def stats():
|
def stats():
|
||||||
"""send admin stats everyday"""
|
"""send admin stats everyday"""
|
||||||
if not ADMIN_EMAIL:
|
if not config.ADMIN_EMAIL:
|
||||||
LOG.w("ADMIN_EMAIL not set, nothing to do")
|
LOG.w("ADMIN_EMAIL not set, nothing to do")
|
||||||
return
|
return
|
||||||
|
|
||||||
|
@ -553,7 +543,7 @@ nb_referred_user_upgrade: {stats_today.nb_referred_user_paid} - {increase_percen
|
||||||
LOG.d("growth_stats email: %s", growth_stats)
|
LOG.d("growth_stats email: %s", growth_stats)
|
||||||
|
|
||||||
send_email(
|
send_email(
|
||||||
ADMIN_EMAIL,
|
config.ADMIN_EMAIL,
|
||||||
subject=f"SimpleLogin Growth Stats for {today}",
|
subject=f"SimpleLogin Growth Stats for {today}",
|
||||||
plaintext=growth_stats,
|
plaintext=growth_stats,
|
||||||
retries=3,
|
retries=3,
|
||||||
|
@ -595,7 +585,7 @@ nb_total_bounced_last_24h: {stats_today.nb_total_bounced_last_24h} - {increase_p
|
||||||
LOG.d("monitoring_report email: %s", monitoring_report)
|
LOG.d("monitoring_report email: %s", monitoring_report)
|
||||||
|
|
||||||
send_email(
|
send_email(
|
||||||
MONITORING_EMAIL,
|
config.MONITORING_EMAIL,
|
||||||
subject=f"SimpleLogin Monitoring Report for {today}",
|
subject=f"SimpleLogin Monitoring Report for {today}",
|
||||||
plaintext=monitoring_report,
|
plaintext=monitoring_report,
|
||||||
retries=3,
|
retries=3,
|
||||||
|
@ -880,7 +870,7 @@ def check_custom_domain():
|
||||||
|
|
||||||
def check_single_custom_domain(custom_domain):
|
def check_single_custom_domain(custom_domain):
|
||||||
mx_domains = get_mx_domains(custom_domain.domain)
|
mx_domains = get_mx_domains(custom_domain.domain)
|
||||||
if not is_mx_equivalent(mx_domains, EMAIL_SERVERS_WITH_PRIORITY):
|
if not is_mx_equivalent(mx_domains, config.EMAIL_SERVERS_WITH_PRIORITY):
|
||||||
user = custom_domain.user
|
user = custom_domain.user
|
||||||
LOG.w(
|
LOG.w(
|
||||||
"The MX record is not correctly set for %s %s %s",
|
"The MX record is not correctly set for %s %s %s",
|
||||||
|
@ -893,11 +883,11 @@ def check_single_custom_domain(custom_domain):
|
||||||
|
|
||||||
# send alert if fail for 5 consecutive days
|
# send alert if fail for 5 consecutive days
|
||||||
if custom_domain.nb_failed_checks > 5:
|
if custom_domain.nb_failed_checks > 5:
|
||||||
domain_dns_url = f"{URL}/dashboard/domains/{custom_domain.id}/dns"
|
domain_dns_url = f"{config.URL}/dashboard/domains/{custom_domain.id}/dns"
|
||||||
LOG.w("Alert domain MX check fails %s about %s", user, custom_domain)
|
LOG.w("Alert domain MX check fails %s about %s", user, custom_domain)
|
||||||
send_email_with_rate_control(
|
send_email_with_rate_control(
|
||||||
user,
|
user,
|
||||||
AlERT_WRONG_MX_RECORD_CUSTOM_DOMAIN,
|
config.AlERT_WRONG_MX_RECORD_CUSTOM_DOMAIN,
|
||||||
user.email,
|
user.email,
|
||||||
f"Please update {custom_domain.domain} DNS on SimpleLogin",
|
f"Please update {custom_domain.domain} DNS on SimpleLogin",
|
||||||
render(
|
render(
|
||||||
|
@ -1007,7 +997,7 @@ async def check_hibp():
|
||||||
"""
|
"""
|
||||||
LOG.d("Checking HIBP API for aliases in breaches")
|
LOG.d("Checking HIBP API for aliases in breaches")
|
||||||
|
|
||||||
if len(HIBP_API_KEYS) == 0:
|
if len(config.HIBP_API_KEYS) == 0:
|
||||||
LOG.e("No HIBP API keys")
|
LOG.e("No HIBP API keys")
|
||||||
return
|
return
|
||||||
|
|
||||||
|
@ -1023,7 +1013,7 @@ async def check_hibp():
|
||||||
|
|
||||||
LOG.d("Preparing list of aliases to check")
|
LOG.d("Preparing list of aliases to check")
|
||||||
queue = asyncio.Queue()
|
queue = asyncio.Queue()
|
||||||
max_date = arrow.now().shift(days=-HIBP_SCAN_INTERVAL_DAYS)
|
max_date = arrow.now().shift(days=-config.HIBP_SCAN_INTERVAL_DAYS)
|
||||||
for alias in (
|
for alias in (
|
||||||
Alias.filter(
|
Alias.filter(
|
||||||
or_(Alias.hibp_last_check.is_(None), Alias.hibp_last_check < max_date)
|
or_(Alias.hibp_last_check.is_(None), Alias.hibp_last_check < max_date)
|
||||||
|
@ -1040,10 +1030,10 @@ async def check_hibp():
|
||||||
# Each checking process will take one alias from the queue, get the info
|
# Each checking process will take one alias from the queue, get the info
|
||||||
# and then sleep for 1.5 seconds (due to HIBP API request limits)
|
# and then sleep for 1.5 seconds (due to HIBP API request limits)
|
||||||
checkers = []
|
checkers = []
|
||||||
for i in range(len(HIBP_API_KEYS)):
|
for i in range(len(config.HIBP_API_KEYS)):
|
||||||
checker = asyncio.create_task(
|
checker = asyncio.create_task(
|
||||||
_hibp_check(
|
_hibp_check(
|
||||||
HIBP_API_KEYS[i],
|
config.HIBP_API_KEYS[i],
|
||||||
queue,
|
queue,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
@ -1129,6 +1119,7 @@ if __name__ == "__main__":
|
||||||
"check_hibp",
|
"check_hibp",
|
||||||
"notify_hibp",
|
"notify_hibp",
|
||||||
"cleanup_tokens",
|
"cleanup_tokens",
|
||||||
|
"send_undelivered_mails",
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
@ -1170,3 +1161,6 @@ if __name__ == "__main__":
|
||||||
elif args.job == "cleanup_tokens":
|
elif args.job == "cleanup_tokens":
|
||||||
LOG.d("Cleanup expired tokens")
|
LOG.d("Cleanup expired tokens")
|
||||||
delete_expired_tokens()
|
delete_expired_tokens()
|
||||||
|
elif args.job == "send_undelivered_mails":
|
||||||
|
LOG.d("Sending undelivered emails")
|
||||||
|
load_unsent_mails_from_fs_and_resend()
|
||||||
|
|
|
@ -65,4 +65,11 @@ jobs:
|
||||||
shell: /bin/bash
|
shell: /bin/bash
|
||||||
schedule: "0 19 * * *"
|
schedule: "0 19 * * *"
|
||||||
captureStderr: true
|
captureStderr: true
|
||||||
concurrencyPolicy: Forbid
|
concurrencyPolicy: Forbid
|
||||||
|
|
||||||
|
- name: SimpleLogin send unsent emails
|
||||||
|
command: python /code/cron.py -j send_undelivered_emails
|
||||||
|
shell: /bin/bash
|
||||||
|
schedule: "*/5 * * * *"
|
||||||
|
captureStderr: true
|
||||||
|
concurrencyPolicy: Forbid
|
||||||
|
|
|
@ -10,7 +10,11 @@ import pytest
|
||||||
from aiosmtpd.controller import Controller
|
from aiosmtpd.controller import Controller
|
||||||
|
|
||||||
from app.email import headers
|
from app.email import headers
|
||||||
from app.mail_sender import mail_sender, SendRequest
|
from app.mail_sender import (
|
||||||
|
mail_sender,
|
||||||
|
SendRequest,
|
||||||
|
load_unsent_mails_from_fs_and_resend,
|
||||||
|
)
|
||||||
from app import config
|
from app import config
|
||||||
|
|
||||||
|
|
||||||
|
@ -82,6 +86,15 @@ def smtp_response_server(smtp_response: str) -> Callable[[], int]:
|
||||||
return inner
|
return inner
|
||||||
|
|
||||||
|
|
||||||
|
def compare_send_requests(expected: SendRequest, request: SendRequest):
|
||||||
|
assert request.mail_options == expected.mail_options
|
||||||
|
assert request.rcpt_options == expected.rcpt_options
|
||||||
|
assert request.envelope_to == expected.envelope_to
|
||||||
|
assert request.envelope_from == expected.envelope_from
|
||||||
|
assert request.msg[headers.TO] == expected.msg[headers.TO]
|
||||||
|
assert request.msg[headers.FROM] == expected.msg[headers.FROM]
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
"server_fn",
|
"server_fn",
|
||||||
[
|
[
|
||||||
|
@ -97,20 +110,37 @@ def test_mail_sender_save_unsent_to_disk(server_fn):
|
||||||
config.NOT_SEND_EMAIL = False
|
config.NOT_SEND_EMAIL = False
|
||||||
config.POSTFIX_SUBMISSION_TLS = False
|
config.POSTFIX_SUBMISSION_TLS = False
|
||||||
config.POSTFIX_PORT = server_fn()
|
config.POSTFIX_PORT = server_fn()
|
||||||
|
try:
|
||||||
|
with tempfile.TemporaryDirectory() as temp_dir:
|
||||||
|
config.SAVE_UNSENT_DIR = temp_dir
|
||||||
|
send_request = create_dummy_send_request()
|
||||||
|
mail_sender.send(send_request, 0)
|
||||||
|
found_files = os.listdir(temp_dir)
|
||||||
|
assert len(found_files) == 1
|
||||||
|
loaded_send_request = SendRequest.load_from_file(
|
||||||
|
os.path.join(temp_dir, found_files[0])
|
||||||
|
)
|
||||||
|
compare_send_requests(loaded_send_request, send_request)
|
||||||
|
finally:
|
||||||
|
config.POSTFIX_SERVER = original_postfix_server
|
||||||
|
config.NOT_SEND_EMAIL = True
|
||||||
|
|
||||||
|
|
||||||
|
@mail_sender.store_emails_test_decorator
|
||||||
|
def test_send_unsent_email_from_fs():
|
||||||
|
original_postfix_server = config.POSTFIX_SERVER
|
||||||
|
config.POSTFIX_SERVER = "localhost"
|
||||||
|
config.NOT_SEND_EMAIL = False
|
||||||
with tempfile.TemporaryDirectory() as temp_dir:
|
with tempfile.TemporaryDirectory() as temp_dir:
|
||||||
config.SAVE_UNSENT_DIR = temp_dir
|
try:
|
||||||
send_request = create_dummy_send_request()
|
config.SAVE_UNSENT_DIR = temp_dir
|
||||||
mail_sender.send(send_request, 0)
|
send_request = create_dummy_send_request()
|
||||||
found_files = os.listdir(temp_dir)
|
mail_sender.send(send_request, 0)
|
||||||
assert len(found_files) == 1
|
finally:
|
||||||
loaded_send_request = SendRequest.load_from_file(
|
config.POSTFIX_SERVER = original_postfix_server
|
||||||
os.path.join(temp_dir, found_files[0])
|
config.NOT_SEND_EMAIL = True
|
||||||
)
|
mail_sender.purge_stored_emails()
|
||||||
assert send_request.mail_options == loaded_send_request.mail_options
|
load_unsent_mails_from_fs_and_resend()
|
||||||
assert send_request.rcpt_options == loaded_send_request.rcpt_options
|
sent_emails = mail_sender.get_stored_emails()
|
||||||
assert send_request.envelope_to == loaded_send_request.envelope_to
|
assert len(sent_emails) == 1
|
||||||
assert send_request.envelope_from == loaded_send_request.envelope_from
|
compare_send_requests(send_request, sent_emails[0])
|
||||||
assert send_request.msg[headers.TO] == loaded_send_request.msg[headers.TO]
|
|
||||||
assert send_request.msg[headers.FROM] == loaded_send_request.msg[headers.FROM]
|
|
||||||
config.POSTFIX_SERVER = original_postfix_server
|
|
||||||
config.NOT_SEND_EMAIL = True
|
|
||||||
|
|
Loading…
Reference in New Issue