D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
opt
/
custom-ns
/
Filename :
named.py
back
Copy
"""functions for configuring named for custom-ns""" import fileinput import shlex import shutil import string import random import re import os from subprocess import run, CalledProcessError import netaddr from blessings import Terminal from cpapis import whmapi1, CpAPIError TERM = Terminal() def disable_recursion(): """Disable DNS recursion in named.conf""" new_conf = [] new_options = ' allow-recursion {"none";};\n recursion no;' try: with open('/etc/named.conf', encoding='utf-8') as named_file: named_conf = named_file.read().splitlines() except OSError: print( TERM.bold_red( 'Unable to disable recursion in named.conf ' '(named.conf is not readable)' ) ) return parsing_options = False options_re = re.compile(r'options\s*\{') # for each line in the original conf for line in named_conf: # if 'options {' is encountered, we are within the options section if options_re.search(line) is not None: parsing_options = True # if in the options section and there is a recursion setting, remove it if 'recursion' in line and parsing_options: continue # once the end of options section is hit, insert new_options if parsing_options and line.strip() == '};': new_conf.append(new_options) parsing_options = False new_conf.append(line) temp_path = f'/tmp/.{_randstr(20)}' with open(temp_path, 'w', encoding='utf-8') as temp_file: temp_file.write('%s\n' % '\n'.join(new_conf)) try: run(['named_checkconf', temp_path], check=True) except (CalledProcessError, FileNotFoundError): print( TERM.bold_red( 'Unable to disable recursion in named.conf ' 'due to syntax error. Reverting named.conf' ) ) os.remove(temp_path) return try: shutil.move(temp_path, '/etc/named.conf') except OSError: print( TERM.bold_red( f'could not rename {temp_path} to /etc/named.conf. ' 'Do this manually.' ) ) def _randstr(num: int) -> str: chars = string.ascii_lowercase + string.digits return ''.join(random.choice(chars) for _ in range(num)) def set_soa_record(domain, full_ns1): """Edit SOA records for a domain""" try: zone = whmapi1('dumpzone', {'domain': domain}, check=True) zone = zone['data']['zone'][0]['record'] except (TypeError, KeyError, IndexError, CpAPIError): print( TERM.bold_red( f'Could not read existing zone for {domain} to edit SOA ' 'records' ) ) return for zrecord in zone: if zrecord['type'] == 'SOA': zrecord['mname'] = full_ns1.strip('.') zrecord['domain'] = domain try: whmapi1('editzonerecord', zrecord, check=True) except CpAPIError as err: print(err) def set_ns_records(domain, ns1_name, ns1_addr, ns2_name, ns2_addr): """Add A or AAAA records to appropriate zone files""" full_ns1 = f'{ns1_name}.{domain}.' full_ns2 = f'{ns2_name}.{domain}.' # delete any existing records try: zone = whmapi1('dumpzone', {'domain': domain}, check=True) zone = zone['data']['zone'][0]['record'] except (TypeError, KeyError, IndexError): print( TERM.bold_red( f'Could not read existing zone for {domain} to delete ' 'any existing ns records' ) ) zone = [] except CpAPIError as err: print(err) zdel = [] for zrecord in zone: if zrecord['type'] not in ('A', 'AAAA'): continue if zrecord['name'] in (full_ns1, full_ns2): zdel.append(zrecord['Line']) all_removed = True while len(zdel) > 0: try: whmapi1( 'removezonerecord', {'domain': domain, 'line': zdel.pop(-1)}, check=True, ) except CpAPIError as err: print(err) all_removed = False if not all_removed: print( TERM.bold_red( 'failed to remove at least one old A record from ' f'{domain} for ns1 or ns2 - check the zone manually' ) ) # add the new 'A' or 'AAAA" records for ns1/ns2 for name, addr in ((ns1_name, ns1_addr), (ns2_name, ns2_addr)): if netaddr.valid_ipv4(addr): addr_type = 'A' else: addr_type = 'AAAA' try: whmapi1( 'addzonerecord', { 'name': name, 'domain': domain, 'type': addr_type, 'address': addr, }, check=True, ) except CpAPIError as err: print( TERM.bold_red( f'Could not set {addr_type} record for {name}.{domain}' ) ) print(err) # go through all other zones and edit their 'IN NS' records mass_fix_ns_records(full_ns1, full_ns2) def mass_fix_ns_records(full_ns1, full_ns2): """Go through every zone on the server and adjust their 'IN NS' records""" print( "Adjusting all existing zones' NS and SOA records", "(this may take a while)", ) try: all_zones = [ x['domain'] for x in whmapi1('listzones', check=True)['data']['zone'] ] except (TypeError, KeyError): all_zones = [] print( TERM.bold_red( 'could not retrieve a list of all zones on the server - ' 'they will all need their NS and SOA records adjusted manually.' ) ) except CpAPIError as err: all_zones = [] print(err) for domain in all_zones: set_soa_record(domain, full_ns1) try: zone = whmapi1('dumpzone', {'domain': domain}, check=True) zone = zone['data']['zone'][0]['record'] except (TypeError, IndexError, KeyError): zone = [] print( TERM.bold_red( f'Could not list contents of zone {domain} to adjust ' 'its NS records - do this manually' ) ) except CpAPIError as err: zone = [] print(err) zdel = [] for zrecord in zone: if zrecord['type'] == 'NS': zdel.append(zrecord['Line']) all_removed = True while len(zdel) > 0: try: whmapi1( 'removezonerecord', {'domain': domain, 'line': zdel.pop(-1)}, check=True, ) except CpAPIError as err: print(err) all_removed = False if not all_removed: print( TERM.bold_red( 'failed to remove old NS record from ' f'{domain} - check the zone manually' ) ) for new_record in (full_ns1, full_ns2): try: whmapi1( 'addzonerecord', { 'nsdname': new_record.strip('.'), 'domain': domain, 'name': f'{domain}.', 'type': 'NS', }, check=True, ) except CpAPIError as err: print( TERM.bold_red( f'Failed to add NS record of {new_record} ' f'to {domain}.db' ) ) print(err) def enable_in_chkservd(): """Enable BIND in chkservd""" if not os.path.isfile('/etc/chkserv.d/chkservd.conf'): print(TERM.bold_red('Cannot enable named in chkservd')) return with open('/etc/chkserv.d/chkservd.conf', encoding='utf-8') as chkserv: data = chkserv.read().splitlines() found = False for index, line in enumerate(data): if line.startswith('named:'): found = True data[index] = 'named:1' if not found: data.append('named:1') with open('/etc/chkserv.d/chkservd.conf', 'w', encoding='utf-8') as chkserv: chkserv.write('%s\n' % '\n'.join(data)) with open('/etc/chkserv.d/named', 'w', encoding='utf-8') as named_chk: named_chk.write( 'service[named]=x,x,x,' '/usr/local/cpanel/scripts/restartsrv_named,' 'named|bind|nsd|mydns,named|bind|nsd|mydns\n' ) def enable_bind(): """Run /scripts/setupnameserver""" cmd = ['/usr/local/cpanel/scripts/setupnameserver', 'bind'] try: run(cmd, check=True) except (FileNotFoundError, CalledProcessError): print(TERM.bold_red(f'Unable to run {shlex.join(cmd)}')) def rndc_reload(): """Try to run rndc reload and print a red error if it fails""" try: run(['rndc', 'reload'], check=True) except (FileNotFoundError, CalledProcessError): print(TERM.bold_red('Error running rndc reload')) def allow_query_all(): """Remove line from /etc/named.conf that restricts hosts allowed to query nameservers""" for line in fileinput.input(files='/etc/named.conf', inplace=True): if "allow-query" not in line: print(line, end=' ')