D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
opt
/
dedrads
/
Filename :
mail_sources.py
back
Copy
#! /usr/lib/rads/venv/bin/python3 '''List sources of email sent by address and directory.''' import os import sys import re import glob import gzip import datetime from collections import defaultdict from argparse import ArgumentParser __author__ = "Daniel K" __email__ = "danielk@inmotionhosting.com" def email_lines(all_logs=False): '''Return iterable over email log lines''' log_list = [] if all_logs: log_list = glob.glob('/var/log/exim_mainlog?*') for log_file in log_list: if not os.path.exists(log_file): print(f"Could not find log file: {log_file}") sys.exit(1) with gzip.open(log_file, 'r') as mail_log: try: yield from mail_log except OSError as error: print(f"Error reading file '{log_file}': {error}") sys.exit(1) log_file = "/var/log/exim_mainlog" if not os.path.exists(log_file): print(f"Could not find log file: {log_file}") sys.exit(1) with open(log_file, encoding='utf-8') as mail_log: try: yield from mail_log except OSError: print(f"Error reading file {log_file}") sys.exit(1) except UnicodeDecodeError as e: print(f"Received decoding error for {log_file}:") print(f"{e}") print("continuing...") def get_domains(username=''): '''Get domain regex for username''' if username == '': return r'[^@ ]+' domain_list = [] user_file = f"/var/cpanel/users/{username}" if not os.path.exists(user_file): print( "Could not find domains for {}. " "Invalid cPanel user? Cannot find {}".format(username, user_file) ) sys.exit(1) dns_rx = re.compile(r"^DNS[0-9]*=(.*)$") with open(user_file, encoding='utf-8') as mail_log: try: for line in mail_log: dns_match = dns_rx.search(line) if dns_match is not None: domain_list.append(dns_match.groups(1)[0]) except OSError as error: print(f"Error reading file '{user_file}': {error}") sys.exit(1) return '|'.join(domain_list) def get_sources(all_logs=False, username='', time=''): '''Returns touple of dicts of email sources''' email_logins = defaultdict(int) working_directories = defaultdict(int) spoofing = defaultdict(int) domains = get_domains(username) if time == '': date = '' duration = 0 elif '-' in str(time): date = time duration = 0 else: assert isinstance(time, int), "Time is not date or number" date = '' duration = int(time) target = datetime.datetime.now() datetime_rx = re.compile(r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}') login_rx = re.compile( r'(courier|dovecot)_(plain|login):(?P<login>[^@ ]+(@(?P<domain>{}))?) ' r'.*for (?P<for>.*)$'.format(domains) ) spoofing_rx = re.compile( r'<= (?P<sender>[^@]*@[^@ ]+)' r'.*(courier|dovecot)_(plain|login):' r'(?P<login>(?!(?P=sender))[^@ ]+(@(?P<sdom>{}))?)' r'.*for (?P<for>.*)$'.format(domains) ) directory_rx = re.compile(fr'cwd=(?P<directory>/home/{username}[^ ]*)') for line in email_lines(all_logs): if date != '' and not line.startswith(date): continue if not datetime_rx.match(line): continue # If duration is set, skip any lines not within that duration if duration > 0 and not ( duration > ( target - datetime.datetime.strptime(line[:19], "%Y-%m-%d %H:%M:%S") ).total_seconds() ): continue rx_match = spoofing_rx.search(line.lower()) if rx_match: logged_in = "{} as {}".format( rx_match.group('login'), rx_match.group('sender') ) spoofing[logged_in] = spoofing[logged_in] + len( rx_match.group('for').split() ) rx_match = login_rx.search(line.lower()) if rx_match: address = rx_match.group('login') email_logins[address] = email_logins[address] + len( rx_match.group('for').split() ) continue rx_match = directory_rx.search(line) if rx_match: directory = rx_match.group('directory') if '/usr/local/cpanel/' in directory: continue working_directories[directory] = working_directories[directory] + 1 continue return (email_logins, working_directories, spoofing) def print_sorted_dict(dictionary): '''Print a dictionary sorted by values''' for value in sorted(dictionary, key=dictionary.get): print(f"{dictionary[value]:>7}\t{value}") def parse_args(): '''Parse command line aruments''' parser = ArgumentParser(description=__doc__) parser.add_argument( "-a", "--all", action='store_true', help="Search all email logs, rather than only the recent log.", ) parser.add_argument( 'username', metavar='USER', type=str, nargs='?', help="Search for only email from a specific cPanel account", ) time_group = parser.add_mutually_exclusive_group() time_group.add_argument( "-d", "--date", action='store', type=str, default='', help=( "Search for entries from a certain date. " "Must be in the format of YYYY-MM-DD." ), ) time_group.add_argument( "-s", "--seconds", action='store', type=int, default=0, help=( "Search entries which were made within the specified " "number of seconds. Overrides --all." ), ) time_group.add_argument( "-r", "--recent", action='store_true', help=( "Search recent entries, from the last hour. " "This is the same as -s 3600. Also overrides --all" ), ) args = parser.parse_args() all_logs = args.all if args.username is None: username = '' else: username = args.username date_rx = re.compile(r"\d{4}-\d{2}-\d{2}") if args.recent: time = 3600 all_logs = False elif args.date != '': if not date_rx.match(args.date): print(f"Date is not in the correct format: {args.date}") sys.exit(1) time = args.date elif args.seconds > 0: time = args.seconds all_logs = False else: time = '' return all_logs, username, time def main(): '''Main function for script''' (all_logs, username, time) = parse_args() (email_logins, working_directories, spoofing) = get_sources( all_logs, username, time ) print("Email Logins:") print_sorted_dict(email_logins) print("\nSource directories:") print_sorted_dict(working_directories) print("\nPossibly spoofed emails:") if not len(spoofing) == 0: print_sorted_dict(spoofing) else: print("\tNo obvious spoofs found") if __name__ == "__main__": main()