Hello,

I want to read data from Kafka topics that contain Confluent-encoded Protobuf
messages (not plain Protobuf; Confluent prepends a "magic byte" and a schema ID)
using the Python DataStream API. I have found that Confluent ships a Java
class[1] which implements org.apache.kafka.common.serialization.Deserializer,
so my idea was to use it as the basis for a custom DeserializationSchema, set
with KafkaSource.builder().set_value_only_deserializer(<custom deserialization
schema>).
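To make the intent concrete, the wiring I am aiming for looks roughly like the
sketch below (the Schema Registry URL is a placeholder, and the final step of
adapting the Kafka Deserializer into a PyFlink DeserializationSchema is exactly
the part I still need to figure out):
------------------------------------------------------
from pyflink.java_gateway import get_gateway

_jvm = get_gateway().jvm

# Instantiate Confluent's deserializer on the Java side and point it at Schema Registry.
j_deserializer = _jvm.io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer()
j_props = _jvm.java.util.HashMap()
j_props.put("schema.registry.url", "http://localhost:8081")  # placeholder URL
j_deserializer.configure(j_props, False)  # False -> configure as a value (not key) deserializer

# ...then wrap it somehow into a DeserializationSchema and pass it to
# KafkaSource.builder().set_value_only_deserializer(...).
------------------------------------------------------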
So far so good, but: I have added the jars to the stream execution environment
(add_jars(), add_classpaths()), yet they cannot be found on the Java gateway
when I try to get the Java class. Other Flink classes coming from other jars
(flink-sql-connector-kafka-3.2.0-1.19.jar, flink-table-api-java-bridge-1.19.1.jar)
are available. What could the problem be? Is this even supposed to work?
A snippet demonstrating my idea:
------------------------------------------------------
from pyflink.common import Configuration
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.java_gateway import get_gateway

env = StreamExecutionEnvironment.get_execution_environment(Configuration())

project_root = "/Users/aksb/projects/flink"
confluent_serde_jar = f"file://{project_root}/lib/kafka-protobuf-serializer-7.7.1.jar"

env.add_jars(
    confluent_serde_jar,
    f"file://{project_root}/lib/flink-sql-connector-kafka-3.2.0-1.19.jar",
    f"file://{project_root}/lib/flink-table-api-java-bridge-1.19.1.jar",
)

env.add_classpaths(confluent_serde_jar)

_jvm = get_gateway().jvm

env_config = _jvm.org.apache.flink.python.util.PythonConfigUtil.getEnvironmentConfig(
    env._j_stream_execution_environment
)
# output contains kafka-protobuf-serializer-7.7.1.jar
print(f".........env config: {env_config}")

# output doesn't contain any io.confluent package but does contain org.apache.flink.* ones
pkgs = _jvm.Package.getPackages()
package_names = [pkg.getName() for pkg in pkgs]
print(f"--------- Packages : {package_names}")

# When trying to load the Java class, I get py4j.protocol.Py4JError:
# io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer does not exist in the JVM
ff = _jvm.io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer
------------------------------------------------------
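For completeness, my understanding is that add_jars() / add_classpaths() simply
populate the pipeline.jars / pipeline.classpaths configuration keys, so the
setup above should be roughly equivalent to this configuration-based sketch
(same jar paths, just set up front; I may be wrong about this):
------------------------------------------------------
from pyflink.common import Configuration
from pyflink.datastream import StreamExecutionEnvironment

lib = "file:///Users/aksb/projects/flink/lib"

config = Configuration()
# Jars shipped with the job (what add_jars() sets); entries are ';'-separated URLs.
config.set_string(
    "pipeline.jars",
    f"{lib}/kafka-protobuf-serializer-7.7.1.jar;"
    f"{lib}/flink-sql-connector-kafka-3.2.0-1.19.jar;"
    f"{lib}/flink-table-api-java-bridge-1.19.1.jar",
)
# Extra classpath entries (what add_classpaths() sets).
config.set_string("pipeline.classpaths", f"{lib}/kafka-protobuf-serializer-7.7.1.jar")

env = StreamExecutionEnvironment.get_execution_environment(config)
------------------------------------------------------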

Thanks in advance for any answers/help.

Regards,
Akos

[1]
https://github.com/confluentinc/schema-registry/blob/master/protobuf-serializer/src/main/java/io/confluent/kafka/serializers/protobuf/KafkaProtobufDeserializer.java
