# Source code for opentargets._async_client

"""Async public client for the Open Targets Platform."""

from __future__ import annotations

from typing import TYPE_CHECKING, Any, Optional, cast

from ._async_graphql import AsyncGraphQLClient
from ._cache import TTLCache, _NoCache
from ._queries.disease import DISEASE_QUERY, DISEASE_TARGETS_QUERY
from ._queries.drug import DRUG_INDICATIONS_QUERY, DRUG_QUERY
from ._queries.search import SEARCH_QUERY
from ._queries.target import (
    TARGET_ASSOCIATIONS_QUERY,
    TARGET_DRUGS_QUERY,
    TARGET_QUERY,
    TARGETS_BATCH_QUERY,
)
from .client import (
    _parse_disease,
    _parse_disease_association,
    _parse_drug,
    _parse_drug_indication,
    _parse_target,
    _parse_target_association,
    _to_dataframe,
)
from .exceptions import NotFoundError
from .models import (
    Association,
    Disease,
    Drug,
    DrugIndication,
    SearchResult,
    Target,
)

if TYPE_CHECKING:
    import pandas as pd

# GraphQL endpoint used when the caller does not pass ``base_url``.
_DEFAULT_URL = "https://api.platform.opentargets.org/api/v4/graphql"


class AsyncOpenTargetsClient:
    """Asynchronous client for the Open Targets Platform GraphQL API.

    Mirrors :class:`~opentargets.OpenTargetsClient` with ``async``/``await``
    semantics. Use with ``asyncio.gather`` to query hundreds of targets
    concurrently.

    Args:
        base_url: GraphQL endpoint. Override for self-hosted instances.
        timeout: HTTP timeout in seconds.
        cache: Set to ``False`` to disable in-memory caching.
        cache_ttl: Cache entry lifetime in seconds (default 5 min).

    Example::

        import asyncio
        from opentargets import AsyncOpenTargetsClient

        async def main():
            async with AsyncOpenTargetsClient() as client:
                target = await client.get_target("EGFR")
                print(target.approved_name)

        asyncio.run(main())
    """

    def __init__(
        self,
        base_url: str = _DEFAULT_URL,
        timeout: float = 30.0,
        cache: bool = True,
        cache_ttl: float = 300.0,
    ) -> None:
        self._gql = AsyncGraphQLClient(base_url=base_url, timeout=timeout)
        # Two separate caches: gene symbol -> Ensembl ID, and result key -> parsed
        # model. ``_NoCache`` is substituted for both when caching is disabled.
        _sym: TTLCache[str, str] = TTLCache(ttl=cache_ttl) if cache else _NoCache()
        _res: TTLCache[str, Any] = TTLCache(ttl=cache_ttl) if cache else _NoCache()
        self._symbol_cache = _sym
        self._result_cache = _res

    # ------------------------------------------------------------------
    # Target queries
    # ------------------------------------------------------------------

    async def get_target(self, target_id: str) -> Target:
        """Fetch a single target by Ensembl ID or gene symbol.

        Args:
            target_id: Ensembl gene ID (``ENSG…``) or HGNC symbol (``EGFR``).

        Returns:
            A :class:`~opentargets.models.Target` instance.

        Raises:
            NotFoundError: If no target matches *target_id*.
        """
        ensembl_id = await self._resolve_target(target_id)
        cache_key = f"target:{ensembl_id}"
        cached = self._result_cache.get(cache_key)
        if cached is not None:
            return cast(Target, cached)

        data = await self._gql.execute(TARGET_QUERY, {"ensemblId": ensembl_id})
        raw = data.get("target")
        if not raw:
            raise NotFoundError("target", target_id)

        target = _parse_target(raw)
        self._result_cache.set(cache_key, target)
        return target

    async def get_targets(self, target_ids: list[str]) -> list[Target]:
        """Fetch multiple targets in a single API request.

        Args:
            target_ids: List of Ensembl IDs or gene symbols.

        Returns:
            List of :class:`~opentargets.models.Target` instances in input
            order. Identifiers that resolve but are absent from the API
            response are omitted, so the result may be shorter than the input.

        Raises:
            NotFoundError: If a gene symbol cannot be resolved.
        """
        if not target_ids:
            # Nothing requested: skip the network round-trip entirely.
            return []

        ensembl_ids = [await self._resolve_target(t) for t in target_ids]
        # Send each ID once; the output below is still built from the
        # (possibly repeated) input order.
        unique_ids = list(dict.fromkeys(ensembl_ids))
        data = await self._gql.execute(TARGETS_BATCH_QUERY, {"ids": unique_ids})
        raws: list[dict[str, Any]] = data.get("targets") or []
        by_id = {raw["id"]: _parse_target(raw) for raw in raws}
        return [by_id[eid] for eid in ensembl_ids if eid in by_id]

    async def get_target_associations(
        self,
        target_id: str,
        limit: int = 25,
        as_dataframe: bool = False,
    ) -> list[Association] | pd.DataFrame:
        """Fetch diseases associated with a target.

        Args:
            target_id: Ensembl ID or gene symbol.
            limit: Maximum number of associations to return. Values ``<= 0``
                yield an empty result without contacting the API.
            as_dataframe: Return a ``pandas.DataFrame`` instead of a list.

        Returns:
            List of :class:`~opentargets.models.Association` objects, or a
            ``DataFrame`` when *as_dataframe* is ``True``.

        Raises:
            NotFoundError: If a gene symbol cannot be resolved.
        """
        if limit <= 0:
            # A non-positive limit would become a non-positive page size and a
            # meaningless negative slice below.
            return _to_dataframe([]) if as_dataframe else []

        ensembl_id = await self._resolve_target(target_id)
        rows = await self._gql.paginate(
            TARGET_ASSOCIATIONS_QUERY,
            {"ensemblId": ensembl_id},
            data_path=["target", "associatedDiseases"],
            size=min(limit, 25),
        )
        rows = rows[:limit]

        # The symbol only labels the rows, so skip the lookup when there are none.
        symbol = await self._fetch_target_symbol(ensembl_id) if rows else ""
        associations = [
            _parse_target_association(row, ensembl_id, symbol) for row in rows
        ]
        if as_dataframe:
            return _to_dataframe(associations)
        return associations

    async def get_target_drugs(self, target_id: str) -> list[Drug]:
        """Fetch drugs known to interact with a target.

        Args:
            target_id: Ensembl ID or gene symbol.

        Returns:
            List of :class:`~opentargets.models.Drug` objects. Rows without a
            ``drug`` payload are skipped.

        Raises:
            NotFoundError: If a gene symbol cannot be resolved.
        """
        ensembl_id = await self._resolve_target(target_id)
        data = await self._gql.execute(TARGET_DRUGS_QUERY, {"ensemblId": ensembl_id})
        # Use ``or {}`` rather than a ``.get`` default: a key that is present
        # with an explicit null would otherwise yield ``None`` and break the
        # next lookup.
        target = data.get("target") or {}
        candidates = target.get("drugAndClinicalCandidates") or {}
        rows = candidates.get("rows") or []
        return [_parse_drug(row["drug"]) for row in rows if row.get("drug")]

    # ------------------------------------------------------------------
    # Disease queries
    # ------------------------------------------------------------------

    async def get_disease(self, disease_id: str) -> Disease:
        """Fetch a single disease by EFO identifier.

        Args:
            disease_id: EFO identifier (e.g. ``EFO_0003060``).

        Returns:
            A :class:`~opentargets.models.Disease` instance.

        Raises:
            NotFoundError: If no disease matches *disease_id*.
        """
        cache_key = f"disease:{disease_id}"
        cached = self._result_cache.get(cache_key)
        if cached is not None:
            return cast(Disease, cached)

        data = await self._gql.execute(DISEASE_QUERY, {"efoId": disease_id})
        raw = data.get("disease")
        if not raw:
            raise NotFoundError("disease", disease_id)

        disease = _parse_disease(raw)
        self._result_cache.set(cache_key, disease)
        return disease

    async def get_disease_targets(
        self,
        disease_id: str,
        limit: int = 25,
        as_dataframe: bool = False,
    ) -> list[Association] | pd.DataFrame:
        """Fetch targets associated with a disease.

        Args:
            disease_id: EFO identifier.
            limit: Maximum number of associations to return. Values ``<= 0``
                yield an empty result without contacting the API.
            as_dataframe: Return a ``pandas.DataFrame`` instead of a list.

        Returns:
            List of :class:`~opentargets.models.Association` objects or a
            DataFrame.
        """
        if limit <= 0:
            return _to_dataframe([]) if as_dataframe else []

        rows = await self._gql.paginate(
            DISEASE_TARGETS_QUERY,
            {"efoId": disease_id},
            data_path=["disease", "associatedTargets"],
            size=min(limit, 25),
        )
        rows = rows[:limit]

        # The name only labels the rows, so skip the lookup when there are none.
        disease_name = await self._fetch_disease_name(disease_id) if rows else ""
        associations = [
            _parse_disease_association(row, disease_id, disease_name) for row in rows
        ]
        if as_dataframe:
            return _to_dataframe(associations)
        return associations

    # ------------------------------------------------------------------
    # Drug queries
    # ------------------------------------------------------------------

    async def get_drug(self, drug_id: str) -> Drug:
        """Fetch a single drug by ChEMBL identifier.

        Args:
            drug_id: ChEMBL ID (e.g. ``CHEMBL939``).

        Returns:
            A :class:`~opentargets.models.Drug` instance.

        Raises:
            NotFoundError: If no drug matches *drug_id*.
        """
        cache_key = f"drug:{drug_id}"
        cached = self._result_cache.get(cache_key)
        if cached is not None:
            return cast(Drug, cached)

        data = await self._gql.execute(DRUG_QUERY, {"chemblId": drug_id})
        raw = data.get("drug")
        if not raw:
            raise NotFoundError("drug", drug_id)

        drug = _parse_drug(raw)
        self._result_cache.set(cache_key, drug)
        return drug

    async def get_drug_indications(self, drug_id: str) -> list[DrugIndication]:
        """Fetch disease indications for a drug.

        Args:
            drug_id: ChEMBL ID.

        Returns:
            List of :class:`~opentargets.models.DrugIndication` objects; empty
            when the drug or its indications are missing from the response.
        """
        data = await self._gql.execute(DRUG_INDICATIONS_QUERY, {"chemblId": drug_id})
        # ``or {}`` also covers keys that are present with an explicit null.
        drug = data.get("drug") or {}
        indications = drug.get("indications") or {}
        rows = indications.get("rows") or []
        return [_parse_drug_indication(row) for row in rows]

    # ------------------------------------------------------------------
    # Search
    # ------------------------------------------------------------------

    async def search(
        self,
        query_string: str,
        entity_type: Optional[str] = None,
        limit: int = 10,
    ) -> list[SearchResult]:
        """Search the platform for targets, diseases, or drugs.

        Args:
            query_string: Free-text search string.
            entity_type: Filter by ``"target"``, ``"disease"``, or ``"drug"``.
                Pass ``None`` to search all entity types.
            limit: Maximum number of results. Values ``<= 0`` yield an empty
                list without contacting the API.

        Returns:
            List of :class:`~opentargets.models.SearchResult` objects.
        """
        if limit <= 0:
            return []

        entity_names = [entity_type] if entity_type else []
        data = await self._gql.execute(
            SEARCH_QUERY,
            {
                "queryString": query_string,
                "entityNames": entity_names,
                "page": {"index": 0, "size": limit},
            },
        )
        hits: list[dict[str, Any]] = (data.get("search") or {}).get("hits") or []
        return [SearchResult.model_validate(hit) for hit in hits]

    # ------------------------------------------------------------------
    # Association queries
    # ------------------------------------------------------------------

    async def get_associations(
        self,
        target_id: str,
        disease_id: str,
    ) -> Optional[Association]:
        """Fetch the association between a specific target and disease.

        The target's association rows are retrieved through pagination and
        scanned for a row whose disease ID equals *disease_id*.

        Args:
            target_id: Ensembl ID or gene symbol.
            disease_id: EFO identifier.

        Returns:
            An :class:`~opentargets.models.Association` or ``None`` if no
            association exists.

        Raises:
            NotFoundError: If a gene symbol cannot be resolved.
        """
        ensembl_id = await self._resolve_target(target_id)
        rows = await self._gql.paginate(
            TARGET_ASSOCIATIONS_QUERY,
            {"ensemblId": ensembl_id},
            data_path=["target", "associatedDiseases"],
            size=25,
        )
        match = next(
            (row for row in rows if (row.get("disease") or {}).get("id") == disease_id),
            None,
        )
        if match is None:
            return None

        symbol = await self._fetch_target_symbol(ensembl_id)
        return _parse_target_association(match, ensembl_id, symbol)

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------

    async def close(self) -> None:
        """Close the underlying HTTP connection pool."""
        await self._gql.close()

    async def __aenter__(self) -> AsyncOpenTargetsClient:
        return self

    async def __aexit__(self, *_: object) -> None:
        await self.close()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    async def _fetch_target_symbol(self, ensembl_id: str) -> str:
        """Return the approved symbol for *ensembl_id*, or ``""`` if unavailable."""
        data = await self._gql.execute(TARGET_QUERY, {"ensemblId": ensembl_id})
        return (data.get("target") or {}).get("approvedSymbol") or ""

    async def _fetch_disease_name(self, disease_id: str) -> str:
        """Return the name for *disease_id*, or ``""`` if unavailable."""
        data = await self._gql.execute(DISEASE_QUERY, {"efoId": disease_id})
        return (data.get("disease") or {}).get("name") or ""

    async def _resolve_target(self, target_id: str) -> str:
        """Return Ensembl ID for *target_id*, resolving gene symbols via search.

        Identifiers that look like Ensembl gene IDs (``ENSG`` prefix, matched
        case-insensitively) are returned stripped and upper-cased, so every
        spelling of the same ID maps to the same API argument and cache key.
        Anything else is treated as a gene symbol: the symbol cache is
        consulted first, then the first target hit from :meth:`search` is used.

        Raises:
            NotFoundError: If *target_id* is blank or the search has no hits.
        """
        query = target_id.strip()
        normalized = query.upper()
        if not normalized:
            # A blank identifier can never name a target; fail before searching.
            raise NotFoundError("target", target_id)
        if normalized.startswith("ENSG"):
            return normalized

        cached = self._symbol_cache.get(normalized)
        if cached is not None:
            return cached

        results = await self.search(query, entity_type="target", limit=1)
        if not results:
            raise NotFoundError("target", target_id)

        ensembl_id = results[0].id
        self._symbol_cache.set(normalized, ensembl_id)
        return ensembl_id