Hi, I'm trying to send data to a Kafka topic using PyFlink (DataStream API), while setting a key on each Kafka record. The key is a simple string; the value is a JSON string. What I have so far basically works, except that the whole record is sent as both the key and the value. How do I specify that only record["id"] should be the key? Unfortunately, the docs on the topic <https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#serializer> don't indicate how to specify a key field.
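For concreteness, here's the shape I'm after (the field names below are made up; real events have more fields):

    # Hypothetical event shape; real records carry more fields.
    record = {"id": "sensor-42", "count": 3}

    # What I want on the Kafka side:
    #   key   -> record["id"]        e.g. "sensor-42"
    #   value -> json.dumps(record)  e.g. '{"id": "sensor-42", "count": 3}'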
Here's a simplified version of my code (imports trimmed to the relevant ones):

    import json

    from pyflink.common.serialization import SimpleStringSchema
    from pyflink.common.time import Time
    from pyflink.common.typeinfo import Types
    from pyflink.datastream.connectors.kafka import (
        KafkaRecordSerializationSchema,
        KafkaSink,
    )
    from pyflink.datastream.window import EventTimeSessionWindows

    kafka_sink = (
        KafkaSink.builder()
        .set_bootstrap_servers(broker_url)
        .set_property("security.protocol", "SASL_SSL")
        .set_property("sasl.mechanism", "PLAIN")
        .set_property(
            "sasl.jaas.config",
            f'org.apache.kafka.common.security.plain.PlainLoginModule required \
            username="{my_username}" \
            password="{my_password}";',
        )
        .set_record_serializer(
            KafkaRecordSerializationSchema.builder()
            .set_topic("myTopic")
            .set_value_serialization_schema(SimpleStringSchema())
            .set_key_serialization_schema(SimpleStringSchema())
            .build()
        )
        .build()
    )

    stream = (
        env.add_source(kinesis_source)
        .map(lambda event: json.loads(event))
        .map(add_event_timestamps)
        .assign_timestamps_and_watermarks(get_watermark_strategy())
        .key_by(lambda record: record["id"], key_type=Types.STRING())
        .window(EventTimeSessionWindows.with_gap(Time.seconds(30)))
        .reduce(reduce_events)
        .map(lambda e: json.dumps(e), output_type=Types.STRING())
    )

    # Attach the sink; records arrive in Kafka, but with the full
    # JSON string as both the key and the value.
    stream.sink_to(kafka_sink)

One thing I've tried is changing the final .map(...) to:

    .map(lambda e: (e["id"], json.dumps(e)))

That is, have the stream carry (key, value) tuples. When I do that, though, I get an exception:

    RuntimeError: java.lang.UnsupportedOperationException: A serializer has
    already been registered for the state; re-registration is not allowed.

Thanks,
Andrew
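P.S. Spelled out, the tuple variant I attempted looks like the sketch below. `windowed` is just shorthand for the chain up through .reduce(reduce_events) above, and the explicit output_type is my guess at the matching type hint, not something I've confirmed:

    # `windowed` stands for the pipeline up through .reduce(reduce_events).
    # Emit (key, value) tuples instead of a bare JSON string; the
    # output_type hint is my guess at the right declaration.
    stream = windowed.map(
        lambda e: (e["id"], json.dumps(e)),
        output_type=Types.TUPLE([Types.STRING(), Types.STRING()]),
    )

I'm not sure whether a missing or incorrect type hint is what triggers the serializer re-registration error, or whether tuples simply aren't how the key is meant to be supplied to KafkaSink in the first place.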