import logging
import sys

from core.config import settings
from core.models import IngestedDocument, OCRPageResult, ProcessingPolicy
from ingestion.providers.sharepoint_provider import SharePointProvider
from extraction.dce import DocumentClassificationEngine
from extraction.ocr_service import OCRService
from extraction.text_extractor import TextExtractor
from chunking.markdown_chunker import MarkdownChunker
from indexing.vector_store import VectorStore

logging.basicConfig(level=logging.INFO, format="%(levelname)s:%(name)s:%(message)s")
logger = logging.getLogger("RAGPipeline")


def extract_text_from_pdf_bytes(pdf_bytes: bytes) -> str:
    """Extract text directly from a PDF that has a text layer (no OCR).

    Args:
        pdf_bytes: Raw content of the PDF file.

    Returns:
        The text of every page joined by blank lines, or "" when the
        document cannot be read (the failure is logged, never raised).
    """
    try:
        # Imported inside the try so a missing PyMuPDF install is reported
        # like any other extraction failure.
        import fitz

        # The context manager closes the document even if a page fails.
        with fitz.open(stream=pdf_bytes, filetype="pdf") as doc:
            return "\n\n".join(page.get_text() for page in doc)
    except Exception as e:
        logger.error(f"Failed to extract text from PDF: {e}")
        return ""


def _file_extension(name: str) -> str:
    """Return the lower-cased extension of *name* without the dot, or ""."""
    return name.lower().rsplit(".", 1)[-1] if "." in name else ""


def _skip_reason(policy):
    """Return the log label for a policy that is never indexed, else None."""
    if policy == ProcessingPolicy.UNSUPPORTED:
        return "unsupported"
    if policy == ProcessingPolicy.METADATA_ONLY:
        return "metadata-only, không index text"
    if policy == ProcessingPolicy.REQUIRES_REVIEW:
        return "cần review thủ công"
    return None


def _extract_pages(name, classification, file_bytes, ocr):
    """Turn downloaded bytes into page results according to the DCE policy.

    Args:
        name: File name, used for the extension lookup and log messages.
        classification: Result of ``dce.classify`` for this file.
        file_bytes: Downloaded file content.
        ocr: OCR service used for the REQUIRES_OCR policy.

    Returns:
        A list of page results (empty when nothing could be extracted), or
        None when the file type has no text extractor.
    """
    policy = classification.processing_policy

    if policy == ProcessingPolicy.REQUIRES_OCR:
        # SCAN_PDF: go through the VLM-based OCR service.
        logger.info(" 👁️ SCAN_PDF: Đang OCR qua VLM...")
        return ocr.process_pdf_bytes(file_bytes) or []

    if policy != ProcessingPolicy.SKIP_OCR:
        # Any other policy has no extraction path; the caller reports it as
        # "no content to index".
        return []

    ext = _file_extension(name)
    if ext == "pdf":
        # TEXT_PDF: read the text layer directly, no OCR.
        logger.info(" 📄 TEXT_PDF: Trích xuất text trực tiếp (không OCR)...")
        text = extract_text_from_pdf_bytes(file_bytes)
        if text.strip():
            return [OCRPageResult(page=1, text=text, confidence=1.0)]
        logger.warning(f" ⚠️ Không trích xuất được text từ {name}")
        return []
    if ext in ("docx", "doc"):
        # NOTE(review): legacy ".doc" is routed to the docx extractor too;
        # confirm TextExtractor.extract_from_docx can actually read it.
        logger.info(" 📄 DOCX: Trích xuất text bằng python-docx...")
        return TextExtractor.extract_from_docx(file_bytes) or []
    if ext in ("xlsx", "xls"):
        logger.info(" 📄 XLSX: Trích xuất dữ liệu bằng openpyxl...")
        return TextExtractor.extract_from_xlsx(file_bytes) or []
    if ext in ("txt", "md", "csv"):
        logger.info(f" 📄 {ext.upper()}: Đọc text trực tiếp...")
        return TextExtractor.extract_from_text(file_bytes) or []

    logger.info(f" 📄 {classification.doc_type.value}: Chưa hỗ trợ extract text, bỏ qua.")
    return None


def _process_item(item, provider, dce, ocr, chunker, vector_db) -> bool:
    """Classify, download, extract, chunk and index one SharePoint file.

    Every failure is logged and reported through the return value so that a
    single bad file cannot abort the whole run.

    Returns:
        True when the file was indexed, False when it was skipped or failed.
    """
    name = item.get("name", "")
    item_id = item.get("id", "")

    # Build the IngestedDocument the DCE classifies.
    item_details = provider.get_item_details(item_id)
    doc = IngestedDocument(
        site_id=settings.sharepoint_site_id,
        drive_id="",
        item_id=item_id,
        name=name,
        web_url=item_details.get("web_url", ""),
        download_url=item_details.get("download_url"),
        is_folder=False,
        size=item.get("size", 0),
    )

    # DCE classification.
    logger.info(f"\n--- DCE: {name} ---")
    classification = dce.classify(doc, target_item=item)
    logger.info(
        f" → {classification.doc_type.value} | "
        f"{classification.processing_policy.value} | {classification.reason}"
    )

    # Policies that never reach the index.
    reason = _skip_reason(classification.processing_policy)
    if reason is not None:
        logger.info(f" ⏭ BỎ QUA: {name} ({reason})")
        return False

    # Download.
    logger.info(f" 📥 Đang tải {name}...")
    try:
        file_bytes = provider.download_file(item)
    except Exception as e:
        logger.error(f" ❌ Lỗi tải {name}: {e}")
        return False
    if not file_bytes:
        logger.error(f" ❌ File rỗng: {name}")
        return False

    # Extraction.
    try:
        pages = _extract_pages(name, classification, file_bytes, ocr)
    except Exception as e:
        logger.error(f" ❌ Lỗi trích xuất {name}: {e}")
        return False
    if pages is None:
        # Unsupported file type; _extract_pages already logged it.
        return False
    if not pages:
        logger.warning(f" ⚠️ Không có nội dung để index: {name}")
        return False

    # Chunking. Permissions are only fetched here, once the file is known to
    # have content, so skipped files do not cost an extra provider call.
    logger.info(f" ✂️ Đang chunk ({len(pages)} trang)...")
    metadata = {
        "item_id": item_id,
        "name": name,
        "web_url": item_details.get("web_url"),
        "download_url": item_details.get("download_url"),
        "site_id": settings.sharepoint_site_id,
        "permissions": provider.get_item_permissions(item_id),
    }
    chunks = chunker.chunk_document(pages, metadata)
    if not chunks:
        logger.warning(f" ⚠️ Không có chunks: {name}")
        return False

    # Indexing: drop any chunks previously stored for this file, then index.
    logger.info(f" 📦 Đang index {len(chunks)} chunks vào OpenSearch...")
    try:
        vector_db.delete_by_file_id(item_id)
        vector_db.embed_and_index(chunks)
    except Exception as e:
        logger.error(f" ❌ Lỗi index {name}: {e}")
        return False

    logger.info(f" ✅ HOÀN TẤT: {name} → {len(chunks)} chunks")
    return True


def run_pipeline():
    """Run the full RAG pipeline once against SharePoint, with DCE routing.

    Steps: fetch the item list from SharePoint, classify each file with the
    DocumentClassificationEngine, extract its text (directly or through
    OCR), chunk it and index the chunks into the "poc_sharepoint_docs"
    vector store. A summary of processed and skipped files is logged.

    Exits the process with status 1 when SharePoint returns no items or the
    vector store cannot be created.
    """
    logger.info("=== BẮT ĐẦU TEST TOÀN BỘ ĐƯỜNG ỐNG RAG (với DCE) ===")

    # NOTE(review): "opensearch" is presumably the container hostname; it is
    # rewritten so the script can reach OpenSearch from the host machine.
    if settings.opensearch_host == "opensearch":
        settings.opensearch_host = "localhost"

    # 1. INGESTION
    logger.info("\n--- BƯỚC 1: Lấy file từ SharePoint ---")
    provider = SharePointProvider()
    items, _ = provider.fetch_changes({})

    if not items:
        logger.error("Không có file nào trên SharePoint!")
        sys.exit(1)

    logger.info(f"Đã lấy {len(items)} items từ SharePoint.")

    # 2. DCE + PROCESSING
    dce = DocumentClassificationEngine(provider=provider)
    ocr = OCRService()
    chunker = MarkdownChunker(max_chunk_size=1000, overlap=100)

    try:
        vector_db = VectorStore(index_name="poc_sharepoint_docs")
    except Exception as e:
        logger.error(f"Không kết nối được OpenSearch: {e}")
        sys.exit(1)

    processed_count = 0
    skipped_count = 0

    for item in items:
        # Folders and deleted entries are ignored and not counted.
        if item.get("is_folder") or item.get("is_deleted"):
            continue

        if _process_item(item, provider, dce, ocr, chunker, vector_db):
            processed_count += 1
        else:
            skipped_count += 1

    # SUMMARY
    logger.info("\n" + "=" * 60)
    logger.info(f"📊 TỔNG KẾT: {processed_count} file đã xử lý, {skipped_count} file bỏ qua")
    logger.info("=" * 60)


if __name__ == "__main__":
    run_pipeline()