KafkaThreadManager¶
KafkaThreadManager is a utility class to manage a background thread that consumes messages from a Kafka topic using a given Kafka consumer and a user-defined callback.
1. Initialization¶
from cofmupy.data_stream_handler.kafka_utils import KafkaThreadManager
manager = KafkaThreadManager(
consumer=my_kafka_consumer,
callback=my_callback_function,
thread_lifetime=40 # optional, default 40 seconds
)
Parameters¶
-
consumer(Kafka consumer object) -
Typically a
confluent_kafka.Consumer. -
Handles connection and message polling from Kafka.
-
callback(function) -
Function to process each consumed message.
- Signature:
callback(msg)wheremsgis a Kafka message object. -
Error handling should be implemented in the callback.
-
thread_lifetime(float, optional, default40) -
Maximum time (in seconds) the consuming thread should run.
- If
Noneor0, the thread runs indefinitely untilstop()is called.
2. Start the consumer thread¶
manager.start()
- Spawns a daemon thread to run
_consume_loop. - Polls messages from Kafka and calls
callback(msg)for each message. - Logs when the thread starts:
Kafka consumer started consuming in thread '<thread_name>'
3. Consuming loop (_consume_loop)¶
- Runs internally in a separate thread.
-
Logic:
- Track elapsed time (if
thread_lifetimeis set). - Poll Kafka messages with
consumer.poll(timeout=1).
Note:pollis aconfluent_kafka.Consumermethod. - If a message is received, call
callback(msg). -
Stops when:
- Thread lifetime is exceeded, or
stop()is called manually.
- Track elapsed time (if
-
Logs errors during polling and a message when the thread stops.
4. Stop the thread gracefully¶
manager.stop()
- Sets
running = False. - Waits for the thread to finish (
thread.join()). - Closes the Kafka consumer.
- Logs:
"Kafka consumer thread stopped."
⚠️ Notes¶
- The callback function must handle its own exceptions; otherwise, they propagate to the consuming thread.
- Thread is a daemon, so it won’t block Python shutdown if the main program exits.
- Use
thread_lifetimeto limit how long the consumer runs automatically; otherwise, callstop()manually.