kafka
Kafka base classes for consumer and publisher.
Provides reusable Kafka consumer and publisher implementations using confluent-kafka.
Classes
ProcessingResult
Result of message processing.
CompletedTask
Represents a completed processing task to be handled on the poll thread.
BaseConsumer
Base Kafka consumer with ThreadPoolExecutor for async processing.
This is a reusable base class that can be extended for specific topics. It handles message validation, async processing, backpressure, and graceful shutdown.
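The combination of worker threads, backpressure, and handing completed work back to the polling thread can be sketched with the standard library alone. This is a minimal illustration of the general pattern, not the class's actual implementation: the `CompletedTask` fields, `process_batch`, and `demo_handler` are hypothetical, and a plain list stands in for the Kafka poll loop.

```python
import queue
import threading
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class CompletedTask:
    """Hypothetical shape: the source does not list this class's fields."""
    offset: int
    error: Optional[Exception] = None


def demo_handler(message: dict) -> None:
    """Example handler: rejects messages without an "id" key."""
    if "id" not in message:
        raise ValueError("missing id")


def process_batch(messages: list[dict], handler: Callable[[dict], None],
                  max_workers: int, max_messages: int) -> list[CompletedTask]:
    """Run handler on worker threads with at most max_messages in flight."""
    in_flight = threading.BoundedSemaphore(max_messages)  # backpressure gate
    completed: "queue.Queue[CompletedTask]" = queue.Queue()  # workers -> caller

    def run(offset: int, message: dict) -> None:
        try:
            handler(message)
            completed.put(CompletedTask(offset))
        except Exception as exc:  # report failures instead of losing them
            completed.put(CompletedTask(offset, error=exc))
        finally:
            in_flight.release()

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for offset, message in enumerate(messages):
            in_flight.acquire()  # blocks the submitting loop while saturated
            pool.submit(run, offset, message)

    # Leaving the with-block waits for all workers, so the queue is complete.
    results = []
    while not completed.empty():
        results.append(completed.get())
    return sorted(results, key=lambda task: task.offset)


results = process_batch([{"id": 1}, {"name": "no id"}, {"id": 3}],
                        demo_handler, max_workers=2, max_messages=2)
```

Draining the queue on the calling thread mirrors the idea of handling completed tasks on the poll thread, so follow-up work such as offset commits would not have to run on worker threads.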
Constructor:
def __init__(self, config: Any, topic: str, consumer_group: str, message_handler: Callable[[dict], None], max_workers: int, max_messages: int, dlq_topic: Optional[str] = None, required_fields: Optional[list[str]] = None) -> None
Methods
start
def start(self) -> None
Start listening for messages with async processing.
stop
def stop(self) -> None
Stop the consumer.
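To make the `message_handler` and `required_fields` constructor parameters concrete, here is a small sketch. `missing_fields` is a hypothetical helper showing one plausible reading of "message validation" (checking that required keys are present); the source does not describe the actual check. The topic names, group name, and the commented-out constructor call are illustrative assumptions, and the import path for `BaseConsumer` is not given in the source.

```python
from typing import Optional


def missing_fields(message: dict,
                   required_fields: Optional[list[str]] = None) -> list[str]:
    """Return the required keys that are absent from the message."""
    return [name for name in (required_fields or []) if name not in message]


def handle_order(message: dict) -> None:
    """Example message_handler: matches Callable[[dict], None]."""
    print(f"processing order {message['order_id']}")


# Hypothetical wiring, using the constructor signature documented above:
# consumer = BaseConsumer(
#     config=config,
#     topic="orders",
#     consumer_group="order-workers",
#     message_handler=handle_order,
#     max_workers=4,
#     max_messages=100,
#     dlq_topic="orders.dlq",
#     required_fields=["order_id"],
# )
# consumer.start()
```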
KafkaPublisher
Kafka publisher for cross-service messaging.
Constructor:
def __init__(self, config: Any) -> None
Methods
publish
def publish(self, topic: str, message: dict[str, Any], headers: dict[str, str] | None = None) -> None
Publish a message to a Kafka topic.
Args:
    topic: Topic name
    message: Message payload (will be JSON serialized)
    headers: Optional message headers
flush
def flush(self, timeout: float = 5.0) -> None
Flush pending messages.
Args:
    timeout: Maximum time to wait for flush
close
def close(self) -> None
Close the producer.