httpz

- Hyper-fast HTTP Scraping Tool
git clone git://git.acid.vegas/httpz.git
Log | Files | Refs | Archive | README | LICENSE

dns.py (4008B)

      1 #!/usr/bin/env python3
      2 # HTTPZ Web Scanner - Developed by acidvegas in Python (https://github.com/acidvegas/httpz)
      3 # httpz/dns.py
      4 
      5 import asyncio
      6 import os
      7 import aiohttp
      8 import dns.asyncresolver
      9 import dns.query
     10 import dns.resolver
     11 import dns.zone
     12 
     13 from .utils import debug, info, SILENT_MODE
     14 
     15 async def resolve_all_dns(domain: str, timeout: int = 5, nameserver: str = None, check_axfr: bool = False) -> tuple:
     16     '''
     17     Resolve all DNS records for a domain
     18     
     19     :param domain: Domain to resolve
     20     :param timeout: Timeout in seconds
     21     :param nameserver: Specific nameserver to use
     22     :param check_axfr: Whether to attempt zone transfer
     23     '''
     24     resolver = dns.asyncresolver.Resolver()
     25     resolver.lifetime = timeout
     26     if nameserver:
     27         resolver.nameservers = [nameserver]
     28     
     29     results = await asyncio.gather(*[resolver.resolve(domain, rtype) 
     30                                    for rtype in ('NS', 'A', 'AAAA', 'CNAME')], 
     31                                  return_exceptions=True)
     32     
     33     nameservers = [str(ns).rstrip('.') for ns in results[0]] if isinstance(results[0], dns.resolver.Answer) else []
     34     ips = ([str(ip) for ip in results[1]] if isinstance(results[1], dns.resolver.Answer) else []) + \
     35           ([str(ip) for ip in results[2]] if isinstance(results[2], dns.resolver.Answer) else [])
     36     cname = str(results[3][0].target).rstrip('.') if isinstance(results[3], dns.resolver.Answer) else None
     37     
     38     ns_ips = {}
     39     if nameservers:
     40         ns_results = await asyncio.gather(*[resolver.resolve(ns, rtype) 
     41                                           for ns in nameservers 
     42                                           for rtype in ('A', 'AAAA')], 
     43                                         return_exceptions=True)
     44         for i, ns in enumerate(nameservers):
     45             ns_ips[ns] = [str(ip) for records in ns_results[i*2:i*2+2] 
     46                          if isinstance(records, dns.resolver.Answer) 
     47                          for ip in records]
     48 
     49     if check_axfr:
     50         await attempt_axfr(domain, ns_ips, timeout)
     51 
     52     return sorted(set(ips)), cname, nameservers, ns_ips
     53 
     54 async def attempt_axfr(domain: str, ns_ips: dict, timeout: int = 5) -> None:
     55     '''
     56     Attempt zone transfer for a domain
     57     
     58     :param domain: Domain to attempt AXFR transfer
     59     :param ns_ips: Dictionary of nameserver hostnames to their IPs
     60     :param timeout: Timeout in seconds
     61     '''
     62     try:
     63         os.makedirs('axfrout', exist_ok=True)
     64         
     65         for ns_host, ips in ns_ips.items():
     66             for ns_ip in ips:
     67                 try:
     68                     zone = dns.zone.from_xfr(dns.query.xfr(ns_ip, domain, lifetime=timeout))
     69                     with open(f'axfrout/{domain}_{ns_ip}.zone', 'w') as f:
     70                         zone.to_text(f)
     71                     info(f'[AXFR SUCCESS] {domain} from {ns_host} ({ns_ip})')
     72                 except Exception as e:
     73                     debug(f'AXFR failed for {domain} from {ns_ip}: {str(e)}')
     74     except Exception as e:
     75         debug(f'Failed AXFR for {domain}: {str(e)}')
     76 
     77 async def load_resolvers(resolver_file: str = None) -> list:
     78     '''
     79     Load DNS resolvers from file or default source
     80     
     81     :param resolver_file: Path to file containing resolver IPs
     82     :return: List of resolver IPs
     83     '''
     84     if resolver_file:
     85         try:
     86             with open(resolver_file) as f:
     87                 resolvers = [line.strip() for line in f if line.strip()]
     88             if resolvers:
     89                 return resolvers
     90         except Exception as e:
     91             debug(f'Error loading resolvers from {resolver_file}: {str(e)}')
     92 
     93     async with aiohttp.ClientSession() as session:
     94         async with session.get('https://raw.githubusercontent.com/trickest/resolvers/refs/heads/main/resolvers.txt') as response:
     95             resolvers = await response.text()
     96             if not SILENT_MODE:
     97                 info(f'Loaded {len(resolvers.splitlines()):,} resolvers.')
     98             return [resolver.strip() for resolver in resolvers.splitlines()]