D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
proc
/
3
/
root
/
opt
/
dedrads
/
Filename :
nlp
back
Copy
#!/usr/lib/rads/venv/bin/python3
"""New Apache Log Parser - cli version"""
import sys
import functools
from pathlib import Path
from argparse import (
    ArgumentParser,
    ArgumentTypeError,
    RawDescriptionHelpFormatter,
)
from datetime import datetime
from textwrap import wrap
from tabulate import tabulate
from rads.color import magenta

NLP_SCRIPTS = Path(__file__).parent / 'nlp_scripts'
sys.path.insert(0, str(NLP_SCRIPTS))
import nlp_funcs as nlp  # pylint: disable=wrong-import-position

AWK_SCRIPT = str(NLP_SCRIPTS / 'nlp-awk.sh')


def header(text: str, width: int) -> None:
    """Prints a pretty header: ``text`` followed by dashes up to ``width``.

    When ``width`` is too small (or -1 for "no limit") only the text and a
    single trailing dash are printed.
    """
    # multiplying a str by a negative count yields '', same as the empty range
    hline = "-" * (width - len(text) - 2)
    print(magenta(f"{text}-{hline}"))


def col_print(data: list[list[str]], width: int):
    """Prints pretty columns, truncating each row to ``width`` characters.

    A negative ``width`` means "no limit": rows are printed untruncated.
    """
    # A negative width must not be used as a slice end: line[:-1] would
    # silently drop the last character of every row.
    limit = None if width < 0 else width
    for line in tabulate(data, tablefmt='plain').splitlines():
        print(line[:limit].rstrip())


def inline_print(data: dict[int, int], width: int):
    """inline-print a dict, word wrapping as necessary.

    A ``width`` below 1 means "no limit": everything goes on a single line.
    """
    # abuse the fact that textwrap.wrap can be made to ignore hyphens:
    # data = {7: 945, 8: 933, 9: 1078}
    # text = '7--945 8--933 9--1078'
    # wrap() then wraps on the spaces between, and we
    # convert the '--' back to ': ' before printing
    text = ' '.join(f"{k}--{v}" for k, v in data.items())
    if width < 1:
        # textwrap.wrap() raises ValueError for width <= 0, so skip wrapping
        print(text.replace('--', ': '))
        return
    lines = [
        x.replace('--', ': ')
        for x in wrap(text, break_on_hyphens=False, width=width)
    ]
    print(*lines, sep='\n')


def print_out(data: nlp.NLPData, width: int, nlines: int, ptr: bool):
    """Prints all the things.

    Args:
        data: parsed log data; the keys read here are "hourly_hits",
            "http_codes", "requests", "dynamic", "user_agents", "src_ips".
        width: output width in characters (-1 for no limit).
        nlines: max rows per section (-1 for every row).
        ptr: if True, look up and show a PTR record next to each top IP.
    """
    # -1 means "everything"; slicing with [: -1] would drop the last row
    limit = None if nlines < 0 else nlines
    col_pr = functools.partial(col_print, width=width)
    for date_str, hourly_hits in data["hourly_hits"].items():
        header(f"Hourly hits ({date_str})", width)
        inline_print(hourly_hits, width)
    header("HTTP response codes", width)
    inline_print(data["http_codes"], width)
    header("Duplicate requests + response codes", width)
    col_pr(data["requests"][:limit])
    header("Requests for non-static content", width)
    col_pr(data["dynamic"][:limit])
    header("Top user agents", width)
    col_pr(data["user_agents"][:limit])
    src_ips = data["src_ips"][:limit]
    if ptr:
        # each row is [count, ipaddr]; build new rows with the PTR appended
        # instead of mutating the caller's data in place
        src_ips = [[*row, nlp.ptr_lookup(row[1])] for row in src_ips]
        header("Top IPs with PTR records", width)
    else:
        header("Top IPs", width)
    col_pr(src_ips)


def _log_date(text: str) -> str:
    """Convert an ``mm/dd`` argument to the ``dd/Mon`` form (e.g. ``20/Apr``).

    Raises:
        ArgumentTypeError: if ``text`` is not a valid ``mm/dd`` date.
    """
    try:
        # strptime() without a year assumes 1900, which is not a leap year
        # and therefore rejects 2/29; parse against a leap year instead.
        parsed = datetime.strptime(f"{text}/2000", '%m/%d/%Y')
    except ValueError as exc:
        raise ArgumentTypeError(
            f"invalid date {text!r} (expected mm/dd)"
        ) from exc
    return parsed.strftime('%d/%b')


def parse_args():
    """parse sys.argv

    Returns the argparse namespace with ``nlines``, ``width``, ``date``,
    ``ptr`` (always a bool on return) and ``target``. Prints the help text
    and exits with status 1 if ``-n`` or ``-w`` is out of range.
    """
    parser = ArgumentParser(
        formatter_class=RawDescriptionHelpFormatter, description=__doc__
    )
    parser.epilog = f"""Piping stdout to nlp:
    awk '$7 ~ /php/' /home/userna5/access-logs/* | {parser.prog} -n 50
    grep -h 'bot' * | {parser.prog}
    zcat domain.com.gz domain2.net.gz | {parser.prog} -d 4/20"""
    # fmt: off
    parser.add_argument(
        '-n', type=int, default=10, dest='nlines', metavar='int',
        help='Number of lines to display in each section. '
        'Specifying -1 will display every possible line of output.',
    )
    parser.add_argument(
        '-w', type=int, default=100, dest='width', metavar='int',
        help='Width of output in characters. Specifying -1 will display '
        'output with no limit. Defaults to 100.',
    )
    parser.add_argument(
        '-d', dest='date', metavar='mm/dd', type=_log_date,
        help='Display records only matching a specific date',
    )
    parser.add_argument(
        '-p', '--with-ptr', dest='ptr', action='store_const', const=True,
        help='Get PTR records for IPs. PTRs are resolved by default '
        'when fewer than 25 lines are printed. Specifying this option '
        'will override the default behavior and always resolve PTRs.',
    )
    parser.add_argument(
        '-P', '--no-ptr', dest='ptr', action='store_const', const=False,
        help='Do not resolve PTRs for IPs',
    )
    parser.add_argument('target', metavar='(log file|cPanel user)', nargs='?')
    # fmt: on
    args = parser.parse_args()
    # a width of 0 cannot display anything, so reject it along with
    # values below the -1 "no limit" sentinel
    if args.nlines < -1 or args.width < -1 or args.width == 0:
        parser.print_help()
        sys.exit(1)
    if args.ptr is None:
        # neither -p nor -P given: only resolve PTRs for short listings
        args.ptr = -1 < args.nlines < 25
    return args


def main():
    """Parse arguments, run the awk parser over the log and print the report"""
    args = parse_args()
    with nlp.open_log(args.target) as log, nlp.start_nlp(
        AWK_SCRIPT, args.date, log
    ) as proc:
        data = nlp.get_input(proc.stdout, args.nlines)
    print_out(data, args.width, args.nlines, args.ptr)


if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        pass