"""Audit trail for sync runs, persisted as a JSON file next to this module.

The log file has the shape ``{"runs": [run, ...]}`` with the newest run
first. Each run records its start/finish timestamps, a status, per-status
counters, the list of processed files and the list of error messages.
Timestamps come from ``datetime.now()`` and are therefore naive local time.
"""

import json
import logging
import os
from datetime import datetime
from typing import Dict, List, Optional

logger = logging.getLogger("SyncAudit")

AUDIT_DIR = os.path.dirname(os.path.abspath(__file__))
AUDIT_FILE = os.path.join(AUDIT_DIR, "sync_log.json")

# Only the most recent runs are kept so the log file stays small.
MAX_RUNS = 50

# Maps a per-file status to the summary counter it increments.
_SUMMARY_KEY_BY_STATUS = {
    "indexed": "processed",
    "skipped": "skipped",
    "error": "errors",
}


def _load_log() -> Dict:
    """Load the audit log from ``AUDIT_FILE``.

    Returns:
        The parsed log. The result is always a dict whose ``"runs"`` value
        is a list; a missing, unparsable or wrongly shaped file yields an
        empty log instead of raising.
    """
    try:
        with open(AUDIT_FILE, "r", encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        return {"runs": []}
    except (json.JSONDecodeError, UnicodeDecodeError) as exc:
        logger.warning(
            "Audit: %s is not valid JSON (%s); starting with an empty log",
            AUDIT_FILE,
            exc,
        )
        return {"runs": []}

    # Valid JSON can still have the wrong shape (e.g. a bare list); every
    # caller indexes data["runs"], so normalise it here once.
    if not isinstance(data, dict):
        logger.warning(
            "Audit: %s does not contain a JSON object; starting with an empty log",
            AUDIT_FILE,
        )
        return {"runs": []}
    if not isinstance(data.get("runs"), list):
        logger.warning(
            "Audit: %s has no 'runs' list; starting with an empty run list",
            AUDIT_FILE,
        )
        data["runs"] = []
    return data


def _save_log(data: Dict) -> None:
    """Write the audit log to ``AUDIT_FILE``.

    The data is written to a sibling temporary file first and then moved
    over the real file with ``os.replace``, so an interrupted write cannot
    leave a truncated log behind.
    """
    tmp_path = f"{AUDIT_FILE}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2, ensure_ascii=False)
        os.replace(tmp_path, AUDIT_FILE)
    finally:
        # After a successful replace the temp file is gone; this only
        # cleans up when serialisation or the replace itself failed.
        if os.path.exists(tmp_path):
            try:
                os.remove(tmp_path)
            except OSError:
                logger.warning("Audit: could not remove temp file %s", tmp_path)


def _find_run(data: Dict, run_id: str) -> Optional[Dict]:
    """Return the first run in ``data`` whose ``run_id`` matches, else None."""
    for run in data["runs"]:
        if isinstance(run, dict) and run.get("run_id") == run_id:
            return run
    return None


def _unique_run_id(base: str, runs: List[Dict]) -> str:
    """Return ``base``, or ``base_<n>`` when ``base`` is already taken."""
    taken = {run.get("run_id") for run in runs if isinstance(run, dict)}
    candidate = base
    suffix = 1
    while candidate in taken:
        candidate = f"{base}_{suffix}"
        suffix += 1
    return candidate


def start_run() -> str:
    """Start a new sync run and record it at the head of the log.

    The run id is the current local time formatted as ``YYYYmmdd_HHMMSS``.
    If a run with that id already exists (two runs started within the same
    second), a numeric suffix is appended so ids stay unique.

    Returns:
        The id of the newly created run.
    """
    # Take a single timestamp so run_id and started_at always agree.
    now = datetime.now()
    data = _load_log()
    run_id = _unique_run_id(now.strftime("%Y%m%d_%H%M%S"), data["runs"])

    run = {
        "run_id": run_id,
        "started_at": now.isoformat(),
        "finished_at": None,
        "status": "running",
        "summary": {"processed": 0, "skipped": 0, "errors": 0},
        "files": [],
        "errors": [],
    }
    data["runs"].insert(0, run)  # newest first

    # Keep only the MAX_RUNS most recent runs.
    del data["runs"][MAX_RUNS:]

    _save_log(data)
    logger.info("Audit: Started run %s", run_id)
    return run_id


def log_file(run_id: str, name: str, item_id: str, policy: str,
             doc_type: str, chunks: int, status: str, detail: str = ""):
    """Record the outcome of processing one file in a sync run.

    Args:
        run_id: Id of the sync run.
        name: File name.
        item_id: SharePoint item id.
        policy: Processing policy (requires_ocr, skip_ocr, metadata_only,
            unsupported).
        doc_type: Document type (textual_document, spreadsheet, drawing,
            binary).
        chunks: Number of chunks created.
        status: One of ``indexed``, ``skipped`` or ``error``; any other
            value is recorded on the file entry but changes no counter.
        detail: Additional note; for ``error`` it is also appended to the
            run's error list as ``"<name>: <detail>"``.

    If ``run_id`` is not in the log, a warning is logged and nothing is
    written.
    """
    data = _load_log()
    run = _find_run(data, run_id)
    if run is None:
        logger.warning(
            "Audit: run %s not found; file %s was not logged", run_id, name
        )
        return

    run.setdefault("files", []).append({
        "name": name,
        "item_id": item_id,
        "doc_type": doc_type,
        "policy": policy,
        "chunks": chunks,
        "status": status,
        "detail": detail,
        "timestamp": datetime.now().isoformat(),
    })

    # Update the per-status counters.
    summary_key = _SUMMARY_KEY_BY_STATUS.get(status)
    if summary_key is not None:
        summary = run.setdefault("summary", {})
        summary[summary_key] = summary.get(summary_key, 0) + 1
    if status == "error":
        run.setdefault("errors", []).append(f"{name}: {detail}")

    _save_log(data)


def finish_run(run_id: str, status: str = "completed"):
    """Mark a sync run as finished.

    Args:
        run_id: Id of the sync run.
        status: Final status to store on the run.

    If ``run_id`` is not in the log, a warning is logged and nothing is
    written.
    """
    data = _load_log()
    run = _find_run(data, run_id)
    if run is None:
        logger.warning("Audit: run %s not found; cannot mark it finished", run_id)
        return

    run["finished_at"] = datetime.now().isoformat()
    run["status"] = status
    _save_log(data)
    logger.info("Audit: Finished run %s (%s)", run_id, status)


def get_history(limit: int = 10) -> List[Dict]:
    """Return up to ``limit`` of the most recent sync runs, newest first.

    A negative ``limit`` yields an empty list rather than slicing from the
    end of the history.
    """
    runs = _load_log()["runs"]
    if limit is not None and limit < 0:
        return []
    return runs[:limit]


def get_run_detail(run_id: str) -> Optional[Dict]:
    """Return the full record of one sync run, or None if it is unknown."""
    return _find_run(_load_log(), run_id)


def get_latest_run() -> Optional[Dict]:
    """Return the most recent sync run, or None when the log is empty."""
    runs = _load_log()["runs"]
    return runs[0] if runs else None