base_handler

Base handler class with common patterns for extraction handlers.

Extracts common logic for status updates, error handling, idempotency checks, and cross-service messaging.

Classes

BaseHandler

Base class for extraction handlers with common patterns.

Constructor:

def __init__(self, postgres: PostgresClient, storage: StorageClient, valkey: ValkeyClient, embedding: GraniteEmbeddingClient) -> None

Methods

check_document_deleted_or_cancelled

def check_document_deleted_or_cancelled(self, document_id: str, organization_id: str, trace_id: str = 'unknown') -> bool

Check if document was deleted or cancelled, and skip processing if so.

Args:
- document_id: Document UUID
- organization_id: Organization UUID
- trace_id: Trace ID for logging

Returns: True if the document was deleted or cancelled (processing should be skipped), False otherwise
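
A minimal sketch of what this gate does, using a hypothetical `FakePostgres` stand-in (the `get_document_status` call and the status values are assumptions, not the real `PostgresClient` API):

```python
# Illustrative only: FakePostgres.get_document_status is a hypothetical
# stand-in for the real PostgresClient lookup.
class FakePostgres:
    def __init__(self, statuses):
        # (document_id, organization_id) -> status; absent means deleted
        self._statuses = statuses

    def get_document_status(self, document_id, organization_id):
        return self._statuses.get((document_id, organization_id))

def check_document_deleted_or_cancelled(postgres, document_id,
                                        organization_id, trace_id="unknown"):
    status = postgres.get_document_status(document_id, organization_id)
    # A missing row means the document was deleted; "cancelled" means the
    # user aborted processing. Either way the handler skips the message.
    return status is None or status == "cancelled"

pg = FakePostgres({("doc-1", "org-1"): "processing",
                   ("doc-2", "org-1"): "cancelled"})
skip_live = check_document_deleted_or_cancelled(pg, "doc-1", "org-1")
skip_cancelled = check_document_deleted_or_cancelled(pg, "doc-2", "org-1")
skip_deleted = check_document_deleted_or_cancelled(pg, "doc-3", "org-1")
```

Calling this at the top of each handler keeps deleted or cancelled documents from consuming processing time.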

check_idempotency

def check_idempotency(self, document_id: str, organization_id: str, expected_status: str = 'completed') -> tuple[bool, str | None]

Check if document is already processed (idempotency check).

Args:
- document_id: Document UUID
- organization_id: Organization UUID
- expected_status: Status to check for (default: "completed")

Returns: Tuple of (is_already_processed, current_status)
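
The check can be sketched as follows; `FakePostgres` and its `get_document_status` method are hypothetical stand-ins for the real client:

```python
# Illustrative only: the real PostgresClient API is not shown here.
class FakePostgres:
    def __init__(self, statuses):
        self._statuses = statuses

    def get_document_status(self, document_id, organization_id):
        return self._statuses.get((document_id, organization_id))

def check_idempotency(postgres, document_id, organization_id,
                      expected_status="completed"):
    status = postgres.get_document_status(document_id, organization_id)
    # If the document already has the expected status, the message is a
    # duplicate delivery and can be acknowledged without reprocessing.
    return status == expected_status, status

pg = FakePostgres({("doc-1", "org-1"): "completed",
                   ("doc-2", "org-1"): "extracting"})
done, status = check_idempotency(pg, "doc-1", "org-1")
pending, status2 = check_idempotency(pg, "doc-2", "org-1")
```

Returning the current status alongside the boolean lets callers log why a message was skipped.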

publish_progress

def publish_progress(self, organization_id: str, document_id: str, stage: str, percent: int, metadata: dict[str, Any] | None = None, fast_extraction_status: str | None = None, docling_extraction_status: str | None = None) -> None

Publish progress update via Valkey.

Args:
- organization_id: Organization UUID
- document_id: Document UUID
- stage: Processing stage (e.g., "extracting", "chunking", "generating_embeddings")
- percent: Percent complete (0-100)
- metadata: Additional metadata to include
- fast_extraction_status: Optional fast-extraction status to include
- docling_extraction_status: Optional Docling-extraction status to include
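
A sketch of the pub/sub pattern this implies; `FakeValkey`, the channel naming, and the JSON payload shape are assumptions, not the service's real wire format:

```python
import json

# Illustrative only: FakeValkey mimics a pub/sub client; the real
# ValkeyClient API and channel layout may differ.
class FakeValkey:
    def __init__(self):
        self.published = []

    def publish(self, channel, message):
        self.published.append((channel, message))

def publish_progress(valkey, organization_id, document_id, stage, percent,
                     metadata=None):
    # One channel per document lets subscribers watch a single upload.
    channel = f"doc_progress:{organization_id}:{document_id}"
    payload = {"stage": stage, "percent": percent,
               "metadata": metadata or {}}
    valkey.publish(channel, json.dumps(payload))

vk = FakeValkey()
publish_progress(vk, "org-1", "doc-1", "generating_embeddings", 80)
channel, raw = vk.published[0]
```

Fire-and-forget publishing keeps progress updates from blocking the extraction pipeline.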

handle_error

def handle_error(self, document_id: str, organization_id: str, error: Exception, is_permanent: bool = False, trace_id: str = 'unknown') -> None

Handle processing error with consistent logging and status updates.

Args:
- document_id: Document UUID
- organization_id: Organization UUID
- error: Exception that occurred
- is_permanent: Whether the error is permanent (should not retry)
- trace_id: Trace ID for logging
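
One way the permanent/transient split might look; the `set_status` call and the re-raise-on-transient behavior are assumptions for illustration, not the documented implementation:

```python
import logging

# Illustrative only: FakePostgres.set_status stands in for the real
# status-update call on PostgresClient.
class FakePostgres:
    def __init__(self):
        self.statuses = {}

    def set_status(self, document_id, organization_id, status):
        self.statuses[(document_id, organization_id)] = status

def handle_error(postgres, document_id, organization_id, error,
                 is_permanent=False, trace_id="unknown"):
    logging.error("trace=%s doc=%s error=%s", trace_id, document_id, error)
    if is_permanent:
        # Permanent failures are recorded so the message is not retried.
        postgres.set_status(document_id, organization_id, "failed")
        return
    # Transient failures propagate so the consumer can retry the message.
    raise error

pg = FakePostgres()
handle_error(pg, "doc-1", "org-1", ValueError("bad PDF"), is_permanent=True)
```

Centralizing this logic keeps every handler's log lines and status transitions consistent.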

cleanup_on_failure

def cleanup_on_failure(self, document_id: str, organization_id: str) -> int

Clean up partial chunks on failure.

Args:
- document_id: Document UUID (or staging document ID)
- organization_id: Organization UUID

Returns: Number of chunks deleted
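
A sketch of the cleanup, with a hypothetical `delete_chunks` method standing in for the real delete query:

```python
# Illustrative only: FakePostgres.delete_chunks is a hypothetical stand-in.
class FakePostgres:
    def __init__(self, chunks):
        # (document_id, organization_id) -> list of chunk ids
        self.chunks = chunks

    def delete_chunks(self, document_id, organization_id):
        # Returns how many rows were removed, like a DELETE row count.
        return len(self.chunks.pop((document_id, organization_id), []))

def cleanup_on_failure(postgres, document_id, organization_id):
    # Remove any partially written chunks so a retry starts from a
    # clean slate instead of duplicating rows.
    return postgres.delete_chunks(document_id, organization_id)

pg = FakePostgres({("doc-1", "org-1"): ["c1", "c2", "c3"]})
deleted = cleanup_on_failure(pg, "doc-1", "org-1")
deleted_again = cleanup_on_failure(pg, "doc-1", "org-1")
```

Returning the count makes the cleanup observable: a nonzero value in logs flags how far the failed run got.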

publish_to_docling_queue

def publish_to_docling_queue(self, message_data: dict[str, Any], docling_topic: str, kafka_publisher: Any) -> None

Publish message to Docling extraction queue.

Args:
- message_data: Message data to publish
- docling_topic: Kafka topic name for the Docling queue
- kafka_publisher: KafkaPublisher instance for publishing
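
A sketch of the hand-off; `FakeKafkaPublisher` and its `send(topic, value)` signature are assumptions standing in for the real `KafkaPublisher`:

```python
import json

# Illustrative only: the real KafkaPublisher API is not shown in this page.
class FakeKafkaPublisher:
    def __init__(self):
        self.sent = []

    def send(self, topic, value):
        self.sent.append((topic, value))

def publish_to_docling_queue(message_data, docling_topic, kafka_publisher):
    # Serialize the message and hand it to the Docling extraction topic,
    # decoupling the fast path from the slower Docling pipeline.
    kafka_publisher.send(docling_topic, json.dumps(message_data))

pub = FakeKafkaPublisher()
publish_to_docling_queue({"document_id": "doc-1", "organization_id": "org-1"},
                         "docling-extraction", pub)
topic, value = pub.sent[0]
```

Routing through a dedicated topic lets the Docling consumers scale independently of the fast extraction handlers.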