tww/tww.py

269 lines
8.6 KiB
Python
Raw Normal View History

2019-03-16 18:52:32 +01:00
#!/usr/bin/env python
"""
Find time now, in the past or future in any timezone or location.
"""
2019-03-16 18:52:32 +01:00
2019-03-13 23:02:54 +01:00
import argparse
import logging
2019-04-20 23:23:30 +02:00
import os
2019-03-13 23:02:54 +01:00
import dateparser
from pytz.exceptions import UnknownTimeZoneError
# A local entry becomes a location hypothesis only when its fuzzy ratio
# against the query is strictly greater than this value
FUZZ_THRESHOLD = 70

# strftime patterns: the one selected by --iso and the default output format
ISO_FORMAT = '%Y-%m-%dT%H:%M:%S%z'
DEFAULT_FORMAT = '%Y-%m-%d %H:%M:%S%z'

# Directory holding this module; the csv files are read from <basepath>/data
basepath = os.path.dirname(os.path.abspath(__file__))

# Root logger with the default configuration; the level is set later by
# setup_logging_level
logging.basicConfig()
logger = logging.getLogger()
2019-03-13 23:02:54 +01:00
2019-04-20 23:23:30 +02:00
def parse_args():
    """
    Parse the command line arguments.

    Returns the argparse namespace with:
    query  - list of the words forming the query (["now"] when none are given)
    format - strftime pattern for the output (DEFAULT_FORMAT unless --format is given)
    iso    - True when --iso was passed
    debug  - True when --debug was passed
    """
    parser = argparse.ArgumentParser()
    # The default has to be a list, like the value argparse builds for
    # nargs='*': the query words are later joined with spaces, and joining
    # the plain string "now" would produce "n o w"
    parser.add_argument('query', nargs='*', default=["now"],
                        help="<datetime-like> to <timezone-like or location string>")
    parser.add_argument('--format', dest='format', default=DEFAULT_FORMAT)
    parser.add_argument('--iso', dest='iso', action='store_true')
    parser.add_argument('--debug', dest='debug', action='store_true')
    return parser.parse_args()
2019-03-13 23:02:54 +01:00
def setup_logging_level(debug=False):
    """
    Set the logger level: DEBUG when debug is truthy, ERROR otherwise
    """
    if debug:
        level = logging.DEBUG
    else:
        level = logging.ERROR
    logger.setLevel(level)
    logger.debug("Debugging enabled")
class Location:
    """
    A named place together with its latitude and longitude
    """

    def __init__(self, name: str, latitude: float, longitude: float):
        self.name, self.latitude, self.longitude = name, latitude, longitude

    def __lt__(self, other):
        # Ordering looks at the name only; it lets (score, Location) tuples
        # be compared when the scores are equal
        return self.name < other.name

    def __str__(self):
        return f"{self.name} {self.latitude} {self.longitude}"
def normalize_words_to_number(query):
    """
    Replace one-word numbers in the query with digits,
    e.g. "in one hour" -> "in 1 hour".
    Words that are not numbers are kept unchanged.
    """
    from word2number import w2n

    def as_digits(token):
        # A ValueError from w2n is treated as "not a number": keep the word
        try:
            return str(w2n.word_to_num(token))
        except ValueError:
            return token

    normal = ' '.join(as_digits(token) for token in query.split())
    logger.debug("Normalized dt query: {} -> {}".format(query, normal))
    return normal
def timezone_to_normal(query):
    """
    Fix the capitalization of a timezone name to the form
    expected by IANA, e.g.:
    america/new_york -> America/New_York
    """
    import re

    # Split on / or _ or -; the capture group keeps those separators in the
    # resulting list, so joining the capitalized pieces rebuilds the name
    pieces = re.split('(/|_|-)', query)
    normal = ''.join(map(str.capitalize, pieces))
    logger.debug("Normalized timezone: {} -> {}".format(query, normal))
    return normal
def create_if_not_exists(fname):
    """
    Make sure the file fname exists: an existing file is left untouched,
    a missing one is created empty
    """
    try:
        # Opening for reading is only a probe for existence
        with open(fname, 'r'):
            return
    except FileNotFoundError:
        pass
    with open(fname, 'w'):
        pass
def write_to_cache(query, location):
    """
    Append a resolved location to the cache file data/.cache.csv.

    query    - the location string that was looked up; written as first column
    location - object providing latitude and longitude attributes
    """
    import csv

    logger.debug("Writing location to cache")

    cache_path = os.path.join(basepath, "data", ".cache.csv")
    # The csv module expects its files to be opened with newline=''; without
    # it, platforms that translate newlines end every row with \r\r\n
    with open(cache_path, 'a+', newline='') as wf:
        cachewriter = csv.writer(wf)
        cachewriter.writerow([query, location.latitude, location.longitude])
def row_to_location(row):
    """
    Build a Location from a csv row laid out as name, latitude, longitude
    """
    return Location(row[0], float(row[1]), float(row[2]))
def resolve_location_local(query):
    """
    Find a location by searching in local db of countries and cities.

    The lowercased query is first compared with the cache entries; an exact
    match there is returned immediately. Otherwise every entry of the cache,
    countries and cities files whose fuzzy ratio against the query is above
    FUZZ_THRESHOLD becomes a hypothesis, and the best hypothesis is written
    to the cache and returned.

    Exits with status 1 when no entry is similar enough.
    """
    import csv
    from heapq import heappush, heappop
    from fuzzywuzzy import fuzz

    query = query.lower()
    data_dir = os.path.join(basepath, "data")

    create_if_not_exists(os.path.join(data_dir, ".cache.csv"))

    # location hypothesis heap
    heap = []

    for fname in [".cache", "countries", "cities"]:
        # newline='' is how the csv module expects its input files to be opened
        with open(os.path.join(data_dir, "{}.csv".format(fname)), newline='') as f:
            for row in csv.reader(f):
                if not row:
                    # a blank line is read as an empty row; it has no name
                    # column to match against
                    continue
                entry = row[0]
                if fname == ".cache" and entry == query:
                    location = row_to_location(row)
                    logger.debug("Location (from cache): {}".format(location))
                    return location

                fuzz_ratio = fuzz.ratio(query, entry)
                if fuzz_ratio > FUZZ_THRESHOLD:
                    location = row_to_location(row)
                    logger.debug("Location hyp ({} {}): {}".format(fuzz_ratio, fname, location))
                    # need to push negative result as heapq is min heap
                    heappush(heap, (-fuzz_ratio, location))

    if not heap:
        # NOTE(review): exiting here means callers never receive a falsy
        # result from this function - confirm that is intended
        logger.critical("Could not find location {}".format(query))
        # same effect as the site-provided exit(1), without depending on it
        raise SystemExit(1)

    ratio, location = heappop(heap)
    logger.debug("Location result ({}): {}".format(-ratio, location))
    write_to_cache(query, location)
    return location
def resolve_location_remote(query):
    """
    Find a location by asking the Nominatim geocoder.

    A found location is written to the local cache and returned.
    Exits with status 1 when the request times out or nothing is found.
    """
    import random
    import string

    from geopy.geocoders import Nominatim
    from geopy.exc import GeocoderTimedOut

    # A random 20 character user agent is generated for every lookup
    user_agent = ''.join(random.choices(string.ascii_uppercase + string.digits, k=20))
    geolocator = Nominatim(user_agent=user_agent)

    try:
        location = geolocator.geocode(query)
    except GeocoderTimedOut:
        logger.critical("Timed out resolving location. Try specifying a timezone directly")
        raise SystemExit(1)

    if location is None:
        # geocode gives None when there is no match; writing that to the
        # cache would fail on the missing latitude/longitude attributes
        logger.critical("Could not find location {}".format(query))
        raise SystemExit(1)

    write_to_cache(query, location)
    return location
def parse_query(query):
    """
    Parses the user query to the datetime, tz/loc parts.

    query - list of words, or an already joined string

    Returns (human_dt, human_tz_loc). human_tz_loc is None when the query
    contains no ' to ' part. Exits with status 1 when the query is empty or
    contains more than one ' to '.
    """
    # Only join word lists: ' '.join("now") would turn a plain string
    # into "n o w"
    if not isinstance(query, str):
        query = ' '.join(query)
    query = query.strip()

    if not query:
        logger.critical("Use a query like <datetime-like> ['to' <timezone or location>]")
        raise SystemExit(1)

    to_query = query.split(" to ")
    logger.debug("to_query: {}".format(to_query))

    if len(to_query) == 1:
        # only datetime
        human_dt, human_tz_loc = to_query[0], None
    elif len(to_query) == 2:
        # datetime to timezone
        human_dt, human_tz_loc = to_query
    else:
        logger.critical("There can be only one 'to' in the query string")
        raise SystemExit(1)

    logger.debug("raw human_dt: {}".format(human_dt))
    logger.debug("raw human_tz_loc: {}".format(human_tz_loc))

    human_dt = normalize_words_to_number(human_dt)

    return human_dt, human_tz_loc
def solve_query(human_dt, human_tz_loc):
    """
    Resolve the datetime part of a query, converted to the requested
    timezone or location when one is given.

    human_dt     - datetime-like string handed to dateparser
    human_tz_loc - timezone name or location string; falsy for no conversion

    Returns the datetime produced by dateparser, or None when human_dt
    could not be parsed.
    """
    try:
        # first try parsing the timezone from user input
        result = dateparser.parse(human_dt, settings={'RETURN_AS_TIMEZONE_AWARE': True})
        logger.debug("human_dt result: {}".format(result))

        if result is None:
            # nothing was parsed, so there is nothing to convert; calling
            # isoformat() below would fail. Let the caller report it.
            return None

        if human_tz_loc:
            # if the human_tz_loc contains /, assume it's a timezone which could be
            # incorrectly written with small letters - need Continent/City
            if "/" in human_tz_loc:
                human_tz_loc = timezone_to_normal(human_tz_loc)

            isofmt = result.isoformat()
            logger.debug("human_dt isofmt: {}".format(isofmt))

            result = dateparser.parse(isofmt, settings={'TO_TIMEZONE': human_tz_loc})
            logger.debug("human_dt to_timezone result: {}".format(result))
    except UnknownTimeZoneError:
        from timezonefinder import TimezoneFinder

        logger.debug("No timezone: {}".format(human_tz_loc))

        # if the human_tz_loc contains /, assume it's a timezone
        # the timezone could still be guessed badly, attempt to get the city
        # e.g.america/dallas
        if "/" in human_tz_loc:
            logger.debug("Assuming wrongly guessed tz {}".format(human_tz_loc))
            human_tz_loc = human_tz_loc.split('/')[-1]
            logger.debug("Try city {}".format(human_tz_loc))

        # we don't know this timezone one, assume location
        # Try to get from local file first
        location = resolve_location_local(human_tz_loc)

        if not location:
            # finally go to remote
            location = resolve_location_remote(human_tz_loc)

        tzf = TimezoneFinder()
        loc_tz = tzf.timezone_at(lat=location.latitude, lng=location.longitude)
        logger.debug("Timezone: {}".format(loc_tz))
        # NOTE(review): loc_tz is passed on unchecked - confirm what
        # timezone_at gives for coordinates without a timezone
        result = dateparser.parse(human_dt, settings={'TO_TIMEZONE': loc_tz})

    return result
def format_result(result, fmt):
    """
    Render the resolved datetime with the strftime pattern fmt.
    Logs a critical message and exits with status 1 when result is None.
    """
    if result is None:
        logger.critical("Could not solve query")
        exit(1)

    logger.debug("Format: {}".format(fmt))
    rendered = result.strftime(fmt)
    logger.debug("Formated result: {} -> {}".format(result, rendered))
    return rendered
def query_to_format_result(query, fmt=DEFAULT_FORMAT):
    """
    Run the whole pipeline for a query: parse it, solve it and
    return the result formatted with fmt
    """
    dt_part, tz_loc_part = parse_query(query)
    solved = solve_query(dt_part, tz_loc_part)
    return format_result(solved, fmt)
def main(args):
    """
    Solve the query from the parsed command line arguments and print it;
    --iso takes precedence over --format
    """
    if args.iso:
        fmt = ISO_FORMAT
    else:
        fmt = args.format
    print(query_to_format_result(args.query, fmt))
2019-03-13 23:21:33 +01:00
if __name__ == "__main__":
    # Script entry point: read the command line, set the log level, run the query
    args = parse_args()
    setup_logging_level(debug=args.debug)
    main(args)