workdays and hours calculated

Daniel Tsvetkov 2020-02-12 11:42:48 +01:00
parent ea18b83628
commit d78a50c7d7
4 changed files with 352 additions and 242 deletions

View File

@@ -1,34 +1,39 @@
 backcall==0.1.0
+chardet==3.0.4
 Click==7.0
 dateparser==0.7.2
-decorator==4.4.0
+DateTimeRange==0.6.1
+decorator==4.4.1
 Flask==1.1.1
-freezegun==0.3.12
+freezegun==0.3.14
 fuzzywuzzy==0.17.0
-geographiclib==1.49
+geographiclib==1.50
-geopy==1.20.0
+geopy==1.21.0
 importlib-resources==1.0.2
-ipython==7.8.0
+ipython==7.12.0
 ipython-genutils==0.2.0
+isodate==0.6.0
 itsdangerous==1.1.0
-jedi==0.15.1
+jedi==0.16.0
-Jinja2==2.10.3
+Jinja2==2.11.1
 MarkupSafe==1.1.1
+mbstrdecoder==0.8.4
 numpy==1.18.1
-parso==0.5.1
+parso==0.6.1
-pexpect==4.7.0
+pexpect==4.8.0
 pickleshare==0.7.5
-prompt-toolkit==2.0.9
+prompt-toolkit==3.0.3
 ptyprocess==0.6.0
-Pygments==2.4.2
+Pygments==2.5.2
 python-dateutil==2.8.1
 python-Levenshtein==0.12.0
 pytz==2019.3
-regex==2019.11.1
+regex==2020.1.8
 six==1.14.0
-timezonefinder==4.1.0
+timezonefinder==4.2.0
-traitlets==4.3.2
+traitlets==4.3.3
+typepy==0.6.5
 tzlocal==2.0.0
-wcwidth==0.1.7
+wcwidth==0.1.8
-Werkzeug==0.16.0
+Werkzeug==1.0.0
 word2number==1.1

src/tww/localization.py (new file, 129 lines added)

@@ -0,0 +1,129 @@
import contextlib
import csv
import locale
import os
from fuzzywuzzy import fuzz
from tww import basepath
@contextlib.contextmanager
def setlocale(*args, **kw):
saved = locale.setlocale(locale.LC_ALL)
yield locale.setlocale(*args, **kw)
locale.setlocale(locale.LC_ALL, saved)
def find_country_alias(locale_s):
with open(os.path.join(basepath, "data", "country_codes.csv")) as f:
cfile = csv.reader(f)
for row in cfile:
country, alpha2, alpha3 = row[0:3]
country, alpha2, alpha3 = country.lower(), alpha2.lower(), alpha3.lower()
if locale_s in [country, alpha2, alpha3]:
return country, alpha2, alpha3
fuzz_ratio = fuzz.ratio(locale_s, country)
if fuzz_ratio > 90:
return country, alpha2, alpha3
return None, None, None
def find_language_alias(locale_s):
with open(os.path.join(basepath, "data", "language_codes.csv")) as f:
cfile = csv.reader(f)
for row in cfile:
name, native_name, a2, a3 = row[1:5]
name, native_name, a2, a3 = name.lower(), native_name.lower(), a2.lower(), a3.lower()
if locale_s in [a2, a3, name, native_name]:
return name, native_name, a2, a3
fuzz_ratio = fuzz.ratio(locale_s, name)
if fuzz_ratio > 90:
return name, native_name, a2, a3
fuzz_ratio = fuzz.ratio(locale_s, native_name)
if fuzz_ratio > 80:
return name, native_name, a2, a3
return None, None, None, None
def lc_time_to_codes(lc_time):
country_lang, encoding = lc_time.split('.')
country_code, lang_code = country_lang.split('_')
return country_code, lang_code, encoding
def get_default_locale():
default_locale = locale.getlocale()
if type(default_locale) == tuple:
default_locale = "{}.{}".format(*default_locale)
country_code, lang_code, encoding = lc_time_to_codes(default_locale)
return country_code, lang_code, encoding, default_locale
def resolve_locale(locale_s):
country_code, lang_code, encoding, default_locale = get_default_locale()
rv = dict(
query=locale_s,
country_code=country_code,
lang_code=lang_code,
encoding=encoding,
lc_time=default_locale,
)
default_encoding = 'utf-8'
if not locale_s:
return rv
if '.' in locale_s:
country_lang, encoding = locale_s.split('.')
else:
country_lang, encoding = locale_s, default_encoding
if '_' in country_lang:
country_code, lang_code = country_lang.split('_')
if len(country_code) == 2 and len(lang_code) == 2:
try:
lc_time = "{}_{}.{}".format(country_code, lang_code, encoding)
locale.setlocale(locale.LC_TIME, lc_time)
rv["country_code"] = country_code
rv["lang_code"] = lang_code
rv["encoding"] = encoding
rv["lc_time"] = lc_time
return rv
except:
...
locale_s = locale_s.strip().lower()
country, alpha2, alpha3 = find_country_alias(locale_s)
lang_name, lang_native_name, lang2, lang3 = find_language_alias(locale_s)
if alpha2:
locale_hypotheses = {k: v for k, v in locale.locale_alias.items() if k.startswith(alpha2)}
for k, v in locale_hypotheses.items():
lower = k.lower()
if 'utf-8' in lower:
rv["lc_time"] = v
break
else:
if locale_hypotheses:
lc_time = locale_hypotheses.get(alpha2)
if lc_time:
country_code, lang_code, encoding = lc_time_to_codes(lc_time)
rv["country_code"] = country_code
rv["lang_code"] = lang_code
rv["encoding"] = encoding
rv["lc_time"] = lc_time
return rv
if lang2:
locale_hypotheses = {k: v for k, v in locale.locale_alias.items() if k.startswith(lang2)}
for k, v in locale_hypotheses.items():
lower = k.lower()
if 'utf-8' in lower:
rv["lc_time"] = v
break
else:
if locale_hypotheses:
lc_time = locale_hypotheses.get(lang2)
if lc_time:
country_code, lang_code, encoding = lc_time_to_codes(lc_time)
rv["country_code"] = country_code
rv["lang_code"] = lang_code
rv["encoding"] = encoding
rv["lc_time"] = lc_time
return rv
return rv
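# --- Illustrative usage, not part of the committed file -----------------------
# Assumes the tww package (for basepath) and its data/country_codes.csv and
# data/language_codes.csv files are importable, and that the requested locale is
# actually generated on the system; otherwise setlocale() fails and the system
# defaults are kept.
if __name__ == "__main__":
    print(resolve_locale("bg_BG.UTF-8"))
    # -> {'query': 'bg_BG.UTF-8', 'country_code': 'bg', 'lang_code': 'BG',
    #     'encoding': 'UTF-8', 'lc_time': 'bg_BG.UTF-8'}
    print(resolve_locale("bulgarian"))
    # name-only queries fall back to the CSV alias tables and locale.locale_alias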

View File

@@ -1,16 +1,13 @@
-import contextlib
-import csv
 import json
 import locale
-import os
 import re
 import sys
 from datetime import datetime
-from fuzzywuzzy import fuzz
 from pygments import highlight, lexers, formatters
-from tww import ISO_FORMAT, time_to_emoji, time_ago, basepath
+from localization import setlocale, resolve_locale
+from tww import ISO_FORMAT, time_to_emoji, time_ago, workday_diff, workhours_diff, td_remainders, td_totals, td_iso8601
 from tww import resolve_timezone, dateparser_parse_dt, get_utcnow, get_s_since_epoch, get_ms_since_epoch, \
     dt_tz_translation, get_local_now, query_to_format_result
@@ -25,6 +22,12 @@ r_time_in = re.compile('(?:time)?\s*in\s*(.*)', flags=re.IGNORECASE)
 r_time_since = re.compile('(?:time)?\s*since\s*(.*)', flags=re.IGNORECASE)
 r_time_until = re.compile('(?:time)?\s*until\s*(.*)', flags=re.IGNORECASE)
 r_time_between = re.compile('(?:time)?\s*between\s*(.*)\s*and\s*(.*)', flags=re.IGNORECASE)
+r_workdays_since = re.compile('(?:workdays|work days)?\s*since\s*(.*)', flags=re.IGNORECASE)
+r_workdays_until = re.compile('(?:workdays|work days)?\s*until\s*(.*)', flags=re.IGNORECASE)
+r_workdays_between = re.compile('(?:workdays|work days)?\s*between\s*(.*)\s*and\s*(.*)', flags=re.IGNORECASE)
+r_workhours_since = re.compile('(?:workhours|work hours)?\s*since\s*(.*)', flags=re.IGNORECASE)
+r_workhours_until = re.compile('(?:workhours|work hours)?\s*until\s*(.*)', flags=re.IGNORECASE)
+r_workhours_between = re.compile('(?:workhours|work hours)?\s*between\s*(.*)\s*and\s*(.*)', flags=re.IGNORECASE)
 r_timezone_translation = re.compile('(.*)?\s(?:in|to)\s(.*)', flags=re.IGNORECASE)
 r_timezone = re.compile('(.*)?\s(?:timezone|timezones|tz)', flags=re.IGNORECASE)
 r_timezone_2 = re.compile('(?:timezone in|timezones in|tz in|timezone|timezones|tz)\s(.*)?', flags=re.IGNORECASE)
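# Illustrative behaviour of the new patterns (not part of the commit): the
# "(?:workdays|work days)?" prefix is optional, so a bare "since ..." query also
# matches r_workdays_since; which solutions a query ends up with therefore depends
# on how tokenize() combines the matches below.
# >>> r_workdays_since.match("workdays since 2/07/2020 12:00").group(1)
# '2/07/2020 12:00'
# >>> r_workdays_since.match("since yesterday").group(1)
# 'yesterday'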
@@ -65,6 +68,28 @@ def handler_time_since_until(start_dt_s: str) -> dict:
     return handler_time_diff(dateparser_parse_dt(start_dt_s), get_local_now())
 
 
+def handler_workdays_diff(start_dt: datetime, end_dt: datetime) -> dict:
+    diff = workday_diff(start_dt, end_dt)
+    return dict(start=dt_pretty(start_dt),
+                end=dt_pretty(end_dt),
+                diff=td_pretty(diff))
+
+
+def handler_workdays_since_until(start_dt_s: str) -> dict:
+    return handler_workdays_diff(dateparser_parse_dt(start_dt_s), get_local_now())
+
+
+def handler_workhours_diff(start_dt: datetime, end_dt: datetime) -> dict:
+    diff = workhours_diff(start_dt, end_dt)
+    return dict(start=dt_pretty(start_dt),
+                end=dt_pretty(end_dt),
+                diff=td_pretty(diff))
+
+
+def handler_workhours_since_until(start_dt_s: str) -> dict:
+    return handler_workhours_diff(dateparser_parse_dt(start_dt_s), get_local_now())
+
+
 def handler_timezone_translation(dt_s: str, timezone_like_s: str) -> dict:
     src_dt = dateparser_parse_dt(dt_s)
     tz = resolve_timezone(timezone_like_s)
@@ -106,6 +131,12 @@ regex_handlers = [
     (r_time_since, handler_time_since_until, QUERY_TYPE_TD),
     (r_time_until, handler_time_since_until, QUERY_TYPE_TD),
     (r_time_between, handler_time_diff, QUERY_TYPE_TD),
+    (r_workdays_since, handler_workdays_since_until, QUERY_TYPE_TD),
+    (r_workdays_until, handler_workdays_since_until, QUERY_TYPE_TD),
+    (r_workdays_between, handler_workdays_diff, QUERY_TYPE_TD),
+    (r_workhours_since, handler_workhours_since_until, QUERY_TYPE_TD),
+    (r_workhours_until, handler_workhours_since_until, QUERY_TYPE_TD),
+    (r_workhours_between, handler_workhours_diff, QUERY_TYPE_TD),
     (r_time_in, handler_time_in_parser, QUERY_TYPE_DT),
     (r_timezone, handler_timezone, QUERY_TYPE_TZ),
     (r_timezone_2, handler_timezone, QUERY_TYPE_TZ),
@@ -137,158 +168,12 @@ def tokenize(s):
    return solutions
def test():
test_strings = [
None,
"",
"s",
" ",
"Time since 2019-05-12",
"Since yesterday",
"time between yesterday and tomorrow",
"time until 25 december",
"time sinc",
"now in milliseconds",
"seconds since epoch",
"1992-01-27 to epoch",
"milliseconds since 1992-01-27",
"now in sofia",
"now in PST",
"2 hours ago to Sydney",
"now in +03:00",
"now in dublin",
]
for s in test_strings:
print("{} -> {}".format(s, tokenize(s)))
def pretty_print_dict(obj):
    formatted_json = json.dumps(obj, indent=2, ensure_ascii=False)
    colorful_json = highlight(formatted_json, lexers.JsonLexer(), formatters.TerminalFormatter())
    print(colorful_json)
@contextlib.contextmanager
def setlocale(*args, **kw):
saved = locale.setlocale(locale.LC_ALL)
yield locale.setlocale(*args, **kw)
locale.setlocale(locale.LC_ALL, saved)
def find_country_alias(locale_s):
with open(os.path.join(basepath, "data", "country_codes.csv")) as f:
cfile = csv.reader(f)
for row in cfile:
country, alpha2, alpha3 = row[0:3]
country, alpha2, alpha3 = country.lower(), alpha2.lower(), alpha3.lower()
if locale_s in [country, alpha2, alpha3]:
return country, alpha2, alpha3
fuzz_ratio = fuzz.ratio(locale_s, country)
if fuzz_ratio > 90:
return country, alpha2, alpha3
return None, None, None
def find_language_alias(locale_s):
with open(os.path.join(basepath, "data", "language_codes.csv")) as f:
cfile = csv.reader(f)
for row in cfile:
name, native_name, a2, a3 = row[1:5]
name, native_name, a2, a3 = name.lower(), native_name.lower(), a2.lower(), a3.lower()
if locale_s in [a2, a3, name, native_name]:
return name, native_name, a2, a3
fuzz_ratio = fuzz.ratio(locale_s, name)
if fuzz_ratio > 90:
return name, native_name, a2, a3
fuzz_ratio = fuzz.ratio(locale_s, native_name)
if fuzz_ratio > 80:
return name, native_name, a2, a3
return None, None, None, None
def lc_time_to_codes(lc_time):
country_lang, encoding = lc_time.split('.')
country_code, lang_code = country_lang.split('_')
return country_code, lang_code, encoding
def get_default_locale():
default_locale = locale.getlocale()
if type(default_locale) == tuple:
default_locale = "{}.{}".format(*default_locale)
country_code, lang_code, encoding = lc_time_to_codes(default_locale)
return country_code, lang_code, encoding, default_locale
def resolve_locale(locale_s):
country_code, lang_code, encoding, default_locale = get_default_locale()
rv = dict(
query=locale_s,
country_code=country_code,
lang_code=lang_code,
encoding=encoding,
lc_time=default_locale,
)
default_encoding = 'utf-8'
if not locale_s:
return rv
if '.' in locale_s:
country_lang, encoding = locale_s.split('.')
else:
country_lang, encoding = locale_s, default_encoding
if '_' in country_lang:
country_code, lang_code = country_lang.split('_')
if len(country_code) == 2 and len(lang_code) == 2:
try:
lc_time = "{}_{}.{}".format(country_code, lang_code, encoding)
locale.setlocale(locale.LC_TIME, lc_time)
rv["country_code"] = country_code
rv["lang_code"] = lang_code
rv["encoding"] = encoding
rv["lc_time"] = lc_time
return rv
except:
...
locale_s = locale_s.strip().lower()
country, alpha2, alpha3 = find_country_alias(locale_s)
lang_name, lang_native_name, lang2, lang3 = find_language_alias(locale_s)
if alpha2:
locale_hypotheses = {k: v for k, v in locale.locale_alias.items() if k.startswith(alpha2)}
for k, v in locale_hypotheses.items():
lower = k.lower()
if 'utf-8' in lower:
rv["lc_time"] = v
break
else:
if locale_hypotheses:
lc_time = locale_hypotheses.get(alpha2)
if lc_time:
country_code, lang_code, encoding = lc_time_to_codes(lc_time)
rv["country_code"] = country_code
rv["lang_code"] = lang_code
rv["encoding"] = encoding
rv["lc_time"] = lc_time
return rv
if lang2:
locale_hypotheses = {k: v for k, v in locale.locale_alias.items() if k.startswith(lang2)}
for k, v in locale_hypotheses.items():
lower = k.lower()
if 'utf-8' in lower:
rv["lc_time"] = v
break
else:
if locale_hypotheses:
lc_time = locale_hypotheses.get(lang2)
if lc_time:
country_code, lang_code, encoding = lc_time_to_codes(lc_time)
rv["country_code"] = country_code
rv["lang_code"] = lang_code
rv["encoding"] = encoding
rv["lc_time"] = lc_time
return rv
return rv
def dt_pretty(dt):
    rv = {}
    global custom_locale
@@ -313,63 +198,6 @@ def dt_pretty(dt):
    return rv
def td_remainders(td):
# split seconds to larger units
seconds = td.total_seconds()
minutes, seconds = divmod(seconds, 60)
hours, minutes = divmod(minutes, 60)
days, hours = divmod(hours, 24)
months, days = divmod(days, 30.42)
years, months = divmod(months, 12)
years, months, days, hours, minutes, seconds = map(int, (years, months, days, hours, minutes, seconds))
years, months, days, hours, minutes, seconds = map(abs, (years, months, days, hours, minutes, seconds))
return dict(
seconds=seconds,
minutes=minutes,
hours=hours,
days=days,
months=months,
years=years,
)
def td_totals(td):
seconds = td.total_seconds()
minutes = seconds // 60
hours = seconds // (60 * 60)
days = seconds // (24 * 60 * 60)
weeks = seconds // (7 * 24 * 60 * 60)
months = seconds // (30 * 24 * 60 * 60)
years = seconds // (365 * 24 * 60 * 60)
years, months, weeks, days, hours, minutes, seconds = map(abs,
(years, months, weeks, days, hours, minutes, seconds))
return dict(
seconds=seconds,
minutes=minutes,
hours=hours,
days=days,
weeks=weeks,
months=months,
years=years,
)
def td_iso8601(td):
"""P[n]Y[n]M[n]DT[n]H[n]M[n]S"""
rem = td_remainders(td)
fmt = "P"
for short, timeframe in [("Y", "years"), ("M", "months"), ("D", "days")]:
if rem[timeframe]:
fmt += "{}{}".format(rem[timeframe], short)
hms = [("H", "hours"), ("M", "minutes"), ("S", "seconds")]
if any([rem[t[1]] for t in hms]):
fmt += "T"
for short, timeframe in hms:
if rem[timeframe]:
fmt += "{}{}".format(rem[timeframe], short)
return fmt
def td_pretty(td):
    rv = {
        "sign": '-' if td.days < 0 else '+',
@@ -401,24 +229,54 @@ def resolve_query(query):
         handler, results, query_type = solution
         element["handler"] = handler
         element["query_type"] = query_type
-        if query_type == QUERY_TYPE_DT:
-            element["dt"] = dt_pretty(results)
-        elif query_type == QUERY_TYPE_DT_TR:
-            element["src_dt"] = dt_pretty(results[0])
-            element["dst_dt"] = dt_pretty(results[1])
-            element["tz"] = results[2]
-        elif query_type == QUERY_TYPE_TZ:
-            element["tz"] = results
-        elif query_type == QUERY_TYPE_TD:
-            element["timedelta"] = results
-        rv["solutions"].append(element)
+        try:
+            if query_type == QUERY_TYPE_DT:
+                element["dt"] = dt_pretty(results)
+            elif query_type == QUERY_TYPE_DT_TR:
+                element["src_dt"] = dt_pretty(results[0])
+                element["dst_dt"] = dt_pretty(results[1])
+                element["tz"] = results[2]
+            elif query_type == QUERY_TYPE_TZ:
+                element["tz"] = results
+            elif query_type == QUERY_TYPE_TD:
+                element["timedelta"] = results
+            rv["solutions"].append(element)
+        except Exception:
+            ...
     return rv
 
 
+def test():
+    test_strings = [
+        None,
+        "",
+        "s",
+        " ",
+        "Time since 2019-05-12",
+        "Since yesterday",
+        "time between yesterday and tomorrow",
+        "time until 25 december",
+        "time sinc",
+        "now in milliseconds",
+        "seconds since epoch",
+        "1992-01-27 to epoch",
+        "milliseconds since 1992-01-27",
+        "now in sofia",
+        "now in PST",
+        "2 hours ago to Sydney",
+        "now in +03:00",
+        "now in dublin",
+        "workdays since 2/07/2020 12:00",
+        "workhours since 2/07/2020 12:00",
+    ]
+    for s in test_strings:
+        print("{} -> {}".format(s, resolve_query(s)))
+
+
 if __name__ == "__main__":
-    query = ' '.join(sys.argv[1:])
-    # query = "now in india"
     custom_locale = ""
     custom_locale = resolve_locale(custom_locale)
+    query = ' '.join(sys.argv[1:])
+    # query = "workhours until 2/12/2020 12:00"
     result = resolve_query(query)
     pretty_print_dict(result)

View File

@@ -14,6 +14,7 @@ from datetime import datetime, timedelta
 import pytz
 from dateparser import parse as parse_dt
+from datetimerange import DateTimeRange
 from dateutil.parser import parse as dutil_parse
 from dateparser.timezone_parser import StaticTzInfo
 from dateutil.tz import gettz, tzlocal
@@ -183,7 +184,7 @@ def resolve_location_local(query):
     try:
         result = heappop(heap)
     except IndexError:
-        logger.error("Could not find location {}".format(query))
+        logger.info("Could not find location {}".format(query))
         return ""
     ratio, location = result
     logger.debug("Location result ({}): {}".format(-ratio, location))
@@ -205,17 +206,18 @@
         write_to_cache(query, location)
         return location
     except GeocoderTimedOut:
-        logger.error("Timed out resolving location. Try specifying a timezone directly")
+        logger.info("Timed out resolving location. Try specifying a timezone directly")

 def parse_query(query):
     """
+    TODO: DEPRECATE THIS
     Parses the user query to the datetime, tz/loc parts
     """
     # query = ' '.join(query)
     query = query.strip()
     if not query:
-        logger.error("Use a query like <datetime-like> ['to' <timezone or location>]")
+        logger.info("TO DEPRECATE: Use a query like <datetime-like> ['to' <timezone or location>]")
     to_query = query.split(" to ")
     logger.debug("to_query: {}".format(to_query))
     if len(to_query) == 1:
@@ -225,7 +227,7 @@ def parse_query(query):
         # datetime to timezone
         human_dt, human_tz_loc = to_query
     else:
-        logger.error("There can be only one 'to' in the query string")
+        logger.info("TO DEPRECATE: There can be only one 'to' in the query string")
     logger.debug("raw human_dt: {}".format(human_dt))
     logger.debug("raw human_tz_loc: {}".format(human_tz_loc))
@@ -358,7 +360,7 @@ def solve_query(human_dt, human_tz_loc):

 def format_result(result, fmt):
     if result is None:
-        logger.error("Could not solve query")
+        logger.info("Could not solve query")
     logger.debug("Format: {}".format(fmt))
     format_result = result.strftime(fmt)
     logger.debug("Formated result: {} -> {}".format(result, format_result))
@@ -577,3 +579,119 @@ def time_to_emoji(dt):
    seconds = get_local_s_since_epoch(dt)
    a = int((seconds / 900 - 3) / 2 % 24)
    return chr(128336 + a // 2 + a % 2 * 12)
def workday_diff(start, end, workdays=None):
"""
Calculates the difference between two dates excluding weekends
# TODO: doesn't work with Until (i.e. future calculation)
"""
if not workdays:
workdays = range(0, 5)
td = end - start
daygenerator = (start + timedelta(x + 1) for x in range(td.days))
weekdays = sum(1 for day in daygenerator if day.weekday() in workdays)
return timedelta(days=weekdays)
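# --- Illustrative check, not part of the commit --------------------------------
# The generator starts at start + 1 day, so the start date itself is excluded;
# e.g. Fri 2020-02-07 12:00 -> Wed 2020-02-12 12:00 skips Sat/Sun and counts Mon-Wed.
def _example_workday_diff():
    assert workday_diff(datetime(2020, 2, 7, 12, 0),
                        datetime(2020, 2, 12, 12, 0)) == timedelta(days=3)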
def workhours_diff(start, end, workhour_begin="09:00", workhour_end="17:00", workdays=None):
"""
Calculates the difference between two dates excluding non-workhours
This can potentially be very slow for long ranges, since it walks the interval at per-minute resolution.
# TODO: doesn't work with Until (i.e. future calculation)
"""
if not workdays:
workdays = range(0, 5)
workday_start_h, workday_start_m = map(int, workhour_begin.split(':'))
workday_end_h, workday_end_m = map(int, workhour_end.split(':'))
# assume a night shift (spanning midnight) when the end hour is earlier than the start hour
day_diff = 1 if workday_end_h < workday_start_h else 0
prev_dt_minute, dt_minute = start, start + timedelta(minutes=1)
summins = 0
while dt_minute < end:
if dt_minute.weekday() not in workdays:
prev_dt_minute, dt_minute = prev_dt_minute + timedelta(days=1), dt_minute + timedelta(days=1)
continue
this_day_workhours_begin = datetime(year=dt_minute.year, month=dt_minute.month, day=dt_minute.day,
hour=workday_start_h, minute=workday_start_m, tzinfo=dt_minute.tzinfo)
this_day_workhours_end = datetime(year=dt_minute.year, month=dt_minute.month, day=dt_minute.day,
hour=workday_end_h, minute=workday_end_m, tzinfo=dt_minute.tzinfo)
# calc if night shift
this_day_workhours_end += timedelta(days=day_diff)
# test if this minute is within workhours with daterange
this_day_workhours = DateTimeRange(this_day_workhours_begin, this_day_workhours_end)
time_range = DateTimeRange(prev_dt_minute, dt_minute)
if time_range in this_day_workhours:
# we are in workhours: add all the minutes from here until the query end or the end of the workday, whichever comes first
end_delta = end if end < this_day_workhours_end else this_day_workhours_end
summins += (end_delta - prev_dt_minute).total_seconds() // 60
prev_dt_minute = end_delta
else:
# skip until next workday - naively add one day; it could be weekend, but it will be caught above
prev_dt_minute = this_day_workhours_begin + timedelta(days=1)
dt_minute = prev_dt_minute + timedelta(minutes=1)
return timedelta(seconds=int(summins * 60))
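# --- Illustrative check, not part of the commit --------------------------------
# With the default 09:00-17:00 window and Mon-Fri workdays, Fri 2020-02-07 12:00
# -> Mon 2020-02-10 12:00 accumulates Friday afternoon (5h) plus Monday morning
# (3h), i.e. 8 work hours, assuming DateTimeRange containment behaves as above.
def _example_workhours_diff():
    assert workhours_diff(datetime(2020, 2, 7, 12, 0),
                          datetime(2020, 2, 10, 12, 0)) == timedelta(hours=8)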
def td_remainders(td):
# split seconds to larger units
seconds = td.total_seconds()
minutes, seconds = divmod(seconds, 60)
hours, minutes = divmod(minutes, 60)
days, hours = divmod(hours, 24)
months, days = divmod(days, 30.42)
years, months = divmod(months, 12)
years, months, days, hours, minutes, seconds = map(int, (years, months, days, hours, minutes, seconds))
years, months, days, hours, minutes, seconds = map(abs, (years, months, days, hours, minutes, seconds))
return dict(
seconds=seconds,
minutes=minutes,
hours=hours,
days=days,
months=months,
years=years,
)
def td_totals(td):
seconds = td.total_seconds()
minutes = seconds // 60
hours = seconds // (60 * 60)
days = seconds // (24 * 60 * 60)
weeks = seconds // (7 * 24 * 60 * 60)
months = seconds // (30 * 24 * 60 * 60)
years = seconds // (365 * 24 * 60 * 60)
years, months, weeks, days, hours, minutes, seconds = map(abs,
(years, months, weeks, days, hours, minutes, seconds))
return dict(
seconds=seconds,
minutes=minutes,
hours=hours,
days=days,
weeks=weeks,
months=months,
years=years,
)
def td_iso8601(td):
"""P[n]Y[n]M[n]DT[n]H[n]M[n]S"""
rem = td_remainders(td)
fmt = "P"
for short, timeframe in [("Y", "years"), ("M", "months"), ("D", "days")]:
if rem[timeframe]:
fmt += "{}{}".format(rem[timeframe], short)
hms = [("H", "hours"), ("M", "minutes"), ("S", "seconds")]
if any([rem[t[1]] for t in hms]):
fmt += "T"
for short, timeframe in hms:
if rem[timeframe]:
fmt += "{}{}".format(rem[timeframe], short)
return fmt
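# --- Worked example, not part of the commit -------------------------------------
# td_remainders() uses an average month of 30.42 days, so the breakdown is
# approximate by design; td_iso8601() then emits only the non-zero parts.
def _example_td_formatting():
    td = timedelta(days=400, hours=2, minutes=30)
    assert td_remainders(td) == dict(seconds=0, minutes=30, hours=2,
                                     days=4, months=1, years=1)
    assert td_iso8601(td) == "P1Y1M4DT2H30M"
    assert td_totals(td)["weeks"] == 57.0  # floor division, so totals are whole floats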