base_handler
Base handler class with common patterns for extraction handlers.
Centralizes common logic for status updates, error handling, idempotency checks, and cross-service messaging.
Classes
BaseHandler
Base class for extraction handlers with common patterns.
Constructor:
def __init__(self, postgres: PostgresClient, storage: StorageClient, valkey: ValkeyClient, embedding: GraniteEmbeddingClient) -> None
Methods
check_document_deleted_or_cancelled
def check_document_deleted_or_cancelled(self, document_id: str, organization_id: str, trace_id: str = 'unknown') -> bool
Check if document was deleted or cancelled, and skip processing if so.
Args:
    document_id: Document UUID
    organization_id: Organization UUID
    trace_id: Trace ID for logging
Returns: True if document was deleted/cancelled (should skip processing), False otherwise
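A minimal sketch of this guard, assuming the Postgres client exposes a `get_document_status` lookup that returns `None` for deleted documents (a hypothetical helper name for illustration; the real client's API may differ):

```python
class FakePostgresClient:
    """In-memory stand-in for the real PostgresClient (illustration only)."""

    def __init__(self, statuses):
        # (document_id, organization_id) -> status; a missing key means deleted
        self._statuses = statuses

    def get_document_status(self, document_id, organization_id):
        return self._statuses.get((document_id, organization_id))


def check_document_deleted_or_cancelled(postgres, document_id, organization_id,
                                        trace_id="unknown"):
    """Return True when processing should be skipped."""
    status = postgres.get_document_status(document_id, organization_id)
    if status is None or status == "cancelled":
        print(f"trace={trace_id}: skipping {document_id} (status={status})")
        return True
    return False


postgres = FakePostgresClient({("doc-1", "org-1"): "extracting",
                               ("doc-2", "org-1"): "cancelled"})
```

Handlers typically call this at the top of message processing, before any expensive work, so a user-initiated delete or cancel short-circuits the pipeline.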
check_idempotency
def check_idempotency(self, document_id: str, organization_id: str, expected_status: str = 'completed') -> tuple[bool, str | None]
Check if document is already processed (idempotency check).
Args:
    document_id: Document UUID
    organization_id: Organization UUID
    expected_status: Status to check for (default: "completed")
Returns: Tuple of (is_already_processed, current_status)
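The idempotency pattern can be sketched as a status comparison against the stored document row; `get_document_status` is a hypothetical helper name standing in for the real Postgres lookup:

```python
class FakePostgresClient:
    """In-memory stand-in for the real PostgresClient (illustration only)."""

    def __init__(self, statuses):
        self._statuses = statuses  # (document_id, organization_id) -> status

    def get_document_status(self, document_id, organization_id):
        return self._statuses.get((document_id, organization_id))


def check_idempotency(postgres, document_id, organization_id,
                      expected_status="completed"):
    """Return (is_already_processed, current_status)."""
    current = postgres.get_document_status(document_id, organization_id)
    return (current == expected_status, current)


postgres = FakePostgresClient({("doc-1", "org-1"): "completed",
                               ("doc-2", "org-1"): "extracting"})
```

Because queue messages can be redelivered, handlers check this before processing and acknowledge the duplicate message without redoing the work.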
publish_progress
def publish_progress(self, organization_id: str, document_id: str, stage: str, percent: int, metadata: dict[str, Any] | None = None, fast_extraction_status: str | None = None, docling_extraction_status: str | None = None) -> None
Publish progress update via Valkey.
Args:
    organization_id: Organization UUID
    document_id: Document UUID
    stage: Processing stage (e.g., "extracting", "chunking", "generating_embeddings")
    percent: Percent complete (0-100)
    metadata: Additional metadata to include
    fast_extraction_status: Optional status of the fast extraction path
    docling_extraction_status: Optional status of the Docling extraction path
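A minimal sketch of progress publishing over a pub/sub channel. The channel naming scheme and JSON payload shape here are assumptions for illustration, not the service's actual wire format:

```python
import json


class FakeValkeyClient:
    """In-memory stand-in for the real ValkeyClient pub/sub (illustration only)."""

    def __init__(self):
        self.published = []

    def publish(self, channel, message):
        self.published.append((channel, message))


def publish_progress(valkey, organization_id, document_id, stage, percent,
                     metadata=None):
    payload = {
        "document_id": document_id,
        "stage": stage,
        "percent": max(0, min(100, percent)),  # clamp to the documented 0-100 range
    }
    if metadata:
        payload["metadata"] = metadata
    # Hypothetical per-document channel scheme
    channel = f"progress:{organization_id}:{document_id}"
    valkey.publish(channel, json.dumps(payload))


valkey = FakeValkeyClient()
publish_progress(valkey, "org-1", "doc-1", "chunking", 40)
```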
handle_error
def handle_error(self, document_id: str, organization_id: str, error: Exception, is_permanent: bool = False, trace_id: str = 'unknown') -> None
Handle processing error with consistent logging and status updates.
Args:
    document_id: Document UUID
    organization_id: Organization UUID
    error: Exception that occurred
    is_permanent: Whether error is permanent (should not retry)
    trace_id: Trace ID for logging
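The permanent-vs-transient branch can be sketched as follows; the status names and the `update_document_status` helper are assumptions for this illustration:

```python
import logging


class FakePostgresClient:
    """In-memory stand-in for the real PostgresClient (illustration only)."""

    def __init__(self):
        self.statuses = {}

    def update_document_status(self, document_id, organization_id, status,
                               error_message=None):
        self.statuses[(document_id, organization_id)] = (status, error_message)


def handle_error(postgres, document_id, organization_id, error,
                 is_permanent=False, trace_id="unknown"):
    # Permanent errors mark the document failed; transient ones leave it
    # eligible for retry (these status values are hypothetical).
    status = "failed" if is_permanent else "retry_pending"
    logging.error("trace=%s doc=%s org=%s permanent=%s: %s",
                  trace_id, document_id, organization_id, is_permanent, error)
    postgres.update_document_status(document_id, organization_id, status,
                                    str(error))


postgres = FakePostgresClient()
handle_error(postgres, "doc-1", "org-1", ValueError("bad PDF"),
             is_permanent=True, trace_id="t-123")
```

Keeping the log line and the status write in one place is the point of the base class: every handler reports failures with the same fields, so traces correlate across services.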
cleanup_on_failure
def cleanup_on_failure(self, document_id: str, organization_id: str) -> int
Clean up partial chunks on failure.
Args:
    document_id: Document UUID (or staging document ID)
    organization_id: Organization UUID
Returns: Number of chunks deleted
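A minimal sketch of the cleanup, assuming a hypothetical `delete_chunks` helper on the Postgres client that deletes a document's chunk rows and reports the count:

```python
class FakePostgresClient:
    """In-memory stand-in holding (chunk_id, document_id, organization_id) rows."""

    def __init__(self, chunks):
        self.chunks = list(chunks)

    def delete_chunks(self, document_id, organization_id):
        # Delete this document's chunks and return how many were removed.
        before = len(self.chunks)
        self.chunks = [c for c in self.chunks
                       if (c[1], c[2]) != (document_id, organization_id)]
        return before - len(self.chunks)


def cleanup_on_failure(postgres, document_id, organization_id):
    """Delete partial chunks for a failed document; return the deleted count."""
    return postgres.delete_chunks(document_id, organization_id)


postgres = FakePostgresClient([("c1", "doc-1", "org-1"),
                               ("c2", "doc-1", "org-1"),
                               ("c3", "doc-2", "org-1")])
```

Deleting partial chunks keeps a retried extraction from producing duplicate rows for the same document.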
publish_to_docling_queue
def publish_to_docling_queue(self, message_data: dict[str, Any], docling_topic: str, kafka_publisher: Any) -> None
Publish message to Docling extraction queue.
Args:
    message_data: Message data to publish
    docling_topic: Kafka topic name for Docling queue
    kafka_publisher: KafkaPublisher instance for publishing
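The cross-service handoff can be sketched as serialize-and-publish; the `publish(topic, value)` signature on the stand-in below is an assumption, and the real KafkaPublisher API may differ:

```python
import json


class FakeKafkaPublisher:
    """In-memory stand-in for KafkaPublisher (illustration only)."""

    def __init__(self):
        self.messages = []

    def publish(self, topic, value):
        self.messages.append((topic, value))


def publish_to_docling_queue(message_data, docling_topic, kafka_publisher):
    """Serialize the handoff message and publish it to the Docling topic."""
    kafka_publisher.publish(docling_topic, json.dumps(message_data))


publisher = FakeKafkaPublisher()
publish_to_docling_queue({"document_id": "doc-1", "organization_id": "org-1"},
                         "docling.extract", publisher)
```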