# Source code for numerapi.utils

""" collection of utility functions"""

import os
import decimal
import logging
import time
import datetime
import uuid
import json

import dateutil.parser
import requests
import tqdm

logger = logging.getLogger(__name__)


def load_secrets() -> tuple:
    """Return the API credentials as a ``(public_id, secret_key)`` tuple.

    If the optional ``dotenv`` package can be imported, ``load_dotenv()`` is
    called first; when the package is missing this step is silently skipped.
    Both values are then read from the environment variables
    ``NUMERAI_PUBLIC_ID`` and ``NUMERAI_SECRET_KEY``; a variable that is not
    set yields ``None`` in its position.
    """
    try:
        # optional dependency, hence the function-level import
        # pylint: disable=import-outside-toplevel
        from dotenv import load_dotenv
        load_dotenv()
    except ImportError:
        # dotenv is not installed - rely on the plain environment only
        pass

    env_names = ("NUMERAI_PUBLIC_ID", "NUMERAI_SECRET_KEY")
    return tuple(os.environ.get(env_name) for env_name in env_names)
[docs] def parse_datetime_string(string: str) -> datetime.datetime | None: """try to parse string to datetime object""" if string is None: return None return dateutil.parser.parse(string)
[docs] def parse_float_string(string: str) -> decimal.Decimal | None: """try to parse string to decimal.Decimal object""" if string is None: return None try: val = decimal.Decimal(string.replace(",", "")) except decimal.InvalidOperation: val = None return val
def replace(dictionary: dict, key: str, function):
    """Transform one dictionary entry in place.

    The value stored under ``key`` is replaced by ``function(value)``.
    Nothing happens when ``dictionary`` is ``None`` or does not contain
    ``key``. The dictionary is mutated; nothing is returned.
    """
    if dictionary is None or key not in dictionary:
        return
    current_value = dictionary[key]
    dictionary[key] = function(current_value)
def download_file(url: str, dest_path: str, show_progress_bars: bool = True) -> str:
    """Download a file with a progress bar, resuming a partial download.

    Data is first written to ``dest_path + ".temp"`` and moved to
    ``dest_path`` once the transfer has finished, so an interrupted run
    leaves a partial temp file that the next call can continue from.

    Args:
        url: location of the file to download
        dest_path: path the finished file is written to
        show_progress_bars: set to ``False`` to disable the progress bar

    Returns:
        ``dest_path``

    Raises:
        requests.exceptions.HTTPError: if the server answers the initial or
            the resume request with an error status.
    """
    chunk_size = 1024
    temp_path = dest_path + ".temp"

    req = requests.get(url, stream=True, timeout=600)
    try:
        req.raise_for_status()
        # Total size in bytes (0 if the server does not announce it).
        total_size = int(req.headers.get('content-length', 0))

        if os.path.exists(dest_path):
            logger.info("target file already exists")
            if os.stat(dest_path).st_size == total_size:
                # Download complete
                logger.info("download complete")
                return dest_path

        # Bytes already present in the temp file. Tracked separately from
        # the size of a stale target file so that the progress bar never
        # starts at an unrelated offset.
        downloaded = 0
        if os.path.exists(temp_path):
            downloaded = os.stat(temp_path).st_size
            if downloaded < total_size:
                # Download incomplete
                logger.info("resuming download")
                # release the first connection before opening the ranged one
                req.close()
                resume_header = {'Range': f'bytes={downloaded}-'}
                # NOTE: certificate verification is intentionally left
                # enabled here, matching the initial request above.
                req = requests.get(url, headers=resume_header, stream=True,
                                   allow_redirects=True, timeout=600)
                req.raise_for_status()
                if req.status_code != 206:
                    # The server ignored the Range header and is sending the
                    # whole file again; appending would corrupt the result.
                    os.remove(temp_path)
                    downloaded = 0
            else:
                # Error, delete file and restart download
                logger.error("deleting file and restarting")
                os.remove(temp_path)
                downloaded = 0
        else:
            # File does not exist, starting download
            logger.info("starting download")

        # write dataset to file and show progress bar
        with tqdm.tqdm(total=total_size, unit='B', unit_scale=True,
                       desc=dest_path,
                       disable=not show_progress_bars) as pbar:
            # reflect how much of the file is already downloaded
            pbar.update(downloaded)
            with open(temp_path, "ab") as dest_file:
                for chunk in req.iter_content(chunk_size):
                    dest_file.write(chunk)
                    # the last chunk is usually shorter than chunk_size
                    pbar.update(len(chunk))
    finally:
        # always hand the connection back, also on early return or error
        req.close()

    # move temp file to target destination
    os.replace(temp_path, dest_path)
    return dest_path
def post_with_err_handling(url: str, body: dict, headers: dict,
                           *, timeout: int | None = None,
                           retries: int = 3, delay: int = 1, backoff: int = 2
                           ) -> dict:
    """Send a ``post`` request and handle (some) errors that might occur.

    Responses with a 5xx status code are retried after sleeping ``delay``
    seconds; the delay is multiplied by ``backoff`` after every retry.
    Request errors and invalid JSON responses are logged, not raised.

    Args:
        url: address the request is sent to
        body: payload, sent JSON-encoded
        headers: HTTP headers of the request
        timeout: timeout passed on to ``requests.post``
        retries: maximum number of attempts, including the first one
        delay: seconds to wait before the first retry
        backoff: factor the delay is multiplied with after each retry

    Returns:
        The decoded JSON response, or an empty dict if any of the handled
        errors occurred.
    """
    try:
        resp = requests.post(url, json=body, headers=headers, timeout=timeout)
        while 500 <= resp.status_code < 600 and retries > 1:
            time.sleep(delay)
            delay *= backoff
            retries -= 1
            resp = requests.post(url, json=body,
                                 headers=headers, timeout=timeout)
        resp.raise_for_status()
        return resp.json()
    except requests.exceptions.HTTPError as err:
        logger.error("Http Error: %s", err)
    except requests.exceptions.ConnectionError as err:
        logger.error("Error Connecting: %s", err)
    except requests.exceptions.Timeout as err:
        logger.error("Timeout Error: %s", err)
    # Must come before the generic RequestException handler: in recent
    # versions of requests the error raised by `resp.json()` derives from
    # both json.JSONDecodeError and RequestException, so the generic handler
    # would otherwise swallow it and log the wrong message.
    except json.decoder.JSONDecodeError as err:
        logger.error("Did not receive a valid JSON: %s", err)
    except requests.exceptions.RequestException as err:
        logger.error("Oops, something went wrong: %s", err)
    return {}
def is_valid_uuid(val: str) -> bool:
    """Tell whether ``val`` can be interpreted as a UUID.

    The value is converted with ``str()`` and passed to ``uuid.UUID``;
    the result is ``False`` if that raises ``ValueError`` and ``True``
    otherwise.
    """
    try:
        uuid.UUID(str(val))
    except ValueError:
        return False
    else:
        return True