Files
poc_system/indexing/vector_store.py

102 lines
4.3 KiB
Python

import logging
from typing import List
from opensearchpy import OpenSearch, RequestsHttpConnection
from core.models import DocumentChunk
from core.config import settings
logger = logging.getLogger("VectorStore")
class VectorStore:
def __init__(self, index_name: str = "sharepoint_docs"):
self.index_name = index_name
# Kết nối tới OpenSearch Cluster
self.client = OpenSearch(
hosts=[{'host': settings.opensearch_host, 'port': settings.opensearch_port}],
http_auth=(settings.opensearch_user, settings.opensearch_pass),
use_ssl=False,
verify_certs=False,
connection_class=RequestsHttpConnection
)
# Load Local Embedding Model (Chạy bằng CPU/GPU nội bộ)
logger.info("Đang nạp Local Embedding Model (keepitreal/vietnamese-sbert)...")
try:
from sentence_transformers import SentenceTransformer
# Vietnamese-SBERT (768 dimensions), cực kỳ tốt cho văn bản Tiếng Việt
self.embedder = SentenceTransformer('keepitreal/vietnamese-sbert')
logger.info("Nạp Embedding Model thành công!")
except ImportError:
logger.error("LỖI: Chưa cài thư viện. Hãy chạy: pip install sentence-transformers opensearch-py")
raise
self._ensure_index_exists()
def _ensure_index_exists(self):
"""Khởi tạo Mapping cho Index nếu chưa có"""
if not self.client.indices.exists(index=self.index_name):
mapping = {
"settings": {
"index": {
"knn": True,
"knn.algo_param.ef_search": 100
}
},
"mappings": {
"properties": {
"chunk_id": { "type": "keyword" },
"file_id": { "type": "keyword" },
"file_name": { "type": "text" },
"text": {
"type": "text",
"analyzer": "standard" # Có thể đổi sang analyzer tiếng Việt nếu cài plugin vi
},
"embedding": {
"type": "knn_vector",
"dimension": 768, # Chiều dài vector của vietnamese-sbert
"method": {
"name": "hnsw",
"space_type": "l2",
"engine": "nmslib"
}
},
"site_id": { "type": "keyword" },
"page_from": { "type": "integer" },
"page_to": { "type": "integer" },
"source_url": { "type": "keyword" },
"permissions": { "type": "keyword" }
}
}
}
self.client.indices.create(index=self.index_name, body=mapping)
logger.info(f"Đã tạo OpenSearch Index: {self.index_name}")
def embed_and_index(self, chunks: List[DocumentChunk]):
"""Biến đổi Text thành Vector và lưu vào Database"""
if not chunks:
return
logger.info(f"Đang băm (Embedding) {len(chunks)} chunks thành Vector...")
texts = [chunk.text for chunk in chunks]
# Chạy model AI để tạo vector
embeddings = self.embedder.encode(texts, show_progress_bar=False)
actions = []
for i, chunk in enumerate(chunks):
chunk.embedding = embeddings[i].tolist()
# Chuẩn bị dữ liệu JSON cho OpenSearch Bulk API
action = {
"_op_type": "index",
"_index": self.index_name,
"_id": chunk.chunk_id,
"_source": chunk.dict()
}
actions.append(action)
logger.info("Đang nạp dữ liệu vào OpenSearch...")
from opensearchpy.helpers import bulk
success, failed = bulk(self.client, actions)
logger.info(f"Hoàn tất! Bơm thành công: {success} chunks. Thất bại: {len(failed) if isinstance(failed, list) else failed}")