#!/usr/bin/env python2

""" Tests for your DNS resolver and server """

import socket
import sys
import time
import unittest

import dns.message
import dns.resolver
from dns.classes import Class
from dns.resource import ResourceRecord, RecordData
from dns.types import Type

# Address of the DNS server under test. These module-level defaults are used
# when the tests are loaded by a test runner; when the file is executed as a
# script they are overwritten from the command line (see the bottom of the
# file).
portnr = 5353
server = "localhost"


class TestResolver(unittest.TestCase):
    """Resolver tests that query real DNS servers.

    Every resolver here is built as ``Resolver(False, 0)``.
    NOTE(review): the two arguments are presumably (caching enabled, ttl);
    confirm against ``dns.resolver.Resolver``.
    """

    # solve a FQDN, output with corresponding IP/CNAME/authoritative status
    # generated
    def test_solve(self):
        resolver = dns.resolver.Resolver(False, 0)
        host, alias, ip = resolver.gethostbyname("mail.polvanaubel.com")
        self.assertEqual(host, "sog.polvanaubel.com")
        self.assertEqual(alias, ["mail.polvanaubel.com"])
        self.assertEqual(ip, ["138.201.39.104"])

    # solve a FQDN that does not exist, an empty output should be generated
    def test_invalid(self):
        resolver = dns.resolver.Resolver(False, 0)
        host, alias, ip = resolver.gethostbyname("invalid.example.com")
        self.assertFalse(host)
        self.assertFalse(alias)
        self.assertFalse(ip)


class TestResolverCache(unittest.TestCase):
    """Resolver tests that seed ``resolver.cache`` with a record first."""

    # solve an invalid cached FQDN, output corresponds to cache
    def test_solveinv(self):
        resolver = dns.resolver.Resolver(True, 0)
        resolver.cache.add_record(
            ResourceRecord("invalid.example.com", Type.A, Class.IN, 60,
                           RecordData("1.2.3.4")))
        host, alias, ip = resolver.gethostbyname("invalid.example.com")
        self.assertEqual(host, "invalid.example.com")
        self.assertEqual(alias, [])
        self.assertEqual(ip, ["1.2.3.4"])

    # start your server and wait configured TTL + 1 time for an invalid
    # cached FQDN to expire, an empty output should be generated
    def test_ttlexpire(self):
        # NOTE(review): 0.05 is presumably the resolver's TTL in seconds, so
        # the 0.06 s sleep below outlasts it -- confirm against Resolver.
        resolver = dns.resolver.Resolver(True, 0.05)
        resolver.cache.add_record(
            ResourceRecord("invalid.example.com", Type.A, Class.IN, 60,
                           RecordData("1.2.3.4")))
        time.sleep(0.06)
        host, alias, ip = resolver.gethostbyname("invalid.example.com")
        self.assertFalse(host)
        self.assertFalse(alias)
        self.assertFalse(ip)


class TestServer(unittest.TestCase):
    """Tests against the DNS server listening on ``(server, portnr)``.

    The server must already be running before these tests are started.
    """

    # solve a query for a FQDN for which your server has direct authority
    def test_solve_auth(self):
        resolver = dns.resolver.Resolver(True, 0)
        # Seed the cache so that "test.com" is delegated to the server under
        # test (given as "host:port") and "localhost" resolves without a
        # network lookup.
        resolver.cache.add_record(
            ResourceRecord("localhost", Type.A, Class.IN, 60,
                           RecordData("127.0.0.1")))
        resolver.cache.add_record(
            ResourceRecord("test.com", Type.NS, Class.IN, 60,
                           RecordData(server + ":" + str(portnr))))
        host, alias, ip = resolver.gethostbyname("cnametest.test.com")
        self.assertEqual(host, "hello.test.com")
        self.assertEqual(alias, ["cnametest.test.com"])
        self.assertEqual(ip, ["1.2.3.4"])

    # solve a query for a FQDN for which your server does not have direct
    # authority, yet there is a name server in your zone which does
    def test_solve_ns_inside(self):
        # I had a bit of trouble figuring this one out
        # I think it would require me to set up two servers
        resolver = dns.resolver.Resolver(True, 0)
        ans = resolver.query_dns("a.subzone.test.com", Type.A,
                                 (server, portnr), False)
        self.assertEqual(ans.authorities[0].rdata.data, "nameserver.test.com")
        self.assertEqual(ans.additionals[0].name, "nameserver.test.com")

    # solve a query for a fqdn which points outside your zone
    def test_solve_outside(self):
        resolver = dns.resolver.Resolver(True, 0)
        # Alternative approach via gethostbyname, kept for reference:
        # resolver.cache.add_record(ResourceRecord("localhost", Type.A, Class.IN, 60, RecordData("127.0.0.1")))
        # resolver.cache.add_record(ResourceRecord("edu", Type.NS, Class.IN, 60, RecordData(server + ":" + str(portnr))))
        # host, alias, ip = resolver.gethostbyname("gaia.cs.umass.edu")
        ans = resolver.query_dns("gaia.cs.umass.edu", Type.A,
                                 (server, portnr), True)
        self.assertEqual(ans.answers[0].rdata.data, "128.119.245.12")

    # solve parallel requests for different FQDN, their servicing should be
    # made in parallel and correct responses should be generated
    def test_solve_parallel(self):
        question1 = dns.message.Question("gaia.cs.umass.edu", Type.A, Class.IN)
        question2 = dns.message.Question("yori.cc", Type.A, Class.IN)
        query1 = dns.message.makepacket(
            {"qr": 0, "opcode": 0, "rd": True}, [question1], ident=10)
        query2 = dns.message.makepacket(
            {"qr": 0, "opcode": 0, "rd": True}, [question2], ident=20)

        # Expected first answer for each query, keyed by the query's ident so
        # that responses can be matched regardless of arrival order.
        expected = {10: "128.119.245.12", 20: "138.201.39.105"}
        seen = set()

        timeout = 2
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        # try/finally so the socket is released even when a recv times out or
        # an assertion fails.
        try:
            sock.settimeout(timeout)
            sock.sendto(query1.to_bytes(), (server, portnr))
            sock.sendto(query2.to_bytes(), (server, portnr))

            # Receive one response per query
            for _ in range(len(expected)):
                data = sock.recv(65530)  # max MTU is not this value
                response = dns.message.Message.from_bytes(data)
                ident = response.header.ident
                if ident not in expected:
                    self.fail("response with unexpected ident %r" % (ident,))
                self.assertEqual(response.answers[0].rdata.data,
                                 expected[ident])
                seen.add(ident)
        finally:
            sock.close()

        # Both queries must have been answered, not one of them twice.
        self.assertEqual(seen, set(expected))


if __name__ == "__main__":
    # Parse command line arguments
    import argparse
    parser = argparse.ArgumentParser(description="DNS Tests")
    parser.add_argument("-s", "--server", type=str, default="localhost")
    # NOTE(review): this default (5001) differs from the module-level
    # portnr (5353) used when the tests are loaded by a runner -- confirm
    # which port the server is expected to listen on.
    parser.add_argument("-p", "--port", type=int, default=5001)
    args, extra = parser.parse_known_args()
    portnr = args.port
    server = args.server

    # Pass the extra arguments to unittest
    sys.argv[1:] = extra

    # Start test suite
    unittest.main()