Sebastian YEPES FERNANDEZ created FLINK-38519:
-------------------------------------------------

             Summary: Custom Message Keys with PyFlink Kafka Sink
                 Key: FLINK-38519
                 URL: https://issues.apache.org/jira/browse/FLINK-38519
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kafka
    Affects Versions: kafka-4.0.1, 2.1.0
         Environment: Apache Flink 2.1.0 with the below JARs

https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/4.0.1-2.0/flink-sql-connector-kafka-4.0.1-2.0.jar
https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/4.1.0/kafka-clients-4.1.0.jar
https://repo1.maven.org/maven2/org/apache/flink/flink-avro/2.1.0/flink-avro-2.1.0.jar
https://repo1.maven.org/maven2/org/apache/flink/flink-avro-confluent-registry/2.1.0/flink-avro-confluent-registry-2.1.0.jar
https://packages.confluent.io/maven/io/confluent/kafka-schema-registry-client/8.0.0/kafka-schema-registry-client-8.0.0.jar
https://packages.confluent.io/maven/io/confluent/kafka-avro-serializer/8.0.0/kafka-avro-serializer-8.0.0.jar
https://repo1.maven.org/maven2/org/apache/avro/avro/1.12.0/avro-1.12.0.jar
https://repo1.maven.org/maven2/com/github/luben/zstd-jni/1.5.7-4/zstd-jni-1.5.7-4.jar
https://repo1.maven.org/maven2/com/fasterxml/jackson/core/jackson-databind/2.20.0/jackson-databind-2.20.0.jar
https://repo1.maven.org/maven2/com/fasterxml/jackson/core/jackson-core/2.20.0/jackson-core-2.20.0.jar
https://repo1.maven.org/maven2/com/fasterxml/jackson/core/jackson-annotations/2.20/jackson-annotations-2.20.jar
            Reporter: Sebastian YEPES FERNANDEZ
         Attachments: Issue-Alternative.md, issue.md

Hi all,

I’m currently using a custom Kafka producer but am considering switching to the 
native Flink Kafka Sink Connector. However, I’ve encountered some issues and 
exceptions in the process.
At the moment, I’m publishing JSON payloads with custom message keys to 
optimize partitioning and ordering. From what I can tell, the PyFlink interface 
doesn’t support this functionality directly. I even consulted Claude Code, which 
suggested that the only solution is to implement a custom Java serializer, but 
I’d prefer to avoid creating custom JARs if possible.

Below is a simplified example of what I’m trying to implement:

 
{code:python}
import json

def create_kafka_record(event):
    """Create a (key, value) pair with kafka_key = "client:client_id"."""
    client = event.get("name", "")
    client_id = event.get("id", "")
    kafka_key = f"{client}:{client_id}"
    return (kafka_key, json.dumps(event, default=str))


kafka_records = processed_events.map(
    create_kafka_record,
    output_type=Types.TUPLE([Types.STRING(), Types.STRING()])
).name("Prepare Kafka Records")

kafka_sink = KafkaSink.builder().set_bootstrap_servers(bootstrap_servers).build()

kafka_records.sink_to(kafka_sink).name("Sink to Kafka")
{code}
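
For reference, the closest I could get with the documented PyFlink API is the `KafkaRecordSerializationSchema` builder, which does accept a key serialization schema. As far as I can tell, though, both the key and value schemas receive the same stream element, so there is no obvious way to serialize only the first tuple field as the key. A minimal sketch (the builder methods are from `pyflink.datastream.connectors.kafka`; I have not verified the key behavior end-to-end):

```python
def make_key(event):
    """Build the desired Kafka key "name:id" from an event dict."""
    return f"{event.get('name', '')}:{event.get('id', '')}"


def build_sink(bootstrap_servers, topic):
    # Sketch only: requires apache-flink plus the Kafka connector JAR.
    from pyflink.common.serialization import SimpleStringSchema
    from pyflink.datastream.connectors.kafka import (
        KafkaRecordSerializationSchema,
        KafkaSink,
    )

    record_serializer = (
        KafkaRecordSerializationSchema.builder()
        .set_topic(topic)
        # Caveat: this appears to serialize the *whole* element as the
        # key, not just the first tuple field, which is the core problem.
        .set_key_serialization_schema(SimpleStringSchema())
        .set_value_serialization_schema(SimpleStringSchema())
        .build()
    )

    return (
        KafkaSink.builder()
        .set_bootstrap_servers(bootstrap_servers)
        .set_record_serializer(record_serializer)
        .build()
    )
```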
Does anyone know of any alternatives or workarounds for this? Or could someone 
clarify whether there are plans to enhance the PyFlink interface to support 
custom message keys and serializers?

I have attached the Claude Code analysis of the problem; I hope it helps.


Thanks in advance for your help!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
