net-dnsserver/dns_tests.py

126 lines
5.6 KiB
Python

#!/usr/bin/env python2
""" Tests for your DNS resolver and server """
# Port of the DNS server the TestServer tests talk to; replaced by the
# -p/--port argument when this file is run as a script.
portnr = 5353
# Host of the DNS server the TestServer tests talk to; replaced by the
# -s/--server argument when this file is run as a script.
server = "localhost"
import dns.resolver
from dns.resource import ResourceRecord, RecordData
from dns.classes import Class
from dns.types import Type
import unittest
import sys
import time
class TestResolver(unittest.TestCase):
    """Checks on gethostbyname() using a Resolver built as Resolver(False, 0).

    NOTE(review): the two constructor arguments look like "caching" and
    "ttl" -- confirm against dns.resolver.
    """

    def _lookup(self, hostname):
        # Each test works on its own freshly constructed resolver.
        return dns.resolver.Resolver(False, 0).gethostbyname(hostname)

    # Resolve a FQDN; the returned host name, alias list and address list
    # must match the expected IP/CNAME data.
    def test_solve(self):
        canonical, aliases, addresses = self._lookup("mail.polvanaubel.com")
        self.assertEqual(canonical, "sog.polvanaubel.com")
        self.assertEqual(aliases, ["mail.polvanaubel.com"])
        self.assertEqual(addresses, ["138.201.39.104"])

    # Resolving a name that is expected not to exist must yield three
    # falsy results.
    def test_invalid(self):
        canonical, aliases, addresses = self._lookup("invalid.example.com")
        for part in (canonical, aliases, addresses):
            self.assertFalse(part)
class TestResolverCache(unittest.TestCase):
    """Checks on gethostbyname() when a record was placed in the cache first.

    NOTE(review): Resolver(True, x) is assumed to mean "caching on, ttl x";
    confirm against dns.resolver.
    """

    def _resolver_with_cached_record(self, ttl):
        # Build a resolver and put an A record for a non-existent name
        # into its cache, so any answer can only have come from the cache.
        seeded = dns.resolver.Resolver(True, ttl)
        record = ResourceRecord(
            "invalid.example.com", Type.A, Class.IN, 60, RecordData("1.2.3.4")
        )
        seeded.cache.add_record(record)
        return seeded

    # solve an invalid cached FQDN, output corresponds to cache
    def test_solveinv(self):
        seeded = self._resolver_with_cached_record(0)
        canonical, aliases, addresses = seeded.gethostbyname("invalid.example.com")
        self.assertEqual(canonical, "invalid.example.com")
        self.assertEqual(aliases, [])
        self.assertEqual(addresses, ["1.2.3.4"])

    # Cache an invalid FQDN, sleep slightly longer than the value passed to
    # the resolver (0.05 s), then resolve again: the result must be empty.
    def test_ttlexpire(self):
        seeded = self._resolver_with_cached_record(0.05)
        time.sleep(0.06)
        canonical, aliases, addresses = seeded.gethostbyname("invalid.example.com")
        for part in (canonical, aliases, addresses):
            self.assertFalse(part)
import socket
class TestServer(unittest.TestCase):
    """Tests that send queries to a DNS server at ``(server, portnr)``.

    Nothing in this class starts the server; it has to be running already.
    NOTE(review): the expected answers imply the server serves a test.com
    zone (cnametest, hello, subzone, nameserver) -- confirm against the
    server's zone data.
    """

    # solve a query for a FQDN for which your server has direct authority
    def test_solve_auth(self):
        resolver = dns.resolver.Resolver(True, 0)
        # Seed the cache with an NS record for test.com that names the server
        # under test (as "host:port") plus an A record for localhost --
        # presumably so the resolver directs test.com lookups at that server.
        resolver.cache.add_record(
            ResourceRecord("localhost", Type.A, Class.IN, 60, RecordData("127.0.0.1"))
        )
        resolver.cache.add_record(
            ResourceRecord(
                "test.com", Type.NS, Class.IN, 60,
                RecordData(server + ":" + str(portnr)),
            )
        )
        host, alias, ip = resolver.gethostbyname("cnametest.test.com")
        self.assertEqual(host, "hello.test.com")
        self.assertEqual(alias, ["cnametest.test.com"])
        self.assertEqual(ip, ["1.2.3.4"])

    # solve a query for a FQDN for which your server does not have direct authority,
    # yet there is a name server in your zone which does
    def test_solve_ns_inside(self):
        # I had a bit of trouble figuring this one out
        # I think it would require me to set up two servers
        # Instead of resolving all the way, only check that the server's reply
        # refers to the delegated name server in its authority and additional
        # sections.
        resolver = dns.resolver.Resolver(True, 0)
        ans = resolver.query_dns("a.subzone.test.com", Type.A, (server, portnr), False)
        self.assertEqual(ans.authorities[0].rdata.data, "nameserver.test.com")
        self.assertEqual(ans.additionals[0].name, "nameserver.test.com")

    # solve a query for a fqdn which points outside your zone
    def test_solve_outside(self):
        resolver = dns.resolver.Resolver(True, 0)
        # Alternative approach via gethostbyname, kept for reference:
        # resolver.cache.add_record(ResourceRecord("localhost", Type.A, Class.IN, 60, RecordData("127.0.0.1")))
        # resolver.cache.add_record(ResourceRecord("edu", Type.NS, Class.IN, 60, RecordData(server + ":" + str(portnr))))
        # host, alias, ip = resolver.gethostbyname("gaia.cs.umass.edu")
        ans = resolver.query_dns("gaia.cs.umass.edu", Type.A, (server, portnr), True)
        self.assertEqual(ans.answers[0].rdata.data, "128.119.245.12")

    # solve parallel requests for different FQDN, their servicing should be made in parallel and correct
    # responses should be generated
    def test_solve_parallel(self):
        # Import explicitly: the file only imports dns.resolver, so dns.message
        # would otherwise be reachable only as a side effect of that import.
        import dns.message

        # ident -> (queried name, expected first answer)
        expected = {
            10: ("gaia.cs.umass.edu", "128.119.245.12"),
            20: ("yori.cc", "138.201.39.105"),
        }

        # Build every packet before sending so the queries go out back to back.
        queries = []
        for ident in sorted(expected):
            question = dns.message.Question(expected[ident][0], Type.A, Class.IN)
            queries.append(
                dns.message.makepacket(
                    {"qr": 0, "opcode": 0, "rd": True}, [question], ident=ident
                )
            )

        timeout = 2
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            sock.settimeout(timeout)
            for query in queries:
                sock.sendto(query.to_bytes(), (server, portnr))

            # Idents still waiting for a reply; popping on receipt makes a
            # duplicated or unknown ident fail instead of silently passing.
            pending = dict((ident, pair[1]) for ident, pair in expected.items())
            for _ in expected:
                data = sock.recv(65530)  # max MTU is not this value
                response = dns.message.Message.from_bytes(data)
                ident = response.header.ident
                if ident not in pending:
                    self.fail("unexpected or duplicate response ident %r" % (ident,))
                self.assertEqual(response.answers[0].rdata.data, pending.pop(ident))
        finally:
            # Close the socket even when recv times out or an assertion fails.
            sock.close()
if __name__ == "__main__":
    # Parse command line arguments
    import argparse

    parser = argparse.ArgumentParser(description="DNS Tests")
    # Default to the module-level values so that running this file as a
    # script targets the same server as importing it from a test runner.
    parser.add_argument("-s", "--server", type=str, default=server,
                        help="host of the DNS server under test")
    parser.add_argument("-p", "--port", type=int, default=portnr,
                        help="port of the DNS server under test")
    args, extra = parser.parse_known_args()
    portnr = args.port
    server = args.server

    # Pass the extra arguments to unittest
    sys.argv[1:] = extra

    # Start test suite
    unittest.main()