Hello,

I want to read data from Kafka topics that contain Confluent-encoded Protobuf messages (not plain Protobuf: Confluent prepends a "magic byte" and a schema ID) with the Python DataStream API.

I have found that Confluent has a Java class [1] which implements org.apache.kafka.common.serialization.Deserializer, so I thought I could use it as a custom DeserializationSchema, set with KafkaSource.builder().set_value_only_deserializer(<custom deserialization schema>).

So far so good, but: I have added the jars to the stream execution environment (add_jars(), add_classpaths()), yet the class cannot be found on the Java gateway when I try to get it. Other Flink classes coming from other jars (flink-sql-connector-kafka-3.2.0-1.19.jar, flink-table-api-java-bridge-1.19.1.jar) are available.

What could be the problem? Is this even supposed to work?

A snippet demonstrating my idea:

------------------------------------------------------
from pyflink.common import Configuration
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.java_gateway import get_gateway

env = StreamExecutionEnvironment.get_execution_environment(Configuration())
project_root = "/Users/aksb/projects/flink"
confluent_serde_jar = f"file://{project_root}/lib/kafka-protobuf-serializer-7.7.1.jar"

env.add_jars(
    confluent_serde_jar,
    f"file://{project_root}/lib/flink-sql-connector-kafka-3.2.0-1.19.jar",
    f"file://{project_root}/lib/flink-table-api-java-bridge-1.19.1.jar",
)
env.add_classpaths(confluent_serde_jar)

_jvm = get_gateway().jvm

env_config = _jvm.org.apache.flink.python.util.PythonConfigUtil.getEnvironmentConfig(
    env._j_stream_execution_environment
)
print(f".........env config: {env_config}")  # output contains kafka-protobuf-serializer-7.7.1.jar

pkgs = _jvm.Package.getPackages()
package_names = [pkg.getName() for pkg in pkgs]
print(f"--------- Packages: {package_names}")  # no io.confluent.* package, but org.apache.flink.* ones are there

# Trying to load the Java class fails with:
# py4j.protocol.Py4JError: io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer does not exist in the JVM
deserializer_class = _jvm.io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer
------------------------------------------------------

Thanks in advance for any answers/help.

Regards,
Akos

[1] https://github.com/confluentinc/schema-registry/blob/master/protobuf-serializer/src/main/java/io/confluent/kafka/serializers/protobuf/KafkaProtobufDeserializer.java