Hi,

I'm trying to send data to a Kafka topic using PyFlink (DataStream API)
while setting a key on each Kafka record. The key is a simple string; the
value is a JSON string. What I have so far basically works, except that the
whole record is sent as both the key and the value. How do I specify that
only record["id"] should be used as the key? Unfortunately, the docs
<https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#serializer>
don't say how to specify a key field.
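
To make that concrete, here's the key/value split I'm after for a single
record (field names other than "id" are made up for illustration):

record = {"id": "abc-123", "count": 4}
key   = record["id"]          # "abc-123"
value = json.dumps(record)    # '{"id": "abc-123", "count": 4}'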

Here's a simplified version of my code:

kafka_sink = (
    KafkaSink.builder()
    .set_bootstrap_servers(broker_url)
    .set_property("security.protocol", "SASL_SSL")
    .set_property("sasl.mechanism", "PLAIN")
    .set_property(
        "sasl.jaas.config",
        f'org.apache.kafka.common.security.plain.PlainLoginModule required \
      username="{my_username}" \
      password="{my_password}";',
    )
    .set_record_serializer(
        KafkaRecordSerializationSchema.builder()
        .set_topic("myTopic")
        # value: the JSON string element itself -- this part is fine
        .set_value_serialization_schema(SimpleStringSchema())
        # key: currently also gets the whole JSON string; I want just record["id"] here
        .set_key_serialization_schema(SimpleStringSchema())
        .build()
    )
    .build()
)
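
As far as I can tell, with this setup both schemas are handed the same
string element that reaches the sink, which matches what I'm seeing (again,
made-up fields apart from "id"):

element reaching the sink: '{"id": "abc-123", "count": 4}'
key written to Kafka:      '{"id": "abc-123", "count": 4}'   <-- want just "abc-123"
value written to Kafka:    '{"id": "abc-123", "count": 4}'   <-- correct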


stream = (
    env.add_source(kinesis_source)
    .map(lambda event: json.loads(event))
    .map(add_event_timestamps)
    .assign_timestamps_and_watermarks(get_watermark_strategy())
    .key_by(lambda record: record["id"], key_type=Types.STRING())
    .window(EventTimeSessionWindows.with_gap(Time.seconds(30)))
    .reduce(reduce_events)
    .map(lambda e: json.dumps(e), output_type=Types.STRING())
)
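
The stream then gets attached to the sink, roughly like this (simplified;
the job name is just a placeholder):

stream.sink_to(kafka_sink)
env.execute("kinesis-to-kafka")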

One thing I've tried is changing the final .map to:
        .map(lambda e: (e["id"], json.dumps(e)))

That is, have the stream be a (key, value) tuple. When I do that, though, I
get an exception:

RuntimeError: java.lang.UnsupportedOperationException: A serializer has
already been registered for the state; re-registration is not allowed.


Thanks,
Andrew
