
kafka

Kafka base classes for consumer and publisher.

Provides reusable Kafka consumer and publisher implementations using confluent-kafka.

Classes

ProcessingResult

Result of message processing.

CompletedTask

Represents a completed processing task to be handled on the poll thread.

BaseConsumer

Base Kafka consumer with ThreadPoolExecutor for async processing.

This is a reusable base class that can be extended for specific topics. Handles message validation, async processing, backpressure, and graceful shutdown.
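The pattern described above (a poll thread that hands work to a thread pool, collects finished tasks back on the poll thread, and applies backpressure) can be sketched with the standard library alone. This is an illustrative sketch, not the BaseConsumer implementation: `run_poll_loop` and its parameters are hypothetical names, an in-memory iterable stands in for the Kafka poll call, and `max_messages` is assumed to mean the cap on in-flight messages.

```python
import queue
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Iterable


def run_poll_loop(
    messages: Iterable[dict[str, Any]],
    handler: Callable[[dict[str, Any]], None],
    max_workers: int,
    max_messages: int,
) -> list[tuple[dict[str, Any], bool]]:
    """Process messages on a pool while capping how many are in flight.

    Hypothetical helper: `messages` stands in for polling Kafka.
    Returns (message, succeeded) pairs in completion order.
    """
    completed: queue.Queue = queue.Queue()  # workers -> poll thread
    results: list[tuple[dict[str, Any], bool]] = []
    in_flight = 0  # only touched on the calling ("poll") thread

    def work(msg: dict[str, Any]) -> None:
        # Runs on a worker thread; reports the outcome instead of raising.
        try:
            handler(msg)
            completed.put((msg, True))
        except Exception:
            completed.put((msg, False))

    def drain(block: bool) -> None:
        # Collect finished tasks on the poll thread.
        nonlocal in_flight
        while in_flight:
            try:
                item = completed.get(block=block)
            except queue.Empty:
                return
            results.append(item)
            in_flight -= 1
            block = False  # after the first item, take only what is ready

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for msg in messages:
            # Backpressure: accept no new message until a slot frees up.
            while in_flight >= max_messages:
                drain(block=True)
            pool.submit(work, msg)
            in_flight += 1
            drain(block=False)
        # Graceful shutdown: wait for everything still in flight.
        while in_flight:
            drain(block=True)
    return results
```

Keeping the in-flight counter and result handling on one thread means the counter needs no lock; only the queue is shared with the workers.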

Constructor:

def __init__(self, config: Any, topic: str, consumer_group: str, message_handler: Callable[[dict], None], max_workers: int, max_messages: int, dlq_topic: Optional[str] = None, required_fields: Optional[list[str]] = None) -> None
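To illustrate what the `required_fields` check might involve, here is a minimal sketch. It assumes message values are JSON objects, which this page only states for the publisher side, and the helper names `missing_fields` and `decode_and_validate` are hypothetical rather than part of this module. A message that fails validation is the kind of message a consumer would typically route to `dlq_topic`.

```python
import json
from typing import Any, Optional


def missing_fields(
    payload: dict[str, Any], required_fields: Optional[list[str]]
) -> list[str]:
    """Return the required field names that are absent from the payload."""
    return [name for name in (required_fields or []) if name not in payload]


def decode_and_validate(
    raw: bytes, required_fields: Optional[list[str]] = None
) -> tuple[Optional[dict[str, Any]], Optional[str]]:
    """Decode a raw message value and check it (hypothetical helper).

    Returns (payload, None) on success or (None, reason) on failure.
    """
    try:
        payload = json.loads(raw)
    except (UnicodeDecodeError, json.JSONDecodeError) as exc:
        return None, f"invalid JSON: {exc}"
    if not isinstance(payload, dict):
        return None, "payload is not a JSON object"
    missing = missing_fields(payload, required_fields)
    if missing:
        return None, "missing required fields: " + ", ".join(missing)
    return payload, None
```

Returning a reason string rather than raising keeps the failure detail available for logging or for attaching to a dead-letter message.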

Methods

start

def start(self) -> None

Start listening for messages with async processing.

stop

def stop(self) -> None

Stop the consumer.

KafkaPublisher

Kafka publisher for cross-service messaging.

Constructor:

def __init__(self, config: Any) -> None

Methods

publish

def publish(self, topic: str, message: dict[str, Any], headers: dict[str, str] | None = None) -> None

Publish a message to a Kafka topic.

Args:
    topic: Topic name.
    message: Message payload (will be JSON serialized).
    headers: Optional message headers.
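As a rough illustration of the serialization step, the sketch below turns a payload and headers into the byte forms a Kafka producer sends. The helper name `encode_for_produce` is hypothetical, and UTF-8 encoding of the JSON text and header values is an assumption; the actual `publish` implementation may differ.

```python
import json
from typing import Any, Optional


def encode_for_produce(
    message: dict[str, Any], headers: Optional[dict[str, str]] = None
) -> tuple[bytes, list[tuple[str, bytes]]]:
    """Serialize a payload and headers for a producer (hypothetical helper)."""
    # JSON text encoded as UTF-8 bytes (assumed encoding).
    value = json.dumps(message).encode("utf-8")
    # Kafka header values are bytes; keys stay as strings.
    header_list = [(key, val.encode("utf-8")) for key, val in (headers or {}).items()]
    return value, header_list
```

With confluent-kafka, a result like this could be passed on as `producer.produce(topic, value=value, headers=header_list)`.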

flush

def flush(self, timeout: float = 5.0) -> None

Flush pending messages.

Args:
    timeout: Maximum time to wait for flush.

close

def close(self) -> None

Close the producer.
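One way to make sure `flush` and `close` always run is to wrap the publisher in a context manager. This is a sketch under the assumption that the object exposes `flush(timeout=...)` and `close()` as documented above; the `publishing` helper is hypothetical and not part of this module.

```python
from contextlib import contextmanager
from typing import Any, Iterator


@contextmanager
def publishing(publisher: Any, flush_timeout: float = 5.0) -> Iterator[Any]:
    """Yield the publisher, then flush and close it (hypothetical helper)."""
    try:
        yield publisher
    finally:
        try:
            # Give pending messages a chance to be delivered.
            publisher.flush(timeout=flush_timeout)
        finally:
            # Close even if the flush itself raised.
            publisher.close()
```

Usage would look like `with publishing(KafkaPublisher(config)) as pub: pub.publish("some-topic", {"id": 1})`, where `config` and the topic name are placeholders.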