Source code for opentargets._retry

"""Exponential backoff retry logic (no third-party dependencies)."""

from __future__ import annotations

import asyncio
import time
from collections.abc import Callable, Coroutine
from dataclasses import dataclass, field
from typing import Any, TypeVar

from .exceptions import RateLimitError

# Generic result type: the retry wrappers return whatever the wrapped
# callable returns.
_T = TypeVar("_T")

# ---------------------------------------------------------------------------
# Legacy module-level constants — kept for backwards compatibility; the
# ``RetryConfig`` dataclass is now the canonical source of truth.  The values
# below mirror the ``RetryConfig`` field defaults.
# NOTE(review): nothing visible in this module reads these names; presumably
# they exist only for external importers — confirm before removing.
# ---------------------------------------------------------------------------
_RETRYABLE_STATUS = frozenset({429, 500, 502, 503, 504})  # cf. retryable_statuses
_MAX_RETRIES = 3  # cf. RetryConfig.max_retries
_BASE_DELAY = 1.0  # seconds; cf. RetryConfig.base_delay
_MAX_DELAY = 60.0  # seconds; cf. RetryConfig.max_delay


@dataclass(frozen=True)
class RetryConfig:
    """Immutable configuration for retry behavior.

    Args:
        max_retries: Maximum number of retry attempts (0 = no retries).
        base_delay: Initial backoff delay in seconds.
        max_delay: Maximum backoff delay in seconds.
        retryable_statuses: Set of HTTP status codes that trigger a retry.
        respect_retry_after: When ``True`` (default) honour the
            ``Retry-After`` response header value for 429 responses.
            When ``False`` use exponential back-off instead.

    Raises:
        ValueError: If ``max_retries``, ``base_delay`` or ``max_delay`` is
            negative (or NaN).
    """

    max_retries: int = 3
    base_delay: float = 1.0
    max_delay: float = 60.0
    retryable_statuses: frozenset[int] = field(
        default_factory=lambda: frozenset({429, 500, 502, 503, 504})
    )
    respect_retry_after: bool = True

    def __post_init__(self) -> None:
        """Reject values that would only fail later, inside the retry loop."""
        for name in ("max_retries", "base_delay", "max_delay"):
            value = getattr(self, name)
            # ``not value >= 0`` (rather than ``value < 0``) also rejects NaN,
            # which ``time.sleep`` would refuse at retry time.
            if not value >= 0:
                raise ValueError(f"{name} must be non-negative, got {value!r}")
# Shared default; safe to reuse as a default argument because RetryConfig is a
# frozen dataclass.
DEFAULT_RETRY_CONFIG = RetryConfig()


def with_retry(
    fn: Callable[[], _T],
    config: RetryConfig = DEFAULT_RETRY_CONFIG,
) -> _T:
    """Execute *fn* with exponential backoff on retryable HTTP errors.

    Retries on :class:`~opentargets.exceptions.RateLimitError` and
    :class:`~opentargets.exceptions.APIError` whose ``status_code`` is in
    ``config.retryable_statuses``.  *fn* is always called at least once; a
    negative ``config.max_retries`` is treated as ``0``.  No delay is
    inserted after the final failed attempt.

    Args:
        fn: Zero-argument callable that performs the HTTP request.
        config: Retry configuration; defaults to :data:`DEFAULT_RETRY_CONFIG`.

    Returns:
        The return value of *fn* on success.

    Raises:
        The last exception raised by *fn* after all retries are exhausted,
        or immediately for an ``APIError`` whose status is not retryable.
        Exceptions of any other type propagate unchanged.
    """
    from .exceptions import APIError

    max_retries = max(config.max_retries, 0)
    attempt = 0
    while True:
        try:
            return fn()
        except RateLimitError as exc:
            # Out of retries: re-raise right away instead of sleeping first.
            if attempt >= max_retries:
                raise
            delay = _rate_limit_delay(exc, attempt, config)
        except APIError as exc:
            if (
                attempt >= max_retries
                or exc.status_code not in config.retryable_statuses
            ):
                raise
            delay = _backoff(attempt, config)
        _sleep(delay)
        attempt += 1


async def with_retry_async(
    fn: Callable[[], Coroutine[Any, Any, _T]],
    config: RetryConfig = DEFAULT_RETRY_CONFIG,
) -> _T:
    """Async version of :func:`with_retry`.

    Waits with :func:`asyncio.sleep` between attempts.  *fn* is always
    awaited at least once; a negative ``config.max_retries`` is treated as
    ``0``.  No delay is inserted after the final failed attempt.

    Args:
        fn: Zero-argument async callable that performs the HTTP request.
        config: Retry configuration; defaults to :data:`DEFAULT_RETRY_CONFIG`.

    Returns:
        The return value of *fn* on success.

    Raises:
        The last exception raised by *fn* after all retries are exhausted,
        or immediately for an ``APIError`` whose status is not retryable.
        Exceptions of any other type propagate unchanged.
    """
    from .exceptions import APIError

    max_retries = max(config.max_retries, 0)
    attempt = 0
    while True:
        try:
            return await fn()
        except RateLimitError as exc:
            # Out of retries: re-raise right away instead of sleeping first.
            if attempt >= max_retries:
                raise
            delay = _rate_limit_delay(exc, attempt, config)
        except APIError as exc:
            if (
                attempt >= max_retries
                or exc.status_code not in config.retryable_statuses
            ):
                raise
            delay = _backoff(attempt, config)
        await asyncio.sleep(delay)
        attempt += 1


def _rate_limit_delay(
    exc: RateLimitError, attempt: int, config: RetryConfig
) -> float:
    """Return the wait in seconds before retrying after a rate-limit error.

    Uses ``exc.retry_after`` when ``config.respect_retry_after`` is set and a
    value is present; otherwise falls back to exponential backoff.
    """
    if config.respect_retry_after and exc.retry_after is not None:
        # Assumes retry_after is a number of seconds (it was passed straight
        # to sleep before).  Clamp at zero: time.sleep() raises ValueError on
        # a negative argument.
        return max(0.0, exc.retry_after)
    return _backoff(attempt, config)


def _backoff(attempt: int, config: RetryConfig = DEFAULT_RETRY_CONFIG) -> float:
    """Return delay in seconds for the given attempt index (0-based).

    The delay is ``config.base_delay * 2**attempt``, capped at
    ``config.max_delay``.
    """
    # 2.0**1024 raises OverflowError; 1023 is the largest exponent a float
    # power of two can hold, and the result is clamped to max_delay anyway.
    exponent = min(attempt, 1023)
    delay = config.base_delay * (2.0**exponent)
    return delay if delay < config.max_delay else config.max_delay


def _sleep(seconds: float) -> None:
    """Block the calling thread for *seconds* via :func:`time.sleep`."""
    time.sleep(seconds)