# kltn20133118's picture
# Upload 337 files
# dbaa71b verified
import json
import math
import time
import dateparser
from datetime import datetime, timezone
from importlib import import_module
from typing import Any, Dict, Optional, Union
from bs4 import BeautifulSoup
from bs4.element import Comment
from dateutil.relativedelta import relativedelta
# strptime pattern for UTC timestamps such as "2021-03-04T05:06:07Z";
# convert_utc_time uses it to parse inputs that contain a "T".
DATETIME_STRING_PATTERN = "%Y-%m-%dT%H:%M:%SZ"
# Default look-back window. Presumably written in the relative short form
# ("1h" = one hour) accepted by convert_utc_time -- it is not referenced in
# this file, so verify against callers.
DEFAULT_LOOKUP_PERIOD = "1h"
# Adapted from https://stackoverflow.com/a/52081812
def flatten_dict(
    dictionary: Dict[str, Any],
    round_the_float: bool = True,
    float_round_format_str: str = ".2f",
    separator: str = "_",
) -> Dict[str, Any]:
    """Flatten a nested dictionary into a single-level dictionary.

    Nested dictionaries (directly, or as elements of a list) are merged into
    the result with their keys prefixed by the parent key and ``separator``.
    When several dictionaries in one list share a key, the later one wins.

    Args:
        dictionary: Mapping to flatten; keys are expected to be strings.
        round_the_float: When true, float values are replaced by their
            string representation produced with ``float_round_format_str``.
        float_round_format_str: Format spec passed to ``format`` for floats.
        separator: String placed between a parent key and a child key.

    Returns:
        A new flat dictionary. List elements that are not dictionaries are
        kept, unmodified, as a list under the parent key; an empty list
        contributes no key at all.
    """
    out: Dict[str, Any] = {}
    for key, val in dictionary.items():
        if isinstance(val, dict):
            val = [val]
        if isinstance(val, list):
            plain_items = []
            for item in val:
                if not isinstance(item, dict):
                    # Scalars inside a list cannot be flattened further.
                    plain_items.append(item)
                    continue
                # Forward every option so nested levels are formatted exactly
                # like the top level.
                deeper = flatten_dict(
                    item,
                    round_the_float=round_the_float,
                    float_round_format_str=float_round_format_str,
                    separator=separator,
                )
                out.update(
                    {key + separator + key2: val2 for key2, val2 in deeper.items()}
                )
            if plain_items:
                out[key] = plain_items
        elif isinstance(val, float) and round_the_float:
            out[key] = format(val, float_round_format_str)
        else:
            out[key] = val
    return out
def obj_to_json(obj: Any, sort_keys: bool = False, indent: Optional[int] = None) -> Union[bytes, None]:
    """Serialise ``obj`` to UTF-8 encoded JSON bytes.

    Values the ``json`` module cannot encode natively are passed through
    ``datetime_handler``. Non-ASCII characters are written as-is rather than
    escaped.

    Args:
        obj: Object to serialise.
        sort_keys: Forwarded to ``json.dumps``.
        indent: Forwarded to ``json.dumps``.

    Returns:
        The encoded JSON document, or ``None`` when ``obj`` is ``None``.
    """
    if obj is not None:
        json_text = json.dumps(
            obj,
            default=datetime_handler,
            ensure_ascii=False,
            sort_keys=sort_keys,
            indent=indent,
        )
        return json_text.encode("utf8")
    return None
def obj_to_markdown(
    obj: Any,
    level: int = 1,
    str_enclose_start: Optional[str] = None,
    str_enclose_end: Optional[str] = None,
) -> str:
    """Render ``obj`` as a nested bullet list using ``*`` markers.

    Collections (as decided by ``is_collection``) produce one entry per item:
    objects with a ``__dict__`` and dictionaries are rendered with their
    attribute/key names, other collections without a name. Nested collections
    are rendered recursively one level deeper. Everything else is rendered as
    ``<stars>: <value>``.

    Args:
        obj: Value to render.
        level: Nesting depth; the number of ``*`` characters in the marker.
        str_enclose_start: When not ``None``, string values are placed on
            their own line and prefixed with this text.
        str_enclose_end: Text appended after an enclosed string value. When
            left as ``None`` nothing is appended.

    Returns:
        The rendered lines joined with newlines (empty for an empty
        collection).
    """
    key_prefix = "*" * level
    # A missing end marker must render as nothing, not as the text "None".
    enclose_end = "" if str_enclose_end is None else str_enclose_end
    markdowns = []
    if is_collection(obj):
        add_key = True
        if hasattr(obj, "__dict__"):
            item_view = obj.__dict__.items()
        elif isinstance(obj, dict):
            item_view = obj.items()
        else:
            # Plain sequences have positions, not names.
            add_key = False
            item_view = enumerate(obj)
        for key, val in item_view:
            header = f"{key_prefix} {key}" if add_key else key_prefix
            if is_collection(val):
                child_markdown = obj_to_markdown(
                    obj=val,
                    level=level + 1,
                    str_enclose_start=str_enclose_start,
                    str_enclose_end=str_enclose_end,
                )
                markdowns.append(f"{header}\n{child_markdown}")
            elif str_enclose_start is not None and isinstance(val, str):
                markdowns.append(
                    f"{header}:\n{str_enclose_start}{val}{enclose_end}"
                )
            else:
                markdowns.append(f"{header}: {val}")
    elif str_enclose_start is not None and isinstance(obj, str):
        markdowns.append(f"{key_prefix}:\n{str_enclose_start}{obj}{enclose_end}")
    else:
        markdowns.append(f"{key_prefix}: {obj}")
    return "\n".join(markdowns)
def is_collection(obj: Any) -> bool:
    """Tell whether ``obj`` is a dict, a list, or any object with a ``__dict__``."""
    if isinstance(obj, (dict, list)):
        return True
    return hasattr(obj, "__dict__")
# Adapted from searchtweets-v2 with minor modifications
def convert_utc_time(datetime_str: str) -> datetime:
    """Convert a flexible date/time string into a UTC-aware ``datetime``.

    Strings of at most five characters are treated as a look-back period
    relative to the current UTC time: an integer followed by one unit letter.

        - 3d  (three days ago)
        - 12h (twelve hours ago)
        - 15m (fifteen minutes ago)
        - 2M  (two months ago)
        - 1Y  (one year ago)

    Longer strings are parsed as absolute timestamps and interpreted as UTC:

        - YYYYmmDDHHMM
        - YYYY-mm-DD
        - YYYY-mm-DD HH:MM
        - YYYY-mm-DD HH:MM:SS
        - YYYY-mm-DDTHH:MM
        - YYYY-mm-DDTHH:MM:SS
        - YYYY-mm-DDTHH:MM:SSZ

    Args:
        datetime_str: One of the formats listed above.

    Returns:
        A timezone-aware ``datetime`` whose ``tzinfo`` is UTC.

    Raises:
        ValueError: If the string matches none of the supported formats,
            including a short string with an unknown unit letter.
    """
    if len(datetime_str) <= 5:
        relative_units = {
            "d": "days",
            "h": "hours",
            "m": "minutes",
            "M": "months",
            "Y": "years",
        }
        unit = datetime_str[-1:]
        if unit not in relative_units:
            # Previously an unknown unit silently yielded the current time.
            raise ValueError(
                f"Unsupported look-back period {datetime_str!r}; expected an "
                "integer followed by one of d, h, m, M, Y"
            )
        # int() raises ValueError itself when the amount is not numeric.
        amount = int(datetime_str[:-1])
        now = datetime.now(timezone.utc)
        return now + relativedelta(**{relative_units[unit]: -amount})

    if not {"-", ":"} & set(datetime_str):
        candidate_formats = ["%Y%m%d%H%M"]
    elif "T" in datetime_str:
        candidate_formats = [
            DATETIME_STRING_PATTERN,
            "%Y-%m-%dT%H:%M:%S",
            "%Y-%m-%dT%H:%M",
        ]
    else:
        candidate_formats = ["%Y-%m-%d %H:%M", "%Y-%m-%d %H:%M:%S"]
    # A bare date is accepted as a last resort for every absolute input.
    candidate_formats.append("%Y-%m-%d")

    for candidate in candidate_formats:
        try:
            parsed = datetime.strptime(datetime_str, candidate)
        except ValueError:
            continue
        return parsed.replace(tzinfo=timezone.utc)
    raise ValueError(f"Unsupported datetime format: {datetime_str!r}")
def convert_datetime_str_to_epoch(datetime_str: str) -> Optional[int]:
    """Parse a free-form date string and return it as whole Unix seconds.

    Parsing is delegated to ``dateparser.parse``.

    Args:
        datetime_str: Text describing a date/time.

    Returns:
        Seconds since the Unix epoch, or ``None`` when the text cannot be
        parsed.
    """
    parsed_datetime = dateparser.parse(datetime_str)
    if not parsed_datetime:
        return None
    # datetime.timestamp() honours tzinfo on aware values and treats naive
    # values as local time, whereas time.mktime(timetuple()) always assumed
    # local time and so mis-converted timezone-aware results.
    # Sub-second precision is dropped first, as timetuple() used to do.
    unix_timestamp = parsed_datetime.replace(microsecond=0).timestamp()
    return math.trunc(unix_timestamp)
def tag_visible(element: Any) -> bool:
    """Decide whether a parsed HTML text node counts as visible text.

    A node is hidden when its parent's ``name`` is one of the non-rendered
    containers listed below, or when the node itself is a ``Comment``.
    """
    hidden_parents = ("style", "script", "head", "title", "meta", "[document]")
    if element.parent.name in hidden_parents:
        return False
    return not isinstance(element, Comment)
def text_from_html(body: Union[str, bytes]) -> str:
    """Extract the visible text of an HTML document.

    The document is parsed with BeautifulSoup's ``html.parser``; every text
    node accepted by ``tag_visible`` is stripped of surrounding whitespace
    and the pieces are joined with single spaces.

    Args:
        body: HTML markup as text or bytes.

    Returns:
        The concatenated visible text.
    """
    soup = BeautifulSoup(body, "html.parser")
    # find_all(string=True) is the supported spelling of the deprecated
    # findAll(text=True) and returns the same text nodes.
    text_nodes = soup.find_all(string=True)
    return " ".join(node.strip() for node in text_nodes if tag_visible(node))
def dict_to_object(
    dictionary: Dict[str, Any],
    class_name_key: Optional[str] = "_target_",
    full_class_name: Optional[str] = None,
) -> Any:
    """Build an object from a dictionary that names its own class.

    The entry stored under ``class_name_key`` is read as a dotted
    ``module.ClassName`` path; every other entry becomes a keyword argument
    of that class. Nested dictionaries are converted recursively with the
    same ``class_name_key``.

    Args:
        dictionary: Source mapping.
        class_name_key: Key whose value names the class to instantiate.
        full_class_name: Class path used when the dictionary itself carries
            no ``class_name_key`` entry (an entry in the dictionary takes
            precedence).

    Returns:
        An instance of the named class, or the converted plain dictionary
        when no class path is available.
    """
    # NOTE: the class path is imported dynamically, so ``dictionary`` must
    # come from a trusted source.
    kwargs: Dict[str, Any] = {}
    target = full_class_name
    for name, value in dictionary.items():
        if name == class_name_key:
            target = value
            continue
        if isinstance(value, Dict):
            value = dict_to_object(dictionary=value, class_name_key=class_name_key)
        kwargs[name] = value

    if target is None:
        return kwargs

    module_path, attribute = target.rsplit(".", 1)
    factory = getattr(import_module(module_path), attribute)
    return factory(**kwargs)
def datetime_handler(x: Any) -> Optional[Any]:
    """Convert a value into something ``json.dumps`` can encode.

    ``datetime`` instances become ISO-8601 strings, objects that expose a
    ``__dict__`` are replaced by that attribute dictionary, and every other
    value (including ``None``) is returned unchanged.
    """
    if x is None:
        return None
    if isinstance(x, datetime):
        return x.isoformat()
    if hasattr(x, "__dict__"):
        return vars(x)
    return x