D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
opt
/
imh-python
/
lib
/
python2.7
/
site-packages
/
rads
/
Filename :
common.py
back
Copy
"""General code common to many RADS scripts""" from __future__ import print_function from collections import defaultdict import termios import platform import pwd import os import smtplib import sys import socket import logging from logging.handlers import WatchedFileHandler import re import sqlite3 from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText from rads.shared import is_cpanel_user import arrow def errprint(*errmsgs, **kwargs): """Python3 print() except all text will be in red and sent to stderr. The optional param 'fatal' will make the program exit after printing. 'fatal' may be True to use a standard error return code, or an integer to specify a specific one""" errmsgs = [strcolor('red', x) for x in errmsgs] try: fatal = kwargs.pop('fatal') except KeyError: fatal = False print(*errmsgs, file=sys.stderr, **kwargs) if fatal is True: sys.exit(1) elif fatal is False: return else: sys.exit(fatal) def getlogin(root_str='root'): """Obtain who logged in. If root_str is defined, this is the string which will display instead of root if root was the login""" try: blame = os.getlogin() except OSError: blame = pwd.getpwuid(os.geteuid()).pw_name return root_str if blame == 'root' else blame def send_email( to_addr, subject, body, html=None, sender=None, ssl=False, server=('localhost', 0), login=None ): """Sends an email. to_addr: may be either a string address or list of string addresses subject: subject line body: plaintext body of email html: an optional HTML version of the email to be shown if supported on the recipient end. sender can be a string to override it or None to use the current user ssl: true or false, to use SMTPS server: optional (host, port) tuple to connect to login: optional (user, pass) tuple to connect as""" if sender is None: sender = '%s@%s' % ( pwd.getpwuid(os.getuid()).pw_name, platform.node() ) if isinstance(to_addr, (str, unicode)): to_addr = [to_addr] msg = MIMEMultipart('alternative') msg['Subject'] = subject msg['From'] = sender msg['To'] = ", ".join(to_addr) if isinstance(body, unicode): msg.attach(MIMEText(body, 'plain', 'UTF-8')) else: msg.attach(MIMEText(body, 'plain')) if html: if isinstance(html, unicode): msg.attach(MIMEText(html, 'html', 'UTF-8')) else: msg.attach(MIMEText(html, 'html')) smtp_class = smtplib.SMTP_SSL if ssl else smtplib.SMTP try: smtp_obj = smtp_class(server[0], server[1]) if login: smtp_obj.login(login[0], login[1]) smtp_obj.sendmail(sender, to_addr, msg.as_string()) except (smtplib.SMTPException, socket.error): return False return True def colors(style=None): """Return a colorization dict based on provided style""" if style == 'html': return { 'red': '<span style="font-weight: bold; color: red">', 'green': '<span style="font-weight: bold; color: green">', # yellow is illegible in pdesk. Use orange 'yellow': '<span style="font-weight: bold; color: orange">', 'blue': '<span style="font-weight: bold; color: blue">', 'bold': '<span style="font-weight: bold">', 'purple': '<span style="font-weight: bold; color: #880088">', 'none': '</span>' } if style == 'bleach': return defaultdict(str) return { 'red': '\33[91;1m', 'green': '\033[92;1m', 'yellow': '\033[93;1m', 'blue': '\033[94;1m', 'bold': '\033[1m', 'purple': '\033[95m', 'invert': '\033[7m', 'none': '\033[0m' } def strcolor(color, string): """return a string in a given color derived from colors(). This makes the assumption you're coloring it for a linux terminal, which will be the case 99% of the time.""" col = colors() return '%s%s%s' % (col[color], string, col['none']) def get_key(conv=True): """Get a single keypress, the ugly POSIX way""" fileno = sys.stdin.fileno() old = termios.tcgetattr(fileno) new = termios.tcgetattr(fileno) new[3] = new[3] & ~termios.ICANON & ~termios.ECHO new[6][termios.VMIN] = 1 new[6][termios.VTIME] = 0 termios.tcsetattr(fileno, termios.TCSANOW, new) key = None try: key = os.read(fileno, 4) finally: termios.tcsetattr(fileno, termios.TCSAFLUSH, old) if not conv: return key keylist = { '\t': 'TAB', '\n': 'ENTER', '\x7f': 'BACKSPACE', '\x1b': 'ESC', '\x1b[Z': 'BACKTAB', '\x1bOH': 'HOME', '\x1bOP': 'F1', '\x1bOQ': 'F2', '\x1bOR': 'F3', '\x1bOS': 'F4', '\x1b[15': 'F5', '\x1b[17': 'F6', '\x1b[18': 'F7', '\x1b[19': 'F8', '\x1b[20': 'F9', '\x1b[21': 'F10', '\x1b[23': 'F11', '\x1b[24': 'F12', '\x1b[A': 'ARROW_UP', '\x1b[B': 'ARROW_DN', '\x1b[C': 'ARROW_RT', '\x1b[D': 'ARROW_LT', } return key if key not in keylist else keylist[key] def yes_or_no(question): """Prompt for a yes or no answer""" valid = {"yes": True, "y": True, "no": False, "n": False} while True: print(question, '[y/n]') try: choice = raw_input().lower() except KeyboardInterrupt: print("\nCancelled") sys.exit(1) if choice in valid: return valid[choice] else: print('Invalid answer. Try again: %(bold)s[y/n]%(none)s' % colors()) def choose(prompt, options_list, default=None): """ Prompt to select an item from a list of options. Return None on failure. """ while True: print(prompt, end='\n\n') for index, option in enumerate(options_list): print(index, option, sep=') ') print("\nEnter number or selection: ", end='') try: choice = raw_input() except KeyboardInterrupt: print("\nCancelled") return None if default is not None and choice == '': return default if choice in options_list: return choice elif choice.isdigit() and int(choice) < len(options_list): return options_list[int(choice)] else: print('\nInvalid answer. Try again.\n') def get_string( prompt, string_filter=r'[a-zA-Z0-9._/-]+$', hint='regex', default=None ): """ Prompt to request a string, and require it to match a regex. If string fails to match, give a hint, which by default is just the regex. If no matching string is obtained, return None. If empty string is entered, return default if any exists. Defined filters: alpha, digits, email, cpuser, database, url """ # Predefined filters if string_filter is None: string_filter = '.*' hint = 'Sorry, that should have matched.' elif string_filter == 'alpha': string_filter = '[a-zA-Z0-9]+$' if hint == 'regex': hint = 'Must be only alphanumeric characters.' elif string_filter == 'digits': string_filter = '[0-9.]+' if hint == 'regex': hint = 'Must be only digits.' elif string_filter == 'email': string_filter = ( r'[a-z0-9._-]+@[a-z0-9._-]+' r'\.([a-z]{2,15}|xn--[a-z0-9]{2,30})$' ) if hint == 'regex': hint = 'Must be a valid email address.' elif string_filter == 'cpuser': string_filter = '[a-z0-9]{1,14}$' if hint == 'regex': hint = ( 'Must be a valid cPanel user: ' 'letters and numbers, under 14 characters.' ) elif string_filter == 'database': # This one is not precise, but provided for convenience. string_filter = '[a-z0-9]{1,8}_[a-z0-9]{1,12}$' if hint == 'regex': hint = ( 'Must be a valid database user: ' 'letters and numbers, single underscore.' ) elif string_filter == 'url': string_filter = ( r'([a-z]{3,}://)?' r'([a-z0-9_-]+.){1,}([a-z]{2,15}|xn--[a-z0-9]{2,30})(:[0-9]+)?' r'((/[a-zA-Z0-9/.%_-]*)(\?[a-zA-Z0-9/.%=;_-]+)?)?$' ) if hint == 'regex': hint = 'Must be a valid URL.' while True: print(prompt, end='\n\n') try: choice = raw_input() except KeyboardInterrupt: print("\nCancelled") return None if default is not None and choice == '': return default elif re.match(string_filter, choice) is not None: return choice else: print('\nInvalid answer. ',) if hint == 'regex': print('\nString must match the pattern: /', string_filter, '/', sep='') elif hint is None: print(' ', end='') else: print(hint) print('Try again.\n') def header(string, char='~', color='blue', output=False): if isinstance(string, str) and isinstance(char, str): holder = (80 - len(string)) spacer = holder/2 / (len(char)) * char headerstring = "%s %s %s\n" % (spacer, string, spacer) if output is False: print(strcolor(color, headerstring)) else: try: with open(output, 'a') as f: f.write(strcolor(color, headerstring)) except IOError: print('The file should exist before proceeding. Will not write to', output) else: sys.exit('Usage: header.print_header("Header")') def setup_logging( logfile='/var/log/messages', fmt='%(asctime)s %(name)s %(levelname)s %(message)s', datefmt='%Y-%m-%d %H:%M:%S', loglevel=logging.DEBUG, print_out=False, handler=None, handler_kwargs={} ): """ Usage: setup_logging() # defaults or, specify settings: setup_logging( logfile='/path/to/file', loglevel=logging.CRITICAL, print_out=sys.stderr ) Args: `logfile`: destination log file. default is /var/log/messages If you specify a custom handler, you must instead specify this in handler_args `fmt`: logging format. default is: %(asctime)s %(name)s %(levelname)s %(message)s `datfmt`: date format. default is %Y-%m-%d %H:%M:%S `loglevel`: log level from logging module (logging.DEBUG, etc) `print_out`: set this to an output file object (sys.stdout, sys.stderr) `handler`: logging handler for the main log. the print_out handler is always logging.StreamHandler. If not specified, the main log file will use logging.FileHandler. `handler_kwargs`: set the keyword args for your custom `handler` here in a dict """ root = logging.getLogger() formatter = logging.Formatter(fmt=fmt, datefmt=datefmt) if handler is not None: main_handler = handler(**handler_kwargs) else: main_handler = WatchedFileHandler(logfile) main_handler.setFormatter(formatter) main_handler.setLevel(loglevel) root.addHandler(main_handler) if print_out is not False: print_handler = logging.StreamHandler(stream=print_out) print_handler.setFormatter(formatter) print_handler.setLevel(loglevel) root.addHandler(print_handler) root.setLevel(loglevel) def get_bandwidth(user, daysago): if not isinstance(daysago, int): return "daysago variable must be integer" if not is_cpanel_user(user): return "not a cpanel user" banddict = {} now = arrow.now() daysago = now.replace(days=-daysago) timestamp = daysago.timestamp conn = sqlite3.connect('/var/cpanel/bandwidth/%s.sqlite' % user) c = conn.cursor() info = c.execute("select * from bandwidth_daily;").fetchall() for item in info: if item[2] > timestamp: if str(item[1]) in banddict: banddict[str(item[1])] += item[3] else: banddict[str(item[1])] = item[3] return banddict