I am creating a simple pipeline to read and then write to Kafka. Here is my 
code:
    with Pipeline(options=pipeline_options) as pipeline:
        main = (
            pipeline
            | ReadFromKafka(
                consumer_config={
                    'bootstrap.servers': configs['bootstrap_servers'],
                    'group.id': 'my-group',
                    'isolation.level': 'read_uncommitted',
                },
                topics=configs['topics'],
                max_num_records=max_num_records,
                commit_offset_in_finalize=True,
                with_metadata=True)
            | "temp convert" >> beam.Map(temp_convert)
            | "Write to Kafka" >> WriteToKafka(
                producer_config={
                    'bootstrap.servers': configs['bootstrap_servers'],
                },
                topic=configs['out_topic'])
        )


This is giving me

RuntimeError: java.lang.ClassCastException: class 
org.apache.beam.sdk.util.construction.UnknownCoderWrapper cannot be cast to 
class org.apache.beam.sdk.coders.KvCoder 
(org.apache.beam.sdk.util.construction.UnknownCoderWrapper and 
org.apache.beam.sdk.coders.KvCoder are in unnamed module of loader 'app')'

Can someone point me to a simple snippet for writing to Kafka in Python? I have 
looked in vain on the web.

Thanks!
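
A possible direction, not verified against this pipeline: the Beam documentation describes WriteToKafka as requiring its input to be key/value pairs of bytes, and a cast failure from UnknownCoderWrapper to KvCoder is consistent with the step feeding WriteToKafka having no declared key/value type. The sketch below shows a conversion function that always returns a (bytes, bytes) pair. The names to_kafka_kv, _as_bytes and FakeRecord are hypothetical, and the assumption that the incoming record exposes .key and .value attributes (as a with_metadata=True record is expected to) should be checked against the actual elements.

```python
from typing import Any, NamedTuple, Optional, Tuple


class FakeRecord(NamedTuple):
    """Hypothetical stand-in for a Kafka record; only used to try the function."""
    key: Optional[bytes]
    value: Any


def _as_bytes(field: Optional[Any]) -> bytes:
    """Coerce a field to bytes. None becomes b'' (an assumption, not a Kafka rule)."""
    if field is None:
        return b""
    if isinstance(field, bytes):
        return field
    return str(field).encode("utf-8")


def to_kafka_kv(record: Any) -> Tuple[bytes, bytes]:
    """Return a (key, value) pair of bytes from a record with .key and .value.

    Assumes the record exposes those two attributes; missing ones are
    treated like None.
    """
    key = _as_bytes(getattr(record, "key", None))
    value = _as_bytes(getattr(record, "value", None))
    return key, value
```

In the pipeline, such a function would typically be attached with an explicit type hint, for example beam.Map(to_kafka_kv).with_output_types(typing.Tuple[bytes, bytes]), so that the element type is declared as a key/value pair before the WriteToKafka step.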
