httpz- Hyper-fast HTTP Scraping Tool |
git clone git://git.acid.vegas/httpz.git |
Log | Files | Refs | Archive | README | LICENSE |
dns.py (4008B)
1 #!/usr/bin/env python3 2 # HTTPZ Web Scanner - Developed by acidvegas in Python (https://github.com/acidvegas/httpz) 3 # httpz/dns.py 4 5 import asyncio 6 import os 7 import aiohttp 8 import dns.asyncresolver 9 import dns.query 10 import dns.resolver 11 import dns.zone 12 13 from .utils import debug, info, SILENT_MODE 14 15 async def resolve_all_dns(domain: str, timeout: int = 5, nameserver: str = None, check_axfr: bool = False) -> tuple: 16 ''' 17 Resolve all DNS records for a domain 18 19 :param domain: Domain to resolve 20 :param timeout: Timeout in seconds 21 :param nameserver: Specific nameserver to use 22 :param check_axfr: Whether to attempt zone transfer 23 ''' 24 resolver = dns.asyncresolver.Resolver() 25 resolver.lifetime = timeout 26 if nameserver: 27 resolver.nameservers = [nameserver] 28 29 results = await asyncio.gather(*[resolver.resolve(domain, rtype) 30 for rtype in ('NS', 'A', 'AAAA', 'CNAME')], 31 return_exceptions=True) 32 33 nameservers = [str(ns).rstrip('.') for ns in results[0]] if isinstance(results[0], dns.resolver.Answer) else [] 34 ips = ([str(ip) for ip in results[1]] if isinstance(results[1], dns.resolver.Answer) else []) + \ 35 ([str(ip) for ip in results[2]] if isinstance(results[2], dns.resolver.Answer) else []) 36 cname = str(results[3][0].target).rstrip('.') if isinstance(results[3], dns.resolver.Answer) else None 37 38 ns_ips = {} 39 if nameservers: 40 ns_results = await asyncio.gather(*[resolver.resolve(ns, rtype) 41 for ns in nameservers 42 for rtype in ('A', 'AAAA')], 43 return_exceptions=True) 44 for i, ns in enumerate(nameservers): 45 ns_ips[ns] = [str(ip) for records in ns_results[i*2:i*2+2] 46 if isinstance(records, dns.resolver.Answer) 47 for ip in records] 48 49 if check_axfr: 50 await attempt_axfr(domain, ns_ips, timeout) 51 52 return sorted(set(ips)), cname, nameservers, ns_ips 53 54 async def attempt_axfr(domain: str, ns_ips: dict, timeout: int = 5) -> None: 55 ''' 56 Attempt zone transfer for a domain 57 58 :param domain: Domain to attempt AXFR transfer 59 :param ns_ips: Dictionary of nameserver hostnames to their IPs 60 :param timeout: Timeout in seconds 61 ''' 62 try: 63 os.makedirs('axfrout', exist_ok=True) 64 65 for ns_host, ips in ns_ips.items(): 66 for ns_ip in ips: 67 try: 68 zone = dns.zone.from_xfr(dns.query.xfr(ns_ip, domain, lifetime=timeout)) 69 with open(f'axfrout/{domain}_{ns_ip}.zone', 'w') as f: 70 zone.to_text(f) 71 info(f'[AXFR SUCCESS] {domain} from {ns_host} ({ns_ip})') 72 except Exception as e: 73 debug(f'AXFR failed for {domain} from {ns_ip}: {str(e)}') 74 except Exception as e: 75 debug(f'Failed AXFR for {domain}: {str(e)}') 76 77 async def load_resolvers(resolver_file: str = None) -> list: 78 ''' 79 Load DNS resolvers from file or default source 80 81 :param resolver_file: Path to file containing resolver IPs 82 :return: List of resolver IPs 83 ''' 84 if resolver_file: 85 try: 86 with open(resolver_file) as f: 87 resolvers = [line.strip() for line in f if line.strip()] 88 if resolvers: 89 return resolvers 90 except Exception as e: 91 debug(f'Error loading resolvers from {resolver_file}: {str(e)}') 92 93 async with aiohttp.ClientSession() as session: 94 async with session.get('https://raw.githubusercontent.com/trickest/resolvers/refs/heads/main/resolvers.txt') as response: 95 resolvers = await response.text() 96 if not SILENT_MODE: 97 info(f'Loaded {len(resolvers.splitlines()):,} resolvers.') 98 return [resolver.strip() for resolver in resolvers.splitlines()]