""" """ import os import argparse import logging import sys import dateparser from pytz.exceptions import UnknownTimeZoneError FUZZ_THRESHOLD = 70 ISO_FORMAT = '%Y-%m-%dT%H:%M:%S%z' DEFAULT_FORMAT = '%Y-%m-%d %H:%M:%S%z' basepath = os.path.dirname(os.path.abspath(__file__)) logging.basicConfig() logger = logging.getLogger() def parse_args(): parser = argparse.ArgumentParser() parser.add_argument('query', nargs='*', help=" to ") parser.add_argument('--format', dest='format', default=DEFAULT_FORMAT) parser.add_argument('--iso', dest='iso', action='store_true') parser.add_argument('--debug', dest='debug', action='store_true') args = parser.parse_args() return args def setup_logging_level(debug=False): log_level = logging.DEBUG if debug else logging.ERROR logger.setLevel(log_level) logger.debug("Debugging enabled") class Location: """ Represents a location with name, latitude and longitude """ def __init__(self, name:str, latitude: float, longitude: float): self.name = name self.latitude = latitude self.longitude = longitude def __lt__(self, other): return self.name < other.name def __str__(self): return "{} {} {}".format(self.name, self.latitude, self.longitude) def normalize_words_to_number(query): """ Converts queries like "in one hour" -> "in 1 hour" Assumes one-word numbers used """ from word2number import w2n normal_list = [] for word in query.split(): try: normal_list.append(str(w2n.word_to_num(word))) except ValueError: normal_list.append(word) normal = ' '.join(normal_list) logger.debug("Normalized dt query: {} -> {}".format(query, normal)) return normal def timezone_to_normal(query): """ Makes a timezone written in wrong capitalization to correct one as expected by IANA. E.g.: america/new_york -> America/New_York """ import re # The magic in the regex is that it splits by either / OR _ OR - # where the | are OR; and then the parens ( ) keep the splitting # entries in the list so that we can join later normal = ''.join(x.capitalize() for x in re.split('(/|_|-)', query)) logger.debug("Normalized timezone: {} -> {}".format(query, normal)) return normal def create_if_not_exists(fname): try: fh = open(fname, 'r') except FileNotFoundError: fh = open(fname, 'w') fh.close() def write_to_cache(query, location): import csv logger.debug("Writing location to cache") with open(os.path.join(basepath, "data",".cache.csv"), 'a+') as wf: cachewriter = csv.writer(wf) cachewriter.writerow([query, location.latitude, location.longitude]) def row_to_location(row): """ Row from a csv file to location class """ latitude, longitude = float(row[1]), float(row[2]) return Location(row[0], latitude, longitude) def resolve_location_local(query): """ Find a location by searching in local db of countries and cities """ import csv from heapq import heappush, heappop from fuzzywuzzy import fuzz query = query.lower() create_if_not_exists(os.path.join(basepath, "data",".cache.csv")) # location hypothesis heap heap = [] for fname in [".cache", "countries", "cities"]: with open(os.path.join(basepath, "data", "{}.csv".format(fname))) as f: cfile = csv.reader(f) for row in cfile: entry = row[0] if fname == ".cache" and entry == query: location = row_to_location(row) logger.debug("Location (from cache): {}".format(location)) return location fuzz_ratio = fuzz.ratio(query, entry) if fuzz_ratio > FUZZ_THRESHOLD: location = row_to_location(row) logger.debug("Location hyp ({} {}): {}".format(fuzz_ratio, fname, location)) # need to push negative result as heapq is min heap heappush(heap, (-fuzz_ratio, location)) try: result = heappop(heap) except IndexError: logger.critical("Could not find location {}".format(query)) exit(1) ratio, location = result logger.debug("Location result ({}): {}".format(-ratio, location)) write_to_cache(query, location) return location def resolve_location_remote(query): import random import string from geopy.geocoders import Nominatim from geopy.exc import GeocoderTimedOut user_agent = ''.join(random.choices(string.ascii_uppercase + string.digits, k=20)) geolocator = Nominatim(user_agent=user_agent) try: location = geolocator.geocode(query) write_to_cache(query, location) return location except GeocoderTimedOut: logger.critical("Timed out resolving location. Try specifying a timezone directly") exit(1) def parse_query(query): """ Parses the user query to the datetime, tz/loc parts """ query = ' '.join(query) query = query.strip() if not query: logger.critical("Use a query like ['to' ]") exit(1) to_query = query.split(" to ") logger.debug("to_query: {}".format(to_query)) if len(to_query) == 1: # only datetime human_dt, human_tz_loc = to_query[0], None elif len(to_query) == 2: # datetime to timezone human_dt, human_tz_loc = to_query else: logger.critical("There can be only one 'to' in the query string") exit(1) logger.debug("raw human_dt: {}".format(human_dt)) logger.debug("raw human_tz_loc: {}".format(human_tz_loc)) human_dt = normalize_words_to_number(human_dt) return human_dt, human_tz_loc def solve_query(human_dt, human_tz_loc): try: # first try parsing the timezone from user input result = dateparser.parse(human_dt, settings={'RETURN_AS_TIMEZONE_AWARE': True}) logger.debug("human_dt result: {}".format(result)) if human_tz_loc: # if the human_tz_loc contains /, assume it's a timezone which could be # incorrectly written with small letters - need Continent/City if "/" in human_tz_loc: human_tz_loc = timezone_to_normal(human_tz_loc) isofmt = result.isoformat() logger.debug("human_dt isofmt: {}".format(isofmt)) result = dateparser.parse(isofmt, settings={'TO_TIMEZONE': human_tz_loc}) logger.debug("human_dt to_timezone result: {}".format(result)) except UnknownTimeZoneError: from timezonefinder import TimezoneFinder logger.debug("No timezone: {}".format(human_tz_loc)) # if the human_tz_loc contains /, assume it's a timezone # the timezone could still be guessed badly, attempt to get the city # e.g.america/dallas if "/" in human_tz_loc: logger.debug("Assuming wrongly guessed tz {}".format(human_tz_loc)) human_tz_loc = human_tz_loc.split('/')[-1] logger.debug("Try city {}".format(human_tz_loc)) # we don't know this timezone one, assume location # Try to get from local file first location = resolve_location_local(human_tz_loc) if not location: # finally go to remote location = resolve_location_remote(human_tz_loc) tzf = TimezoneFinder() loc_tz = tzf.timezone_at(lat=location.latitude, lng=location.longitude) logger.debug("Timezone: {}".format(loc_tz)) result = dateparser.parse(human_dt, settings={'TO_TIMEZONE': loc_tz}) return result def format_result(result, fmt): if result is None: logger.critical("Could not solve query") exit(1) logger.debug("Format: {}".format(fmt)) format_result = result.strftime(fmt) logger.debug("Formated result: {} -> {}".format(result, format_result)) return format_result def query_to_format_result(query, fmt=DEFAULT_FORMAT): human_dt, human_tz_loc = parse_query(query) result = solve_query(human_dt, human_tz_loc) formated_result = format_result(result, fmt) return formated_result def main(args): fmt = ISO_FORMAT if args.iso else args.format formated_result = query_to_format_result(args.query, fmt) print(formated_result) if __name__ == "__main__": args = parse_args() setup_logging_level(args.debug) main(args)