app-MAIL-temp/app/dashboard/views/index.py

209 lines
6.8 KiB
Python

from flask import render_template, request, redirect, url_for, flash, session
from flask_login import login_required, current_user
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import joinedload
from app import email_utils
from app.config import HIGHLIGHT_GEN_EMAIL_ID
from app.dashboard.base import dashboard_bp
from app.extensions import db
from app.log import LOG
from app.models import (
GenEmail,
ClientUser,
ForwardEmail,
ForwardEmailLog,
DeletedAlias,
AliasGeneratorEnum,
)
class AliasInfo:
gen_email: GenEmail
nb_forward: int
nb_blocked: int
nb_reply: int
show_intro_test_send_email: bool = False
highlight: bool = False
def __init__(self, **kwargs):
for k, v in kwargs.items():
setattr(self, k, v)
@dashboard_bp.route("/", methods=["GET", "POST"])
@login_required
def index():
# after creating a gen email, it's helpful to highlight it
highlight_gen_email_id = session.get(HIGHLIGHT_GEN_EMAIL_ID)
# reset as it should not persist
if highlight_gen_email_id:
del session[HIGHLIGHT_GEN_EMAIL_ID]
query = request.args.get("query") or ""
# User generates a new email
if request.method == "POST":
if request.form.get("form-name") == "trigger-email":
gen_email_id = request.form.get("gen-email-id")
gen_email = GenEmail.get(gen_email_id)
LOG.d("trigger an email to %s", gen_email)
email_utils.send_test_email_alias(gen_email.email, gen_email.user.name)
flash(
f"An email sent to {gen_email.email} is on its way, please check your inbox/spam folder",
"success",
)
elif request.form.get("form-name") == "create-custom-email":
if current_user.can_create_new_alias():
return redirect(url_for("dashboard.custom_alias"))
else:
flash(f"You need to upgrade your plan to create new alias.", "warning")
elif request.form.get("form-name") == "create-random-email":
if current_user.can_create_new_alias():
scheme = int(
request.form.get("generator_scheme") or current_user.alias_generator
)
if not scheme or not AliasGeneratorEnum.has_value(scheme):
scheme = current_user.alias_generator
gen_email = GenEmail.create_new_random(
user_id=current_user.id, scheme=scheme
)
db.session.commit()
LOG.d("generate new email %s for user %s", gen_email, current_user)
flash(f"Alias {gen_email.email} has been created", "success")
session[HIGHLIGHT_GEN_EMAIL_ID] = gen_email.id
else:
flash(f"You need to upgrade your plan to create new alias.", "warning")
elif request.form.get("form-name") == "switch-email-forwarding":
gen_email_id = request.form.get("gen-email-id")
gen_email: GenEmail = GenEmail.get(gen_email_id)
LOG.d("switch email forwarding for %s", gen_email)
gen_email.enabled = not gen_email.enabled
if gen_email.enabled:
flash(f"Alias {gen_email.email} is enabled", "success")
else:
flash(f"Alias {gen_email.email} is disabled", "warning")
session[HIGHLIGHT_GEN_EMAIL_ID] = gen_email.id
db.session.commit()
elif request.form.get("form-name") == "delete-email":
gen_email_id = request.form.get("gen-email-id")
gen_email: GenEmail = GenEmail.get(gen_email_id)
LOG.d("delete gen email %s", gen_email)
email = gen_email.email
GenEmail.delete(gen_email.id)
db.session.commit()
flash(f"Alias {email} has been deleted", "success")
# try to save deleted alias
try:
DeletedAlias.create(user_id=current_user.id, email=email)
db.session.commit()
# this can happen when a previously deleted alias is re-created via catch-all or directory feature
except IntegrityError:
LOG.error("alias %s has been added before to DeletedAlias", email)
db.session.rollback()
return redirect(url_for("dashboard.index", query=query))
client_users = (
ClientUser.filter_by(user_id=current_user.id)
.options(joinedload(ClientUser.client))
.options(joinedload(ClientUser.gen_email))
.all()
)
sorted(client_users, key=lambda cu: cu.client.name)
return render_template(
"dashboard/index.html",
client_users=client_users,
aliases=get_alias_info(current_user.id, query, highlight_gen_email_id),
highlight_gen_email_id=highlight_gen_email_id,
query=query,
AliasGeneratorEnum=AliasGeneratorEnum,
)
def get_alias_info(user_id, query=None, highlight_gen_email_id=None) -> [AliasInfo]:
if query:
query = query.strip().lower()
aliases = {} # dict of alias and AliasInfo
q = db.session.query(GenEmail, ForwardEmail, ForwardEmailLog).filter(
GenEmail.user_id == user_id,
GenEmail.id == ForwardEmail.gen_email_id,
ForwardEmail.id == ForwardEmailLog.forward_id,
)
if query:
q = q.filter(GenEmail.email.contains(query))
for ge, fe, fel in q:
if ge.email not in aliases:
aliases[ge.email] = AliasInfo(
gen_email=ge,
nb_blocked=0,
nb_forward=0,
nb_reply=0,
highlight=ge.id == highlight_gen_email_id,
)
alias_info = aliases[ge.email]
if fel.is_reply:
alias_info.nb_reply += 1
elif fel.blocked:
alias_info.nb_blocked += 1
else:
alias_info.nb_forward += 1
# also add alias that has no forward email or log
q = (
db.session.query(GenEmail)
.filter(GenEmail.email.notin_(aliases.keys()))
.filter(GenEmail.user_id == user_id)
)
if query:
q = q.filter(GenEmail.email.contains(query))
for ge in q:
aliases[ge.email] = AliasInfo(
gen_email=ge,
nb_blocked=0,
nb_forward=0,
nb_reply=0,
highlight=ge.id == highlight_gen_email_id,
)
ret = list(aliases.values())
# make sure the highlighted alias is the first element
highlight_index = None
for ix, alias in enumerate(ret):
if alias.highlight:
highlight_index = ix
break
if highlight_index:
ret.insert(0, ret.pop(highlight_index))
# only show intro on the first enabled alias
for alias in ret:
if alias.gen_email.enabled:
alias.show_intro_test_send_email = True
break
return ret