I am creating a simple pipeline to read and then write to Kafka. Here is my
code:
with Pipeline(options=pipeline_options) as pipeline:
    main = (
        pipeline
        | ReadFromKafka(
            consumer_config={
                'bootstrap.servers': configs['bootstrap_servers'],
                'group.id': 'my-group',
                'isolation.level': 'read_uncommitted',
            },
            topics=configs['topics'],
            max_num_records=max_num_records,
            commit_offset_in_finalize=True,
            with_metadata=True)
        | "temp convert" >> beam.Map(temp_convert)
        | "Write to Kafka" >> WriteToKafka(
            producer_config={'bootstrap.servers': configs['bootstrap_servers']},
            topic=configs['out_topic'],
        )
    )
This gives me the following error:
RuntimeError: java.lang.ClassCastException: class
org.apache.beam.sdk.util.construction.UnknownCoderWrapper cannot be cast to
class org.apache.beam.sdk.coders.KvCoder
(org.apache.beam.sdk.util.construction.UnknownCoderWrapper and
org.apache.beam.sdk.coders.KvCoder are in unnamed module of loader 'app')'
Can someone point me to a simple snippet for writing to Kafka in Python? I have
looked in vain on the web.
Thanks!
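
One possible direction, offered as a sketch and not verified against this exact setup: the ClassCastException suggests the Java side of WriteToKafka expects a KvCoder, that is, a PCollection of (key, value) pairs of bytes, while the output of beam.Map(temp_convert) carries no type hint and so arrives with a coder Java cannot interpret. Declaring the output type as Tuple[bytes, bytes] is one way to address that. The names to_kafka_kv and write_to_kafka below are hypothetical stand-ins (to_kafka_kv takes the place of temp_convert), and the assumption that a metadata record exposes key and value attributes is mine; the empty-bytes fallback for a missing key is likewise just an illustrative choice.

```python
import typing


def _as_bytes(x):
    """Coerce a key or value to bytes (None becomes empty bytes; an assumption)."""
    if x is None:
        return b""
    if isinstance(x, (bytes, bytearray)):
        return bytes(x)
    return str(x).encode("utf-8")


def to_kafka_kv(record) -> typing.Tuple[bytes, bytes]:
    """Hypothetical replacement for temp_convert.

    Accepts either an object with `key` and `value` attributes (assumed shape
    of a record read with with_metadata=True) or a plain (key, value) pair,
    and returns a (bytes, bytes) tuple.
    """
    if hasattr(record, "key") and hasattr(record, "value"):
        key, value = record.key, record.value
    else:
        key, value = record
    return _as_bytes(key), _as_bytes(value)


def write_to_kafka(records, configs):
    """Attach the conversion and the Kafka sink to an existing PCollection."""
    # Imported here so the helpers above can be used without Beam installed.
    import apache_beam as beam
    from apache_beam.io.kafka import WriteToKafka

    return (
        records
        # The explicit output type lets Beam pick a key/value coder of bytes
        # instead of an opaque fallback coder.
        | "To KV bytes" >> beam.Map(to_kafka_kv).with_output_types(
            typing.Tuple[bytes, bytes])
        | "Write to Kafka" >> WriteToKafka(
            producer_config={
                'bootstrap.servers': configs['bootstrap_servers'],
            },
            topic=configs['out_topic'],
        )
    )
```

In the pipeline above, this would replace the last two steps: write_to_kafka(pipeline | ReadFromKafka(...), configs). WriteToKafka uses byte-array serializers by default, so any temperature conversion would need to encode its result to bytes before returning.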