import base64
import enum
import os
import quopri
import random
import time
import uuid
from copy import deepcopy
from email import policy, message_from_bytes, message_from_string
from email.header import decode_header, Header
from email.message import Message
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import make_msgid, formatdate
from smtplib import SMTP, SMTPServerDisconnected, SMTPException
from typing import Tuple, List, Optional, Union
import arrow
import dkim
import re2 as re
import spf
from email_validator import (
validate_email,
EmailNotValidError,
ValidatedEmail,
)
from flanker.addresslib import address
from flanker.addresslib.address import EmailAddress
from jinja2 import Environment, FileSystemLoader
from sqlalchemy import func
from app.config import (
SUPPORT_EMAIL,
ROOT_DIR,
POSTFIX_SERVER,
NOT_SEND_EMAIL,
DKIM_SELECTOR,
DKIM_PRIVATE_KEY,
ALIAS_DOMAINS,
SUPPORT_NAME,
POSTFIX_SUBMISSION_TLS,
MAX_NB_EMAIL_FREE_PLAN,
DISPOSABLE_EMAIL_DOMAINS,
MAX_ALERT_24H,
POSTFIX_PORT,
URL,
LANDING_PAGE_URL,
EMAIL_DOMAIN,
ALERT_DIRECTORY_DISABLED_ALIAS_CREATION,
TRANSACTIONAL_BOUNCE_EMAIL,
ALERT_SPF,
POSTFIX_PORT_FORWARD,
TEMP_DIR,
ALIAS_AUTOMATIC_DISABLE,
)
from app.db import Session
from app.dns_utils import get_mx_domains
from app.email import headers
from app.log import LOG
from app.models import (
Mailbox,
User,
SentAlert,
CustomDomain,
SLDomain,
Contact,
Alias,
EmailLog,
TransactionalEmail,
IgnoreBounceSender,
)
from app.utils import (
random_string,
convert_to_id,
convert_to_alphanumeric,
sanitize_email,
)
def render(template_name, **kwargs) -> str:
    """Render an email template located under ``ROOT_DIR/templates/emails``.

    Besides ``kwargs``, every template receives ``MAX_NB_EMAIL_FREE_PLAN``,
    ``URL``, ``LANDING_PAGE_URL`` and ``YEAR`` (the current year).
    """
    loader = FileSystemLoader(os.path.join(ROOT_DIR, "templates", "emails"))
    environment = Environment(loader=loader)
    return environment.get_template(template_name).render(
        MAX_NB_EMAIL_FREE_PLAN=MAX_NB_EMAIL_FREE_PLAN,
        URL=URL,
        LANDING_PAGE_URL=LANDING_PAGE_URL,
        YEAR=arrow.now().year,
        **kwargs,
    )
def send_welcome_email(user):
    """Send the welcome email to the user's communication address.

    Nothing is sent when the user has no communication address.
    """
    to_email, unsubscribe_link, via_email = user.get_communication_email()
    if not to_email:
        return

    # `alias` is only set when the email goes to an alias rather than to the
    # account email itself
    if to_email != user.email:
        alias = to_email
    else:
        alias = None

    text_body = render("com/welcome.txt", user=user, alias=alias)
    html_body = render("com/welcome.html", user=user, alias=alias)
    send_email(
        to_email,
        "Welcome to SimpleLogin",
        text_body,
        html_body,
        unsubscribe_link,
        via_email,
    )
def send_trial_end_soon_email(user):
    """Tell the user, on their account email, that their trial ends soon."""
    recipient = user.email
    text_body = render("transactional/trial-end.txt", user=user)
    html_body = render("transactional/trial-end.html", user=user)
    send_email(recipient, "Your trial will end soon", text_body, html_body)
def send_activation_email(email, activation_link):
    """Send the account activation link to `email`."""
    template_args = dict(activation_link=activation_link, email=email)
    text_body = render("transactional/activation.txt", **template_args)
    html_body = render("transactional/activation.html", **template_args)
    send_email(
        email,
        "Just one more step to join SimpleLogin",
        text_body,
        html_body,
    )
def send_reset_password_email(email, reset_password_link):
    """Send the password reset link to `email`."""
    text_body = render(
        "transactional/reset-password.txt",
        reset_password_link=reset_password_link,
    )
    html_body = render(
        "transactional/reset-password.html",
        reset_password_link=reset_password_link,
    )
    send_email(email, "Reset your password on SimpleLogin", text_body, html_body)
def send_change_email(new_email, current_email, link):
    """Send the email-change confirmation link to the *new* address."""
    template_args = dict(link=link, new_email=new_email, current_email=current_email)
    text_body = render("transactional/change-email.txt", **template_args)
    html_body = render("transactional/change-email.html", **template_args)
    send_email(
        new_email,
        "Confirm email update on SimpleLogin",
        text_body,
        html_body,
    )
def send_test_email_alias(email, name):
    """Send a test email to the alias `email`; `name` is passed to the templates."""
    subject = f"This email is sent to {email}"
    text_body = render("transactional/test-email.txt", name=name, alias=email)
    html_body = render("transactional/test-email.html", name=name, alias=email)
    send_email(email, subject, text_body, html_body)
def send_cannot_create_directory_alias(user, alias_address, directory_name):
    """Notify the user that a directory alias could not be created on the fly.

    Used when the user has cancelled their subscription and can therefore no
    longer create aliases on the fly.
    """
    recipient = user.email
    subject = f"Alias {alias_address} cannot be created"
    template_args = dict(alias=alias_address, directory=directory_name)
    text_body = render(
        "transactional/cannot-create-alias-directory.txt", **template_args
    )
    html_body = render(
        "transactional/cannot-create-alias-directory.html", **template_args
    )
    send_email(recipient, subject, text_body, html_body)
def send_cannot_create_directory_alias_disabled(user, alias_address, directory_name):
    """Notify the user of an alias creation attempt on a disabled directory.

    The notification is rate limited under the
    ALERT_DIRECTORY_DISABLED_ALIAS_CREATION alert type.
    """
    recipient = user.email
    subject = f"Alias {alias_address} cannot be created"
    template_args = dict(alias=alias_address, directory=directory_name)
    text_body = render(
        "transactional/cannot-create-alias-directory-disabled.txt", **template_args
    )
    html_body = render(
        "transactional/cannot-create-alias-directory-disabled.html", **template_args
    )
    send_email_with_rate_control(
        user,
        ALERT_DIRECTORY_DISABLED_ALIAS_CREATION,
        recipient,
        subject,
        text_body,
        html_body,
    )
def send_cannot_create_domain_alias(user, alias, domain):
    """Notify the user that a custom-domain alias could not be created on the fly.

    Used when the user has cancelled their subscription and can therefore no
    longer create aliases on the fly with a custom domain.
    """
    recipient = user.email
    subject = f"Alias {alias} cannot be created"
    template_args = dict(alias=alias, domain=domain)
    text_body = render("transactional/cannot-create-alias-domain.txt", **template_args)
    html_body = render("transactional/cannot-create-alias-domain.html", **template_args)
    send_email(recipient, subject, text_body, html_body)
def send_email(
    to_email,
    subject,
    plaintext,
    html=None,
    unsubscribe_link=None,
    unsubscribe_via_email=False,
):
    """Send a transactional email from the support address through Postfix.

    Args:
        to_email: recipient address; it is sanitized before use.
        subject: value of the Subject header.
        plaintext: text/plain body.
        html: text/html body. When empty, it is derived from `plaintext` by
            turning line breaks into ``<br>``.
        unsubscribe_link: when set, added as the List-Unsubscribe header.
        unsubscribe_via_email: when False and `unsubscribe_link` is set, the
            one-click List-Unsubscribe-Post header is added as well.

    When NOT_SEND_EMAIL is set, the email is only logged and nothing is sent.

    Raises:
        Exception: when the message cannot be DKIM signed.
        smtplib.SMTPException (or OSError): when connecting or sending fails.
    """
    to_email = sanitize_email(to_email)
    if NOT_SEND_EMAIL:
        LOG.d(
            "send email with subject '%s' to '%s', plaintext: %s, html: %s",
            subject,
            to_email,
            plaintext,
            html,
        )
        return

    LOG.d("send email to %s, subject %s", to_email, subject)

    if POSTFIX_SUBMISSION_TLS:
        smtp = SMTP(POSTFIX_SERVER, 587)
    else:
        smtp = SMTP(POSTFIX_SERVER, POSTFIX_PORT or 25)

    # The context manager sends QUIT and closes the socket on every exit path,
    # including failures while building, signing or sending the message.
    with smtp:
        if POSTFIX_SUBMISSION_TLS:
            smtp.starttls()

        msg = MIMEMultipart("alternative")
        msg.attach(MIMEText(plaintext))

        if not html:
            LOG.d("Use plaintext as html")
            html = plaintext.replace("\n", "<br>")
        msg.attach(MIMEText(html, "html"))

        msg[headers.SUBJECT] = subject
        msg[headers.FROM] = f"{SUPPORT_NAME} <{SUPPORT_EMAIL}>"
        msg[headers.TO] = to_email

        msg_id_header = make_msgid()
        msg[headers.MESSAGE_ID] = msg_id_header

        date_header = formatdate()
        msg[headers.DATE] = date_header

        if unsubscribe_link:
            add_or_replace_header(
                msg, headers.LIST_UNSUBSCRIBE, f"<{unsubscribe_link}>"
            )
            if not unsubscribe_via_email:
                add_or_replace_header(
                    msg, headers.LIST_UNSUBSCRIBE_POST, "List-Unsubscribe=One-Click"
                )

        # add DKIM
        email_domain = SUPPORT_EMAIL[SUPPORT_EMAIL.find("@") + 1 :]
        add_dkim_signature(msg, email_domain)

        msg_raw = to_bytes(msg)

        transaction = TransactionalEmail.create(email=to_email, commit=True)

        # use a different envelope sender for each transactional email (aka VERP)
        smtp.sendmail(
            TRANSACTIONAL_BOUNCE_EMAIL.format(transaction.id), to_email, msg_raw
        )
def send_email_with_rate_control(
    user: User,
    alert_type: str,
    to_email: str,
    subject,
    plaintext,
    html=None,
    max_nb_alert=MAX_ALERT_24H,
    nb_day=1,
    ignore_smtp_error=False,
) -> bool:
    """Send an email unless too many alerts of this type were sent recently.

    At most `max_nb_alert` alerts of `alert_type` are recorded for `to_email`
    over the last `nb_day` days. The alert is recorded before the email is
    sent. With `ignore_smtp_error`, an SMTPException raised while sending is
    logged instead of propagated.

    Return True when the limit was not reached (the send was attempted),
    False otherwise.
    """
    to_email = sanitize_email(to_email)
    window_start = arrow.now().shift(days=-1 * nb_day)
    recent_alerts = SentAlert.filter_by(
        alert_type=alert_type, to_email=to_email
    ).filter(SentAlert.created_at > window_start)
    nb_alert = recent_alerts.count()

    if nb_alert >= max_nb_alert:
        LOG.w(
            "%s emails were sent to %s in the last %s days, alert type %s",
            nb_alert,
            to_email,
            nb_day,
            alert_type,
        )
        return False

    SentAlert.create(user_id=user.id, alert_type=alert_type, to_email=to_email)
    Session.commit()

    if not ignore_smtp_error:
        send_email(to_email, subject, plaintext, html)
        return True

    try:
        send_email(to_email, subject, plaintext, html)
    except SMTPException:
        LOG.w("Cannot send email to %s, subject %s", to_email, subject)
    return True
def send_email_at_most_times(
    user: User,
    alert_type: str,
    to_email: str,
    subject,
    plaintext,
    html=None,
    max_times=1,
) -> bool:
    """Send an email of `alert_type` to `to_email` at most `max_times` times.

    Meant for warnings the user should only be told about a limited number of
    times. The alert is recorded before the email is sent.

    Return True when the email is sent, False when the limit was reached.
    """
    to_email = sanitize_email(to_email)
    already_sent = SentAlert.filter_by(alert_type=alert_type, to_email=to_email)
    nb_alert = already_sent.count()

    if nb_alert >= max_times:
        LOG.w(
            "%s emails were sent to %s alert type %s",
            nb_alert,
            to_email,
            alert_type,
        )
        return False

    SentAlert.create(user_id=user.id, alert_type=alert_type, to_email=to_email)
    Session.commit()
    send_email(to_email, subject, plaintext, html)
    return True
def get_email_local_part(address) -> str:
    """Return the lowercased local part of an email address.

    ab@cd.com -> ab

    The address is validated first (no deliverability check, no SMTPUTF8).
    """
    validated: ValidatedEmail = validate_email(
        address, check_deliverability=False, allow_smtputf8=False
    )
    local_part = validated.local_part
    return local_part.lower()
def get_email_domain_part(address):
    """Return what follows the first "@" of the sanitized address.

    ab@cd.com -> cd.com
    """
    sanitized = sanitize_email(address)
    at_index = sanitized.find("@")
    return sanitized[at_index + 1 :]
def add_dkim_signature(msg: Message, email_domain: str):
    """DKIM-sign `msg`, trying each header set of `headers.DKIM_HEADERS` in turn.

    Stops at the first header set that signs successfully. When every set
    fails, the message is dumped to TEMP_DIR (if configured) and an Exception
    is raised.
    """
    for candidate_headers in headers.DKIM_HEADERS:
        try:
            add_dkim_signature_with_header(msg, email_domain, candidate_headers)
        except dkim.DKIMException:
            # fall through to the next header set
            LOG.w("DKIM fail with %s", candidate_headers, exc_info=True)
        else:
            return

    # To investigate why some emails can't be DKIM signed. todo: remove
    if TEMP_DIR:
        file_name = f"{uuid.uuid4()}.eml"
        dump_path = os.path.join(TEMP_DIR, file_name)
        with open(dump_path, "wb") as dump_file:
            dump_file.write(msg.as_bytes())
        LOG.w("email saved to %s", file_name)

    raise Exception("Cannot create DKIM signature")
def add_dkim_signature_with_header(
    msg: Message, email_domain: str, dkim_headers: [bytes]
):
    """Replace the DKIM-Signature header of `msg` with one covering `dkim_headers`.

    Any existing DKIM-Signature header is always removed. A new one is only
    added when DKIM_PRIVATE_KEY is configured.
    """
    delete_header(msg, "DKIM-Signature")

    if not DKIM_PRIVATE_KEY:
        return

    # headers to sign are given in "byte" form
    raw_signature = dkim.sign(
        to_bytes(msg),
        DKIM_SELECTOR,
        email_domain.encode(),
        DKIM_PRIVATE_KEY.encode(),
        include_headers=dkim_headers,
    )
    signature = raw_signature.decode()

    # put the signature on a single line
    flattened = signature.replace("\n", " ").replace("\r", "")
    # dkim.sign() returns the full header line; keep only the value
    msg[headers.DKIM_SIGNATURE] = flattened[len("DKIM-Signature: ") :]
def add_or_replace_header(msg: Message, header: str, value: str):
    """Set `header` to `value`, dropping every existing occurrence first."""
    # Plain item assignment on a Message appends, hence the explicit removal.
    delete_header(msg, header)
    msg[header] = value
def delete_header(msg: Message, header: str):
    """Remove every occurrence of `header` (case-insensitive) from `msg`.

    A header can appear several times in a message.
    """
    # inspired from https://stackoverflow.com/a/47903323/1428034
    unwanted = header.lower()
    # slice assignment filters the existing header list in place
    msg._headers[:] = [
        entry for entry in msg._headers if entry[0].lower() != unwanted
    ]
def sanitize_header(msg: Message, header: str):
    """Strip surrounding whitespace and replace line breaks with spaces in `header`.

    Applies to every occurrence of `header` (case-insensitive); empty values
    are left untouched.
    """
    wanted = header.lower()
    # each entry of msg._headers is a tuple like ('From', 'hey@google.com')
    for index, (name, value) in enumerate(msg._headers):
        if name.lower() == wanted and value:
            msg._headers[index] = (name, value.strip().replace("\n", " "))
def delete_all_headers_except(msg: Message, headers: [str]):
    """Remove every header of `msg` whose name (case-insensitive) is not in `headers`."""
    kept_names = {name.lower() for name in headers}
    # slice assignment filters the existing header list in place
    msg._headers[:] = [
        entry for entry in msg._headers if entry[0].lower() in kept_names
    ]
def can_create_directory_for_address(email_address: str) -> bool:
    """Return True if the address ends with "@<domain>" for a domain in ALIAS_DOMAINS."""
    # not allow creating directory with premium domain
    return any(email_address.endswith("@" + domain) for domain in ALIAS_DOMAINS)
def is_valid_alias_address_domain(email_address) -> bool:
    """Return whether the address domain is an SLDomain or a verified CustomDomain."""
    domain = get_email_domain_part(email_address)
    known_domain = SLDomain.get_by(domain=domain) or CustomDomain.get_by(
        domain=domain, verified=True
    )
    return bool(known_domain)
def email_can_be_used_as_mailbox(email_address: str) -> bool:
    """Return True if the address can be used as a personal email (mailbox).

    The decision is based on the domain of the address. It is rejected when:
    - the address is not a valid email
    - the domain is an SLDomain
    - the domain is a verified custom domain
    - the domain, or one of its MX domains, is disposable
    - the domain has no MX record
    """
    try:
        validated = validate_email(
            email_address, check_deliverability=False, allow_smtputf8=False
        )
    except EmailNotValidError:
        return False

    domain = validated.domain
    if not domain:
        return False

    if SLDomain.get_by(domain=domain):
        return False

    if CustomDomain.get_by(domain=domain, verified=True):
        LOG.d("domain %s is a SimpleLogin custom domain", domain)
        return False

    if is_disposable_domain(domain):
        LOG.d("Domain %s is disposable", domain)
        return False

    # the MX domains must not be disposable either
    mx_domains = get_mx_domain_list(domain)

    # without any MX record the email is not valid
    if not mx_domains:
        LOG.d("No MX record for domain %s", domain)
        return False

    for mx_domain in mx_domains:
        if not is_disposable_domain(mx_domain):
            continue
        LOG.d("MX Domain %s %s is disposable", mx_domain, domain)
        return False

    return True
def is_disposable_domain(domain):
    """Return True if `domain` is in DISPOSABLE_EMAIL_DOMAINS or a subdomain of one."""
    return any(
        domain == disposable or domain.endswith("." + disposable)
        for disposable in DISPOSABLE_EMAIL_DOMAINS
    )
def get_mx_domain_list(domain) -> [str]:
    """Return the MX domains of `domain`, each with its last character removed.

    The last character is dropped so the names come back *without* the
    trailing dot (".").
    """
    mx_hosts = []
    for _priority, mx_host in get_mx_domains(domain):
        mx_hosts.append(mx_host[:-1])
    return mx_hosts
def personal_email_already_used(email_address: str) -> bool:
    """Return True if a User already exists with this email."""
    existing_user = User.get_by(email=email_address)
    return bool(existing_user)
def mailbox_already_used(email: str, user) -> bool:
    """Return True if `user` already has a Mailbox with this email."""
    # Only an existing Mailbox row counts. The user's own account email is not
    # treated as used, so it can be re-added as a mailbox (for example after
    # the user changed their root email).
    existing_mailbox = Mailbox.get_by(email=email, user_id=user.id)
    return bool(existing_mailbox)
def get_orig_message_from_bounce(bounce_report: Message) -> Optional[Message]:
    """Return the original email embedded in a bounce report.

    The original message is taken to be the 7th part yielded by ``walk()``:
    1st is the container (bounce report), 2nd the report from our own
    Postfix, 3rd the report from the other mailbox, 4th the container of the
    original message, ... Returns None when there are fewer than 7 parts.
    """
    for position, part in enumerate(bounce_report.walk(), start=1):
        if position == 7:
            return part
    return None
def get_mailbox_bounce_info(bounce_report: Message) -> Optional[Message]:
    """Return the part of a bounce report holding the bounce details.

    This is the 5th part yielded by ``walk()``: 1st is the container (bounce
    report), 2nd the report from our own Postfix, 3rd the report from the
    other mailbox, 4th the container of the original message and 5th a child
    of the 3rd with more info about the bounce. A missing
    Content-Transfer-Encoding header on that part is set to "7bit".
    Returns None when there are fewer than 5 parts.

    An example of bounce info:
        Final-Recipient: rfc822; not-existing@gmail.com
        Original-Recipient: rfc822;not-existing@gmail.com
        Action: failed
        Status: 5.1.1
        Remote-MTA: dns; gmail-smtp-in.l.google.com
        Diagnostic-Code: smtp;
         550-5.1.1 The email account that you tried to reach does
         not exist. Please try 550-5.1.1 double-checking the recipient's email
         address for typos or 550-5.1.1 unnecessary spaces. Learn more at 550 5.1.1
         https://support.google.com/mail/?p=NoSuchUser z127si6173191wmc.132 - gsmtp
    """
    for position, part in enumerate(bounce_report.walk(), start=1):
        if position != 5:
            continue
        if not part["content-transfer-encoding"]:
            LOG.w("add missing content-transfer-encoding header")
            part["content-transfer-encoding"] = "7bit"
        return part
    return None
def get_orig_message_from_hotmail_complaint(msg: Message) -> Optional[Message]:
    """Return the original message embedded in a Hotmail complaint.

    It is the 3rd part yielded by ``walk()``: 1st is the container, 2nd the
    empty body. Returns None when there are fewer than 3 parts.
    """
    for position, part in enumerate(msg.walk(), start=1):
        if position == 3:
            return part
    return None
def get_orig_message_from_yahoo_complaint(msg: Message) -> Optional[Message]:
    """Return the original message embedded in a Yahoo complaint.

    It is the 6th part yielded by ``walk()``: 1st is the container, 2nd the
    empty body. Returns None when there are fewer than 6 parts.
    """
    for position, part in enumerate(msg.walk(), start=1):
        if position == 6:
            return part
    return None
def get_header_from_bounce(msg: Message, header: str) -> Optional[str]:
    """Extract a header value from the string form of a bounce message by regex.

    get_orig_message_from_bounce is better. This should be the last option.

    Args:
        msg: the bounce message; only ``str(msg)`` is used.
        header: header name, interpolated as-is into the regex.

    Returns:
        What follows the first ":" on the first line matching `header`,
        stripped, or None when no line matches or the line has no ":".
    """
    msg_str = str(msg)
    exp = re.compile(f"{header}.*\n")
    r = re.search(exp, msg_str)
    if not r:
        return None

    # matched_line should be something like 'HEADER: 1234'
    matched_line = msg_str[r.start() : r.end()]
    # Split on the first colon only, so values that themselves contain ":"
    # are returned in full.
    _name, separator, value = matched_line.partition(":")
    if not separator:
        return None
    return value.strip()
def get_orig_message_from_spamassassin_report(msg: Message) -> Message:
    """Return the original email embedded in a SpamAssassin report.

    It is the 4th part yielded by ``walk()``: 1st is the root part
    (multipart/report), 2nd the text/plain SpamAssassin part, 3rd the
    message/rfc822 wrapper of the original message. Returns None when there
    are fewer than 4 parts.
    """
    for position, part in enumerate(msg.walk(), start=1):
        if position == 4:
            return part
    return None
def get_spam_info(msg: Message, max_score=None) -> (bool, str):
    """Tell whether `msg` is classified as spam, based on its X-Spam-Status header.

    Return (is spam, spam status detail); (False, "") when the header is
    missing or empty.

    The header format is
    ```X-Spam-Status: No, score=-0.1 required=5.0 tests=DKIM_SIGNED,DKIM_VALID,
      DKIM_VALID_AU,RCVD_IN_DNSWL_BLOCKED,RCVD_IN_MSPIKE_H2,SPF_PASS,
      URIBL_BLOCKED autolearn=unavailable autolearn_force=no version=3.4.2```
    """
    status_header = msg[headers.X_SPAM_STATUS]
    if status_header:
        return get_spam_from_header(status_header, max_score=max_score)
    return False, ""
def get_spam_from_header(spam_status_header, max_score=None) -> Tuple[bool, str]:
    """Get spam info from the value of an X-Spam-Status header.

    The header value has the following format
    ```No, score=-0.1 required=5.0 tests=DKIM_SIGNED,DKIM_VALID,
      DKIM_VALID_AU,RCVD_IN_DNSWL_BLOCKED,RCVD_IN_MSPIKE_H2,SPF_PASS,
      URIBL_BLOCKED autolearn=unavailable autolearn_force=no version=3.4.2```

    Args:
        spam_status_header: the header value.
        max_score: when truthy, a score greater than or equal to it marks the
            message as spam whatever the yes/no answer is.

    Returns:
        (is spam, spam status detail), the detail being the header value.

    Raises:
        ValueError: when `max_score` is set and the score cannot be parsed.
    """
    # "yes" or "no": everything before the first comma. When there is no
    # comma, the whole header is the answer.
    answer, separator, remainder = spam_status_header.partition(",")

    if max_score:
        # the score section ("score=-0.1") comes first after the answer
        score_source = remainder if separator else spam_status_header
        score_section = score_source.strip().split(" ")[0]
        score = float(score_section[len("score=") :])
        if score >= max_score:
            LOG.w("Spam score %s exceeds %s", score, max_score)
            return True, spam_status_header

    return answer.strip().lower() == "yes", spam_status_header
def get_header_unicode(header: Union[str, Header]) -> str:
    """Convert a header to unicode.

    Should be used to handle headers like From:, To:, CC:, Subject:

    Args:
        header: raw header value; None is accepted and gives "".

    Returns:
        The decoded header. Bytes that cannot be decoded with the declared
        charset are decoded as UTF-8, with undecodable bytes replaced.
    """
    if header is None:
        return ""

    decoded_parts = []
    for chunk, charset in decode_header(header):
        if isinstance(chunk, str):
            # header without any encoded word: returned as-is by decode_header
            decoded_parts.append(chunk)
            continue

        if charset is None:
            # Raw text next to an encoded word comes back as bytes with no
            # charset and may not be valid UTF-8, so never decode it strictly.
            decoded_parts.append(chunk.decode("utf-8", errors="replace"))
            continue

        try:
            decoded_str = chunk.decode(charset)
        except (LookupError, UnicodeDecodeError):  # charset is unknown
            LOG.w("Cannot decode %s with %s, try utf-8", chunk, charset)
            try:
                decoded_str = chunk.decode("utf-8")
            except UnicodeDecodeError:
                LOG.w("Cannot UTF-8 decode %s", chunk)
                decoded_str = chunk.decode("utf-8", errors="replace")
        decoded_parts.append(decoded_str)

    return "".join(decoded_parts)
def copy(msg: Message) -> Message:
    """Return a copy of `msg`.

    Tries ``deepcopy`` first, then re-parsing the string form, and finally
    re-parsing the bytes form.
    """
    try:
        return deepcopy(msg)
    except Exception:
        LOG.w("deepcopy fails, try string parsing")

    try:
        return message_from_string(msg.as_string())
    except (UnicodeEncodeError, KeyError, LookupError):
        LOG.w("as_string() fails, try bytes parsing")

    return message_from_bytes(to_bytes(msg))
def to_bytes(msg: Message):
    """Serialize `msg` to bytes, falling back on UnicodeEncodeError.

    Order of attempts: the message's own policy, the SMTP policy, the
    SMTPUTF8 policy, then encoding the string form, with replacement of
    unencodable characters as the last resort.
    """
    try:
        return msg.as_bytes()
    except UnicodeEncodeError:
        LOG.w("as_bytes fails with default policy, try SMTP policy")

    try:
        return msg.as_bytes(policy=policy.SMTP)
    except UnicodeEncodeError:
        LOG.w("as_bytes fails with SMTP policy, try SMTPUTF8 policy")

    try:
        return msg.as_bytes(policy=policy.SMTPUTF8)
    except UnicodeEncodeError:
        LOG.w("as_bytes fails with SMTPUTF8 policy, try converting to string")

    as_text = msg.as_string()
    try:
        return as_text.encode()
    except UnicodeEncodeError as e:
        LOG.w("can't encode msg, err:%s", e)
        return as_text.encode(errors="replace")
def should_add_dkim_signature(domain: str) -> bool:
    """Return True if emails sent from `domain` should be DKIM signed.

    That is the case when `domain` is an SLDomain, or a custom domain whose
    ``dkim_verified`` is truthy. A domain that is neither an SLDomain nor a
    custom domain gives False.
    """
    if SLDomain.get_by(domain=domain):
        return True

    custom_domain: Optional[CustomDomain] = CustomDomain.get_by(domain=domain)
    # get_by() may find nothing: guard before reading the attribute
    if custom_domain is not None and custom_domain.dkim_verified:
        return True

    return False
def is_valid_email(email_address: str) -> bool:
    """Return whether `email_address` is a syntactically valid email address.

    No MX (deliverability) check is run and unicode addresses are not allowed.
    """
    try:
        validate_email(email_address, check_deliverability=False, allow_smtputf8=False)
    except EmailNotValidError:
        return False
    else:
        return True
class EmailEncoding(enum.Enum):
    """Content transfer encodings distinguished when handling a message body."""

    # body is base64 encoded
    BASE64 = "base64"
    # body is quoted-printable encoded
    QUOTED = "quoted-printable"
    # body is used as-is
    NO = "no-encoding"
def get_encoding(msg: Message) -> EmailEncoding:
    """Map the Content-Transfer-Encoding header of `msg` to an EmailEncoding.

    - "base64" -> EmailEncoding.BASE64
    - "quoted-printable" -> EmailEncoding.QUOTED
    - empty, "7bit", "8bit", "8bit;", "binary" and "amazonses.com"
      -> EmailEncoding.NO
    - anything else is logged as an error and treated as EmailEncoding.NO
    """
    cte = str(msg.get(headers.CONTENT_TRANSFER_ENCODING, "")).lower().strip()

    known_encodings = {
        "": EmailEncoding.NO,
        "7bit": EmailEncoding.NO,
        "8bit": EmailEncoding.NO,
        "binary": EmailEncoding.NO,
        "8bit;": EmailEncoding.NO,
        "base64": EmailEncoding.BASE64,
        "quoted-printable": EmailEncoding.QUOTED,
        # some email services use unknown encoding
        "amazonses.com": EmailEncoding.NO,
    }
    encoding = known_encodings.get(cte)
    if encoding is None:
        LOG.e("Unknown encoding %s", cte)
        return EmailEncoding.NO
    return encoding
def encode_text(text: str, encoding: EmailEncoding = EmailEncoding.NO) -> str:
    """Encode `text` as quoted-printable or base64; return it unchanged otherwise."""
    if encoding == EmailEncoding.QUOTED:
        return quopri.encodestring(text.encode("utf-8")).decode("utf-8")

    if encoding == EmailEncoding.BASE64:
        return base64.b64encode(text.encode("utf-8")).decode("utf-8")

    # 7bit - no encoding
    return text
def decode_text(text: str, encoding: EmailEncoding = EmailEncoding.NO) -> str:
    """Decode quoted-printable or base64 `text`; return it unchanged otherwise.

    Bytes that cannot be decoded to a string are ignored.
    """
    if encoding == EmailEncoding.QUOTED:
        raw = quopri.decodestring(text.encode("utf-8"))
    elif encoding == EmailEncoding.BASE64:
        raw = base64.b64decode(text.encode("utf-8"))
    else:
        # 7bit - no encoding
        return text

    return raw.decode(errors="ignore")
def add_header(msg: Message, text_header, html_header) -> Message:
content_type = msg.get_content_type().lower()
if content_type == "text/plain":
encoding = get_encoding(msg)
payload = msg.get_payload()
if type(payload) is str:
clone_msg = copy(msg)
new_payload = f"""{text_header}
---
{decode_text(payload, encoding)}"""
clone_msg.set_payload(encode_text(new_payload, encoding))
return clone_msg
elif content_type == "text/html":
encoding = get_encoding(msg)
payload = msg.get_payload()
if type(payload) is str:
new_payload = f"""
{html_header} |
{decode_text(payload, encoding)} |