ETL Module API Reference

Auto-generated from Python docstrings

Pipeline

green_gov_rag.etl.pipeline

Enhanced ETL Pipeline with Automated Metadata Tagging.

This module provides an end-to-end ETL pipeline that:

  1. Loads documents from config or cloud storage
  2. Downloads and parses PDFs
  3. Auto-tags with ESG/NGER metadata
  4. Chunks with preserved metadata
  5. Saves chunks to cloud or local storage
  6. Builds embeddings and vector store

Supports both local filesystem and cloud storage (AWS S3, Azure Blob) via the ETL storage adapter.
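
A minimal usage sketch in local mode. The output path is illustrative, and auto-tagging is disabled here because ESGOpenAITagger needs LLM credentials configured:

from green_gov_rag.etl.pipeline import EnhancedETLPipeline

pipeline = EnhancedETLPipeline(
    enable_auto_tagging=False,  # set True to enrich metadata via the LLM-based ESG tagger
    chunk_size=1000,
    chunk_overlap=100,
    use_cloud=False,            # force local filesystem storage
)

# Load config, optionally tag, chunk, and save chunks locally.
chunks = pipeline.run(
    config_path="configs/documents_config.yml",
    output_path="data/processed/chunks.json",  # hypothetical output location
)
print(f"Produced {len(chunks)} chunks")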

EnhancedETLPipeline

ETL pipeline with automated metadata extraction and cloud storage support.

Source code in green_gov_rag/etl/pipeline.py
class EnhancedETLPipeline:
    """ETL pipeline with automated metadata extraction and cloud storage support."""

    def __init__(
        self,
        enable_auto_tagging: bool = True,
        chunk_size: int = 1000,
        chunk_overlap: int = 100,
        use_cloud: bool | None = None,
        storage_adapter: ETLStorageAdapter | None = None,
    ):
        """Initialize ETL pipeline.

        Args:
        ----
            enable_auto_tagging: Whether to auto-tag documents with ESG metadata
            chunk_size: Size of text chunks
            chunk_overlap: Overlap between chunks
            use_cloud: Whether to use cloud storage. If None, uses settings.
            storage_adapter: Optional custom storage adapter instance

        """
        self.enable_auto_tagging = enable_auto_tagging
        self.chunker = TextChunker(chunk_size=chunk_size, chunk_overlap=chunk_overlap)

        # Initialize metadata tagger if enabled
        self.tagger = ESGOpenAITagger() if enable_auto_tagging else None

        # Determine storage mode
        if use_cloud is None:
            use_cloud = settings.cloud_provider != "local"

        self.use_cloud = use_cloud
        self.storage_adapter = storage_adapter or (
            ETLStorageAdapter() if use_cloud else None
        )

        logger.info(
            f"Initialized ETL pipeline: use_cloud={use_cloud}, "
            f"auto_tagging={enable_auto_tagging}"
        )

    def load_and_parse_documents(
        self,
        config_path: str = "configs/documents_config.yml",
    ) -> list[Document]:
        """Load documents from config and parse them.

        Args:
        ----
            config_path: Path to documents config YAML

        Returns:
        -------
            List of parsed Document objects with metadata

        """
        # Load document configs
        doc_configs = load_documents_config(config_path)

        documents = []

        # Import DocumentSourceFactory to generate file_ids
        from green_gov_rag.etl.sources.factory import DocumentSourceFactory

        factory = DocumentSourceFactory()

        for doc_config in doc_configs:
            # Create source plugin to generate file_id
            source = factory.create_source(doc_config)

            # Get document metadata from config
            base_metadata = {
                "title": doc_config.get("title", "Untitled"),
                "source_url": doc_config.get("source_url", ""),
                "jurisdiction": doc_config.get("jurisdiction", ""),
                "category": doc_config.get("category", ""),
                "topic": doc_config.get("topic", ""),
                "region": doc_config.get("region", ""),
            }

            # Add ESG metadata if present in config
            if "esg_metadata" in doc_config:
                base_metadata["esg_metadata"] = doc_config["esg_metadata"]

            # Add spatial metadata if present in config
            if "spatial_metadata" in doc_config:
                base_metadata["spatial_metadata"] = doc_config["spatial_metadata"]

            # Download URLs
            urls = doc_config.get("download_urls", [])

            for url in urls:
                # Generate file_id for this URL (consistent with ingest.py)
                file_id = source.get_document_id(url)

                # Add file_id to metadata for citation verification
                metadata_with_file_id = base_metadata.copy()
                metadata_with_file_id["file_id"] = file_id
                metadata_with_file_id[
                    "document_id"
                ] = file_id  # Alias for backward compatibility

                # For now, assume PDFs are already downloaded
                # In production, integrate with download_documents()
                # Create document with config metadata
                doc = Document(
                    page_content="",  # Will be populated by PDF loader
                    metadata=metadata_with_file_id,
                )
                documents.append(doc)

        return documents

    def auto_tag_documents(self, documents: list[Document]) -> list[Document]:
        """Auto-tag documents with ESG/NGER metadata using LLM.

        Args:
        ----
            documents: List of Document objects

        Returns:
        -------
            Documents with enriched metadata

        """
        if not self.tagger:
            return documents

        print("Auto-tagging documents with ESG metadata...")
        tagged_docs = self.tagger.tag_all(documents)
        print(f"Tagged {len(tagged_docs)} documents")

        return tagged_docs

    def chunk_documents(self, documents: list[Document]) -> list[dict[str, Any]]:
        """Chunk documents while preserving metadata.

        Args:
        ----
            documents: List of Document objects

        Returns:
        -------
            List of chunked documents with metadata

        """
        chunked_docs = []

        for doc in documents:
            # Convert to dict format expected by chunker
            doc_dict = {"content": doc.page_content, "metadata": doc.metadata}

            # Chunk with metadata preservation
            chunks = self.chunker.chunk_docs([doc_dict])

            chunked_docs.extend(chunks)

        print(f"Created {len(chunked_docs)} chunks from {len(documents)} documents")
        return chunked_docs

    def run(
        self,
        config_path: str = "configs/documents_config.yml",
        output_path: str | None = None,
        document_ids: list[str] | None = None,
    ) -> list[dict[str, Any]]:
        """Run the complete ETL pipeline.

        Args:
        ----
            config_path: Path to documents config
            output_path: Optional path to save processed chunks (local mode)
            document_ids: Optional list of document IDs to process (cloud mode)

        Returns:
        -------
            List of processed and chunked documents

        """
        print("=" * 60)
        print("Enhanced ETL Pipeline - With Auto-Tagging")
        print(f"Storage mode: {'cloud' if self.use_cloud else 'local'}")
        print("=" * 60)

        # Step 1: Load and parse documents
        print("\n1. Loading documents from config...")
        documents = self.load_and_parse_documents(config_path)
        print(f"Loaded {len(documents)} documents")

        # Step 2: Auto-tag with ESG metadata (if enabled)
        if self.enable_auto_tagging:
            print("\n2. Auto-tagging with ESG metadata...")
            documents = self.auto_tag_documents(documents)

        # Step 3: Chunk documents
        print("\n3. Chunking documents...")
        chunks = self.chunk_documents(documents)

        # Step 4: Save chunks
        if self.use_cloud and self.storage_adapter and document_ids:
            print(f"\n4. Saving {len(chunks)} chunks to cloud storage...")
            for doc_id in document_ids:
                # Filter chunks for this document
                doc_chunks = [
                    c
                    for c in chunks
                    if c.get("metadata", {}).get("document_id") == doc_id
                ]
                if doc_chunks:
                    self.storage_adapter.save_chunks(doc_chunks, doc_id)
                    print(f"Saved {len(doc_chunks)} chunks for document {doc_id}")
        elif output_path:
            print(f"\n4. Saving chunks to {output_path}...")
            Path(output_path).parent.mkdir(parents=True, exist_ok=True)
            with open(output_path, "w", encoding="utf-8") as f:
                json.dump(chunks, f, indent=2)
            print(f"Saved {len(chunks)} chunks")

        print("\n" + "=" * 60)
        print("Pipeline Complete!")
        print("=" * 60)

        return chunks
__init__
__init__(enable_auto_tagging: bool = True, chunk_size: int = 1000, chunk_overlap: int = 100, use_cloud: bool | None = None, storage_adapter: ETLStorageAdapter | None = None)

Initialize ETL pipeline.


Parameters:

    enable_auto_tagging: Whether to auto-tag documents with ESG metadata. Default: True.
    chunk_size: Size of text chunks. Default: 1000.
    chunk_overlap: Overlap between chunks. Default: 100.
    use_cloud: Whether to use cloud storage. If None, uses settings. Default: None.
    storage_adapter: Optional custom storage adapter instance. Default: None.
Source code in green_gov_rag/etl/pipeline.py
def __init__(
    self,
    enable_auto_tagging: bool = True,
    chunk_size: int = 1000,
    chunk_overlap: int = 100,
    use_cloud: bool | None = None,
    storage_adapter: ETLStorageAdapter | None = None,
):
    """Initialize ETL pipeline.

    Args:
    ----
        enable_auto_tagging: Whether to auto-tag documents with ESG metadata
        chunk_size: Size of text chunks
        chunk_overlap: Overlap between chunks
        use_cloud: Whether to use cloud storage. If None, uses settings.
        storage_adapter: Optional custom storage adapter instance

    """
    self.enable_auto_tagging = enable_auto_tagging
    self.chunker = TextChunker(chunk_size=chunk_size, chunk_overlap=chunk_overlap)

    # Initialize metadata tagger if enabled
    self.tagger = ESGOpenAITagger() if enable_auto_tagging else None

    # Determine storage mode
    if use_cloud is None:
        use_cloud = settings.cloud_provider != "local"

    self.use_cloud = use_cloud
    self.storage_adapter = storage_adapter or (
        ETLStorageAdapter() if use_cloud else None
    )

    logger.info(
        f"Initialized ETL pipeline: use_cloud={use_cloud}, "
        f"auto_tagging={enable_auto_tagging}"
    )
load_and_parse_documents
load_and_parse_documents(config_path: str = 'configs/documents_config.yml') -> list[Document]

Load documents from config and parse them.


Parameters:

    config_path: Path to documents config YAML. Default: 'configs/documents_config.yml'.

Returns:

    List of parsed Document objects with metadata.
Source code in green_gov_rag/etl/pipeline.py
def load_and_parse_documents(
    self,
    config_path: str = "configs/documents_config.yml",
) -> list[Document]:
    """Load documents from config and parse them.

    Args:
    ----
        config_path: Path to documents config YAML

    Returns:
    -------
        List of parsed Document objects with metadata

    """
    # Load document configs
    doc_configs = load_documents_config(config_path)

    documents = []

    # Import DocumentSourceFactory to generate file_ids
    from green_gov_rag.etl.sources.factory import DocumentSourceFactory

    factory = DocumentSourceFactory()

    for doc_config in doc_configs:
        # Create source plugin to generate file_id
        source = factory.create_source(doc_config)

        # Get document metadata from config
        base_metadata = {
            "title": doc_config.get("title", "Untitled"),
            "source_url": doc_config.get("source_url", ""),
            "jurisdiction": doc_config.get("jurisdiction", ""),
            "category": doc_config.get("category", ""),
            "topic": doc_config.get("topic", ""),
            "region": doc_config.get("region", ""),
        }

        # Add ESG metadata if present in config
        if "esg_metadata" in doc_config:
            base_metadata["esg_metadata"] = doc_config["esg_metadata"]

        # Add spatial metadata if present in config
        if "spatial_metadata" in doc_config:
            base_metadata["spatial_metadata"] = doc_config["spatial_metadata"]

        # Download URLs
        urls = doc_config.get("download_urls", [])

        for url in urls:
            # Generate file_id for this URL (consistent with ingest.py)
            file_id = source.get_document_id(url)

            # Add file_id to metadata for citation verification
            metadata_with_file_id = base_metadata.copy()
            metadata_with_file_id["file_id"] = file_id
            metadata_with_file_id[
                "document_id"
            ] = file_id  # Alias for backward compatibility

            # For now, assume PDFs are already downloaded
            # In production, integrate with download_documents()
            # Create document with config metadata
            doc = Document(
                page_content="",  # Will be populated by PDF loader
                metadata=metadata_with_file_id,
            )
            documents.append(doc)

    return documents
auto_tag_documents
auto_tag_documents(documents: list[Document]) -> list[Document]

Auto-tag documents with ESG/NGER metadata using LLM.


Parameters:

    documents: List of Document objects. Required.

Returns:

    Documents with enriched metadata.
Source code in green_gov_rag/etl/pipeline.py
def auto_tag_documents(self, documents: list[Document]) -> list[Document]:
    """Auto-tag documents with ESG/NGER metadata using LLM.

    Args:
    ----
        documents: List of Document objects

    Returns:
    -------
        Documents with enriched metadata

    """
    if not self.tagger:
        return documents

    print("Auto-tagging documents with ESG metadata...")
    tagged_docs = self.tagger.tag_all(documents)
    print(f"Tagged {len(tagged_docs)} documents")

    return tagged_docs
chunk_documents
chunk_documents(documents: list[Document]) -> list[dict[str, Any]]

Chunk documents while preserving metadata.


Parameters:

    documents: List of Document objects. Required.

Returns:

    List of chunked documents with metadata.
Source code in green_gov_rag/etl/pipeline.py
def chunk_documents(self, documents: list[Document]) -> list[dict[str, Any]]:
    """Chunk documents while preserving metadata.

    Args:
    ----
        documents: List of Document objects

    Returns:
    -------
        List of chunked documents with metadata

    """
    chunked_docs = []

    for doc in documents:
        # Convert to dict format expected by chunker
        doc_dict = {"content": doc.page_content, "metadata": doc.metadata}

        # Chunk with metadata preservation
        chunks = self.chunker.chunk_docs([doc_dict])

        chunked_docs.extend(chunks)

    print(f"Created {len(chunked_docs)} chunks from {len(documents)} documents")
    return chunked_docs
run
run(config_path: str = 'configs/documents_config.yml', output_path: str | None = None, document_ids: list[str] | None = None) -> list[dict[str, Any]]

Run the complete ETL pipeline.


Parameters:

    config_path: Path to documents config. Default: 'configs/documents_config.yml'.
    output_path: Optional path to save processed chunks (local mode). Default: None.
    document_ids: Optional list of document IDs to process (cloud mode). Default: None.

Returns:

    List of processed and chunked documents.
Source code in green_gov_rag/etl/pipeline.py
def run(
    self,
    config_path: str = "configs/documents_config.yml",
    output_path: str | None = None,
    document_ids: list[str] | None = None,
) -> list[dict[str, Any]]:
    """Run the complete ETL pipeline.

    Args:
    ----
        config_path: Path to documents config
        output_path: Optional path to save processed chunks (local mode)
        document_ids: Optional list of document IDs to process (cloud mode)

    Returns:
    -------
        List of processed and chunked documents

    """
    print("=" * 60)
    print("Enhanced ETL Pipeline - With Auto-Tagging")
    print(f"Storage mode: {'cloud' if self.use_cloud else 'local'}")
    print("=" * 60)

    # Step 1: Load and parse documents
    print("\n1. Loading documents from config...")
    documents = self.load_and_parse_documents(config_path)
    print(f"Loaded {len(documents)} documents")

    # Step 2: Auto-tag with ESG metadata (if enabled)
    if self.enable_auto_tagging:
        print("\n2. Auto-tagging with ESG metadata...")
        documents = self.auto_tag_documents(documents)

    # Step 3: Chunk documents
    print("\n3. Chunking documents...")
    chunks = self.chunk_documents(documents)

    # Step 4: Save chunks
    if self.use_cloud and self.storage_adapter and document_ids:
        print(f"\n4. Saving {len(chunks)} chunks to cloud storage...")
        for doc_id in document_ids:
            # Filter chunks for this document
            doc_chunks = [
                c
                for c in chunks
                if c.get("metadata", {}).get("document_id") == doc_id
            ]
            if doc_chunks:
                self.storage_adapter.save_chunks(doc_chunks, doc_id)
                print(f"Saved {len(doc_chunks)} chunks for document {doc_id}")
    elif output_path:
        print(f"\n4. Saving chunks to {output_path}...")
        Path(output_path).parent.mkdir(parents=True, exist_ok=True)
        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(chunks, f, indent=2)
        print(f"Saved {len(chunks)} chunks")

    print("\n" + "=" * 60)
    print("Pipeline Complete!")
    print("=" * 60)

    return chunks

process_pdf_with_tagging

process_pdf_with_tagging(pdf_path: str, base_metadata: dict[str, Any] | None = None, auto_tag: bool = True) -> list[dict[str, Any]]

Process a single PDF with optional auto-tagging.


Parameters:

    pdf_path: Path to PDF file. Required.
    base_metadata: Base metadata from config. Default: None.
    auto_tag: Whether to auto-tag with ESG metadata. Default: True.

Returns:

    List of chunked documents with metadata.
Source code in green_gov_rag/etl/pipeline.py
def process_pdf_with_tagging(
    pdf_path: str,
    base_metadata: dict[str, Any] | None = None,
    auto_tag: bool = True,
) -> list[dict[str, Any]]:
    """Process a single PDF with optional auto-tagging.

    Args:
    ----
        pdf_path: Path to PDF file
        base_metadata: Base metadata from config
        auto_tag: Whether to auto-tag with ESG metadata

    Returns:
    -------
        List of chunked documents with metadata

    """
    # Load PDF
    loader = PyPDFLoader(pdf_path)
    pages = loader.load()

    # Add base metadata to all pages
    if base_metadata:
        for page in pages:
            page.metadata.update(base_metadata)

    # Auto-tag if enabled
    if auto_tag:
        tagger = ESGOpenAITagger()
        pages = tagger.tag_all(pages)

    # Chunk documents
    chunker = TextChunker(chunk_size=1000, chunk_overlap=100)

    chunks = []
    for page in pages:
        doc_dict = {"content": page.page_content, "metadata": page.metadata}
        page_chunks = chunker.chunk_docs([doc_dict])
        chunks.extend(page_chunks)

    return chunks
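
An illustrative call with auto-tagging disabled; the PDF path and metadata values are placeholders rather than files shipped with the project:

from green_gov_rag.etl.pipeline import process_pdf_with_tagging

chunks = process_pdf_with_tagging(
    pdf_path="data/raw/nger_guidelines.pdf",  # hypothetical local PDF
    base_metadata={"title": "NGER Guidelines", "jurisdiction": "federal"},
    auto_tag=False,  # True would run the LLM-based ESG tagger over every page
)
print(f"{len(chunks)} chunks; each has 'content' and 'metadata' keys")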

Ingest

green_gov_rag.etl.ingest

Document ingestion script for GreenGovRAG.

Supports both local filesystem and cloud storage (AWS S3, Azure Blob) via the ETL storage adapter. Provider selection is controlled via CLOUD_PROVIDER environment variable.

load_config

load_config()

Load YAML configuration for documents.

Source code in green_gov_rag/etl/ingest.py
def load_config():
    """Load YAML configuration for documents."""
    with open(CONFIG_PATH, encoding="utf-8") as f:
        return yaml.safe_load(f)

sha256sum

sha256sum(file_path)

Compute SHA256 hash of a file.

Source code in green_gov_rag/etl/ingest.py
def sha256sum(file_path):
    """Compute SHA256 hash of a file."""
    h = hashlib.sha256()
    with open(file_path, "rb") as f:
        for chunk in iter(lambda: f.read(DEFAULT_HASH_CHUNK_SIZE), b""):
            h.update(chunk)
    return h.hexdigest()

download_file

download_file(url, dest_path, retries=DEFAULT_DOWNLOAD_RETRIES, backoff=DEFAULT_DOWNLOAD_BACKOFF) -> bool

Download file with retry logic.

Uses browser-like headers to avoid bot detection (Cloudflare, etc.).

Parameters:

    url: URL to download. Required.
    dest_path: Destination file path. Required.
    retries: Number of retry attempts. Default: DEFAULT_DOWNLOAD_RETRIES.
    backoff: Backoff multiplier for retries. Default: DEFAULT_DOWNLOAD_BACKOFF.

Returns:

    bool: True if successful, False otherwise.

Source code in green_gov_rag/etl/ingest.py
def download_file(
    url,
    dest_path,
    retries=DEFAULT_DOWNLOAD_RETRIES,
    backoff=DEFAULT_DOWNLOAD_BACKOFF,
) -> bool:
    """Download file with retry logic.

    Uses browser-like headers to avoid bot detection (Cloudflare, etc.).

    Args:
        url: URL to download
        dest_path: Destination file path
        retries: Number of retry attempts
        backoff: Backoff multiplier for retries

    Returns:
        True if successful, False otherwise
    """
    attempt = 0
    last_status_code = None

    while attempt < retries:
        try:
            resp = requests.get(
                url,
                timeout=DEFAULT_DOWNLOAD_TIMEOUT,
                headers=DEFAULT_HTTP_HEADERS,
                allow_redirects=True,
            )
            last_status_code = resp.status_code

            # Check for bot protection (403/503 from Cloudflare, etc.)
            if resp.status_code in (403, 503):
                # Check if it's Cloudflare protection
                is_cloudflare = (
                    "cloudflare" in resp.headers.get("Server", "").lower()
                    or "cf-ray" in resp.headers
                    or "cf-mitigated" in resp.headers
                )
                if is_cloudflare:
                    logger.warning(
                        f"Cloudflare protection detected for {url}. "
                        "Manual download required."
                    )
                    # Don't retry Cloudflare-protected URLs
                    return False

            resp.raise_for_status()

            with open(dest_path, "wb") as f:
                f.write(resp.content)
            return True

        except requests.exceptions.HTTPError as e:
            attempt += 1
            if last_status_code in (403, 503):
                logger.error(
                    f"Access denied (HTTP {last_status_code}) for {url}. "
                    "Likely bot protection. Skipping retries."
                )
                break
            logger.error(f"HTTP error downloading {url}: {e} (attempt {attempt})")
            if attempt < retries:
                time.sleep(backoff**attempt)

        except Exception as e:
            attempt += 1
            logger.error(f"Error downloading {url}: {e} (attempt {attempt})")
            if attempt < retries:
                time.sleep(backoff**attempt)

    return False
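
A short usage sketch; the URL and destination path are placeholders:

from green_gov_rag.etl.ingest import download_file

ok = download_file(
    "https://example.com/report.pdf",  # placeholder URL
    "data/raw/report.pdf",             # hypothetical destination path
    retries=3,
    backoff=2,
)
if not ok:
    print("Download failed (bot protection detected or retries exhausted)")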

detect_file_type

detect_file_type(file_path: Path) -> str | None

Detect file type from magic bytes.

Parameters:

    file_path (Path): Path to file. Required.

Returns:

    str | None: File extension (.pdf, .html, etc.) or None if unknown.

Source code in green_gov_rag/etl/ingest.py
def detect_file_type(file_path: Path) -> str | None:
    """Detect file type from magic bytes.

    Args:
        file_path: Path to file

    Returns:
        File extension (.pdf, .html, etc.) or None if unknown
    """
    if not file_path.exists():
        return None

    # Read first 16 bytes for magic number detection
    try:
        with open(file_path, "rb") as f:
            header = f.read(16)
    except Exception:
        return None

    # PDF magic bytes
    if header.startswith(b"%PDF"):
        return ".pdf"

    # HTML detection (check for common HTML tags in first 1KB)
    try:
        with open(file_path, "rb") as f:
            content = f.read(1024).lower()
            if any(
                tag in content for tag in [b"<html", b"<!doctype", b"<head", b"<body"]
            ):
                return ".html"
    except Exception:
        pass

    # XML/HTML variants
    if header.startswith(b"<?xml") or header.startswith(b"<"):
        return ".html"

    return None
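
A sketch that mirrors how ingest_documents uses the detected extension to fix a downloaded file's name; the path is a placeholder:

from pathlib import Path

from green_gov_rag.etl.ingest import detect_file_type

path = Path("data/raw/downloaded_file")  # hypothetical extension-less download
ext = detect_file_type(path)
if ext and not path.name.lower().endswith(ext):
    # Rename using the extension implied by the magic bytes (".pdf" or ".html")
    path = path.rename(path.with_name(path.name + ext))
print(path, ext)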

safe_filename

safe_filename(url)

Generate a safe filename from a URL.

Source code in green_gov_rag/etl/ingest.py
def safe_filename(url):
    """Generate a safe filename from a URL."""
    parsed = urlparse(url)
    filename = os.path.basename(parsed.path)
    return filename or DEFAULT_DOWNLOADED_FILENAME

process_document

process_document(doc: dict[str, Any], storage_adapter: ETLStorageAdapter | None = None, use_cloud: bool = False) -> None

Download and save a single document with metadata.

Parameters:

    doc (dict[str, Any]): Document configuration dictionary. Required.
    storage_adapter (ETLStorageAdapter | None): Optional ETL storage adapter (for cloud storage). Default: None.
    use_cloud (bool): Whether to use cloud storage (requires storage_adapter). Default: False.
Source code in green_gov_rag/etl/ingest.py
def process_document(
    doc: dict[str, Any],
    storage_adapter: ETLStorageAdapter | None = None,
    use_cloud: bool = False,
) -> None:
    """Download and save a single document with metadata.

    Args:
        doc: Document configuration dictionary
        storage_adapter: Optional ETL storage adapter (for cloud storage)
        use_cloud: Whether to use cloud storage (requires storage_adapter)
    """
    title = doc.get("title", "untitled")
    jurisdiction = doc.get("jurisdiction", "unknown")
    category = doc.get("category", "misc")
    topic = doc.get("topic", "general")
    urls = doc.get("download_urls", [])

    # Extract ESG metadata if present (NGER/ISSB compliance)
    esg_metadata = doc.get("esg_metadata", {})
    spatial_metadata = doc.get("spatial_metadata", {})

    for url in urls:
        # Build metadata
        metadata = {
            "title": title,
            "jurisdiction": jurisdiction,
            "category": category,
            "topic": topic,
            "source_url": doc.get("source_url", url),  # Generic source (portal)
            "source_pdf_url": url,  # Actual PDF URL for deep linking
        }

        # Add ESG metadata if present
        if esg_metadata:
            metadata["esg_metadata"] = esg_metadata

        # Add spatial metadata if present
        if spatial_metadata:
            metadata["spatial_metadata"] = spatial_metadata

        # Use cloud storage if enabled
        if use_cloud and storage_adapter:
            try:
                doc_id = storage_adapter.download_from_url(url, metadata)
                print(f"✅ Downloaded to cloud: {url} (ID: {doc_id})")
            except Exception as e:
                logger.error(f"Failed to download {url} to cloud: {e}", exc_info=True)
                print(f"❌ Failed to download: {url}")
        else:
            # Use local filesystem (backward compatibility)
            _process_document_local(doc, metadata)

download_documents

download_documents(docs: list[dict], output_dir: str) -> list[str]

Download multiple documents to output directory.

Parameters:

    docs: List of document dicts with 'download_urls' key. Required.
    output_dir: Directory to save downloaded files. Required.

Returns:

    List of downloaded file paths.

Source code in green_gov_rag/etl/ingest.py
def download_documents(docs: list[dict], output_dir: str) -> list[str]:
    """Download multiple documents to output directory.

    :param docs: List of document dicts with 'download_urls' key
    :param output_dir: Directory to save downloaded files
    :return: List of downloaded file paths
    """
    import os

    os.makedirs(output_dir, exist_ok=True)

    downloaded_files = []
    for doc in docs:
        title = doc.get("title", "untitled")
        urls = doc.get("download_urls", [])

        for url in urls:
            filename = safe_filename(url)
            dest_path = os.path.join(output_dir, filename)

            if download_file(url, dest_path):
                print(f"✅ Downloaded: {title} -> {filename}")
                downloaded_files.append(dest_path)
            else:
                print(f"❌ Failed: {title} from {url}")

    return downloaded_files
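
A minimal sketch; the document dict follows the 'title'/'download_urls' shape used in documents_config.yml, with placeholder values:

from green_gov_rag.etl.ingest import download_documents

docs = [
    {
        "title": "Example emissions report",             # placeholder entry
        "download_urls": ["https://example.com/a.pdf"],  # placeholder URL
    }
]
paths = download_documents(docs, output_dir="data/raw")
print(f"Downloaded {len(paths)} file(s)")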

ingest_documents

ingest_documents(use_cloud: bool | None = None, config_path: str | Path | None = None) -> list[str]

Ingest documents from config using plugin system (single source of truth).

This function uses the document source plugin architecture to:

  - Validate configurations
  - Generate consistent document IDs (for delta indexing)
  - Create hierarchical directory structures
  - Extract metadata using source-specific logic

Parameters:

    use_cloud (bool | None): Whether to use cloud storage. If None, uses the CLOUD_PROVIDER setting. Default: None.
    config_path (str | Path | None): Path to config file. If None, CONFIG_PATH is used. Default: None.

Returns:

    list[str]: List of document IDs (cloud mode) or file paths (local mode).

Source code in green_gov_rag/etl/ingest.py
def ingest_documents(
    use_cloud: bool | None = None,
    config_path: str | Path | None = None,
) -> list[str]:
    """Ingest documents from config using plugin system (single source of truth).

    This function uses the document source plugin architecture to:
    - Validate configurations
    - Generate consistent document IDs (for delta indexing)
    - Create hierarchical directory structures
    - Extract metadata using source-specific logic

    Args:
        use_cloud: Whether to use cloud storage. If None, uses CLOUD_PROVIDER setting.
        config_path: Path to config file. Defaults to CONFIG_PATH.

    Returns:
        List of document IDs (cloud mode) or file paths (local mode)
    """
    from green_gov_rag.etl.sources.factory import DocumentSourceFactory

    # Determine storage mode
    if use_cloud is None:
        use_cloud = settings.cloud_provider != "local"

    # Load config
    if config_path is None:
        config_path = CONFIG_PATH

    with open(config_path, encoding="utf-8") as f:
        config = yaml.safe_load(f)

    documents = config.get("documents", [])
    print(f"Found {len(documents)} document sources in config.")
    print(f"Storage mode: {'cloud' if use_cloud else 'local'}")

    # Initialize factory and storage adapter
    factory = DocumentSourceFactory()
    storage_adapter = ETLStorageAdapter() if use_cloud else None

    # Process documents using plugin system
    document_ids = []
    for doc_config in documents:
        try:
            # 1. Create source plugin (auto-detects type)
            source = factory.create_source(doc_config)

            # 2. Validate configuration
            validation = source.validate()
            if not validation.is_valid:
                logger.error(
                    f"Invalid config for {doc_config.get('title', 'unknown')}: {validation.errors}"
                )
                print(f"❌ Validation failed: {doc_config.get('title')}")
                for error in validation.errors:
                    print(f"   - {error}")
                continue

            # 3. Log warnings if any
            if validation.warnings:
                for warning in validation.warnings:
                    logger.warning(f"{doc_config.get('title')}: {warning}")

            # 4. Get URLs from plugin
            urls = source.get_download_urls()

            # 5. Get metadata from plugin (single source of truth)
            metadata = source.get_metadata()

            # 6. Process each URL
            for url in urls:
                try:
                    # Generate document ID using plugin (consistent with monitoring)
                    doc_id = source.get_document_id(url)

                    if use_cloud:
                        # Cloud mode - upload to S3/Azure Blob
                        if storage_adapter:
                            cloud_id = storage_adapter.download_from_url(url, metadata)
                            document_ids.append(cloud_id)
                            print(f"✅ Downloaded to cloud: {url} (ID: {cloud_id})")
                    else:
                        # Local mode - use plugin-generated path
                        dest_path = source.get_destination_path(
                            url, base_dir=str(RAW_DATA_DIR)
                        )
                        dest_path_obj = Path(dest_path)

                        # Create directory
                        dest_path_obj.parent.mkdir(parents=True, exist_ok=True)

                        # Skip if exists
                        if dest_path_obj.exists():
                            logger.info(f"Skipping {url} — already exists")
                            document_ids.append(doc_id)
                            continue

                        # Download file
                        if download_file(url, str(dest_path_obj)):
                            # Detect actual file type and fix extension if needed
                            detected_ext = detect_file_type(dest_path_obj)
                            final_path = dest_path_obj

                            if detected_ext and not dest_path_obj.name.lower().endswith(
                                detected_ext
                            ):
                                # Rename file with correct extension
                                final_filename = f"{dest_path_obj.name}{detected_ext}"
                                final_path = dest_path_obj.parent / final_filename
                                dest_path_obj.rename(final_path)
                                logger.info(
                                    f"Renamed {dest_path_obj.name}{final_filename} "
                                    f"(detected: {detected_ext})"
                                )
                            else:
                                final_filename = dest_path_obj.name

                            # Save metadata
                            metadata_with_file = metadata.copy()
                            metadata_with_file["filename"] = final_filename
                            metadata_with_file[
                                "download_timestamp"
                            ] = datetime.utcnow().isoformat()
                            metadata_with_file["sha256"] = sha256sum(final_path)
                            metadata_with_file[
                                "document_id"
                            ] = doc_id  # NEW: for delta indexing
                            metadata_with_file[
                                "source_pdf_url"
                            ] = url  # Add PDF URL for deep linking

                            metadata_path = (
                                final_path.parent
                                / f"{final_filename}{METADATA_FILE_SUFFIX}"
                            )
                            with open(metadata_path, "w", encoding="utf-8") as mf:
                                json.dump(metadata_with_file, mf, indent=2)

                            print(f"✅ Downloaded: {url} (ID: {doc_id})")
                            document_ids.append(doc_id)
                        else:
                            # Log failed download
                            failed_urls_file = LOG_DIR / FAILED_DOWNLOADS_FILENAME
                            with open(failed_urls_file, "a", encoding="utf-8") as f:
                                f.write(
                                    f"{datetime.utcnow().isoformat()} | {url} | {metadata.get('title')} | {doc_id}\n"
                                )
                            print(f"❌ Failed: {url} (logged to {failed_urls_file})")

                except Exception as e:
                    logger.error(f"Failed to process {url}: {e}", exc_info=True)
                    print(f"❌ Error processing {url}: {e}")

        except Exception as e:
            logger.error(
                f"Failed to create source for {doc_config.get('title', 'unknown')}: {e}",
                exc_info=True,
            )
            print(f"❌ Failed to process document config: {doc_config.get('title')}")

    print(f"\n✅ Ingestion complete. Processed {len(document_ids)} document(s).")
    return document_ids
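
A usage sketch of the plugin-based entry point. When use_cloud is left as None the storage mode follows the CLOUD_PROVIDER setting; the config path shown is the module default:

from green_gov_rag.etl.ingest import ingest_documents

document_ids = ingest_documents(
    use_cloud=False,  # force local mode regardless of CLOUD_PROVIDER
    config_path="configs/documents_config.yml",
)
print(f"Ingested {len(document_ids)} document(s)")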

main

main() -> None

Main entry point for CLI usage.

Source code in green_gov_rag/etl/ingest.py
def main() -> None:
    """Main entry point for CLI usage."""
    config = load_config()
    documents = config.get("documents", [])
    print(f"Found {len(documents)} document sources in config.")

    # Detect storage mode from settings
    use_cloud = settings.cloud_provider != "local"
    print(f"Storage mode: {'cloud' if use_cloud else 'local'}")

    storage_adapter = ETLStorageAdapter() if use_cloud else None

    for doc in documents:
        process_document(doc, storage_adapter=storage_adapter, use_cloud=use_cloud)

Loader

green_gov_rag.etl.loader

load_documents_config

load_documents_config(config_path: str = 'configs/documents_config.yml')

Load document metadata from YAML config.

This function maintains backward compatibility with the original API while supporting the new plugin-based architecture.

Parameters:

    config_path (str): Path to documents configuration file. Default: 'configs/documents_config.yml'.

Returns:

    List of document configuration dictionaries.

Source code in green_gov_rag/etl/loader.py
def load_documents_config(config_path: str = "configs/documents_config.yml"):
    """Load document metadata from YAML config.

    This function maintains backward compatibility with the original API
    while supporting the new plugin-based architecture.

    Args:
        config_path: Path to documents configuration file

    Returns:
        List of document configuration dictionaries
    """
    config_file = Path(config_path)
    if not config_file.exists():
        msg = f"Config file {config_path} not found."
        raise FileNotFoundError(msg)

    with open(config_file, encoding="utf-8") as f:
        config = yaml.safe_load(f)

    return config.get("documents", [])
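
A quick look at the backward-compatible loader; the keys printed here are the same ones the pipeline copies into document metadata:

from green_gov_rag.etl.loader import load_documents_config

doc_configs = load_documents_config("configs/documents_config.yml")
for cfg in doc_configs:
    print(cfg.get("title", "Untitled"), "|", cfg.get("jurisdiction", ""))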

load_document_sources

load_document_sources(config_path: str = 'configs/documents_config.yml') -> list[DocumentSource]

Load document sources using the plugin-based architecture.

This is the new API that returns DocumentSource objects instead of raw configuration dictionaries.

Parameters:

    config_path (str): Path to documents configuration file. Default: 'configs/documents_config.yml'.

Returns:

    list[DocumentSource]: List of DocumentSource plugin instances.

Example:

    sources = load_document_sources()
    for source in sources:
        metadata = source.get_metadata()
        urls = source.get_download_urls()
        validation = source.validate()

Source code in green_gov_rag/etl/loader.py
def load_document_sources(
    config_path: str = "configs/documents_config.yml",
) -> list[DocumentSource]:
    """Load document sources using the plugin-based architecture.

    This is the new API that returns DocumentSource objects instead of
    raw configuration dictionaries.

    Args:
        config_path: Path to documents configuration file

    Returns:
        List of DocumentSource plugin instances

    Example:
        >>> sources = load_document_sources()
        >>> for source in sources:
        ...     metadata = source.get_metadata()
        ...     urls = source.get_download_urls()
        ...     validation = source.validate()
    """
    configs = load_documents_config(config_path)
    factory = DocumentSourceFactory()
    return factory.create_sources_from_list(configs)

get_document_sources

get_document_sources()

Returns a list of all source URLs for ingestion.

This function maintains backward compatibility with the original API.

Returns:

    List of download URLs.

Source code in green_gov_rag/etl/loader.py
def get_document_sources():
    """Returns a list of all source URLs for ingestion.

    This function maintains backward compatibility with the original API.

    Returns:
        List of download URLs
    """
    documents = load_documents_config()
    sources = []
    for doc in documents:
        urls = doc.get("download_urls", [])
        sources.extend(urls)
    return sources

get_document_sources_by_type

get_document_sources_by_type(source_type: str) -> list[DocumentSource]

Get document sources filtered by type.

Parameters:

    source_type (str): Source type identifier (e.g., 'federal_legislation'). Required.

Returns:

    list[DocumentSource]: List of DocumentSource instances matching the type.

Example:

    federal_sources = get_document_sources_by_type('federal_legislation')
    emissions_sources = get_document_sources_by_type('emissions_reporting')

Source code in green_gov_rag/etl/loader.py
def get_document_sources_by_type(source_type: str) -> list[DocumentSource]:
    """Get document sources filtered by type.

    Args:
        source_type: Source type identifier (e.g., 'federal_legislation')

    Returns:
        List of DocumentSource instances matching the type

    Example:
        >>> federal_sources = get_document_sources_by_type('federal_legislation')
        >>> emissions_sources = get_document_sources_by_type('emissions_reporting')
    """
    sources = load_document_sources()
    return [s for s in sources if s.get_source_type() == source_type]

get_document_sources_by_jurisdiction

get_document_sources_by_jurisdiction(jurisdiction: str) -> list[DocumentSource]

Get document sources filtered by jurisdiction.

Parameters:

    jurisdiction (str): Jurisdiction filter ('federal', 'state', 'local'). Required.

Returns:

    list[DocumentSource]: List of DocumentSource instances matching the jurisdiction.

Example:

    federal_sources = get_document_sources_by_jurisdiction('federal')
    local_sources = get_document_sources_by_jurisdiction('local')

Source code in green_gov_rag/etl/loader.py
def get_document_sources_by_jurisdiction(jurisdiction: str) -> list[DocumentSource]:
    """Get document sources filtered by jurisdiction.

    Args:
        jurisdiction: Jurisdiction filter ('federal', 'state', 'local')

    Returns:
        List of DocumentSource instances matching the jurisdiction

    Example:
        >>> federal_sources = get_document_sources_by_jurisdiction('federal')
        >>> local_sources = get_document_sources_by_jurisdiction('local')
    """
    sources = load_document_sources()
    return [s for s in sources if s.get_metadata().get("jurisdiction") == jurisdiction]

load_yaml

load_yaml(file_path: str) -> dict

Load YAML file and return as dictionary.

Parameters:

    file_path: Path to YAML file. Required.

Returns:

    Parsed YAML content as dictionary.

Source code in green_gov_rag/etl/loader.py
def load_yaml(file_path: str) -> dict:
    """Load YAML file and return as dictionary.

    :param file_path: Path to YAML file
    :return: Parsed YAML content as dictionary
    """
    yaml_file = Path(file_path)
    if not yaml_file.exists():
        msg = f"YAML file {file_path} not found."
        raise FileNotFoundError(msg)

    with open(yaml_file, encoding="utf-8") as f:
        return yaml.safe_load(f)

load_documents_from_storage

load_documents_from_storage(jurisdiction: str | None = None, category: str | None = None, topic: str | None = None, storage_adapter: ETLStorageAdapter | None = None) -> list[dict]

Load documents from cloud storage with optional filters.

Parameters:

    jurisdiction (str | None): Filter by jurisdiction. Default: None.
    category (str | None): Filter by category. Default: None.
    topic (str | None): Filter by topic. Default: None.
    storage_adapter (ETLStorageAdapter | None): Optional custom storage adapter. Default: None.

Returns:

    list[dict]: List of document metadata dictionaries.

Example:

    # Load all federal documents
    docs = load_documents_from_storage(jurisdiction='federal')

    # Load specific topic
    docs = load_documents_from_storage(
        jurisdiction='federal',
        category='environment',
        topic='emissions_reporting',
    )

Source code in green_gov_rag/etl/loader.py
def load_documents_from_storage(
    jurisdiction: str | None = None,
    category: str | None = None,
    topic: str | None = None,
    storage_adapter: ETLStorageAdapter | None = None,
) -> list[dict]:
    """Load documents from cloud storage with optional filters.

    Args:
        jurisdiction: Filter by jurisdiction
        category: Filter by category
        topic: Filter by topic
        storage_adapter: Optional custom storage adapter

    Returns:
        List of document metadata dictionaries

    Example:
        >>> # Load all federal documents
        >>> docs = load_documents_from_storage(jurisdiction='federal')
        >>> # Load specific topic
        >>> docs = load_documents_from_storage(
        ...     jurisdiction='federal',
        ...     category='environment',
        ...     topic='emissions_reporting'
        ... )
    """
    if storage_adapter is None:
        storage_adapter = ETLStorageAdapter()

    documents = storage_adapter.list_documents(
        jurisdiction=jurisdiction,
        category=category,
        topic=topic,
    )

    logger.info(
        f"Loaded {len(documents)} documents from storage "
        f"(jurisdiction={jurisdiction}, category={category}, topic={topic})"
    )

    return documents

get_document_content_from_storage

get_document_content_from_storage(document_id: str, storage_adapter: ETLStorageAdapter | None = None) -> tuple[bytes, dict]

Load document content and metadata from storage.

Parameters:

    document_id (str): Document ID. Required.
    storage_adapter (ETLStorageAdapter | None): Optional custom storage adapter. Default: None.

Returns:

    tuple[bytes, dict]: Tuple of (content bytes, metadata dict).

Example:

    content, metadata = get_document_content_from_storage('abc123')
    print(metadata['title'])
    # 'NGER Guidelines 2024'

Source code in green_gov_rag/etl/loader.py
def get_document_content_from_storage(
    document_id: str,
    storage_adapter: ETLStorageAdapter | None = None,
) -> tuple[bytes, dict]:
    """Load document content and metadata from storage.

    Args:
        document_id: Document ID
        storage_adapter: Optional custom storage adapter

    Returns:
        Tuple of (content bytes, metadata dict)

    Example:
        >>> content, metadata = get_document_content_from_storage('abc123')
        >>> print(metadata['title'])
        'NGER Guidelines 2024'
    """
    if storage_adapter is None:
        storage_adapter = ETLStorageAdapter()

    # Load metadata first to get storage path
    metadata = storage_adapter.load_metadata(document_id)

    # Load document content
    content = storage_adapter.load_document(document_id, metadata)

    return content, metadata

get_document_chunks_from_storage

get_document_chunks_from_storage(document_id: str, storage_adapter: ETLStorageAdapter | None = None) -> list[dict]

Load processed chunks for a document from storage.

Parameters:

    document_id (str): Document ID. Required.
    storage_adapter (ETLStorageAdapter | None): Optional custom storage adapter. Default: None.

Returns:

    list[dict]: List of chunk dictionaries.

Example:

    chunks = get_document_chunks_from_storage('abc123')
    print(f"Loaded {len(chunks)} chunks")
    print(chunks[0]['content'])

Source code in green_gov_rag/etl/loader.py
def get_document_chunks_from_storage(
    document_id: str,
    storage_adapter: ETLStorageAdapter | None = None,
) -> list[dict]:
    """Load processed chunks for a document from storage.

    Args:
        document_id: Document ID
        storage_adapter: Optional custom storage adapter

    Returns:
        List of chunk dictionaries

    Example:
        >>> chunks = get_document_chunks_from_storage('abc123')
        >>> print(f"Loaded {len(chunks)} chunks")
        >>> print(chunks[0]['content'])
    """
    if storage_adapter is None:
        storage_adapter = ETLStorageAdapter()

    chunks = storage_adapter.load_chunks(document_id)

    logger.info(f"Loaded {len(chunks)} chunks for document {document_id}")

    return chunks

Chunker

green_gov_rag.etl.chunker

Chunker module for splitting documents into smaller text chunks using LangChain text splitters.

  1. Uses RecursiveCharacterTextSplitter (handles paragraphs → sentences → words).
  2. Configurable chunk_size, chunk_overlap, and separators.
  3. Skips empty/whitespace-only texts.
  4. Returns flat list of chunks for indexing or embeddings.
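
A minimal sketch of the chunker on a dict-shaped document (the format chunk_docs expects); the sample text and sizes are arbitrary:

from green_gov_rag.etl.chunker import TextChunker

chunker = TextChunker(chunk_size=200, chunk_overlap=20, splitter_type="recursive")

docs = [
    {
        "content": (
            "Scope 1 covers direct emissions from owned sources.\n\n"
            "Scope 2 covers indirect emissions from purchased energy."
        ),
        "metadata": {"title": "Example"},
    }
]
chunks = chunker.chunk_docs(docs)
print(len(chunks), chunks[0]["metadata"]["chunk_id"])  # chunk_id is the chunk index within the doc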

TextChunker

Source code in green_gov_rag/etl/chunker.py
class TextChunker:
    def __init__(
        self,
        chunk_size: int = DEFAULT_CHUNK_SIZE,
        chunk_overlap: int = DEFAULT_CHUNK_OVERLAP,
        splitter_type: str = "recursive",
    ):
        """Initialize text chunker.
        :param chunk_size: Max characters or tokens per chunk
        :param chunk_overlap: Overlap between chunks
        :param splitter_type: "recursive" or "token".
        """
        self.splitter_type = splitter_type
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

        if splitter_type == "recursive":
            self.splitter: RecursiveCharacterTextSplitter | TokenTextSplitter = (
                RecursiveCharacterTextSplitter(
                    chunk_size=chunk_size,
                    chunk_overlap=chunk_overlap,
                    separators=["\n\n", "\n", " ", ""],
                )
            )
        elif splitter_type == "token":
            self.splitter = TokenTextSplitter(
                chunk_size=chunk_size,
                chunk_overlap=chunk_overlap,
            )
        else:
            msg = f"Unsupported splitter_type: {splitter_type}"
            raise ValueError(msg)

    def chunk_text(self, text: str) -> list[str]:
        """Split raw text into chunks."""
        return self.splitter.split_text(text)

    def chunk_docs(self, docs: list[dict[str, Any]]) -> list[dict[str, Any]]:
        """Split list of documents into smaller chunks.
        Each doc should have: {"content": str, "metadata": dict}.
        """
        chunked_docs = []
        for doc in docs:
            content = doc.get("content", "")
            metadata = doc.get("metadata", {})

            chunks = self.chunk_text(content)
            for i, chunk in enumerate(chunks):
                chunked_docs.append(
                    {"content": chunk, "metadata": {**metadata, "chunk_id": i}},
                )
        return chunked_docs

    def chunk_with_hierarchy(
        self,
        hierarchical_chunks: list[dict[str, Any]],
    ) -> list[dict[str, Any]]:
        """Chunk hierarchical documents while preserving section metadata.

        For documents parsed with HierarchicalPDFParser, this preserves
        section hierarchy, page numbers, and structural context.

        Args:
        ----
            hierarchical_chunks: Chunks from HierarchicalPDFParser with metadata

        Returns:
        -------
            List of smaller chunks with preserved hierarchical metadata

        """
        chunked_docs = []
        global_chunk_id = 0  # Sequential integer ID across all chunks

        for doc_chunk in hierarchical_chunks:
            content = doc_chunk.get("content", "")
            metadata = doc_chunk.get("metadata", {})

            # Split content into smaller chunks
            text_chunks = self.chunk_text(content)

            # Preserve hierarchical metadata for each sub-chunk
            for i, text_chunk in enumerate(text_chunks):
                chunked_docs.append(
                    {
                        "content": text_chunk,
                        "metadata": {
                            **metadata,  # Preserve all hierarchical metadata
                            "original_chunk_id": metadata.get(
                                "chunk_id"
                            ),  # Original from parser
                            "sub_chunk_id": i,  # Track sub-chunks within section
                            # Use global sequential integer chunk_id (required for DB)
                            "chunk_id": global_chunk_id,
                        },
                    },
                )
                global_chunk_id += 1

        return chunked_docs
__init__
__init__(chunk_size: int = DEFAULT_CHUNK_SIZE, chunk_overlap: int = DEFAULT_CHUNK_OVERLAP, splitter_type: str = 'recursive')

Initialize text chunker.

Parameters:

    chunk_size: Max characters or tokens per chunk. Default: DEFAULT_CHUNK_SIZE.
    chunk_overlap: Overlap between chunks. Default: DEFAULT_CHUNK_OVERLAP.
    splitter_type: "recursive" or "token". Default: 'recursive'.

Source code in green_gov_rag/etl/chunker.py
def __init__(
    self,
    chunk_size: int = DEFAULT_CHUNK_SIZE,
    chunk_overlap: int = DEFAULT_CHUNK_OVERLAP,
    splitter_type: str = "recursive",
):
    """Initialize text chunker.
    :param chunk_size: Max characters or tokens per chunk
    :param chunk_overlap: Overlap between chunks
    :param splitter_type: "recursive" or "token".
    """
    self.splitter_type = splitter_type
    self.chunk_size = chunk_size
    self.chunk_overlap = chunk_overlap

    if splitter_type == "recursive":
        self.splitter: RecursiveCharacterTextSplitter | TokenTextSplitter = (
            RecursiveCharacterTextSplitter(
                chunk_size=chunk_size,
                chunk_overlap=chunk_overlap,
                separators=["\n\n", "\n", " ", ""],
            )
        )
    elif splitter_type == "token":
        self.splitter = TokenTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
        )
    else:
        msg = f"Unsupported splitter_type: {splitter_type}"
        raise ValueError(msg)
chunk_text
chunk_text(text: str) -> list[str]

Split raw text into chunks.

Source code in green_gov_rag/etl/chunker.py
def chunk_text(self, text: str) -> list[str]:
    """Split raw text into chunks."""
    return self.splitter.split_text(text)
chunk_docs
chunk_docs(docs: list[dict[str, Any]]) -> list[dict[str, Any]]

Split list of documents into smaller chunks. Each doc should have: {"content": str, "metadata": dict}.

Source code in green_gov_rag/etl/chunker.py
def chunk_docs(self, docs: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Split list of documents into smaller chunks.
    Each doc should have: {"content": str, "metadata": dict}.
    """
    chunked_docs = []
    for doc in docs:
        content = doc.get("content", "")
        metadata = doc.get("metadata", {})

        chunks = self.chunk_text(content)
        for i, chunk in enumerate(chunks):
            chunked_docs.append(
                {"content": chunk, "metadata": {**metadata, "chunk_id": i}},
            )
    return chunked_docs
chunk_with_hierarchy
chunk_with_hierarchy(hierarchical_chunks: list[dict[str, Any]]) -> list[dict[str, Any]]

Chunk hierarchical documents while preserving section metadata.

For documents parsed with HierarchicalPDFParser, this preserves section hierarchy, page numbers, and structural context.


Parameters:

    hierarchical_chunks: Chunks from HierarchicalPDFParser with metadata. Required.

Returns:

    List of smaller chunks with preserved hierarchical metadata.
Source code in green_gov_rag/etl/chunker.py
def chunk_with_hierarchy(
    self,
    hierarchical_chunks: list[dict[str, Any]],
) -> list[dict[str, Any]]:
    """Chunk hierarchical documents while preserving section metadata.

    For documents parsed with HierarchicalPDFParser, this preserves
    section hierarchy, page numbers, and structural context.

    Args:
    ----
        hierarchical_chunks: Chunks from HierarchicalPDFParser with metadata

    Returns:
    -------
        List of smaller chunks with preserved hierarchical metadata

    """
    chunked_docs = []
    global_chunk_id = 0  # Sequential integer ID across all chunks

    for doc_chunk in hierarchical_chunks:
        content = doc_chunk.get("content", "")
        metadata = doc_chunk.get("metadata", {})

        # Split content into smaller chunks
        text_chunks = self.chunk_text(content)

        # Preserve hierarchical metadata for each sub-chunk
        for i, text_chunk in enumerate(text_chunks):
            chunked_docs.append(
                {
                    "content": text_chunk,
                    "metadata": {
                        **metadata,  # Preserve all hierarchical metadata
                        "original_chunk_id": metadata.get(
                            "chunk_id"
                        ),  # Original from parser
                        "sub_chunk_id": i,  # Track sub-chunks within section
                        # Use global sequential integer chunk_id (required for DB)
                        "chunk_id": global_chunk_id,
                    },
                },
            )
            global_chunk_id += 1

    return chunked_docs
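
A sketch that re-chunks input shaped like HierarchicalPDFParser output; the section names, pages, and sizes are illustrative assumptions:

from green_gov_rag.etl.chunker import TextChunker

hierarchical = [
    {"content": "Section 19 text. " * 40, "metadata": {"chunk_id": 0, "section": "19", "page": 4}},
    {"content": "Section 20 text. " * 40, "metadata": {"chunk_id": 1, "section": "20", "page": 5}},
]
chunks = TextChunker(chunk_size=300, chunk_overlap=30).chunk_with_hierarchy(hierarchical)

# Each sub-chunk keeps the section metadata, records original_chunk_id and sub_chunk_id,
# and receives a globally sequential integer chunk_id.
print(chunks[0]["metadata"]["original_chunk_id"], chunks[-1]["metadata"]["chunk_id"])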

chunk_text

chunk_text(text: str, chunk_size: int = DEFAULT_CHUNK_SIZE, chunk_overlap: int = DEFAULT_CHUNK_OVERLAP, splitter_type: str = 'recursive') -> list[str]

Convenience function to chunk text without creating a TextChunker instance.

Source code in green_gov_rag/etl/chunker.py
def chunk_text(
    text: str,
    chunk_size: int = DEFAULT_CHUNK_SIZE,
    chunk_overlap: int = DEFAULT_CHUNK_OVERLAP,
    splitter_type: str = "recursive",
) -> list[str]:
    """Convenience function to chunk text without creating a TextChunker instance."""
    # Ensure chunk_overlap is not larger than chunk_size
    actual_overlap = min(chunk_overlap, chunk_size - 1) if chunk_size > 1 else 0
    chunker = TextChunker(
        chunk_size=chunk_size,
        chunk_overlap=actual_overlap,
        splitter_type=splitter_type,
    )
    return chunker.chunk_text(text)
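Example usage of the convenience function (a minimal sketch; the sample text is illustrative and defaults come from DEFAULT_CHUNK_SIZE / DEFAULT_CHUNK_OVERLAP in the chunker module):

from green_gov_rag.etl.chunker import chunk_text

text = "The National Greenhouse and Energy Reporting scheme requires annual reporting. " * 200

# An overlap larger than chunk_size is clamped internally to chunk_size - 1
pieces = chunk_text(text, chunk_size=300, chunk_overlap=400, splitter_type="recursive")
print(len(pieces), len(pieces[0]))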

Metadata Tagger

green_gov_rag.etl.metadata_tagger

Automated Metadata Tagging using LangChain.

This module provides LLM-powered metadata extraction for ESG/NGER documents, automating the tagging process to ensure consistent categorization.

ESGMetadata

Bases: BaseModel

ESG metadata schema for automated tagging.

Source code in green_gov_rag/etl/metadata_tagger.py
class ESGMetadata(BaseModel):
    """ESG metadata schema for automated tagging."""

    emission_scopes: list[str] = Field(
        description=(
            "Emission scopes covered in this document. Options: "
            "scope_1 (direct emissions from owned sources), "
            "scope_2 (indirect emissions from purchased energy), "
            "scope_3 (indirect emissions in value chain)"
        ),
        default_factory=list,
    )

    scope_3_categories: list[str] = Field(
        description=(
            "Scope 3 categories if applicable. Options: "
            "purchased_goods_services, capital_goods, fuel_energy_activities, "
            "upstream_transport, waste_generated, business_travel, "
            "employee_commuting, upstream_leased_assets, downstream_transport, "
            "processing_sold_products, use_of_sold_products, end_of_life_treatment, "
            "downstream_leased_assets, franchises, investments"
        ),
        default_factory=list,
    )

    greenhouse_gases: list[str] = Field(
        description=(
            "Greenhouse gases mentioned. Options: "
            "CO2, CH4, N2O, SF6, HFCs, PFCs, NF3"
        ),
        default_factory=list,
    )

    frameworks: list[str] = Field(
        description=(
            "ESG frameworks or standards referenced. Options: "
            "NGER, ISSB, GHG_Protocol, GRI, TCFD, CDP, Safeguard_Mechanism"
        ),
        default_factory=list,
    )

    consolidation_method: str | None = Field(
        description=(
            "Consolidation approach for emissions accounting. Options: "
            "operational_control, equity_share, financial_control, or null if not specified"
        ),
        default=None,
    )

    methodology_type: str | None = Field(
        description=(
            "Type of methodology described. Options: "
            "calculation, reporting, verification, or null if not specified"
        ),
        default=None,
    )

    activity_types: list[str] = Field(
        description=(
            "Activity types covered. Examples: "
            "fuel_combustion, electricity_consumption, fugitive_emissions, "
            "refrigerant_use, transport, waste, supply_chain, etc."
        ),
        default_factory=list,
    )

    facility_types: list[str] = Field(
        description=(
            "Facility or organization types this applies to. Examples: "
            "coal_mine, power_station, manufacturing, all, large_emitters, etc."
        ),
        default_factory=list,
    )

    industry_applicability: list[str] = Field(
        description=(
            "Industries this document applies to (ANZSIC codes or names). "
            "Examples: B0600 (Coal Mining), C1700 (Manufacturing), D2610 (Electricity), etc."
        ),
        default_factory=list,
    )

    regulatory_authority: str | None = Field(
        description=(
            "Regulatory authority or organization that issued/enforces this. "
            "Examples: Clean Energy Regulator, NSW EPA, Victorian EPA, IFRS Foundation, etc."
        ),
        default=None,
    )

    reportable_under_nger: bool | None = Field(
        description="Whether this is reportable under NGER (true/false or null if not specified)",
        default=None,
    )
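Example of the schema in isolation (a minimal sketch with illustrative values; list fields default to empty and optional fields to None, so a partially populated instance is valid):

from green_gov_rag.etl.metadata_tagger import ESGMetadata

meta = ESGMetadata(
    emission_scopes=["scope_1", "scope_3"],
    scope_3_categories=["business_travel"],
    frameworks=["NGER", "GHG_Protocol"],
    reportable_under_nger=True,
)

# Mirrors the .dict(exclude_none=True) serialization used by the taggers;
# on Pydantic v2 this call may emit a deprecation warning in favour of model_dump()
print(meta.dict(exclude_none=True))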

DocumentMetadata

Bases: BaseModel

General document metadata schema.

Source code in green_gov_rag/etl/metadata_tagger.py
class DocumentMetadata(BaseModel):
    """General document metadata schema."""

    jurisdiction: str | None = Field(
        description="Jurisdiction level: federal, state, or local",
        default=None,
    )

    category: str | None = Field(
        description=(
            "Document category: environment, planning, legislation, regulation, "
            "guidelines, policy, building, etc."
        ),
        default=None,
    )

    topic: str | None = Field(
        description=(
            "Specific topic: emissions_reporting, biodiversity, tree_management, "
            "climate_change, vegetation_management, etc."
        ),
        default=None,
    )

    region: str | None = Field(
        description="Geographic region: Australia, New South Wales, Victoria, South Australia, etc.",
        default=None,
    )

MetadataTagger

LLM-powered metadata tagger for automatic document categorization.

Source code in green_gov_rag/etl/metadata_tagger.py
class MetadataTagger:
    """LLM-powered metadata tagger for automatic document categorization."""

    def __init__(self, model_name: str = "gpt-4", temperature: float = 0.0):
        """Initialize metadata tagger.

        Args:
        ----
            model_name: OpenAI model to use for tagging
            temperature: Model temperature (0 for deterministic)

        """
        from green_gov_rag.rag.llm_factory import get_llm

        self.llm: Any = get_llm(model=model_name, temperature=temperature)

        # Create tagging chains
        self.esg_tagger = create_tagging_chain_pydantic(ESGMetadata, self.llm)
        self.doc_tagger = create_tagging_chain_pydantic(DocumentMetadata, self.llm)

    def tag_esg_metadata(self, text: str) -> dict[str, Any]:
        """Extract ESG metadata from document text.

        Args:
        ----
            text: Document text to analyze

        Returns:
        -------
            Dict of ESG metadata

        """
        result = self.esg_tagger.run(text)
        return result.dict(exclude_none=True)

    def tag_document_metadata(self, text: str) -> dict[str, Any]:
        """Extract general document metadata from text.

        Args:
        ----
            text: Document text to analyze

        Returns:
        -------
            Dict of document metadata

        """
        result = self.doc_tagger.run(text)
        return result.dict(exclude_none=True)

    def tag_document(self, document: Document, include_esg: bool = True) -> Document:
        """Tag a LangChain Document with metadata.

        Args:
        ----
            document: LangChain Document to tag
            include_esg: Whether to extract ESG metadata

        Returns:
        -------
            Document with enriched metadata

        """
        text = document.page_content

        # Extract general metadata
        doc_metadata = self.tag_document_metadata(text)

        # Merge with existing metadata
        for key, value in doc_metadata.items():
            if value is not None:
                document.metadata[key] = value

        # Extract ESG metadata if requested
        if include_esg:
            esg_metadata = self.tag_esg_metadata(text)
            if esg_metadata:
                document.metadata["esg_metadata"] = esg_metadata

        return document

    def tag_documents(
        self,
        documents: list[Document],
        include_esg: bool = True,
    ) -> list[Document]:
        """Tag multiple documents with metadata.

        Args:
        ----
            documents: List of LangChain Documents
            include_esg: Whether to extract ESG metadata

        Returns:
        -------
            List of Documents with enriched metadata

        """
        return [self.tag_document(doc, include_esg) for doc in documents]
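Example usage (a minimal sketch; it assumes an API key is configured for the LLM factory, the sample document is illustrative, and the langchain_core import path for Document may differ by LangChain version):

from langchain_core.documents import Document

from green_gov_rag.etl.metadata_tagger import MetadataTagger

doc = Document(
    page_content="Facilities must report scope 1 and scope 2 emissions under NGER.",
    metadata={"title": "NGER reporting guideline"},
)

tagger = MetadataTagger(model_name="gpt-4", temperature=0.0)
tagged = tagger.tag_document(doc, include_esg=True)

print(tagged.metadata.get("jurisdiction"))
print(tagged.metadata.get("esg_metadata"))
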
__init__
__init__(model_name: str = 'gpt-4', temperature: float = 0.0)

Initialize metadata tagger.


Parameters:

- model_name: OpenAI model to use for tagging
- temperature: Model temperature (0 for deterministic)

Source code in green_gov_rag/etl/metadata_tagger.py
def __init__(self, model_name: str = "gpt-4", temperature: float = 0.0):
    """Initialize metadata tagger.

    Args:
    ----
        model_name: OpenAI model to use for tagging
        temperature: Model temperature (0 for deterministic)

    """
    from green_gov_rag.rag.llm_factory import get_llm

    self.llm: Any = get_llm(model=model_name, temperature=temperature)

    # Create tagging chains
    self.esg_tagger = create_tagging_chain_pydantic(ESGMetadata, self.llm)
    self.doc_tagger = create_tagging_chain_pydantic(DocumentMetadata, self.llm)
tag_esg_metadata
tag_esg_metadata(text: str) -> dict[str, Any]

Extract ESG metadata from document text.


Parameters:

- text: Document text to analyze

Returns:

- Dict of ESG metadata

Source code in green_gov_rag/etl/metadata_tagger.py
def tag_esg_metadata(self, text: str) -> dict[str, Any]:
    """Extract ESG metadata from document text.

    Args:
    ----
        text: Document text to analyze

    Returns:
    -------
        Dict of ESG metadata

    """
    result = self.esg_tagger.run(text)
    return result.dict(exclude_none=True)
tag_document_metadata
tag_document_metadata(text: str) -> dict[str, Any]

Extract general document metadata from text.


Parameters:

- text: Document text to analyze

Returns:

- Dict of document metadata

Source code in green_gov_rag/etl/metadata_tagger.py
def tag_document_metadata(self, text: str) -> dict[str, Any]:
    """Extract general document metadata from text.

    Args:
    ----
        text: Document text to analyze

    Returns:
    -------
        Dict of document metadata

    """
    result = self.doc_tagger.run(text)
    return result.dict(exclude_none=True)
tag_document
tag_document(document: Document, include_esg: bool = True) -> Document

Tag a LangChain Document with metadata.


Parameters:

- document: LangChain Document to tag
- include_esg: Whether to extract ESG metadata

Returns:

- Document with enriched metadata

Source code in green_gov_rag/etl/metadata_tagger.py
def tag_document(self, document: Document, include_esg: bool = True) -> Document:
    """Tag a LangChain Document with metadata.

    Args:
    ----
        document: LangChain Document to tag
        include_esg: Whether to extract ESG metadata

    Returns:
    -------
        Document with enriched metadata

    """
    text = document.page_content

    # Extract general metadata
    doc_metadata = self.tag_document_metadata(text)

    # Merge with existing metadata
    for key, value in doc_metadata.items():
        if value is not None:
            document.metadata[key] = value

    # Extract ESG metadata if requested
    if include_esg:
        esg_metadata = self.tag_esg_metadata(text)
        if esg_metadata:
            document.metadata["esg_metadata"] = esg_metadata

    return document
tag_documents
tag_documents(documents: list[Document], include_esg: bool = True) -> list[Document]

Tag multiple documents with metadata.


Parameters:

- documents: List of LangChain Documents
- include_esg: Whether to extract ESG metadata

Returns:

- List of Documents with enriched metadata

Source code in green_gov_rag/etl/metadata_tagger.py
def tag_documents(
    self,
    documents: list[Document],
    include_esg: bool = True,
) -> list[Document]:
    """Tag multiple documents with metadata.

    Args:
    ----
        documents: List of LangChain Documents
        include_esg: Whether to extract ESG metadata

    Returns:
    -------
        List of Documents with enriched metadata

    """
    return [self.tag_document(doc, include_esg) for doc in documents]

CustomPromptTagger

Metadata tagger with custom prompts for specific use cases.

Source code in green_gov_rag/etl/metadata_tagger.py
class CustomPromptTagger:
    """Metadata tagger with custom prompts for specific use cases."""

    def __init__(self, model_name: str = "gpt-4", temperature: float = 0.0):
        """Initialize custom prompt tagger.

        Args:
        ----
            model_name: OpenAI model to use
            temperature: Model temperature

        """
        from green_gov_rag.rag.llm_factory import get_llm

        self.llm: Any = get_llm(model=model_name, temperature=temperature)

    def extract_scope_3_categories(self, text: str) -> list[str]:
        """Extract Scope 3 categories from text using custom prompt.

        Args:
        ----
            text: Document text

        Returns:
        -------
            List of Scope 3 category names

        """
        prompt = ChatPromptTemplate.from_messages(
            [
                (
                    "system",
                    "You are an ESG expert analyzing emissions reporting documents. "
                    "Extract Scope 3 emission categories mentioned in the text.",
                ),
                (
                    "human",
                    "Analyze this text and list all Scope 3 categories mentioned. "
                    "Use these exact category names:\n"
                    "- purchased_goods_services (Category 1)\n"
                    "- capital_goods (Category 2)\n"
                    "- fuel_energy_activities (Category 3)\n"
                    "- upstream_transport (Category 4)\n"
                    "- waste_generated (Category 5)\n"
                    "- business_travel (Category 6)\n"
                    "- employee_commuting (Category 7)\n"
                    "- upstream_leased_assets (Category 8)\n"
                    "- downstream_transport (Category 9)\n"
                    "- processing_sold_products (Category 10)\n"
                    "- use_of_sold_products (Category 11)\n"
                    "- end_of_life_treatment (Category 12)\n"
                    "- downstream_leased_assets (Category 13)\n"
                    "- franchises (Category 14)\n"
                    "- investments (Category 15)\n\n"
                    "Text: {text}\n\n"
                    "Return only the category names as a comma-separated list.",
                ),
            ],
        )

        chain: Any = prompt | self.llm
        response = chain.invoke({"text": text})

        # Parse response
        return [cat.strip() for cat in response.content.split(",") if cat.strip()]

    def identify_regulatory_framework(self, text: str) -> dict[str, Any]:
        """Identify regulatory framework and compliance requirements.

        Args:
        ----
            text: Document text

        Returns:
        -------
            Dict with framework, regulator, and compliance info

        """
        prompt = ChatPromptTemplate.from_messages(
            [
                (
                    "system",
                    "You are a regulatory compliance expert analyzing environmental documents.",
                ),
                (
                    "human",
                    "Analyze this text and identify:\n"
                    "1. Regulatory framework (NGER, ISSB, GHG Protocol, GRI, etc.)\n"
                    "2. Regulatory authority (Clean Energy Regulator, EPA, etc.)\n"
                    "3. Whether it's reportable under NGER (yes/no)\n"
                    "4. Emission scopes covered (scope_1, scope_2, scope_3)\n\n"
                    "Text: {text}\n\n"
                    "Format your response as JSON with keys: frameworks, regulator, "
                    "reportable_under_nger, emission_scopes",
                ),
            ],
        )

        chain: Any = prompt | self.llm
        response = chain.invoke({"text": text})

        # Parse LLM response - extract structured data
        try:
            # Try to extract JSON from response content
            import json
            import re

            content = (
                response.content if hasattr(response, "content") else str(response)
            )

            # Extract JSON from markdown code blocks if present
            json_match = re.search(
                r"```(?:json)?\s*(\{.*?\})\s*```", content, re.DOTALL
            )
            if json_match:
                data = json.loads(json_match.group(1))
            else:
                # Try to parse entire content as JSON
                data = json.loads(content)

            return {
                "frameworks": data.get("frameworks", []),
                "regulator": data.get("regulator"),
                "reportable_under_nger": data.get("reportable_under_nger"),
                "emission_scopes": data.get("emission_scopes", []),
            }
        except (json.JSONDecodeError, AttributeError):
            # If parsing fails, return empty structure
            return {
                "frameworks": [],
                "regulator": None,
                "reportable_under_nger": None,
                "emission_scopes": [],
            }
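Example usage (a minimal sketch; both calls issue an LLM request, so an API key must be configured, the sample text is illustrative, and the printed results are only indicative):

from green_gov_rag.etl.metadata_tagger import CustomPromptTagger

tagger = CustomPromptTagger(model_name="gpt-4")

text = (
    "Employee commuting and business travel emissions are reportable, "
    "and the Clean Energy Regulator administers NGER."
)

categories = tagger.extract_scope_3_categories(text)
framework_info = tagger.identify_regulatory_framework(text)

print(categories)                    # e.g. ["business_travel", "employee_commuting"]
print(framework_info["frameworks"])  # e.g. ["NGER"]
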
__init__
__init__(model_name: str = 'gpt-4', temperature: float = 0.0)

Initialize custom prompt tagger.


Parameters:

- model_name: OpenAI model to use
- temperature: Model temperature

Source code in green_gov_rag/etl/metadata_tagger.py
def __init__(self, model_name: str = "gpt-4", temperature: float = 0.0):
    """Initialize custom prompt tagger.

    Args:
    ----
        model_name: OpenAI model to use
        temperature: Model temperature

    """
    from green_gov_rag.rag.llm_factory import get_llm

    self.llm: Any = get_llm(model=model_name, temperature=temperature)
extract_scope_3_categories
extract_scope_3_categories(text: str) -> list[str]

Extract Scope 3 categories from text using custom prompt.


Parameters:

- text: Document text

Returns:

- List of Scope 3 category names

Source code in green_gov_rag/etl/metadata_tagger.py
def extract_scope_3_categories(self, text: str) -> list[str]:
    """Extract Scope 3 categories from text using custom prompt.

    Args:
    ----
        text: Document text

    Returns:
    -------
        List of Scope 3 category names

    """
    prompt = ChatPromptTemplate.from_messages(
        [
            (
                "system",
                "You are an ESG expert analyzing emissions reporting documents. "
                "Extract Scope 3 emission categories mentioned in the text.",
            ),
            (
                "human",
                "Analyze this text and list all Scope 3 categories mentioned. "
                "Use these exact category names:\n"
                "- purchased_goods_services (Category 1)\n"
                "- capital_goods (Category 2)\n"
                "- fuel_energy_activities (Category 3)\n"
                "- upstream_transport (Category 4)\n"
                "- waste_generated (Category 5)\n"
                "- business_travel (Category 6)\n"
                "- employee_commuting (Category 7)\n"
                "- upstream_leased_assets (Category 8)\n"
                "- downstream_transport (Category 9)\n"
                "- processing_sold_products (Category 10)\n"
                "- use_of_sold_products (Category 11)\n"
                "- end_of_life_treatment (Category 12)\n"
                "- downstream_leased_assets (Category 13)\n"
                "- franchises (Category 14)\n"
                "- investments (Category 15)\n\n"
                "Text: {text}\n\n"
                "Return only the category names as a comma-separated list.",
            ),
        ],
    )

    chain: Any = prompt | self.llm
    response = chain.invoke({"text": text})

    # Parse response
    return [cat.strip() for cat in response.content.split(",") if cat.strip()]
identify_regulatory_framework
identify_regulatory_framework(text: str) -> dict[str, Any]

Identify regulatory framework and compliance requirements.


Parameters:

- text: Document text

Returns:

- Dict with framework, regulator, and compliance info

Source code in green_gov_rag/etl/metadata_tagger.py
def identify_regulatory_framework(self, text: str) -> dict[str, Any]:
    """Identify regulatory framework and compliance requirements.

    Args:
    ----
        text: Document text

    Returns:
    -------
        Dict with framework, regulator, and compliance info

    """
    prompt = ChatPromptTemplate.from_messages(
        [
            (
                "system",
                "You are a regulatory compliance expert analyzing environmental documents.",
            ),
            (
                "human",
                "Analyze this text and identify:\n"
                "1. Regulatory framework (NGER, ISSB, GHG Protocol, GRI, etc.)\n"
                "2. Regulatory authority (Clean Energy Regulator, EPA, etc.)\n"
                "3. Whether it's reportable under NGER (yes/no)\n"
                "4. Emission scopes covered (scope_1, scope_2, scope_3)\n\n"
                "Text: {text}\n\n"
                "Format your response as JSON with keys: frameworks, regulator, "
                "reportable_under_nger, emission_scopes",
            ),
        ],
    )

    chain: Any = prompt | self.llm
    response = chain.invoke({"text": text})

    # Parse LLM response - extract structured data
    try:
        # Try to extract JSON from response content
        import json
        import re

        content = (
            response.content if hasattr(response, "content") else str(response)
        )

        # Extract JSON from markdown code blocks if present
        json_match = re.search(
            r"```(?:json)?\s*(\{.*?\})\s*```", content, re.DOTALL
        )
        if json_match:
            data = json.loads(json_match.group(1))
        else:
            # Try to parse entire content as JSON
            data = json.loads(content)

        return {
            "frameworks": data.get("frameworks", []),
            "regulator": data.get("regulator"),
            "reportable_under_nger": data.get("reportable_under_nger"),
            "emission_scopes": data.get("emission_scopes", []),
        }
    except (json.JSONDecodeError, AttributeError):
        # If parsing fails, return empty structure
        return {
            "frameworks": [],
            "regulator": None,
            "reportable_under_nger": None,
            "emission_scopes": [],
        }

ESGOpenAITagger

Wrapper for OpenAIMetadataTagger with ESG-specific schemas.

Source code in green_gov_rag/etl/metadata_tagger.py
class ESGOpenAITagger:
    """Wrapper for OpenAIMetadataTagger with ESG-specific schemas."""

    def __init__(self):
        """Initialize ESG metadata tagger with predefined schemas."""
        # ESG Metadata Schema
        self.esg_schema = {
            "properties": {
                "emission_scope": {
                    "type": "string",
                    "enum": ["scope_1", "scope_2", "scope_3"],
                    "description": "Which emission scope this section covers",
                },
                "scope_3_categories": {
                    "type": "array",
                    "items": {
                        "type": "string",
                        "enum": [
                            "purchased_goods_services",
                            "capital_goods",
                            "fuel_energy_activities",
                            "upstream_transport",
                            "waste_generated",
                            "business_travel",
                            "employee_commuting",
                            "upstream_leased_assets",
                            "downstream_transport",
                            "processing_sold_products",
                            "use_of_sold_products",
                            "end_of_life_treatment",
                            "downstream_leased_assets",
                            "franchises",
                            "investments",
                        ],
                    },
                    "description": "Scope 3 categories covered (only if emission_scope is scope_3)",
                },
                "greenhouse_gases": {
                    "type": "array",
                    "items": {
                        "type": "string",
                        "enum": ["CO2", "CH4", "N2O", "SF6", "HFCs", "PFCs", "NF3"],
                    },
                    "description": "Greenhouse gases mentioned in this section",
                },
                "calculation_method": {
                    "type": "string",
                    "description": "Calculation methodology described (if any)",
                },
                "consolidation_approach": {
                    "type": "string",
                    "enum": [
                        "operational_control",
                        "equity_share",
                        "financial_control",
                    ],
                    "description": "Consolidation approach for emissions accounting",
                },
                "industry_applicability": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": "ANZSIC industry codes this applies to",
                },
                "regulatory_authority": {
                    "type": "string",
                    "description": "Which regulator enforces this",
                },
                "frameworks": {
                    "type": "array",
                    "items": {
                        "type": "string",
                        "enum": [
                            "NGER",
                            "ISSB",
                            "GHG_Protocol",
                            "GRI",
                            "TCFD",
                            "Safeguard_Mechanism",
                        ],
                    },
                    "description": "ESG frameworks or standards referenced",
                },
            },
            "required": ["emission_scope"],
        }

        # Document Metadata Schema
        self.doc_schema = {
            "properties": {
                "jurisdiction": {
                    "type": "string",
                    "enum": ["federal", "state", "local"],
                    "description": "Jurisdiction level of the document",
                },
                "category": {
                    "type": "string",
                    "enum": [
                        "environment",
                        "planning",
                        "legislation",
                        "regulation",
                        "guidelines",
                        "policy",
                    ],
                    "description": "Document category",
                },
                "topic": {
                    "type": "string",
                    "description": "Specific topic covered (e.g., emissions_reporting, biodiversity)",
                },
                "region": {
                    "type": "string",
                    "description": "Geographic region (state or territory name)",
                },
            },
        }

        # Initialize OpenAI taggers
        self.esg_tagger = OpenAIMetadataTagger(self.esg_schema)  # type: ignore[misc]
        self.doc_tagger = OpenAIMetadataTagger(self.doc_schema)  # type: ignore[misc]

    def tag_esg_metadata(self, documents: list[Document]) -> list[Document]:
        """Tag documents with ESG metadata.

        Args:
        ----
            documents: List of LangChain Documents

        Returns:
        -------
            Documents with ESG metadata added

        """
        return list(self.esg_tagger.transform_documents(documents))

    def tag_document_metadata(self, documents: list[Document]) -> list[Document]:
        """Tag documents with general metadata.

        Args:
        ----
            documents: List of LangChain Documents

        Returns:
        -------
            Documents with general metadata added

        """
        return list(self.doc_tagger.transform_documents(documents))

    def tag_all(self, documents: list[Document]) -> list[Document]:
        """Tag documents with both ESG and general metadata.

        Args:
        ----
            documents: List of LangChain Documents

        Returns:
        -------
            Fully tagged documents

        """
        # First tag with general metadata
        docs = self.tag_document_metadata(documents)

        # Then tag with ESG metadata
        return self.tag_esg_metadata(docs)
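Example usage (a minimal sketch; OpenAIMetadataTagger calls an OpenAI-backed extraction, so a key is required, the sample document is illustrative, and the Document import path may vary by LangChain version):

from langchain_core.documents import Document

from green_gov_rag.etl.metadata_tagger import ESGOpenAITagger

docs = [
    Document(
        page_content="Scope 2 emissions from purchased electricity are reported under NGER.",
        metadata={"title": "Electricity emissions guidance"},
    ),
]

tagger = ESGOpenAITagger()

# tag_all applies the general document schema first, then the ESG schema
tagged = tagger.tag_all(docs)
print(tagged[0].metadata)
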
__init__
__init__()

Initialize ESG metadata tagger with predefined schemas.

Source code in green_gov_rag/etl/metadata_tagger.py
def __init__(self):
    """Initialize ESG metadata tagger with predefined schemas."""
    # ESG Metadata Schema
    self.esg_schema = {
        "properties": {
            "emission_scope": {
                "type": "string",
                "enum": ["scope_1", "scope_2", "scope_3"],
                "description": "Which emission scope this section covers",
            },
            "scope_3_categories": {
                "type": "array",
                "items": {
                    "type": "string",
                    "enum": [
                        "purchased_goods_services",
                        "capital_goods",
                        "fuel_energy_activities",
                        "upstream_transport",
                        "waste_generated",
                        "business_travel",
                        "employee_commuting",
                        "upstream_leased_assets",
                        "downstream_transport",
                        "processing_sold_products",
                        "use_of_sold_products",
                        "end_of_life_treatment",
                        "downstream_leased_assets",
                        "franchises",
                        "investments",
                    ],
                },
                "description": "Scope 3 categories covered (only if emission_scope is scope_3)",
            },
            "greenhouse_gases": {
                "type": "array",
                "items": {
                    "type": "string",
                    "enum": ["CO2", "CH4", "N2O", "SF6", "HFCs", "PFCs", "NF3"],
                },
                "description": "Greenhouse gases mentioned in this section",
            },
            "calculation_method": {
                "type": "string",
                "description": "Calculation methodology described (if any)",
            },
            "consolidation_approach": {
                "type": "string",
                "enum": [
                    "operational_control",
                    "equity_share",
                    "financial_control",
                ],
                "description": "Consolidation approach for emissions accounting",
            },
            "industry_applicability": {
                "type": "array",
                "items": {"type": "string"},
                "description": "ANZSIC industry codes this applies to",
            },
            "regulatory_authority": {
                "type": "string",
                "description": "Which regulator enforces this",
            },
            "frameworks": {
                "type": "array",
                "items": {
                    "type": "string",
                    "enum": [
                        "NGER",
                        "ISSB",
                        "GHG_Protocol",
                        "GRI",
                        "TCFD",
                        "Safeguard_Mechanism",
                    ],
                },
                "description": "ESG frameworks or standards referenced",
            },
        },
        "required": ["emission_scope"],
    }

    # Document Metadata Schema
    self.doc_schema = {
        "properties": {
            "jurisdiction": {
                "type": "string",
                "enum": ["federal", "state", "local"],
                "description": "Jurisdiction level of the document",
            },
            "category": {
                "type": "string",
                "enum": [
                    "environment",
                    "planning",
                    "legislation",
                    "regulation",
                    "guidelines",
                    "policy",
                ],
                "description": "Document category",
            },
            "topic": {
                "type": "string",
                "description": "Specific topic covered (e.g., emissions_reporting, biodiversity)",
            },
            "region": {
                "type": "string",
                "description": "Geographic region (state or territory name)",
            },
        },
    }

    # Initialize OpenAI taggers
    self.esg_tagger = OpenAIMetadataTagger(self.esg_schema)  # type: ignore[misc]
    self.doc_tagger = OpenAIMetadataTagger(self.doc_schema)  # type: ignore[misc]
tag_esg_metadata
tag_esg_metadata(documents: list[Document]) -> list[Document]

Tag documents with ESG metadata.


Parameters:

- documents: List of LangChain Documents

Returns:

- Documents with ESG metadata added

Source code in green_gov_rag/etl/metadata_tagger.py
def tag_esg_metadata(self, documents: list[Document]) -> list[Document]:
    """Tag documents with ESG metadata.

    Args:
    ----
        documents: List of LangChain Documents

    Returns:
    -------
        Documents with ESG metadata added

    """
    return list(self.esg_tagger.transform_documents(documents))
tag_document_metadata
tag_document_metadata(documents: list[Document]) -> list[Document]

Tag documents with general metadata.


Parameters:

- documents: List of LangChain Documents

Returns:

- Documents with general metadata added

Source code in green_gov_rag/etl/metadata_tagger.py
def tag_document_metadata(self, documents: list[Document]) -> list[Document]:
    """Tag documents with general metadata.

    Args:
    ----
        documents: List of LangChain Documents

    Returns:
    -------
        Documents with general metadata added

    """
    return list(self.doc_tagger.transform_documents(documents))
tag_all
tag_all(documents: list[Document]) -> list[Document]

Tag documents with both ESG and general metadata.


Parameters:

- documents: List of LangChain Documents

Returns:

- Fully tagged documents

Source code in green_gov_rag/etl/metadata_tagger.py
def tag_all(self, documents: list[Document]) -> list[Document]:
    """Tag documents with both ESG and general metadata.

    Args:
    ----
        documents: List of LangChain Documents

    Returns:
    -------
        Fully tagged documents

    """
    # First tag with general metadata
    docs = self.tag_document_metadata(documents)

    # Then tag with ESG metadata
    return self.tag_esg_metadata(docs)

DB Writer

green_gov_rag.etl.db_writer

Database writer for ETL pipeline.

Supports tracking both local filesystem and cloud storage paths for documents and chunks with a normalized schema:

- document_sources: Config entries (1:many with files)
- document_files: Individual PDFs
- document_chunks: Text segments

create_document_id

create_document_id(source_url: str, title: str) -> str

Generate unique document ID from URL and title.

Source code in green_gov_rag/etl/db_writer.py
def create_document_id(source_url: str, title: str) -> str:
    """Generate unique document ID from URL and title."""
    content = f"{source_url}::{title}"
    return hashlib.md5(content.encode()).hexdigest()[:16]
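Example (a minimal sketch; the URL and title are illustrative, and the ID is a deterministic 16-character hash of URL plus title):

from green_gov_rag.etl.db_writer import create_document_id

source_id = create_document_id(
    source_url="https://www.cleanenergyregulator.gov.au/NGER",
    title="NGER reporting guideline",
)
print(source_id)  # 16 hex characters; identical inputs always give the same ID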

save_document_source

save_document_source(title: str, source_url: str, jurisdiction: str, topic: str, region: Optional[str] = None, category: Optional[str] = None, metadata: Optional[dict] = None, esg_metadata: Optional[dict] = None, spatial_metadata: Optional[dict] = None, status: str = 'pending') -> DocumentSource

Save document source (config entry) to database.

Parameters:

- title (str, required): Source title
- source_url (str, required): Source website URL (homepage)
- jurisdiction (str, required): Federal/State/Local
- topic (str, required): Document topic
- region (Optional[str], default None): Geographic region
- category (Optional[str], default None): Document category
- metadata (Optional[dict], default None): Additional metadata
- esg_metadata (Optional[dict], default None): ESG/emissions metadata (frameworks, scopes, gases, etc.)
- spatial_metadata (Optional[dict], default None): Spatial metadata (LGA codes, state, spatial scope, etc.)
- status (str, default 'pending'): Processing status

Returns:

- DocumentSource: Saved document source

Source code in green_gov_rag/etl/db_writer.py
def save_document_source(
    title: str,
    source_url: str,
    jurisdiction: str,
    topic: str,
    region: Optional[str] = None,
    category: Optional[str] = None,
    metadata: Optional[dict] = None,
    esg_metadata: Optional[dict] = None,
    spatial_metadata: Optional[dict] = None,
    status: str = "pending",
) -> DocumentSource:
    """Save document source (config entry) to database.

    Args:
        title: Source title
        source_url: Source website URL (homepage)
        jurisdiction: Federal/State/Local
        topic: Document topic
        region: Geographic region
        category: Document category
        metadata: Additional metadata
        esg_metadata: ESG/emissions metadata (frameworks, scopes, gases, etc.)
        spatial_metadata: Spatial metadata (LGA codes, state, spatial scope, etc.)
        status: Processing status

    Returns:
        DocumentSource: Saved document source
    """
    source_id = create_document_id(source_url, title)

    with Session(engine) as session:
        # Check if source already exists
        statement = select(DocumentSource).where(DocumentSource.id == source_id)
        existing_source = session.exec(statement).first()

        if existing_source:
            # Update existing source
            existing_source.title = title
            existing_source.source_url = source_url
            existing_source.jurisdiction = jurisdiction
            existing_source.topic = topic
            existing_source.region = region
            existing_source.category = category
            existing_source.metadata_ = metadata
            existing_source.esg_metadata = esg_metadata
            existing_source.spatial_metadata = spatial_metadata
            existing_source.status = status
            existing_source.updated_at = datetime.now(timezone.utc)

            session.add(existing_source)
            session.commit()
            session.refresh(existing_source)

            logger.info(f"Updated document source {source_id}")
            return existing_source
        else:
            # Create new source
            source = DocumentSource(
                id=source_id,
                title=title,
                source_url=source_url,
                jurisdiction=jurisdiction,
                topic=topic,
                region=region,
                category=category,
                metadata_=metadata,
                esg_metadata=esg_metadata,
                spatial_metadata=spatial_metadata,
                status=status,
            )

            session.add(source)
            session.commit()
            session.refresh(source)

            logger.info(f"Created document source {source_id}")
            return source
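Example usage (a minimal sketch; it assumes the database engine is configured and all values are illustrative):

from green_gov_rag.etl.db_writer import save_document_source

source = save_document_source(
    title="NGER reporting guideline",
    source_url="https://www.cleanenergyregulator.gov.au/NGER",
    jurisdiction="federal",
    topic="emissions_reporting",
    region="Australia",
    category="guidelines",
    esg_metadata={"frameworks": ["NGER"], "emission_scopes": ["scope_1", "scope_2"]},
)

# Upsert semantics: rerunning with the same URL/title updates the existing row
print(source.id, source.status)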

save_document_file

save_document_file(source_id: str, filename: str, file_url: str, content_hash: str, file_size_bytes: Optional[int] = None, file_metadata: Optional[dict] = None, status: str = 'pending') -> DocumentFile

Save document file (individual PDF) to database.

Parameters:

- source_id (str, required): Parent document source ID
- filename (str, required): Original filename
- file_url (str, required): Direct download URL for this file
- content_hash (str, required): SHA256 hash of file content
- file_size_bytes (Optional[int], default None): File size in bytes
- file_metadata (Optional[dict], default None): File-specific metadata (page count, format, etc.)
- status (str, default 'pending'): Processing status

Returns:

- DocumentFile: Saved document file

Source code in green_gov_rag/etl/db_writer.py
def save_document_file(
    source_id: str,
    filename: str,
    file_url: str,
    content_hash: str,
    file_size_bytes: Optional[int] = None,
    file_metadata: Optional[dict] = None,
    status: str = "pending",
) -> DocumentFile:
    """Save document file (individual PDF) to database.

    Args:
        source_id: Parent document source ID
        filename: Original filename
        file_url: Direct download URL for this file
        content_hash: SHA256 hash of file content
        file_size_bytes: File size in bytes
        file_metadata: File-specific metadata (page count, format, etc.)
        status: Processing status

    Returns:
        DocumentFile: Saved document file
    """
    # Generate file_id from source_id and filename
    file_id = f"{source_id}_{hashlib.md5(filename.encode()).hexdigest()[:8]}"

    with Session(engine) as session:
        # Check if file already exists
        statement = select(DocumentFile).where(DocumentFile.id == file_id)
        existing_file = session.exec(statement).first()

        if existing_file:
            # Update existing file
            existing_file.filename = filename
            existing_file.file_url = file_url
            existing_file.content_hash = content_hash
            existing_file.file_size_bytes = file_size_bytes
            existing_file.file_metadata = file_metadata
            existing_file.status = status

            session.add(existing_file)
            session.commit()
            session.refresh(existing_file)

            logger.info(f"Updated document file {file_id}")
            return existing_file
        else:
            # Create new file
            doc_file = DocumentFile(
                id=file_id,
                source_id=source_id,
                filename=filename,
                file_url=file_url,
                content_hash=content_hash,
                file_size_bytes=file_size_bytes,
                file_metadata=file_metadata,
                status=status,
            )

            session.add(doc_file)
            session.commit()
            session.refresh(doc_file)

            logger.info(f"Created document file {file_id}")
            return doc_file
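Example usage (a minimal sketch; the content hash is computed locally, and the source ID, URL, and file bytes are illustrative placeholders):

import hashlib

from green_gov_rag.etl.db_writer import save_document_file

pdf_bytes = b"%PDF-1.7 ..."  # illustrative placeholder for downloaded file content

doc_file = save_document_file(
    source_id="0123456789abcdef",  # illustrative ID from save_document_source
    filename="nger-guideline.pdf",
    file_url="https://www.cleanenergyregulator.gov.au/NGER/nger-guideline.pdf",
    content_hash=hashlib.sha256(pdf_bytes).hexdigest(),
    file_size_bytes=len(pdf_bytes),
    file_metadata={"page_count": 42},
    status="downloading",
)

# The file ID combines the source ID with an 8-character hash of the filename
print(doc_file.id)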

update_document_source_status

update_document_source_status(source_id: str, status: str, error_message: Optional[str] = None, chunk_count: Optional[int] = None, embedding_model: Optional[str] = None) -> None

Update document source processing status.

Parameters:

- source_id (str, required): Document source ID
- status (str, required): New status (pending/processing/completed/failed)
- error_message (Optional[str], default None): Error message if failed
- chunk_count (Optional[int], default None): Number of chunks created
- embedding_model (Optional[str], default None): Embedding model used

Source code in green_gov_rag/etl/db_writer.py
def update_document_source_status(
    source_id: str,
    status: str,
    error_message: Optional[str] = None,
    chunk_count: Optional[int] = None,
    embedding_model: Optional[str] = None,
) -> None:
    """Update document source processing status.

    Args:
        source_id: Document source ID
        status: New status (pending/processing/completed/failed)
        error_message: Error message if failed
        chunk_count: Number of chunks created
        embedding_model: Embedding model used
    """
    with Session(engine) as session:
        statement = select(DocumentSource).where(DocumentSource.id == source_id)
        source = session.exec(statement).first()

        if source:
            source.status = status
            source.updated_at = datetime.now(timezone.utc)

            if error_message:
                source.error_message = error_message

            if chunk_count is not None:
                source.chunk_count = chunk_count

            if embedding_model:
                source.embedding_model = embedding_model

            if status == "completed":
                source.processed_at = datetime.now(timezone.utc)

            session.add(source)
            session.commit()

update_document_file_status

update_document_file_status(file_id: str, status: str, error_message: Optional[str] = None, chunk_count: Optional[int] = None) -> None

Update document file processing status.

Parameters:

- file_id (str, required): Document file ID
- status (str, required): New status (pending/downloading/processing/completed/failed)
- error_message (Optional[str], default None): Error message if failed
- chunk_count (Optional[int], default None): Number of chunks from this file

Source code in green_gov_rag/etl/db_writer.py
def update_document_file_status(
    file_id: str,
    status: str,
    error_message: Optional[str] = None,
    chunk_count: Optional[int] = None,
) -> None:
    """Update document file processing status.

    Args:
        file_id: Document file ID
        status: New status (pending/downloading/processing/completed/failed)
        error_message: Error message if failed
        chunk_count: Number of chunks from this file
    """
    with Session(engine) as session:
        statement = select(DocumentFile).where(DocumentFile.id == file_id)
        doc_file = session.exec(statement).first()

        if doc_file:
            doc_file.status = status

            if error_message:
                doc_file.error_message = error_message

            if chunk_count is not None:
                doc_file.chunk_count = chunk_count

            if status == "completed":
                doc_file.processed_at = datetime.now(timezone.utc)
            elif status == "downloading":
                doc_file.downloaded_at = datetime.now(timezone.utc)

            session.add(doc_file)
            session.commit()

save_chunk

save_chunk(source_id: str, file_id: str, chunk_index: int, text: str, page_number: Optional[int] = None, page_range: Optional[list[int]] = None, section_title: Optional[str] = None, section_hierarchy: Optional[list[str]] = None, clause_reference: Optional[str] = None, source_pdf_url: Optional[str] = None, deep_link: Optional[str] = None, citation: Optional[str] = None, embedding_index: Optional[int] = None, embedding_model: Optional[str] = None, metadata: Optional[dict] = None) -> Chunk

Save text chunk to database.

Parameters:

- source_id (str, required): Parent document source ID (config entry)
- file_id (str, required): Specific file this chunk came from
- chunk_index (int, required): Chunk position in document
- text (str, required): Chunk text
- page_number (Optional[int], default None): Page number if PDF
- page_range (Optional[list[int]], default None): Page range if chunk spans multiple pages [start, end]
- section_title (Optional[str], default None): Section title
- section_hierarchy (Optional[list[str]], default None): Full section hierarchy from document root
- clause_reference (Optional[str], default None): Legal clause/section reference (e.g., 's.3.2.1')
- source_pdf_url (Optional[str], default None): Direct PDF URL for deep linking
- deep_link (Optional[str], default None): Deep link to specific section/page in source document
- citation (Optional[str], default None): Formatted citation string for display
- embedding_index (Optional[int], default None): Index in vector store
- embedding_model (Optional[str], default None): Embedding model used
- metadata (Optional[dict], default None): Additional chunk metadata

Returns:

- Chunk: Saved chunk

Source code in green_gov_rag/etl/db_writer.py
def save_chunk(
    source_id: str,
    file_id: str,
    chunk_index: int,
    text: str,
    page_number: Optional[int] = None,
    page_range: Optional[list[int]] = None,
    section_title: Optional[str] = None,
    section_hierarchy: Optional[list[str]] = None,
    clause_reference: Optional[str] = None,
    source_pdf_url: Optional[str] = None,
    deep_link: Optional[str] = None,
    citation: Optional[str] = None,
    embedding_index: Optional[int] = None,
    embedding_model: Optional[str] = None,
    metadata: Optional[dict] = None,
) -> Chunk:
    """Save text chunk to database.

    Args:
        source_id: Parent document source ID (config entry)
        file_id: Specific file this chunk came from
        chunk_index: Chunk position in document
        text: Chunk text
        page_number: Page number if PDF
        page_range: Page range if chunk spans multiple pages [start, end]
        section_title: Section title
        section_hierarchy: Full section hierarchy from document root
        clause_reference: Legal clause/section reference (e.g., 's.3.2.1')
        source_pdf_url: Direct PDF URL for deep linking
        deep_link: Deep link to specific section/page in source document
        citation: Formatted citation string for display
        embedding_index: Index in vector store
        embedding_model: Embedding model used
        metadata: Additional chunk metadata

    Returns:
        Chunk: Saved chunk
    """
    with Session(engine) as session:
        # Ensure file_id is in metadata for citation verification
        if metadata is None:
            metadata = {}
        metadata = metadata.copy()  # Don't modify caller's dict
        metadata["file_id"] = file_id
        metadata["source_id"] = source_id

        # Check if chunk already exists (by file_id + chunk_index)
        statement = select(Chunk).where(
            Chunk.file_id == file_id, Chunk.chunk_index == chunk_index
        )
        existing_chunk = session.exec(statement).first()

        if existing_chunk:
            # Update existing chunk
            existing_chunk.source_id = source_id
            existing_chunk.text = text
            existing_chunk.char_count = len(text)
            existing_chunk.page_number = page_number
            existing_chunk.page_range = page_range
            existing_chunk.section_title = section_title
            existing_chunk.section_hierarchy = section_hierarchy
            existing_chunk.clause_reference = clause_reference
            existing_chunk.source_pdf_url = source_pdf_url
            existing_chunk.deep_link = deep_link
            existing_chunk.citation = citation
            existing_chunk.embedding_index = embedding_index
            existing_chunk.embedding_model = embedding_model
            existing_chunk.metadata_ = metadata

            session.add(existing_chunk)
            session.commit()
            session.refresh(existing_chunk)

            logger.debug(
                f"Updated chunk {file_id}[{chunk_index}] "
                f"(page: {page_number}, section: {section_title})"
            )
            return existing_chunk
        else:
            # Create new chunk
            chunk = Chunk(
                source_id=source_id,
                file_id=file_id,
                chunk_index=chunk_index,
                text=text,
                char_count=len(text),
                page_number=page_number,
                page_range=page_range,
                section_title=section_title,
                section_hierarchy=section_hierarchy,
                clause_reference=clause_reference,
                source_pdf_url=source_pdf_url,
                deep_link=deep_link,
                citation=citation,
                embedding_index=embedding_index,
                embedding_model=embedding_model,
                metadata_=metadata,
            )

            session.add(chunk)
            session.commit()
            session.refresh(chunk)

            logger.debug(
                f"Created chunk {file_id}[{chunk_index}] "
                f"(page: {page_number}, section: {section_title})"
            )
            return chunk
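Example usage (a minimal sketch; the IDs, text, and section details are illustrative):

from green_gov_rag.etl.db_writer import save_chunk

chunk = save_chunk(
    source_id="0123456789abcdef",         # parent DocumentSource ID
    file_id="0123456789abcdef_1a2b3c4d",  # parent DocumentFile ID
    chunk_index=0,
    text="Scope 1 emissions are direct emissions from owned or controlled sources.",
    page_number=12,
    section_title="Fuel combustion",
    section_hierarchy=["3 Methods", "3.2 Scope 1", "3.2.1 Fuel combustion"],
    clause_reference="s.3.2.1",
    metadata={"topic": "emissions_reporting"},
)

# file_id and source_id are also copied into the chunk metadata for citation verification
print(chunk.chunk_index, chunk.char_count)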

get_document_source_by_id

get_document_source_by_id(source_id: str) -> Optional[DocumentSource]

Get document source by ID.

Source code in green_gov_rag/etl/db_writer.py
def get_document_source_by_id(source_id: str) -> Optional[DocumentSource]:
    """Get document source by ID."""
    with Session(engine) as session:
        statement = select(DocumentSource).where(DocumentSource.id == source_id)
        return session.exec(statement).first()

get_document_file_by_id

get_document_file_by_id(file_id: str) -> Optional[DocumentFile]

Get document file by ID.

Source code in green_gov_rag/etl/db_writer.py
def get_document_file_by_id(file_id: str) -> Optional[DocumentFile]:
    """Get document file by ID."""
    with Session(engine) as session:
        statement = select(DocumentFile).where(DocumentFile.id == file_id)
        return session.exec(statement).first()

get_chunks_by_source

get_chunks_by_source(source_id: str) -> list[Chunk]

Get all chunks for a document source.

Source code in green_gov_rag/etl/db_writer.py
def get_chunks_by_source(source_id: str) -> list[Chunk]:
    """Get all chunks for a document source."""
    with Session(engine) as session:
        statement = select(Chunk).where(Chunk.source_id == source_id)
        return list(session.exec(statement).all())

get_chunks_by_file

get_chunks_by_file(file_id: str) -> list[Chunk]

Get all chunks for a document file.

Source code in green_gov_rag/etl/db_writer.py
def get_chunks_by_file(file_id: str) -> list[Chunk]:
    """Get all chunks for a document file."""
    with Session(engine) as session:
        statement = select(Chunk).where(Chunk.file_id == file_id)
        return list(session.exec(statement).all())

save_document_source_from_storage_metadata

save_document_source_from_storage_metadata(storage_metadata: dict[str, Any]) -> DocumentSource

Save document source to database from cloud storage metadata.

Parameters:

- storage_metadata (dict[str, Any], required): Metadata dict from ETL storage adapter

Returns:

- DocumentSource: Saved document source

Source code in green_gov_rag/etl/db_writer.py
def save_document_source_from_storage_metadata(
    storage_metadata: dict[str, Any],
) -> DocumentSource:
    """Save document source to database from cloud storage metadata.

    Args:
        storage_metadata: Metadata dict from ETL storage adapter

    Returns:
        DocumentSource: Saved document source
    """
    return save_document_source(
        title=storage_metadata.get("title", "Untitled"),
        source_url=storage_metadata.get("source_url", ""),
        jurisdiction=storage_metadata.get("jurisdiction", "unknown"),
        topic=storage_metadata.get("topic", "general"),
        region=storage_metadata.get("region"),
        category=storage_metadata.get("category"),
        metadata=storage_metadata,
        esg_metadata=storage_metadata.get("esg_metadata"),
        spatial_metadata=storage_metadata.get("spatial_metadata"),
        status="pending",
    )

save_chunks_from_storage

save_chunks_from_storage(source_id: str, file_id: str, chunks: list[dict[str, Any]], embedding_model: Optional[str] = None) -> list[Chunk]

Save chunks to database from cloud storage chunk data.

Parameters:

- source_id (str, required): Parent document source ID
- file_id (str, required): Parent document file ID
- chunks (list[dict[str, Any]], required): List of chunk dicts from ETL storage adapter
- embedding_model (Optional[str], default None): Optional embedding model name

Returns:

- list[Chunk]: List of saved Chunk objects

Source code in green_gov_rag/etl/db_writer.py
def save_chunks_from_storage(
    source_id: str,
    file_id: str,
    chunks: list[dict[str, Any]],
    embedding_model: Optional[str] = None,
) -> list[Chunk]:
    """Save chunks to database from cloud storage chunk data.

    Args:
        source_id: Parent document source ID
        file_id: Parent document file ID
        chunks: List of chunk dicts from ETL storage adapter
        embedding_model: Optional embedding model name

    Returns:
        List of saved Chunk objects
    """
    saved_chunks = []

    for i, chunk_data in enumerate(chunks):
        metadata = chunk_data.get("metadata", {})
        chunk = save_chunk(
            source_id=source_id,
            file_id=file_id,
            chunk_index=metadata.get("chunk_id", i),
            text=chunk_data.get("content", ""),
            page_number=metadata.get("page_number"),
            page_range=metadata.get("page_range"),
            section_title=metadata.get("section_title"),
            section_hierarchy=metadata.get("section_hierarchy"),
            clause_reference=metadata.get("clause_reference"),
            source_pdf_url=metadata.get("source_pdf_url"),
            deep_link=metadata.get("deep_link"),
            citation=metadata.get("citation"),
            embedding_model=embedding_model,
            metadata=metadata,
        )
        saved_chunks.append(chunk)

    logger.info(f"Saved {len(saved_chunks)} chunks for file {file_id}")
    return saved_chunks
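Example usage (a minimal sketch; the chunk dicts mirror the {"content", "metadata"} shape produced by the chunker, and the IDs and model name are illustrative):

from green_gov_rag.etl.db_writer import save_chunks_from_storage

chunks = [
    {
        "content": "Scope 2 emissions come from purchased electricity.",
        "metadata": {"chunk_id": 0, "page_number": 7, "section_title": "Scope 2"},
    },
    {
        "content": "Emission factors are published annually.",
        "metadata": {"chunk_id": 1, "page_number": 8},
    },
]

saved = save_chunks_from_storage(
    source_id="0123456789abcdef",
    file_id="0123456789abcdef_1a2b3c4d",
    chunks=chunks,
    embedding_model="text-embedding-3-small",
)
print(len(saved))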

save_document_version

save_document_version(file_id: str, source_id: str, content_hash: str, source_url: str, file_size_bytes: Optional[int] = None, change_type: str = 'new', remote_last_modified: Optional[datetime] = None, remote_etag: Optional[str] = None, metadata: Optional[dict] = None) -> DocumentVersion

Create a new document version record.

Parameters:

- file_id (str, required): Specific document file ID
- source_id (str, required): Parent document source ID
- content_hash (str, required): SHA256 hash of document content
- source_url (str, required): URL where document was retrieved
- file_size_bytes (Optional[int], default None): File size in bytes
- change_type (str, default 'new'): Type of change ('new', 'updated', 'unchanged')
- remote_last_modified (Optional[datetime], default None): Last-Modified header from server
- remote_etag (Optional[str], default None): ETag header from server
- metadata (Optional[dict], default None): Additional version metadata

Returns:

- DocumentVersion: Created version record

Source code in green_gov_rag/etl/db_writer.py
def save_document_version(
    file_id: str,
    source_id: str,
    content_hash: str,
    source_url: str,
    file_size_bytes: Optional[int] = None,
    change_type: str = "new",
    remote_last_modified: Optional[datetime] = None,
    remote_etag: Optional[str] = None,
    metadata: Optional[dict] = None,
) -> DocumentVersion:
    """Create a new document version record.

    Args:
        file_id: Specific document file ID
        source_id: Parent document source ID
        content_hash: SHA256 hash of document content
        source_url: URL where document was retrieved
        file_size_bytes: File size in bytes
        change_type: Type of change ('new', 'updated', 'unchanged')
        remote_last_modified: Last-Modified header from server
        remote_etag: ETag header from server
        metadata: Additional version metadata

    Returns:
        DocumentVersion: Created version record
    """
    with Session(engine) as session:
        # Get current version number for this file
        statement = (
            select(DocumentVersion)
            .where(DocumentVersion.file_id == file_id)
            .order_by(DocumentVersion.version_number.desc())  # type: ignore[attr-defined]
        )
        latest_version = session.exec(statement).first()

        version_number = (latest_version.version_number + 1) if latest_version else 1

        # Mark previous version as superseded if this is an update
        if latest_version and latest_version.is_current:
            latest_version.is_current = False
            latest_version.superseded_at = datetime.now(timezone.utc)
            session.add(latest_version)

        # Create new version
        version = DocumentVersion(
            file_id=file_id,
            source_id=source_id,
            version_number=version_number,
            content_hash=content_hash,
            source_url=source_url,
            file_size_bytes=file_size_bytes,
            change_type=change_type,
            remote_last_modified=remote_last_modified,
            remote_etag=remote_etag,
            metadata_=metadata,
            downloaded_at=datetime.now(timezone.utc),
            status="completed",
            is_current=True,
        )

        session.add(version)
        session.commit()
        session.refresh(version)

        logger.info(
            f"Created version {version_number} for file {file_id} "
            f"(source: {source_id}, type: {change_type})"
        )

        return version
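Example usage (a minimal sketch; the IDs, URL, bytes, and timestamp are illustrative):

import hashlib
from datetime import datetime, timezone

from green_gov_rag.etl.db_writer import save_document_version

new_pdf_bytes = b"%PDF-1.7 updated ..."  # illustrative re-downloaded content

version = save_document_version(
    file_id="0123456789abcdef_1a2b3c4d",
    source_id="0123456789abcdef",
    content_hash=hashlib.sha256(new_pdf_bytes).hexdigest(),
    source_url="https://www.cleanenergyregulator.gov.au/NGER/nger-guideline.pdf",
    file_size_bytes=len(new_pdf_bytes),
    change_type="updated",
    remote_last_modified=datetime(2024, 7, 1, tzinfo=timezone.utc),
)

# Version numbers increment per file; the previous current version is marked superseded
print(version.version_number, version.is_current)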