Hello,

It doesn't seem that the following transform outputs a tuple of bytes.

| "temp convert" >> beam.Map(temp_convert)

You should convert the output into something like ('key'.encode('utf-8'),
'value'.encode('utf-8')).

I wrote a simple post about Kafka I/O read/write on the Flink Runner, and
hope it helps - https://jaehyeon.me/blog/2024-04-18-beam-local-dev-3/

Cheers,
Jaehyeon



On Fri, 4 Oct 2024 at 08:21, Henry Tremblay via user <[email protected]>
wrote:

> I am creating a simple pipeline to read and then write to Kafka. Here is
> my code:
>
> 156     *with* Pipeline(options=pipeline_options) *as* pipeline:
>
> 157         main = (
>
> 158             pipeline
>
> 159             | ReadFromKafka(
>
> 160                 consumer_config={'bootstrap.servers': configs['
> bootstrap_servers'],
>
> 161                     'group.id': 'my-group',
>
> 162                     'isolation.level': 'read_uncommitted',
>
> 163                     },
>
> 164                 topics= configs['topics'],
>
> 165                 max_num_records = max_num_records,
>
> 166                 commit_offset_in_finalize = True,
>
> 167                 with_metadata=True)
>
> 168             | "temp convert" >> beam.Map(temp_convert)
>
> 169             | "Write to Kafka" >>  WriteToKafka(
>
> 170                 producer_config={'bootstrap.servers': configs['
> bootstrap_servers']},
>
> 171                     topic=configs['out_topic'],
>
> 172                     )
>
> 173             )
>
>
>
>
>
> This is giving me
>
>
>
> RuntimeError: java.lang.ClassCastException: class
> org.apache.beam.sdk.util.construction.UnknownCoderWrapper cannot be cast to
> class org.apache.beam.sdk.coders.KvCoder
> (org.apache.beam.sdk.util.construction.UnknownCoderWrapper and
> org.apache.beam.sdk.coders.KvCoder are in unnamed module of loader 'app')’
>
>
>
> Can someone point me to a simple snipet for writing to Kafka in Python? I
> have looked in vain on the web.
>
>
>
> Thanks!
>

Reply via email to