httpz - Hyper-fast HTTP Scraping Tool

git clone git://git.acid.vegas/httpz.git

commit 173b7e3cf0f91d91161123ba8558f8200f948ee1
parent df2a309a0d19608858df87110381744fa0168b4f
Author: acidvegas <acid.vegas@acid.vegas>
Date: Mon, 10 Feb 2025 01:01:37 -0500

Cleanup output and colors

Diffstat:
M httpz.py | 110 ++++++++++++++++++++++++++++++++++++++++++++---------------------------------------

1 file changed, 56 insertions(+), 54 deletions(-)
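
The recurring change in this patch is threading a caller-supplied timeout through every DNS lookup (resolve_dns, the new NS query in check_domain, and try_axfr) by setting resolver.lifetime. A minimal standalone sketch of that pattern, not taken from the commit itself (the helper name get_nameservers is illustrative, and dnspython is assumed to be installed):

import asyncio
import dns.asyncresolver

async def get_nameservers(domain: str, timeout: int = 5) -> list:
	# Bound the total time dnspython may spend on the query, mirroring the
	# resolver.lifetime = timeout assignments added throughout the patch.
	resolver = dns.asyncresolver.Resolver()
	resolver.lifetime = timeout
	try:
		ns_records = await resolver.resolve(domain, 'NS')
		return [str(ns).rstrip('.') for ns in ns_records]
	except Exception:
		return []

if __name__ == '__main__':
	print(asyncio.run(get_nameservers('example.com')))

The new color constants follow the same ANSI convention as the existing ones: \033[90m is the bright-black (gray) escape and \033[96m is bright cyan.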

diff --git a/httpz.py b/httpz.py
@@ -66,6 +66,8 @@ class Colors:
 	LIGHT_RED  = '\033[38;5;203m' # Light red
 	DARK_GREEN = '\033[38;5;22m'  # Dark green
 	PINK       = '\033[38;5;198m' # Bright pink
+	GRAY       = '\033[90m'  # Gray color
+	CYAN       = '\033[96m'  # Cyan color
 
 
 _SILENT_MODE = False
@@ -86,15 +88,17 @@ def info(msg: str) -> None:
 		logging.info(msg)
 
 
-async def resolve_dns(domain: str) -> tuple:
+async def resolve_dns(domain: str, timeout: int = 5) -> tuple:
 	'''
 	Resolve A, AAAA, and CNAME records for a domain
 	
 	:param domain: domain to resolve
+	:param timeout: timeout in seconds
 	:return: tuple of (ips, cname)
 	'''
 
 	resolver = dns.asyncresolver.Resolver()
+	resolver.lifetime = timeout
 	ips      = []
 	cname    = None
 
@@ -183,7 +187,7 @@ async def get_cert_info(session: aiohttp.ClientSession, url: str) -> dict:
 				return None
 				
 			cert_bin = ssl_object.getpeercert(binary_form=True)
-			cert      = x509.load_der_x509_certificate(cert_bin)
+			cert     = x509.load_der_x509_certificate(cert_bin)
 			
 			# Get certificate details
 			cert_info = {
@@ -221,10 +225,10 @@ async def check_domain(session: aiohttp.ClientSession, domain: str, follow_redir
 
 	if not domain.startswith(('http://', 'https://')):
 		protocols = ['https://', 'http://']
-		base_domain = domain
+		base_domain = domain.rstrip('/')
 	else:
 		protocols = [domain]
-		base_domain = domain.split('://')[-1].split('/')[0]
+		base_domain = domain.split('://')[-1].split('/')[0].rstrip('/')
 
 	result = {
 		'domain'         : base_domain,
@@ -235,6 +239,7 @@ async def check_domain(session: aiohttp.ClientSession, domain: str, follow_redir
 		'url'            : f"https://{base_domain}" if base_domain else domain,
 		'ips'            : [],
 		'cname'          : None,
+		'nameservers'    : [],
 		'favicon_hash'   : None,
 		'headers'        : {},
 		'content_length' : None,
@@ -243,7 +248,16 @@ async def check_domain(session: aiohttp.ClientSession, domain: str, follow_redir
 	}
 
 	# Resolve DNS records
-	result['ips'], result['cname'] = await resolve_dns(base_domain)
+	result['ips'], result['cname'] = await resolve_dns(base_domain, timeout)
+
+	# Look up NS records for the domain
+	try:
+		resolver = dns.asyncresolver.Resolver()
+		resolver.lifetime = timeout
+		ns_records = await resolver.resolve(base_domain, 'NS')
+		result['nameservers'] = [str(ns).rstrip('.') for ns in ns_records]
+	except Exception as e:
+		debug(f'Error getting nameservers for {base_domain}: {str(e)}')
 
 	for protocol in protocols:
 		url = f'{protocol}{base_domain}'
@@ -270,10 +284,10 @@ async def check_domain(session: aiohttp.ClientSession, domain: str, follow_redir
 					html = (await response.text())[:1024*1024]
 					soup = bs4.BeautifulSoup(html, 'html.parser')
 					if soup.title:
-						title = ' '.join(soup.title.string.strip().split()) if soup.title.string else ''
+						title = ' '.join(soup.title.string.strip().split()).rstrip('.') if soup.title.string else ''
 						result['title'] = title[:300]
 					if soup.get_text():
-						body = ' '.join(soup.get_text().split())
+						body = ' '.join(soup.get_text().split()).rstrip('.')
 						result['body'] = body[:500]
 					result['favicon_hash'] = await get_favicon_hash(session, url, html)
 					break
@@ -283,7 +297,7 @@ async def check_domain(session: aiohttp.ClientSession, domain: str, follow_redir
 			continue
 
 	if check_axfr:
-		await try_axfr(base_domain)
+		await try_axfr(base_domain, timeout)
 
 	return result
 
@@ -401,7 +415,7 @@ def format_status_output(result: dict, debug: bool = False, show_fields: dict = 
 		headers_text = []
 		for k, v in result['headers'].items():
 			headers_text.append(f"{k}: {v}")
-		parts.append(f"{Colors.LIGHT_RED}[{', '.join(headers_text)}]{Colors.RESET}")
+		parts.append(f"{Colors.CYAN}[{', '.join(headers_text)}]{Colors.RESET}")
 	else:
 		# Only show content-type and content-length if headers aren't shown
 		if show_fields['content_type'] and result['content_type']:
@@ -438,20 +452,6 @@ def format_status_output(result: dict, debug: bool = False, show_fields: dict = 
 	return ' '.join(parts)
 
 
-def count_domains(input_source: str = None) -> int:
-	'''
-	Count total number of domains from file or stdin
-	
-	:param input_source: path to file containing domains, or None for stdin
-	'''
-	if input_source == '-' or input_source is None:
-		# Can't count lines from stdin without consuming them
-		return 0
-	else:
-		with open(input_source, 'r') as f:
-			return sum(1 for line in f if line.strip())
-
-
 async def process_domains(input_source: str = None, debug: bool = False, concurrent_limit: int = 100, show_fields: dict = None, output_file: str = None, jsonl: bool = None, timeout: int = 5, match_codes: set = None, exclude_codes: set = None, show_progress: bool = False, check_axfr: bool = False):
 	'''
 	Process domains from a file or stdin with concurrent requests
@@ -471,15 +471,12 @@ async def process_domains(input_source: str = None, debug: bool = False, concurr
 	if input_source and input_source != '-' and not Path(input_source).exists():
 		raise FileNotFoundError(f'Domain file not found: {input_source}')
 
-	# Get total domain count if showing progress (only works for files)
-	total_domains     = count_domains(input_source) if show_progress else 0
-	processed_domains = 0
-
 	# Clear the output file if specified
 	if output_file:
 		open(output_file, 'w').close()
 
 	tasks = set()
+	processed_domains = 0  # Simple counter for all processed domains
 	
 	async def write_result(result: dict):
 		'''Write a single result to the output file'''
@@ -505,6 +502,8 @@ async def process_domains(input_source: str = None, debug: bool = False, concurr
 			output_dict['redirect_chain'] = result['redirect_chain']
 		if result['tls']:
 			output_dict['tls'] = result['tls']
+		if result['nameservers']:
+			output_dict['nameservers'] = result['nameservers']
 
 		# Get formatted output based on filters
 		formatted = format_status_output(result, debug, show_fields, match_codes, exclude_codes)
@@ -518,22 +517,16 @@ async def process_domains(input_source: str = None, debug: bool = False, concurr
 			
 			# Console output
 			if jsonl:
-				# Pure JSON Lines output without any logging prefixes
 				print(json.dumps(output_dict))
 			else:
+				processed_domains += 1  # Increment counter for each domain processed
 				if show_progress:
-					processed_domains += 1
-					info(f"{Colors.BOLD}[{processed_domains}/{total_domains}]{Colors.RESET} {formatted}")
-				else:
-					info(formatted)
+					info(f"{Colors.GRAY}[{processed_domains}]{Colors.RESET} {formatted}")
 
 	async with aiohttp.ClientSession() as session:
 		# Start initial batch of tasks
 		for domain in itertools.islice(domain_generator(input_source), concurrent_limit):
-			task = asyncio.create_task(check_domain(session, domain, 
-												  follow_redirects=show_fields['follow_redirects'], 
-												  timeout=timeout,
-												  check_axfr=check_axfr))
+			task = asyncio.create_task(check_domain(session, domain, follow_redirects=show_fields['follow_redirects'], timeout=timeout, check_axfr=check_axfr))
 			tasks.add(task)
 		
 		# Process remaining domains, maintaining concurrent_limit active tasks
@@ -548,10 +541,7 @@ async def process_domains(input_source: str = None, debug: bool = False, concurr
 				result = await task
 				await write_result(result)
 			
-			task = asyncio.create_task(check_domain(session, domain, 
-												  follow_redirects=show_fields['follow_redirects'], 
-												  timeout=timeout,
-												  check_axfr=check_axfr))
+			task = asyncio.create_task(check_domain(session, domain, follow_redirects=show_fields['follow_redirects'], timeout=timeout, check_axfr=check_axfr))
 			tasks.add(task)
 		
 		# Wait for remaining tasks
@@ -562,11 +552,12 @@ async def process_domains(input_source: str = None, debug: bool = False, concurr
 				await write_result(result)
 
 
-async def try_axfr(domain: str) -> None:
+async def try_axfr(domain: str, timeout: int = 5) -> None:
 	'''
 	Try AXFR transfer for a domain against all its nameservers
 	
 	:param domain: Domain to attempt AXFR transfer
+	:param timeout: timeout in seconds
 	'''
 	
 	try:
@@ -575,25 +566,36 @@ async def try_axfr(domain: str) -> None:
 		
 		# Get nameservers
 		resolver = dns.asyncresolver.Resolver()
-		nameservers = await resolver.resolve(domain, 'NS')
+		resolver.lifetime = timeout
+		ns_records  = await resolver.resolve(domain, 'NS')
+		nameservers = [str(ns).rstrip('.') for ns in ns_records]
 		
-		# Try AXFR against each nameserver
-		for ns in nameservers:
-			ns_host = str(ns).rstrip('.')
+		# Try AXFR against each nameserver's IPs
+		for ns_host in nameservers:
 			try:
-				# Get nameserver IP
-				ns_ips = await resolver.resolve(ns_host, 'A')
-				for ns_ip in ns_ips:
-					ns_ip = str(ns_ip)
+				# Get A records
+				a_ips = []
+				try:
+					a_records = await resolver.resolve(ns_host, 'A')
+					a_ips.extend(str(ip) for ip in a_records)
+				except Exception as e:
+					debug(f'Failed to get A records for {ns_host}: {str(e)}')
+
+				# Get AAAA records
+				try:
+					aaaa_records = await resolver.resolve(ns_host, 'AAAA')
+					a_ips.extend(str(ip) for ip in aaaa_records)
+				except Exception as e:
+					debug(f'Failed to get AAAA records for {ns_host}: {str(e)}')
+
+				# Try AXFR against each IP
+				for ns_ip in a_ips:
 					try:
-						# Attempt zone transfer
-						zone = dns.zone.from_xfr(dns.query.xfr(ns_ip, domain))
-						
-						# Save successful transfer
+						zone = dns.zone.from_xfr(dns.query.xfr(ns_ip, domain, lifetime=timeout+10))
 						filename = f'axfrout/{domain}_{ns_ip}.zone'
 						with open(filename, 'w') as f:
 							zone.to_text(f)
-						info(f'{Colors.RED}[AXFR SUCCESS] {domain} from {ns_host} ({ns_ip}){Colors.RESET}')
+						info(f'{Colors.GREEN}[AXFR SUCCESS] {domain} from {ns_host} ({ns_ip}){Colors.RESET}')
 					except Exception as e:
 						debug(f'AXFR failed for {domain} from {ns_ip}: {str(e)}')
 			except Exception as e: