import logging
import os
from typing import Optional

import httpx

from core.models import (
    DocumentClassificationResult,
    DocumentType,
    IngestedDocument,
    PdfType,
    ProcessingPolicy,
)
from extraction.magic_numbers import MagicNumberValidator
from extraction.pdf_inspector import PDFInspector

logger = logging.getLogger("DCE")


class DocumentClassificationEngine:
    """
    Document Classification Engine (DCE).

    Classifies a file before the pipeline decides between OCR / MarkItDown /
    Skip. Routing is driven by the file extension; PDFs are additionally
    downloaded and inspected to pick a PDF sub-type.
    """

    # Extension groups used by the routing rules (compared against the
    # lower-cased extension, leading dot included).
    _TEXT_EXTENSIONS = frozenset({".docx", ".doc", ".txt", ".md"})
    _SPREADSHEET_EXTENSIONS = frozenset({".xlsx", ".xls", ".csv"})
    _CAD_EXTENSIONS = frozenset({".dwg", ".dxf", ".cad"})
    _PRESENTATION_EXTENSIONS = frozenset({".pptx", ".ppt"})

    # Only this many leading bytes are handed to the magic-number validator.
    _MAGIC_HEADER_SIZE = 256

    def __init__(self, provider=None):
        """
        Args:
            provider: Optional storage provider instance. When given, it is
                used (via its ``download_file`` method) to download files.
        """
        self.pdf_inspector = PDFInspector()
        self.provider = provider

    def classify(
        self,
        document: IngestedDocument,
        target_item: Optional[dict] = None,
    ) -> DocumentClassificationResult:
        """
        Classify a document and decide its processing policy.

        Args:
            document: IngestedDocument from the ingestion output.
            target_item: Original item dict from the provider (used to
                download the file through the provider).

        Returns:
            A DocumentClassificationResult carrying the document type, the
            processing policy, the lower-cased file extension, a support flag
            (False only for the UNSUPPORTED policy) and a reason string.
        """
        logger.info(
            "Classifying document: %s (ID: %s)", document.name, document.item_id
        )

        ext = os.path.splitext(document.name)[1].lower()
        doc_type, policy, reason = self._route(document, ext, target_item)

        result = DocumentClassificationResult(
            item_id=document.item_id,
            doc_type=doc_type,
            processing_policy=policy,
            file_extension=ext,
            is_supported=policy != ProcessingPolicy.UNSUPPORTED,
            reason=reason,
        )

        logger.info(
            "Result -> Type: %s, Policy: %s, Reason: %s",
            doc_type.value,
            policy.value,
            reason,
        )
        return result

    def _route(
        self,
        document: IngestedDocument,
        ext: str,
        target_item: Optional[dict],
    ):
        """Apply the routing rules; return ``(doc_type, policy, reason)``."""
        if ext == ".pdf":
            return self._route_pdf(self._classify_pdf(document, target_item))
        if ext in self._TEXT_EXTENSIONS:
            return (
                DocumentType.TEXTUAL_DOCUMENT,
                ProcessingPolicy.SKIP_OCR,
                "Standard textual document format",
            )
        if ext in self._SPREADSHEET_EXTENSIONS:
            return (
                DocumentType.SPREADSHEET,
                ProcessingPolicy.SKIP_OCR,
                "Spreadsheet document format",
            )
        if ext in self._CAD_EXTENSIONS:
            return (
                DocumentType.DRAWING,
                ProcessingPolicy.METADATA_ONLY,
                "Native CAD drawing format",
            )
        if ext in self._PRESENTATION_EXTENSIONS:
            return (
                DocumentType.PRESENTATION,
                ProcessingPolicy.SKIP_OCR,
                "Presentation document format",
            )
        return (
            DocumentType.BINARY,
            ProcessingPolicy.UNSUPPORTED,
            f"Unsupported or binary extension: {ext}",
        )

    @staticmethod
    def _route_pdf(pdf_type):
        """Map a PDF sub-type to ``(doc_type, policy, reason)``."""
        if pdf_type == PdfType.TEXT_PDF:
            return (
                DocumentType.TEXTUAL_DOCUMENT,
                ProcessingPolicy.SKIP_OCR,
                "PDF has text layer (TEXT_PDF)",
            )
        if pdf_type == PdfType.DRAWING_PDF:
            return (
                DocumentType.DRAWING,
                ProcessingPolicy.METADATA_ONLY,
                "PDF has large vector dimensions (DRAWING_PDF)",
            )
        if pdf_type == PdfType.AMBIGUOUS_PDF:
            return (
                DocumentType.UNKNOWN,
                ProcessingPolicy.REQUIRES_REVIEW,
                "PDF size lớn bất thường (A3/A2 hoặc DPI cao), cần con người xác nhận",
            )
        # Any other value (including SCAN_PDF) is routed to OCR.
        return (
            DocumentType.TEXTUAL_DOCUMENT,
            ProcessingPolicy.REQUIRES_OCR,
            "PDF has no text layer (SCAN_PDF)",
        )

    def _classify_pdf(
        self,
        document: IngestedDocument,
        target_item: Optional[dict] = None,
    ) -> PdfType:
        """
        Classify a PDF as TEXT_PDF, SCAN_PDF, DRAWING_PDF or AMBIGUOUS_PDF.

        Downloads the PDF bytes, runs a magic-number check on the header
        (logged only; a mismatch does not stop processing), then delegates
        to the PDF inspector. Falls back to SCAN_PDF when no bytes could be
        downloaded.
        """
        pdf_bytes = self._download_pdf(document, target_item)
        if not pdf_bytes:
            logger.warning(
                "Cannot download PDF %s. Defaulting to SCAN_PDF.", document.name
            )
            return PdfType.SCAN_PDF

        # Magic-number validation on the leading bytes only. The detected
        # type (second tuple element) is not needed here.
        header = pdf_bytes[: self._MAGIC_HEADER_SIZE]
        is_valid, _, sig_desc = MagicNumberValidator.validate_from_bytes(header)
        if is_valid:
            logger.info("Magic Number match: %s", sig_desc)
        else:
            logger.warning(
                "Magic number mismatch for %s. Continuing with inspection.",
                document.name,
            )

        # PDF inspection decides the final sub-type.
        return self.pdf_inspector.inspect_pdf_from_bytes(pdf_bytes)

    def _download_pdf(
        self,
        document: IngestedDocument,
        target_item: Optional[dict] = None,
    ) -> Optional[bytes]:
        """
        Download the PDF bytes, preferring the provider and falling back to httpx.

        Returns:
            The downloaded bytes, or None when neither the provider nor the
            direct ``download_url`` request produced any content.
        """
        # Option 1: use the provider (preferred; it carries the right auth).
        if self.provider and target_item:
            # Normalize: make sure an 'id' field exists (the ingestion output
            # may use 'item_id' instead). Copy so the caller's dict is untouched.
            if "id" not in target_item and "item_id" in target_item:
                target_item = {**target_item, "id": target_item["item_id"]}
            try:
                data = self.provider.download_file(target_item)
            except Exception as e:  # best-effort: any provider failure falls back
                logger.warning(
                    "Provider download failed: %s. Falling back to httpx.", e
                )
            else:
                if data:
                    return data
                # A provider that returns None/empty without raising must not
                # short-circuit the documented httpx fallback.
                logger.warning(
                    "Provider returned no data for %s. Falling back to httpx.",
                    document.name,
                )

        # Option 2: use httpx directly with the document's download_url.
        if document.download_url:
            try:
                with httpx.Client(follow_redirects=True, timeout=60.0) as client:
                    resp = client.get(document.download_url)
                    resp.raise_for_status()
                    return resp.content
            except Exception as e:  # best-effort boundary: log and report failure
                logger.error("httpx download failed: %s", e)

        return None