126 lines
5.6 KiB
Python
126 lines
5.6 KiB
Python
#!/usr/bin/env python2
|
|
|
|
""" Tests for your DNS resolver and server """
|
|
|
|
# Default UDP port of the DNS server under test; replaced by the --port
# argument when this file is run as a script.
portnr = 5353
|
|
# Default host of the DNS server under test; replaced by the --server
# argument when this file is run as a script.
server = "localhost"
|
|
|
|
import dns.resolver
|
|
from dns.resource import ResourceRecord, RecordData
|
|
from dns.classes import Class
|
|
from dns.types import Type
|
|
import unittest
|
|
import sys
|
|
import time
|
|
|
|
class TestResolver(unittest.TestCase):
    """Lookups through a ``Resolver(False, 0)`` instance.

    NOTE(review): the first constructor argument presumably disables caching
    and the lookups appear to need live network access -- confirm against
    ``dns.resolver``.
    """

    def _lookup(self, hostname):
        """Resolve *hostname* with a freshly built resolver."""
        return dns.resolver.Resolver(False, 0).gethostbyname(hostname)

    def test_solve(self):
        """Solve an FQDN and check the canonical name, aliases and addresses."""
        canonical, aliases, addresses = self._lookup("mail.polvanaubel.com")
        self.assertEqual(canonical, "sog.polvanaubel.com")
        self.assertEqual(aliases, ["mail.polvanaubel.com"])
        self.assertEqual(addresses, ["138.201.39.104"])

    def test_invalid(self):
        """An unresolvable FQDN must produce three falsy results."""
        canonical, aliases, addresses = self._lookup("invalid.example.com")
        self.assertFalse(canonical)
        self.assertFalse(aliases)
        self.assertFalse(addresses)
|
|
|
|
|
|
class TestResolverCache(unittest.TestCase):
    """Lookups answered from a cache that the test seeds by hand."""

    @staticmethod
    def _seed_record():
        """Build the A record for invalid.example.com that the tests cache."""
        return ResourceRecord(
            "invalid.example.com", Type.A, Class.IN, 60, RecordData("1.2.3.4")
        )

    def test_solveinv(self):
        """Solve an invalid FQDN that is cached; the output mirrors the cache."""
        caching_resolver = dns.resolver.Resolver(True, 0)
        caching_resolver.cache.add_record(self._seed_record())

        canonical, aliases, addresses = caching_resolver.gethostbyname(
            "invalid.example.com"
        )

        self.assertEqual(canonical, "invalid.example.com")
        self.assertEqual(aliases, [])
        self.assertEqual(addresses, ["1.2.3.4"])

    def test_ttlexpire(self):
        """A cached invalid FQDN must yield empty output once it has expired.

        NOTE(review): the resolver is built with 0.05 as its second argument
        and the test sleeps 0.06 s before the lookup; presumably that argument
        governs cache expiry -- confirm against ``dns.resolver``.
        """
        caching_resolver = dns.resolver.Resolver(True, 0.05)
        caching_resolver.cache.add_record(self._seed_record())

        # Wait slightly longer than the 0.05 passed to the resolver above.
        time.sleep(0.06)

        canonical, aliases, addresses = caching_resolver.gethostbyname(
            "invalid.example.com"
        )

        self.assertFalse(canonical)
        self.assertFalse(aliases)
        self.assertFalse(addresses)
|
|
|
|
import socket
|
|
|
|
class TestServer(unittest.TestCase):
    """Tests that talk to the DNS server at ``(server, portnr)``.

    The server must already be running; these tests do not start it.
    """

    def test_solve_auth(self):
        """Solve a query for an FQDN for which the server has direct authority."""
        resolver = dns.resolver.Resolver(True, 0)
        # Seed the cache so that test.com is delegated to the server under test.
        resolver.cache.add_record(
            ResourceRecord("localhost", Type.A, Class.IN, 60, RecordData("127.0.0.1"))
        )
        resolver.cache.add_record(
            ResourceRecord(
                "test.com", Type.NS, Class.IN, 60,
                RecordData(server + ":" + str(portnr)),
            )
        )

        host, alias, ip = resolver.gethostbyname("cnametest.test.com")

        self.assertEqual(host, "hello.test.com")
        self.assertEqual(alias, ["cnametest.test.com"])
        self.assertEqual(ip, ["1.2.3.4"])

    def test_solve_ns_inside(self):
        """Query an FQDN served by another name server inside the zone.

        Only the referral is inspected (first authority and first additional
        record). A full end-to-end check would seem to require a second
        server to be set up.
        """
        resolver = dns.resolver.Resolver(True, 0)
        # NOTE(review): the final argument is presumably the recursion flag --
        # confirm against Resolver.query_dns.
        ans = resolver.query_dns("a.subzone.test.com", Type.A, (server, portnr), False)

        self.assertEqual(ans.authorities[0].rdata.data, "nameserver.test.com")
        self.assertEqual(ans.additionals[0].name, "nameserver.test.com")

    def test_solve_outside(self):
        """Solve a query for an FQDN which points outside the server's zone."""
        resolver = dns.resolver.Resolver(True, 0)
        # The server is queried directly instead of going through
        # gethostbyname with a seeded cache.
        ans = resolver.query_dns("gaia.cs.umass.edu", Type.A, (server, portnr), True)

        self.assertEqual(ans.answers[0].rdata.data, "128.119.245.12")

    def test_solve_parallel(self):
        """Send two queries back to back; both must be answered correctly.

        Responses may arrive in any order and are matched to their query by
        the message ident. Each ident must be answered exactly once.
        """
        # Expected first answer for each query ident.
        pending = {10: "128.119.245.12", 20: "138.201.39.105"}

        question1 = dns.message.Question("gaia.cs.umass.edu", Type.A, Class.IN)
        question2 = dns.message.Question("yori.cc", Type.A, Class.IN)
        query1 = dns.message.makepacket(
            {"qr": 0, "opcode": 0, "rd": True}, [question1], ident=10
        )
        query2 = dns.message.makepacket(
            {"qr": 0, "opcode": 0, "rd": True}, [question2], ident=20
        )

        timeout = 2
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        # try/finally rather than ``with``: the file targets Python 2, where
        # sockets are not context managers. This guarantees the socket is
        # closed even when recv times out or an assertion fails.
        try:
            sock.settimeout(timeout)
            sock.sendto(query1.to_bytes(), (server, portnr))
            sock.sendto(query2.to_bytes(), (server, portnr))

            # Receive one response per query sent.
            for _ in range(2):
                data = sock.recv(65530)  # generous receive buffer, not the MTU
                response = dns.message.Message.from_bytes(data)
                ident = response.header.ident
                if ident not in pending:
                    self.fail("unexpected or duplicate response ident: %r" % (ident,))
                # pop() so a second response with the same ident is rejected.
                self.assertEqual(response.answers[0].rdata.data, pending.pop(ident))
        finally:
            sock.close()
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Parse command line arguments
    import argparse

    # Defaults are taken from the module-level constants so that running this
    # file as a script targets the same server as importing it from a test
    # runner (previously the script silently switched the port to 5001).
    parser = argparse.ArgumentParser(description="DNS Tests")
    parser.add_argument("-s", "--server", type=str, default=server)
    parser.add_argument("-p", "--port", type=int, default=portnr)
    args, extra = parser.parse_known_args()
    portnr = args.port
    server = args.server

    # Pass the extra (unrecognised) arguments on to unittest
    sys.argv[1:] = extra

    # Start test suite
    unittest.main()
|