2022-02-24 15:05:05 +01:00
|
|
|
from app import config
|
2021-11-23 14:31:53 +01:00
|
|
|
from typing import Optional, List, Tuple
|
2020-05-03 11:19:14 +02:00
|
|
|
|
2019-12-02 01:13:39 +01:00
|
|
|
import dns.resolver
|
|
|
|
|
|
|
|
|
2020-01-05 19:01:38 +01:00
|
|
|
def _get_dns_resolver():
|
|
|
|
my_resolver = dns.resolver.Resolver()
|
2022-02-24 15:05:05 +01:00
|
|
|
my_resolver.nameservers = config.NAMESERVERS
|
2020-01-05 19:01:38 +01:00
|
|
|
|
|
|
|
return my_resolver
|
|
|
|
|
|
|
|
|
2020-11-16 19:16:06 +01:00
|
|
|
def get_ns(hostname) -> [str]:
|
|
|
|
try:
|
2022-02-24 15:05:05 +01:00
|
|
|
answers = _get_dns_resolver().resolve(hostname, "NS", search=True)
|
2020-12-06 17:59:07 +01:00
|
|
|
except Exception:
|
2020-11-16 19:16:06 +01:00
|
|
|
return []
|
|
|
|
return [a.to_text() for a in answers]
|
|
|
|
|
|
|
|
|
2020-05-03 11:19:14 +02:00
|
|
|
def get_cname_record(hostname) -> Optional[str]:
|
2020-05-03 12:48:42 +02:00
|
|
|
"""Return the CNAME record if exists for a domain, WITHOUT the trailing period at the end"""
|
2020-05-03 11:19:14 +02:00
|
|
|
try:
|
2022-02-24 15:05:05 +01:00
|
|
|
answers = _get_dns_resolver().resolve(hostname, "CNAME", search=True)
|
2020-05-03 11:19:14 +02:00
|
|
|
except Exception:
|
|
|
|
return None
|
|
|
|
|
|
|
|
for a in answers:
|
2020-05-03 12:48:42 +02:00
|
|
|
ret = a.to_text()
|
|
|
|
return ret[:-1]
|
2020-05-03 11:19:14 +02:00
|
|
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
2019-12-30 19:34:38 +01:00
|
|
|
def get_mx_domains(hostname) -> [(int, str)]:
|
|
|
|
"""return list of (priority, domain name).
|
|
|
|
domain name ends with a "." at the end.
|
2019-12-27 23:36:13 +01:00
|
|
|
"""
|
2019-12-07 23:42:57 +01:00
|
|
|
try:
|
2022-02-24 15:05:05 +01:00
|
|
|
answers = _get_dns_resolver().resolve(hostname, "MX", search=True)
|
2020-01-02 22:08:37 +01:00
|
|
|
except Exception:
|
2019-12-07 23:42:57 +01:00
|
|
|
return []
|
|
|
|
|
2019-12-02 01:13:39 +01:00
|
|
|
ret = []
|
|
|
|
|
|
|
|
for a in answers:
|
|
|
|
record = a.to_text() # for ex '20 alt2.aspmx.l.google.com.'
|
2019-12-30 19:34:38 +01:00
|
|
|
parts = record.split(" ")
|
2019-12-27 23:36:13 +01:00
|
|
|
|
2019-12-30 19:34:38 +01:00
|
|
|
ret.append((int(parts[0]), parts[1]))
|
2019-12-02 01:13:39 +01:00
|
|
|
|
|
|
|
return ret
|
2019-12-06 11:54:01 +01:00
|
|
|
|
|
|
|
|
|
|
|
_include_spf = "include:"
|
|
|
|
|
|
|
|
|
|
|
|
def get_spf_domain(hostname) -> [str]:
|
|
|
|
"""return all domains listed in *include:*"""
|
|
|
|
try:
|
2022-02-24 15:05:05 +01:00
|
|
|
answers = _get_dns_resolver().resolve(hostname, "TXT", search=True)
|
2020-01-02 22:15:08 +01:00
|
|
|
except Exception:
|
2019-12-06 11:54:01 +01:00
|
|
|
return []
|
|
|
|
|
|
|
|
ret = []
|
|
|
|
|
|
|
|
for a in answers: # type: dns.rdtypes.ANY.TXT.TXT
|
|
|
|
for record in a.strings:
|
|
|
|
record = record.decode() # record is bytes
|
|
|
|
|
|
|
|
if record.startswith("v=spf1"):
|
|
|
|
parts = record.split(" ")
|
|
|
|
for part in parts:
|
|
|
|
if part.startswith(_include_spf):
|
2020-12-06 22:11:58 +01:00
|
|
|
ret.append(part[part.find(_include_spf) + len(_include_spf) :])
|
2019-12-06 11:54:01 +01:00
|
|
|
|
|
|
|
return ret
|
2019-12-27 23:36:13 +01:00
|
|
|
|
|
|
|
|
|
|
|
def get_txt_record(hostname) -> [str]:
|
|
|
|
try:
|
2022-02-24 15:05:05 +01:00
|
|
|
answers = _get_dns_resolver().resolve(hostname, "TXT", search=True)
|
2020-01-02 22:15:08 +01:00
|
|
|
except Exception:
|
2019-12-27 23:36:13 +01:00
|
|
|
return []
|
|
|
|
|
|
|
|
ret = []
|
|
|
|
|
|
|
|
for a in answers: # type: dns.rdtypes.ANY.TXT.TXT
|
|
|
|
for record in a.strings:
|
|
|
|
record = record.decode() # record is bytes
|
|
|
|
|
|
|
|
ret.append(record)
|
|
|
|
|
2020-05-03 12:01:31 +02:00
|
|
|
return ret
|
2021-11-23 14:31:53 +01:00
|
|
|
|
|
|
|
|
|
|
|
def is_mx_equivalent(
|
|
|
|
mx_domains: List[Tuple[int, str]], ref_mx_domains: List[Tuple[int, str]]
|
|
|
|
) -> bool:
|
|
|
|
"""
|
|
|
|
Compare mx_domains with ref_mx_domains to see if they are equivalent.
|
|
|
|
mx_domains and ref_mx_domains are list of (priority, domain)
|
|
|
|
|
|
|
|
The priority order is taken into account but not the priority number.
|
|
|
|
For example, [(1, domain1), (2, domain2)] is equivalent to [(10, domain1), (20, domain2)]
|
|
|
|
"""
|
|
|
|
mx_domains = sorted(mx_domains, key=lambda priority_domain: priority_domain[0])
|
|
|
|
ref_mx_domains = sorted(
|
|
|
|
ref_mx_domains, key=lambda priority_domain: priority_domain[0]
|
|
|
|
)
|
|
|
|
|
2022-02-24 17:23:45 +01:00
|
|
|
if len(mx_domains) < len(ref_mx_domains):
|
2021-11-23 14:31:53 +01:00
|
|
|
return False
|
|
|
|
|
2022-02-24 17:23:45 +01:00
|
|
|
for i in range(0, len(ref_mx_domains)):
|
2021-11-23 14:31:53 +01:00
|
|
|
if mx_domains[i][1] != ref_mx_domains[i][1]:
|
|
|
|
return False
|
|
|
|
|
|
|
|
return True
|