app-MAIL-temp/app/email_utils.py
2020-02-03 13:47:34 +07:00

333 lines
8.6 KiB
Python

import os
from email.message import EmailMessage, Message
from email.utils import make_msgid, formatdate
from smtplib import SMTP
import dkim
from jinja2 import Environment, FileSystemLoader
from app.config import (
SUPPORT_EMAIL,
ROOT_DIR,
POSTFIX_SERVER,
NOT_SEND_EMAIL,
DKIM_SELECTOR,
DKIM_PRIVATE_KEY,
DKIM_HEADERS,
ALIAS_DOMAINS,
SUPPORT_NAME,
)
from app.log import LOG
def render(template_name, **kwargs) -> str:
templates_dir = os.path.join(ROOT_DIR, "templates", "emails")
env = Environment(loader=FileSystemLoader(templates_dir))
template = env.get_template(template_name)
return template.render(**kwargs)
def send_welcome_email(user):
send_email(
user.email,
f"Welcome to SimpleLogin {user.name}",
render("com/welcome.txt", name=user.name, user=user),
render("com/welcome.html", name=user.name, user=user),
)
def send_trial_end_soon_email(user):
send_email(
user.email,
f"Your trial will end soon {user.name}",
render("transactional/trial-end.txt", name=user.name, user=user),
render("transactional/trial-end.html", name=user.name, user=user),
)
def send_activation_email(email, name, activation_link):
send_email(
email,
f"Just one more step to join SimpleLogin {name}",
render(
"transactional/activation.txt",
name=name,
activation_link=activation_link,
email=email,
),
render(
"transactional/activation.html",
name=name,
activation_link=activation_link,
email=email,
),
)
def send_reset_password_email(email, name, reset_password_link):
send_email(
email,
f"Reset your password on SimpleLogin",
render(
"transactional/reset-password.txt",
name=name,
reset_password_link=reset_password_link,
),
render(
"transactional/reset-password.html",
name=name,
reset_password_link=reset_password_link,
),
)
def send_change_email(new_email, current_email, name, link):
send_email(
new_email,
f"Confirm email update on SimpleLogin",
render(
"transactional/change-email.txt",
name=name,
link=link,
new_email=new_email,
current_email=current_email,
),
render(
"transactional/change-email.html",
name=name,
link=link,
new_email=new_email,
current_email=current_email,
),
)
def send_new_app_email(email, name):
send_email(
email,
f"Any question/feedback for SimpleLogin {name}?",
render("com/new-app.txt", name=name),
render("com/new-app.html", name=name),
)
def send_test_email_alias(email, name):
send_email(
email,
f"This email is sent to {email}",
render("transactional/test-email.txt", name=name, alias=email),
render("transactional/test-email.html", name=name, alias=email),
)
def send_cannot_create_directory_alias(user, alias, directory):
"""when user cancels their subscription, they cannot create alias on the fly.
If this happens, send them an email to notify
"""
send_email(
user.email,
f"Alias {alias} cannot be created",
render(
"transactional/cannot-create-alias-directory.txt",
name=user.name,
alias=alias,
directory=directory,
),
render(
"transactional/cannot-create-alias-directory.html",
name=user.name,
alias=alias,
directory=directory,
),
)
def send_cannot_create_domain_alias(user, alias, domain):
"""when user cancels their subscription, they cannot create alias on the fly with custom domain.
If this happens, send them an email to notify
"""
send_email(
user.email,
f"Alias {alias} cannot be created",
render(
"transactional/cannot-create-alias-domain.txt",
name=user.name,
alias=alias,
domain=domain,
),
render(
"transactional/cannot-create-alias-domain.html",
name=user.name,
alias=alias,
domain=domain,
),
)
def send_reply_alias_must_use_personal_email(user, alias, sender):
"""
The reply_email can be used only by user personal email.
Notify user if it's used by someone else
"""
send_email(
user.email,
f"Reply from your alias {alias} only works with your personal email",
render(
"transactional/reply-must-use-personal-email.txt",
name=user.name,
alias=alias,
sender=sender,
user_email=user.email,
),
render(
"transactional/reply-must-use-personal-email.html",
name=user.name,
alias=alias,
sender=sender,
user_email=user.email,
),
)
def send_email(to_email, subject, plaintext, html):
if NOT_SEND_EMAIL:
LOG.d(
"send email with subject %s to %s, plaintext: %s",
subject,
to_email,
plaintext,
)
return
# host IP, setup via Docker network
smtp = SMTP(POSTFIX_SERVER, 25)
msg = EmailMessage()
msg["Subject"] = subject
msg["From"] = f"{SUPPORT_NAME} <{SUPPORT_EMAIL}>"
msg["To"] = to_email
msg.set_content(plaintext)
if html is not None:
msg.add_alternative(html, subtype="html")
msg_id_header = make_msgid()
LOG.d("message-id %s", msg_id_header)
msg["Message-ID"] = msg_id_header
date_header = formatdate()
LOG.d("Date header: %s", date_header)
msg["Date"] = date_header
# add DKIM
email_domain = SUPPORT_EMAIL[SUPPORT_EMAIL.find("@") + 1 :]
add_dkim_signature(msg, email_domain)
msg_raw = msg.as_string().encode()
smtp.sendmail(SUPPORT_EMAIL, to_email, msg_raw)
def get_email_name(email_from):
"""parse email from header and return the name part
First Last <ab@cd.com> -> First Last
ab@cd.com -> ""
"""
if "<" in email_from:
return email_from[: email_from.find("<")].strip()
return ""
def get_email_part(email_from):
"""parse email from header and return the email part
First Last <ab@cd.com> -> ab@cd.com
ab@cd.com -> ""
"""
if "<" in email_from:
return email_from[email_from.find("<") + 1 : email_from.find(">")].strip()
return email_from
def get_email_local_part(email):
"""
Get the local part from email
ab@cd.com -> ab
"""
return email[: email.find("@")]
def get_email_domain_part(email):
"""
Get the domain part from email
ab@cd.com -> cd.com
"""
return email[email.find("@") + 1 :]
def add_dkim_signature(msg: Message, email_domain: str):
if msg["DKIM-Signature"]:
LOG.d("Remove DKIM-Signature %s", msg["DKIM-Signature"])
del msg["DKIM-Signature"]
# Specify headers in "byte" form
# Generate message signature
sig = dkim.sign(
msg.as_string().encode(),
DKIM_SELECTOR,
email_domain.encode(),
DKIM_PRIVATE_KEY.encode(),
include_headers=DKIM_HEADERS,
)
sig = sig.decode()
# remove linebreaks from sig
sig = sig.replace("\n", " ").replace("\r", "")
msg.add_header("DKIM-Signature", sig[len("DKIM-Signature: ") :])
def add_or_replace_header(msg: Message, header: str, value: str):
try:
msg.add_header(header, value)
except ValueError:
# the header exists already
msg.replace_header(header, value)
def delete_header(msg: Message, header: str):
"""a header can appear several times in message."""
for h in msg._headers:
if h[0].lower() == header.lower():
msg._headers.remove(h)
def email_belongs_to_alias_domains(email: str) -> bool:
"""return True if an emails ends with one of the alias domains provided by SimpleLogin"""
for domain in ALIAS_DOMAINS:
if email.endswith("@" + domain):
return True
return False
def can_be_used_as_personal_email(email: str) -> bool:
"""return True if an email can be used as a personal email. Currently the only condition is email domain is not
- one of ALIAS_DOMAINS
- one of custom domains
"""
domain = get_email_domain_part(email)
if not domain:
return False
if domain in ALIAS_DOMAINS:
return False
from app.models import CustomDomain
if CustomDomain.get_by(domain=domain, verified=True):
return False
return True