# (Hosting-UI status residue removed: "Spaces: Running")
# srd_engine_final.py
# ============================================================
# CedroPass SRD – Final RAG Engine (Stable, Section-Aware)
# ============================================================
| import os | |
| import re | |
| import io | |
| import base64 | |
| import time | |
| import shutil | |
| import warnings | |
| from typing import List, Dict, Any, Optional | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
| # -------------------- Data Processing -------------------- | |
| import pdfplumber | |
| import camelot | |
| from pdf2image import convert_from_path | |
| import pytesseract | |
| from PIL import Image | |
| # -------------------- NLP & Retrieval -------------------- | |
| import spacy | |
| from sentence_transformers import CrossEncoder | |
| from langchain_community.vectorstores import Chroma | |
| from langchain_community.embeddings import HuggingFaceEmbeddings | |
| from langchain_community.retrievers import BM25Retriever | |
| from langchain_core.documents import Document | |
| from rapidfuzz import process as fuzz_process | |
| # -------------------- Claude -------------------- | |
| try: | |
| from anthropic import Anthropic | |
| except ImportError: | |
| Anthropic = None | |
# -------------------- CONFIG --------------------
# Silence noisy warnings from the PDF/NLP stack.
warnings.filterwarnings("ignore")
# Disable Pillow's decompression-bomb guard: scanned SRD pages can exceed
# the default pixel limit.
Image.MAX_IMAGE_PIXELS = None
# Optional path to the Poppler binaries used by pdf2image (None if unset).
POPPLER_PATH = os.getenv("POPPLER_PATH")
# Default Windows install location for Tesseract; applied only if the file
# exists, so other hosts fall back to whatever tesseract is on PATH.
TESSERACT_PATH = r"C:\Program Files\Tesseract-OCR\tesseract.exe"
if os.path.exists(TESSERACT_PATH):
    pytesseract.pytesseract.tesseract_cmd = TESSERACT_PATH
# -------------------- NLP MODEL --------------------
print("[SYSTEM] Loading NLP pipelines...")
try:
    # Small English pipeline used for lemmatization of indexed chunks.
    NLP_EN = spacy.load("en_core_web_sm")
except OSError:
    # Model not installed yet — download it once, then load.
    from spacy.cli import download
    download("en_core_web_sm")
    NLP_EN = spacy.load("en_core_web_sm")
# ============================================================
# TEXT UTILS
# ============================================================
def normalize_text(text: str) -> str:
    """Collapse every run of whitespace to a single space and trim the ends."""
    collapsed = re.sub(r"\s+", " ", text)
    return collapsed.strip()
def lemmatize_text(text: str) -> str:
    """Return a lowercase, lemmatized rendering of *text*.

    Only the first 50,000 characters are processed (bounds spaCy runtime on
    very large chunks); whitespace and punctuation tokens are discarded.
    """
    parsed = NLP_EN(text[:50000])
    lemmas = [
        tok.lemma_.lower()
        for tok in parsed
        if not (tok.is_space or tok.is_punct)
    ]
    return " ".join(lemmas)
# ============================================================
# SECTION-AWARE SRD SPLITTER (CRITICAL FIX)
# ============================================================
class SmartSRDSplitter:
    """Split raw SRD text into section-tagged Documents.

    Guarantees that ALL child paragraphs inherit the correct section title
    and section_type ("functional", "nonfunctional", or "general") until a
    new header line appears.
    """

    # A header is a numbered heading (1, 1.2, ...), an FR-/NFR- identifier,
    # or a capitalized phrase of 4+ characters ending in a colon.
    HEADER_REGEX = re.compile(
        r"^(\d+(\.\d+)*|FR-\d+|NFR-\d+|[A-Z][A-Za-z\s]{3,}:)",
        re.IGNORECASE,
    )

    @staticmethod
    def _classify(header: str) -> str:
        """Map a header line to a section_type tag.

        BUG FIX: non-functional patterns must be tested FIRST. The string
        "non-functional requirement" contains the substring
        "functional requirement", and "nfr-" contains "fr-", so testing
        the functional patterns first mis-tagged every NFR header as
        "functional" (and made the nonfunctional branch unreachable).
        """
        low = header.lower()
        if "non-functional" in low or "nfr-" in low:
            return "nonfunctional"
        if "functional requirement" in low or "fr-" in low:
            return "functional"
        return "general"

    def split_text(self, text: str) -> List[Document]:
        """Split *text* into Documents, one chunk per header-delimited section.

        Returns a list of Documents whose metadata carries the section
        title (first 80 chars of the header), the section_type, and a
        fixed source tag.
        """
        docs: List[Document] = []
        buffer: List[str] = []
        section_title = "General"
        section_type = "general"

        def _flush() -> None:
            # Emit the accumulated lines as one chunk, tagged with the
            # section that was active while they were collected.
            if buffer:
                docs.append(
                    Document(
                        page_content="\n".join(buffer),
                        metadata={
                            "type": "text",
                            "section": section_title,
                            "section_type": section_type,
                            "source": "SRD_Main",
                        },
                    )
                )

        for raw in text.splitlines():
            line = raw.strip()
            if not line:
                continue
            if self.HEADER_REGEX.match(line):
                _flush()  # close out the previous section first
                buffer = [line]
                section_title = line[:80]
                section_type = self._classify(line)
            else:
                buffer.append(line)

        _flush()  # final section
        return docs
# ============================================================
# PDF EXTRACTORS
# ============================================================
def extract_pdf_text(path: str) -> str:
    """Concatenate the extractable text of every page in the PDF at *path*.

    Pages with no extractable text are skipped; each kept page ends with a
    newline.
    """
    pages: List[str] = []
    with pdfplumber.open(path) as pdf:
        for page in pdf.pages:
            content = page.extract_text()
            if content:
                pages.append(content + "\n")
    return "".join(pages)
def extract_tables(path: str) -> List[Document]:
    """Extract tables from the PDF at *path* as markdown-rendered Documents.

    Best-effort: any Camelot failure (encrypted PDF, missing ghostscript,
    no detectable tables) is reported to stdout and whatever was extracted
    so far is returned, rather than aborting the whole indexing run.
    Previously the exception was swallowed silently, hiding real setup
    problems.

    Tables whose markdown rendering is 30 characters or shorter are dropped
    as noise (empty or single-cell extractions).
    """
    docs: List[Document] = []
    try:
        tables = camelot.read_pdf(path, pages="all", flavor="stream")
        for table in tables:
            md = table.df.to_markdown(index=False)
            if len(md) > 30:
                docs.append(
                    Document(
                        page_content=md,
                        metadata={
                            "type": "table",
                            "section_type": "general",
                            "source": "SRD_Table",
                        },
                    )
                )
    except Exception as exc:  # deliberate best-effort: report, don't crash
        print(f"[WARN] Table extraction failed for {path}: {exc}")
    return docs
# ============================================================
# DIAGRAM INTERPRETER (TEXT-ONLY SAFE)
# ============================================================
class DiagramInterpreter:
    """Produce a textual description of a diagram image.

    Uses Claude's vision API when the anthropic package and an API key are
    both available; otherwise degrades to plain OCR via pytesseract.
    """

    def __init__(self):
        api_key = os.getenv("ANTHROPIC_API_KEY")
        if Anthropic and api_key:
            self.client = Anthropic(api_key=api_key)
        else:
            self.client = None

    def describe(self, image: Image.Image, label: str) -> str:
        """Return a description of *image*; *label* names the diagram kind."""
        if self.client is None:
            # No Claude available — fall back to raw OCR text.
            return pytesseract.image_to_string(image)

        # Re-encode as JPEG and base64 for the vision API.
        buf = io.BytesIO()
        image.convert("RGB").save(buf, format="JPEG", quality=85)
        encoded = base64.b64encode(buf.getvalue()).decode()

        text_part = {
            "type": "text",
            "text": f"Explain this {label} diagram for an SRD.",
        }
        image_part = {
            "type": "image",
            "source": {
                "type": "base64",
                "media_type": "image/jpeg",
                "data": encoded,
            },
        }
        response = self.client.messages.create(
            model="claude-sonnet-4-5-20250929",
            max_tokens=600,
            temperature=0.2,
            messages=[{"role": "user", "content": [text_part, image_part]}],
        )
        return response.content[0].text
# ============================================================
# CORE RAG ENGINE
# ============================================================
class SRDChatbotEngine:
    """Hybrid (dense + BM25) RAG engine over a single SRD document.

    Usage: call build_index(pdf_path) once, then answer(query, claude) per
    question. Enumeration-style questions bypass similarity search and read
    every chunk tagged section_type="functional" straight from the store.
    """

    def __init__(self, chroma_dir: str = "chroma_db_final"):
        print("[ENGINE] Initializing retrievers...")
        self.embedding_model = HuggingFaceEmbeddings(
            model_name="sentence-transformers/all-MiniLM-L6-v2"
        )
        # Cross-encoder used to re-rank the merged dense+sparse pool.
        self.reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")
        self.chroma_dir = chroma_dir
        self.vectorstore: Optional[Chroma] = None
        self.chroma_retriever = None
        self.bm25_retriever: Optional[BM25Retriever] = None
        # Alphanumeric tokens seen during indexing (e.g. for fuzzy matching).
        self.vocab = set()

    # -------------------- BUILD INDEX --------------------
    def build_index(
        self,
        pdf_path: str,
        diagrams: Optional[List[str]] = None,
    ):
        """(Re)build the Chroma and BM25 indexes from *pdf_path*.

        Any existing index directory is wiped first so stale chunks never
        leak into a new session. *diagrams* is accepted for interface
        compatibility but is not used here.
        """
        if os.path.exists(self.chroma_dir):
            shutil.rmtree(self.chroma_dir)

        splitter = SmartSRDSplitter()
        docs = splitter.split_text(extract_pdf_text(pdf_path))
        docs.extend(extract_tables(pdf_path))

        for d in docs:
            d.metadata["lemma"] = lemmatize_text(d.page_content)
            for w in d.page_content.split():
                if w.isalnum():
                    self.vocab.add(w.lower())

        self.vectorstore = Chroma.from_documents(
            docs,
            embedding=self.embedding_model,
            persist_directory=self.chroma_dir,
            collection_name="srd_final",
        )
        self.chroma_retriever = self.vectorstore.as_retriever(search_kwargs={"k": 20})
        self.bm25_retriever = BM25Retriever.from_documents(docs)
        self.bm25_retriever.k = 20
        print(f"✅ Indexed {len(docs)} SRD chunks")

    # -------------------- INTENT --------------------
    def detect_intent(self, q: str) -> str:
        """Return "enumeration" for list-everything questions, else "qa"."""
        q = q.lower()
        if any(w in q for w in ["list", "enumerate", "all functional", "requirements of"]):
            return "enumeration"
        return "qa"

    # -------------------- ENUMERATION (NO SIM SEARCH) --------------------
    def list_functional_requirements(self) -> List[str]:
        """Return the raw text of every chunk tagged section_type="functional".

        Raises:
            RuntimeError: if build_index() has not been called yet (the
            original code crashed with an opaque AttributeError on None).
        """
        if self.vectorstore is None:
            raise RuntimeError("build_index() must be called before querying")
        data = self.vectorstore.get(where={"section_type": "functional"})
        return data.get("documents", [])

    # -------------------- QUERY --------------------
    def answer(self, query: str, claude) -> str:
        """Answer *query* grounded in the indexed SRD, via *claude*.

        Enumeration intents dump every functional chunk into the prompt;
        QA intents merge dense + sparse retrieval, re-rank with the
        cross-encoder, and keep the top chunks as context.
        """
        intent = self.detect_intent(query)
        if intent == "enumeration":
            items = self.list_functional_requirements()
            if not items:
                return "I could not find sufficient information in the provided SRD."
            prompt = f"""
You are a Senior Project Architect.
List ALL functional requirements below.
Do not merge, summarize, or invent anything.
REQUIREMENTS:
{chr(10).join(items)}
"""
            return claude.generate_raw(prompt)

        # ---------- Normal QA ----------
        dense = self.chroma_retriever.invoke(query)
        sparse = self.bm25_retriever.invoke(query)

        # Merge the two candidate pools, dropping exact-duplicate chunks so
        # the same text cannot occupy several of the top-k slots.
        seen = set()
        pool = []
        for d in dense + sparse:
            if d.page_content not in seen:
                seen.add(d.page_content)
                pool.append(d)

        pairs = [[query, d.page_content] for d in pool]
        scores = self.reranker.predict(pairs)
        top = [
            d.page_content
            for d, s in sorted(zip(pool, scores), key=lambda x: x[1], reverse=True)
            if s > -6  # drop clearly irrelevant chunks
        ][:8]
        if not top:
            return "I could not find sufficient information in the provided SRD."

        # BUG FIX: the original wrote "".join(top[:4000]) — slicing the
        # *list* (at most 8 items, a no-op) instead of the joined string,
        # so the context was never actually capped. The cap belongs on the
        # string, bounding the prompt at ~4000 characters.
        ctx = "\n---\n".join(top)[:4000]
        prompt = f"""
Answer using ONLY the SRD context below.
If unsupported, say so explicitly.
CONTEXT:
{ctx}
QUESTION:
{query}
"""
        return claude.generate_raw(prompt)
# ============================================================
# CLAUDE ANSWERER
# ============================================================
class ClaudeAnswerer:
    """Thin wrapper around the Anthropic Messages API for plain-text answers."""

    def __init__(self):
        # Fail fast with a clear message when the optional dependency is absent.
        if Anthropic is None:
            raise RuntimeError("anthropic not installed")
        self.client = Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))
        self.model = "claude-sonnet-4-5-20250929"

    def generate_raw(self, prompt: str) -> str:
        """Send *prompt* as a single user message and return the reply text."""
        response = self.client.messages.create(
            model=self.model,
            max_tokens=1200,
            temperature=0.2,
            messages=[{"role": "user", "content": prompt}],
        )
        return response.content[0].text