import logging
from typing import Dict, List, Tuple

from .base_provider import BaseStorageProvider
from ingestion.graph_client import GraphClient
from core.config import settings

logger = logging.getLogger(__name__)


class SharePointProvider(BaseStorageProvider):
    """
    Storage Provider implementation for Microsoft SharePoint using Graph API.

    The SharePoint site and its drive are resolved once in ``__init__``; the
    resulting ``self.drive_id`` is used for every later Graph request.
    """

    def __init__(self, hostname: str = "285pdg.sharepoint.com", site_path: str = "/sites/poc_system"):
        """
        Resolve the site and drive this provider operates on.

        Args:
            hostname: SharePoint host name passed to ``get_site_by_path``.
            site_path: Server-relative site path passed to ``get_site_by_path``.

        Raises:
            KeyError: if the site or drive lookup result has no ``"id"`` key.
            Any exception raised by ``GraphClient`` during the lookups.
        """
        # NOTE(review): the defaults are tenant-specific; consider sourcing
        # them from configuration instead of hard-coding them here.
        self.graph = GraphClient()

        logger.info("Resolving site: %s:%s", hostname, site_path)
        site_info = self.graph.get_site_by_path(hostname, site_path)
        site_id = site_info["id"]

        logger.info("Resolving drive for site: %s", site_id)
        drive_info = self.graph.get_drive(site_id)
        self.drive_id = drive_info["id"]

        logger.info("Initialized SharePoint Provider for Drive ID: %s", self.drive_id)

    def fetch_changes(self, sync_state: Dict) -> Tuple[List[Dict], Dict]:
        """
        Fetch all delta changes from SharePoint, handling pagination internally.

        Starts from ``sync_state["sharepoint_delta_link"]`` when it is set,
        otherwise from the drive root's ``/delta`` endpoint. Follows
        ``@odata.nextLink`` pages until a response carries an
        ``@odata.deltaLink``.

        Args:
            sync_state: Provider sync state. Only ``"sharepoint_delta_link"``
                is read; the dict itself is not mutated.

        Returns:
            ``(items, new_state)``:
            - ``items`` is a list of standardized item dicts, excluding the
              drive root.
            - ``new_state`` is a shallow copy of ``sync_state`` whose
              ``"sharepoint_delta_link"`` holds the link to use next time.
        """
        current_url = sync_state.get("sharepoint_delta_link")
        if not current_url:
            # No saved delta link: start a full enumeration of the drive.
            current_url = f"{self.graph.base_url}/drives/{self.drive_id}/root/delta"

        items_collected: List[Dict] = []
        while True:
            response = self.graph._make_get_request(current_url)
            items_collected.extend(response.get("value", []))

            if "@odata.nextLink" in response:
                current_url = response["@odata.nextLink"]
                logger.info("Fetching next page of SharePoint delta results...")
            elif "@odata.deltaLink" in response:
                new_delta_link = response["@odata.deltaLink"]
                logger.info("Reached end of SharePoint delta changes.")
                break
            else:
                # Neither link is present: persist the URL just fetched,
                # presumably so the next sync resumes from this page.
                logger.warning("No nextLink or deltaLink found in response! Breaking loop.")
                new_delta_link = current_url
                break

        # Standardize output for the ingestion pipeline, skipping the drive root.
        standardized_items = [
            self._standardize_item(item)
            for item in items_collected
            if not self._is_drive_root(item)
        ]

        new_state = sync_state.copy()
        new_state["sharepoint_delta_link"] = new_delta_link
        return standardized_items, new_state

    @staticmethod
    def _is_drive_root(item: Dict) -> bool:
        """Return True if ``item`` is the drive's root folder."""
        # Graph marks the root with a ``root`` facet on the driveItem itself.
        # The nested ``folder`` check is kept from the previous implementation.
        return "root" in item or "root" in (item.get("folder") or {})

    @staticmethod
    def _standardize_item(item: Dict) -> Dict:
        """Map a raw Graph driveItem to the pipeline's provider-neutral dict."""
        return {
            "id": item.get("id"),
            "name": item.get("name"),
            # Delta results flag removed items with a ``deleted`` facet.
            "is_deleted": "deleted" in item,
            "is_folder": "folder" in item,
            "provider": "sharepoint",
            "last_modified": item.get("lastModifiedDateTime"),
            "size": item.get("size"),
            "raw_data": item,
        }

    def get_item_details(self, item_id: str) -> Dict:
        """
        Get full item details including webUrl and downloadUrl.

        Args:
            item_id: Graph driveItem ID within ``self.drive_id``.

        Returns:
            Dict with ``id``, ``name``, ``web_url``, ``download_url``,
            ``size`` and ``last_modified``. Missing fields are ``None``.

        Raises:
            Any exception from the Graph lookup, re-raised after logging.
        """
        try:
            item = self.graph.get_item_details(self.drive_id, item_id)
            return {
                "id": item.get("id"),
                "name": item.get("name"),
                "web_url": item.get("webUrl"),
                "download_url": item.get("@microsoft.graph.downloadUrl"),
                "size": item.get("size"),
                "last_modified": item.get("lastModifiedDateTime"),
            }
        except Exception as e:
            logger.error("Failed to get item details for %s: %s", item_id, e)
            raise

    def get_item_permissions(self, item_id: str) -> List[str]:
        """
        Get permissions for an item.

        Args:
            item_id: Graph driveItem ID within ``self.drive_id``.

        Returns:
            De-duplicated list (order unspecified) of principals:
            - lower-cased user e-mails;
            - user IDs, when a user has no e-mail;
            - ``"group:<displayName>"`` for SharePoint site groups.
            Returns ``["*"]`` when no principal is found or the lookup fails.
        """
        # NOTE(review): ``["*"]`` is a fail-open default. If callers treat it
        # as "everyone", then errors, or items shared only with principals not
        # collected here (e.g. Azure AD ``group`` entries in ``grantedToV2``),
        # become visible to all users. Confirm this is intended.
        try:
            response = self.graph.get_item_permissions(self.drive_id, item_id)
            permissions = set()
            for perm in response.get("value", []):
                # Use grantedTo; when it is empty, fall back to the users
                # listed in grantedToIdentitiesV2. ``or`` guards against facets
                # present with a JSON null value, which previously raised and
                # collapsed the whole result to ["*"].
                granted = perm.get("grantedTo") or {}
                if not granted:
                    for identity in perm.get("grantedToIdentitiesV2") or []:
                        self._add_user_principal(permissions, (identity or {}).get("user"))
                self._add_user_principal(permissions, granted.get("user"))

                # grantedToV2 may carry a SharePoint site group.
                granted_v2 = perm.get("grantedToV2") or {}
                site_group = granted_v2.get("siteGroup") or {}
                if site_group.get("displayName"):
                    permissions.add(f"group:{site_group['displayName']}")

            return list(permissions) if permissions else ["*"]
        except Exception as e:
            logger.warning("Failed to get permissions for %s: %s. Defaulting to ['*']", item_id, e)
            return ["*"]

    @staticmethod
    def _add_user_principal(principals: set, user: Dict) -> None:
        """Add ``user``'s lower-cased e-mail, or else its ID; no-op if ``user`` is empty/None."""
        if not user:
            return
        if user.get("email"):
            principals.add(user["email"].lower())
        elif user.get("id"):
            principals.add(user["id"])

    def download_file(self, target_item: Dict) -> bytes:
        """
        Download file content from SharePoint.

        Args:
            target_item: Standardized item dict. ``"id"`` selects the file;
                ``"name"`` is only used in the error log.

        Returns:
            The bytes returned by ``GraphClient._download_file``.

        Raises:
            ValueError: if ``target_item`` has no ``"id"``.
            Any exception from the download, re-raised after logging.
        """
        try:
            item_id = target_item.get("id")
            if not item_id:
                raise ValueError("target_item has no 'id'")
            # Call the /content endpoint directly through Graph instead of the
            # item's downloadUrl; per the original author this avoids 401 errors.
            url = f"{self.graph.base_url}/drives/{self.drive_id}/items/{item_id}/content"
            return self.graph._download_file(url)
        except Exception as e:
            logger.error("SharePoint download_file failed for %s: %s", target_item.get("name"), e)
            raise