[ 
https://issues.apache.org/jira/browse/KAFKA-8486?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-8486.
------------------------------------
    Resolution: Invalid

> How to commit offset via Kafka 
> -------------------------------
>
>                 Key: KAFKA-8486
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8486
>             Project: Kafka
>          Issue Type: Wish
>          Components: consumer
>    Affects Versions: 2.2.1
>            Reporter: Stanislav 
>            Priority: Trivial
>
> from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata
>  from json import loads
>  import vertica_python
>  from datetime import datetime as dt
>  from time import sleep
> conn_vertica = {'host': '', 'port': 5433, 'user': '', 'password': '', 'database': '',
> 'use_prepared_statements': True}
> conn_to = conn_vertica
> def load():
>  parsed_topic_name = 'orderSummary'
> consumer = KafkaConsumer(parsed_topic_name, auto_offset_reset='earliest',
>  bootstrap_servers=['us-kafka-broker:9092'],
>  enable_auto_commit=False,
>  group_id="my_group",
>  value_deserializer=lambda x: loads(x.decode('utf-8'))
>  )
>  timeout = 20
>  max_len = 10
>  res = []
>  t1 = dt.now()
>  while (dt.now()-t1).seconds < timeout or len(res) < max_len:
>  msgs = consumer.poll()
>  print(msgs)
>  for v in msgs.values():
>  res += v
> with vertica_python.connect(**conn_to) as conn_2:
>  curs2 = conn_2.cursor()
>  if res:
>  curs2.executemany('''
>  INSERT INTO stage.FS_Orders_from_kafka (load_dtm,topic_name, partition_id, 
> "offset", value) 
>  VALUES (?, ?, ?, ?, ?)''', [(r.timestamp, r.topic, r.partition, r.offset, 
> r.value) for r in res])
>  curs2.execute('COMMIT')
> else:
>  print('Nothing!')
> consumer.close()
>  #sleep(5)
> load()



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to