40 lines
1.7 KiB
Python
40 lines
1.7 KiB
Python
from typing import Dict, Any, Tuple
|
|
import httpx
|
|
import logging
|
|
from core.models import IngestedDocument, DocumentClassificationResult, DocumentType, ProcessingPolicy
|
|
|
|
logger = logging.getLogger("DCE")
|
|
|
|
class MagicNumberValidator:
    """Validates file types using magic numbers (file signatures).

    A file's leading bytes are compared against the prefixes listed in
    ``SIGNATURES``. The class holds no instance state; both operations are
    exposed as classmethods.
    """

    # Maps a leading byte signature to (document type, human-readable description).
    SIGNATURES = {
        b"%PDF-": (DocumentType.TEXTUAL_DOCUMENT, "PDF Document"),
        # ZIP container: reported as UNKNOWN here because it needs a further check.
        b"PK\x03\x04": (DocumentType.UNKNOWN, "ZIP Archive / Office Open XML"),
        b"\xd0\xcf\x11\xe0\xa1\xb1\x1a\xe1": (DocumentType.UNKNOWN, "Legacy Office Document"),
        # Add further CAD magic numbers here if needed.
        b"AC10": (DocumentType.DRAWING, "AutoCAD Drawing (DWG)"),
    }

    @classmethod
    def validate_from_bytes(cls, header_bytes: bytes) -> Tuple[bool, DocumentType, str]:
        """Check whether the bytes start with any known signature.

        Args:
            header_bytes: The leading bytes of the file to inspect.

        Returns:
            ``(matched, doc_type, description)``. ``matched`` is True as soon
            as one signature in ``SIGNATURES`` is a prefix of ``header_bytes``,
            even when the associated type is ``DocumentType.UNKNOWN``. When
            nothing matches (including empty input) the result is
            ``(False, DocumentType.UNKNOWN, "Unknown Signature")``.
        """
        for sig, (doc_type, desc) in cls.SIGNATURES.items():
            if header_bytes.startswith(sig):
                return True, doc_type, desc
        return False, DocumentType.UNKNOWN, "Unknown Signature"

    @classmethod
    def fetch_header_bytes(cls, download_url: str, num_bytes: int = 256) -> bytes:
        """Fetch at most the first ``num_bytes`` bytes of a remote file.

        An HTTP Range header asks the server for only the leading bytes. The
        response is streamed and reading stops once ``num_bytes`` have been
        received, so a server that ignores the Range header and sends the
        whole file does not cause the entire body to be downloaded.

        Args:
            download_url: URL of the file to inspect.
            num_bytes: Maximum number of leading bytes to return.

        Returns:
            Up to ``num_bytes`` bytes from the start of the file, or ``b""``
            when ``num_bytes`` is not positive or the request fails (the
            failure is logged, not raised).
        """
        if num_bytes <= 0:
            # "bytes=0--1" would be a malformed Range header; nothing to fetch.
            return b""

        headers = {"Range": f"bytes=0-{num_bytes - 1}"}
        try:
            with httpx.Client() as client:
                with client.stream("GET", download_url, headers=headers) as response:
                    response.raise_for_status()
                    buffer = bytearray()
                    for chunk in response.iter_bytes():
                        buffer.extend(chunk)
                        if len(buffer) >= num_bytes:
                            # Enough data; leaving the stream closes the response.
                            break
                    # Truncate in case the last chunk overshot the limit.
                    return bytes(buffer[:num_bytes])
        except Exception as e:
            # Best-effort by design: callers receive empty bytes on any failure.
            logger.error("Failed to fetch header bytes: %s", e)
            return b""
|