Skip to content

RAG Module API Reference

Auto-generated from Python docstrings

LLM Factory

green_gov_rag.rag.llm_factory

LLM Provider Factory for multi-platform support.

Supports OpenAI, Azure OpenAI, AWS Bedrock, and Anthropic. Uses LangChain for abstraction across providers.

LLMFactory

Factory for creating LLM instances based on provider configuration.

Source code in green_gov_rag/rag/llm_factory.py
class LLMFactory:
    """Builds LangChain chat-model clients for the configured LLM provider."""

    @staticmethod
    def create_llm(
        provider: str | None = None,
        model: str | None = None,
        temperature: float = 0.2,
        max_tokens: int = 500,
    ) -> BaseLanguageModel:
        """Build the chat model for the requested (or configured) provider.

        Args:
        ----
            provider: Provider identifier (openai, azure, bedrock, anthropic).
                     Falls back to settings.llm_provider when falsy
            model: Model name. Falls back to settings.llm_model when falsy
            temperature: Sampling temperature handed to the provider builder
            max_tokens: Response token limit handed to the provider builder

        Returns:
        -------
            The LangChain model produced by the matching provider builder

        Raises:
        ------
            ValueError: If the provider matches none of the LLMProvider values
                checked here, or the selected builder finds credentials missing

        """
        provider = provider or settings.llm_provider
        model = model or settings.llm_model

        # Ordered (provider, builder) pairs; the first equal provider wins.
        builders = (
            (LLMProvider.OPENAI, LLMFactory._create_openai),
            (LLMProvider.AZURE, LLMFactory._create_azure_openai),
            (LLMProvider.BEDROCK, LLMFactory._create_bedrock),
            (LLMProvider.ANTHROPIC, LLMFactory._create_anthropic),
        )
        for known_provider, build in builders:
            if provider == known_provider:
                return build(model, temperature, max_tokens)

        msg = f"Unsupported LLM provider: {provider}"
        raise ValueError(msg)

    @staticmethod
    def _create_openai(
        model: str,
        temperature: float,
        max_tokens: int,
    ) -> BaseLanguageModel:
        """Build a ChatOpenAI client; requires settings.openai_api_key."""
        from langchain_openai import ChatOpenAI
        from pydantic import SecretStr

        api_key = settings.openai_api_key
        if not api_key:
            msg = "OPENAI_API_KEY is required for OpenAI provider"
            raise ValueError(msg)

        return ChatOpenAI(
            model=model,
            temperature=temperature,
            max_completion_tokens=max_tokens,
            api_key=SecretStr(api_key),
        )

    @staticmethod
    def _create_azure_openai(
        model: str,
        temperature: float,
        max_tokens: int,
    ) -> BaseLanguageModel:
        """Build an AzureChatOpenAI client.

        Requires both the Azure API key and endpoint in settings. The
        deployment name comes from settings.azure_openai_deployment and falls
        back to the model name. ``max_tokens`` is accepted for signature
        parity with the other builders but is intentionally not forwarded.
        """
        from langchain_openai import AzureChatOpenAI
        from pydantic import SecretStr

        api_key = settings.azure_openai_api_key
        endpoint = settings.azure_openai_endpoint
        if not (api_key and endpoint):
            msg = "AZURE_OPENAI_API_KEY and AZURE_OPENAI_ENDPOINT are required for Azure provider"
            raise ValueError(msg)

        deployment = settings.azure_openai_deployment or model

        return AzureChatOpenAI(
            azure_deployment=deployment,
            model=model,
            temperature=temperature,
            # max_tokens parameter removed - causes empty responses with some models
            azure_endpoint=endpoint,
            api_key=SecretStr(api_key),
            api_version=settings.azure_openai_api_version,
        )

    @staticmethod
    def _create_bedrock(
        model: str,
        temperature: float,
        max_tokens: int,
    ) -> BaseLanguageModel:
        """Build a ChatBedrock client.

        Requires the AWS access key id and secret access key in settings.
        settings.bedrock_model_id, when set, takes precedence over ``model``.
        """
        from langchain_aws import ChatBedrock
        from pydantic import SecretStr

        access_key_id = settings.aws_access_key_id
        secret_access_key = settings.aws_secret_access_key
        if not (access_key_id and secret_access_key):
            msg = "AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY are required for Bedrock provider"
            raise ValueError(msg)

        # Sampling options travel inside model_kwargs for this client.
        sampling_options = {
            "temperature": temperature,
            "max_tokens": max_tokens,
        }

        return ChatBedrock(
            model=settings.bedrock_model_id or model,
            model_kwargs=sampling_options,
            credentials_profile_name=None,
            aws_access_key_id=SecretStr(access_key_id),
            aws_secret_access_key=SecretStr(secret_access_key),
            region=settings.aws_region,
        )

    @staticmethod
    def _create_anthropic(
        model: str,
        temperature: float,
        max_tokens: int,
    ) -> BaseLanguageModel:
        """Build a ChatAnthropic client; requires settings.anthropic_api_key."""
        from langchain_anthropic import ChatAnthropic
        from pydantic import SecretStr

        api_key = settings.anthropic_api_key
        if not api_key:
            msg = "ANTHROPIC_API_KEY is required for Anthropic provider"
            raise ValueError(msg)

        return ChatAnthropic(
            model_name=model,
            temperature=temperature,
            max_tokens_to_sample=max_tokens,
            timeout=None,
            stop=None,
            api_key=SecretStr(api_key),
        )
create_llm staticmethod
create_llm(provider: str | None = None, model: str | None = None, temperature: float = 0.2, max_tokens: int = 500) -> BaseLanguageModel

Create an LLM instance based on the provider.


provider: LLM provider (openai, azure, bedrock, anthropic).
         Defaults to settings.llm_provider
model: Model name. Defaults to settings.llm_model
temperature: Sampling temperature
max_tokens: Maximum tokens in response

LangChain BaseLanguageModel instance

ValueError: If provider is not supported or required credentials are missing
Source code in green_gov_rag/rag/llm_factory.py
@staticmethod
def create_llm(
    provider: str | None = None,
    model: str | None = None,
    temperature: float = 0.2,
    max_tokens: int = 500,
) -> BaseLanguageModel:
    """Build the chat model for the requested (or configured) provider.

    Args:
    ----
        provider: Provider identifier (openai, azure, bedrock, anthropic).
                 Falls back to settings.llm_provider when falsy
        model: Model name. Falls back to settings.llm_model when falsy
        temperature: Sampling temperature handed to the provider builder
        max_tokens: Response token limit handed to the provider builder

    Returns:
    -------
        The LangChain model produced by the matching provider builder

    Raises:
    ------
        ValueError: If the provider matches none of the LLMProvider values
            checked here, or the selected builder finds credentials missing

    """
    provider = provider or settings.llm_provider
    model = model or settings.llm_model

    # Ordered (provider, builder) pairs; the first equal provider wins.
    builders = (
        (LLMProvider.OPENAI, LLMFactory._create_openai),
        (LLMProvider.AZURE, LLMFactory._create_azure_openai),
        (LLMProvider.BEDROCK, LLMFactory._create_bedrock),
        (LLMProvider.ANTHROPIC, LLMFactory._create_anthropic),
    )
    for known_provider, build in builders:
        if provider == known_provider:
            return build(model, temperature, max_tokens)

    msg = f"Unsupported LLM provider: {provider}"
    raise ValueError(msg)

get_llm

get_llm(provider: str | None = None, model: str | None = None, temperature: float = 0.2, max_tokens: int = 500) -> BaseLanguageModel

Convenience function to get an LLM instance.


provider: LLM provider (openai, azure, bedrock, anthropic)
model: Model name
temperature: Sampling temperature
max_tokens: Maximum tokens in response

LangChain BaseLanguageModel instance
Source code in green_gov_rag/rag/llm_factory.py
def get_llm(
    provider: str | None = None,
    model: str | None = None,
    temperature: float = 0.2,
    max_tokens: int = 500,
) -> BaseLanguageModel:
    """Module-level shortcut for ``LLMFactory.create_llm``.

    Args:
    ----
        provider: LLM provider (openai, azure, bedrock, anthropic)
        model: Model name
        temperature: Sampling temperature
        max_tokens: Maximum tokens in response

    Returns:
    -------
        Whatever ``LLMFactory.create_llm`` returns for the same arguments

    """
    # Forward every argument unchanged; defaults are resolved by the factory.
    return LLMFactory.create_llm(
        provider=provider,
        model=model,
        temperature=temperature,
        max_tokens=max_tokens,
    )

Vector Store Factory

green_gov_rag.rag.vector_store_factory

Factory for creating vector store instances.

VectorStoreFactory

Factory for creating vector store instances based on configuration.

Source code in green_gov_rag/rag/vector_store_factory.py
class VectorStoreFactory:
    """Factory for creating vector store instances based on configuration."""

    @staticmethod
    def create_vector_store(
        embeddings: Embeddings,
        store_type: str | None = None,
        **kwargs,
    ) -> VectorStoreInterface:
        """Create a vector store instance.

        Args:
            embeddings: Embeddings model to use
            store_type: Type of store ('faiss', 'qdrant', 'chromadb').
                       If None (or otherwise falsy), uses settings.vector_store_type
            **kwargs: Additional arguments for specific store implementations

        Returns:
            VectorStoreInterface: Initialized vector store

        Raises:
            ValueError: If store_type is not supported
            NotImplementedError: If store_type is 'chromadb' (not implemented yet)

        Examples:
            >>> from green_gov_rag.rag.embeddings import ChunkEmbedder
            >>> embeddings = ChunkEmbedder().embedder
            >>> store = VectorStoreFactory.create_vector_store(embeddings)

            >>> # Explicitly choose Qdrant
            >>> store = VectorStoreFactory.create_vector_store(
            ...     embeddings,
            ...     store_type='qdrant',
            ...     url='http://localhost:6333'
            ... )
        """
        store_type = store_type or settings.vector_store_type

        logger.info(f"Creating vector store: {store_type}")

        if store_type == "faiss":
            return VectorStoreFactory._create_faiss_store(embeddings, **kwargs)
        elif store_type == "qdrant":
            return VectorStoreFactory._create_qdrant_store(embeddings, **kwargs)
        elif store_type == "chromadb":
            return VectorStoreFactory._create_chroma_store(embeddings, **kwargs)
        else:
            raise ValueError(
                f"Unsupported vector store type: {store_type}. "
                f"Supported types: faiss, qdrant, chromadb"
            )

    @staticmethod
    def _create_faiss_store(
        embeddings: Embeddings,
        **kwargs,
    ) -> VectorStoreInterface:
        """Create FAISS vector store.

        ``index_path`` may be supplied in kwargs; when missing or falsy,
        settings.vector_store_path is used. Remaining kwargs are forwarded.
        """
        from green_gov_rag.rag.stores import FAISSVectorStore

        # pop (not get): leaving index_path in kwargs would pass it twice below
        # and raise "got multiple values for keyword argument 'index_path'".
        index_path = kwargs.pop("index_path", None) or settings.vector_store_path

        return FAISSVectorStore(
            embeddings=embeddings,
            index_path=index_path,
            **kwargs,
        )

    @staticmethod
    def _create_qdrant_store(
        embeddings: Embeddings,
        **kwargs,
    ) -> VectorStoreInterface:
        """Create Qdrant vector store.

        ``url``, ``api_key`` and ``collection_name`` are taken from kwargs
        when given with a truthy value, otherwise from settings. Remaining
        kwargs are forwarded to the store.

        Raises:
            ValueError: If no Qdrant URL is supplied or configured
        """
        from green_gov_rag.rag.stores import QdrantVectorStore

        url: str = kwargs.pop("url", None) or settings.qdrant_url or ""
        if not url:
            raise ValueError(
                "Qdrant URL not configured. Set QDRANT_URL in environment or pass url parameter."
            )

        api_key: str | None = kwargs.pop("api_key", None) or settings.qdrant_api_key
        # Fall back on any falsy value, like url/api_key above; otherwise an
        # explicit collection_name=None would become the literal string "None".
        collection_name: str = str(
            kwargs.pop("collection_name", None) or settings.collection_name
        )

        return QdrantVectorStore(
            embeddings=embeddings,
            url=url,
            api_key=api_key,
            collection_name=collection_name,
            **kwargs,  # Any remaining kwargs
        )

    @staticmethod
    def _create_chroma_store(
        embeddings: Embeddings,
        **kwargs,
    ) -> VectorStoreInterface:
        """Create ChromaDB vector store (placeholder; always raises).

        Raises:
            NotImplementedError: Always, until ChromaDB support is implemented
        """
        # TODO: Implement ChromaDB store
        raise NotImplementedError(
            "ChromaDB support coming soon. Use 'faiss' or 'qdrant' for now."
        )

    @staticmethod
    def get_available_stores() -> list[str]:
        """Get list of available vector store types.

        'faiss' is always listed; 'qdrant' and 'chromadb' are listed when
        their client packages can be imported.

        Returns:
            List of supported store types
        """
        available = ["faiss"]

        # Check if Qdrant is available
        try:
            import qdrant_client  # noqa: F401

            available.append("qdrant")
        except ImportError:
            pass

        # Check if ChromaDB is available
        # NOTE(review): this reports 'chromadb' whenever the package imports,
        # although _create_chroma_store still raises NotImplementedError.
        try:
            import chromadb  # noqa: F401

            available.append("chromadb")
        except ImportError:
            pass

        return available

    @staticmethod
    def validate_config(store_type: str | None = None) -> dict:
        """Validate configuration for a vector store type.

        Args:
            store_type: Type to validate, or None for current config

        Returns:
            Dictionary with keys 'valid' (bool), 'store_type', 'issues'
            (list of human-readable problems) and 'config' (the settings
            inspected; the Qdrant API key is masked as '***')

        Examples:
            >>> VectorStoreFactory.validate_config('qdrant')
            {
                'valid': True,
                'store_type': 'qdrant',
                'issues': [],
                'config': {'url': 'http://localhost:6333', ...}
            }
        """
        store_type = store_type or settings.vector_store_type
        issues = []
        config: dict[str, str | None] = {}

        if store_type == "faiss":
            config["index_path"] = settings.vector_store_path
            if not settings.vector_store_path:
                issues.append("VECTOR_STORE_PATH not configured")

        elif store_type == "qdrant":
            config["url"] = settings.qdrant_url
            # Never echo the real key back to the caller.
            config["api_key"] = "***" if settings.qdrant_api_key else None

            if not settings.qdrant_url:
                issues.append("QDRANT_URL not configured")

            # Check if Qdrant client is installed
            try:
                import qdrant_client  # noqa: F401
            except ImportError:
                issues.append(
                    "qdrant_client not installed. "
                    "Install with: pip install qdrant-client langchain-qdrant"
                )

        elif store_type == "chromadb":
            issues.append("ChromaDB not yet implemented")

        else:
            issues.append(f"Unknown store type: {store_type}")

        return {
            "valid": len(issues) == 0,
            "store_type": store_type,
            "issues": issues,
            "config": config,
        }
create_vector_store staticmethod
create_vector_store(embeddings: Embeddings, store_type: str | None = None, **kwargs) -> VectorStoreInterface

Create a vector store instance.

Parameters:

Name Type Description Default
embeddings Embeddings

Embeddings model to use

required
store_type str | None

Type of store ('faiss', 'qdrant', 'chromadb'). If None, uses settings.vector_store_type

None
**kwargs

Additional arguments for specific store implementations

{}

Returns:

Name Type Description
VectorStoreInterface VectorStoreInterface

Initialized vector store

Raises:

Type Description
ValueError

If store_type is not supported

Examples:

>>> from green_gov_rag.rag.embeddings import ChunkEmbedder
>>> embeddings = ChunkEmbedder().embedder
>>> store = VectorStoreFactory.create_vector_store(embeddings)
>>> # Explicitly choose Qdrant
>>> store = VectorStoreFactory.create_vector_store(
...     embeddings,
...     store_type='qdrant',
...     url='http://localhost:6333'
... )
Source code in green_gov_rag/rag/vector_store_factory.py
@staticmethod
def create_vector_store(
    embeddings: Embeddings,
    store_type: str | None = None,
    **kwargs,
) -> VectorStoreInterface:
    """Build the vector store selected by ``store_type`` or by settings.

    Args:
        embeddings: Embeddings model handed to the store
        store_type: One of 'faiss', 'qdrant', 'chromadb'. A falsy value
                   selects settings.vector_store_type instead
        **kwargs: Extra arguments forwarded to the store-specific builder

    Returns:
        VectorStoreInterface: The store returned by the matching builder

    Raises:
        ValueError: If the resolved store type is not one of the three above

    Examples:
        >>> from green_gov_rag.rag.embeddings import ChunkEmbedder
        >>> embeddings = ChunkEmbedder().embedder
        >>> store = VectorStoreFactory.create_vector_store(embeddings)

        >>> # Explicitly choose Qdrant
        >>> store = VectorStoreFactory.create_vector_store(
        ...     embeddings,
        ...     store_type='qdrant',
        ...     url='http://localhost:6333'
        ... )
    """
    store_type = store_type or settings.vector_store_type
    logger.info(f"Creating vector store: {store_type}")

    # Guard-clause dispatch: each supported type returns immediately.
    if store_type == "faiss":
        return VectorStoreFactory._create_faiss_store(embeddings, **kwargs)
    if store_type == "qdrant":
        return VectorStoreFactory._create_qdrant_store(embeddings, **kwargs)
    if store_type == "chromadb":
        return VectorStoreFactory._create_chroma_store(embeddings, **kwargs)

    raise ValueError(
        f"Unsupported vector store type: {store_type}. "
        f"Supported types: faiss, qdrant, chromadb"
    )
get_available_stores staticmethod
get_available_stores() -> list[str]

Get list of available vector store types.

Returns:

Type Description
list[str]

List of supported store types

Source code in green_gov_rag/rag/vector_store_factory.py
@staticmethod
def get_available_stores() -> list[str]:
    """Get list of available vector store types.

    Returns:
        List of supported store types
    """
    available = ["faiss"]

    # Check if Qdrant is available
    try:
        import qdrant_client  # noqa: F401

        available.append("qdrant")
    except ImportError:
        pass

    # Check if ChromaDB is available
    try:
        import chromadb  # noqa: F401

        available.append("chromadb")
    except ImportError:
        pass

    return available
validate_config staticmethod
validate_config(store_type: str | None = None) -> dict

Validate configuration for a vector store type.

Parameters:

Name Type Description Default
store_type str | None

Type to validate, or None for current config

None

Returns:

Type Description
dict

Dictionary with validation results

Examples:

>>> VectorStoreFactory.validate_config('qdrant')
{
    'valid': True,
    'store_type': 'qdrant',
    'issues': [],
    'config': {'url': 'http://localhost:6333', ...}
}
Source code in green_gov_rag/rag/vector_store_factory.py
@staticmethod
def validate_config(store_type: str | None = None) -> dict:
    """Check that the settings needed by a vector store type are present.

    Args:
        store_type: Type to validate; a falsy value selects
            settings.vector_store_type

    Returns:
        Dictionary with 'valid', 'store_type', 'issues' and 'config' keys

    Examples:
        >>> VectorStoreFactory.validate_config('qdrant')
        {
            'valid': True,
            'store_type': 'qdrant',
            'issues': [],
            'config': {'url': 'http://localhost:6333', ...}
        }
    """
    store_type = store_type or settings.vector_store_type
    problems = []
    inspected: dict[str, str | None] = {}

    if store_type == "faiss":
        index_path = settings.vector_store_path
        inspected["index_path"] = index_path
        if not index_path:
            problems.append("VECTOR_STORE_PATH not configured")

    elif store_type == "qdrant":
        qdrant_url = settings.qdrant_url
        inspected["url"] = qdrant_url
        # Report only whether a key exists, never the key itself.
        inspected["api_key"] = "***" if settings.qdrant_api_key else None

        if not qdrant_url:
            problems.append("QDRANT_URL not configured")

        # The client package must be importable for Qdrant to work.
        try:
            import qdrant_client  # noqa: F401
        except ImportError:
            problems.append(
                "qdrant_client not installed. "
                "Install with: pip install qdrant-client langchain-qdrant"
            )

    elif store_type == "chromadb":
        problems.append("ChromaDB not yet implemented")

    else:
        problems.append(f"Unknown store type: {store_type}")

    return {
        "valid": not problems,
        "store_type": store_type,
        "issues": problems,
        "config": inspected,
    }

create_vector_store

create_vector_store(embeddings: Embeddings, store_type: str | None = None, **kwargs) -> VectorStoreInterface

Create a vector store instance.

Convenience wrapper around VectorStoreFactory.create_vector_store()

Parameters:

Name Type Description Default
embeddings Embeddings

Embeddings model

required
store_type str | None

Type of store (faiss, qdrant, chromadb)

None
**kwargs

Additional store-specific arguments

{}

Returns:

Name Type Description
VectorStoreInterface VectorStoreInterface

Initialized vector store

Source code in green_gov_rag/rag/vector_store_factory.py
def create_vector_store(
    embeddings: Embeddings,
    store_type: str | None = None,
    **kwargs,
) -> VectorStoreInterface:
    """Module-level shortcut for ``VectorStoreFactory.create_vector_store()``.

    Args:
        embeddings: Embeddings model
        store_type: Type of store (faiss, qdrant, chromadb)
        **kwargs: Additional store-specific arguments

    Returns:
        VectorStoreInterface: Initialized vector store
    """
    # Pure delegation; defaults and validation live in the factory.
    return VectorStoreFactory.create_vector_store(
        embeddings,
        store_type=store_type,
        **kwargs,
    )

Embeddings

green_gov_rag.rag.embeddings

Embeddings module.

Generate vector embeddings for document chunks using either AWS Bedrock LLM or HuggingFace embedding models.

  1. Supports dual embedding providers:
    • HuggingFace (sentence-transformers)
    • AWS Bedrock (via OpenAI-compatible API)
  2. Takes chunk dicts with content + metadata.
  3. Returns dicts with embedding included.
  4. Easily integrated into your ETL pipeline after chunker.py.

Now uses centralized settings from green_gov_rag.config

ChunkEmbedder

Source code in green_gov_rag/rag/embeddings.py
class ChunkEmbedder:
    def __init__(self, provider: str = "bedrock", model_name: str | None = None):
        """Initialize embedding generator.

        :param provider: "bedrock" or "huggingface"
        :param model_name: Name of the model to use.
        """
        self.provider = provider.lower()
        if self.provider == "huggingface":
            self.model_name = model_name or settings.embedding_model
            self.embedder: HuggingFaceEmbeddings | OpenAIEmbeddings = (
                HuggingFaceEmbeddings(model_name=self.model_name)
            )
        elif self.provider == "bedrock":
            bedrock_model = model_name or settings.bedrock_model_id
            self.model_name = bedrock_model if bedrock_model else "anthropic.claude-v2"
            self.embedder = OpenAIEmbeddings(model=self.model_name)
        else:
            msg = "provider must be 'bedrock' or 'huggingface'"
            raise ValueError(msg)

    def embed_chunks(
        self, chunks: list[dict], batch_size: int = 100, show_progress: bool = True
    ) -> list[dict]:
        """Generate embeddings for a list of chunk dictionaries using batching.

        :param chunks: List of dicts with at least {"content": str, "metadata": dict}
        :param batch_size: Number of chunks to embed per batch (default: 100)
        :param show_progress: Show progress information (default: True)
        :return: List of dicts with {"content", "metadata", "embedding"}
        """
        embedded_chunks = []

        # Filter out empty chunks
        valid_chunks = [
            chunk
            for chunk in chunks
            if chunk.get("content") and str(chunk.get("content")).strip()
        ]

        if not valid_chunks:
            return []

        total_batches = (len(valid_chunks) + batch_size - 1) // batch_size

        for i in range(0, len(valid_chunks), batch_size):
            batch = valid_chunks[i : i + batch_size]
            batch_num = i // batch_size + 1

            # Extract texts and metadata
            texts = [chunk["content"] for chunk in batch]
            metadatas = [chunk.get("metadata", {}) for chunk in batch]

            # Generate embeddings for entire batch at once
            vectors = self.embedder.embed_documents(texts)

            # Combine results
            for text, metadata, vector in zip(texts, metadatas, vectors):
                embedded_chunks.append(
                    {"content": text, "metadata": metadata, "embedding": vector}
                )

            if show_progress and batch_num % 10 == 0:
                print(
                    f"   Processed batch {batch_num}/{total_batches} ({len(embedded_chunks)} chunks)"
                )

        if show_progress:
            print(
                f"   Completed: {len(embedded_chunks)} chunks embedded in {total_batches} batches"
            )

        return embedded_chunks
__init__
__init__(provider: str = 'bedrock', model_name: str | None = None)

Initialize embedding generator.

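Parameters:

provider: Embedding provider, either "bedrock" or "huggingface" (default: "bedrock").
model_name: Name of the model to use. When omitted, a provider-specific default from settings is used.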

Source code in green_gov_rag/rag/embeddings.py
def __init__(self, provider: str = "bedrock", model_name: str | None = None):
    """Set up the embedding backend for the chosen provider.

    :param provider: "bedrock" or "huggingface" (compared case-insensitively)
    :param model_name: Model to use; when falsy, a provider default from
        settings is used instead.
    """
    normalized = provider.lower()
    self.provider = normalized

    # Reject unknown providers before touching any backend.
    if normalized not in ("huggingface", "bedrock"):
        msg = "provider must be 'bedrock' or 'huggingface'"
        raise ValueError(msg)

    if normalized == "huggingface":
        self.model_name = model_name or settings.embedding_model
        self.embedder: HuggingFaceEmbeddings | OpenAIEmbeddings = (
            HuggingFaceEmbeddings(model_name=self.model_name)
        )
        return

    # Bedrock: explicit name, then configured model id, then a fixed default.
    configured_model = model_name or settings.bedrock_model_id
    self.model_name = configured_model or "anthropic.claude-v2"
    self.embedder = OpenAIEmbeddings(model=self.model_name)
embed_chunks
embed_chunks(chunks: list[dict], batch_size: int = 100, show_progress: bool = True) -> list[dict]

Generate embeddings for a list of chunk dictionaries using batching.

Parameters:

chunks: List of dicts with at least {"content": str, "metadata": dict}.
batch_size: Number of chunks to embed per batch (default: 100).
show_progress: Show progress information (default: True).

Returns:

List of dicts with {"content", "metadata", "embedding"}.

Source code in green_gov_rag/rag/embeddings.py
def embed_chunks(
    self, chunks: list[dict], batch_size: int = 100, show_progress: bool = True
) -> list[dict]:
    """Generate embeddings for a list of chunk dictionaries using batching.

    Chunks whose "content" is missing, empty or whitespace-only are skipped.

    :param chunks: List of dicts with at least {"content": str, "metadata": dict}
    :param batch_size: Number of chunks to embed per batch (default: 100);
        must be at least 1
    :param show_progress: Print progress every 10th batch and a final
        summary (default: True)
    :return: List of dicts with {"content", "metadata", "embedding"}
    :raises ValueError: If batch_size is smaller than 1
    """
    # A zero batch size used to fail with ZeroDivisionError and a negative
    # one silently produced no embeddings at all; reject both up front.
    if batch_size < 1:
        msg = f"batch_size must be at least 1, got {batch_size}"
        raise ValueError(msg)

    embedded_chunks = []

    # Filter out empty chunks
    valid_chunks = [
        chunk
        for chunk in chunks
        if chunk.get("content") and str(chunk.get("content")).strip()
    ]

    if not valid_chunks:
        return []

    # Ceiling division: a partial final batch still counts as a batch.
    total_batches = (len(valid_chunks) + batch_size - 1) // batch_size

    for i in range(0, len(valid_chunks), batch_size):
        batch = valid_chunks[i : i + batch_size]
        batch_num = i // batch_size + 1

        # Extract texts and metadata
        texts = [chunk["content"] for chunk in batch]
        metadatas = [chunk.get("metadata", {}) for chunk in batch]

        # Generate embeddings for entire batch at once
        vectors = self.embedder.embed_documents(texts)

        # Combine results
        embedded_chunks.extend(
            {"content": text, "metadata": metadata, "embedding": vector}
            for text, metadata, vector in zip(texts, metadatas, vectors)
        )

        if show_progress and batch_num % 10 == 0:
            print(
                f"   Processed batch {batch_num}/{total_batches} ({len(embedded_chunks)} chunks)"
            )

    if show_progress:
        print(
            f"   Completed: {len(embedded_chunks)} chunks embedded in {total_batches} batches"
        )

    return embedded_chunks

Enhanced Response

green_gov_rag.rag.enhanced_response

Enhanced Response Generator with Citations and Deep Links.

This module provides advanced RAG response formatting with:

  1. Inline citations with source numbers [1], [2], etc.
  2. Deep links to specific PDF pages/sections
  3. Hierarchical section path display (e.g., "Section 2.1.3")
  4. Source attribution with document metadata
  5. Confidence scoring for cited passages

Citation

A citation linking answer text to source document.

Source code in green_gov_rag/rag/enhanced_response.py
class Citation:
    """A citation linking answer text to source document."""

    def __init__(
        self,
        source_number: int,
        document: Document,
        text_snippet: str,
        confidence: float = 1.0,
    ):
        """Initialize citation.

        Args:
        ----
            source_number: Citation number (1, 2, 3, etc.)
            document: Source Document object; its ``metadata`` mapping is
                kept on the citation by reference
            text_snippet: Text excerpt that was cited
            confidence: Confidence score for this citation (0-1)

        """
        self.source_number = source_number
        self.document = document
        self.text_snippet = text_snippet
        self.confidence = confidence
        self.metadata = document.metadata

    def get_deep_link(self) -> str | None:
        """Generate deep link to specific page/section in PDF.

        Returns
        -------
            None when the metadata has no ``source_url``; otherwise the URL
            with a ``#page=N`` fragment if ``page`` is set, else a
            ``#<section_id>`` fragment if ``section_id`` is set, else the
            bare URL

        """
        source_url = self.metadata.get("source_url")
        if not source_url:
            return None

        # Get page number if available
        page = self.metadata.get("page")
        if page is not None:
            # PDF page fragment (page=N)
            return f"{source_url}#page={page}"

        # Get section anchor if available
        section_id = self.metadata.get("section_id")
        if section_id:
            return f"{source_url}#{section_id}"

        return source_url

    def get_section_path(self) -> str | None:
        """Get hierarchical section path (e.g., 'Section 2.1.3').

        Returns
        -------
            The ``section_path`` metadata value when present; otherwise a
            string built from ``section_number`` / ``section_title``; None
            when none of these are set

        """
        # Check for hierarchical metadata from LayoutPDFReader
        section_path = self.metadata.get("section_path")
        if section_path:
            return section_path

        # Fallback: construct from section_number and section_title
        section_num = self.metadata.get("section_number")
        section_title = self.metadata.get("section_title")

        if section_num and section_title:
            return f"Section {section_num}: {section_title}"
        elif section_num:
            return f"Section {section_num}"
        elif section_title:
            return section_title

        return None

    def format_citation_markdown(self) -> str:
        """Format citation as markdown with link.

        Returns
        -------
            "[N] <title or linked title>" followed by "(<section path>)"
            or, when there is no section path, "(p. <page>)" if a page is set

        """
        title = self.metadata.get("title", "Untitled Document")
        deep_link = self.get_deep_link()
        section_path = self.get_section_path()

        # Build citation components
        citation_parts = [f"[{self.source_number}]"]

        if deep_link:
            citation_parts.append(f"[{title}]({deep_link})")
        else:
            citation_parts.append(title)

        # Add section path if available
        if section_path:
            citation_parts.append(f"({section_path})")
        else:
            # Add page number if no section path. A page key holding None is
            # treated as "no page" (as in get_deep_link) so the output never
            # contains "(p. None)".
            page = self.metadata.get("page")
            if page is not None:
                citation_parts.append(f"(p. {page})")

        return " ".join(citation_parts)

    def to_dict(self) -> dict[str, Any]:
        """Convert citation to dictionary.

        Returns
        -------
            Dict representation of citation; ``metadata`` is the same
            mapping object held by the citation, not a copy

        """
        return {
            "source_number": self.source_number,
            "title": self.metadata.get("title", "Untitled"),
            "text_snippet": self.text_snippet,
            "confidence": self.confidence,
            "deep_link": self.get_deep_link(),
            "section_path": self.get_section_path(),
            "page": self.metadata.get("page"),
            "source_url": self.metadata.get("source_url"),
            "metadata": self.metadata,
        }
__init__
__init__(source_number: int, document: Document, text_snippet: str, confidence: float = 1.0)

Initialize citation.


source_number: Citation number (1, 2, 3, etc.)
document: Source Document object
text_snippet: Text excerpt that was cited
confidence: Confidence score for this citation (0-1)
Source code in green_gov_rag/rag/enhanced_response.py
def __init__(
    self,
    source_number: int,
    document: Document,
    text_snippet: str,
    confidence: float = 1.0,
):
    """Initialize citation.

    Stores the arguments on the instance and exposes the document's
    metadata as ``self.metadata`` for the formatting helpers.

    Args:
    ----
        source_number: Citation number (1, 2, 3, etc.)
        document: Source Document object
        text_snippet: Text excerpt that was cited
        confidence: Confidence score for this citation (0-1)

    """
    self.source_number = source_number
    self.document = document
    self.text_snippet = text_snippet
    self.confidence = confidence
    # Same object as document.metadata (not a copy)
    self.metadata = document.metadata
get_deep_link() -> str | None

Generate deep link to specific page/section in PDF.

URL with fragment identifier for PDF page
Source code in green_gov_rag/rag/enhanced_response.py
def get_deep_link(self) -> str | None:
    """Generate deep link to specific page/section in PDF.

    Returns
    -------
        URL with fragment identifier for PDF page

    """
    source_url = self.metadata.get("source_url")
    if not source_url:
        return None

    # Get page number if available
    page = self.metadata.get("page")
    if page is not None:
        # PDF page fragment (page=N)
        return f"{source_url}#page={page}"

    # Get section anchor if available
    section_id = self.metadata.get("section_id")
    if section_id:
        return f"{source_url}#{section_id}"

    return source_url
get_section_path
get_section_path() -> str | None

Get hierarchical section path (e.g., 'Section 2.1.3').

Returns
Formatted section path string
Source code in green_gov_rag/rag/enhanced_response.py
def get_section_path(self) -> str | None:
    """Get hierarchical section path (e.g., 'Section 2.1.3').

    Returns
    -------
        Formatted section path string

    """
    # Check for hierarchical metadata from LayoutPDFReader
    section_path = self.metadata.get("section_path")
    if section_path:
        return section_path

    # Fallback: construct from section_number and section_title
    section_num = self.metadata.get("section_number")
    section_title = self.metadata.get("section_title")

    if section_num and section_title:
        return f"Section {section_num}: {section_title}"
    elif section_num:
        return f"Section {section_num}"
    elif section_title:
        return section_title

    return None
format_citation_markdown
format_citation_markdown() -> str

Format citation as markdown with link.

Returns
Markdown-formatted citation string
Source code in green_gov_rag/rag/enhanced_response.py
def format_citation_markdown(self) -> str:
    """Format citation as markdown with link.

    The result is ``[N] <title>`` followed by at most one parenthesised
    locator: the section path when one is available, otherwise the page
    number when the metadata holds a non-``None`` ``page`` value.

    Returns
    -------
        Markdown-formatted citation string

    """
    title = self.metadata.get("title", "Untitled Document")
    deep_link = self.get_deep_link()
    section_path = self.get_section_path()
    page = self.metadata.get("page")

    # Build citation components
    citation_parts = [f"[{self.source_number}]"]

    # Link the title only when a URL could be built
    citation_parts.append(f"[{title}]({deep_link})" if deep_link else title)

    if section_path:
        citation_parts.append(f"({section_path})")
    elif page is not None:
        # A key that is present but None must not render as "(p. None)"
        citation_parts.append(f"(p. {page})")

    return " ".join(citation_parts)
to_dict
to_dict() -> dict[str, Any]

Convert citation to dictionary.

Returns
Dict representation of citation
Source code in green_gov_rag/rag/enhanced_response.py
def to_dict(self) -> dict[str, Any]:
    """Serialise this citation into a plain dictionary.

    Returns
    -------
        Dict holding the citation number, title, snippet, confidence,
        deep link, section path, page, source URL and the raw metadata

    """
    meta = self.metadata

    payload = {
        "source_number": self.source_number,
        "title": meta.get("title", "Untitled"),
        "text_snippet": self.text_snippet,
        "confidence": self.confidence,
    }
    # Derived locators come from the sibling helpers
    payload["deep_link"] = self.get_deep_link()
    payload["section_path"] = self.get_section_path()
    payload["page"] = meta.get("page")
    payload["source_url"] = meta.get("source_url")
    # The metadata mapping itself is exposed, not a copy
    payload["metadata"] = meta
    return payload

EnhancedResponse

Enhanced RAG response with inline citations and source attribution.

Source code in green_gov_rag/rag/enhanced_response.py
class EnhancedResponse:
    """Enhanced RAG response with inline citations and source attribution."""

    def __init__(self, answer: str, sources: list[Document], query: str):
        """Initialize enhanced response.

        Args:
        ----
            answer: Generated answer text
            sources: List of source Documents used
            query: Original user query

        """
        self.answer = answer
        self.sources = sources
        self.query = query
        self.citations: list[Citation] = []
        self._build_citations()

    def _build_citations(self) -> None:
        """Build citation objects from source documents.

        One ``Citation`` is appended to ``self.citations`` per source,
        numbered from 1 in source order.
        """
        for i, doc in enumerate(self.sources, start=1):
            # Snippet is the first 200 characters, with an ellipsis when
            # the content was actually truncated
            snippet = (
                doc.page_content[:200] + "..."
                if len(doc.page_content) > 200
                else doc.page_content
            )

            citation = Citation(
                source_number=i,
                document=doc,
                text_snippet=snippet,
                # Sources without a relevance score count as fully confident
                confidence=doc.metadata.get("relevance_score", 1.0),
            )
            self.citations.append(citation)

    def format_answer_with_inline_citations(self) -> str:
        """Format answer with inline citation markers.

        When the answer does not already contain any ``[i]`` marker for
        ``i`` in ``1..len(self.sources)``, every source marker is appended
        after the answer, separated from it by a single space. An answer
        with no sources is returned unchanged.

        Returns
        -------
            Answer text with inline [1], [2], etc. citations

        """
        # In a production system, this would use NLP to identify
        # which parts of the answer come from which sources
        # For now, add all citations at the end
        markers = [f"[{i}]" for i in range(1, len(self.sources) + 1)]

        # Nothing to cite, or the answer already cites at least one source
        if not markers or any(marker in self.answer for marker in markers):
            return self.answer

        return f"{self.answer} {', '.join(markers)}"

    def format_sources_markdown(self) -> str:
        """Format sources as markdown list with deep links.

        Returns
        -------
            Markdown-formatted sources section

        """
        sources_md = ["## Sources\n"]

        for citation in self.citations:
            sources_md.append(citation.format_citation_markdown())
            sources_md.append("")  # Blank line

        return "\n".join(sources_md)

    def format_full_response_markdown(self) -> str:
        """Format complete response with answer and sources.

        Returns
        -------
            Complete markdown response

        """
        parts = [
            f"**Query:** {self.query}\n",
            "## Answer\n",
            self.format_answer_with_inline_citations(),
            "\n",
            self.format_sources_markdown(),
        ]

        return "\n".join(parts)

    def to_dict(self) -> dict[str, Any]:
        """Convert response to dictionary format.

        Returns
        -------
            Dict representation for API/JSON responses

        """
        return {
            "query": self.query,
            "answer": self.answer,
            "answer_with_citations": self.format_answer_with_inline_citations(),
            "citations": [c.to_dict() for c in self.citations],
            "source_count": len(self.sources),
        }
__init__
__init__(answer: str, sources: list[Document], query: str)

Initialize enhanced response.


answer: Generated answer text
sources: List of source Documents used
query: Original user query
Source code in green_gov_rag/rag/enhanced_response.py
def __init__(self, answer: str, sources: list[Document], query: str):
    """Initialize enhanced response.

    Stores the arguments and immediately populates ``self.citations``
    through ``_build_citations()``.

    Args:
    ----
        answer: Generated answer text
        sources: List of source Documents used
        query: Original user query

    """
    self.answer = answer
    self.sources = sources
    self.query = query
    # Starts empty; filled by the _build_citations() call below
    self.citations: list[Citation] = []
    self._build_citations()
format_answer_with_inline_citations
format_answer_with_inline_citations() -> str

Format answer with inline citation markers.

Returns
Answer text with inline [1], [2], etc. citations
Source code in green_gov_rag/rag/enhanced_response.py
def format_answer_with_inline_citations(self) -> str:
    """Format answer with inline citation markers.

    When the answer does not already contain any ``[i]`` marker for
    ``i`` in ``1..len(self.sources)``, every source marker is appended
    after the answer, separated from it by a single space. An answer with
    no sources is returned unchanged.

    Returns
    -------
        Answer text with inline [1], [2], etc. citations

    """
    # In a production system, this would use NLP to identify
    # which parts of the answer come from which sources
    # For now, add all citations at the end
    markers = [f"[{i}]" for i in range(1, len(self.sources) + 1)]

    # Nothing to cite, or the answer already cites at least one source
    if not markers or any(marker in self.answer for marker in markers):
        return self.answer

    return f"{self.answer} {', '.join(markers)}"
format_sources_markdown
format_sources_markdown() -> str

Format sources as markdown list with deep links.

Returns
Markdown-formatted sources section
Source code in green_gov_rag/rag/enhanced_response.py
def format_sources_markdown(self) -> str:
    """Render the citations as a markdown "Sources" section.

    Returns
    -------
        A ``## Sources`` heading followed by one formatted citation per
        entry, each followed by a blank line

    """
    lines = ["## Sources\n"]

    for entry in self.citations:
        # The empty string becomes the blank line after each citation
        lines.extend((entry.format_citation_markdown(), ""))

    return "\n".join(lines)
format_full_response_markdown
format_full_response_markdown() -> str

Format complete response with answer and sources.

Returns
Complete markdown response
Source code in green_gov_rag/rag/enhanced_response.py
def format_full_response_markdown(self) -> str:
    """Assemble the query, the cited answer and the sources section.

    Returns
    -------
        Complete markdown response

    """
    query_line = f"**Query:** {self.query}\n"
    answer_text = self.format_answer_with_inline_citations()
    sources_section = self.format_sources_markdown()

    return "\n".join(
        (query_line, "## Answer\n", answer_text, "\n", sources_section),
    )
to_dict
to_dict() -> dict[str, Any]

Convert response to dictionary format.

Returns
Dict representation for API/JSON responses
Source code in green_gov_rag/rag/enhanced_response.py
def to_dict(self) -> dict[str, Any]:
    """Serialise the response for API/JSON consumers.

    Returns
    -------
        Dict with the query, the raw answer, the answer with citation
        markers, the serialised citations and the number of sources

    """
    payload = {"query": self.query, "answer": self.answer}
    payload["answer_with_citations"] = self.format_answer_with_inline_citations()
    payload["citations"] = [entry.to_dict() for entry in self.citations]
    payload["source_count"] = len(self.sources)
    return payload

ResponseFormatter

Utility class for formatting RAG responses with citations.

Source code in green_gov_rag/rag/enhanced_response.py
class ResponseFormatter:
    """Stateless helpers that turn RAG answers and sources into cited output."""

    @staticmethod
    def create_enhanced_response(
        query: str,
        answer: str,
        sources: list[Document],
    ) -> EnhancedResponse:
        """Wrap an answer and its sources in an ``EnhancedResponse``.

        Args:
        ----
            query: User query
            answer: Generated answer
            sources: Source documents

        Returns:
        -------
            EnhancedResponse object

        """
        return EnhancedResponse(answer=answer, sources=sources, query=query)

    @staticmethod
    def format_with_hierarchical_context(
        sources: list[Document],
    ) -> list[dict[str, Any]]:
        """Describe each source together with its section hierarchy.

        Args:
        ----
            sources: List of source documents

        Returns:
        -------
            One dictionary per source, numbered from 1 in input order

        """
        hierarchy_fields = (
            "section_path",
            "section_number",
            "section_title",
            "parent_section",
            "section_level",
        )

        def describe(position: int, doc: Any) -> dict[str, Any]:
            meta = doc.metadata
            # Missing hierarchy fields are reported as None
            hierarchy = {name: meta.get(name) for name in hierarchy_fields}
            return {
                "citation_number": position,
                "title": meta.get("title", "Untitled"),
                "content_snippet": doc.page_content[:300],
                "page": meta.get("page"),
                "source_url": meta.get("source_url"),
                "hierarchy": hierarchy,
                "deep_link": ResponseFormatter._build_deep_link(meta),
                "breadcrumb": ResponseFormatter._build_breadcrumb(meta),
            }

        return [describe(i, doc) for i, doc in enumerate(sources, start=1)]

    @staticmethod
    def _build_deep_link(metadata: dict) -> str | None:
        """Build a URL pointing at the section or page of a source.

        Args:
        ----
            metadata: Document metadata dict

        Returns:
        -------
            Deep link URL, or None when there is no ``source_url``

        """
        base_url = metadata.get("source_url")
        if not base_url:
            return None

        # A section anchor is preferred over a page number
        anchor = metadata.get("section_id")
        if anchor:
            return f"{base_url}#{anchor}"

        page_no = metadata.get("page")
        return base_url if page_no is None else f"{base_url}#page={page_no}"

    @staticmethod
    def _build_breadcrumb(metadata: dict) -> str | None:
        """Build a ``title > section`` breadcrumb from document metadata.

        Args:
        ----
            metadata: Document metadata dict

        Returns:
        -------
            Breadcrumb string, or None when neither a title nor any
            section information is available

        """
        title = metadata.get("title")
        crumbs = [title] if title else []

        section = metadata.get("section_path")
        if not section:
            # No ready-made path: derive a label from number and/or title
            number = metadata.get("section_number")
            heading = metadata.get("section_title")
            if number:
                section = (
                    f"Section {number}: {heading}"
                    if heading
                    else f"Section {number}"
                )
            else:
                section = heading

        if section:
            crumbs.append(section)

        return " > ".join(crumbs) if crumbs else None
create_enhanced_response staticmethod
create_enhanced_response(query: str, answer: str, sources: list[Document]) -> EnhancedResponse

Create an enhanced response with citations.


query: User query
answer: Generated answer
sources: Source documents

EnhancedResponse object
Source code in green_gov_rag/rag/enhanced_response.py
@staticmethod
def create_enhanced_response(
    query: str,
    answer: str,
    sources: list[Document],
) -> EnhancedResponse:
    """Create an enhanced response with citations.

    Thin factory: forwards the three arguments to ``EnhancedResponse``
    by keyword.

    Args:
    ----
        query: User query
        answer: Generated answer
        sources: Source documents

    Returns:
    -------
        EnhancedResponse object

    """
    return EnhancedResponse(answer=answer, sources=sources, query=query)
format_with_hierarchical_context staticmethod
format_with_hierarchical_context(sources: list[Document]) -> list[dict[str, Any]]

Format sources with hierarchical section context.


sources: List of source documents

List of formatted source dictionaries
Source code in green_gov_rag/rag/enhanced_response.py
@staticmethod
def format_with_hierarchical_context(
    sources: list[Document],
) -> list[dict[str, Any]]:
    """Describe each source together with its section hierarchy.

    Args:
    ----
        sources: List of source documents

    Returns:
    -------
        One dictionary per source, numbered from 1 in input order

    """
    hierarchy_fields = (
        "section_path",
        "section_number",
        "section_title",
        "parent_section",
        "section_level",
    )

    def describe(position: int, doc: Any) -> dict[str, Any]:
        meta = doc.metadata
        # Missing hierarchy fields are reported as None
        hierarchy = {name: meta.get(name) for name in hierarchy_fields}
        return {
            "citation_number": position,
            "title": meta.get("title", "Untitled"),
            "content_snippet": doc.page_content[:300],
            "page": meta.get("page"),
            "source_url": meta.get("source_url"),
            "hierarchy": hierarchy,
            "deep_link": ResponseFormatter._build_deep_link(meta),
            "breadcrumb": ResponseFormatter._build_breadcrumb(meta),
        }

    return [describe(i, doc) for i, doc in enumerate(sources, start=1)]

Hybrid Geospatial Search for GreenGovRAG.

Combines vector similarity search, spatial filtering, and metadata filtering following the Elasticsearch/Bedrock geospatial RAG pattern.

Key Features:

1. Vector similarity search (semantic search)
2. Spatial filtering by LGA codes, state, or coordinates
3. Metadata filtering (jurisdiction, topic, ESG scope)
4. Hierarchical spatial filtering (federal → state → local)
5. Re-ranking by relevance

SpatialQuery dataclass

Spatial query parameters extracted from user input.

Source code in green_gov_rag/rag/hybrid_search.py
@dataclass
class SpatialQuery:
    """Spatial query parameters extracted from user input.

    ``location_name`` and ``lga_codes`` are required; ``state``,
    ``coordinates`` and ``radius_km`` have defaults.
    """

    location_name: str  # e.g., "City of Adelaide"
    lga_codes: list[str]  # e.g., ["40070"]
    state: str | None = None  # e.g., "SA"
    coordinates: tuple[float, float] | None = None  # (lat, lon)
    radius_km: float = 5.0  # Radius for coordinate-based search

HybridGeospatialSearch

Combine lexical, spatial, and vector search for geospatial RAG.

Source code in green_gov_rag/rag/hybrid_search.py
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
class HybridGeospatialSearch:
    """Combine lexical, spatial, and vector search for geospatial RAG."""

    def __init__(
        self,
        vector_store: Union["VectorStore", "VectorStoreInterface"],
        enable_ner: bool = True,
    ):
        """Initialize hybrid search with vector store.

        Args:
        ----
            vector_store: VectorStore instance for similarity search
            enable_ner: Whether to enable NER for automatic location extraction

        """
        self.vector_store = vector_store
        # NER helper is created with use_llm=False; None when NER is disabled
        self.ner = LocationNER(use_llm=False) if enable_ner else None

    def search(
        self,
        query: str,
        spatial_query: Optional[SpatialQuery] = None,
        metadata_filters: Optional[dict] = None,
        k: int = 10,
        enable_query_expansion: bool = True,
    ) -> list[Document]:
        """Hybrid search combining vector, spatial, and metadata filtering.

        The caller's ``metadata_filters`` dict is never modified: any
        auto-detected jurisdiction is added to a private copy.

        Args:
        ----
            query: User query string
            spatial_query: Optional SpatialQuery for location-based filtering
            metadata_filters: Optional dict for metadata filtering
            k: Maximum number of results to return. When any filter is
                active, ``k * 3`` candidates are fetched before filtering.
            enable_query_expansion: Whether to expand acronyms in query (default: True)

        Returns:
        -------
            List of at most ``k`` Document objects ranked by relevance

        """
        # Step 0: Query expansion and jurisdiction detection
        expanded_query = expand_query(query) if enable_query_expansion else query

        # Work on a copy so the jurisdiction auto-detection below cannot
        # leak into a dict the caller may reuse for later searches
        metadata_filters = dict(metadata_filters) if metadata_filters else {}

        # Auto-detect jurisdiction (from the original, unexpanded query)
        # only when the caller did not specify one
        if "jurisdiction" not in metadata_filters:
            detected_jurisdiction = detect_jurisdiction_from_query(query)
            if detected_jurisdiction:
                metadata_filters["jurisdiction"] = detected_jurisdiction

        # Step 1: Vector similarity search
        # Retrieve more results initially to account for filtering
        initial_k = k * 3 if (spatial_query or metadata_filters) else k
        results = self.vector_store.similarity_search(expanded_query, k=initial_k)

        # Step 2: Apply spatial filters if provided
        if spatial_query:
            results = self._filter_by_spatial(results, spatial_query)

        # Step 3: Apply metadata filters if provided
        if metadata_filters:
            results = self._filter_by_metadata(results, metadata_filters)

        # Step 4: Apply jurisdiction boosting if jurisdiction filter present
        if "jurisdiction" in metadata_filters:
            results = self._boost_by_jurisdiction(
                results, metadata_filters["jurisdiction"]
            )

        # Step 5: Results are already ordered by similarity; keep top k
        return results[:k]

    def _filter_by_spatial(
        self,
        results: list[Document],
        spatial_query: SpatialQuery,
    ) -> list[Document]:
        """Filter results by spatial criteria using hierarchical filtering.

        Hierarchical filtering logic:
        1. Federal documents (spatial_scope=federal) → always included
        2. State documents (spatial_scope=state) → included if state matches
        3. Local documents (spatial_scope=local) → included if LGA code matches

        Args:
        ----
            results: List of Document objects from vector search
            spatial_query: SpatialQuery with location criteria

        Returns:
        -------
            Filtered list of Document objects

        """
        filtered = []

        for doc in results:
            metadata = doc.metadata
            spatial_scope = metadata.get("spatial_scope", "")

            # Federal documents always apply
            if spatial_scope == "federal":
                filtered.append(doc)
                continue

            # State documents: check if state matches
            if spatial_scope == "state":
                doc_state = metadata.get("state")
                if spatial_query.state and doc_state == spatial_query.state:
                    filtered.append(doc)
                continue

            # Local documents: check LGA codes
            if spatial_scope == "local":
                doc_lga_codes = metadata.get("lga_codes", [])

                # Check if any of the query LGA codes match document LGA codes
                if any(code in spatial_query.lga_codes for code in doc_lga_codes):
                    filtered.append(doc)
                    continue

                # Also check state match for local documents
                # (local docs in the same state may be relevant)
                doc_state = metadata.get("state")
                if spatial_query.state and doc_state == spatial_query.state:
                    # Add with lower priority (could implement scoring here)
                    filtered.append(doc)

        return filtered

    def _filter_by_metadata(
        self,
        results: list[Document],
        metadata_filters: dict,
    ) -> list[Document]:
        """Filter results by metadata criteria.

        Supports filtering by:
        - jurisdiction (federal, state, local)
        - category (environment, planning, etc.)
        - topic (emissions_reporting, biodiversity, etc.)
        - ESG metadata (emission_scopes, frameworks, etc.)

        Args:
        ----
            results: List of Document objects
            metadata_filters: Dict of metadata key-value pairs to filter on

        Returns:
        -------
            Filtered list of Document objects

        """
        filtered = []

        for doc in results:
            metadata = doc.metadata
            match = True

            for key, expected_value in metadata_filters.items():
                # Handle nested ESG metadata (e.g., esg_metadata.emission_scopes)
                value: object
                if "." in key:
                    keys = key.split(".")
                    value = metadata
                    for k in keys:
                        if isinstance(value, dict):
                            value = value.get(k, {})
                        else:
                            value = None
                        if value is None:
                            break
                else:
                    value = metadata.get(key)

                # Support list of expected values (OR logic)
                if isinstance(expected_value, list):
                    # Check if doc value is in expected list
                    # OR if doc value is a list, check for overlap
                    if isinstance(value, list):
                        if not any(v in expected_value for v in value):
                            match = False
                            break
                    elif value not in expected_value:
                        match = False
                        break
                # Single value comparison
                elif isinstance(value, list):
                    # Doc has list, expected is single value
                    if expected_value not in value:
                        match = False
                        break
                elif value != expected_value:
                    match = False
                    break

            if match:
                filtered.append(doc)

        return filtered

    def _boost_by_jurisdiction(
        self,
        results: list[Document],
        target_jurisdiction: str,
    ) -> list[Document]:
        """Re-order results so the target jurisdiction is favoured.

        Documents are split into those whose ``jurisdiction`` metadata equals
        the target and the rest, then interleaved in rounds: up to three
        matching documents followed by one non-matching document. When the
        matching documents run out, the remaining slots are filled from the
        non-matching ones. Relative order inside each group is preserved, and
        no document is dropped.

        Args:
        ----
            results: List of Document objects
            target_jurisdiction: Target jurisdiction; must be one of the
                ``JurisdictionLevel`` values, otherwise ``results`` is
                returned unchanged

        Returns:
        -------
            Re-ranked list of Document objects with matching jurisdiction boosted

        """
        from green_gov_rag.types import JurisdictionLevel

        # Unknown jurisdiction: nothing to boost
        if target_jurisdiction not in [level.value for level in JurisdictionLevel]:
            return results

        # Single pass partition, keeping the incoming order within each group
        preferred: list = []
        others: list = []
        for doc in results:
            is_match = doc.metadata.get("jurisdiction") == target_jurisdiction
            (preferred if is_match else others).append(doc)

        # Reverse once so that pop() hands documents back in original order
        preferred.reverse()
        others.reverse()

        ranked = []
        while preferred or others:
            # Three slots: matching first, falling back to non-matching
            for _ in range(3):
                pool = preferred or others
                if not pool:
                    break
                ranked.append(pool.pop())

            # One slot reserved for a non-matching document
            if others:
                ranked.append(others.pop())

        return ranked

    def search_with_lga(
        self,
        query: str,
        lga_name: str,
        lga_code: str,
        state: str,
        k: int = 10,
    ) -> list[Document]:
        """Convenience method for LGA-based search.

        Builds a single-LGA ``SpatialQuery`` and delegates to ``search``
        without metadata filters.

        Args:
        ----
            query: User query string
            lga_name: Name of the LGA (e.g., "City of Adelaide")
            lga_code: ABS LGA code (e.g., "40070")
            state: State code (e.g., "SA")
            k: Number of results to return

        Returns:
        -------
            List of Document objects relevant to the LGA

        """
        # coordinates and radius_km keep their SpatialQuery defaults
        spatial_query = SpatialQuery(
            location_name=lga_name,
            lga_codes=[lga_code],
            state=state,
        )

        return self.search(query=query, spatial_query=spatial_query, k=k)

    def search_with_esg_filters(
        self,
        query: str,
        emission_scopes: list[str] | None = None,
        frameworks: list[str] | None = None,
        greenhouse_gases: list[str] | None = None,
        consolidation_method: str | None = None,
        methodology_type: str | None = None,
        scope_3_categories: list[str] | None = None,
        regulator: str | None = None,
        activity_types: list[str] | None = None,
        industry_codes: list[str] | None = None,
        k: int = 10,
    ) -> list[Document]:
        """Search with ESG metadata filters.

        Every argument that is supplied with a truthy value becomes an
        ``esg_metadata.<argument name>`` entry in the metadata filter dict
        handed to ``self.search``; omitted or empty arguments are ignored.

        Args:
        ----
            query: User query string
            emission_scopes: List of emission scopes (e.g., ["scope_1", "scope_2"])
            frameworks: List of frameworks (e.g., ["NGER", "ISSB", "GHG_Protocol"])
            greenhouse_gases: List of gases (e.g., ["CO2", "CH4", "N2O", "SF6", "HFCs", "PFCs", "NF3"])
            consolidation_method: Consolidation approach (e.g., "operational_control", "equity_share", "financial_control")
            methodology_type: Methodology type (e.g., "calculation", "reporting", "verification")
            scope_3_categories: List of Scope 3 categories (e.g., ["upstream_transport", "business_travel"])
            regulator: Regulator name (e.g., "Clean Energy Regulator", "NSW EPA")
            activity_types: List of activity types (e.g., ["fuel_combustion", "electricity_consumption"])
            industry_codes: List of ANZSIC industry codes (e.g., ["B0600"])
            k: Number of results to return

        Returns:
        -------
            List of Document objects returned by ``self.search``

        """
        # Declared in the order the filters should appear in the final dict.
        requested: dict[str, object] = {
            "esg_metadata.emission_scopes": emission_scopes,
            "esg_metadata.frameworks": frameworks,
            "esg_metadata.greenhouse_gases": greenhouse_gases,
            "esg_metadata.consolidation_method": consolidation_method,
            "esg_metadata.methodology_type": methodology_type,
            "esg_metadata.scope_3_categories": scope_3_categories,
            "esg_metadata.regulator": regulator,
            "esg_metadata.activity_types": activity_types,
            "esg_metadata.industry_codes": industry_codes,
        }

        # Drop everything the caller left unset (None) or empty.
        metadata_filters: dict[str, object] = {
            field: wanted for field, wanted in requested.items() if wanted
        }

        return self.search(query=query, metadata_filters=metadata_filters, k=k)

    def search_with_auto_location(self, query: str, k: int = 10) -> list[Document]:
        """Search, deriving a spatial filter from locations named in the query.

        When ``self.ner`` is set, its ``extract_locations`` result supplies
        the LGA codes and states for a ``SpatialQuery``. If NER is disabled,
        or it yields neither LGA codes nor states, a plain ``self.search``
        is performed instead.

        Args:
        ----
            query: User query text (e.g., "What are tree rules in Adelaide?")
            k: Number of results to return

        Returns:
        -------
            List of Document objects returned by ``self.search``

        Example:
        -------
            >>> search_with_auto_location("emission rules in Port Adelaide Enfield", k=5)
            # Automatically extracts LGA code "40280" and state "SA"

        """
        extractor = self.ner
        if not extractor:
            # NER is switched off: nothing to extract, search unfiltered.
            return self.search(query=query, k=k)

        found = extractor.extract_locations(query)
        codes = []
        for entry in found["lgas"]:
            codes.append(entry["code"])
        states = found["states"]

        if not codes and not states:
            # The query mentions no recognised location.
            return self.search(query=query, k=k)

        # Only the first extracted state is used for the spatial query.
        primary_state = states[0] if states else None
        location_label = ", ".join(found["raw_locations"])
        return self.search(
            query=query,
            spatial_query=SpatialQuery(
                location_name=location_label,
                lga_codes=codes,
                state=primary_state,
            ),
            k=k,
        )

    def search_by_jurisdiction_and_category(
        self,
        query: str,
        jurisdiction: str | None = None,
        category: str | None = None,
        topic: str | None = None,
        region: str | None = None,
        k: int = 10,
    ) -> list[Document]:
        """Search filtered by jurisdiction, category, topic, and region.

        Each argument given a truthy value is added to the metadata filter
        dict under its own name; the dict is then passed to ``self.search``.

        Args:
        ----
            query: User query string
            jurisdiction: Jurisdiction level (e.g., "federal", "state", "local")
            category: Document category (e.g., "environment", "planning", "legislation")
            topic: Specific topic (e.g., "emissions_reporting", "biodiversity", "tree_management")
            region: Region name (e.g., "South Australia", "New South Wales")
            k: Number of results to return

        Returns:
        -------
            List of Document objects returned by ``self.search``

        """
        requested: dict[str, object] = {
            "jurisdiction": jurisdiction,
            "category": category,
            "topic": topic,
            "region": region,
        }

        # Unset (None) and empty-string arguments do not become filters.
        metadata_filters: dict[str, object] = {
            field: wanted for field, wanted in requested.items() if wanted
        }

        return self.search(query=query, metadata_filters=metadata_filters, k=k)

    def search_nger_compliant(
        self,
        query: str,
        reportable_under_nger: bool = True,
        nger_threshold_tonnes: int | None = None,
        k: int = 10,
    ) -> list[Document]:
        """Search for NGER-compliant documents.

        Always filters on ``esg_metadata.reportable_under_nger`` and, when a
        threshold is given, additionally on
        ``esg_metadata.nger_threshold_tonnes``.

        Args:
        ----
            query: User query string
            reportable_under_nger: Filter for NGER reportability
            nger_threshold_tonnes: Filter by NGER threshold (e.g., 25000, 100000).
                ``None`` (the default) means no threshold filter; any integer,
                including ``0``, is applied as given
            k: Number of results to return

        Returns:
        -------
            List of Document objects returned by ``self.search``

        """
        metadata_filters: dict[str, object] = {
            "esg_metadata.reportable_under_nger": reportable_under_nger,
        }

        # Compare against None rather than relying on truthiness, so an
        # explicit threshold of 0 is not silently discarded.
        if nger_threshold_tonnes is not None:
            metadata_filters[
                "esg_metadata.nger_threshold_tonnes"
            ] = nger_threshold_tonnes

        return self.search(query=query, metadata_filters=metadata_filters, k=k)

    def search_scope_3(
        self,
        query: str,
        scope_3_categories: list[str] | None = None,
        frameworks: list[str] | None = None,
        include_issb: bool = True,
        k: int = 10,
    ) -> list[Document]:
        """Search for Scope 3 emissions guidance.

        The emission-scope filter is always fixed to ``["scope_3"]``. A
        framework filter is added from ``frameworks`` when it is non-empty,
        otherwise it falls back to ``["ISSB"]`` if ``include_issb`` is true.

        Args:
        ----
            query: User query string
            scope_3_categories: List of Scope 3 categories to filter by:
                - purchased_goods_services (Cat 1)
                - capital_goods (Cat 2)
                - fuel_energy_activities (Cat 3)
                - upstream_transport (Cat 4)
                - waste_generated (Cat 5)
                - business_travel (Cat 6)
                - employee_commuting (Cat 7)
                - upstream_leased_assets (Cat 8)
                - downstream_transport (Cat 9)
                - processing_sold_products (Cat 10)
                - use_of_sold_products (Cat 11)
                - end_of_life_treatment (Cat 12)
                - downstream_leased_assets (Cat 13)
                - franchises (Cat 14)
                - investments (Cat 15)
            frameworks: ESG frameworks (e.g., ["ISSB", "GHG_Protocol", "GRI"])
            include_issb: When no frameworks are given, filter on the ISSB
                framework (default: True)
            k: Number of results to return

        Returns:
        -------
            List of Document objects returned by ``self.search``

        """
        # Explicit frameworks win; ISSB is only a fallback default.
        chosen_frameworks = frameworks or (["ISSB"] if include_issb else None)

        metadata_filters: dict[str, object] = {
            "esg_metadata.emission_scopes": ["scope_3"],
        }
        if scope_3_categories:
            metadata_filters["esg_metadata.scope_3_categories"] = scope_3_categories
        if chosen_frameworks:
            metadata_filters["esg_metadata.frameworks"] = chosen_frameworks

        return self.search(query=query, metadata_filters=metadata_filters, k=k)

    def search_scope_3_by_type(
        self,
        query: str,
        scope_type: str = "upstream",
        k: int = 10,
    ) -> list[Document]:
        """Search Scope 3 emissions by upstream or downstream type.

        Maps ``scope_type`` (case-insensitively) to its list of Scope 3
        categories and delegates to ``self.search_scope_3``.

        Args:
        ----
            query: User query string
            scope_type: Either "upstream" (categories 1-8) or "downstream" (categories 9-15)
            k: Number of results to return

        Returns:
        -------
            List of Document objects returned by ``self.search_scope_3``

        Raises:
        ------
            ValueError: If ``scope_type`` is neither "upstream" nor "downstream"

        """
        categories_by_type = {
            "upstream": [
                "purchased_goods_services",
                "capital_goods",
                "fuel_energy_activities",
                "upstream_transport",
                "waste_generated",
                "business_travel",
                "employee_commuting",
                "upstream_leased_assets",
            ],
            "downstream": [
                "downstream_transport",
                "processing_sold_products",
                "use_of_sold_products",
                "end_of_life_treatment",
                "downstream_leased_assets",
                "franchises",
                "investments",
            ],
        }

        categories = categories_by_type.get(scope_type.lower())
        if categories is None:
            # The message reports the value exactly as the caller passed it.
            msg = (
                f"Invalid scope_type: {scope_type}. Must be 'upstream' or 'downstream'"
            )
            raise ValueError(msg)

        return self.search_scope_3(query=query, scope_3_categories=categories, k=k)

    def advanced_search(
        self,
        query: str,
        # Spatial filters
        lga_codes: list[str] | None = None,
        state: str | None = None,
        # Basic metadata
        jurisdiction: str | None = None,
        category: str | None = None,
        topic: str | None = None,
        # ESG filters
        emission_scopes: list[str] | None = None,
        frameworks: list[str] | None = None,
        greenhouse_gases: list[str] | None = None,
        regulator: str | None = None,
        # Industry filters
        industry_codes: list[str] | None = None,
        facility_types: list[str] | None = None,
        k: int = 10,
    ) -> list[Document]:
        """Search with spatial, basic-metadata, and ESG filters combined.

        A ``SpatialQuery`` is built only when ``lga_codes`` or ``state`` is
        given. Every other argument with a truthy value becomes a metadata
        filter; if none are supplied, ``None`` is passed to ``self.search``
        instead of an empty dict.

        Args:
        ----
            query: User query string
            lga_codes: List of LGA codes for spatial filtering
            state: State code for spatial filtering
            jurisdiction: Jurisdiction level (federal/state/local)
            category: Document category
            topic: Specific topic
            emission_scopes: List of emission scopes
            frameworks: List of ESG frameworks
            greenhouse_gases: List of greenhouse gases
            regulator: Regulator name
            industry_codes: List of ANZSIC codes
            facility_types: List of facility types
            k: Number of results to return

        Returns:
        -------
            List of Document objects returned by ``self.search``

        """
        # Spatial part: only present when a location constraint was given.
        spatial_query = (
            SpatialQuery(
                location_name="",
                lga_codes=lga_codes or [],
                state=state,
            )
            if (lga_codes or state)
            else None
        )

        # Metadata part: plain keys for basic metadata, "esg_metadata."
        # prefixed keys for ESG and industry filters.
        requested: dict[str, object] = {
            "jurisdiction": jurisdiction,
            "category": category,
            "topic": topic,
            "esg_metadata.emission_scopes": emission_scopes,
            "esg_metadata.frameworks": frameworks,
            "esg_metadata.greenhouse_gases": greenhouse_gases,
            "esg_metadata.regulator": regulator,
            "esg_metadata.industry_codes": industry_codes,
            "esg_metadata.facility_types": facility_types,
        }
        metadata_filters: dict[str, object] = {
            field: wanted for field, wanted in requested.items() if wanted
        }

        return self.search(
            query=query,
            spatial_query=spatial_query,
            metadata_filters=metadata_filters or None,
            k=k,
        )
__init__
__init__(vector_store: Union['VectorStore', 'VectorStoreInterface'], enable_ner: bool = True)

Initialize hybrid search with vector store.


vector_store: VectorStore instance for similarity search
enable_ner: Whether to enable NER for automatic location extraction
Source code in green_gov_rag/rag/hybrid_search.py
def __init__(
    self,
    vector_store: Union["VectorStore", "VectorStoreInterface"],
    enable_ner: bool = True,
):
    """Set up hybrid search on top of a vector store.

    Args:
    ----
        vector_store: VectorStore instance for similarity search; stored as
            ``self.vector_store``
        enable_ner: Whether to enable NER for automatic location extraction.
            When true, ``self.ner`` is a ``LocationNER(use_llm=False)``;
            otherwise it is ``None``

    """
    self.vector_store = vector_store
    if enable_ner:
        self.ner = LocationNER(use_llm=False)
    else:
        self.ner = None
search
search(query: str, spatial_query: Optional[SpatialQuery] = None, metadata_filters: Optional[dict] = None, k: int = 10, enable_query_expansion: bool = True) -> list[Document]

Hybrid search combining vector, spatial, and metadata filtering.


query: User query string
spatial_query: Optional SpatialQuery for location-based filtering
metadata_filters: Optional dict for metadata filtering
k: Maximum number of results to return; when spatial or metadata filters apply, 3 × k candidates are retrieved before filtering
enable_query_expansion: Whether to expand acronyms in query (default: True)

List of Document objects ranked by relevance
Source code in green_gov_rag/rag/hybrid_search.py
def search(
    self,
    query: str,
    spatial_query: Optional[SpatialQuery] = None,
    metadata_filters: Optional[dict] = None,
    k: int = 10,
    enable_query_expansion: bool = True,
) -> list[Document]:
    """Hybrid search combining vector, spatial, and metadata filtering.

    The query is optionally expanded, candidates are fetched from the
    vector store, then narrowed by the spatial and metadata filters and
    passed through jurisdiction boosting before being truncated to ``k``.

    Args:
    ----
        query: User query string
        spatial_query: Optional SpatialQuery for location-based filtering
        metadata_filters: Optional dict for metadata filtering. The dict is
            copied, so it is never modified by this method. If it has no
            "jurisdiction" key, one is added to the copy when
            ``detect_jurisdiction_from_query`` returns a value for the query
        k: Maximum number of results to return. When any filter is active,
            ``3 * k`` candidates are retrieved first to leave room for
            filtering
        enable_query_expansion: Whether to pass the query through
            ``expand_query`` before the vector search (default: True)

    Returns:
    -------
        At most ``k`` Document objects, in the order left by the vector
        store, the filters, and the jurisdiction boost

    """
    # Step 0: Query expansion and jurisdiction detection
    expanded_query = expand_query(query) if enable_query_expansion else query

    # Work on a private copy: the auto-detected jurisdiction added below
    # must not leak into the dict owned by the caller.
    active_filters: dict = dict(metadata_filters) if metadata_filters else {}

    # Auto-detect jurisdiction from the original (unexpanded) query when
    # the caller did not specify one.
    if "jurisdiction" not in active_filters:
        detected_jurisdiction = detect_jurisdiction_from_query(query)
        if detected_jurisdiction:
            active_filters["jurisdiction"] = detected_jurisdiction

    # Step 1: Vector similarity search
    # Retrieve more results initially to account for filtering
    initial_k = k * 3 if (spatial_query or active_filters) else k
    results = self.vector_store.similarity_search(expanded_query, k=initial_k)

    # Step 2: Apply spatial filters if provided
    if spatial_query:
        results = self._filter_by_spatial(results, spatial_query)

    # Step 3: Apply metadata filters if provided
    if active_filters:
        results = self._filter_by_metadata(results, active_filters)

    # Step 4: Apply jurisdiction boosting if jurisdiction filter present
    if "jurisdiction" in active_filters:
        results = self._boost_by_jurisdiction(
            results, active_filters["jurisdiction"]
        )

    # Step 5: Results are already ordered by the steps above; keep top k
    return results[:k]
search_with_lga
search_with_lga(query: str, lga_name: str, lga_code: str, state: str, k: int = 10) -> list[Document]

Convenience method for LGA-based search.


query: User query string
lga_name: Name of the LGA (e.g., "City of Adelaide")
lga_code: ABS LGA code (e.g., "40070")
state: State code (e.g., "SA")
k: Number of results to return

List of Document objects relevant to the LGA
Source code in green_gov_rag/rag/hybrid_search.py
def search_with_lga(
    self,
    query: str,
    lga_name: str,
    lga_code: str,
    state: str,
    k: int = 10,
) -> list[Document]:
    """Search within a single Local Government Area (LGA).

    Wraps the LGA details in a ``SpatialQuery`` and delegates to
    ``self.search``.

    Args:
    ----
        query: User query string
        lga_name: Name of the LGA (e.g., "City of Adelaide"); used as the
            spatial query's ``location_name``
        lga_code: ABS LGA code (e.g., "40070"); becomes the only entry of
            the spatial query's ``lga_codes``
        state: State code (e.g., "SA")
        k: Number of results to return

    Returns:
    -------
        List of Document objects returned by ``self.search``

    """
    return self.search(
        query=query,
        spatial_query=SpatialQuery(
            location_name=lga_name,
            lga_codes=[lga_code],
            state=state,
        ),
        k=k,
    )
search_with_esg_filters
search_with_esg_filters(query: str, emission_scopes: list[str] | None = None, frameworks: list[str] | None = None, greenhouse_gases: list[str] | None = None, consolidation_method: str | None = None, methodology_type: str | None = None, scope_3_categories: list[str] | None = None, regulator: str | None = None, activity_types: list[str] | None = None, industry_codes: list[str] | None = None, k: int = 10) -> list[Document]

Convenience method for ESG-filtered search.


query: User query string
emission_scopes: List of emission scopes (e.g., ["scope_1", "scope_2"])
frameworks: List of frameworks (e.g., ["NGER", "ISSB", "GHG_Protocol"])
greenhouse_gases: List of gases (e.g., ["CO2", "CH4", "N2O", "SF6", "HFCs", "PFCs", "NF3"])
consolidation_method: Consolidation approach (e.g., "operational_control", "equity_share", "financial_control")
methodology_type: Methodology type (e.g., "calculation", "reporting", "verification")
scope_3_categories: List of Scope 3 categories (e.g., ["upstream_transport", "business_travel"])
regulator: Regulator name (e.g., "Clean Energy Regulator", "NSW EPA")
activity_types: List of activity types (e.g., ["fuel_combustion", "electricity_consumption"])
industry_codes: List of ANZSIC industry codes (e.g., ["B0600"])
k: Number of results to return

List of Document objects matching ESG criteria
Source code in green_gov_rag/rag/hybrid_search.py
def search_with_esg_filters(
    self,
    query: str,
    emission_scopes: list[str] | None = None,
    frameworks: list[str] | None = None,
    greenhouse_gases: list[str] | None = None,
    consolidation_method: str | None = None,
    methodology_type: str | None = None,
    scope_3_categories: list[str] | None = None,
    regulator: str | None = None,
    activity_types: list[str] | None = None,
    industry_codes: list[str] | None = None,
    k: int = 10,
) -> list[Document]:
    """Search with ESG metadata filters.

    Every argument that is supplied with a truthy value becomes an
    ``esg_metadata.<argument name>`` entry in the metadata filter dict
    handed to ``self.search``; omitted or empty arguments are ignored.

    Args:
    ----
        query: User query string
        emission_scopes: List of emission scopes (e.g., ["scope_1", "scope_2"])
        frameworks: List of frameworks (e.g., ["NGER", "ISSB", "GHG_Protocol"])
        greenhouse_gases: List of gases (e.g., ["CO2", "CH4", "N2O", "SF6", "HFCs", "PFCs", "NF3"])
        consolidation_method: Consolidation approach (e.g., "operational_control", "equity_share", "financial_control")
        methodology_type: Methodology type (e.g., "calculation", "reporting", "verification")
        scope_3_categories: List of Scope 3 categories (e.g., ["upstream_transport", "business_travel"])
        regulator: Regulator name (e.g., "Clean Energy Regulator", "NSW EPA")
        activity_types: List of activity types (e.g., ["fuel_combustion", "electricity_consumption"])
        industry_codes: List of ANZSIC industry codes (e.g., ["B0600"])
        k: Number of results to return

    Returns:
    -------
        List of Document objects returned by ``self.search``

    """
    # Declared in the order the filters should appear in the final dict.
    requested: dict[str, object] = {
        "esg_metadata.emission_scopes": emission_scopes,
        "esg_metadata.frameworks": frameworks,
        "esg_metadata.greenhouse_gases": greenhouse_gases,
        "esg_metadata.consolidation_method": consolidation_method,
        "esg_metadata.methodology_type": methodology_type,
        "esg_metadata.scope_3_categories": scope_3_categories,
        "esg_metadata.regulator": regulator,
        "esg_metadata.activity_types": activity_types,
        "esg_metadata.industry_codes": industry_codes,
    }

    # Drop everything the caller left unset (None) or empty.
    metadata_filters: dict[str, object] = {
        field: wanted for field, wanted in requested.items() if wanted
    }

    return self.search(query=query, metadata_filters=metadata_filters, k=k)
search_with_auto_location
search_with_auto_location(query: str, k: int = 10) -> list[Document]

Search with automatic location extraction from query text.

Uses NER to extract LGA codes and states from the query, then performs spatial filtering automatically.


query: User query text (e.g., "What are tree rules in Adelaide?")
k: Number of results to return

List of Document objects matching query and extracted locations
Example:
>>> search_with_auto_location("emission rules in Port Adelaide Enfield", k=5)
# Automatically extracts LGA code "40280" and state "SA"
Source code in green_gov_rag/rag/hybrid_search.py
def search_with_auto_location(self, query: str, k: int = 10) -> list[Document]:
    """Search, deriving a spatial filter from locations named in the query.

    When ``self.ner`` is set, its ``extract_locations`` result supplies
    the LGA codes and states for a ``SpatialQuery``. If NER is disabled,
    or it yields neither LGA codes nor states, a plain ``self.search``
    is performed instead.

    Args:
    ----
        query: User query text (e.g., "What are tree rules in Adelaide?")
        k: Number of results to return

    Returns:
    -------
        List of Document objects returned by ``self.search``

    Example:
    -------
        >>> search_with_auto_location("emission rules in Port Adelaide Enfield", k=5)
        # Automatically extracts LGA code "40280" and state "SA"

    """
    extractor = self.ner
    if not extractor:
        # NER is switched off: nothing to extract, search unfiltered.
        return self.search(query=query, k=k)

    found = extractor.extract_locations(query)
    codes = []
    for entry in found["lgas"]:
        codes.append(entry["code"])
    states = found["states"]

    if not codes and not states:
        # The query mentions no recognised location.
        return self.search(query=query, k=k)

    # Only the first extracted state is used for the spatial query.
    primary_state = states[0] if states else None
    location_label = ", ".join(found["raw_locations"])
    return self.search(
        query=query,
        spatial_query=SpatialQuery(
            location_name=location_label,
            lga_codes=codes,
            state=primary_state,
        ),
        k=k,
    )
search_by_jurisdiction_and_category
search_by_jurisdiction_and_category(query: str, jurisdiction: str | None = None, category: str | None = None, topic: str | None = None, region: str | None = None, k: int = 10) -> list[Document]

Search filtered by jurisdiction, category, topic, and region.


query: User query string
jurisdiction: Jurisdiction level (e.g., "federal", "state", "local")
category: Document category (e.g., "environment", "planning", "legislation")
topic: Specific topic (e.g., "emissions_reporting", "biodiversity", "tree_management")
region: Region name (e.g., "South Australia", "New South Wales")
k: Number of results to return

List of Document objects matching criteria
Source code in green_gov_rag/rag/hybrid_search.py
def search_by_jurisdiction_and_category(
    self,
    query: str,
    jurisdiction: str | None = None,
    category: str | None = None,
    topic: str | None = None,
    region: str | None = None,
    k: int = 10,
) -> list[Document]:
    """Search filtered by jurisdiction, category, topic, and region.

    Each argument given a truthy value is added to the metadata filter
    dict under its own name; the dict is then passed to ``self.search``.

    Args:
    ----
        query: User query string
        jurisdiction: Jurisdiction level (e.g., "federal", "state", "local")
        category: Document category (e.g., "environment", "planning", "legislation")
        topic: Specific topic (e.g., "emissions_reporting", "biodiversity", "tree_management")
        region: Region name (e.g., "South Australia", "New South Wales")
        k: Number of results to return

    Returns:
    -------
        List of Document objects returned by ``self.search``

    """
    requested: dict[str, object] = {
        "jurisdiction": jurisdiction,
        "category": category,
        "topic": topic,
        "region": region,
    }

    # Unset (None) and empty-string arguments do not become filters.
    metadata_filters: dict[str, object] = {
        field: wanted for field, wanted in requested.items() if wanted
    }

    return self.search(query=query, metadata_filters=metadata_filters, k=k)
search_nger_compliant
search_nger_compliant(query: str, reportable_under_nger: bool = True, nger_threshold_tonnes: int | None = None, k: int = 10) -> list[Document]

Search for NGER-compliant documents.


query: User query string
reportable_under_nger: Filter for NGER reportability
nger_threshold_tonnes: Filter by NGER threshold (e.g., 25000, 100000)
k: Number of results to return

List of NGER-compliant Document objects
Source code in green_gov_rag/rag/hybrid_search.py
def search_nger_compliant(
    self,
    query: str,
    reportable_under_nger: bool = True,
    nger_threshold_tonnes: int | None = None,
    k: int = 10,
) -> list[Document]:
    """Search for NGER-compliant documents.

    Always filters on ``esg_metadata.reportable_under_nger`` and, when a
    threshold is given, additionally on
    ``esg_metadata.nger_threshold_tonnes``.

    Args:
    ----
        query: User query string
        reportable_under_nger: Filter for NGER reportability
        nger_threshold_tonnes: Filter by NGER threshold (e.g., 25000, 100000).
            ``None`` (the default) means no threshold filter; any integer,
            including ``0``, is applied as given
        k: Number of results to return

    Returns:
    -------
        List of Document objects returned by ``self.search``

    """
    metadata_filters: dict[str, object] = {
        "esg_metadata.reportable_under_nger": reportable_under_nger,
    }

    # Compare against None rather than relying on truthiness, so an
    # explicit threshold of 0 is not silently discarded.
    if nger_threshold_tonnes is not None:
        metadata_filters[
            "esg_metadata.nger_threshold_tonnes"
        ] = nger_threshold_tonnes

    return self.search(query=query, metadata_filters=metadata_filters, k=k)
search_scope_3
search_scope_3(query: str, scope_3_categories: list[str] | None = None, frameworks: list[str] | None = None, include_issb: bool = True, k: int = 10) -> list[Document]

Search for Scope 3 emissions guidance.


query: User query string
scope_3_categories: List of Scope 3 categories to filter by:
    - purchased_goods_services (Cat 1)
    - capital_goods (Cat 2)
    - fuel_energy_activities (Cat 3)
    - upstream_transport (Cat 4)
    - waste_generated (Cat 5)
    - business_travel (Cat 6)
    - employee_commuting (Cat 7)
    - upstream_leased_assets (Cat 8)
    - downstream_transport (Cat 9)
    - processing_sold_products (Cat 10)
    - use_of_sold_products (Cat 11)
    - end_of_life_treatment (Cat 12)
    - downstream_leased_assets (Cat 13)
    - franchises (Cat 14)
    - investments (Cat 15)
frameworks: ESG frameworks (e.g., ["ISSB", "GHG_Protocol", "GRI"])
include_issb: When no frameworks are given, filter on the ISSB framework (default: True)
k: Number of results to return

List of Scope 3 Document objects
Source code in green_gov_rag/rag/hybrid_search.py
def search_scope_3(
    self,
    query: str,
    scope_3_categories: list[str] | None = None,
    frameworks: list[str] | None = None,
    include_issb: bool = True,
    k: int = 10,
) -> list[Document]:
    """Search for Scope 3 emissions guidance.

    The emission-scope filter is always fixed to ``["scope_3"]``. A
    framework filter is added from ``frameworks`` when it is non-empty,
    otherwise it falls back to ``["ISSB"]`` if ``include_issb`` is true.

    Args:
    ----
        query: User query string
        scope_3_categories: List of Scope 3 categories to filter by:
            - purchased_goods_services (Cat 1)
            - capital_goods (Cat 2)
            - fuel_energy_activities (Cat 3)
            - upstream_transport (Cat 4)
            - waste_generated (Cat 5)
            - business_travel (Cat 6)
            - employee_commuting (Cat 7)
            - upstream_leased_assets (Cat 8)
            - downstream_transport (Cat 9)
            - processing_sold_products (Cat 10)
            - use_of_sold_products (Cat 11)
            - end_of_life_treatment (Cat 12)
            - downstream_leased_assets (Cat 13)
            - franchises (Cat 14)
            - investments (Cat 15)
        frameworks: ESG frameworks (e.g., ["ISSB", "GHG_Protocol", "GRI"])
        include_issb: When no frameworks are given, filter on the ISSB
            framework (default: True)
        k: Number of results to return

    Returns:
    -------
        List of Document objects returned by ``self.search``

    """
    # Explicit frameworks win; ISSB is only a fallback default.
    chosen_frameworks = frameworks or (["ISSB"] if include_issb else None)

    metadata_filters: dict[str, object] = {
        "esg_metadata.emission_scopes": ["scope_3"],
    }
    if scope_3_categories:
        metadata_filters["esg_metadata.scope_3_categories"] = scope_3_categories
    if chosen_frameworks:
        metadata_filters["esg_metadata.frameworks"] = chosen_frameworks

    return self.search(query=query, metadata_filters=metadata_filters, k=k)
search_scope_3_by_type
search_scope_3_by_type(query: str, scope_type: str = 'upstream', k: int = 10) -> list[Document]

Search Scope 3 emissions by upstream or downstream type.


query: User query string
scope_type: Either "upstream" (categories 1-8) or "downstream" (categories 9-15)
k: Number of results to return

List of Scope 3 Document objects filtered by type
Source code in green_gov_rag/rag/hybrid_search.py
def search_scope_3_by_type(
    self,
    query: str,
    scope_type: str = "upstream",
    k: int = 10,
) -> list[Document]:
    """Search Scope 3 emissions by upstream or downstream type.

    Maps ``scope_type`` (case-insensitively) to its list of Scope 3
    categories and delegates to ``self.search_scope_3``.

    Args:
    ----
        query: User query string
        scope_type: Either "upstream" (categories 1-8) or "downstream" (categories 9-15)
        k: Number of results to return

    Returns:
    -------
        List of Document objects returned by ``self.search_scope_3``

    Raises:
    ------
        ValueError: If ``scope_type`` is neither "upstream" nor "downstream"

    """
    categories_by_type = {
        "upstream": [
            "purchased_goods_services",
            "capital_goods",
            "fuel_energy_activities",
            "upstream_transport",
            "waste_generated",
            "business_travel",
            "employee_commuting",
            "upstream_leased_assets",
        ],
        "downstream": [
            "downstream_transport",
            "processing_sold_products",
            "use_of_sold_products",
            "end_of_life_treatment",
            "downstream_leased_assets",
            "franchises",
            "investments",
        ],
    }

    categories = categories_by_type.get(scope_type.lower())
    if categories is None:
        # The message reports the value exactly as the caller passed it.
        msg = (
            f"Invalid scope_type: {scope_type}. Must be 'upstream' or 'downstream'"
        )
        raise ValueError(msg)

    return self.search_scope_3(query=query, scope_3_categories=categories, k=k)
advanced_search
advanced_search(query: str, lga_codes: list[str] | None = None, state: str | None = None, jurisdiction: str | None = None, category: str | None = None, topic: str | None = None, emission_scopes: list[str] | None = None, frameworks: list[str] | None = None, greenhouse_gases: list[str] | None = None, regulator: str | None = None, industry_codes: list[str] | None = None, facility_types: list[str] | None = None, k: int = 10) -> list[Document]

Advanced search with multiple filter types.

Combines spatial, metadata, and ESG filters for precise retrieval.


query: User query string
lga_codes: List of LGA codes for spatial filtering
state: State code for spatial filtering
jurisdiction: Jurisdiction level (federal/state/local)
category: Document category
topic: Specific topic
emission_scopes: List of emission scopes
frameworks: List of ESG frameworks
greenhouse_gases: List of greenhouse gases
regulator: Regulator name
industry_codes: List of ANZSIC codes
facility_types: List of facility types
k: Number of results to return

List of filtered and ranked Document objects
Source code in green_gov_rag/rag/hybrid_search.py
def advanced_search(
    self,
    query: str,
    # Spatial filters
    lga_codes: list[str] | None = None,
    state: str | None = None,
    # Basic metadata
    jurisdiction: str | None = None,
    category: str | None = None,
    topic: str | None = None,
    # ESG filters
    emission_scopes: list[str] | None = None,
    frameworks: list[str] | None = None,
    greenhouse_gases: list[str] | None = None,
    regulator: str | None = None,
    # Industry filters
    industry_codes: list[str] | None = None,
    facility_types: list[str] | None = None,
    k: int = 10,
) -> list[Document]:
    """Run a search narrowed by spatial, metadata, ESG and industry filters.

    Every filter argument is optional; only the ones given a truthy value are
    forwarded to ``self.search``.

    Args:
    ----
        query: User query string
        lga_codes: List of LGA codes for spatial filtering
        state: State code for spatial filtering
        jurisdiction: Jurisdiction level (federal/state/local)
        category: Document category
        topic: Specific topic
        emission_scopes: List of emission scopes
        frameworks: List of ESG frameworks
        greenhouse_gases: List of greenhouse gases
        regulator: Regulator name
        industry_codes: List of ANZSIC codes
        facility_types: List of facility types
        k: Number of results to return

    Returns:
    -------
        Whatever ``self.search`` returns for the assembled filters
        (annotated as a list of Document objects)

    """
    # A spatial constraint is only created when a location filter was supplied.
    spatial_query = (
        SpatialQuery(location_name="", lga_codes=lga_codes or [], state=state)
        if (lga_codes or state)
        else None
    )

    # (filter key, supplied value) pairs, in the order they are stored.
    candidate_filters = (
        ("jurisdiction", jurisdiction),
        ("category", category),
        ("topic", topic),
        ("esg_metadata.emission_scopes", emission_scopes),
        ("esg_metadata.frameworks", frameworks),
        ("esg_metadata.greenhouse_gases", greenhouse_gases),
        ("esg_metadata.regulator", regulator),
        ("esg_metadata.industry_codes", industry_codes),
        ("esg_metadata.facility_types", facility_types),
    )
    metadata_filters: dict[str, object] = {
        key: value for key, value in candidate_filters if value
    }

    # An empty filter dict is passed on as None rather than {}.
    return self.search(
        query=query,
        spatial_query=spatial_query,
        metadata_filters=metadata_filters or None,
        k=k,
    )

Location NER

green_gov_rag.rag.location_ner

Named Entity Recognition for Location Extraction.

Extracts Australian locations (LGAs, states, cities) from text queries and maps them to standardized codes for geospatial filtering.

Uses both rule-based matching and LLM-based extraction for robustness.

LocationNER

Extract and normalize Australian locations from text.

Source code in green_gov_rag/rag/location_ner.py
class LocationNER:
    """Extract and normalize Australian locations from text."""

    def __init__(self, use_llm: bool = True, llm_model: str = "gpt-3.5-turbo"):
        """Initialize location NER.

        Args:
        ----
            use_llm: Whether to use LLM for extraction (more accurate)
            llm_model: OpenAI model to use for LLM-based extraction

        """
        self.use_llm = use_llm
        self.llm: Any = None
        if use_llm:
            from green_gov_rag.rag.llm_factory import get_llm

            self.llm = get_llm(model=llm_model, temperature=0.0)

        # Load mappings from types module
        self._state_mappings = get_state_mapping()
        self._lga_mappings = get_lga_mappings()

    def extract_locations(self, text: str) -> dict[str, Any]:
        """Extract locations from text using both rule-based and LLM methods.

        Args:
        ----
            text: Query text to extract locations from

        Returns:
        -------
            Dict with extracted locations:
            {
                "states": ["SA", "NSW"],
                "lgas": [{"name": "Adelaide", "code": "40070", "state": "SA"}],
                "raw_locations": ["Adelaide", "South Australia"]
            }

        """
        # Rule-based extraction
        rule_based = self._extract_rule_based(text)

        # LLM-based extraction (if enabled)
        if self.use_llm:
            llm_based = self._extract_llm_based(text)
            # Merge results
            return self._merge_results(rule_based, llm_based)

        return rule_based

    def _extract_rule_based(self, text: str) -> dict[str, Any]:
        """Extract locations using rule-based pattern matching.

        Args:
        ----
            text: Query text

        Returns:
        -------
            Dict with extracted locations

        """
        text_lower = text.lower()
        results: dict[str, Any] = {
            "states": [],
            "lgas": [],
            "raw_locations": [],
        }

        # Extract states
        for state_name, state_enum in self._state_mappings.items():
            # Use word boundaries to avoid partial matches
            pattern = r"\b" + re.escape(state_name) + r"\b"
            if re.search(pattern, text_lower):
                state_code = state_enum.value
                if state_code not in results["states"]:
                    results["states"].append(state_code)
                    results["raw_locations"].append(state_name)

        # Extract LGAs
        for lga_name, lga_info in self._lga_mappings.items():
            pattern = r"\b" + re.escape(lga_name) + r"\b"
            if re.search(pattern, text_lower):
                # Convert LGAInfo to dict format for backward compatibility
                lga_dict = {
                    "name": lga_info.name,
                    "code": lga_info.code,
                    "state": lga_info.state.value,
                }
                if lga_dict not in results["lgas"]:
                    results["lgas"].append(lga_dict)
                    results["raw_locations"].append(lga_name)

        return results

    def _extract_llm_based(self, text: str) -> dict[str, Any]:
        """Extract locations using LLM.

        Args:
        ----
            text: Query text

        Returns:
        -------
            Dict with extracted locations

        """
        prompt = ChatPromptTemplate.from_messages(
            [
                (
                    "system",
                    "You are a location extraction expert for Australian queries.",
                ),
                (
                    "human",
                    """Extract Australian locations from this text.

Text: {text}

Return a JSON object with:
- "states": list of Australian state/territory codes (NSW, VIC, QLD, SA, WA, TAS, NT, ACT)
- "lgas": list of Local Government Area names
- "cities": list of city/suburb names

If no locations found, return empty lists.

Example:
Text: "What are the tree rules in Adelaide, South Australia?"
Output: {{"states": ["SA"], "lgas": ["Adelaide"], "cities": ["Adelaide"]}}

Only return the JSON object, nothing else.""",
                ),
            ],
        )

        chain: Any = prompt | self.llm
        response = chain.invoke({"text": text})

        # Parse LLM response
        try:
            import json

            result = json.loads(response.content)

            # Map LGAs to our standard format
            lgas = []
            for lga_name in result.get("lgas", []):
                lga_lower = lga_name.lower()
                if lga_lower in self._lga_mappings:
                    lga_info = self._lga_mappings[lga_lower]
                    lga_dict = {
                        "name": lga_info.name,
                        "code": lga_info.code,
                        "state": lga_info.state.value,
                    }
                    lgas.append(lga_dict)

            return {
                "states": result.get("states", []),
                "lgas": lgas,
                "raw_locations": result.get("lgas", []) + result.get("cities", []),
            }
        except json.JSONDecodeError:
            return {"states": [], "lgas": [], "raw_locations": []}

    def _merge_results(
        self,
        rule_based: dict[str, Any],
        llm_based: dict[str, Any],
    ) -> dict[str, Any]:
        """Merge rule-based and LLM results.

        Args:
        ----
            rule_based: Results from rule-based extraction
            llm_based: Results from LLM extraction

        Returns:
        -------
            Merged results

        """
        merged: dict[str, Any] = {
            "states": list(set(rule_based["states"] + llm_based["states"])),
            "lgas": rule_based["lgas"]
            + [lga for lga in llm_based["lgas"] if lga not in rule_based["lgas"]],
            "raw_locations": list(
                set(rule_based["raw_locations"] + llm_based["raw_locations"]),
            ),
        }

        return merged

    def extract_lga_codes(self, text: str) -> list[str]:
        """Extract LGA codes from text (convenience method).

        Args:
        ----
            text: Query text

        Returns:
        -------
            List of LGA codes

        """
        locations = self.extract_locations(text)
        return [lga["code"] for lga in locations["lgas"]]

    def extract_state_codes(self, text: str) -> list[str]:
        """Extract state codes from text (convenience method).

        Args:
        ----
            text: Query text

        Returns:
        -------
            List of state codes

        """
        locations = self.extract_locations(text)
        return locations["states"]

    def add_lga_mapping(
        self,
        name: str,
        lga_code: str,
        state: str,
        official_name: str | None = None,
    ) -> None:
        """Add a new LGA mapping.

        Args:
        ----
            name: Common name (e.g., "adelaide")
            lga_code: ABS LGA code
            state: State code (e.g., "NSW", "VIC")
            official_name: Official LGA name (defaults to capitalized name)

        """
        from green_gov_rag.types import AustralianState, LGAInfo

        # Convert state string to AustralianState enum
        state_enum = AustralianState(state)

        # Create LGAInfo and add to mappings
        lga_info = LGAInfo(
            name=official_name or name.title(),
            code=lga_code,
            state=state_enum,
        )
        self._lga_mappings[name.lower()] = lga_info
__init__
__init__(use_llm: bool = True, llm_model: str = 'gpt-3.5-turbo')

Initialize location NER.


use_llm: Whether to use LLM for extraction (more accurate)
llm_model: Model name passed to the LLM factory for LLM-based extraction (the default is an OpenAI model)
Source code in green_gov_rag/rag/location_ner.py
def __init__(self, use_llm: bool = True, llm_model: str = "gpt-3.5-turbo"):
    """Set up the extractor and load the state/LGA lookup tables.

    Args:
    ----
        use_llm: Whether to use LLM for extraction (more accurate)
        llm_model: Model name passed to ``get_llm`` for LLM-based extraction

    """
    self.use_llm = use_llm

    # The LLM client stays None unless the LLM path was requested.
    self.llm: Any = None
    if self.use_llm:
        # Imported only when the LLM path is enabled.
        from green_gov_rag.rag.llm_factory import get_llm

        self.llm = get_llm(temperature=0.0, model=llm_model)

    # Lookup tables consulted by the rule-based matcher.
    self._state_mappings = get_state_mapping()
    self._lga_mappings = get_lga_mappings()
extract_locations
extract_locations(text: str) -> dict[str, Any]

Extract locations from text using rule-based matching and, when the LLM is enabled, LLM-based extraction.


text: Query text to extract locations from

Dict with extracted locations:
{
    "states": ["SA", "NSW"],
    "lgas": [{"name": "Adelaide", "code": "40070", "state": "SA"}],
    "raw_locations": ["Adelaide", "South Australia"]
}
Source code in green_gov_rag/rag/location_ner.py
def extract_locations(self, text: str) -> dict[str, Any]:
    """Find locations in ``text`` via pattern matching plus an optional LLM pass.

    Args:
    ----
        text: Query text to extract locations from

    Returns:
    -------
        Dict with extracted locations:
        {
            "states": ["SA", "NSW"],
            "lgas": [{"name": "Adelaide", "code": "40070", "state": "SA"}],
            "raw_locations": ["Adelaide", "South Australia"]
        }

    """
    pattern_hits = self._extract_rule_based(text)

    # Without the LLM the rule-based output is the final answer.
    if not self.use_llm:
        return pattern_hits

    llm_hits = self._extract_llm_based(text)
    return self._merge_results(pattern_hits, llm_hits)
extract_lga_codes
extract_lga_codes(text: str) -> list[str]

Extract LGA codes from text (convenience method).


text: Query text

List of LGA codes
Source code in green_gov_rag/rag/location_ner.py
def extract_lga_codes(self, text: str) -> list[str]:
    """Return just the LGA codes found in ``text`` (convenience wrapper).

    Args:
    ----
        text: Query text

    Returns:
    -------
        The ``"code"`` value of each LGA entry from ``extract_locations``

    """
    codes: list[str] = []
    for entry in self.extract_locations(text)["lgas"]:
        codes.append(entry["code"])
    return codes
extract_state_codes
extract_state_codes(text: str) -> list[str]

Extract state codes from text (convenience method).


text: Query text

List of state codes
Source code in green_gov_rag/rag/location_ner.py
def extract_state_codes(self, text: str) -> list[str]:
    """Return just the state codes found in ``text`` (convenience wrapper).

    Args:
    ----
        text: Query text

    Returns:
    -------
        The ``"states"`` list from ``extract_locations``

    """
    return self.extract_locations(text)["states"]
add_lga_mapping
add_lga_mapping(name: str, lga_code: str, state: str, official_name: str | None = None) -> None

Add a new LGA mapping.


name: Common name (e.g., "adelaide")
lga_code: ABS LGA code
state: State code (e.g., "NSW", "VIC")
official_name: Official LGA name (defaults to the title-cased common name)
Source code in green_gov_rag/rag/location_ner.py
def add_lga_mapping(
    self,
    name: str,
    lga_code: str,
    state: str,
    official_name: str | None = None,
) -> None:
    """Register an extra LGA under its lowercased common name.

    Args:
    ----
        name: Common name (e.g., "adelaide")
        lga_code: ABS LGA code
        state: State code (e.g., "NSW", "VIC")
        official_name: Official LGA name (defaults to the title-cased name)

    """
    from green_gov_rag.types import AustralianState, LGAInfo

    # The state string is converted to the enum before the record is built.
    state_enum = AustralianState(state)

    display_name = official_name if official_name else name.title()

    # An existing entry under the same lowercased key is overwritten.
    self._lga_mappings[name.lower()] = LGAInfo(
        name=display_name,
        code=lga_code,
        state=state_enum,
    )

QueryLocationProcessor

Process queries to extract and enrich with location information.

Source code in green_gov_rag/rag/location_ner.py
class QueryLocationProcessor:
    """Attach extracted location metadata to user queries."""

    def __init__(self, ner: LocationNER | None = None):
        """Store the extractor used by this processor.

        Args:
        ----
            ner: LocationNER instance (an LLM-enabled one is created if not
                 provided)

        """
        if not ner:
            ner = LocationNER(use_llm=True)
        self.ner = ner

    def process_query(self, query: str) -> dict[str, Any]:
        """Run location extraction on ``query`` and summarise the outcome.

        Args:
        ----
            query: User query text

        Returns:
        -------
            Dict with the original query, the raw extraction result, a
            ``has_location`` flag, and the LGA/state codes

        """
        found = self.ner.extract_locations(query)
        states = found["states"]
        lgas = found["lgas"]

        summary: dict[str, Any] = {
            "original_query": query,
            "locations": found,
            # True as soon as either a state or an LGA was recognised.
            "has_location": bool(states or lgas),
            "lga_codes": [entry["code"] for entry in lgas],
            "state_codes": states,
        }
        return summary
__init__
__init__(ner: LocationNER | None = None)

Initialize processor.


ner: LocationNER instance (creates one if not provided)
Source code in green_gov_rag/rag/location_ner.py
def __init__(self, ner: LocationNER | None = None):
    """Store the extractor used by this processor.

    Args:
    ----
        ner: LocationNER instance (an LLM-enabled one is created if not
             provided)

    """
    if not ner:
        ner = LocationNER(use_llm=True)
    self.ner = ner
process_query
process_query(query: str) -> dict[str, Any]

Process query and extract location metadata.


query: User query text

Dict with query and location metadata
Source code in green_gov_rag/rag/location_ner.py
def process_query(self, query: str) -> dict[str, Any]:
    """Run location extraction on ``query`` and summarise the outcome.

    Args:
    ----
        query: User query text

    Returns:
    -------
        Dict with the original query, the raw extraction result, a
        ``has_location`` flag, and the LGA/state codes

    """
    found = self.ner.extract_locations(query)
    states = found["states"]
    lgas = found["lgas"]

    summary: dict[str, Any] = {
        "original_query": query,
        "locations": found,
        # True as soon as either a state or an LGA was recognised.
        "has_location": bool(states or lgas),
        "lga_codes": [entry["code"] for entry in lgas],
        "state_codes": states,
    }
    return summary