app-MAIL-temp/app/dns_utils.py

87 lines
2 KiB
Python
Raw Normal View History

2019-12-02 01:13:39 +01:00
import dns.resolver
def _get_dns_resolver():
    """Return a new resolver whose only nameserver is 1.1.1.1.

    1.1.1.1 is Cloudflare's public DNS server. A fresh
    ``dns.resolver.Resolver`` is created on every call.
    """
    resolver = dns.resolver.Resolver()
    resolver.nameservers = ["1.1.1.1"]
    return resolver
def get_mx_domains(hostname, *, resolver=None) -> [(int, str)]:
    """Return the MX records of *hostname* as ``(priority, domain name)`` pairs.

    The domain name ends with a "." at the end. Records are returned in the
    order the resolver yields them.

    Args:
        hostname: domain whose MX records are looked up.
        resolver: optional object exposing ``query(name, rdtype)``. When
            omitted, the resolver built by ``_get_dns_resolver()`` is used.

    Returns:
        A list of ``(int, str)`` tuples; an empty list when the lookup fails
        for any reason.
    """
    try:
        if resolver is None:
            resolver = _get_dns_resolver()
        answers = resolver.query(hostname, "MX")
    except Exception:
        # Best effort: any lookup failure is reported as "no MX records".
        return []

    ret = []
    for answer in answers:
        # to_text() renders for ex '20 alt2.aspmx.l.google.com.'
        priority, _, domain = answer.to_text().partition(" ")
        ret.append((int(priority), domain))

    return ret
2019-12-06 11:54:01 +01:00
# Prefix of the SPF term whose remainder names an included domain.
_include_spf = "include:"


def get_spf_domain(hostname, *, resolver=None) -> [str]:
    """Return all domains listed in *include:* terms of *hostname*'s SPF records.

    Args:
        hostname: domain whose TXT records are looked up.
        resolver: optional object exposing ``query(name, rdtype)``. When
            omitted, the resolver built by ``_get_dns_resolver()`` is used.

    Returns:
        The included domains in the order they appear; an empty list when
        the lookup fails for any reason or no SPF record is present.
    """
    try:
        if resolver is None:
            resolver = _get_dns_resolver()
        answers = resolver.query(hostname, "TXT")
    except Exception:
        # Best effort: any lookup failure is reported as "no includes".
        return []

    ret = []
    for answer in answers:  # type: dns.rdtypes.ANY.TXT.TXT
        # One TXT record may be stored as several character-strings; they
        # must be concatenated without separator before being interpreted
        # (RFC 7208, section 3.3). Undecodable bytes are replaced rather
        # than raised so that a malformed record cannot crash the caller.
        record = b"".join(answer.strings).decode("utf-8", errors="replace")
        if not record.startswith("v=spf1"):
            continue

        for term in record.split(" "):
            if term.startswith(_include_spf):
                ret.append(term[len(_include_spf):])

    return ret
2019-12-27 23:36:13 +01:00
def get_txt_record(hostname, *, resolver=None) -> list:
    """Return the TXT answers for *hostname*.

    The elements are the answer objects yielded by the resolver (TXT rdata,
    each carrying a ``strings`` attribute), not decoded text.

    Args:
        hostname: domain whose TXT records are looked up.
        resolver: optional object exposing ``query(name, rdtype)``. When
            omitted, the resolver built by ``_get_dns_resolver()`` is used.

    Returns:
        A list of the answers; an empty list when the lookup fails for any
        reason.
    """
    try:
        if resolver is None:
            resolver = _get_dns_resolver()
        answers = resolver.query(hostname, "TXT")
    except Exception:
        # Best effort: any lookup failure is reported as "no TXT records".
        return []

    return list(answers)  # each item: dns.rdtypes.ANY.TXT.TXT
def get_dkim_record(hostname, *, resolver=None) -> str:
    """Query the dkim._domainkey.{hostname} TXT record and return its value.

    All character-strings of all answers are concatenated, in order, into a
    single string.

    Args:
        hostname: domain whose DKIM record is looked up.
        resolver: optional object exposing ``query(name, rdtype)``. When
            omitted, the resolver built by ``_get_dns_resolver()`` is used.

    Returns:
        The concatenated record text; an empty string when the lookup fails
        for any reason.
    """
    try:
        if resolver is None:
            resolver = _get_dns_resolver()
        answers = resolver.query(f"dkim._domainkey.{hostname}", "TXT")
    except Exception:
        # Best effort: any lookup failure is reported as "no record".
        return ""

    chunks = []
    for answer in answers:  # type: dns.rdtypes.ANY.TXT.TXT
        chunks.extend(answer.strings)  # each chunk is bytes

    # Join before decoding so a multi-byte character split across two chunks
    # still decodes; replace undecodable bytes instead of raising so that a
    # malformed record cannot crash the caller.
    return b"".join(chunks).decode("utf-8", errors="replace")