Sebastian YEPES FERNANDEZ created FLINK-38519:
-------------------------------------------------
Summary: Custom Message Keys with PyFlink Kafka Sink
Key: FLINK-38519
URL: https://issues.apache.org/jira/browse/FLINK-38519
Project: Flink
Issue Type: Bug
Components: Connectors / Kafka
Affects Versions: kafka-4.0.1, 2.1.0
Environment: Apache Flink 2.1.0 with the below jars
https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/4.0.1-2.0/flink-sql-connector-kafka-4.0.1-2.0.jar
https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/4.1.0/kafka-clients-4.1.0.jar
https://repo1.maven.org/maven2/org/apache/flink/flink-avro/2.1.0/flink-avro-2.1.0.jar
https://repo1.maven.org/maven2/org/apache/flink/flink-avro-confluent-registry/2.1.0/flink-avro-confluent-registry-2.1.0.jar
https://packages.confluent.io/maven/io/confluent/kafka-schema-registry-client/8.0.0/kafka-schema-registry-client-8.0.0.jar
https://packages.confluent.io/maven/io/confluent/kafka-avro-serializer/8.0.0/kafka-avro-serializer-8.0.0.jar
https://repo1.maven.org/maven2/org/apache/avro/avro/1.12.0/avro-1.12.0.jar
https://repo1.maven.org/maven2/com/github/luben/zstd-jni/1.5.7-4/zstd-jni-1.5.7-4.jar
https://repo1.maven.org/maven2/com/fasterxml/jackson/core/jackson-databind/2.20.0/jackson-databind-2.20.0.jar
https://repo1.maven.org/maven2/com/fasterxml/jackson/core/jackson-core/2.20.0/jackson-core-2.20.0.jar
https://repo1.maven.org/maven2/com/fasterxml/jackson/core/jackson-annotations/2.20/jackson-annotations-2.20.jar
Reporter: Sebastian YEPES FERNANDEZ
Attachments: Issue-Alternative.md, issue.md
Hi all,
I’m currently using a custom Kafka producer but am considering switching to the
native Flink Kafka Sink Connector. However, I’ve encountered some issues and
exceptions in the process.
At the moment, I’m publishing JSON payloads with custom message keys to
optimize partitioning and ordering. From what I can tell, the PyFlink interface
doesn’t expose this functionality directly. I even consulted Claude Code, which
suggested that the only solution is to implement a custom Java serializer, but
I’d prefer to avoid building custom JARs if possible.
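For background on why the key matters: Kafka's default partitioner derives a record's partition from a hash of its key, so all events that share a key land on the same partition and keep their relative order. The intent can be sketched in plain Python (no Flink dependency; CRC32 is used here only as a deterministic stand-in for Kafka's actual murmur2 hash, and the partition count is an arbitrary example):

{code:python}
import json
import zlib  # crc32: a stand-in hash; real Kafka clients use murmur2

NUM_PARTITIONS = 6  # example value, not taken from any real topic


def create_kafka_record(event: dict) -> tuple:
    """Same key scheme as in the job below: "<name>:<id>"."""
    key = f'{event.get("name", "")}:{event.get("id", "")}'
    return key, json.dumps(event, default=str)


def partition_for(key: str, num_partitions: int = NUM_PARTITIONS) -> int:
    """Conceptually what Kafka's default partitioner does:
    hash(key) mod partition count."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


# Two events for the same client share a key, hence a partition,
# so their relative order is preserved.
k1, _ = create_kafka_record({"name": "acme", "id": 7, "seq": 1})
k2, _ = create_kafka_record({"name": "acme", "id": 7, "seq": 2})
assert partition_for(k1) == partition_for(k2)
{code}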
Below is a simplified example of what I’m trying to implement:
{code:python}
import json

from pyflink.common import Types
from pyflink.datastream.connectors.kafka import KafkaSink


def create_kafka_record(event):
    """Create a (key, value) pair with kafka_key = "<name>:<id>"."""
    client = event.get("name", "")
    client_id = event.get("id", "")
    kafka_key = f"{client}:{client_id}"
    return kafka_key, json.dumps(event, default=str)


kafka_records = processed_events.map(
    create_kafka_record,
    output_type=Types.TUPLE([Types.STRING(), Types.STRING()])
).name("Prepare Kafka Records")

# There is no obvious way to tell the sink to use the first tuple
# element as the Kafka message key:
kafka_sink = KafkaSink.builder() \
    .set_bootstrap_servers(bootstrap_servers) \
    .build()

kafka_records.sink_to(kafka_sink).name("Sink to Kafka")
{code}
Does anyone know of any alternatives or workarounds for this? Or could someone
clarify whether there are plans to extend the PyFlink interface to support
custom message keys and serializers?
I have attached the Claude Code analysis of the problem; I hope it helps.
Thanks in advance for your help!
--
This message was sent by Atlassian Jira
(v8.20.10#820010)