|
|
""" |
|
|
HTTP utility functions for making synchronous and asynchronous HTTP requests. |
|
|
|
|
|
This module provides a unified interface for HTTP operations using httpx, |
|
|
with proper error handling, timeout configuration, and retry logic. |
|
|
""" |
|
|
|
|
|
import asyncio |
|
|
import os |
|
|
import time |
|
|
import uuid |
|
|
from typing import Any, Dict, Optional |
|
|
|
|
|
import httpx |
|
|
|
|
|
from utils.bio_logger import bio_logger as logger |
|
|
|
|
|
|
|
|
class HTTPError(Exception): |
|
|
"""Custom exception for HTTP-related errors.""" |
|
|
|
|
|
def __init__(self, status_code: int, message: str, url: str): |
|
|
self.status_code = status_code |
|
|
self.message = message |
|
|
self.url = url |
|
|
super().__init__(f"HTTP {status_code}: {message} for {url}") |
|
|
|
|
|
|
|
|
def _create_timeout(timeout: float = 10.0) -> httpx.Timeout: |
|
|
"""Create a timeout configuration for HTTP requests.""" |
|
|
return httpx.Timeout(timeout, connect=5.0) |
|
|
|
|
|
|
|
|
def _handle_response(response: httpx.Response, url: str) -> Any: |
|
|
"""Handle HTTP response and raise appropriate exceptions.""" |
|
|
if response.status_code == 200: |
|
|
return response.json() |
|
|
|
|
|
logger.error(f"HTTP request failed: {response.status_code} for {url}") |
|
|
raise HTTPError( |
|
|
status_code=response.status_code, |
|
|
message=f"Request failed with status {response.status_code}", |
|
|
url=url, |
|
|
) |
|
|
|
|
|
|
|
|
async def async_http_get( |
|
|
url: str, |
|
|
params: Optional[Dict[str, Any]] = None, |
|
|
timeout: float = 10.0, |
|
|
headers: Optional[Dict[str, str]] = None, |
|
|
) -> Any: |
|
|
""" |
|
|
Make an asynchronous HTTP GET request. |
|
|
|
|
|
Args: |
|
|
url: The URL to make the request to |
|
|
params: Query parameters to include in the request |
|
|
timeout: Request timeout in seconds |
|
|
headers: Optional headers to include in the request |
|
|
|
|
|
Returns: |
|
|
The JSON response from the server |
|
|
|
|
|
Raises: |
|
|
HTTPError: If the request fails |
|
|
httpx.RequestError: If there's a network error |
|
|
""" |
|
|
timeout_config = _create_timeout(timeout) |
|
|
start_time = time.time() |
|
|
|
|
|
async with httpx.AsyncClient(timeout=timeout_config) as client: |
|
|
response = await client.get(url=url, params=params, headers=headers) |
|
|
duration = time.time() - start_time |
|
|
|
|
|
|
|
|
logger.log_api_call("GET", url, response.status_code, duration) |
|
|
|
|
|
return _handle_response(response, url) |
|
|
|
|
|
|
|
|
def http_get( |
|
|
url: str, |
|
|
params: Optional[Dict[str, Any]] = None, |
|
|
timeout: float = 10.0, |
|
|
headers: Optional[Dict[str, str]] = None, |
|
|
) -> Any: |
|
|
""" |
|
|
Make a synchronous HTTP GET request. |
|
|
|
|
|
Args: |
|
|
url: The URL to make the request to |
|
|
params: Query parameters to include in the request |
|
|
timeout: Request timeout in seconds |
|
|
headers: Optional headers to include in the request |
|
|
|
|
|
Returns: |
|
|
The JSON response from the server |
|
|
|
|
|
Raises: |
|
|
HTTPError: If the request fails |
|
|
httpx.RequestError: If there's a network error |
|
|
""" |
|
|
timeout_config = _create_timeout(timeout) |
|
|
start_time = time.time() |
|
|
|
|
|
with httpx.Client(timeout=timeout_config) as client: |
|
|
response = client.get(url=url, params=params, headers=headers) |
|
|
duration = time.time() - start_time |
|
|
|
|
|
|
|
|
logger.log_api_call("GET", url, response.status_code, duration) |
|
|
|
|
|
return _handle_response(response, url) |
|
|
|
|
|
|
|
|
def http_post( |
|
|
url: str, data: Any, headers: Optional[Dict[str, Any]] = None, timeout: float = 10.0 |
|
|
) -> Any: |
|
|
""" |
|
|
Make a synchronous HTTP POST request. |
|
|
|
|
|
Args: |
|
|
url: The URL to make the request to |
|
|
data: The data to send in the request body |
|
|
headers: Optional headers to include in the request |
|
|
timeout: Request timeout in seconds |
|
|
|
|
|
Returns: |
|
|
The JSON response from the server |
|
|
|
|
|
Raises: |
|
|
HTTPError: If the request fails |
|
|
httpx.RequestError: If there's a network error |
|
|
""" |
|
|
timeout_config = _create_timeout(timeout) |
|
|
start_time = time.time() |
|
|
|
|
|
with httpx.Client(timeout=timeout_config) as client: |
|
|
response = client.post(url=url, json=data, headers=headers) |
|
|
duration = time.time() - start_time |
|
|
|
|
|
|
|
|
logger.log_api_call("POST", url, response.status_code, duration) |
|
|
|
|
|
return _handle_response(response, url) |
|
|
|
|
|
|
|
|
async def async_http_post( |
|
|
url: str, |
|
|
data: Any, |
|
|
headers: Optional[Dict[str, Any]] = None, |
|
|
timeout: float = 10.0, |
|
|
max_retries: int = 3, |
|
|
retry_delay: float = 0.5, |
|
|
) -> Any: |
|
|
""" |
|
|
Make an asynchronous HTTP POST request with retry logic. |
|
|
|
|
|
Args: |
|
|
url: The URL to make the request to |
|
|
data: The data to send in the request body |
|
|
headers: Optional headers to include in the request |
|
|
timeout: Request timeout in seconds |
|
|
max_retries: Maximum number of retry attempts |
|
|
retry_delay: Delay between retries in seconds |
|
|
|
|
|
Returns: |
|
|
The JSON response from the server |
|
|
|
|
|
Raises: |
|
|
HTTPError: If the request fails after all retries |
|
|
httpx.RequestError: If there's a network error |
|
|
""" |
|
|
timeout_config = _create_timeout(timeout) |
|
|
|
|
|
async with httpx.AsyncClient(timeout=timeout_config) as client: |
|
|
for attempt in range(1, max_retries + 1): |
|
|
try: |
|
|
start_time = time.time() |
|
|
response = await client.post(url=url, json=data, headers=headers) |
|
|
duration = time.time() - start_time |
|
|
|
|
|
|
|
|
logger.log_api_call("POST", url, response.status_code, duration) |
|
|
|
|
|
if response.status_code == 200: |
|
|
return response.json() |
|
|
else: |
|
|
logger.error( |
|
|
f"HTTP POST failed (attempt {attempt}/{max_retries}): " |
|
|
f"{response.status_code} for {url}" |
|
|
) |
|
|
if attempt < max_retries: |
|
|
await asyncio.sleep(retry_delay) |
|
|
else: |
|
|
raise HTTPError( |
|
|
status_code=response.status_code, |
|
|
message=f"Request failed after {max_retries} attempts", |
|
|
url=url, |
|
|
) |
|
|
except httpx.RequestError as e: |
|
|
logger.error(f"Network error on attempt {attempt}: {e}") |
|
|
if attempt < max_retries: |
|
|
await asyncio.sleep(retry_delay) |
|
|
else: |
|
|
raise HTTPError( |
|
|
status_code=0, |
|
|
message=f"Network error after {max_retries} attempts: {str(e)}", |
|
|
url=url, |
|
|
) from e |
|
|
|
|
|
raise HTTPError( |
|
|
status_code=0, |
|
|
message=f"Failed to fetch data from {url} after {max_retries} attempts", |
|
|
url=url, |
|
|
) |
|
|
|
|
|
|
|
|
def download_file( |
|
|
file_url: str, directory_path: str, timeout: int = 60, verify_ssl: bool = True |
|
|
) -> Optional[str]: |
|
|
""" |
|
|
Download a file from a URL to a local directory. |
|
|
|
|
|
Args: |
|
|
file_url: The URL of the file to download |
|
|
directory_path: The directory to save the file in |
|
|
timeout: Request timeout in seconds |
|
|
verify_ssl: Whether to verify SSL certificates |
|
|
|
|
|
Returns: |
|
|
The path to the downloaded file, or None if download failed |
|
|
""" |
|
|
|
|
|
file_extension = file_url.split(".")[-1].split("?")[0] |
|
|
random_filename = f"{uuid.uuid4()}.{file_extension}" |
|
|
|
|
|
|
|
|
os.makedirs(directory_path, exist_ok=True) |
|
|
file_path = os.path.join(directory_path, random_filename) |
|
|
|
|
|
try: |
|
|
with httpx.Client(timeout=timeout, verify=verify_ssl) as client: |
|
|
with client.stream("GET", file_url) as response: |
|
|
if response.status_code == 200: |
|
|
with open(file_path, "wb") as file: |
|
|
for chunk in response.iter_bytes(chunk_size=8192): |
|
|
file.write(chunk) |
|
|
logger.info(f"Successfully downloaded file to {file_path}") |
|
|
return file_path |
|
|
else: |
|
|
logger.error( |
|
|
f"Download failed with status code: {response.status_code}" |
|
|
) |
|
|
return None |
|
|
except httpx.TimeoutException: |
|
|
logger.error("Download request timed out") |
|
|
return None |
|
|
except httpx.RequestError as e: |
|
|
logger.error(f"Download request failed: {e}") |
|
|
return None |
|
|
except Exception as e: |
|
|
logger.error(f"Unexpected error during download: {e}") |
|
|
return None |
|
|
|
|
|
|
|
|
|
|
|
async def async_http_post_legacy(url: str, params: dict) -> Any: |
|
|
""" |
|
|
Legacy async HTTP POST function for backward compatibility. |
|
|
|
|
|
This function maintains the old interface but uses the new implementation. |
|
|
""" |
|
|
return await async_http_post(url=url, data=params) |
|
|
|