So I changed temp_convert to return a tuple of bytes, and now I am getting:
raise RuntimeError(result.error)
RuntimeError: org.apache.beam.sdk.coders.CoderException: `UnknownCoderWrapper`
was used to perform an actual decoding in the Java SDK. Potentially a Java
transform is being followed by a cross-language transform that uses a coder that
is not available in the Java SDK. Please make sure that Python transforms at
the multi-language boundary use Beam portable coders.
I am using the DirectRunner.
Thanks!
From: Jaehyeon Kim <[email protected]>
Sent: Thursday, October 3, 2024 3:57 PM
To: [email protected]
Subject: Re: How to write to a Kafka topic?
Hello,
It doesn't seem that the following transform outputs a tuple of bytes.
| "temp convert" >> beam.Map(temp_convert)
You should convert the output into something like ('key'.encode('utf-8'),
'value'.encode('utf-8')).
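For illustration, here is a minimal sketch of such a conversion. It assumes the
upstream element is a (key, value) pair whose parts may be str, bytes, or None.
The names to_kafka_kv and _to_bytes are hypothetical, and the exact shape of the
records produced by ReadFromKafka with with_metadata=True is not shown in this
thread, so the unpacking may need to be adapted.

```python
from typing import Tuple, Union

# Assumed input field types; adjust to whatever the upstream step really emits.
Field = Union[str, bytes, None]


def _to_bytes(field: Field) -> bytes:
    """Encode str as UTF-8, pass bytes through, map None to empty bytes."""
    if field is None:
        return b""
    if isinstance(field, bytes):
        return field
    return str(field).encode("utf-8")


def to_kafka_kv(element: Tuple[Field, Field]) -> Tuple[bytes, bytes]:
    """Turn an assumed (key, value) pair into a (bytes, bytes) pair."""
    key, value = element
    return _to_bytes(key), _to_bytes(value)
```

In the pipeline this could stand in for temp_convert, for example as
beam.Map(to_kafka_kv).with_output_types(typing.Tuple[bytes, bytes]). The
explicit output type hint is a suggestion to try: it lets Beam choose a key/value
bytes coder for the step feeding WriteToKafka instead of a Python-only fallback
coder, which may be what the Java side is rejecting.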
I wrote a simple post about Kafka I/O read/write on the Flink Runner, and hope
it helps - https://jaehyeon.me/blog/2024-04-18-beam-local-dev-3/
Cheers,
Jaehyeon
On Fri, 4 Oct 2024 at 08:21, Henry Tremblay via user
<[email protected]> wrote:
I am creating a simple pipeline to read and then write to Kafka. Here is my
code:
    with Pipeline(options=pipeline_options) as pipeline:
        main = (
            pipeline
            | ReadFromKafka(
                consumer_config={
                    'bootstrap.servers': configs['bootstrap_servers'],
                    'group.id': 'my-group',
                    'isolation.level': 'read_uncommitted',
                },
                topics=configs['topics'],
                max_num_records=max_num_records,
                commit_offset_in_finalize=True,
                with_metadata=True)
            | "temp convert" >> beam.Map(temp_convert)
            | "Write to Kafka" >> WriteToKafka(
                producer_config={'bootstrap.servers': configs['bootstrap_servers']},
                topic=configs['out_topic'],
            )
        )
This is giving me
RuntimeError: java.lang.ClassCastException: class
org.apache.beam.sdk.util.construction.UnknownCoderWrapper cannot be cast to
class org.apache.beam.sdk.coders.KvCoder
(org.apache.beam.sdk.util.construction.UnknownCoderWrapper and
org.apache.beam.sdk.coders.KvCoder are in unnamed module of loader 'app')
Can someone point me to a simple snippet for writing to Kafka in Python? I have
looked in vain on the web.
Thanks!