Hi Flink Community, I am trying to prove out the new protobuf functionality added to 1.16 ([1]). I have built master locally and have attempted following the Protobuf Format doc ([2]) to create a table with the kafka connector using the protobuf format.
I compiled the sample .proto file using protoc version 3.2.0, compiled the .java output files using javac, linking to protobuf-java-3.5.1.jar (using earlier versions gives me compiler errors about UnusedPrivateParameter) and packaged the resulting class files into SimpleTest.jar. However, when I try to select the table, I get the following error: % ./sql-client.sh --jar ~/repos/simple_protobuf/SimpleTest/SimpleTest.jar --jar ~/repos/flink/flink-connectors/flink-connector-kafka/target/flink-connector-kafka-1.17-SNAPSHOT.jar --jar ~/repos/flink/flink-formats/flink-sql-protobuf/target/flink-sql-protobuf-1.17-SNAPSHOT.jar Flink SQL> CREATE TABLE simple_test ( > uid BIGINT, > name STRING, > category_type INT, > content BINARY, > price DOUBLE, > value_map map<BIGINT, row<v1 BIGINT, v2 INT>>, > value_arr array<row<v1 BIGINT, v2 INT>>, > corpus_int INT, > corpus_str STRING > ) WITH ( > 'connector' = 'kafka', > 'topic' = 'user_behavior', > 'properties.bootstrap.servers' = 'localhost:9092', > 'properties.group.id' = 'testGroup', > 'format' = 'protobuf', > 'protobuf.message-class-name' = 'com.example.SimpleTest', > 'protobuf.ignore-parse-errors' = 'true' > ) > ; [INFO] Execute statement succeed. Flink SQL> select * from simple_test; [ERROR] Could not execute SQL statement. Reason: java.lang.ClassNotFoundException: com.example.SimpleTest Flink SQL> Any advice greatly appreciated, thank you. [1] https://github.com/apache/flink/commit/5c87b69b5300e8678629aa8b769d60ec2fdbf3d1 [2] https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/protobuf/