use sanitize_email instead of .lower().strip().replace(" ", "")

This commit is contained in:
Son NK 2021-01-11 12:29:40 +01:00
parent 01858ac452
commit e9adb3270d
16 changed files with 68 additions and 38 deletions

View File

@ -17,6 +17,7 @@ from app.email_utils import (
send_email,
render,
)
from app.utils import sanitize_email
from app.extensions import db, limiter
from app.log import LOG
from app.models import User, ApiKey, SocialAuth, AccountActivation
@ -47,7 +48,7 @@ def auth_login():
if not data:
return jsonify(error="request body cannot be empty"), 400
email = data.get("email").strip().lower()
email = sanitize_email(data.get("email"))
password = data.get("password")
device = data.get("device")
@ -84,7 +85,7 @@ def auth_register():
if not data:
return jsonify(error="request body cannot be empty"), 400
email = data.get("email").strip().lower()
email = sanitize_email(data.get("email"))
password = data.get("password")
if DISABLE_REGISTRATION:
@ -134,7 +135,7 @@ def auth_activate():
if not data:
return jsonify(error="request body cannot be empty"), 400
email = data.get("email").strip().lower()
email = sanitize_email(data.get("email"))
code = data.get("code")
user = User.get_by(email=email)
@ -187,7 +188,7 @@ def auth_reactivate():
if not data:
return jsonify(error="request body cannot be empty"), 400
email = data.get("email").strip().lower()
email = sanitize_email(data.get("email"))
user = User.get_by(email=email)
# do not use a different message to avoid exposing existing email
@ -240,7 +241,7 @@ def auth_facebook():
graph = facebook.GraphAPI(access_token=facebook_token)
user_info = graph.get_object("me", fields="email,name")
email = user_info.get("email").strip().lower()
email = sanitize_email(user_info.get("email"))
user = User.get_by(email=email)
@ -253,7 +254,7 @@ def auth_facebook():
return jsonify(error=f"cannot use {email} as personal inbox"), 400
LOG.d("create facebook user with %s", user_info)
user = User.create(email=email.lower(), name=user_info["name"], activated=True)
user = User.create(email=email, name=user_info["name"], activated=True)
db.session.commit()
email_utils.send_welcome_email(user)
@ -293,7 +294,7 @@ def auth_google():
build = googleapiclient.discovery.build("oauth2", "v2", credentials=cred)
user_info = build.userinfo().get().execute()
email = user_info.get("email").strip().lower()
email = sanitize_email(user_info.get("email"))
user = User.get_by(email=email)
@ -306,7 +307,7 @@ def auth_google():
return jsonify(error=f"cannot use {email} as personal inbox"), 400
LOG.d("create Google user with %s", user_info)
user = User.create(email=email.lower(), name="", activated=True)
user = User.create(email=email, name="", activated=True)
db.session.commit()
email_utils.send_welcome_email(user)
@ -355,7 +356,7 @@ def forgot_password():
if not data or not data.get("email"):
return jsonify(error="request body must contain email"), 400
email = data.get("email").strip().lower()
email = sanitize_email(data.get("email"))
user = User.get_by(email=email)

View File

@ -12,6 +12,7 @@ from app.email_utils import (
email_can_be_used_as_mailbox,
is_valid_email,
)
from app.utils import sanitize_email
from app.extensions import db
from app.models import Mailbox
@ -38,7 +39,7 @@ def create_mailbox():
the new mailbox dict
"""
user = g.user
mailbox_email = request.get_json().get("email").lower().strip().replace(" ", "")
mailbox_email = sanitize_email(request.get_json().get("email"))
if not is_valid_email(mailbox_email):
return jsonify(error=f"{mailbox_email} invalid"), 400
@ -126,7 +127,7 @@ def update_mailbox(mailbox_id):
changed = True
if "email" in data:
new_email = data.get("email").lower().strip()
new_email = sanitize_email(data.get("email"))
if mailbox_already_used(new_email, user):
return jsonify(error=f"{new_email} already used"), 400

View File

@ -13,6 +13,7 @@ from app.extensions import db
from app.log import LOG
from app.models import User, SocialAuth
from .login_utils import after_login
from ...utils import sanitize_email
_authorization_base_url = "https://www.facebook.com/dialog/oauth"
_token_url = "https://graph.facebook.com/oauth/access_token"
@ -91,7 +92,7 @@ def facebook_callback():
)
return redirect(url_for("auth.register"))
email = email.strip().lower()
email = sanitize_email(email)
user = User.get_by(email=email)
picture_url = facebook_user_data.get("picture", {}).get("data", {}).get("url")

View File

@ -4,6 +4,7 @@ from wtforms import StringField, validators
from app.auth.base import auth_bp
from app.dashboard.views.setting import send_reset_password_email
from app.utils import sanitize_email
from app.extensions import limiter
from app.models import User
@ -20,7 +21,7 @@ def forgot_password():
form = ForgotPasswordForm(request.form)
if form.validate_on_submit():
email = form.email.data.strip().lower()
email = sanitize_email(form.email.data)
flash(
"If your email is correct, you are going to receive an email to reset your password",
"success",

View File

@ -7,7 +7,7 @@ from app.config import GITHUB_CLIENT_ID, GITHUB_CLIENT_SECRET, URL
from app.extensions import db
from app.log import LOG
from app.models import User, SocialAuth
from app.utils import encode_url
from app.utils import encode_url, sanitize_email
_authorization_base_url = "https://github.com/login/oauth/authorize"
_token_url = "https://github.com/login/oauth/access_token"
@ -82,7 +82,7 @@ def github_callback():
)
return redirect(url_for("auth.login"))
email = email.strip().lower()
email = sanitize_email(email)
user = User.get_by(email=email)
if not user:

View File

@ -7,7 +7,7 @@ from app.config import URL, GOOGLE_CLIENT_ID, GOOGLE_CLIENT_SECRET
from app.extensions import db
from app.log import LOG
from app.models import User, File, SocialAuth
from app.utils import random_string
from app.utils import random_string, sanitize_email
from .login_utils import after_login
_authorization_base_url = "https://accounts.google.com/o/oauth2/v2/auth"
@ -79,7 +79,7 @@ def google_callback():
"https://www.googleapis.com/oauth2/v1/userinfo"
).json()
email = google_user_data["email"].strip().lower()
email = sanitize_email(google_user_data["email"])
user = User.get_by(email=email)
picture_url = google_user_data.get("picture")

View File

@ -5,6 +5,7 @@ from wtforms import StringField, validators
from app.auth.base import auth_bp
from app.auth.views.login_utils import after_login
from app.utils import sanitize_email
from app.extensions import limiter
from app.log import LOG
from app.models import User
@ -29,7 +30,7 @@ def login():
show_resend_activation = False
if form.validate_on_submit():
user = User.filter_by(email=form.email.data.strip().lower()).first()
user = User.filter_by(email=sanitize_email(form.email.data)).first()
if not user or not user.check_password(form.password.data):
# Trigger rate limiter

View File

@ -15,7 +15,7 @@ from app.email_utils import (
from app.extensions import db
from app.log import LOG
from app.models import User, ActivationCode
from app.utils import random_string, encode_url
from app.utils import random_string, encode_url, sanitize_email
class RegisterForm(FlaskForm):
@ -66,7 +66,7 @@ def register():
HCAPTCHA_SITEKEY=HCAPTCHA_SITEKEY,
)
email = form.email.data.strip().lower()
email = sanitize_email(form.email.data)
if not email_can_be_used_as_mailbox(email):
flash("You cannot use this email address as your personal inbox.", "error")

View File

@ -4,6 +4,7 @@ from wtforms import StringField, validators
from app.auth.base import auth_bp
from app.auth.views.register import send_activation_email
from app.utils import sanitize_email
from app.log import LOG
from app.models import User
@ -17,7 +18,7 @@ def resend_activation():
form = ResendActivationForm(request.form)
if form.validate_on_submit():
user = User.filter_by(email=form.email.data.strip().lower()).first()
user = User.filter_by(email=sanitize_email(form.email.data)).first()
if not user:
flash("There is no such email", "warning")

View File

@ -10,7 +10,12 @@ from wtforms import StringField, validators, ValidationError
from app.config import PAGE_LIMIT
from app.dashboard.base import dashboard_bp
from app.email_utils import parseaddr_unicode, is_valid_email, generate_reply_email
from app.email_utils import (
parseaddr_unicode,
is_valid_email,
generate_reply_email,
)
from app.utils import sanitize_email
from app.extensions import db
from app.log import LOG
from app.models import Alias, Contact, EmailLog
@ -166,6 +171,7 @@ def alias_contact_manager(alias_id):
try:
contact_name, contact_email = parseaddr_unicode(contact_addr)
contact_email = sanitize_email(contact_email)
except Exception:
flash(f"{contact_addr} is invalid", "error")
return redirect(

View File

@ -11,6 +11,7 @@ from app.config import ENFORCE_SPF, MAILBOX_SECRET
from app.config import URL
from app.dashboard.base import dashboard_bp
from app.email_utils import email_can_be_used_as_mailbox
from app.utils import sanitize_email
from app.email_utils import mailbox_already_used, render, send_email
from app.extensions import db
from app.log import LOG
@ -45,7 +46,7 @@ def mailbox_detail_route(mailbox_id):
request.form.get("form-name") == "update-email"
and change_email_form.validate_on_submit()
):
new_email = change_email_form.email.data.lower().strip()
new_email = sanitize_email(change_email_form.email.data)
if new_email != mailbox.email and not pending_email:
# check if this email is not already used
if mailbox_already_used(new_email, current_user) or Alias.get_by(
@ -92,7 +93,7 @@ def mailbox_detail_route(mailbox_id):
url_for("dashboard.mailbox_detail_route", mailbox_id=mailbox_id)
)
elif request.form.get("form-name") == "add-authorized-address":
address = request.form.get("email").lower().strip().replace(" ", "")
address = sanitize_email(request.form.get("email"))
if AuthorizedAddress.get_by(mailbox_id=mailbox.id, email=address):
flash(f"{address} already added", "error")
else:

View File

@ -42,7 +42,7 @@ from app.models import (
SLDomain,
CoinbaseSubscription,
)
from app.utils import random_string
from app.utils import random_string, sanitize_email
class SettingForm(FlaskForm):
@ -79,10 +79,10 @@ def setting():
# whether user can proceed with the email update
new_email_valid = True
if (
change_email_form.email.data.lower().strip() != current_user.email
sanitize_email(change_email_form.email.data) != current_user.email
and not pending_email
):
new_email = change_email_form.email.data.strip().lower()
new_email = sanitize_email(change_email_form.email.data)
# check if this email is not already used
if personal_email_already_used(new_email) or Alias.get_by(

View File

@ -51,7 +51,12 @@ from app.models import (
Alias,
EmailLog,
)
from app.utils import random_string, convert_to_id, convert_to_alphanumeric
from app.utils import (
random_string,
convert_to_id,
convert_to_alphanumeric,
sanitize_email,
)
def render(template_name, **kwargs) -> str:
@ -293,7 +298,7 @@ def send_email_with_rate_control(
Return true if the email is sent, otherwise False
"""
to_email = to_email.lower().strip()
to_email = sanitize_email(to_email)
min_dt = arrow.now().shift(days=-1 * nb_day)
nb_alert = (
SentAlert.query.filter_by(alert_type=alert_type, to_email=to_email)
@ -332,7 +337,7 @@ def send_email_at_most_times(
Return true if the email is sent, otherwise False
"""
to_email = to_email.lower().strip()
to_email = sanitize_email(to_email)
nb_alert = SentAlert.query.filter_by(
alert_type=alert_type, to_email=to_email
).count()
@ -365,7 +370,8 @@ def get_email_domain_part(address):
Get the domain part from email
ab@cd.com -> cd.com
"""
return address[address.find("@") + 1 :].strip().lower()
address = sanitize_email(address)
return address[address.find("@") + 1 :]
def add_dkim_signature(msg: Message, email_domain: str):

View File

@ -30,7 +30,13 @@ from app.errors import AliasInTrashError
from app.extensions import db
from app.log import LOG
from app.oauth_models import Scope
from app.utils import convert_to_id, random_string, random_words, random_word
from app.utils import (
convert_to_id,
random_string,
random_words,
random_word,
sanitize_email,
)
class ModelMixin(object):
@ -1022,7 +1028,7 @@ class Alias(db.Model, ModelMixin):
email = kw["email"]
# make sure email is lowercase and doesn't have any whitespace
email = email.lower().strip().replace(" ", "")
email = sanitize_email(email)
# make sure alias is not in global trash, i.e. DeletedAlias table
if DeletedAlias.get_by(email=email):

12
cron.py
View File

@ -27,6 +27,7 @@ from app.email_utils import (
normalize_reply_email,
is_valid_email,
)
from app.utils import sanitize_email
from app.extensions import db
from app.log import LOG
from app.models import (
@ -425,11 +426,11 @@ def sanity_check():
db.session.commit()
for user in User.filter_by(activated=True).all():
if user.email.lower().strip().replace(" ", "") != user.email:
if sanitize_email(user.email) != user.email:
LOG.exception("%s does not have sanitized email", user)
for alias in Alias.query.all():
if alias.email.lower().strip().replace(" ", "") != alias.email:
if sanitize_email(alias.email) != alias.email:
LOG.exception("Alias %s email not sanitized", alias)
if alias.name and "\n" in alias.name:
@ -438,7 +439,10 @@ def sanity_check():
LOG.exception("Alias %s name contains linebreak %s", alias, alias.name)
for contact in Contact.query.all():
if contact.reply_email.lower().strip().replace(" ", "") != contact.reply_email:
if sanitize_email(contact.reply_email) != contact.reply_email:
LOG.exception("Contact %s reply-email not sanitized", contact)
if sanitize_email(contact.website_email) != contact.website_email:
LOG.exception("Contact %s reply-email not sanitized", contact)
if not contact.invalid_email and not is_valid_email(contact.website_email):
@ -447,7 +451,7 @@ def sanity_check():
db.session.commit()
for mailbox in Mailbox.query.all():
if mailbox.email.lower().strip().replace(" ", "") != mailbox.email:
if sanitize_email(mailbox.email) != mailbox.email:
LOG.exception("Mailbox %s address not sanitized", mailbox)
for contact in Contact.query.all():

View File

@ -20,6 +20,7 @@ from app.email_utils import (
render,
get_email_domain_part,
)
from app.utils import sanitize_email
from app.extensions import db
from app.log import LOG
from app.models import (
@ -126,7 +127,7 @@ def handle_batch_import(batch_import: BatchImport):
for row in reader:
try:
full_alias = row["alias"].lower().strip().replace(" ", "")
full_alias = sanitize_email(row["alias"])
note = row["note"]
except KeyError:
LOG.warning("Cannot parse row %s", row)