postgres
PostgreSQL client for storing documents and chunks with embeddings.
Connects to a managed PostgreSQL database with SSL authentication. Handles both the RAG database (chunks) and the app database (documents, locations, folders). Implements Row-Level Security (RLS) for organization isolation.
Classes
PostgresClient
Client for interacting with PostgreSQL database.
Constructor:
def __init__(self, config: Any) -> None
Methods
get_connection
def get_connection(self, organization_id: str | None = None, use_app_db: bool = False) -> Iterator[pg8000.dbapi.Connection]
Get a database connection with RLS context.
Uses the pg8000 driver with SSL for the managed PostgreSQL database.
If organization_id is provided, sets the organization context for RLS. If organization_id is None, RLS is bypassed, which is allowed ONLY in the development environment; in production, organization_id is required for security.
Args:
- organization_id: Optional organization UUID used to set the RLS context. If None, RLS is bypassed (allowed only in development).
- use_app_db: If True, connect to the app database (coda_app); if False, connect to the RAG database (coda_rag).
Yields: pg8000 PostgreSQL connection
Raises: ValueError: If organization_id is None outside the development environment
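A minimal usage sketch, assuming `client` is a constructed PostgresClient, get_connection is a context-manager-style generator (it yields the connection), and the organization UUID and chunks table name are placeholders:

ORG_ID = "22222222-2222-2222-2222-222222222222"  # placeholder organization UUID

# The RLS context applies for the life of this connection.
with client.get_connection(organization_id=ORG_ID) as conn:
    cur = conn.cursor()
    cur.execute("SELECT count(*) FROM chunks")  # table name assumed
    print(cur.fetchone()[0])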
create_or_update_document
def create_or_update_document(self, document_id: str, organization_id: str, storage_path: str, file_size_bytes: int | None = None, content_type: str | None = None, content_hash: str | None = None, total_pages: int | None = None, metadata: dict[str, Any] | None = None) -> None
Create or update a document record.
Args:
- document_id: Document UUID
- organization_id: Organization UUID
- storage_path: Object storage path (relative path in the Spaces bucket)
- file_size_bytes: File size in bytes
- content_type: MIME type
- content_hash: SHA256 hash for deduplication
- total_pages: Total page count (stored in metadata JSONB as 'page_count')
- metadata: Additional metadata as JSONB (extraction details in metadata.extraction)
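An illustrative call; the UUIDs and path are placeholders, and the SHA256 shown is the well-known empty-string digest used only as a stand-in:

client.create_or_update_document(
    document_id="11111111-1111-1111-1111-111111111111",
    organization_id="22222222-2222-2222-2222-222222222222",
    storage_path="orgs/22222222/docs/report.pdf",  # relative path in the bucket
    file_size_bytes=1_048_576,
    content_type="application/pdf",
    content_hash="e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
    total_pages=12,                  # stored in metadata JSONB as 'page_count'
    metadata={"source": "upload"},   # illustrative extra metadata
)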
update_document_status
def update_document_status(self, document_id: str, status: str, organization_id: str, metadata: dict[str, Any] | None = None) -> None
Update document processing status.
Args:
- document_id: Document UUID
- status: New overall status (pending, extracting, completed, failed)
- organization_id: Organization UUID for RLS
- metadata: Optional metadata to merge (extraction details go in metadata.extraction)
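For example (placeholder UUIDs; any metadata passed here is merged rather than replaced):

client.update_document_status(
    document_id="11111111-1111-1111-1111-111111111111",
    status="extracting",
    organization_id="22222222-2222-2222-2222-222222222222",
)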
get_fast_extraction_status
def get_fast_extraction_status(self, document_id: str, organization_id: str | None = None) -> str | None
Get Phase 1 (fast) extraction status from metadata.extraction.
get_docling_extraction_status
def get_docling_extraction_status(self, document_id: str, organization_id: str | None = None) -> str | None
Get Phase 2 (Docling) extraction status from metadata.extraction.
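Both getters read from metadata.extraction. A quick pre-flight check might look like this minimal sketch (placeholder IDs; `client` is a constructed PostgresClient; status values follow the semantics documented below):

DOC_ID = "11111111-1111-1111-1111-111111111111"  # placeholder document UUID
ORG_ID = "22222222-2222-2222-2222-222222222222"  # placeholder organization UUID

fast = client.get_fast_extraction_status(DOC_ID, ORG_ID)
docling = client.get_docling_extraction_status(DOC_ID, ORG_ID)
if fast == "fast_completed" and docling is None:
    print("Phase 1 done; Phase 2 not yet recorded")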
update_fast_extraction_status
def update_fast_extraction_status(self, document_id: str, fast_status: str, organization_id: str, metadata: dict[str, Any] | None = None) -> None
Update Phase 1 (fast) extraction status and keep overall status valid.
Overall status semantics:
- fast_extracting -> status becomes 'extracting'
- fast_completed -> status becomes 'completed' (document is queryable)
Stores fast_extraction_status in metadata.extraction JSONB field.
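A sketch of that mapping, assuming a plain dict lookup (the names are illustrative, not the client's internals):

# Overall-status transitions driven by Phase 1, per the semantics above.
FAST_TO_OVERALL = {
    "fast_extracting": "extracting",
    "fast_completed": "completed",  # the document becomes queryable here
}

def overall_for_fast(fast_status: str, current: str) -> str:
    # Other Phase 1 values leave the overall status unchanged.
    return FAST_TO_OVERALL.get(fast_status, current)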
update_docling_extraction_status
def update_docling_extraction_status(self, document_id: str, docling_status: str, organization_id: str, metadata: dict[str, Any] | None = None) -> None
Update Phase 2 (Docling) extraction status and keep overall status valid.
Overall status semantics:
- If fast extraction completed, overall status remains 'completed'
- If fast extraction did not complete:
  - docling_completed -> overall status becomes 'completed'
  - docling_failed -> overall status becomes 'failed'
Stores docling_extraction_status, docling_queued_at, and docling_completed_at in metadata.extraction JSONB field.
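The resolution rule can be sketched as a pure function (illustrative names; only the branching mirrors the semantics above):

def overall_for_docling(fast_completed: bool, docling_status: str, current: str) -> str:
    if fast_completed:
        return "completed"  # Phase 1 already made the document queryable
    if docling_status == "docling_completed":
        return "completed"
    if docling_status == "docling_failed":
        return "failed"
    return current  # other Phase 2 states leave the overall status alone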
store_chunks
def store_chunks(self, document_id: str, chunks: list[dict[str, Any]], organization_id: str, max_chunk_size: int = 500) -> list[str]
Store document chunks with embeddings using batch insert with idempotency.
Uses multi-row VALUES INSERT with ON CONFLICT DO UPDATE for idempotent writes. This enables batch processing with resume capability: replaying the same batch updates existing chunks instead of creating duplicates.
Performance: Single DB round-trip for entire batch (40-80× faster than individual inserts).
Args:
- document_id: Document UUID
- chunks: List of chunk dictionaries with:
  - chunk_index: int (required)
  - text: str (required)
  - embedding: list[float] (768 dimensions, required)
  - page_numbers: list[int] (optional)
  - section_reference: dict (optional)
  - structure_metadata: dict (optional)
  - character_range: dict (optional)
  - token_count: int (optional)
- organization_id: Organization UUID for RLS
- max_chunk_size: Maximum chunks per INSERT, to stay under PostgreSQL's 65,535-parameter limit
Returns: List of created/updated chunk UUIDs
Raises: ValueError: If chunks fail validation (empty embeddings, wrong dimensions)
Note: Requires a unique constraint on (document_id, chunk_index) for ON CONFLICT. See migration: infrastructure/database/migrations/001_add_chunk_unique_constraint.sql
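A minimal usage sketch: the embedding is a dummy 768-dimensional vector, the UUIDs are placeholders, and `client` is a constructed PostgresClient:

chunk = {
    "chunk_index": 0,                  # required; conflict target together with document_id
    "text": "Introduction. This report covers quarterly results.",
    "embedding": [0.0] * 768,          # required; must be exactly 768 floats
    "page_numbers": [1],               # optional keys may be omitted entirely
    "token_count": 8,
}
chunk_ids = client.store_chunks(
    document_id="11111111-1111-1111-1111-111111111111",
    chunks=[chunk],
    organization_id="22222222-2222-2222-2222-222222222222",
)
print(len(chunk_ids))  # one UUID per stored chunk

Replaying the same call is safe: the (document_id, chunk_index) conflict target turns duplicates into updates.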
get_document_status
def get_document_status(self, document_id: str, organization_id: str | None = None) -> str | None
Get document processing status.
Args:
- document_id: Document UUID
- organization_id: Organization UUID for RLS
Returns: Status string or None if not found
get_document_metadata
def get_document_metadata(self, document_id: str, organization_id: str | None = None) -> dict[str, Any] | None
Fetch document metadata JSON for checkpoint/resume logic.
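For example, a resume check might combine this with has_chunks (placeholder IDs; the fast_extraction_status key follows the metadata.extraction convention described above):

DOC_ID = "11111111-1111-1111-1111-111111111111"  # placeholder document UUID
ORG_ID = "22222222-2222-2222-2222-222222222222"  # placeholder organization UUID

meta = client.get_document_metadata(DOC_ID, ORG_ID) or {}
extraction = meta.get("extraction", {})
if extraction.get("fast_extraction_status") == "fast_completed" and client.has_chunks(DOC_ID, ORG_ID):
    print("Phase 1 output already stored; skipping re-extraction")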
cleanup_document_chunks
def cleanup_document_chunks(self, document_id: str, organization_id: str) -> int
Delete all chunks for a document.
Args:
- document_id: Document UUID
- organization_id: Organization UUID for RLS
Returns: Number of chunks deleted
has_chunks
def has_chunks(self, document_id: str, organization_id: str | None = None) -> bool
Check if document has chunks stored.
Args:
- document_id: Document UUID
- organization_id: Organization UUID for RLS (optional in development)
Returns: True if document has chunks, False otherwise
is_document_stale
def is_document_stale(self, document_id: str, status: str, minutes: int, organization_id: str | None = None) -> bool
Check whether the document has been in the given status for longer than the specified number of minutes.
Args:
- document_id: Document UUID
- status: Status to check
- minutes: Staleness threshold in minutes
- organization_id: Organization UUID for RLS (optional in development)
Returns: True if document is stale, False otherwise
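Illustrative use in a reconciliation sweep; the 30-minute threshold and the recovery policy (marking long-stuck extractions as failed) are examples, not prescribed behavior:

DOC_ID = "11111111-1111-1111-1111-111111111111"  # placeholder document UUID
ORG_ID = "22222222-2222-2222-2222-222222222222"  # placeholder organization UUID

if client.is_document_stale(DOC_ID, status="extracting", minutes=30, organization_id=ORG_ID):
    client.update_document_status(DOC_ID, "failed", ORG_ID)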
atomic_swap_chunks
def atomic_swap_chunks(self, target_document_id: str, staging_document_id: str, organization_id: str) -> None
Atomically replace a document's chunks with staged chunks.
This is used by Phase 2 (Docling) to generate chunks under a staging document_id and then swap them in a single transaction.
Safety:
- If staging has zero chunks, the swap is aborted (old chunks are preserved).
- The operation is transactional (readers see either old or new after commit).
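A minimal sketch of the swap, assuming a chunks table keyed by document_id and a pg8000 connection obtained via get_connection; the SQL and guard are illustrative, not the client's actual statements:

def atomic_swap_chunks_sketch(conn, target_id: str, staging_id: str) -> None:
    cur = conn.cursor()
    # Safety guard: never wipe the target if staging produced nothing.
    cur.execute("SELECT count(*) FROM chunks WHERE document_id = %s", (staging_id,))
    if cur.fetchone()[0] == 0:
        conn.rollback()
        raise ValueError("staging has zero chunks; aborting swap")
    cur.execute("DELETE FROM chunks WHERE document_id = %s", (target_id,))
    cur.execute(
        "UPDATE chunks SET document_id = %s WHERE document_id = %s",
        (target_id, staging_id),
    )
    conn.commit()  # readers see either the old or the new chunks, never a mix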