import csv import ipaddress import threading from collections import defaultdict from datetime import datetime from time import sleep import sys import requests from lexicon.client import Client from lexicon.config import ConfigResolver from oshipka.util.process import process_exp_backoff from secrets import HOVER_USERNAME, HOVER_PASSWORD SLEEP_SECONDS = 60 * 30 # 30 min ipv4_sites = [ "http://www.icanhazip.com", "http://ipecho.net/plain", "http://checkip.amazonaws.com", "http://ipinfo.io/ip", "http://ifconfig.me" ] ipv6_sites = [ "http://bot.whatismyipaddress.com", "https://diagnostic.opendns.com/myip", "https://ifconfig.co/ip", ] dns_to_check = [] with open('dns_addresses.csv') as csvf: csvreader = csv.reader(csvf) for row in csvreader: dns_to_check.append(row) ip_checkers = ipv4_sites + ipv6_sites results = [] real = defaultdict(dict) def test_ip(ip): try: ipaddress_info = ipaddress.ip_address(ip) except ValueError as e: print(e) return None if ipaddress_info.version == 4: version = 'A' elif ipaddress_info.version == 6: version = 'AAAA' else: return None return { "addr": ipaddress_info.exploded, "version": version, } def fetch_ip(url): try: resp = requests.get(url) ip = resp.text.strip() ip_info = test_ip(ip) if ip_info: results.append(ip_info) except Exception as e: print("Error fetching {}: {}".format(url, e)) tso, tsd = [], [] def check_observed(): for url in ip_checkers: t = threading.Thread(target=fetch_ip, args=(url,)) tso.append(t) t.start() def check_dns(): for domain, ipv in dns_to_check: t = threading.Thread(target=get_dns, args=(domain, ipv,)) tsd.append(t) t.start() def get_hover_results(domain, ipv): dns_config = ConfigResolver() dns_config.with_dict({ 'provider_name': 'hover', 'action': 'list', 'type': ipv, 'domain': domain, 'hover': { 'auth_username': HOVER_USERNAME, 'auth_password': HOVER_PASSWORD, } }) client = Client(dns_config) return client.execute() def set_hover_results(domain, ipv, new): dns_config = ConfigResolver() dns_config.with_dict({ 'provider_name': 'hover', 'action': 'update', 'type': ipv, 'domain': domain, 'name': domain, 'content': new, 'hover': { 'auth_username': HOVER_USERNAME, 'auth_password': HOVER_PASSWORD, } }) client = Client(dns_config) return client.execute() def get_dns(domain, ipv): results = process_exp_backoff(get_hover_results, func_args=[domain, ipv]) res = [x.get('content') for x in results if x.get('name') == domain] if len(res) == 1: print("Real {}: {}".format(ipv, res[0])) real[domain][ipv] = res[0] def set_dns(domain, ipv, new): return process_exp_backoff(set_hover_results, func_args=[domain, ipv, new]) def get_uniq(_list): if len(_list) > 1 and len(set(_list)) == 1: return _list[0] def main(): check_dns() check_observed() for t in tso: t.join() for t in tsd: t.join() for domain, version in dns_to_check: ip_obs = get_uniq([x['addr'] for x in results if x['version'] == version]) print("{}/{}: {}".format(domain, version, ip_obs)) ip_real = real.get(domain).get(version) sys.stdout.flush() if ip_obs == ip_real: print('{}/{} is same!'.format(domain, version)) else: print("{}/{} diff on dns: real: {}, obs: {}".format(domain, version, ip_real, ip_obs)) if set_dns(domain, version, ip_real): print("update successful {}/{} -> {}".format(domain, version, ip_real)) else: print("update failed: {}/{} on dns is {}, but obs is {}".format( domain, version, ip_real, ip_obs )) if __name__ == "__main__": while True: print("{}: Waking up".format(datetime.utcnow())) sys.stdout.flush() main() print("{}: Sleeping for {}".format(datetime.utcnow(), SLEEP_SECONDS)) sys.stdout.flush() sleep(SLEEP_SECONDS)