postgres

PostgreSQL client for storing documents and chunks with embeddings.

Connects to a managed PostgreSQL database over SSL. Handles both the RAG database (chunks) and the app database (documents, locations, folders). Implements Row-Level Security (RLS) for organization isolation.

Classes

PostgresClient

Client for interacting with the PostgreSQL database.

Constructor:

def __init__(self, config: Any) -> None

Methods

get_connection

def get_connection(self, organization_id: str | None = None, use_app_db: bool = False) -> Iterator[pg8000.dbapi.Connection]

Get a database connection with RLS context.

Uses the pg8000 driver with SSL to connect to the managed PostgreSQL database.

If organization_id is provided, sets the organization context for RLS. If organization_id is None, RLS is bypassed ONLY in the development environment. In production, organization_id is required for security.

Args:

  • organization_id: Optional organization UUID to set RLS context. If None, RLS is bypassed (only allowed in development).
  • use_app_db: If True, connect to the app database (coda_app). If False, connect to the RAG database (coda_rag).

Yields: pg8000 PostgreSQL connection

Raises: ValueError: If organization_id is None outside the development environment
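A typical call, as a minimal sketch. It assumes a configured PostgresClient instance named client; the Iterator return type suggests the method is a @contextmanager generator, so it is used in a with statement. The table and column names below are illustrative, not part of the documented API:

with client.get_connection(organization_id=org_id) as conn:
    cursor = conn.cursor()
    # pg8000.dbapi uses the %s ("format") paramstyle.
    cursor.execute("SELECT status FROM documents WHERE id = %s", (document_id,))
    row = cursor.fetchone()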

create_or_update_document

def create_or_update_document(self, document_id: str, organization_id: str, storage_path: str, file_size_bytes: int | None = None, content_type: str | None = None, content_hash: str | None = None, total_pages: int | None = None, metadata: dict[str, Any] | None = None) -> None

Create or update a document record.

Args:

  • document_id: Document UUID
  • organization_id: Organization UUID
  • storage_path: Object storage path (relative path in Spaces bucket)
  • file_size_bytes: File size in bytes
  • content_type: MIME type
  • content_hash: SHA256 hash for deduplication
  • total_pages: Total page count (stored in metadata JSONB as 'page_count')
  • metadata: Additional metadata as JSONB (extraction details in metadata.extraction)
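A usage sketch with illustrative values (client is a configured PostgresClient; the path and sizes are examples, not fixtures from this codebase):

client.create_or_update_document(
    document_id=doc_id,
    organization_id=org_id,
    storage_path="orgs/acme/docs/report.pdf",  # relative path in the Spaces bucket
    file_size_bytes=1_204_224,
    content_type="application/pdf",
    content_hash=sha256_hex,                   # SHA256 of the file bytes, for deduplication
    total_pages=12,                            # stored in metadata JSONB as 'page_count'
    metadata={"source": "upload"},             # illustrative extra metadata
)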

update_document_status

def update_document_status(self, document_id: str, status: str, organization_id: str, metadata: dict[str, Any] | None = None) -> None

Update document processing status.

Args:

  • document_id: Document UUID
  • status: New overall status (pending, extracting, completed, failed)
  • organization_id: Organization UUID for RLS
  • metadata: Optional metadata to merge (extraction details go in metadata.extraction)
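For example, to record a failure along with error details (the payload shape is an assumption; the docstring only says extraction details go in metadata.extraction):

client.update_document_status(
    document_id=doc_id,
    status="failed",
    organization_id=org_id,
    metadata={"extraction": {"error": "OCR timed out"}},  # illustrative merge payload
)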

get_fast_extraction_status

def get_fast_extraction_status(self, document_id: str, organization_id: str | None = None) -> str | None

Get Phase 1 (fast) extraction status from metadata.extraction.

get_docling_extraction_status

def get_docling_extraction_status(self, document_id: str, organization_id: str | None = None) -> str | None

Get Phase 2 (Docling) extraction status from metadata.extraction.

update_fast_extraction_status

def update_fast_extraction_status(self, document_id: str, fast_status: str, organization_id: str, metadata: dict[str, Any] | None = None) -> None

Update the Phase 1 (fast) extraction status and keep the overall status valid.

Overall status semantics:

  • fast_extracting -> status becomes 'extracting'
  • fast_completed -> status becomes 'completed' (document is queryable)

Stores fast_extraction_status in metadata.extraction JSONB field.
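A typical Phase 1 sequence, sketched with the status values named above (client, doc_id, and org_id assumed from the earlier examples):

# Phase 1 starts: overall status transitions to 'extracting'.
client.update_fast_extraction_status(doc_id, "fast_extracting", org_id)

# ... fast extraction runs and chunks are stored ...

# Phase 1 finishes: overall status transitions to 'completed' (queryable).
client.update_fast_extraction_status(doc_id, "fast_completed", org_id)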

update_docling_extraction_status

def update_docling_extraction_status(self, document_id: str, docling_status: str, organization_id: str, metadata: dict[str, Any] | None = None) -> None

Update the Phase 2 (Docling) extraction status and keep the overall status valid.

Overall status semantics:

  • If fast extraction completed, overall status remains 'completed'
  • If fast extraction did not complete:
    • docling_completed -> overall status becomes 'completed'
    • docling_failed -> overall status becomes 'failed'

Stores docling_extraction_status, docling_queued_at, and docling_completed_at in metadata.extraction JSONB field.
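A sketch of the Phase 2 transitions. The 'docling_queued' value is an assumption inferred from the stored docling_queued_at timestamp; the completed value follows the semantics above:

# Queue Phase 2; docling_queued_at is recorded in metadata.extraction.
client.update_docling_extraction_status(doc_id, "docling_queued", org_id)

# On success: promotes the overall status to 'completed' only if Phase 1 never did.
client.update_docling_extraction_status(doc_id, "docling_completed", org_id)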

store_chunks

def store_chunks(self, document_id: str, chunks: list[dict[str, Any]], organization_id: str, max_chunk_size: int = 500) -> list[str]

Store document chunks with embeddings using an idempotent batch insert.

Uses a multi-row VALUES INSERT with ON CONFLICT DO UPDATE for idempotent writes. This enables batch processing with resume capability: replaying the same batch updates existing chunks instead of creating duplicates.

Performance: a single DB round-trip for the entire batch (40-80× faster than individual inserts).
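The generated statement has roughly this shape (a sketch only; the table name, full column list, and update set are assumptions, and the real SQL lives in the implementation):

UPSERT_SQL_SHAPE = """
INSERT INTO chunks (document_id, chunk_index, text, embedding)
VALUES (%s, %s, %s, %s), (%s, %s, %s, %s)  -- one tuple per chunk, batched
ON CONFLICT (document_id, chunk_index)
DO UPDATE SET text = EXCLUDED.text, embedding = EXCLUDED.embedding
"""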

Args:

  • document_id: Document UUID
  • chunks: List of chunk dictionaries with:
    • chunk_index: int (required)
    • text: str (required)
    • embedding: list[float] (768 dimensions, required)
    • page_numbers: list[int] (optional)
    • section_reference: dict (optional)
    • structure_metadata: dict (optional)
    • character_range: dict (optional)
    • token_count: int (optional)
  • organization_id: Organization UUID for RLS
  • max_chunk_size: Maximum chunks per INSERT to avoid PostgreSQL's 65,535 parameter limit

Returns: List of created/updated chunk UUIDs

Raises: ValueError: If chunks fail validation (empty embeddings, wrong dimensions)

Note: Requires a unique constraint on (document_id, chunk_index) for ON CONFLICT. See migration: infrastructure/database/migrations/001_add_chunk_unique_constraint.sql
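A minimal call, assuming an embedding of exactly 768 floats (chunk text and counts are illustrative):

chunks = [
    {
        "chunk_index": 0,
        "text": "Introduction to the quarterly report.",
        "embedding": embedding_768,  # list[float] with exactly 768 values
        "page_numbers": [1],
        "token_count": 7,
    },
]
chunk_ids = client.store_chunks(doc_id, chunks, organization_id=org_id)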

get_document_status

def get_document_status(self, document_id: str, organization_id: str | None = None) -> str | None

Get document processing status.

Args:

  • document_id: Document UUID
  • organization_id: Organization UUID for RLS

Returns: Status string or None if not found

get_document_metadata

def get_document_metadata(self, document_id: str, organization_id: str | None = None) -> dict[str, Any] | None

Fetch document metadata JSON for checkpoint/resume logic.

cleanup_document_chunks

def cleanup_document_chunks(self, document_id: str, organization_id: str) -> int

Delete all chunks for a document.

Args:

  • document_id: Document UUID
  • organization_id: Organization UUID for RLS

Returns: Number of chunks deleted

has_chunks

def has_chunks(self, document_id: str, organization_id: str | None = None) -> bool

Check if document has chunks stored.

Args:

  • document_id: Document UUID
  • organization_id: Organization UUID for RLS (optional in development)

Returns: True if document has chunks, False otherwise
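Combined with get_document_metadata, this supports a simple checkpoint/resume check (a sketch; the fast_extraction_status key and value come from update_fast_extraction_status above, while skip_reprocessing is a hypothetical caller-side action):

meta = client.get_document_metadata(doc_id, org_id) or {}
extraction = meta.get("extraction", {})
already_done = (
    extraction.get("fast_extraction_status") == "fast_completed"
    and client.has_chunks(doc_id, org_id)
)
if already_done:
    skip_reprocessing()  # hypothetical: skip re-extraction on replay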

is_document_stale

def is_document_stale(self, document_id: str, status: str, minutes: int, organization_id: str | None = None) -> bool

Check whether a document has been in the given status for longer than the specified number of minutes.

Args:

  • document_id: Document UUID
  • status: Status to check
  • minutes: Number of minutes to consider stale
  • organization_id: Organization UUID for RLS (optional in development)

Returns: True if document is stale, False otherwise
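A hedged watchdog sketch (the 30-minute threshold and the recovery action are illustrative, not prescribed by this API):

# Requeue documents stuck in 'extracting' for over 30 minutes.
if client.is_document_stale(doc_id, status="extracting", minutes=30, organization_id=org_id):
    client.update_document_status(doc_id, "pending", org_id)  # illustrative recovery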

atomic_swap_chunks

def atomic_swap_chunks(self, target_document_id: str, staging_document_id: str, organization_id: str) -> None

Atomically replace a document's chunks with staged chunks.

This is used by Phase 2 (Docling) to generate chunks under a staging document_id and then swap them in a single transaction.

Safety:

  • If staging has zero chunks, the swap is aborted (old chunks are preserved).
  • The operation is transactional (readers see either old or new after commit).
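A Phase 2 flow sketch. It assumes a fresh UUID can serve as the staging document_id and that docling_chunks follows the store_chunks schema; whether the staging id needs its own document record first is not specified here:

import uuid

# Write Docling chunks under a staging document id.
staging_id = str(uuid.uuid4())
client.store_chunks(staging_id, docling_chunks, organization_id=org_id)

# Swap atomically; aborts if staging holds zero chunks.
client.atomic_swap_chunks(
    target_document_id=doc_id,
    staging_document_id=staging_id,
    organization_id=org_id,
)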