app-MAIL-temp/app/api/views/index.py

90 lines
2.7 KiB
Python

import random
import arrow
from flask import jsonify, request
from app.api.base import api_bp
from app.config import EMAIL_DOMAIN
from app.extensions import db
from app.log import LOG
from app.models import ApiKey, AliasUsedOn, GenEmail, User, DeletedAlias
from app.utils import random_string
@api_bp.route("/alias/new", methods=["GET", "POST"])
def index():
    """Return the alias to use on a given hostname, creating one if needed.

    The incoming request must provide a valid api-key in the "Authentication"
    header, and the JSON payload must contain a non-empty "hostname" string.

    Responses:
        401 with an ``error`` field when the api key is unknown.
        400 with an ``error`` field when the payload or "hostname" is missing
        or is not a non-empty string.
        Otherwise a JSON object with an ``alias`` field holding the email.
    """
    api_code = request.headers.get("Authentication")
    api_key = ApiKey.get_by(code=api_code)
    if not api_key:
        return jsonify(error="Wrong api key"), 401

    # silent=True: a missing or malformed JSON body yields None so that the
    # same explicit error below is returned instead of a framework error page.
    data = request.get_json(silent=True)
    # The payload must be a JSON object; anything else has no "hostname".
    hostname = data.get("hostname") if isinstance(data, dict) else None
    if not hostname or not isinstance(hostname, str):
        return jsonify(error="hostname must be provided"), 400

    # Update api key stats
    api_key.last_used = arrow.now()
    api_key.times += 1
    db.session.commit()

    user = api_key.user

    # Look for an alias of this user already associated with the hostname.
    # Only the two joined entities are selected: adding an unconstrained
    # entity to the query would cross-join its whole table.
    existing = (
        db.session.query(AliasUsedOn, GenEmail)
        .filter(
            AliasUsedOn.gen_email_id == GenEmail.id,
            GenEmail.user_id == user.id,
            AliasUsedOn.hostname == hostname,
        )
        .first()
    )
    if existing:
        _, alias = existing
        LOG.d("found alias %s %s %s", alias, hostname, user)
        return jsonify(alias=alias.email)

    # use a custom alias for this user
    if user.is_premium():
        LOG.d("create new custom alias %s %s", hostname, user)

        full_email = _new_custom_alias_email(hostname)
        gen_email = GenEmail.create(email=full_email, user_id=user.id, custom=True)
        # flush so that gen_email.id is populated before it is referenced
        db.session.flush()

        AliasUsedOn.create(gen_email_id=gen_email.id, hostname=hostname)
        db.session.commit()
        return jsonify(alias=full_email)

    # choose randomly from user non-custom alias
    # NOTE(review): the query below does not filter on `custom`, so custom
    # aliases can be picked as well -- confirm which behavior is intended.
    aliases = db.session.query(GenEmail).filter(GenEmail.user_id == user.id).all()
    if aliases:
        alias = random.choice(aliases)
    else:
        LOG.d("user %s has no alias, create one", user)
        alias = GenEmail.create_new_gen_email(user.id)
        db.session.commit()

    AliasUsedOn.create(gen_email_id=alias.id, hostname=hostname)
    db.session.commit()
    return jsonify(alias=alias.email)


def _new_custom_alias_email(hostname):
    """Build an unused "<hostname with _>.<random suffix>@EMAIL_DOMAIN" address."""
    email_prefix = hostname.replace(".", "_")
    while True:
        email_suffix = random_string(6)
        full_email = f"{email_prefix}.{email_suffix}@{EMAIL_DOMAIN}"

        # check if email already exists. Very rare that an email is already used
        if GenEmail.get_by(email=full_email) or DeletedAlias.get_by(
            email=full_email
        ):
            LOG.warning("full_email already used %s. Retry", full_email)
        else:
            return full_email