Hello,
It looks like the following transform does not output a tuple of bytes,
which is what WriteToKafka expects as its input elements.
| "temp convert" >> beam.Map(temp_convert)
You should convert the output into a (key, value) tuple of bytes, something
like ('key'.encode('utf-8'), 'value'.encode('utf-8')).
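As a rough sketch of what I mean, the conversion step could look something
like the code below. I am assuming each element coming out of ReadFromKafka
with with_metadata=True exposes .key and .value attributes; FakeRecord,
_to_bytes and to_kafka_kv are hypothetical names used only for illustration,
and mapping a missing key to empty bytes is my own choice, not a requirement.

```python
from typing import NamedTuple, Optional, Tuple, Union


class FakeRecord(NamedTuple):
    """Hypothetical stand-in for one element read from Kafka (assumption)."""
    key: Optional[bytes]
    value: Union[bytes, str]


def _to_bytes(item) -> bytes:
    """Return item as bytes: None becomes b"", str-like values are UTF-8 encoded."""
    if item is None:
        return b""
    if isinstance(item, bytes):
        return item
    return str(item).encode("utf-8")


def to_kafka_kv(record) -> Tuple[bytes, bytes]:
    """Build the (key, value) tuple of bytes for the write step."""
    return _to_bytes(record.key), _to_bytes(record.value)


# Possible wiring in the pipeline (untested sketch; the explicit type hint
# may help the runner pick a KV coder for the write step):
#   | "temp convert" >> beam.Map(to_kafka_kv).with_output_types(Tuple[bytes, bytes])
```

Your own temp_convert can do whatever transformation it needs, as long as
what it finally returns is a two-element tuple of bytes.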
I wrote a simple post about Kafka I/O read/write on the Flink Runner, and
hope it helps - https://jaehyeon.me/blog/2024-04-18-beam-local-dev-3/
Cheers,
Jaehyeon
On Fri, 4 Oct 2024 at 08:21, Henry Tremblay via user <[email protected]>
wrote:
> I am creating a simple pipeline to read and then write to Kafka. Here is
> my code:
>
> with Pipeline(options=pipeline_options) as pipeline:
>     main = (
>         pipeline
>         | ReadFromKafka(
>             consumer_config={
>                 'bootstrap.servers': configs['bootstrap_servers'],
>                 'group.id': 'my-group',
>                 'isolation.level': 'read_uncommitted',
>             },
>             topics=configs['topics'],
>             max_num_records=max_num_records,
>             commit_offset_in_finalize=True,
>             with_metadata=True)
>         | "temp convert" >> beam.Map(temp_convert)
>         | "Write to Kafka" >> WriteToKafka(
>             producer_config={'bootstrap.servers': configs['bootstrap_servers']},
>             topic=configs['out_topic'],
>         )
>     )
>
> This is giving me
>
> RuntimeError: java.lang.ClassCastException: class
> org.apache.beam.sdk.util.construction.UnknownCoderWrapper cannot be cast to
> class org.apache.beam.sdk.coders.KvCoder
> (org.apache.beam.sdk.util.construction.UnknownCoderWrapper and
> org.apache.beam.sdk.coders.KvCoder are in unnamed module of loader 'app')
>
> Can someone point me to a simple snippet for writing to Kafka in Python? I
> have looked in vain on the web.
>
> Thanks!
>