use user.max_spam_score if present

This commit is contained in:
Son NK 2020-07-23 11:11:43 +02:00
parent 8e9968a7d9
commit d89e41d0e5
3 changed files with 52 additions and 5 deletions

View File

@ -464,7 +464,7 @@ def get_addrs_from_header(msg: Message, header) -> [str]:
return [r for r in ret if r]
def get_spam_info(msg: Message) -> (bool, str):
def get_spam_info(msg: Message, max_score=None) -> (bool, str):
"""parse SpamAssassin header to detect whether a message is classified as spam.
Return (is spam, spam status detail)
The header format is
@ -476,10 +476,31 @@ def get_spam_info(msg: Message) -> (bool, str):
if not spamassassin_status:
return False, ""
# yes or no
spamassassin_answer = spamassassin_status[: spamassassin_status.find(",")]
return get_spam_from_header(spamassassin_status, max_score=max_score)
return spamassassin_answer.lower() == "yes", spamassassin_status
def get_spam_from_header(spam_status_header, max_score=None) -> (bool, str):
"""get spam info from X-Spam-Status header
Return (is spam, spam status detail).
The spam_status_header has the following format
```No, score=-0.1 required=5.0 tests=DKIM_SIGNED,DKIM_VALID,
DKIM_VALID_AU,RCVD_IN_DNSWL_BLOCKED,RCVD_IN_MSPIKE_H2,SPF_PASS,
URIBL_BLOCKED autolearn=unavailable autolearn_force=no version=3.4.2```
"""
# yes or no
spamassassin_answer = spam_status_header[: spam_status_header.find(",")]
if max_score:
# spam score
# get the score section "score=-0.1"
score_section = (
spam_status_header[spam_status_header.find(",") + 1 :].strip().split(" ")[0]
)
score = float(score_section[len("score=") :])
if score >= max_score:
return True, spam_status_header
return spamassassin_answer.lower() == "yes", spam_status_header
def parseaddr_unicode(addr) -> (str, str):

View File

@ -421,7 +421,7 @@ def forward_email_to_mailbox(
)
return False, "550 SL E14"
is_spam, spam_status = get_spam_info(msg)
is_spam, spam_status = get_spam_info(msg, max_score=user.max_spam_score)
if is_spam:
LOG.warning("Email detected as spam. Alias: %s, from: %s", alias, contact)
email_log.is_spam = True

View File

@ -11,6 +11,7 @@ from app.email_utils import (
parseaddr_unicode,
send_email_with_rate_control,
copy,
get_spam_from_header,
)
from app.extensions import db
from app.models import User, CustomDomain
@ -134,3 +135,28 @@ def test_copy():
msg2 = copy(msg)
assert msg.as_bytes() == msg2.as_bytes()
def test_get_spam_from_header():
is_spam, _ = get_spam_from_header(
"""No, score=-0.1 required=5.0 tests=DKIM_SIGNED,DKIM_VALID,
DKIM_VALID_AU,RCVD_IN_DNSWL_BLOCKED,RCVD_IN_MSPIKE_H2,SPF_PASS,
URIBL_BLOCKED autolearn=unavailable autolearn_force=no version=3.4.2"""
)
assert not is_spam
is_spam, _ = get_spam_from_header(
"""Yes, score=-0.1 required=5.0 tests=DKIM_SIGNED,DKIM_VALID,
DKIM_VALID_AU,RCVD_IN_DNSWL_BLOCKED,RCVD_IN_MSPIKE_H2,SPF_PASS,
URIBL_BLOCKED autolearn=unavailable autolearn_force=no version=3.4.2"""
)
assert is_spam
# the case where max_score is less than the default used by SpamAssassin
is_spam, _ = get_spam_from_header(
"""No, score=6 required=10.0 tests=DKIM_SIGNED,DKIM_VALID,
DKIM_VALID_AU,RCVD_IN_DNSWL_BLOCKED,RCVD_IN_MSPIKE_H2,SPF_PASS,
URIBL_BLOCKED autolearn=unavailable autolearn_force=no version=3.4.2""",
max_score=5,
)
assert is_spam