πŸš€ Building Resilient Azure Functions: Mastering Microsoft Graph API Calls with Python

Interacting with external APIs like Microsoft Graph is a common task for Azure Functions. Whether you’re fetching SharePoint list data, managing users, or sending emails, ensuring your function can handle transient network issues, API throttling, and other temporary errors is crucial for reliability. This article explores how to fortify your Python Azure Functions for robust communication with Microsoft Graph, focusing on implementing effective retry mechanisms and handling API throttling gracefully.

We’ll break down the enhancements made to a Python Azure Function designed to fetch and process SharePoint list data, turning it into a more resilient and production-ready solution.

Note: This is the continuation of a series of post. Please go through the below post to better understand the full context.

Part 1 – Build a Python Azure Function to Connect with SharePoint Online via Microsoft Graph API

Part 2 – Secure Python Azure Function Using Azure Key Vault and Managed Identity

Part 3 – Improving Python Azure Function Performance: Handling Large SharePoint Lists with Throttling, Caching, and Batch Requests

Part 4 – Robust Authentication with Microsoft Graph API (Using MSAL and Service Principals)

The Challenge: The Unpredictable Nature of API Calls ⚠️

When your Azure Function calls an API like Microsoft Graph, several issues can arise:

  • Network Glitches 🌐: Temporary network interruptions can cause requests to fail.
  • API Throttling ⏳: Services like Microsoft Graph enforce rate limits. If your function makes too many requests in a short period, it will be throttled (often receiving a 429 Too Many Requests error).
  • Transient Server Errors πŸ› οΈ: The API service itself might experience temporary issues, returning 5xx server errors (e.g., 503 Service Unavailable).

Without proper handling, these issues can lead to failed function executions, incomplete data processing, and a poor user experience.

The Solution: A Centralized, Intelligent Retry Strategy πŸ›‘οΈ

The core of our resilient solution lies in a dedicated function, perform_request_with_retry. This function wraps around standard requests calls and incorporates several best practices:

  • Automatic Retries: Retries failed requests a configurable number of times.
  • Exponential Backoff: Gradually increases the delay between retries (e.g., 1s, 2s, 4s…) to avoid overwhelming the API.
  • Throttling Awareness: Specifically listens for 429, 503, and 504 status codes.
  • Retry-After Header Support: If the API provides a Retry-After header (common with 429 responses), the function respects this specific delay.
  • Session Management: Utilizes requests.Session() for connection pooling, improving performance for multiple calls to the same host.
  • Clear Logging: Provides informative logs for each retry attempt and error.

Let’s look at this key function and how it’s integrated.

Code Deep Dive: Implementing Resiliency πŸ’»

The perform_request_with_retry Function

This is the workhorse for all Graph API calls.

def perform_request_with_retry(method: str, url: str, access_token: str, params: dict = None, json_payload: dict = None, max_retries: int = 5, session: requests.Session = None) -> requests.Response:
    """
    Performs an HTTP request with retries for throttling and transient errors.
    Uses a provided session or creates a new one.
    """
    local_session = session or requests.Session()
    delay = 1 # Initial delay in seconds
    headers = get_headers(access_token)

    for attempt in range(max_retries):
        try:
            response = None
            if method.upper() == "GET":
                response = local_session.get(url, headers=headers, params=params)
            elif method.upper() == "POST":
                response = local_session.post(url, headers=headers, json=json_payload, params=params)
            elif method.upper() == "PATCH":
                response = local_session.patch(url, headers=headers, json=json_payload, params=params)
            elif method.upper() == "DELETE":
                response = local_session.delete(url, headers=headers, params=params)
            else:
                raise ValueError(f"Unsupported HTTP method: {method}")

            if response.status_code in [429, 503, 504]: # Throttling or temporary server issues
                retry_after = int(response.headers.get("Retry-After", delay))
                logging.warning(
                    f"Request to {url} {method} Throttled/Unavailable (Status {response.status_code}). "
                    f"Retrying in {retry_after}s (Attempt {attempt + 1}/{max_retries})"
                )
                time.sleep(retry_after)
                delay = min(delay * 2, 60) # Exponential backoff, max 60s
                continue # Retry the loop

            response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx) not handled above
            return response

        except requests.exceptions.RequestException as e: # Catches network errors, timeouts, etc.
            logging.warning(
                f"Request to {url} {method} failed: {e} (Attempt {attempt + 1}/{max_retries})"
            )
            if attempt < max_retries - 1:
                time.sleep(delay)
                delay = min(delay * 2, 60)
            else:
                logging.error(
                    f"Max retries reached for {url} {method}. Last error: {e}"
                )
                raise # Re-raise the last exception after max retries
        finally:
            if not session and local_session: # If session was created locally, close it
                local_session.close()
                
    raise Exception(f"Request {method} {url} failed after {max_retries} retries without a conclusive response or error.")

Integrating Resiliency into Helper Functions πŸ”—

Previously, functions like get_sharepoint_site_id or get_list_id might have made direct requests.get() calls. Now, they all leverage perform_request_with_retry:

Example: get_sharepoint_site_id

def get_sharepoint_site_id(hostname, site_path, access_token, session: requests.session = None):
    if site_path:
        endpoint = f"{GRAPH_API_BASE}/sites/{hostname}:{'/' + site_path if not site_path.startswith('/') else site_path}"
    else:
        endpoint = f"{GRAPH_API_BASE}/sites/{hostname}"

    response = perform_request_with_retry(method="GET", access_token=access_token, url=endpoint, session=session)
    response.raise_for_status()
    return response.json().get("id")

Efficient Session Management in main βš™οΈ

# In the main function:
# ...
graph_session = requests.Session()
try:
    # ...
    access_token = acquire_graph_api_token(tenant_id, client_id, client_secret, [scope])
    site_id = get_sharepoint_site_id(hostname, site_path, session=graph_session, access_token=access_token)
    list_id = get_list_id(site_id, list_name, access_token=access_token, session=graph_session)
    # ...
finally:
    if graph_session:
        graph_session.close()
# ...

Benefits of This Approach βœ…

  • Increased Reliability: The function can automatically recover from many common transient issues.
  • Improved Robustness: Less prone to failures due to API throttling.
  • Better Performance:requests.Session reuses connections, reducing latency. Batching further optimizes data retrieval.
  • Simplified Code: Centralizes retry logic, making individual API call functions cleaner.
  • Enhanced Logging: Provides better insights into API interactions and any issues encountered.

Full Updated Code πŸ“„

import json
import logging
import os
import requests
from msal import ConfidentialClientApplication
import azure.functions as func
from azure.identity import DefaultAzureCredential
from azure.keyvault.secrets import SecretClient
import time

GRAPH_API_BASE = "https://graph.microsoft.com/v1.0"
# Set Key Vault URL
KEYVAULT_URL = "https://kvpythonwo.vault.azure.net/"
# Cache for lookup values
lookup_cache = {}

# Global variable to hold the MSAL app instance
_msal_app_instance = None

# --- Authentication Functions ---
def _initialize_confidential_client_app(tenant_id: str, client_id: str, client_secret: str) -> ConfidentialClientApplication:
    """
    Initializes and returns a ConfidentialClientApplication instance.
    This instance is cached globally to leverage MSAL's in-memory token caching.
    """
    global _msal_app_instance
    if _msal_app_instance is None:
        logging.info("Initializing MSAL ConfidentialClientApplication.")
        authority = f"https://login.microsoftonline.com/{tenant_id}"
        _msal_app_instance = ConfidentialClientApplication(
            client_id,
            authority=authority,
            client_credential=client_secret
        )
        # For more advanced scenarios, you could configure token cache serialization here
        # if you need token persistence beyond the lifetime of this specific function instance
        # (e.g., using a distributed cache like Redis).
        # For Azure Functions, the in-memory cache is per warm instance.
    return _msal_app_instance

def acquire_graph_api_token(tenant_id: str, client_id: str, client_secret: str, scopes: list) -> str:
    """
    Acquires an access token for Microsoft Graph API using the client credentials flow.
    It leverages a globally cached ConfidentialClientApplication instance for efficiency.

    Args:
        tenant_id: Your Azure AD tenant ID.
        client_id: The Application (client) ID of your Azure AD app registration.
        client_secret: The client secret for your Azure AD app registration.
        scopes: A list of scopes, e.g., ["https://graph.microsoft.com/.default"].

    Returns:
        The access token string.

    Raises:
        Exception: If token acquisition fails.
    """
    if not scopes:
        raise ValueError("Scopes list cannot be empty. Typically, use ['https://graph.microsoft.com/.default'].")

    app = _initialize_confidential_client_app(tenant_id, client_id, client_secret)

    # The ConfidentialClientApplication object will automatically cache the token.
    # Subsequent calls to acquire_token_for_client (with the same scopes) on the same
    # app instance will return the cached token if it's still valid, or acquire a new one if needed.
    result = app.acquire_token_for_client(scopes=scopes)

    if "access_token" in result:
        # Log successful acquisition; MSAL handles the actual caching.
        # You can inspect result['expires_in'] or result.get('token_type') if needed.
        logging.info("Successfully acquired Graph API token.")
        return result['access_token']
    else:
        error = result.get("error")
        error_description = result.get("error_description", "No error description provided.")
        correlation_id = result.get("correlation_id") # Crucial for troubleshooting with Microsoft support
        
        log_message = (
            f"Failed to acquire Graph API token. "
            f"Error: {error}, Description: {error_description}, "
            f"Correlation ID: {correlation_id}. "
            f"Check an AAD sign-in log for more details."
        )
        logging.error(log_message)
        # You might want to include more details from the 'result' object if available and helpful
        # For example, some errors might include 'claims' or specific 'error_codes'.
        raise Exception(f"Graph API token acquisition failed: {error_description} (Correlation ID: {correlation_id})")

# --- End of Authentication Functions ---
    
def get_headers(access_token):
    return {
        "Authorization": f"Bearer {access_token}",
        "Accept": "application/json",
        "Content-Type": "application/json"
    }
    
def get_sharepoint_site_id(hostname, site_path, access_token, session: requests.session = None):
    if site_path:
        endpoint = f"{GRAPH_API_BASE}/sites/{hostname}:{'/' + site_path if not site_path.startswith('/') else site_path}"
    else:
        endpoint = f"{GRAPH_API_BASE}/sites/{hostname}"

    response = perform_request_with_retry(method="GET", access_token=access_token, url=endpoint, session=session)
    response.raise_for_status()
    return response.json().get("id")

def get_list_id(site_id, list_name, access_token, session: requests.session = None):
    url = f"{GRAPH_API_BASE}/sites/{site_id}/lists"
    response = perform_request_with_retry(method="GET", url=url, access_token=access_token, session=session)
    response.raise_for_status()
    lists = response.json().get("value", [])
    for lst in lists:
        if lst.get("name") == list_name:
            return lst.get("id")
    raise Exception(f"List '{list_name}' not found.")

def perform_request_with_retry_Old(method, url, access_token, json_payload=None, max_retries=5):
    session = requests.Session()
    delay = 1
    for attempt in range(max_retries):
        try:
            if method == "GET":
                response = session.get(url, headers={"Authorization": f"Bearer {access_token}"})
            elif method == "POST":
                response = session.post(url, headers={"Authorization": f"Bearer {access_token}"}, json=json_payload)
            else:
                raise ValueError("Unsupported HTTP method")

            if response.status_code in [429, 503, 504]:
                retry_after = int(response.headers.get("Retry-After", delay))
                logging.warning(f"Throttled. Retrying in {retry_after}s (Attempt {attempt + 1})")
                time.sleep(retry_after)
                delay = min(delay * 2, 60)
                continue

            response.raise_for_status()
            return response
        except requests.exceptions.RequestException as e:
            logging.warning(f"Request error: {e} (Attempt {attempt + 1})")
            if attempt < max_retries - 1:
                time.sleep(delay)
                delay = min(delay * 2, 60)
            else:
                raise

def perform_request_with_retry(method: str, url: str, access_token: str, params: dict = None, json_payload: dict = None, max_retries: int = 5, session: requests.Session = None) -> requests.Response:
    """
    Performs an HTTP request with retries for throttling and transient errors.
    Uses a provided session or creates a new one.
    """
    local_session = session or requests.Session()
    delay = 1 # Initial delay in seconds
    headers = get_headers(access_token)

    for attempt in range(max_retries):
        try:
            response = None
            if method.upper() == "GET":
                response = local_session.get(url, headers=headers, params=params)
            elif method.upper() == "POST":
                response = local_session.post(url, headers=headers, json=json_payload, params=params)
            elif method.upper() == "PATCH":
                response = local_session.patch(url, headers=headers, json=json_payload, params=params)
            elif method.upper() == "DELETE":
                response = local_session.delete(url, headers=headers, params=params)
            else:
                raise ValueError(f"Unsupported HTTP method: {method}")

            if response.status_code in [429, 503, 504]: # Throttling or temporary server issues
                retry_after = int(response.headers.get("Retry-After", delay))
                logging.warning(
                    f"Request to {url} {method} Throttled/Unavailable (Status {response.status_code}). "
                    f"Retrying in {retry_after}s (Attempt {attempt + 1}/{max_retries})"
                )
                time.sleep(retry_after)
                delay = min(delay * 2, 60) # Exponential backoff, max 60s
                continue # Retry the loop

            response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx) not handled above
            return response

        except requests.exceptions.RequestException as e: # Catches network errors, timeouts, etc.
            logging.warning(
                f"Request to {url} {method} failed: {e} (Attempt {attempt + 1}/{max_retries})"
            )
            if attempt < max_retries - 1:
                time.sleep(delay)
                delay = min(delay * 2, 60)
            else:
                logging.error(
                    f"Max retries reached for {url} {method}. Last error: {e}"
                )
                raise # Re-raise the last exception after max retries
        finally:
            if not session and local_session: # If session was created locally, close it
                local_session.close()
                
    raise Exception(f"Request {method} {url} failed after {max_retries} retries without a conclusive response or error.")

def batch_lookup_request(site_id, list_id, ids, access_token, session: requests.Session = None):
    batch_requests = []
    for i, lookup_id in enumerate(ids):
        if lookup_id not in lookup_cache:
            batch_requests.append({
                "id": str(i),
                "method": "GET",
                "url": f"/sites/{site_id}/lists/{list_id}/items/{lookup_id}?expand=fields"
            })

    if not batch_requests:
        return

    payload = {"requests": batch_requests}
    response = perform_request_with_retry("POST", f"{GRAPH_API_BASE}/$batch", json_payload=payload, access_token=access_token, session=session)
    for resp in response.json().get("responses", []):
        if resp.get("status") == 200:
            item = resp["body"]
            lookup_id = str(item["id"])
            lookup_cache[lookup_id] = item.get("fields", {}).get("CaseNo")

def get_large_list_items(site_id, list_id, access_token, top=4000, session: requests.Session = None):
    items = []
    url = f"{GRAPH_API_BASE}/sites/{site_id}/lists/{list_id}/items?$top={top}&$select=id,fields&$expand=fields"
    masterlist_id = get_list_id(site_id, "Patient Details", access_token=access_token, session=session)
    while url:
        resp = perform_request_with_retry("GET", url, access_token=access_token, session=session)
        data = resp.json()
        items.extend(data.get("value", []))

        # Extract lookup IDs for batch resolve
        lookup_ids = set()
        for item in data.get("value", []):
            fields = item.get("fields", {})
            if "CaseNoLookupId" in fields:
                lookup_ids.add(fields["CaseNoLookupId"])

        # Batch resolve lookups
        batch_lookup_request(site_id, masterlist_id, lookup_ids, access_token)

        url = data.get("@odata.nextLink")

    return items

def main(req: func.HttpRequest) -> func.HttpResponse:
    logging.info('Fetching SharePoint list data using Graph API.')
    graph_session = requests.Session()
    try:
        credential = DefaultAzureCredential()
        client = SecretClient(vault_url=KEYVAULT_URL, credential=credential)

        tenant_id = client.get_secret('tenant-id').value
        client_id = client.get_secret('client-id').value
        client_secret = client.get_secret('client-secret').value
        scope = client.get_secret('scope').value
        hostname = client.get_secret('sp-hostname').value
        site_path = client.get_secret('sp-sitepath').value
        list_name = client.get_secret("sp-listname").value

        access_token = acquire_graph_api_token(tenant_id, client_id, client_secret, [scope])

        site_id = get_sharepoint_site_id(hostname, site_path, session=graph_session, access_token=access_token)
        list_id = get_list_id(site_id, list_name, access_token=access_token, session=graph_session)

        try:
            items = get_large_list_items(site_id, list_id, top=4000, session=graph_session, access_token=access_token)
            print(f"Lookup cache: {lookup_cache}")
            print(f"Items fetched: {len(items)}")
            for item in items:
                case_id = item.get("fields", {}).get("CaseNoLookupId")
                case_value = lookup_cache.get(case_id, "Unknown")
                item["fields"]["CaseNo"] = case_value

            return func.HttpResponse(json.dumps(items), mimetype="application/json")

        except Exception as e:
            logging.error(f"Error: {e}")
            return func.HttpResponse(f"Error: {str(e)}", status_code=500)

    except Exception as e:
        logging.error(str(e))
        return func.HttpResponse(
            f"Error: {str(e)}",
            status_code=500
        )
    finally:
        if graph_session:
            graph_session.close()

🏁 Conclusion

By implementing a centralized retry mechanism with exponential backoff, respecting Retry-After headers, and utilizing session management, we’ve significantly enhanced the resilience of our Python Azure Function when interacting with Microsoft Graph API. This approach ensures that your serverless functions are more robust, reliable, and better equipped to handle the dynamic nature of cloud services.

Leave a comment