Skip to content Skip to sidebar Skip to footer

How To Connect Python Consumer To Aws Msk

I'm trying to connect my python consumer to AWS MSK cluster. how can I do that? Have an AWS MSK Cluster running I'm trying consume message from the MSK cluster using python and kaf

Solution 1:

Using kafka-python:

from kafka import KafkaConsumer

if __name__ == '__main__':
    topic_name = 'example-topic'

    consumer = KafkaConsumer(topic_name, auto_offset_reset='earliest',
                             bootstrap_servers=['kafka2:9092'], api_version=(0, 10), consumer_timeout_ms=1000)
    for msg in consumer:
        print(msg.value)

    if consumer is not None:
        consumer.close()

from time import sleep

from kafka import KafkaProducer

# publish messages on topic
def publish_message(producer_instance, topic_name, key, value):
    try:
        key_bytes = bytes(key, encoding='utf-8')
        value_bytes = bytes(value, encoding='utf-8')
        producer_instance.send(topic_name, key=key_bytes, value=value_bytes)
        producer_instance.flush()
        print('Message ' + key + ' published successfully.')
    except Exception as ex:
        print('Exception in publishing message')
        print(str(ex))

# establish kafka connection
def connect_kafka_producer():
    _producer = None
    try:
        _producer = KafkaProducer(bootstrap_servers=['kafka1:9092'])
    except Exception as ex:
        print('Exception while connecting Kafka')
        print(str(ex))
    finally:
        return _producer

if __name__ == '__main__':
    kafka_producer = connect_kafka_producer()
    x = 0
    while True:
        publish_message(kafka_producer, 'raw_recipes', str(x), 'This is message ' + str(x))
        x += 1
    
    if kafka_producer is not None:
            kafka_producer.close()

Post a Comment for "How To Connect Python Consumer To Aws Msk"