Hi Flink Community,
I am trying to prove out the new protobuf functionality added to 1.16
([1]).  I have built master locally and have attempted following the
Protobuf Format doc ([2]) to create a table with the kafka connector using
the protobuf format.

I compiled the sample .proto file using protoc version 3.2.0, compiled the
.java output files using javac, linking to protobuf-java-3.5.1.jar (using
earlier versions gives me compiler errors about UnusedPrivateParameter) and
packaged the resulting class files into SimpleTest.jar.

However, when I try to select the table, I get the following error:
% ./sql-client.sh --jar ~/repos/simple_protobuf/SimpleTest/SimpleTest.jar
--jar
~/repos/flink/flink-connectors/flink-connector-kafka/target/flink-connector-kafka-1.17-SNAPSHOT.jar
--jar
~/repos/flink/flink-formats/flink-sql-protobuf/target/flink-sql-protobuf-1.17-SNAPSHOT.jar
Flink SQL> CREATE TABLE simple_test (
>   uid BIGINT,
>   name STRING,
>   category_type INT,
>   content BINARY,
>   price DOUBLE,
>   value_map map<BIGINT, row<v1 BIGINT, v2 INT>>,
>   value_arr array<row<v1 BIGINT, v2 INT>>,
>   corpus_int INT,
>   corpus_str STRING
> ) WITH (
>  'connector' = 'kafka',
>  'topic' = 'user_behavior',
>  'properties.bootstrap.servers' = 'localhost:9092',
>  'properties.group.id' = 'testGroup',
>  'format' = 'protobuf',
>  'protobuf.message-class-name' = 'com.example.SimpleTest',
>  'protobuf.ignore-parse-errors' = 'true'
> )
> ;
[INFO] Execute statement succeed.

Flink SQL> select * from simple_test;
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: com.example.SimpleTest

Flink SQL>

Any advice greatly appreciated, thank you.

[1]
https://github.com/apache/flink/commit/5c87b69b5300e8678629aa8b769d60ec2fdbf3d1
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/protobuf/

Reply via email to