import base64
import json
import logging
import time
from typing import Optional

import httpx
from azure.identity import ClientSecretCredential

from core.config import settings

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("GraphClient")

# Refresh the cached token this many seconds before its stated expiry so a
# request never goes out with a token that expires in flight.
_TOKEN_REFRESH_MARGIN_SECONDS = 300


class GraphClient:
    """
    Microsoft Graph API Client using app-only authentication.

    Credentials (tenant id, client id, client secret) are read from
    ``core.config.settings``. The access token is acquired lazily, checked
    for the required application roles, and cached until shortly before it
    expires.
    """

    def __init__(self):
        self.tenant_id = settings.tenant_id
        self.client_id = settings.client_id
        self.client_secret = settings.client_secret

        self.credential = ClientSecretCredential(
            tenant_id=self.tenant_id,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        self.scopes = ["https://graph.microsoft.com/.default"]
        self.base_url = "https://graph.microsoft.com/v1.0"

        # Cached bearer token and its expiry (Unix timestamp, seconds).
        self._token = None
        self._token_expires_on = 0

    def decode_jwt_payload(self, token: str) -> dict:
        """Return the decoded (unverified) payload segment of a JWT.

        The signature is NOT checked; this is only used to inspect claims.

        Raises:
            ValueError: if the token does not consist of three
                dot-separated segments.
        """
        parts = token.split(".")
        if len(parts) != 3:
            raise ValueError("Invalid JWT token format")

        payload_b64 = parts[1]
        # JWT segments are base64url without padding; restore the '='
        # padding that urlsafe_b64decode requires.
        payload_b64 += "=" * ((4 - len(payload_b64) % 4) % 4)
        return json.loads(base64.urlsafe_b64decode(payload_b64))

    def validate_required_roles(self, roles: list):
        """Ensure the token carries the roles this client depends on.

        Raises:
            PermissionError: if 'Sites.Read.All' or 'Files.Read.All' is
                missing from ``roles``.
        """
        if "Sites.Read.All" not in roles:
            raise PermissionError(
                "FATAL: Token is missing 'Sites.Read.All' role. Stop immediately."
            )
        if "Files.Read.All" not in roles:
            # NOTE(review): the warning reads as if execution continues, but
            # the missing role is treated as fatal right below. Kept as-is to
            # preserve existing log output and behavior.
            logger.warning(
                "WARNING: Token is missing 'Files.Read.All' role. "
                "Drive/delta steps will fail."
            )
            raise PermissionError(
                "FATAL: Token is missing 'Files.Read.All' role. Stop immediately."
            )

    def get_access_token(self) -> str:
        """Return a valid bearer token, acquiring a new one when needed.

        A new token is requested when none is cached or when the cached one
        is within ``_TOKEN_REFRESH_MARGIN_SECONDS`` of expiry. The token is
        cached only after its roles have been validated, so a token that
        failed validation is never handed out by a later call.

        Raises:
            ValueError: if the acquired token is not a three-segment JWT.
            PermissionError: if the token lacks a required role.
        """
        refresh_at = self._token_expires_on - _TOKEN_REFRESH_MARGIN_SECONDS
        if self._token is None or time.time() >= refresh_at:
            token_response = self.credential.get_token(*self.scopes)
            payload = self.decode_jwt_payload(token_response.token)

            aud = payload.get("aud")
            appid = payload.get("appid")
            idtyp = payload.get("idtyp")
            roles = payload.get("roles", [])
            logger.info(
                "Token decoded -> aud: %s, appid: %s, idtyp: %s, roles: %s",
                aud, appid, idtyp, roles,
            )

            # Validate BEFORE caching: if this raises, nothing is stored and
            # the next call goes through acquisition + validation again.
            self.validate_required_roles(roles)

            self._token = token_response.token
            self._token_expires_on = token_response.expires_on

        return self._token

    def _get_headers(self) -> dict:
        """Build the request headers carrying the bearer token."""
        token = self.get_access_token()
        return {
            "Authorization": f"Bearer {token}",
            "Accept": "application/json",
        }

    def _make_get_request(self, url: str) -> dict:
        """Issue a GET to ``url`` and return the parsed JSON body.

        Errors are logged and re-raised unchanged.

        Raises:
            httpx.HTTPStatusError: on a 4xx/5xx response.
        """
        logger.info("GET Request to: %s", url)
        headers = self._get_headers()

        try:
            response = httpx.get(url, headers=headers)
            logger.info("Response Status: %s", response.status_code)
            response.raise_for_status()
            return response.json()
        except httpx.HTTPStatusError as e:
            logger.error("HTTP Error: %s", e.response.status_code)
            logger.error("Response Body: %s", e.response.text)
            raise
        except Exception as e:
            logger.error("Error making request: %s", str(e))
            raise

    def _download_file(self, url: str) -> bytes:
        """Issue a GET to ``url`` and return the raw response body.

        Errors are logged and re-raised unchanged.

        Raises:
            httpx.HTTPStatusError: on a 4xx/5xx response.
        """
        logger.info("GET Request (Download) to: %s", url)
        headers = self._get_headers()

        try:
            # httpx does not follow redirects unless asked to, so it is
            # enabled explicitly here; the timeout is raised to 60 seconds.
            with httpx.Client(follow_redirects=True, timeout=60.0) as client:
                response = client.get(url, headers=headers)
                logger.info("Response Status: %s", response.status_code)
                response.raise_for_status()
                return response.content
        except httpx.HTTPStatusError as e:
            logger.error("HTTP Error: %s", e.response.status_code)
            logger.error("Response Body: %s", e.response.text)
            raise
        except Exception as e:
            logger.error("Error making download request: %s", str(e))
            raise

    def get_site_by_hostname(self, hostname: str) -> dict:
        """GET /sites/{hostname}"""
        url = f"{self.base_url}/sites/{hostname}"
        return self._make_get_request(url)

    def get_site_by_path(self, hostname: str, server_relative_path: str) -> dict:
        """GET /sites/{hostname}:/{server-relative-path}

        ``server_relative_path`` may be given with or without its leading
        slash; one is added when missing so the URL keeps the ``:/`` form.
        """
        if not server_relative_path.startswith("/"):
            server_relative_path = f"/{server_relative_path}"
        url = f"{self.base_url}/sites/{hostname}:{server_relative_path}"
        return self._make_get_request(url)

    def get_drive(self, site_id: str) -> dict:
        """GET /sites/{siteId}/drive"""
        url = f"{self.base_url}/sites/{site_id}/drive"
        return self._make_get_request(url)

    def get_drive_root_children(self, site_id: str) -> dict:
        """GET /sites/{siteId}/drive/root/children"""
        url = f"{self.base_url}/sites/{site_id}/drive/root/children"
        return self._make_get_request(url)

    def get_drive_root_delta(self, site_id: str) -> dict:
        """GET /sites/{siteId}/drive/root/delta"""
        url = f"{self.base_url}/sites/{site_id}/drive/root/delta"
        return self._make_get_request(url)

    def delta_query(self, drive_id: str, delta_link: Optional[str] = None) -> dict:
        """Perform a delta query on a drive.

        When ``delta_link`` is given it is requested as-is and ``drive_id``
        is not used; otherwise the query starts from
        ``/drives/{drive_id}/root/delta``.
        """
        if delta_link:
            # NOTE(review): the bearer token is sent to whatever URL is
            # passed here -- callers should only pass links returned by Graph.
            url = delta_link
        else:
            url = f"{self.base_url}/drives/{drive_id}/root/delta"
        return self._make_get_request(url)