156 lines
6.1 KiB
Python
156 lines
6.1 KiB
Python
import logging
|
|
from typing import Dict, List, Tuple
|
|
from .base_provider import BaseStorageProvider
|
|
from ingestion.graph_client import GraphClient
|
|
|
|
from core.config import settings
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
class SharePointProvider(BaseStorageProvider):
    """Storage provider for Microsoft SharePoint, backed by the Graph API.

    On construction the provider resolves the SharePoint site and its drive
    through ``GraphClient`` and stores the drive id in ``self.drive_id``;
    every other method operates on that drive.
    """

    def __init__(self, hostname: str = "285pdg.sharepoint.com", site_path: str = "/sites/poc_system"):
        """Resolve the site and the drive this provider works against.

        Args:
            hostname: SharePoint hostname used to look up the site.
            site_path: Path of the site on that host.

        Raises:
            KeyError: If the site or drive response has no ``"id"`` entry.
        """
        self.graph = GraphClient()

        logger.info(f"Resolving site: {hostname}:{site_path}")
        site_id = self.graph.get_site_by_path(hostname, site_path)["id"]

        logger.info(f"Resolving drive for site: {site_id}")
        self.drive_id = self.graph.get_drive(site_id)["id"]

        logger.info(f"Initialized SharePoint Provider for Drive ID: {self.drive_id}")

    @staticmethod
    def _is_drive_root(item: Dict) -> bool:
        """Return True when *item* is the top-most item of the drive."""
        # Graph marks the drive root with a top-level ``root`` facet on the
        # driveItem; it is not nested inside the ``folder`` facet.
        if item.get("root") is not None:
            return True
        # Previous check, kept so that anything it used to match still matches.
        return "root" in (item.get("folder") or {})

    @staticmethod
    def _standardize_item(item: Dict) -> Dict:
        """Map a raw Graph driveItem onto the pipeline's item dictionary."""
        return {
            "id": item.get("id"),
            "name": item.get("name"),
            "is_deleted": "deleted" in item,
            "is_folder": "folder" in item,
            "provider": "sharepoint",
            "last_modified": item.get("lastModifiedDateTime"),
            "size": item.get("size"),
            "raw_data": item,
        }

    @staticmethod
    def _add_user(identity_set: Dict, principals: set) -> bool:
        """Add the user of *identity_set* to *principals*.

        The lower-cased email is preferred; the user id is the fallback.

        Returns:
            True if an identifier was added, False if the identity set is
            empty/null or its user carries neither an email nor an id.
        """
        user = (identity_set or {}).get("user") or {}
        if user.get("email"):
            principals.add(user["email"].lower())
            return True
        if user.get("id"):
            principals.add(user["id"])
            return True
        return False

    def fetch_changes(self, sync_state: Dict) -> Tuple[List[Dict], Dict]:
        """Fetch all delta changes from SharePoint, following every page.

        Args:
            sync_state: Previous sync state. Its ``"sharepoint_delta_link"``
                entry, when set, is the URL the delta query resumes from;
                otherwise the query starts at the drive root delta endpoint.
                ``None`` is treated like an empty state.

        Returns:
            A tuple ``(items, new_state)``: the standardized changed items
            (drive root excluded) and a copy of ``sync_state`` whose
            ``"sharepoint_delta_link"`` holds the link for the next sync.
        """
        previous_state = sync_state if sync_state else {}
        current_url = previous_state.get("sharepoint_delta_link") or (
            f"{self.graph.base_url}/drives/{self.drive_id}/root/delta"
        )

        items_collected: List[Dict] = []
        while True:
            response = self.graph._make_get_request(current_url)
            items_collected.extend(response.get("value") or [])

            # An empty nextLink is treated as absent: following it would
            # restart the query from the beginning and never terminate.
            next_link = response.get("@odata.nextLink")
            if next_link:
                current_url = next_link
                logger.info("Fetching next page of SharePoint delta results...")
                continue

            new_delta_link = response.get("@odata.deltaLink")
            if new_delta_link:
                logger.info("Reached end of SharePoint delta changes.")
            else:
                logger.warning("No nextLink or deltaLink found in response! Breaking loop.")
                # Fall back to the URL just requested so the next sync has
                # a place to resume from.
                new_delta_link = current_url
            break

        # Standardize output for the ingestion pipeline, skipping the drive root.
        standardized_items = [
            self._standardize_item(item)
            for item in items_collected
            if not self._is_drive_root(item)
        ]

        new_state = sync_state.copy() if sync_state else {}
        new_state["sharepoint_delta_link"] = new_delta_link
        return standardized_items, new_state

    def get_item_details(self, item_id: str) -> Dict:
        """Get item details including its web URL and download URL.

        Args:
            item_id: Id of the item inside this provider's drive.

        Returns:
            A dict with ``id``, ``name``, ``web_url``, ``download_url``,
            ``size`` and ``last_modified``; entries missing from the Graph
            response are ``None``.

        Raises:
            Exception: Whatever the Graph client raises, after logging it.
        """
        try:
            item = self.graph.get_item_details(self.drive_id, item_id)
            return {
                "id": item.get("id"),
                "name": item.get("name"),
                "web_url": item.get("webUrl"),
                "download_url": item.get("@microsoft.graph.downloadUrl"),
                "size": item.get("size"),
                "last_modified": item.get("lastModifiedDateTime"),
            }
        except Exception as e:
            logger.error(f"Failed to get item details for {item_id}: {e}")
            raise

    def get_item_permissions(self, item_id: str) -> List[str]:
        """Get the principals that have permissions on an item.

        Args:
            item_id: Id of the item inside this provider's drive.

        Returns:
            Lower-cased user emails, user ids (when no email is present) and
            ``"group:<displayName>"`` entries for site groups, in no
            particular order. ``["*"]`` is returned when no principal could
            be extracted or when the lookup fails.
        """
        # NOTE(review): "*" is returned both for "no principal found" and for
        # any error. Presumably downstream treats "*" as "everyone"; if so a
        # failed lookup widens access - confirm this is intended.
        try:
            response = self.graph.get_item_permissions(self.drive_id, item_id)
            permissions = set()

            for perm in response.get("value") or []:
                # ``or {}`` / ``or []`` also cover explicit JSON nulls, which
                # would otherwise raise and fall into the ["*"] fallback.
                granted = perm.get("grantedTo") or {}
                granted_v2 = perm.get("grantedToV2") or {}

                # Without grantedTo, read the users listed in
                # grantedToIdentitiesV2 instead.
                if not granted:
                    for identity in perm.get("grantedToIdentitiesV2") or []:
                        self._add_user(identity, permissions)

                # grantedToV2 is consulted only when the older grantedTo
                # yields no user, so existing results stay unchanged.
                if not self._add_user(granted, permissions):
                    self._add_user(granted_v2, permissions)

                # Site group carried by grantedToV2, if any.
                site_group = granted_v2.get("siteGroup") or {}
                if site_group.get("displayName"):
                    permissions.add(f"group:{site_group['displayName']}")

            return list(permissions) if permissions else ["*"]

        except Exception as e:
            logger.warning(f"Failed to get permissions for {item_id}: {e}. Defaulting to ['*']")
            return ["*"]

    def download_file(self, target_item: Dict) -> bytes:
        """Download a file's content from SharePoint.

        Args:
            target_item: Item dictionary; its ``"id"`` selects the file and
                its ``"name"`` is used only in the error log message.

        Returns:
            The bytes returned by the Graph client for the item's
            ``/content`` endpoint.

        Raises:
            ValueError: If ``target_item`` has no ``"id"``.
            Exception: Whatever the Graph client raises, after logging it.
        """
        try:
            item_id = target_item.get("id")
            if not item_id:
                raise ValueError("target_item has no 'id'; cannot build the download URL")

            # Call the /content endpoint directly through the Graph API to
            # avoid 401 errors. The URL is built from the client's base_url,
            # the same root fetch_changes uses.
            url = f"{self.graph.base_url}/drives/{self.drive_id}/items/{item_id}/content"
            return self.graph._download_file(url)
        except Exception as e:
            logger.error(f"SharePoint download_file failed for {target_item.get('name')}: {e}")
            raise
|