Hey,

Why do you say that the way you did it does not work? The logs you posted are INFO messages, not errors. They only say that the class cannot be handled by Flink's built-in mechanism for serializing POJOs, so Flink falls back to a GenericType. A GenericType is serialized with Kryo, which should go through your registered serializer.
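
To illustrate why those messages show up for a generated protobuf class, here is a minimal sketch of the kind of field check involved. It is a simplified approximation and not Flink's actual TypeExtractor logic: `PojoFieldCheck`, `FakeEvent` and `fieldsWithoutAccessors` are hypothetical names, and `FakeEvent` is only a stand-in for a generated class with a private field such as `payloadCase_`.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

public class PojoFieldCheck {

    /** Hypothetical stand-in for a generated protobuf message class. */
    public static class FakeEvent {
        public String name;        // public field: accepted as is
        private long id;           // private, but has a getter and a setter
        private int payloadCase_;  // private, no matching accessors

        public long getId() { return id; }
        public void setId(long id) { this.id = id; }
    }

    /**
     * Returns the names of non-public instance fields that lack a matching
     * getter or setter. Simplified assumption: accessors are named
     * get/set + the capitalized field name with underscores removed.
     */
    public static List<String> fieldsWithoutAccessors(Class<?> clazz) {
        List<String> offenders = new ArrayList<>();
        for (Field field : clazz.getDeclaredFields()) {
            int mods = field.getModifiers();
            if (field.isSynthetic() || Modifier.isStatic(mods)
                    || Modifier.isTransient(mods) || Modifier.isPublic(mods)) {
                continue; // these fields need no accessors in this sketch
            }
            String bare = field.getName().replace("_", "");
            if (bare.isEmpty()) {
                offenders.add(field.getName());
                continue;
            }
            String suffix = Character.toUpperCase(bare.charAt(0)) + bare.substring(1);
            boolean hasGetter = false;
            boolean hasSetter = false;
            for (Method m : clazz.getMethods()) {
                if (m.getName().equals("get" + suffix)
                        && m.getParameterCount() == 0
                        && m.getReturnType() == field.getType()) {
                    hasGetter = true;
                }
                if (m.getName().equals("set" + suffix)
                        && m.getParameterCount() == 1
                        && m.getParameterTypes()[0] == field.getType()) {
                    hasSetter = true;
                }
            }
            if (!hasGetter || !hasSetter) {
                offenders.add(field.getName());
            }
        }
        return offenders;
    }

    public static void main(String[] args) {
        // Prints [payloadCase_]
        System.out.println(fieldsWithoutAccessors(FakeEvent.class));
    }
}
```

A class that fails a check like this is simply not treated as a POJO. That alone does not mean serialization fails, only that the generic (Kryo) path is taken.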
Best,
Dawid
On 14/02/2021 11:44, Svend Vanderveken wrote:
>
>
> Hi all,
>
> I'm failing to set up an example of wire serialization with Protobuf,
> could you help me figure out what I'm doing wrong?
>
> I'm using a simple protobuf schema:
> ```
> syntax = "proto3";
>
> import "google/protobuf/wrappers.proto";
> option java_multiple_files = true;
> message DemoUserEvent {
>   Metadata metadata = 1;
>   oneof payload {
>     Created created = 10;
>     Updated updated = 11;
>   }
>   message Created {...}
>   message Updated {...}
>   ...
> }
> ```
>
> From which I'm generating Java with this Gradle plugin:
>
>
> ```
> plugins { id "com.google.protobuf" version "0.8.15"}
> ```
> And I'm generating DemoUserEvent instances with a Java Iterator that
> looks like this:
> ```
> public class UserEventGenerator implements Iterator<DemoUserEvent>, Serializable {
>     transient public final static Faker faker = new Faker();
>     ...
>     @Override
>     public DemoUserEvent next() {
>         return randomCreatedEvent();
>     }
>     ...
> ```
>
> I read those two pieces of documentation:
> * https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/types_serialization.html
> * https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/custom_serializers.html
>
> And tried the demo app below:
>
> ```
> import com.twitter.chill.protobuf.ProtobufSerializer;
> ...
> public static void main(String[] args) {
>     final StreamExecutionEnvironment flinkEnv =
>         StreamExecutionEnvironment.getExecutionEnvironment();
>     flinkEnv.getConfig().registerTypeWithKryoSerializer(DemoUserEvent.class,
>         ProtobufSerializer.class);
>     flinkEnv.fromCollection(new UserEventGenerator(),
>         DemoUserEvent.class).print();
> }
> ```
> But the serialization mechanism still fails to handle my protobuf class:
> 11:22:45,822 INFO org.apache.flink.api.java.typeutils.TypeExtractor
> [] - class live.schema.event.user.v1.DemoUserEvent does not contain a
> getter for field payloadCase_
> 11:22:45,822 INFO org.apache.flink.api.java.typeutils.TypeExtractor
> [] - class live.schema.event.user.v1.DemoUserEvent does not contain a
> setter for field payloadCase_
> 11:22:45,822 INFO org.apache.flink.api.java.typeutils.TypeExtractor
> [] - Class class live.schema.event.user.v1.DemoUserEvent cannot be used as
> a POJO type because not all fields are valid POJO fields, and must be
> processed as GenericType. Please read the Flink documentation on "Data Types
> & Serialization" for details of the effect on performance.
>
> I've also tried this, without success:
>
> ```
> flinkEnv.getConfig().addDefaultKryoSerializer(DemoUserEvent.class,
> ProtobufSerializer.class);
> ```
>
> I'm using those versions:
>
> ```
> ext {
>     javaVersion = '11'
>     flinkVersion = '1.12.1'
>     scalaBinaryVersion = '2.12'
> }
> dependencies {
>     compileOnly "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
>     implementation("com.twitter:chill-protobuf:0.9.5") {
>         exclude group: 'com.esotericsoftware.kryo', module: 'kryo'
>     }
>     implementation "com.google.protobuf:protobuf-java:3.14.0"
>     implementation 'com.github.javafaker:javafaker:1.0.2'
> }
> ```
>
> Any idea what I should try next?
>
> Thanks in advance!
