httpz- Hyper-fast HTTP Scraping Tool |
git clone git://git.acid.vegas/httpz.git |
Log | Files | Refs | Archive | README | LICENSE |
cli.py (8371B)
1 #!/usr/bin/env python3 2 # HTTPZ Web Scanner - Developed by acidvegas in Python (https://github.com/acidvegas/httpz) 3 # httpz_scanner/cli.py 4 5 import argparse 6 import asyncio 7 import json 8 import logging 9 import os 10 import sys 11 12 from datetime import datetime 13 14 from .colors import Colors 15 from .formatters import format_console_output 16 from .parsers import parse_status_codes, parse_shard 17 from .scanner import HTTPZScanner 18 from .utils import SILENT_MODE, info 19 20 21 def setup_logging(level='INFO', log_to_disk=False): 22 ''' 23 Setup logging configuration 24 25 :param level: Logging level (INFO or DEBUG) 26 :param log_to_disk: Whether to also log to file 27 ''' 28 29 class ColoredFormatter(logging.Formatter): 30 def formatTime(self, record): 31 dt = datetime.fromtimestamp(record.created) 32 return f'{Colors.GRAY}{dt.strftime("%m-%d %H:%M")}{Colors.RESET}' 33 34 def format(self, record): 35 return f'{self.formatTime(record)} {record.getMessage()}' 36 37 # Setup logging handlers 38 handlers = [] 39 40 # Console handler 41 console = logging.StreamHandler() 42 console.setFormatter(ColoredFormatter()) 43 handlers.append(console) 44 45 # File handler 46 if log_to_disk: 47 os.makedirs('logs', exist_ok=True) 48 file_handler = logging.FileHandler(f'logs/httpz.log') 49 file_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')) 50 handlers.append(file_handler) 51 52 # Setup logger 53 logging.basicConfig(level=getattr(logging, level.upper()), handlers=handlers) 54 55 56 async def main(): 57 parser = argparse.ArgumentParser(description=f'{Colors.GREEN}Hyper-fast HTTP Scraping Tool{Colors.RESET}', formatter_class=argparse.RawDescriptionHelpFormatter) 58 59 # Add arguments 60 parser.add_argument('file', nargs='?', default='-', help='File containing domains to check (one per line), use - for stdin') 61 parser.add_argument('-all', '--all-flags', action='store_true', help='Enable all output flags') 62 parser.add_argument('-d', '--debug', action='store_true', help='Show error states and debug information') 63 parser.add_argument('-c', '--concurrent', type=int, default=100, help='Number of concurrent checks') 64 parser.add_argument('-j', '--jsonl', action='store_true', help='Output JSON Lines format to console') 65 parser.add_argument('-o', '--output', help='Output file path (JSONL format)') 66 67 # Output field flags 68 parser.add_argument('-b', '--body', action='store_true', help='Show body preview') 69 parser.add_argument('-cn', '--cname', action='store_true', help='Show CNAME records') 70 parser.add_argument('-cl', '--content-length', action='store_true', help='Show content length') 71 parser.add_argument('-ct', '--content-type', action='store_true', help='Show content type') 72 parser.add_argument('-f', '--favicon', action='store_true', help='Show favicon hash') 73 parser.add_argument('-fr', '--follow-redirects', action='store_true', help='Follow redirects (max 10)') 74 parser.add_argument('-hr', '--show-headers', action='store_true', help='Show response headers') 75 parser.add_argument('-i', '--ip', action='store_true', help='Show IP addresses') 76 parser.add_argument('-sc', '--status-code', action='store_true', help='Show status code') 77 parser.add_argument('-ti', '--title', action='store_true', help='Show page title') 78 parser.add_argument('-tls', '--tls-info', action='store_true', help='Show TLS certificate information') 79 80 # Other arguments 81 parser.add_argument('-ax', '--axfr', action='store_true', help='Try AXFR transfer against nameservers') 82 parser.add_argument('-ec', '--exclude-codes', type=parse_status_codes, help='Exclude these status codes (comma-separated, e.g., 404,500)') 83 parser.add_argument('-mc', '--match-codes', type=parse_status_codes, help='Only show these status codes (comma-separated, e.g., 200,301,404)') 84 parser.add_argument('-p', '--progress', action='store_true', help='Show progress counter') 85 parser.add_argument('-pd', '--post-data', help='Send POST request with this data') 86 parser.add_argument('-r', '--resolvers', help='File containing DNS resolvers (one per line)') 87 parser.add_argument('-to', '--timeout', type=int, default=5, help='Request timeout in seconds') 88 89 # Add shard argument 90 parser.add_argument('-sh','--shard', type=parse_shard, help='Shard index and total shards (e.g., 1/3)') 91 92 # Add this to the argument parser section 93 parser.add_argument('-pa', '--paths', help='Additional paths to check (comma-separated, e.g., ".git/config,.env")') 94 95 # Add these arguments in the parser section 96 parser.add_argument('-hd', '--headers', help='Custom headers to send with each request (format: "Header1: value1,Header2: value2")') 97 98 # If no arguments provided, print help and exit 99 if len(sys.argv) == 1: 100 parser.print_help() 101 sys.exit(0) 102 103 args = parser.parse_args() 104 105 # Setup logging based on arguments 106 global SILENT_MODE 107 SILENT_MODE = args.jsonl 108 109 if not SILENT_MODE: 110 if args.debug: 111 setup_logging(level='DEBUG', log_to_disk=True) 112 else: 113 setup_logging(level='INFO') 114 115 if args.file == '-': 116 info('Reading domains from stdin') 117 else: 118 info(f'Processing file: {args.file}') 119 120 # Setup show_fields 121 show_fields = { 122 'status_code' : args.all_flags or args.status_code, 123 'content_type' : args.all_flags or args.content_type, 124 'content_length' : args.all_flags or args.content_length, 125 'title' : args.all_flags or args.title, 126 'body' : args.all_flags or args.body, 127 'ip' : args.all_flags or args.ip, 128 'favicon' : args.all_flags or args.favicon, 129 'headers' : args.all_flags or args.show_headers, 130 'follow_redirects' : args.all_flags or args.follow_redirects, 131 'cname' : args.all_flags or args.cname, 132 'tls' : args.all_flags or args.tls_info 133 } 134 135 # If no fields specified show all 136 if not any(show_fields.values()): 137 show_fields = {k: True for k in show_fields} 138 139 try: 140 scanner = HTTPZScanner( 141 concurrent_limit=args.concurrent, 142 timeout=args.timeout, 143 follow_redirects=args.all_flags or args.follow_redirects, 144 check_axfr=args.axfr, 145 resolver_file=args.resolvers, 146 output_file=args.output, 147 show_progress=args.progress, 148 debug_mode=args.debug, 149 jsonl_output=args.jsonl, 150 show_fields=show_fields, 151 match_codes=args.match_codes, 152 exclude_codes=args.exclude_codes, 153 shard=args.shard, 154 paths=args.paths.split(',') if args.paths else None, 155 custom_headers=dict(h.split(': ', 1) for h in args.headers.split(',')) if args.headers else None, 156 post_data=args.post_data 157 ) 158 159 count = 0 160 async for result in scanner.scan(args.file): 161 # Write to output file if specified 162 if args.output: 163 with open(args.output, 'a') as f: 164 f.write(json.dumps(result) + '\n') 165 f.flush() # Ensure file output is immediate 166 167 # Handle JSON output separately 168 if args.jsonl: 169 print(json.dumps(result), flush=True) # Force flush 170 continue 171 172 # Only output and increment counter if we have content to show for normal output 173 formatted = format_console_output(result, args.debug, show_fields, args.match_codes, args.exclude_codes) 174 if formatted: 175 if args.progress: 176 count += 1 177 info(f"[{count}] {formatted}") 178 sys.stdout.flush() # Force flush after each domain 179 else: 180 print(formatted, flush=True) # Force flush 181 182 except KeyboardInterrupt: 183 logging.warning('Process interrupted by user') 184 sys.exit(1) 185 except Exception as e: 186 logging.error(f'Unexpected error: {str(e)}') 187 sys.exit(1) 188 189 190 def run(): 191 '''Entry point for the CLI''' 192 asyncio.run(main()) 193 194 195 196 if __name__ == '__main__': 197 run()