tww/tww.py
2019-09-23 20:50:24 +02:00

285 lines
8.9 KiB
Python

#!/usr/bin/env python
"""
Find time now, in the past or future in any timezone or location.
"""
import argparse
import logging
import os
import dateparser
from datetime import datetime
from pytz.exceptions import UnknownTimeZoneError
# Fuzzy-match score a local database entry has to exceed (strictly) to be
# considered a candidate for a location query
FUZZ_THRESHOLD = 70
# strftime pattern used for the output when --iso is given
ISO_FORMAT = '%Y-%m-%dT%H:%M:%S%z'
# strftime pattern used for the output unless --iso or --format say otherwise
DEFAULT_FORMAT = '%Y-%m-%d %H:%M:%S%z'
# Directory holding this file; the "data" directory is looked up relative to it
basepath = os.path.dirname(os.path.abspath(__file__))
# Default logging configuration; the script logs through the root logger
logging.basicConfig()
logger = logging.getLogger()
def parse_args():
    """
    Parse the command line arguments of the script

    Returns an argparse.Namespace with:
    query  - list of the words making up the query, ["now"] when none given
    format - strftime pattern for the output, DEFAULT_FORMAT unless overridden
    iso    - True when --iso was given
    debug  - True when --debug was given
    """
    parser = argparse.ArgumentParser()
    # The default has to be a list: with nargs='*' argparse hands the default
    # back untouched, and parse_query joins the words with spaces - a bare
    # "now" string would be joined character by character into "n o w"
    parser.add_argument('query', nargs='*', default=["now"], help="<datetime-like> to <timezone-like or location string>")
    parser.add_argument('--format', dest='format', default=DEFAULT_FORMAT)
    parser.add_argument('--iso', dest='iso', action='store_true')
    parser.add_argument('--debug', dest='debug', action='store_true')
    return parser.parse_args()
def setup_logging_level(debug=False):
    """
    Set the threshold of the module logger: DEBUG when debug is truthy,
    ERROR otherwise
    """
    if debug:
        logger.setLevel(logging.DEBUG)
    else:
        logger.setLevel(logging.ERROR)
    # only shows up when the DEBUG level was just enabled
    logger.debug("Debugging enabled")
class Location:
    """
    A named place together with its latitude and longitude
    """

    def __init__(self, name: str, latitude: float, longitude: float):
        self.name, self.latitude, self.longitude = name, latitude, longitude

    def __lt__(self, other):
        # locations are ordered by their name only
        mine, theirs = self.name, other.name
        return mine < theirs

    def __str__(self):
        fields = (self.name, self.latitude, self.longitude)
        return "{} {} {}".format(*fields)
def normalize_words_to_number(query):
    """
    Replace number words in the query by digits, e.g.
    "in one hour" -> "in 1 hour"
    Every whitespace separated word is converted on its own, so only
    one-word numbers are handled
    """
    from word2number import w2n

    def _as_digits(token):
        # words that word2number rejects are kept as they are
        try:
            return str(w2n.word_to_num(token))
        except ValueError:
            return token

    normal = ' '.join(_as_digits(token) for token in query.split())
    logger.debug("Normalized dt query: {} -> {}".format(query, normal))
    return normal
def timezone_to_normal(query):
    """
    Fix the capitalization of a timezone name so that it looks the way
    IANA writes it. E.g.:
    america/new_york -> America/New_York
    """
    import re
    # Split on /, _ and -; the capturing group makes re.split return the
    # separators too, so gluing the pieces back together restores them
    pieces = re.split('(/|_|-)', query)
    normal = ''.join(map(str.capitalize, pieces))
    logger.debug("Normalized timezone: {} -> {}".format(query, normal))
    return normal
def create_if_not_exists(fname):
    """
    Make sure the file fname exists: when it cannot be found it is created
    empty, together with any missing parent directories. An existing file
    is left untouched
    """
    try:
        with open(fname, 'r'):
            # already there, nothing to do
            return
    except FileNotFoundError:
        pass
    import pathlib
    pathlib.Path(fname).parent.mkdir(parents=True, exist_ok=True)
    with open(fname, 'w'):
        pass
def write_to_cache(query, location):
    """
    Append a resolved location to the cache file data/.cache.csv (relative
    to basepath) as one row: query, latitude, longitude

    location is any object with latitude and longitude attributes
    """
    import csv
    logger.debug("Writing location to cache")
    cache_path = os.path.join(basepath, "data", ".cache.csv")
    # newline='' leaves the line endings to the csv module; without it the
    # rows get separated by blank lines on platforms that translate "\n".
    # Plain append mode is enough as the file is never read here
    with open(cache_path, 'a', newline='') as wf:
        cachewriter = csv.writer(wf)
        cachewriter.writerow([query,
                              location.latitude,
                              location.longitude])
def row_to_location(row):
    """
    Build a Location from a csv row of the form: name, latitude, longitude
    The two coordinates are converted to float
    """
    latitude = float(row[1])
    longitude = float(row[2])
    return Location(name=row[0], latitude=latitude, longitude=longitude)
def resolve_location_local(query):
    """
    Find a location by searching in local db of countries and cities

    The lowercased query is first looked up in the cache file, where an
    exact hit is returned right away. Otherwise every entry of the cache,
    countries and cities files that scores above FUZZ_THRESHOLD is a
    candidate; the best scoring one is written to the cache and returned.
    Exits with status 1 when there is no candidate at all.
    """
    import csv
    from heapq import heappush, heappop
    from fuzzywuzzy import fuzz
    query = query.lower()
    create_if_not_exists(os.path.join(basepath, "data", ".cache.csv"))
    # location hypothesis heap
    heap = []
    for fname in [".cache", "countries", "cities"]:
        # newline='' is how the csv module wants the files it reads opened
        with open(os.path.join(basepath, "data", "{}.csv".format(fname)), newline='') as f:
            for row in csv.reader(f):
                if not row:
                    # blank line in the file, there is no name to match
                    continue
                entry = row[0]
                if fname == ".cache" and entry == query:
                    location = row_to_location(row)
                    logger.debug("Location (from cache): {}".format(location))
                    return location
                # the query was lowercased above, so compare against the
                # lowercased entry too - otherwise a capital letter alone
                # costs score and short names drop below the threshold
                fuzz_ratio = fuzz.ratio(query, entry.lower())
                if fuzz_ratio > FUZZ_THRESHOLD:
                    location = row_to_location(row)
                    logger.debug("Location hyp ({} {}): {}".format(fuzz_ratio, fname, location))
                    # need to push negative result as heapq is min heap
                    heappush(heap, (-fuzz_ratio, location))
    if not heap:
        logger.critical("Could not find location {}".format(query))
        # SystemExit directly: the exit() builtin is only meant for
        # interactive sessions and is not guaranteed to be defined
        raise SystemExit(1)
    ratio, location = heappop(heap)
    logger.debug("Location result ({}): {}".format(-ratio, location))
    write_to_cache(query, location)
    return location
def resolve_location_remote(query):
    """
    Find a location by asking the Nominatim geocoding service

    Returns the geopy location found for query after storing it in the
    local cache. Exits with status 1 when the request times out or when
    the service has no result for the query.
    """
    import random
    import string
    from geopy.geocoders import Nominatim
    from geopy.exc import GeocoderTimedOut
    # a fresh random user agent is sent with every run
    user_agent = ''.join(random.choices(string.ascii_uppercase + string.digits, k=20))
    geolocator = Nominatim(user_agent=user_agent)
    try:
        location = geolocator.geocode(query)
    except GeocoderTimedOut:
        logger.critical("Timed out resolving location. Try specifying a timezone directly")
        raise SystemExit(1)
    if location is None:
        # geocode() gives None for a place it cannot find; caching that
        # would fail on the missing latitude/longitude attributes
        logger.critical("Could not find location {}".format(query))
        raise SystemExit(1)
    write_to_cache(query, location)
    return location
def parse_query(query):
    """
    Parses the user query to the datetime, tz/loc parts

    query is the list of words of the query; a single string holding the
    whole query is accepted as well.

    Returns the tuple (human_dt, human_tz_loc): the datetime part with its
    number words normalized, and the part after ' to ', which is None when
    the query has no ' to '.
    Exits with status 1 on an empty query or on more than one ' to '.
    """
    if isinstance(query, str):
        # joining a string would put a space between all of its characters
        query = [query]
    query = ' '.join(query).strip()
    if not query:
        logger.critical("Use a query like <datetime-like> ['to' <timezone or location>]")
        raise SystemExit(1)
    to_query = query.split(" to ")
    logger.debug("to_query: {}".format(to_query))
    if len(to_query) == 1:
        # only datetime
        human_dt, human_tz_loc = to_query[0], None
    elif len(to_query) == 2:
        # datetime to timezone
        human_dt, human_tz_loc = to_query
    else:
        logger.critical("There can be only one 'to' in the query string")
        raise SystemExit(1)
    logger.debug("raw human_dt: {}".format(human_dt))
    logger.debug("raw human_tz_loc: {}".format(human_tz_loc))
    human_dt = normalize_words_to_number(human_dt)
    return human_dt, human_tz_loc
def resolve_timezone(query):
    """
    Turn a timezone-like or location-like string into a timezone name

    A query that dateparser accepts as a timezone is returned as given,
    with its capitalization fixed when it is of the Continent/City kind.
    Anything else is treated as a location whose timezone is looked up
    from its coordinates.
    """
    try:
        # Probe only, the parsed date is not used: an UnknownTimeZoneError
        # here is what marks the query as "not a timezone"
        dateparser.parse(str(datetime.now()), settings={'TO_TIMEZONE': query})
    except UnknownTimeZoneError:
        from timezonefinder import TimezoneFinder
        logger.debug("No timezone: {}".format(query))
        # if the human_tz_loc contains /, assume it's a timezone
        # the timezone could still be guessed badly, attempt to get the city
        # e.g.america/dallas
        if "/" in query:
            logger.debug("Assuming wrongly guessed tz {}".format(query))
            query = query.split('/')[-1]
            logger.debug("Try city {}".format(query))
        # we don't know this timezone one, assume location
        # Try to get from local file first
        location = resolve_location_local(query)
        if not location:
            # finally go to remote
            location = resolve_location_remote(query)
        tzf = TimezoneFinder()
        loc_tz = tzf.timezone_at(lat=location.latitude, lng=location.longitude)
        logger.debug("Timezone: {}".format(loc_tz))
        return loc_tz
    # if the human_tz_loc contains /, assume it's a timezone which could be
    # incorrectly written with small letters - need Continent/City
    if "/" in query:
        return timezone_to_normal(query)
    # an accepted timezone without a slash (e.g. utc) is handed back as it
    # is; returning an empty string here would leave the caller without a
    # target timezone
    return query
def solve_query(human_dt, human_tz_loc):
    """
    Work out the datetime described by human_dt and, when human_tz_loc is
    given, convert it to that timezone or to the timezone of that location

    Returns the resulting datetime, or None when human_dt cannot be parsed.
    """
    try:
        # first try parsing the timezone from user input
        result = dateparser.parse(human_dt, settings={'RETURN_AS_TIMEZONE_AWARE': True})
        logger.debug("human_dt result: {}".format(result))
        if result is None:
            # Nothing to convert. Hand the None back instead of failing on
            # result.isoformat() below; format_result reports it
            return None
        if human_tz_loc:
            human_tz_loc = resolve_timezone(human_tz_loc)
            isofmt = result.isoformat()
            logger.debug("human_dt isofmt: {}".format(isofmt))
            result = dateparser.parse(isofmt, settings={'TO_TIMEZONE': human_tz_loc})
            logger.debug("human_dt to_timezone result: {}".format(result))
    except UnknownTimeZoneError:
        loc_tz = resolve_timezone(human_tz_loc)
        result = dateparser.parse(human_dt, settings={'TO_TIMEZONE': loc_tz})
    return result
def format_result(result, fmt):
    """
    Render the datetime result with the strftime pattern fmt
    Exits with status 1 when result is None, i.e. the query was not solved
    """
    if result is not None:
        logger.debug("Format: {}".format(fmt))
        rendered = result.strftime(fmt)
        logger.debug("Formated result: {} -> {}".format(result, rendered))
        return rendered
    logger.critical("Could not solve query")
    exit(1)
def query_to_format_result(query, fmt=DEFAULT_FORMAT):
    """
    Run the whole pipeline on a query: parse it, solve it and return the
    result formatted with fmt
    """
    dt_and_tz = parse_query(query)
    solved = solve_query(*dt_and_tz)
    return format_result(solved, fmt)
def main(args):
    """
    Print the answer to the query held in the parsed arguments
    --iso takes precedence over --format
    """
    if args.iso:
        fmt = ISO_FORMAT
    else:
        fmt = args.format
    print(query_to_format_result(args.query, fmt))
if __name__ == "__main__":
    # Script entry point: read the arguments, set the log level, then answer
    args = parse_args()
    setup_logging_level(debug=args.debug)
    main(args)