Improving Python Azure Function Performance: Handling Large SharePoint Lists with Throttling, Caching, and Batch Requests

This article enhances the previous article Secure Python Azure Function Using Azure Key Vault and Managed Identity where we used Azure Key Vault to secure the Azure function.

When dealing with large SharePoint lists, API throttling, and efficient data resolution are critical challenges. Azure Functions, combined with the power of Microsoft Graph API, provide a scalable solution for fetching, resolving, and processing data from SharePoint. In this article, we’ll explore key best practices to enhance performance, including throttling handling, batch requests, lookup resolution, and in-memory caching.

Part 1 – Build a Python Azure Function to Connect with SharePoint Online via Microsoft Graph API

Part 2 – Secure Python Azure Function Using Azure Key Vault and Managed Identity

🚀 Key Improvements Overview

Below are the essential performance improvements that we will implement in the Azure Function to handle large SharePoint lists efficiently:

Batch Graph API Requests: Using Microsoft Graph’s batching to reduce the number of API calls.

Throttling Handling: Introducing retry logic to ensure our function doesn’t fail due to frequent requests.

Cache Resolved Lookup Values: Minimizing API calls by caching the resolved values in memory.

Pagination Handling with @odata.nextLink: Efficiently handling large lists by paginating results and preventing excessive memory usage.

Minimizing Payload: Using $select to fetch only the necessary fields and optimizing the data we retrieve from SharePoint.

🔄 Batch Graph API Requests

Batching API requests is a powerful feature of Microsoft Graph API that helps to minimize the number of API requests made to the server. Rather than making multiple individual requests, batching allows us to group multiple operations into a single request. This significantly reduces network overhead.

Here’s how we implement batch processing to resolve multiple lookup values. In our scenario, we will use batching to retrieve the lookup values since Graph API will give only the ID of the lookup field and not the values, so we have to make a separate call to the master list to get the lookup value.

def batch_lookup_request(site_id, list_id, ids, access_token):
    batch_requests = []
    for i, lookup_id in enumerate(ids):
        if lookup_id not in lookup_cache:
            batch_requests.append({
                "id": str(i),
                "method": "GET",
                "url": f"/sites/{site_id}/lists/{list_id}/items/{lookup_id}?expand=fields"
            })

    if not batch_requests:
        return

    payload = {"requests": batch_requests}
    response = perform_request_with_retry("POST", f"{GRAPH_API_BASE}/$batch", access_token, json_payload=payload)
    # print(f"Batch response: {response.json()}")
    for resp in response.json().get("responses", []):
        if resp.get("status") == 200:
            item = resp["body"]
            lookup_id = str(item["id"])
            lookup_cache[lookup_id] = item.get("fields", {}).get("Title")

We pass Site id, master list id, item ids and access token to fetch the title of the master list items and store it in a collection.

⏱️ Throttling Handling with Retry Logic

API throttling can occur when too many requests are made in a short time. To mitigate this, we implement retry logic to automatically retry failed requests with a backoff strategy. This ensures that your function does not stop processing due to throttling errors.

def perform_request_with_retry(method, url, access_token, json_payload=None, max_retries=5):
    session = requests.Session()
    delay = 1
    for attempt in range(max_retries):
        try:
            if method == "GET":
                response = session.get(url, headers={"Authorization": f"Bearer {access_token}"})
            elif method == "POST":
                response = session.post(url, headers={"Authorization": f"Bearer {access_token}"}, json=json_payload)
            else:
                raise ValueError("Unsupported HTTP method")

            if response.status_code in [429, 503, 504]:
                retry_after = int(response.headers.get("Retry-After", delay))
                logging.warning(f"Throttled. Retrying in {retry_after}s (Attempt {attempt + 1})")
                time.sleep(retry_after)
                delay = min(delay * 2, 60)
                continue

            response.raise_for_status()
            return response
        except requests.exceptions.RequestException as e:
            logging.warning(f"Request error: {e} (Attempt {attempt + 1})")
            if attempt < max_retries - 1:
                time.sleep(delay)
                delay = min(delay * 2, 60)
            else:
                raise

🧠 Cache Resolved Lookup Values

Caching lookup values in memory (or a persistent store like Azure Blob Storage or Redis) significantly reduces redundant API calls, improving performance. Here, we use a simple in-memory cache (lookup_cache) to store resolved lookup values.

lookup_cache = {}
for resp in response.json().get("responses", []):
        if resp.get("status") == 200:
            item = resp["body"]
            lookup_id = str(item["id"])
            lookup_cache[lookup_id] = item.get("fields", {}).get("CaseNo")

🔁 Pagination Handling with @odata.nextLink

When working with large SharePoint lists, it’s crucial to paginate the results to prevent memory overload. Using the @odata.nextLink property, we can continue fetching results in batches until the entire dataset is retrieved.

def get_large_list_items(site_id, list_id, access_token, top=4000):
    items = []
    url = f"{GRAPH_API_BASE}/sites/{site_id}/lists/{list_id}/items?$top={top}&$select=id,fields&$expand=fields"
    masterlist_id = get_list_id(site_id, access_token, "Patient Details")
    while url:
        resp = perform_request_with_retry("GET", url, access_token=access_token)
        data = resp.json()
        items.extend(data.get("value", []))

        # Extract lookup IDs for batch resolve
        lookup_ids = set()
        for item in data.get("value", []):
            fields = item.get("fields", {})
            if "CaseNoLookupId" in fields:
                lookup_ids.add(fields["CaseNoLookupId"])

        # Batch resolve lookups
        batch_lookup_request(site_id, masterlist_id, lookup_ids, access_token)

        url = data.get("@odata.nextLink")

    return items

✅ Conclusion

By combining these techniques — batch requests, throttling handling, in-memory caching, pagination, and selective field fetching — we can significantly improve the performance and reliability of Azure Functions when handling large SharePoint lists. This approach ensures that we are not only avoiding throttling but also managing data efficiently, making the function more cost-effective and scalable.

With these improvements, your Azure Function can efficiently interact with millions of SharePoint list items while respecting API limits and avoiding bottlenecks.

2 thoughts on “Improving Python Azure Function Performance: Handling Large SharePoint Lists with Throttling, Caching, and Batch Requests

  1. Pingback: Robust Authentication with Microsoft Graph API (Using MSAL and Service Principals) | Knowledge Share

  2. Pingback: 🚀 Building Resilient Azure Functions: Mastering Microsoft Graph API Calls with Python | Knowledge Share

Leave a comment