Source code for numerapi.utils
""" collection of utility functions"""
import os
import decimal
import logging
import time
import datetime
import uuid
import json
from typing import Optional, Dict
import dateutil.parser
import requests
import tqdm
logger = logging.getLogger(__name__)
[docs]
def parse_datetime_string(string: str) -> Optional[datetime.datetime]:
"""try to parse string to datetime object"""
if string is None:
return None
return dateutil.parser.parse(string)
[docs]
def parse_float_string(string: str) -> Optional[float]:
"""try to parse string to decimal.Decimal object"""
if string is None:
return None
try:
val = decimal.Decimal(string.replace(",", ""))
except decimal.InvalidOperation:
val = None
return val
[docs]
def replace(dictionary: Dict, key: str, function):
"""apply a function to dict item"""
if dictionary is not None and key in dictionary:
dictionary[key] = function(dictionary[key])
[docs]
def download_file(url: str, dest_path: str, show_progress_bars: bool = True):
"""downloads a file and shows a progress bar. allow resuming a download"""
file_size = 0
req = requests.get(url, stream=True, timeout=600)
req.raise_for_status()
# Total size in bytes.
total_size = int(req.headers.get('content-length', 0))
temp_path = dest_path + ".temp"
if os.path.exists(dest_path):
logger.info("target file already exists")
file_size = os.stat(dest_path).st_size # File size in bytes
if file_size == total_size:
# Download complete
logger.info("download complete")
return dest_path
if os.path.exists(temp_path):
file_size = os.stat(temp_path).st_size # File size in bytes
if file_size < total_size:
# Download incomplete
logger.info("resuming download")
resume_header = {'Range': f'bytes={file_size}-'}
req = requests.get(url, headers=resume_header, stream=True,
verify=False, allow_redirects=True, timeout=600)
else:
# Error, delete file and restart download
logger.error("deleting file and restarting")
os.remove(temp_path)
file_size = 0
else:
# File does not exist, starting download
logger.info("starting download")
# write dataset to file and show progress bar
pbar = tqdm.tqdm(total=total_size, unit='B', unit_scale=True,
desc=dest_path, disable=not show_progress_bars)
# Update progress bar to reflect how much of the file is already downloaded
pbar.update(file_size)
with open(temp_path, "ab") as dest_file:
for chunk in req.iter_content(1024):
dest_file.write(chunk)
pbar.update(1024)
# move temp file to target destination
os.replace(temp_path, dest_path)
return dest_path
[docs]
def post_with_err_handling(url: str, body: str, headers: Dict,
timeout: Optional[int] = None,
retries: int = 3, delay: int = 1, backoff: int = 2
) -> Dict:
"""send `post` request and handle (some) errors that might occur"""
try:
resp = requests.post(url, json=body, headers=headers, timeout=timeout)
while 500 <= resp.status_code < 600 and retries > 1:
time.sleep(delay)
delay *= backoff
retries -= 1
resp = requests.post(url, json=body,
headers=headers, timeout=timeout)
resp.raise_for_status()
return resp.json()
except requests.exceptions.HTTPError as err:
logger.error(f"Http Error: {err}")
except requests.exceptions.ConnectionError as err:
logger.error(f"Error Connecting: {err}")
except requests.exceptions.Timeout as err:
logger.error(f"Timeout Error: {err}")
except requests.exceptions.RequestException as err:
logger.error(f"Oops, something went wrong: {err}")
except json.decoder.JSONDecodeError as err:
logger.error(f"Did not receive a valid JSON: {err}")
return {}
[docs]
def is_valid_uuid(val: str) -> bool:
""" check if the given string is a valid UUID """
try:
uuid.UUID(str(val))
return True
except ValueError:
return False