httpz

- Hyper-fast HTTP Scraping Tool
git clone git://git.acid.vegas/httpz.git
Log | Files | Refs | Archive | README | LICENSE

cli.py (8371B)

      1 #!/usr/bin/env python3
      2 # HTTPZ Web Scanner - Developed by acidvegas in Python (https://github.com/acidvegas/httpz)
      3 # httpz_scanner/cli.py
      4 
      5 import argparse
      6 import asyncio
      7 import json
      8 import logging
      9 import os
     10 import sys
     11 
     12 from datetime import datetime
     13 
     14 from .colors     import Colors
     15 from .formatters import format_console_output
     16 from .parsers    import parse_status_codes, parse_shard
     17 from .scanner    import HTTPZScanner
     18 from .utils      import SILENT_MODE, info
     19 
     20 
     21 def setup_logging(level='INFO', log_to_disk=False):
     22     '''
     23     Setup logging configuration
     24     
     25     :param level: Logging level (INFO or DEBUG)
     26     :param log_to_disk: Whether to also log to file
     27     '''
     28 
     29     class ColoredFormatter(logging.Formatter):
     30         def formatTime(self, record):
     31             dt = datetime.fromtimestamp(record.created)
     32             return f'{Colors.GRAY}{dt.strftime("%m-%d %H:%M")}{Colors.RESET}'
     33         
     34         def format(self, record):
     35             return f'{self.formatTime(record)} {record.getMessage()}'
     36     
     37     # Setup logging handlers
     38     handlers = []
     39     
     40     # Console handler
     41     console = logging.StreamHandler()
     42     console.setFormatter(ColoredFormatter())
     43     handlers.append(console)
     44     
     45     # File handler
     46     if log_to_disk:
     47         os.makedirs('logs', exist_ok=True)
     48         file_handler = logging.FileHandler(f'logs/httpz.log')
     49         file_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
     50         handlers.append(file_handler)
     51     
     52     # Setup logger
     53     logging.basicConfig(level=getattr(logging, level.upper()), handlers=handlers)
     54 
     55 
     56 async def main():
     57     parser = argparse.ArgumentParser(description=f'{Colors.GREEN}Hyper-fast HTTP Scraping Tool{Colors.RESET}', formatter_class=argparse.RawDescriptionHelpFormatter)
     58 
     59     # Add arguments
     60     parser.add_argument('file', nargs='?', default='-', help='File containing domains to check (one per line), use - for stdin')
     61     parser.add_argument('-all', '--all-flags', action='store_true', help='Enable all output flags')
     62     parser.add_argument('-d',   '--debug', action='store_true', help='Show error states and debug information')
     63     parser.add_argument('-c',   '--concurrent', type=int, default=100, help='Number of concurrent checks')
     64     parser.add_argument('-j',   '--jsonl', action='store_true', help='Output JSON Lines format to console')
     65     parser.add_argument('-o',   '--output', help='Output file path (JSONL format)')
     66     
     67     # Output field flags
     68     parser.add_argument('-b',   '--body', action='store_true', help='Show body preview')
     69     parser.add_argument('-cn',  '--cname', action='store_true', help='Show CNAME records')
     70     parser.add_argument('-cl',  '--content-length', action='store_true', help='Show content length')
     71     parser.add_argument('-ct',  '--content-type', action='store_true', help='Show content type')
     72     parser.add_argument('-f',   '--favicon', action='store_true', help='Show favicon hash')
     73     parser.add_argument('-fr',  '--follow-redirects', action='store_true', help='Follow redirects (max 10)')
     74     parser.add_argument('-hr',  '--show-headers', action='store_true', help='Show response headers')
     75     parser.add_argument('-i',   '--ip', action='store_true', help='Show IP addresses')
     76     parser.add_argument('-sc',  '--status-code', action='store_true', help='Show status code')
     77     parser.add_argument('-ti',  '--title', action='store_true', help='Show page title')
     78     parser.add_argument('-tls', '--tls-info', action='store_true', help='Show TLS certificate information')
     79     
     80     # Other arguments
     81     parser.add_argument('-ax', '--axfr', action='store_true', help='Try AXFR transfer against nameservers')
     82     parser.add_argument('-ec', '--exclude-codes', type=parse_status_codes, help='Exclude these status codes (comma-separated, e.g., 404,500)')
     83     parser.add_argument('-mc', '--match-codes', type=parse_status_codes, help='Only show these status codes (comma-separated, e.g., 200,301,404)')
     84     parser.add_argument('-p',  '--progress', action='store_true', help='Show progress counter')
     85     parser.add_argument('-pd', '--post-data', help='Send POST request with this data')
     86     parser.add_argument('-r',  '--resolvers', help='File containing DNS resolvers (one per line)')
     87     parser.add_argument('-to', '--timeout', type=int, default=5, help='Request timeout in seconds')
     88     
     89     # Add shard argument
     90     parser.add_argument('-sh','--shard', type=parse_shard, help='Shard index and total shards (e.g., 1/3)')
     91 
     92     # Add this to the argument parser section
     93     parser.add_argument('-pa', '--paths', help='Additional paths to check (comma-separated, e.g., ".git/config,.env")')
     94     
     95     # Add these arguments in the parser section
     96     parser.add_argument('-hd', '--headers', help='Custom headers to send with each request (format: "Header1: value1,Header2: value2")')
     97     
     98     # If no arguments provided, print help and exit
     99     if len(sys.argv) == 1:
    100         parser.print_help()
    101         sys.exit(0)
    102     
    103     args = parser.parse_args()
    104 
    105     # Setup logging based on arguments
    106     global SILENT_MODE
    107     SILENT_MODE = args.jsonl
    108 
    109     if not SILENT_MODE:
    110         if args.debug:
    111             setup_logging(level='DEBUG', log_to_disk=True)
    112         else:
    113             setup_logging(level='INFO')
    114 
    115         if args.file == '-':
    116             info('Reading domains from stdin')
    117         else:
    118             info(f'Processing file: {args.file}')
    119 
    120     # Setup show_fields
    121     show_fields = {
    122         'status_code'      : args.all_flags or args.status_code,
    123         'content_type'     : args.all_flags or args.content_type,
    124         'content_length'   : args.all_flags or args.content_length,
    125         'title'            : args.all_flags or args.title,
    126         'body'             : args.all_flags or args.body,
    127         'ip'               : args.all_flags or args.ip,
    128         'favicon'          : args.all_flags or args.favicon,
    129         'headers'          : args.all_flags or args.show_headers,
    130         'follow_redirects' : args.all_flags or args.follow_redirects,
    131         'cname'            : args.all_flags or args.cname,
    132         'tls'              : args.all_flags or args.tls_info
    133     }
    134 
    135     # If no fields specified show all
    136     if not any(show_fields.values()):
    137         show_fields = {k: True for k in show_fields}
    138 
    139     try:
    140         scanner = HTTPZScanner(
    141             concurrent_limit=args.concurrent,
    142             timeout=args.timeout,
    143             follow_redirects=args.all_flags or args.follow_redirects,
    144             check_axfr=args.axfr,
    145             resolver_file=args.resolvers,
    146             output_file=args.output,
    147             show_progress=args.progress,
    148             debug_mode=args.debug,
    149             jsonl_output=args.jsonl,
    150             show_fields=show_fields,
    151             match_codes=args.match_codes,
    152             exclude_codes=args.exclude_codes,
    153             shard=args.shard,
    154             paths=args.paths.split(',') if args.paths else None,
    155             custom_headers=dict(h.split(': ', 1) for h in args.headers.split(',')) if args.headers else None,
    156             post_data=args.post_data
    157         )
    158 
    159         count = 0
    160         async for result in scanner.scan(args.file):
    161             # Write to output file if specified
    162             if args.output:
    163                 with open(args.output, 'a') as f:
    164                     f.write(json.dumps(result) + '\n')
    165                     f.flush()  # Ensure file output is immediate
    166             
    167             # Handle JSON output separately
    168             if args.jsonl:
    169                 print(json.dumps(result), flush=True)  # Force flush
    170                 continue
    171 
    172             # Only output and increment counter if we have content to show for normal output
    173             formatted = format_console_output(result, args.debug, show_fields, args.match_codes, args.exclude_codes)
    174             if formatted:
    175                 if args.progress:
    176                     count += 1
    177                     info(f"[{count}] {formatted}")
    178                     sys.stdout.flush()  # Force flush after each domain
    179                 else:
    180                     print(formatted, flush=True)  # Force flush
    181 
    182     except KeyboardInterrupt:
    183         logging.warning('Process interrupted by user')
    184         sys.exit(1)
    185     except Exception as e:
    186         logging.error(f'Unexpected error: {str(e)}')
    187         sys.exit(1)
    188 
    189 
    190 def run():
    191     '''Entry point for the CLI'''
    192     asyncio.run(main())
    193 
    194 
    195 
    196 if __name__ == '__main__':
    197     run()