"""Inspired from https://github.com/petermat/spamassassin_client """ import socket, select, re, logging from io import BytesIO divider_pattern = re.compile(br'^(.*?)\r?\n(.*?)\r?\n\r?\n', re.DOTALL) first_line_pattern = re.compile(br'^SPAMD/[^ ]+ 0 EX_OK$') class SpamAssassin(object): def __init__(self, message, timeout=20): self.score = None self.symbols = None # Connecting client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) client.settimeout(timeout) client.connect(('127.0.0.1', 783)) # Sending client.sendall(self._build_message(message)) client.shutdown(socket.SHUT_WR) # Reading resfp = BytesIO() while True: ready = select.select([client], [], [], timeout) if ready[0] is None: # Kill with Timeout! logging.info('[SpamAssassin] - Timeout ({0}s)!'.format(str(timeout))) break data = client.recv(4096) if data == b'': break resfp.write(data) # Closing client.close() client = None self._parse_response(resfp.getvalue()) def _build_message(self, message): reqfp = BytesIO() data_len = str(len(message)).encode() reqfp.write(b'REPORT SPAMC/1.2\r\n') reqfp.write(b'Content-Length: ' + data_len + b'\r\n') reqfp.write(b'User: cx42\r\n\r\n') reqfp.write(message) return reqfp.getvalue() def _parse_response(self, response): if response == b'': logging.info("[SPAM ASSASSIN] Empty response") return None match = divider_pattern.match(response) if not match: logging.error("[SPAM ASSASSIN] Response error:") logging.error(response) return None first_line = match.group(1) headers = match.group(2) body = response[match.end(0):] # Checking response is good match = first_line_pattern.match(first_line) if not match: logging.error("[SPAM ASSASSIN] invalid response:") logging.error(first_line) return None report_list = [s.strip() for s in body.decode('utf-8').strip().split('\n')] linebreak_num = report_list.index([s for s in report_list if "---" in s][0]) tablelists = [s for s in report_list[linebreak_num + 1:]] self.report_fulltext = '\n'.join(report_list) # join line when current one is only wrap of previous tablelists_temp = [] if tablelists: for counter, tablelist in enumerate(tablelists): if len(tablelist)>1: if (tablelist[0].isnumeric() or tablelist[0] == '-') and (tablelist[1].isnumeric() or tablelist[1] == '.'): tablelists_temp.append(tablelist) else: if tablelists_temp: tablelists_temp[-1] += " " + tablelist tablelists = tablelists_temp # create final json self.report_json = dict() for tablelist in tablelists: wordlist = re.split('\s+', tablelist) self.report_json[wordlist[1]] = {'partscore': float(wordlist[0]), 'description': ' '.join(wordlist[1:])} headers = headers.decode('utf-8').replace(' ', '').replace(':', ';').replace('/', ';').split(';') self.score = float(headers[2]) def get_report_json(self): return self.report_json def get_score(self): return self.score def is_spam(self, level=5): return self.score is None or self.score > level def get_fulltext(self): return self.report_fulltext