Interacting with external APIs like Microsoft Graph is a common task for Azure Functions. Whether you’re fetching SharePoint list data, managing users, or sending emails, ensuring your function can handle transient network issues, API throttling, and other temporary errors is crucial for reliability. This article explores how to fortify your Python Azure Functions for robust communication with Microsoft Graph, focusing on implementing effective retry mechanisms and handling API throttling gracefully.
We’ll break down the enhancements made to a Python Azure Function designed to fetch and process SharePoint list data, turning it into a more resilient and production-ready solution.
Note: This is a continuation of a series of posts. Please go through the posts below to better understand the full context.
Part 1 – Build a Python Azure Function to Connect with SharePoint Online via Microsoft Graph API
Part 2 – Secure Python Azure Function Using Azure Key Vault and Managed Identity
Part 4 – Robust Authentication with Microsoft Graph API (Using MSAL and Service Principals)
The Challenge: The Unpredictable Nature of API Calls
When your Azure Function calls an API like Microsoft Graph, several issues can arise:
- Network Glitches: Temporary network interruptions can cause requests to fail.
- API Throttling: Services like Microsoft Graph enforce rate limits. If your function makes too many requests in a short period, it will be throttled (often receiving a 429 Too Many Requests error).
- Transient Server Errors: The API service itself might experience temporary issues, returning 5xx server errors (e.g., 503 Service Unavailable).
Without proper handling, these issues can lead to failed function executions, incomplete data processing, and a poor user experience.
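For example, a naive call with no retry handling turns a throttled response into an immediate failure. A minimal sketch of the fragile pattern (assuming a valid access_token; the hostname is hypothetical):

import requests

# Single attempt, no retry: a 429 or a transient 5xx here fails the whole execution.
response = requests.get(
    "https://graph.microsoft.com/v1.0/sites/contoso.sharepoint.com",  # hypothetical hostname
    headers={"Authorization": f"Bearer {access_token}"},
)
response.raise_for_status()  # raises HTTPError immediately on 429/5xx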
The Solution: A Centralized, Intelligent Retry Strategy
The core of our resilient solution lies in a dedicated function, perform_request_with_retry. This function wraps around standard requests calls and incorporates several best practices:
- Automatic Retries: Retries failed requests a configurable number of times.
- Exponential Backoff: Gradually increases the delay between retries (e.g., 1s, 2s, 4s…) to avoid overwhelming the API.
- Throttling Awareness: Specifically listens for
429,503, and504status codes. Retry-AfterHeader Support: If the API provides aRetry-Afterheader (common with429responses), the function respects this specific delay.- Session Management: Utilizes
requests.Session()for connection pooling, improving performance for multiple calls to the same host. - Clear Logging: Provides informative logs for each retry attempt and error.
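One caveat on the Retry-After point: the function below parses the header with int(), which assumes the delta-seconds form (e.g., "120"); the HTTP spec also allows an HTTP-date. If you need to handle both, a small helper along these lines could be used (a sketch, not part of the original function):

from email.utils import parsedate_to_datetime
from datetime import datetime, timezone

def parse_retry_after(header_value: str, fallback: float) -> float:
    """Best-effort Retry-After parsing: accepts delta-seconds or an HTTP-date."""
    if not header_value:
        return fallback
    try:
        return float(header_value)  # e.g. "120"
    except ValueError:
        try:
            # e.g. "Wed, 21 Oct 2015 07:28:00 GMT"
            delta = parsedate_to_datetime(header_value) - datetime.now(timezone.utc)
            return max(delta.total_seconds(), 0.0)
        except (TypeError, ValueError):
            return fallback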
Let’s look at this key function and how it’s integrated.
Code Deep Dive: Implementing Resiliency
The perform_request_with_retry Function
This is the workhorse for all Graph API calls.
def perform_request_with_retry(method: str, url: str, access_token: str, params: dict = None, json_payload: dict = None, max_retries: int = 5, session: requests.Session = None) -> requests.Response:
    """
    Performs an HTTP request with retries for throttling and transient errors.
    Uses a provided session or creates a new one.
    """
    local_session = session or requests.Session()
    owns_session = session is None  # Close only a session we created ourselves
    delay = 1  # Initial delay in seconds
    headers = get_headers(access_token)
    try:
        for attempt in range(max_retries):
            try:
                if method.upper() == "GET":
                    response = local_session.get(url, headers=headers, params=params)
                elif method.upper() == "POST":
                    response = local_session.post(url, headers=headers, json=json_payload, params=params)
                elif method.upper() == "PATCH":
                    response = local_session.patch(url, headers=headers, json=json_payload, params=params)
                elif method.upper() == "DELETE":
                    response = local_session.delete(url, headers=headers, params=params)
                else:
                    raise ValueError(f"Unsupported HTTP method: {method}")
                if response.status_code in [429, 503, 504]:  # Throttling or temporary server issues
                    # Assumes the delta-seconds form of Retry-After (see the note above)
                    retry_after = int(response.headers.get("Retry-After", delay))
                    logging.warning(
                        f"Request to {url} {method} throttled/unavailable (status {response.status_code}). "
                        f"Retrying in {retry_after}s (attempt {attempt + 1}/{max_retries})"
                    )
                    time.sleep(retry_after)
                    delay = min(delay * 2, 60)  # Exponential backoff, capped at 60s
                    continue  # Retry the loop
                response.raise_for_status()  # Raise HTTPError for bad responses (4xx/5xx) not handled above
                return response
            except requests.exceptions.RequestException as e:  # Catches network errors, timeouts, etc.
                logging.warning(
                    f"Request to {url} {method} failed: {e} (attempt {attempt + 1}/{max_retries})"
                )
                if attempt < max_retries - 1:
                    time.sleep(delay)
                    delay = min(delay * 2, 60)
                else:
                    logging.error(f"Max retries reached for {url} {method}. Last error: {e}")
                    raise  # Re-raise the last exception after max retries
        raise Exception(f"Request {method} {url} failed after {max_retries} retries without a conclusive response or error.")
    finally:
        if owns_session:
            local_session.close()  # Close the locally created session exactly once, after all attempts
Integrating Resiliency into Helper Functions
Previously, functions like get_sharepoint_site_id or get_list_id might have made direct requests.get() calls. Now, they all leverage perform_request_with_retry:
Example: get_sharepoint_site_id
def get_sharepoint_site_id(hostname, site_path, access_token, session: requests.Session = None):
    if site_path:
        endpoint = f"{GRAPH_API_BASE}/sites/{hostname}:{'/' + site_path if not site_path.startswith('/') else site_path}"
    else:
        endpoint = f"{GRAPH_API_BASE}/sites/{hostname}"
    response = perform_request_with_retry(method="GET", access_token=access_token, url=endpoint, session=session)
    response.raise_for_status()
    return response.json().get("id")
Efficient Session Management in main
# In the main function:
# ...
graph_session = requests.Session()
try:
    # ...
    access_token = acquire_graph_api_token(tenant_id, client_id, client_secret, [scope])
    site_id = get_sharepoint_site_id(hostname, site_path, session=graph_session, access_token=access_token)
    list_id = get_list_id(site_id, list_name, access_token=access_token, session=graph_session)
    # ...
finally:
    if graph_session:
        graph_session.close()
# ...
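As a small stylistic alternative (not from the original code), requests.Session also works as a context manager, which makes the cleanup implicit:

# Same flow as above; the with-block closes the session automatically.
with requests.Session() as graph_session:
    site_id = get_sharepoint_site_id(hostname, site_path, session=graph_session, access_token=access_token)
    list_id = get_list_id(site_id, list_name, access_token=access_token, session=graph_session)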
Benefits of This Approach
- Increased Reliability: The function can automatically recover from many common transient issues.
- Improved Robustness: Less prone to failures due to API throttling.
- Better Performance: requests.Session reuses connections, reducing latency. Batching further optimizes data retrieval.
- Simplified Code: Centralizes retry logic, making individual API call functions cleaner.
- Enhanced Logging: Provides better insights into API interactions and any issues encountered.
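Before deploying, the retry path can be exercised locally. One option (an illustration using the third-party responses mock library, not part of the original article) is to simulate a throttled response followed by a success:

import responses  # pip install responses

@responses.activate
def test_retry_recovers_from_throttling():
    url = "https://graph.microsoft.com/v1.0/sites/root"  # illustrative endpoint
    # Registered responses are consumed in order: first a 429, then a 200.
    responses.add(responses.GET, url, status=429, headers={"Retry-After": "0"})
    responses.add(responses.GET, url, json={"id": "site-id"}, status=200)
    resp = perform_request_with_retry("GET", url, access_token="dummy-token")
    assert resp.json()["id"] == "site-id"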
Full Updated Code
import json
import logging
import os
import requests
from msal import ConfidentialClientApplication
import azure.functions as func
from azure.identity import DefaultAzureCredential
from azure.keyvault.secrets import SecretClient
import time
GRAPH_API_BASE = "https://graph.microsoft.com/v1.0"
# Set Key Vault URL
KEYVAULT_URL = "https://kvpythonwo.vault.azure.net/"
# Cache for lookup values
lookup_cache = {}
# Global variable to hold the MSAL app instance
_msal_app_instance = None
# --- Authentication Functions ---
def _initialize_confidential_client_app(tenant_id: str, client_id: str, client_secret: str) -> ConfidentialClientApplication:
    """
    Initializes and returns a ConfidentialClientApplication instance.
    This instance is cached globally to leverage MSAL's in-memory token caching.
    """
    global _msal_app_instance
    if _msal_app_instance is None:
        logging.info("Initializing MSAL ConfidentialClientApplication.")
        authority = f"https://login.microsoftonline.com/{tenant_id}"
        _msal_app_instance = ConfidentialClientApplication(
            client_id,
            authority=authority,
            client_credential=client_secret
        )
    # For more advanced scenarios, you could configure token cache serialization here
    # if you need token persistence beyond the lifetime of this specific function instance
    # (e.g., using a distributed cache like Redis).
    # For Azure Functions, the in-memory cache is per warm instance.
    return _msal_app_instance
def acquire_graph_api_token(tenant_id: str, client_id: str, client_secret: str, scopes: list) -> str:
    """
    Acquires an access token for Microsoft Graph API using the client credentials flow.
    It leverages a globally cached ConfidentialClientApplication instance for efficiency.
    Args:
        tenant_id: Your Azure AD tenant ID.
        client_id: The Application (client) ID of your Azure AD app registration.
        client_secret: The client secret for your Azure AD app registration.
        scopes: A list of scopes, e.g., ["https://graph.microsoft.com/.default"].
    Returns:
        The access token string.
    Raises:
        Exception: If token acquisition fails.
    """
    if not scopes:
        raise ValueError("Scopes list cannot be empty. Typically, use ['https://graph.microsoft.com/.default'].")
    app = _initialize_confidential_client_app(tenant_id, client_id, client_secret)
    # The ConfidentialClientApplication object will automatically cache the token.
    # Subsequent calls to acquire_token_for_client (with the same scopes) on the same
    # app instance will return the cached token if it's still valid, or acquire a new one if needed.
    result = app.acquire_token_for_client(scopes=scopes)
    if "access_token" in result:
        # Log successful acquisition; MSAL handles the actual caching.
        # You can inspect result['expires_in'] or result.get('token_type') if needed.
        logging.info("Successfully acquired Graph API token.")
        return result['access_token']
    else:
        error = result.get("error")
        error_description = result.get("error_description", "No error description provided.")
        correlation_id = result.get("correlation_id")  # Crucial for troubleshooting with Microsoft support
        log_message = (
            f"Failed to acquire Graph API token. "
            f"Error: {error}, Description: {error_description}, "
            f"Correlation ID: {correlation_id}. "
            f"Check the AAD sign-in logs for more details."
        )
        logging.error(log_message)
        # You might want to include more details from the 'result' object if available and helpful.
        # For example, some errors might include 'claims' or specific 'error_codes'.
        raise Exception(f"Graph API token acquisition failed: {error_description} (Correlation ID: {correlation_id})")
# --- End of Authentication Functions ---
def get_headers(access_token):
    return {
        "Authorization": f"Bearer {access_token}",
        "Accept": "application/json",
        "Content-Type": "application/json"
    }
def get_sharepoint_site_id(hostname, site_path, access_token, session: requests.Session = None):
    if site_path:
        endpoint = f"{GRAPH_API_BASE}/sites/{hostname}:{'/' + site_path if not site_path.startswith('/') else site_path}"
    else:
        endpoint = f"{GRAPH_API_BASE}/sites/{hostname}"
    response = perform_request_with_retry(method="GET", access_token=access_token, url=endpoint, session=session)
    response.raise_for_status()
    return response.json().get("id")
def get_list_id(site_id, list_name, access_token, session: requests.Session = None):
    url = f"{GRAPH_API_BASE}/sites/{site_id}/lists"
    response = perform_request_with_retry(method="GET", url=url, access_token=access_token, session=session)
    response.raise_for_status()
    lists = response.json().get("value", [])
    for lst in lists:
        if lst.get("name") == list_name:
            return lst.get("id")
    raise Exception(f"List '{list_name}' not found.")
def perform_request_with_retry(method: str, url: str, access_token: str, params: dict = None, json_payload: dict = None, max_retries: int = 5, session: requests.Session = None) -> requests.Response:
    """
    Performs an HTTP request with retries for throttling and transient errors.
    Uses a provided session or creates a new one.
    """
    local_session = session or requests.Session()
    owns_session = session is None  # Close only a session we created ourselves
    delay = 1  # Initial delay in seconds
    headers = get_headers(access_token)
    try:
        for attempt in range(max_retries):
            try:
                if method.upper() == "GET":
                    response = local_session.get(url, headers=headers, params=params)
                elif method.upper() == "POST":
                    response = local_session.post(url, headers=headers, json=json_payload, params=params)
                elif method.upper() == "PATCH":
                    response = local_session.patch(url, headers=headers, json=json_payload, params=params)
                elif method.upper() == "DELETE":
                    response = local_session.delete(url, headers=headers, params=params)
                else:
                    raise ValueError(f"Unsupported HTTP method: {method}")
                if response.status_code in [429, 503, 504]:  # Throttling or temporary server issues
                    # Assumes the delta-seconds form of Retry-After (see the earlier note)
                    retry_after = int(response.headers.get("Retry-After", delay))
                    logging.warning(
                        f"Request to {url} {method} throttled/unavailable (status {response.status_code}). "
                        f"Retrying in {retry_after}s (attempt {attempt + 1}/{max_retries})"
                    )
                    time.sleep(retry_after)
                    delay = min(delay * 2, 60)  # Exponential backoff, capped at 60s
                    continue  # Retry the loop
                response.raise_for_status()  # Raise HTTPError for bad responses (4xx/5xx) not handled above
                return response
            except requests.exceptions.RequestException as e:  # Catches network errors, timeouts, etc.
                logging.warning(
                    f"Request to {url} {method} failed: {e} (attempt {attempt + 1}/{max_retries})"
                )
                if attempt < max_retries - 1:
                    time.sleep(delay)
                    delay = min(delay * 2, 60)
                else:
                    logging.error(f"Max retries reached for {url} {method}. Last error: {e}")
                    raise  # Re-raise the last exception after max retries
        raise Exception(f"Request {method} {url} failed after {max_retries} retries without a conclusive response or error.")
    finally:
        if owns_session:
            local_session.close()  # Close the locally created session exactly once, after all attempts
def batch_lookup_request(site_id, list_id, ids, access_token, session: requests.Session = None):
    # Note: Microsoft Graph JSON batching allows up to 20 requests per batch;
    # chunk the ID set if it can grow beyond that.
    batch_requests = []
    for i, lookup_id in enumerate(ids):
        lookup_id = str(lookup_id)  # Cache keys are strings; normalize before the membership check
        if lookup_id not in lookup_cache:
            batch_requests.append({
                "id": str(i),
                "method": "GET",
                "url": f"/sites/{site_id}/lists/{list_id}/items/{lookup_id}?expand=fields"
            })
    if not batch_requests:
        return
    payload = {"requests": batch_requests}
    response = perform_request_with_retry("POST", f"{GRAPH_API_BASE}/$batch", json_payload=payload, access_token=access_token, session=session)
    for resp in response.json().get("responses", []):
        if resp.get("status") == 200:
            item = resp["body"]
            lookup_id = str(item["id"])
            lookup_cache[lookup_id] = item.get("fields", {}).get("CaseNo")
def get_large_list_items(site_id, list_id, access_token, top=4000, session: requests.Session = None):
    items = []
    url = f"{GRAPH_API_BASE}/sites/{site_id}/lists/{list_id}/items?$top={top}&$select=id,fields&$expand=fields"
    masterlist_id = get_list_id(site_id, "Patient Details", access_token=access_token, session=session)
    while url:
        resp = perform_request_with_retry("GET", url, access_token=access_token, session=session)
        data = resp.json()
        items.extend(data.get("value", []))
        # Extract lookup IDs for batch resolve
        lookup_ids = set()
        for item in data.get("value", []):
            fields = item.get("fields", {})
            if "CaseNoLookupId" in fields:
                lookup_ids.add(fields["CaseNoLookupId"])
        # Batch-resolve lookups, reusing the shared session
        batch_lookup_request(site_id, masterlist_id, lookup_ids, access_token, session=session)
        url = data.get("@odata.nextLink")
    return items
def main(req: func.HttpRequest) -> func.HttpResponse:
    logging.info('Fetching SharePoint list data using Graph API.')
    graph_session = requests.Session()
    try:
        credential = DefaultAzureCredential()
        client = SecretClient(vault_url=KEYVAULT_URL, credential=credential)
        tenant_id = client.get_secret('tenant-id').value
        client_id = client.get_secret('client-id').value
        client_secret = client.get_secret('client-secret').value
        scope = client.get_secret('scope').value
        hostname = client.get_secret('sp-hostname').value
        site_path = client.get_secret('sp-sitepath').value
        list_name = client.get_secret("sp-listname").value
        access_token = acquire_graph_api_token(tenant_id, client_id, client_secret, [scope])
        site_id = get_sharepoint_site_id(hostname, site_path, session=graph_session, access_token=access_token)
        list_id = get_list_id(site_id, list_name, access_token=access_token, session=graph_session)
        try:
            items = get_large_list_items(site_id, list_id, top=4000, session=graph_session, access_token=access_token)
            logging.info(f"Lookup cache: {lookup_cache}")
            logging.info(f"Items fetched: {len(items)}")
            for item in items:
                case_id = item.get("fields", {}).get("CaseNoLookupId")
                # Cache keys are strings, so normalize the lookup ID before reading the cache
                case_value = lookup_cache.get(str(case_id), "Unknown")
                item["fields"]["CaseNo"] = case_value
            return func.HttpResponse(json.dumps(items), mimetype="application/json")
        except Exception as e:
            logging.error(f"Error: {e}")
            return func.HttpResponse(f"Error: {str(e)}", status_code=500)
    except Exception as e:
        logging.error(str(e))
        return func.HttpResponse(
            f"Error: {str(e)}",
            status_code=500
        )
    finally:
        if graph_session:
            graph_session.close()
Conclusion
By implementing a centralized retry mechanism with exponential backoff, respecting Retry-After headers, and utilizing session management, we’ve significantly enhanced the resilience of our Python Azure Function when interacting with Microsoft Graph API. This approach ensures that your serverless functions are more robust, reliable, and better equipped to handle the dynamic nature of cloud services.
Happy Coding…