Robust Authentication with Microsoft Graph API (Using MSAL and Service Principals)

🚀 Introduction

The Microsoft Graph API is a powerful gateway to data and intelligence in Microsoft 365. Whether you’re automating tasks, building custom applications, or syncing data, secure and efficient interaction with Graph API is paramount. For applications that run in the background or as services (like Azure Functions fetching SharePoint data), Service Principals combined with the Microsoft Authentication Library (MSAL) offer a robust and recommended authentication pattern.

This post will guide you through the best practices for authenticating to Microsoft Graph API using MSAL and Service Principals in Python, focusing on the Client Credentials Flow. We’ll look at an Azure Function example and highlight how to optimize token acquisition for better performance and reliability.

Part 1 – Build a Python Azure Function to Connect with SharePoint Online via Microsoft Graph API

Part 2 – Secure Python Azure Function Using Azure Key Vault and Managed Identity

Part 3 – Improving Python Azure Function Performance: Handling Large SharePoint Lists with Throttling, Caching, and Batch Requests

🧩 Core Concepts

🛡️ Service Principals (App Principals)

A Service Principal is an identity for your application or service within Azure Active Directory (Azure AD, now Microsoft Entra ID). Think of it as a “user identity” but for an application. It allows your code to authenticate and be authorized to access Azure AD-protected resources like Microsoft Graph API, without a signed-in user. This is ideal for unattended scripts, daemons, or backend services.

🔑 MSAL (Microsoft Authentication Library)

MSAL simplifies the complexities of acquiring, caching, and renewing OAuth 2.0 tokens. For Python, the msal library provides a ConfidentialClientApplication class, which is specifically designed for applications that can securely store a client secret (like server-side applications or Azure Functions).

✨ Best Practices for Robust Authentication

Client Credentials Flow

This OAuth 2.0 flow is used for service-to-service authentication where the application authenticates itself directly using its client ID and client secret (or a certificate). MSAL’s acquire_token_for_client method implements this.
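To make the flow concrete, here is a rough sketch of the HTTP request that a client credentials exchange boils down to against the Microsoft identity platform v2.0 token endpoint. The helper name build_token_request and the sample values are hypothetical. In practice MSAL builds and sends this request for you, so treat this as an illustration rather than something to call directly.

```python
from typing import Dict, Tuple


def build_token_request(tenant_id: str, client_id: str, client_secret: str,
                        scope: str = "https://graph.microsoft.com/.default") -> Tuple[str, Dict[str, str]]:
    """Return the token endpoint URL and form body for a client credentials request.

    Hypothetical helper for illustration only; MSAL performs this exchange internally.
    """
    url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"
    form = {
        "grant_type": "client_credentials",  # identifies the OAuth 2.0 flow
        "client_id": client_id,
        "client_secret": client_secret,
        "scope": scope,
    }
    return url, form
```

The body would be sent as a form-encoded POST. Note that no user name or password appears anywhere: the application proves its own identity.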

🔒 Secure Credential Management

Azure Key Vault (Recommended): Store your TENANT_ID, CLIENT_ID, and CLIENT_SECRET in Azure Key Vault. Your Azure Function can then use a Managed Identity to securely access these secrets without needing to store any credentials in your code or application settings.

Environment Variables: For Azure Functions, Application Settings (exposed to your code as environment variables) are an acceptable alternative: they are encrypted at rest and keep secrets out of source control. Avoid hardcoding credentials directly in your script.
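If you go the environment variable route, it helps to validate the settings once at startup so that a missing value fails with a clear message. The following is a minimal sketch; the function name load_auth_settings and the REQUIRED_SETTINGS tuple are assumptions made for this example, and the setting names simply mirror the ones mentioned above.

```python
import os
from typing import Dict, Mapping

# Setting names assumed for this example; adjust to match your Application Settings.
REQUIRED_SETTINGS = ("TENANT_ID", "CLIENT_ID", "CLIENT_SECRET")


def load_auth_settings(env: Mapping[str, str] = os.environ) -> Dict[str, str]:
    """Read the required settings and fail fast, naming any that are missing or blank."""
    missing = [name for name in REQUIRED_SETTINGS if not env.get(name, "").strip()]
    if missing:
        raise RuntimeError(f"Missing application settings: {', '.join(missing)}")
    return {name: env[name].strip() for name in REQUIRED_SETTINGS}
```

Passing the environment in as a parameter keeps the function easy to test with a plain dictionary.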

🔄 Efficient Token Management with MSAL

Reuse ConfidentialClientApplication Instance: MSAL’s ConfidentialClientApplication has built-in in-memory token caching. By initializing this object once and reusing it for subsequent token requests (within the lifetime of your function’s warm instance), you allow MSAL to serve tokens from its cache if they are still valid. This significantly reduces calls to Azure AD, improving performance and reducing the likelihood of being throttled.

Automatic Token Renewal: MSAL handles the logic for checking token expiry and acquiring a new token when needed before the old one expires.
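The idea behind both points can be shown with a toy "cache, then fetch" provider. This is not MSAL's actual implementation, and the class name CachedTokenProvider, the fetch callback, and the 300-second refresh margin are all assumptions for illustration. It only shows why reusing one long-lived object avoids repeated round trips.

```python
import time
from typing import Callable, Optional, Tuple


class CachedTokenProvider:
    """Toy illustration of cache-then-fetch; not how MSAL is implemented internally."""

    def __init__(self, fetch: Callable[[], Tuple[str, int]],
                 clock: Callable[[], float] = time.monotonic,
                 refresh_margin: int = 300):
        self._fetch = fetch            # returns (token, expires_in_seconds)
        self._clock = clock            # injectable so tests can control time
        self._margin = refresh_margin  # renew this many seconds before expiry
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get_token(self) -> str:
        now = self._clock()
        # Fetch only when nothing is cached or the token is about to expire.
        if self._token is None or now >= self._expires_at - self._margin:
            token, expires_in = self._fetch()
            self._token = token
            self._expires_at = now + expires_in
        return self._token
```

If a new provider were created on every invocation, the cache would always be empty and every call would hit the identity provider, which is exactly the problem the reused ConfidentialClientApplication instance avoids.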

🛠️ Granular Permissions (Least Privilege)

In your Azure AD App Registration, grant only the specific Microsoft Graph API permissions your application needs (e.g., Sites.Read.All for reading SharePoint sites, User.Read.All for reading user profiles).

Use the scope ["https://graph.microsoft.com/.default"] when acquiring a token. This special scope requests tokens for all the application permissions that have been granted to your Service Principal in Azure AD.
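When troubleshooting permissions, it can be useful to see which application permissions actually ended up in a token. App-only tokens carry them in the roles claim. The sketch below decodes the token payload without verifying the signature, so it is suitable for diagnostics only; the helper name get_app_roles is hypothetical, and your code should otherwise treat Graph access tokens as opaque.

```python
import base64
import json
from typing import List


def get_app_roles(access_token: str) -> List[str]:
    """Return the 'roles' claim of a JWT access token WITHOUT verifying its signature.

    For diagnostics only; never use an unverified token to make trust decisions.
    """
    payload_b64 = access_token.split(".")[1]
    payload_b64 += "=" * (-len(payload_b64) % 4)  # restore stripped base64 padding
    claims = json.loads(base64.urlsafe_b64decode(payload_b64))
    return claims.get("roles", [])
```

If a permission you granted is missing from the list, a likely cause is that admin consent has not been given for it yet.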

🚨 Comprehensive Error Handling

Implement robust error handling for token acquisition failures. Log detailed error information, including any correlation IDs provided by Azure AD, to aid in troubleshooting.
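One way to keep those diagnostic fields together is a small exception type that is built from the failed token response. This is a sketch under assumptions: the class name TokenAcquisitionError is hypothetical, and the dictionary keys are the ones the full code in this post reads from the MSAL result.

```python
from typing import Any, Mapping, Optional


class TokenAcquisitionError(Exception):
    """Hypothetical exception carrying the fields of a failed token response."""

    def __init__(self, error: Optional[str], description: str, correlation_id: Optional[str]):
        super().__init__(f"{error}: {description} (Correlation ID: {correlation_id})")
        self.error = error
        self.description = description
        self.correlation_id = correlation_id

    @classmethod
    def from_result(cls, result: Mapping[str, Any]) -> "TokenAcquisitionError":
        # Missing keys fall back to None or a placeholder instead of raising KeyError.
        return cls(
            result.get("error"),
            result.get("error_description", "No error description provided."),
            result.get("correlation_id"),
        )
```

Callers can then log err.correlation_id as a structured field instead of parsing it back out of a message string.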

For Graph API calls themselves, implement retry logic for transient errors such as throttling (HTTP 429) and temporary service failures (HTTP 503 and 504), as shown in the perform_request_with_retry function.
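A detail worth handling in retry logic is that the HTTP Retry-After header may hold either a number of seconds or an HTTP date. The helper below is a minimal sketch of parsing both forms with the standard library; the name parse_retry_after and its default value are assumptions, not part of the original code.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def parse_retry_after(value: Optional[str], default: float = 1.0,
                      now: Optional[datetime] = None) -> float:
    """Convert a Retry-After header (seconds or HTTP-date) into a delay in seconds."""
    if not value:
        return default
    value = value.strip()
    if value.isdigit():
        return float(value)
    try:
        retry_at = parsedate_to_datetime(value)
    except (TypeError, ValueError):
        return default  # unparseable header: fall back to the caller's default
    if retry_at.tzinfo is None:
        retry_at = retry_at.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    return max(0.0, (retry_at - now).total_seconds())
```

The returned delay can be passed straight to time.sleep before the next attempt.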

⚙️ Implementing Robust Authentication in Python (Azure Function Example)

Let’s walk through how to apply these best practices in an Azure Function written in Python.

Step 1: Azure AD App Registration

The detailed steps, with screenshots, are explained at the link below.

Step 2: Storing Configuration Securely

The detailed steps, with commands, are explained at the link below.

📄 Code Walkthrough and Enhancements

The code sample below updates the one from the previous article to make authentication more robust.

The key improvement for robust authentication lies in how we manage the ConfidentialClientApplication instance and acquire tokens.

  • Global _msal_app_instance:
    _msal_app_instance = None
    

    This module-level global variable will hold our single instance of ConfidentialClientApplication. In an Azure Function environment, global variables persist across multiple invocations on the same (warm) function instance, enabling token caching.

    • _initialize_confidential_client_app() Function:
    def _initialize_confidential_client_app(tenant_id: str, client_id: str, client_secret: str) -> ConfidentialClientApplication:
        global _msal_app_instance
        if _msal_app_instance is None:
            logging.info("Initializing MSAL ConfidentialClientApplication.")
            authority = f"https://login.microsoftonline.com/{tenant_id}"
            _msal_app_instance = ConfidentialClientApplication(
                client_id,
                authority=authority,
                client_credential=client_secret
            )
        return _msal_app_instance
    

    This function acts as a singleton provider for the MSAL app. It creates the instance only if it doesn’t already exist.

    • acquire_graph_api_token() Function
    def acquire_graph_api_token(tenant_id: str, client_id: str, client_secret: str, scopes: list) -> str:
        app = _initialize_confidential_client_app(tenant_id, client_id, client_secret)
        result = app.acquire_token_for_client(scopes=scopes)
    
        if "access_token" in result:
            logging.info("Successfully acquired Graph API token.")
            return result['access_token']
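        else:
            # Pull the diagnostic fields from the Azure AD error response
            # (the full code further down also logs them before raising).
            error_description = result.get("error_description", "No error description provided.")
            correlation_id = result.get("correlation_id")
            raise Exception(f"Graph API token acquisition failed: {error_description} (Correlation ID: {correlation_id})")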
    
    1. It first ensures the ConfidentialClientApplication (app) is initialized.
    2. Then, app.acquire_token_for_client(scopes=scopes) is called. Crucially, because app is a reused instance, MSAL (MSAL Python 1.23 and later) will first check its internal memory cache for a valid token matching the scopes. If found, it returns the cached token without a network call to Azure AD. If no valid token is cached, MSAL acquires a new one from Azure AD and caches it.
    3. Error logging now includes the error, error_description, and correlation_id from the Azure AD response, which is vital for diagnosing issues.
    • Integration into main()

    In the main Azure Function handler (main function):

    # ... load tenant_id, client_id, client_secret ...
    scopes_list = [os.getenv('SCOPE', 'https://graph.microsoft.com/.default').strip()]
    access_token = acquire_graph_api_token(tenant_id, client_id, client_secret, scopes_list)
    # ... use access_token for Graph API calls ...
    

    The old get_graph_token function (which created a new MSAL app instance on every call) is replaced by acquire_graph_api_token.
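One subtlety of the singleton in _initialize_confidential_client_app is that it ignores its arguments after the first call, so a different tenant or client ID passed later would silently reuse the first app. If that matters in your setup, a cache keyed by those values is one option. The sketch below is an assumption-laden variant: get_or_create_app and the factory parameter are hypothetical, and in the function app the factory would construct a ConfidentialClientApplication.

```python
from typing import Any, Callable, Dict, Tuple

# One cached app per (tenant_id, client_id) pair, shared across warm invocations.
_app_cache: Dict[Tuple[str, str], Any] = {}


def get_or_create_app(tenant_id: str, client_id: str, client_secret: str,
                      factory: Callable[[str, str, str], Any]) -> Any:
    """Return the cached app for this tenant and client, creating it on first use.

    `factory` is a hypothetical hook; in real code it would build a
    ConfidentialClientApplication from the three values.
    """
    key = (tenant_id, client_id)
    if key not in _app_cache:
        _app_cache[key] = factory(tenant_id, client_id, client_secret)
    return _app_cache[key]
```

For a function that only ever talks to one tenant with one app registration, the simpler single global shown above is perfectly adequate.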

📈 Benefits of This Approach

Performance: Significantly reduces latency for token acquisition after the first call by serving tokens from MSAL’s in-memory cache.

Efficiency: Minimizes the number of direct authentication requests to Azure AD, making your application a better network citizen and reducing the risk of being throttled by the identity provider.

Reliability: Leverages MSAL’s built-in logic for token management, which is tested and maintained by Microsoft.

Security: Follows Microsoft’s recommended practices for application authentication using service principals and the client credentials flow.

Final Code

    import json
    import logging
    import os
    import requests
    from msal import ConfidentialClientApplication
    import azure.functions as func
    from azure.identity import DefaultAzureCredential
    from azure.keyvault.secrets import SecretClient
    import time
    
    GRAPH_API_BASE = "https://graph.microsoft.com/v1.0"
    # Set Key Vault URL
    KEYVAULT_URL = "https://<keyvaultname>.vault.azure.net/"
    # Cache for lookup values
    lookup_cache = {}
    
    # Global variable to hold the MSAL app instance
    _msal_app_instance = None
    
    # --- Authentication Functions ---
    def _initialize_confidential_client_app(tenant_id: str, client_id: str, client_secret: str) -> ConfidentialClientApplication:
        """
        Initializes and returns a ConfidentialClientApplication instance.
        This instance is cached globally to leverage MSAL's in-memory token caching.
        """
        global _msal_app_instance
        if _msal_app_instance is None:
            logging.info("Initializing MSAL ConfidentialClientApplication.")
            authority = f"https://login.microsoftonline.com/{tenant_id}"
            _msal_app_instance = ConfidentialClientApplication(
                client_id,
                authority=authority,
                client_credential=client_secret
            )
            # For more advanced scenarios, you could configure token cache serialization here
            # if you need token persistence beyond the lifetime of this specific function instance
            # (e.g., using a distributed cache like Redis).
            # For Azure Functions, the in-memory cache is per warm instance.
        return _msal_app_instance
    
    def acquire_graph_api_token(tenant_id: str, client_id: str, client_secret: str, scopes: list) -> str:
        """
        Acquires an access token for Microsoft Graph API using the client credentials flow.
        It leverages a globally cached ConfidentialClientApplication instance for efficiency.
    
        Args:
            tenant_id: Your Azure AD tenant ID.
            client_id: The Application (client) ID of your Azure AD app registration.
            client_secret: The client secret for your Azure AD app registration.
            scopes: A list of scopes, e.g., ["https://graph.microsoft.com/.default"].
    
        Returns:
            The access token string.
    
        Raises:
            Exception: If token acquisition fails.
        """
        if not scopes:
            raise ValueError("Scopes list cannot be empty. Typically, use ['https://graph.microsoft.com/.default'].")
    
        app = _initialize_confidential_client_app(tenant_id, client_id, client_secret)
    
        # The ConfidentialClientApplication object will automatically cache the token.
        # Subsequent calls to acquire_token_for_client (with the same scopes) on the same
        # app instance will return the cached token if it's still valid, or acquire a new one if needed.
        result = app.acquire_token_for_client(scopes=scopes)
    
        if "access_token" in result:
            # Log successful acquisition; MSAL handles the actual caching.
            # You can inspect result['expires_in'] or result.get('token_type') if needed.
            logging.info("Successfully acquired Graph API token.")
            return result['access_token']
        else:
            error = result.get("error")
            error_description = result.get("error_description", "No error description provided.")
            correlation_id = result.get("correlation_id") # Crucial for troubleshooting with Microsoft support
            
            log_message = (
                f"Failed to acquire Graph API token. "
                f"Error: {error}, Description: {error_description}, "
                f"Correlation ID: {correlation_id}. "
                f"Check the Azure AD sign-in logs for more details."
            )
            logging.error(log_message)
            # You might want to include more details from the 'result' object if available and helpful
            # For example, some errors might include 'claims' or specific 'error_codes'.
            raise Exception(f"Graph API token acquisition failed: {error_description} (Correlation ID: {correlation_id})")
    
    # --- End of Authentication Functions ---
        
    def get_headers(access_token):
        return {
            "Authorization": f"Bearer {access_token}",
            "Accept": "application/json"
        }
        
    def get_sharepoint_site_id(access_token, hostname, site_path):
        if site_path:
            endpoint = f"{GRAPH_API_BASE}/sites/{hostname}:{'/' + site_path if not site_path.startswith('/') else site_path}"
        else:
            endpoint = f"{GRAPH_API_BASE}/sites/{hostname}"
    
        headers = {"Authorization": f"Bearer {access_token}"}
        response = requests.get(endpoint, headers=headers)
        response.raise_for_status()
        return response.json().get("id")
    
    def get_list_id(site_id, access_token, list_name):
        url = f"{GRAPH_API_BASE}/sites/{site_id}/lists"
        headers = {"Authorization": f"Bearer {access_token}"}
        response = requests.get(url, headers=headers)
        response.raise_for_status()
        lists = response.json().get("value", [])
        for lst in lists:
            if lst.get("name") == list_name:
                return lst.get("id")
        raise Exception(f"List '{list_name}' not found.")
    
    def get_list_items(site_id, list_id, access_token):
        url = f"{GRAPH_API_BASE}/sites/{site_id}/lists/{list_id}/items?$top=10&expand=fields"
        headers = {"Authorization": f"Bearer {access_token}"}
        response = requests.get(url, headers=headers)
        response.raise_for_status()
        return response.json().get("value", [])
    
    def perform_request_with_retry(method, url, access_token, json_payload=None, max_retries=5):
        session = requests.Session()
        delay = 1
        for attempt in range(max_retries):
            try:
                if method == "GET":
                    response = session.get(url, headers={"Authorization": f"Bearer {access_token}"})
                elif method == "POST":
                    response = session.post(url, headers={"Authorization": f"Bearer {access_token}"}, json=json_payload)
                else:
                    raise ValueError("Unsupported HTTP method")
    
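                if response.status_code in [429, 503, 504]:
                    # Retry-After is normally a number of seconds; fall back to the
                    # current backoff delay if it is missing or not an integer.
                    try:
                        retry_after = int(response.headers.get("Retry-After", delay))
                    except ValueError:
                        retry_after = delay
                    logging.warning(f"Throttled or unavailable (HTTP {response.status_code}). Retrying in {retry_after}s (Attempt {attempt + 1})")
                    time.sleep(retry_after)
                    delay = min(delay * 2, 60)
                    continue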
    
                response.raise_for_status()
                return response
            except requests.exceptions.RequestException as e:
                logging.warning(f"Request error: {e} (Attempt {attempt + 1})")
                if attempt < max_retries - 1:
                    time.sleep(delay)
                    delay = min(delay * 2, 60)
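                else:
                    raise

        # Every attempt was throttled: fail loudly instead of silently returning None.
        raise Exception(f"Request to {url} failed after {max_retries} attempts.")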
    
    def batch_lookup_request(site_id, list_id, ids, access_token):
        # Build one sub-request per lookup ID that is not cached yet.
        batch_requests = []
        for i, lookup_id in enumerate(ids):
            if lookup_id not in lookup_cache:
                batch_requests.append({
                    "id": str(i),
                    "method": "GET",
                    "url": f"/sites/{site_id}/lists/{list_id}/items/{lookup_id}?expand=fields"
                })

        # Graph JSON batching accepts at most 20 requests per $batch call,
        # so send the sub-requests in chunks of 20 (nothing is sent if the list is empty).
        for start in range(0, len(batch_requests), 20):
            payload = {"requests": batch_requests[start:start + 20]}
            response = perform_request_with_retry("POST", f"{GRAPH_API_BASE}/$batch", access_token, json_payload=payload)
            for resp in response.json().get("responses", []):
                if resp.get("status") == 200:
                    item = resp["body"]
                    lookup_id = str(item["id"])
                    lookup_cache[lookup_id] = item.get("fields", {}).get("CaseNo")
    
    def get_large_list_items(site_id, list_id, access_token, top=4000):
        items = []
        url = f"{GRAPH_API_BASE}/sites/{site_id}/lists/{list_id}/items?$top={top}&$select=id,fields&$expand=fields"
        masterlist_id = get_list_id(site_id, access_token, "Patient Details")
        while url:
            resp = perform_request_with_retry("GET", url, access_token=access_token)
            data = resp.json()
            items.extend(data.get("value", []))
    
            # Extract lookup IDs for batch resolve
            lookup_ids = set()
            for item in data.get("value", []):
                fields = item.get("fields", {})
                if "CaseNoLookupId" in fields:
                    lookup_ids.add(fields["CaseNoLookupId"])
    
            # Batch resolve lookups
            batch_lookup_request(site_id, masterlist_id, lookup_ids, access_token)
    
            url = data.get("@odata.nextLink")
    
        return items
    
    def fetch_large_list(site_id, list_id, access_token, top=100):
        url = f"https://graph.microsoft.com/v1.0/sites/{site_id}/lists/{list_id}/items"
        headers = {
            "Authorization": f"Bearer {access_token}",
            "Accept": "application/json"
        }
    
        params = {
            "$top": top,
            "$select": "id",  # fetch minimal data
            "$expand": "fields"
            # Optionally use $filter here if needed
        }
    
        session = requests.Session()
        results = []
    
        while url:
            print(f"Fetching: {url}")
            resp = session.get(url, headers=headers, params=params)
            resp.raise_for_status()
            data = resp.json()
            print(f"Fetched {len(data.get('value', []))} items.")
            results.extend(data.get("value", []))
    
            # Handle pagination
            url = data.get("@odata.nextLink", None)
            params = None  # Required to avoid duplicating query params
    
        return results
    
    def main(req: func.HttpRequest) -> func.HttpResponse:
        logging.info('Fetching SharePoint list data using Graph API.')
        try:
            credential = DefaultAzureCredential()
            client = SecretClient(vault_url=KEYVAULT_URL, credential=credential)
    
            tenant_id = client.get_secret('tenant-id').value
            client_id = client.get_secret('client-id').value
            client_secret = client.get_secret('client-secret').value
            scope = client.get_secret('scope').value
            hostname = client.get_secret('sp-hostname').value
            site_path = client.get_secret('sp-sitepath').value
            list_name = client.get_secret("sp-listname").value
    
            access_token = acquire_graph_api_token(tenant_id, client_id, client_secret, [scope])
    
            site_id = get_sharepoint_site_id(access_token, hostname, site_path)
            list_id = get_list_id(site_id, access_token, list_name)
    
            try:
                items = get_large_list_items(site_id, list_id, access_token, top=4000)
                # Log counts only, so list contents do not end up in the function logs.
                logging.info(f"Lookup cache entries: {len(lookup_cache)}")
                logging.info(f"Items fetched: {len(items)}")
                for item in items:
                    case_id = item.get("fields", {}).get("CaseNoLookupId")
                    case_value = lookup_cache.get(case_id, "Unknown")
                    item["fields"]["CaseNo"] = case_value
    
                return func.HttpResponse(json.dumps(items), mimetype="application/json")
    
            except Exception as e:
                logging.error(f"Error: {e}")
                return func.HttpResponse(f"Error: {str(e)}", status_code=500)
    
        except Exception as e:
            logging.error(str(e))
            return func.HttpResponse(
                f"Error: {str(e)}",
                status_code=500
            )
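The pagination loop that get_large_list_items and fetch_large_list each implement can also be factored into a small generator. The sketch below assumes a hypothetical fetch_json callable that takes a URL and returns the parsed JSON body; in the function above it could wrap perform_request_with_retry. The name iter_pages is likewise an assumption for this example.

```python
from typing import Any, Callable, Dict, Iterator


def iter_pages(first_url: str,
               fetch_json: Callable[[str], Dict[str, Any]]) -> Iterator[Dict[str, Any]]:
    """Yield every item across all pages, following @odata.nextLink until it is absent.

    `fetch_json` is a hypothetical callable: URL in, parsed JSON response out.
    """
    url = first_url
    while url:
        page = fetch_json(url)
        yield from page.get("value", [])
        url = page.get("@odata.nextLink")  # None on the last page ends the loop
```

Because it is a generator, callers can process items page by page instead of holding a very large list in memory.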
    
    

🏁 Conclusion

Authenticating to Microsoft Graph API securely and efficiently is crucial for modern applications. By using Service Principals with the Client Credentials Flow and leveraging MSAL’s capabilities, particularly its token caching by reusing the ConfidentialClientApplication instance, you can build robust, performant, and reliable Python applications like the Azure Function demonstrated. Always prioritize secure credential management (Azure Key Vault with Managed Identities is highly recommended) and the principle of least privilege when granting API permissions.

By implementing these best practices, you ensure that your automated processes interact with Microsoft Graph API in a scalable and resilient manner.
