Blog
June 24, 2019 Marie H.

Integrating with the Athena Health API

Privia Health is a physician enablement company — it manages practice operations for medical groups. That means integrating with EHR (electronic health record) systems, and one of the major ones we worked with was Athenahealth. I spent a good chunk of time building the Python integrations for appointment data, patient records, and scheduling. Here's what that actually looked like.

What the Athenahealth API Is

Athenahealth is an EHR platform used by thousands of medical practices. Their REST API exposes resources for appointments, patient demographics, clinical documents, scheduling, and billing. For backend integrations — server-to-server, no user login flow — you use the OAuth2 client credentials grant.

The API is practice-scoped. Every URL includes a practiceid segment, and every credential set is authorized against specific practices. If you're building for a multi-practice group, you need to handle multiple practice IDs in your client.
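One way to handle this is to keep the practice IDs a credential set is authorized for in one place and refuse to build paths for anything else. This is a minimal sketch, not what we shipped: the PracticeScope class, its method names, and the second practice ID are hypothetical.

```python
from dataclasses import dataclass
from typing import FrozenSet, Iterable


@dataclass(frozen=True)
class PracticeScope:
    """Hypothetical helper: the practice IDs one credential set may access."""

    practice_ids: FrozenSet[int]

    @classmethod
    def of(cls, ids: Iterable[int]) -> "PracticeScope":
        return cls(frozenset(ids))

    def path(self, practice_id: int, resource: str) -> str:
        """Build a practice-scoped path such as /195900/appointments/booked."""
        if practice_id not in self.practice_ids:
            raise ValueError(f"practice {practice_id} is not authorized for this client")
        return f"/{practice_id}/{resource.lstrip('/')}"


scope = PracticeScope.of([195900, 195901])  # example IDs only
print(scope.path(195900, "appointments/booked"))  # /195900/appointments/booked
```

Failing early on an unknown practice ID keeps a typo from turning into a confusing authorization error from the API.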

OAuth2 Client Credentials Flow

import time
import requests
from typing import Optional


class AthenaAuthClient:
    """Handles OAuth2 client credentials token lifecycle."""

    TOKEN_URL_PROD = "https://api.platform.athenahealth.com/oauth2/v1/token"
    TOKEN_URL_SANDBOX = "https://api.preview.platform.athenahealth.com/oauth2/v1/token"

    def __init__(self, client_id: str, client_secret: str, sandbox: bool = False):
        self.client_id = client_id
        self.client_secret = client_secret
        self.token_url = self.TOKEN_URL_SANDBOX if sandbox else self.TOKEN_URL_PROD
        self._token: Optional[str] = None
        self._token_expiry: float = 0

    def get_token(self) -> str:
        """Returns a valid access token, refreshing if necessary."""
        if self._token and time.time() < self._token_expiry - 60:
            return self._token

        response = requests.post(
            self.token_url,
            data={"grant_type": "client_credentials"},
            auth=(self.client_id, self.client_secret),
            timeout=10,
        )
        response.raise_for_status()
        data = response.json()
        self._token = data["access_token"]
        self._token_expiry = time.time() + data["expires_in"]
        return self._token

Token expiry is typically 3600 seconds. The 60-second buffer before expiry prevents edge cases where the token expires between being fetched and used.
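To make the buffer arithmetic concrete, here is the same check pulled out into a standalone function with fixed timestamps. The function name and the timestamps are made up for illustration.

```python
def token_is_usable(expiry_ts: float, now: float, buffer_s: float = 60.0) -> bool:
    """True if a token expiring at expiry_ts is still safe to send at time now."""
    return now < expiry_ts - buffer_s


issued_at = 1_000_000.0        # arbitrary epoch seconds, for illustration
expiry_ts = issued_at + 3600   # expires_in = 3600

print(token_is_usable(expiry_ts, issued_at + 3500))  # True: 100 s left
print(token_is_usable(expiry_ts, issued_at + 3550))  # False: 50 s left, inside the buffer
```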

The Full API Client: Retry, Rate Limiting, Pagination

import logging
import time
from typing import Any, Generator, Optional

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

logger = logging.getLogger(__name__)


class AthenaClient:
    BASE_URL_PROD = "https://api.platform.athenahealth.com/v1"
    BASE_URL_SANDBOX = "https://api.preview.platform.athenahealth.com/v1"

    def __init__(self, client_id: str, client_secret: str, sandbox: bool = False):
        self.base_url = self.BASE_URL_SANDBOX if sandbox else self.BASE_URL_PROD
        self.auth = AthenaAuthClient(client_id, client_secret, sandbox)
        self.session = self._build_session()

    def _build_session(self) -> requests.Session:
        session = requests.Session()
        retry = Retry(
            total=3,
            backoff_factor=2,
            status_forcelist=[500, 502, 503, 504],
            # Retrying POST on a 5xx can repeat a write; drop it if your POSTs are not idempotent
            allowed_methods=["GET", "POST"],
        )
        adapter = HTTPAdapter(max_retries=retry)
        session.mount("https://", adapter)
        return session

    def _request(self, method: str, path: str, **kwargs) -> requests.Response:
        url = f"{self.base_url}{path}"
        headers = {"Authorization": f"Bearer {self.auth.get_token()}"}

        refreshed = False
        for attempt in range(4):
            response = self.session.request(
                method, url, headers=headers, timeout=30, **kwargs
            )

            if response.status_code == 429:
                # Rate limited: honor Retry-After when it is a number of seconds,
                # otherwise fall back to a fixed 10-second wait
                raw = response.headers.get("Retry-After", "")
                retry_after = int(raw) if raw.isdigit() else 10
                logger.warning(
                    "Rate limited by Athena API. Waiting %d seconds. "
                    "Path: %s",
                    retry_after,
                    path,
                )
                time.sleep(retry_after)
                continue

            if response.status_code == 401 and not refreshed:
                # Token may have expired unexpectedly: force one refresh and retry.
                # A second 401 falls through to raise_for_status() below.
                refreshed = True
                self.auth._token = None
                headers["Authorization"] = f"Bearer {self.auth.get_token()}"
                continue

            response.raise_for_status()
            return response

        raise RuntimeError(f"Failed after retries: {method} {path}")

    def get_paginated(
        self, path: str, params: Optional[dict] = None
    ) -> Generator[dict, None, None]:
        """
        Yields individual records from a paginated Athena API endpoint.
        Handles offset-based pagination automatically.
        """
        params = params.copy() if params else {}
        params.setdefault("limit", 100)
        params["offset"] = 0

        while True:
            response = self._request("GET", path, params=params)
            data = response.json()

            # Athena wraps results in a key that varies by endpoint;
            # totalcount, when present, is the total number of matching records
            total = data.get("totalcount")
            records_key = self._find_records_key(data)

            if not records_key:
                break

            records = data[records_key]
            yield from records

            params["offset"] += len(records)
            if not records:
                break
            if total is not None and params["offset"] >= total:
                break
            if total is None and len(records) < int(params["limit"]):
                # No totalcount in the response: a short page marks the end
                break

    @staticmethod
    def _find_records_key(data: dict) -> Optional[str]:
        """Find the key containing the list of records in a paginated response."""
        skip_keys = {"totalcount", "next", "previous"}
        for key, value in data.items():
            if key not in skip_keys and isinstance(value, list):
                return key
        return None
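To see the offset arithmetic without hitting the API, the same loop can be run against an in-memory stand-in. This is a simplified sketch: fake_fetch, the appointments key, and the 250 synthetic records are made up for the illustration.

```python
from typing import Callable, Dict, Iterator, List

DATASET = [{"appointmentid": i} for i in range(250)]  # synthetic records


def fake_fetch(offset: int, limit: int) -> Dict:
    """Stand-in for one GET: returns a page shaped like a paginated response."""
    return {"totalcount": len(DATASET), "appointments": DATASET[offset:offset + limit]}


def paginate(fetch: Callable[[int, int], Dict], key: str, limit: int = 100) -> Iterator[Dict]:
    """Walk offset-based pages until totalcount is reached or a page comes back empty."""
    offset = 0
    while True:
        page = fetch(offset, limit)
        records: List[Dict] = page.get(key, [])
        yield from records
        offset += len(records)
        if not records or offset >= page.get("totalcount", 0):
            break


records = list(paginate(fake_fetch, "appointments"))
print(len(records))  # 250, fetched as pages of 100, 100, and 50
```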

Practice-Scoped Endpoints

Every Athena API call includes the practice ID in the path. The functions below are methods on AthenaClient:

def get_appointments(
    self,
    practice_id: int,
    start_date: str,
    end_date: str,
) -> Generator[dict, None, None]:
    """
    Fetch booked appointments for a practice within a date range.
    start_date, end_date: MM/DD/YYYY format (Athena's required format)
    """
    path = f"/{practice_id}/appointments/booked"
    params = {
        "startdate": start_date,
        "enddate": end_date,
        "scheduledstartdate": start_date,
        "scheduledenddate": end_date,
    }
    yield from self.get_paginated(path, params=params)


def get_patient(self, practice_id: int, patient_id: int) -> dict:
    """Fetch a single patient record."""
    path = f"/{practice_id}/patients/{patient_id}"
    response = self._request("GET", path)
    return response.json()
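Because the date parameters are MM/DD/YYYY strings, it helps to build them from date objects rather than by hand. A small sketch; athena_date and sync_window are hypothetical helper names, and the seven-day window is an arbitrary example.

```python
from datetime import date, timedelta
from typing import Tuple


def athena_date(d: date) -> str:
    """Format a date as MM/DD/YYYY."""
    return d.strftime("%m/%d/%Y")


def sync_window(today: date, days_ahead: int = 7) -> Tuple[str, str]:
    """Hypothetical helper: start/end strings covering today through days_ahead."""
    return athena_date(today), athena_date(today + timedelta(days=days_ahead))


print(sync_window(date(2019, 6, 24)))  # ('06/24/2019', '07/01/2019')
```

The resulting pair can be passed straight through as start_date and end_date to get_appointments.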

Rate Limiting

Athena enforces per-practice rate limits. The response headers tell you where you stand:

X-RateLimit-Limit: 1000
X-RateLimit-Remaining: 847
X-RateLimit-Reset: 1561392000

When you hit the limit, you get a 429 Too Many Requests. The Retry-After header tells you how many seconds to wait. In practice, the limits are generous for single-practice integrations but become a real concern if you're running bulk historical data pulls across many practices simultaneously. We serialized bulk pulls per-practice and added a small sleep between requests during heavy batch jobs.
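A sketch of how those headers could drive the sleep between requests in a batch job. It assumes X-RateLimit-Reset is a Unix timestamp, as the sample value above suggests. The pause_seconds name, the floor threshold, and the 0.2-second base delay are illustrative choices, not the values we ran in production.

```python
from typing import Mapping


def pause_seconds(
    headers: Mapping[str, str],
    now: float,
    floor: int = 50,
    base_delay: float = 0.2,
) -> float:
    """Hypothetical throttle: a small fixed delay normally, wait for the reset when nearly out."""
    try:
        remaining = int(headers["X-RateLimit-Remaining"])
        reset_at = float(headers["X-RateLimit-Reset"])
    except (KeyError, ValueError):
        return base_delay  # headers missing or malformed: fall back to the fixed delay
    if remaining > floor:
        return base_delay
    return max(reset_at - now, base_delay)


sample = {
    "X-RateLimit-Limit": "1000",
    "X-RateLimit-Remaining": "847",
    "X-RateLimit-Reset": "1561392000",
}
print(pause_seconds(sample, now=1561391000.0))  # 0.2: plenty of budget left
```

In a bulk pull you would call time.sleep() with the returned value after each request, one practice at a time.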

HIPAA Considerations

This is healthcare data. PHI (Protected Health Information) includes patient names, dates of birth, appointment times, diagnoses, and anything that can identify a patient in conjunction with their health status.

Rules we enforced in code:

  1. No PHI in log messages. Log patient IDs and appointment IDs for tracing, not names, DOBs, or clinical notes.

     # Wrong
     logger.info("Fetched appointment for John Smith, DOB 01/15/1965")

     # Right
     logger.info("Fetched appointment patient_id=%s appt_id=%s", patient_id, appt_id)

  2. Audit trail. Every API call that fetches patient data was logged to an append-only audit table with timestamp, user/service identity, practice ID, patient ID, and the action taken.

  3. No caching PHI in Redis without encryption. We didn't cache patient records. Appointment slot availability (non-PHI) was cacheable.
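The audit trail rule can be sketched as follows. This is an illustration under assumptions, not our production schema: sqlite3 stands in for the real database, the phi_access_audit table and its column names are hypothetical, and the triggers are just one way to approximate append-only behavior.

```python
import sqlite3
from datetime import datetime, timezone


def open_audit_db(path: str = ":memory:") -> sqlite3.Connection:
    """Create the (hypothetical) audit table and block UPDATE/DELETE on it."""
    conn = sqlite3.connect(path)
    conn.execute(
        """CREATE TABLE IF NOT EXISTS phi_access_audit (
               id INTEGER PRIMARY KEY AUTOINCREMENT,
               ts TEXT NOT NULL,
               actor TEXT NOT NULL,
               practice_id INTEGER NOT NULL,
               patient_id INTEGER NOT NULL,
               action TEXT NOT NULL
           )"""
    )
    # Triggers reject UPDATE and DELETE so rows can only be added
    for op in ("UPDATE", "DELETE"):
        conn.execute(
            f"""CREATE TRIGGER IF NOT EXISTS audit_no_{op.lower()}
                BEFORE {op} ON phi_access_audit
                BEGIN SELECT RAISE(ABORT, 'audit table is append-only'); END"""
        )
    return conn


def record_access(
    conn: sqlite3.Connection, actor: str, practice_id: int, patient_id: int, action: str
) -> None:
    """Append one audit row. Only IDs are stored, never names or clinical data."""
    conn.execute(
        "INSERT INTO phi_access_audit (ts, actor, practice_id, patient_id, action) "
        "VALUES (?, ?, ?, ?, ?)",
        (datetime.now(timezone.utc).isoformat(), actor, practice_id, patient_id, action),
    )
    conn.commit()


conn = open_audit_db()
record_access(conn, "nightly-sync", 195900, 42, "GET patient")
print(conn.execute("SELECT COUNT(*) FROM phi_access_audit").fetchone()[0])  # 1
```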

Sandbox vs. Production

Athena provides a sandbox environment at a different base URL with test practice IDs and synthetic patient data. Development and testing should always use the sandbox — the production environment contains real patient data.

# Sandbox
client = AthenaClient(
    client_id="sandbox_client_id",
    client_secret="sandbox_secret",
    sandbox=True,
)

# Test practice IDs in sandbox are documented in Athena's developer portal
appointments = list(client.get_appointments(
    practice_id=195900,  # Athena's standard sandbox practice ID
    start_date="06/01/2019",
    end_date="06/30/2019",
))

The sandbox has its own credentials — your production credentials don't work there, and vice versa. Keep them in separate environment variables and be explicit about which environment you're targeting.
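One way to keep the two credential sets apart is to make the environment an explicit argument and derive the variable names from it. A minimal sketch; the ATHENA_SANDBOX_* and ATHENA_PRODUCTION_* variable names are a made-up convention, and load_credentials is a hypothetical helper.

```python
import os
from typing import Mapping, Tuple


def load_credentials(
    env: str, environ: Mapping[str, str] = os.environ
) -> Tuple[str, str, bool]:
    """Return (client_id, client_secret, sandbox) for 'sandbox' or 'production'."""
    if env not in ("sandbox", "production"):
        raise ValueError(f"unknown environment: {env!r}")
    prefix = f"ATHENA_{env.upper()}"  # hypothetical naming convention
    try:
        client_id = environ[f"{prefix}_CLIENT_ID"]
        client_secret = environ[f"{prefix}_CLIENT_SECRET"]
    except KeyError as missing:
        raise RuntimeError(f"missing environment variable {missing}") from None
    return client_id, client_secret, env == "sandbox"


# Demo with a plain dict instead of the real process environment
fake_env = {
    "ATHENA_SANDBOX_CLIENT_ID": "sandbox_client_id",
    "ATHENA_SANDBOX_CLIENT_SECRET": "sandbox_secret",
}
print(load_credentials("sandbox", fake_env))
```

The returned tuple maps directly onto the AthenaClient constructor arguments, and asking for production without production variables set fails loudly instead of silently reusing sandbox values.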

Webhooks vs. Polling

Athena supports webhooks for real-time event notifications — appointment status changes, new patient check-ins, clinical document updates. For event-driven architectures this is the right approach.

The setup: you register a callback URL with Athena, they send POST requests with event payloads when things happen. You need to respond with a 200 within a few seconds, so process the payload asynchronously (push to a queue, return 200, process from the queue).
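The acknowledge-then-process pattern looks roughly like this. We did not build this ourselves (we polled), so treat it as a sketch: the in-process queue stands in for a durable queue such as SQS or RabbitMQ, receive_webhook would be called from whatever HTTP framework you use, and the event payload shape is invented for the demo.

```python
import json
import queue
import threading
from typing import Callable, Tuple

events: "queue.Queue[dict]" = queue.Queue()


def receive_webhook(body: bytes) -> Tuple[int, str]:
    """Called by the HTTP layer for each POST: validate, enqueue, acknowledge."""
    try:
        payload = json.loads(body)
    except ValueError:
        return 400, "invalid JSON"
    events.put(payload)  # cheap: no API calls or database writes on this path
    return 200, "ok"


def worker(handle: Callable[[dict], None]) -> None:
    """Drain the queue in the background; the slow work happens here."""
    while True:
        payload = events.get()
        try:
            handle(payload)
        finally:
            events.task_done()


processed = []
threading.Thread(target=worker, args=(processed.append,), daemon=True).start()

# Hypothetical payload; real event shapes come from Athena's documentation
status, _ = receive_webhook(b'{"event": "appointment.updated", "appointmentid": 1}')
events.join()  # demo only: wait until the worker has handled the event
print(status, len(processed))  # 200 1
```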

We used polling rather than webhooks for our initial integration because we were running batch jobs on a schedule: nightly appointment syncs and daily patient data reconciliation. Webhooks would have been better for appointment reminder workflows, where latency matters, and we planned to move to them in a follow-up phase. That ordering is one I'd recommend: get polling working first to understand the data model, then move to webhooks once you have confidence in your event handling infrastructure.