[ https://issues.apache.org/jira/browse/KAFKA-8486?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Matthias J. Sax resolved KAFKA-8486. ------------------------------------ Resolution: Invalid > How to commit offset via Kafka > ------------------------------- > > Key: KAFKA-8486 > URL: https://issues.apache.org/jira/browse/KAFKA-8486 > Project: Kafka > Issue Type: Wish > Components: consumer > Affects Versions: 2.2.1 > Reporter: Stanislav > Priority: Trivial > > from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata > from json import loads > import vertica_python > from datetime import datetime as dt > from time import sleep > conn_vertica = > {'host': '', 'port': 5433, 'user': '', 'password': '', 'database': '', > 'use_prepared_statements': True} > conn_to = conn_vertica > def load():( > parsed_topic_name = 'orderSummary' > consumer = KafkaConsumer(parsed_topic_name, auto_offset_reset='earliest', > bootstrap_servers=['us-kafka-broker:9092'], > enable_auto_commit=False, > group_id="my_group", > value_deserializer=lambda x: loads(x.decode('utf-8')) > ) > timeout = 20 > max_len = 10 > res = [] > t1 = dt.now() > while (dt.now()-t1).seconds < timeout or len(res) < max_len: > msgs = consumer.poll() > print(msgs) > for v in msgs.values():( > res += v > with vertica_python.connect(**conn_to) as conn_2: > curs2 = conn_2.cursor() > if res: > curs2.executemany(''' > INSERT INTO stage.FS_Orders_from_kafka (load_dtm,topic_name, partition_id, > "offset", value) > VALUES (?, ?, ?, ?, ?)''', [(r.timestamp, r.topic, r.partition, r.offset, > r.value) for r in res]) > curs2.execute('COMMIT') > else: > print('Nothing!') > consumer.close() > #sleep(5) > load() -- This message was sent by Atlassian JIRA (v7.6.3#76005)