[ https://issues.apache.org/jira/browse/FLINK-18770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17167957#comment-17167957 ]
Leonid Ilyevsky commented on FLINK-18770:
-----------------------------------------

[~aljoscha] I fixed the build, and I see the same problem. But I need to correct my initial description of what is going on. Sorry, I misstated the comparison between the local environment and the cluster: in the local environment I actually ran a simplified test of my connector only, which does very little with the data. See the test code in [^FlinkTest.scala].
I have now also tested the full application in the local environment, and it fails with the same error. So the problem has nothing to do with local vs. cluster execution; it is something else.

The data coming from Solace is actually a stream of protobuf messages. The test app that works has no way of knowing that, which may be why it just uses the simple serializer. The actual app has all the protobuf classes loaded, marshalling services registered, etc. Maybe this is why Flink decides to use Kryo, and the error message does mention my protobuf classes:
{quote}
com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
parameter_ (com.liquidnet.tcap.protobuf.TcapMessages$Strategy)
strategy_ (com.liquidnet.tcap.protobuf.TcapMessages$OrderProperties)
{quote}
In my case I need Flink to use BytePrimitiveArraySerializer; all the transformations are taken care of in the next steps of the pipeline. Please let me know what I can do to achieve that.

> Emitting element fails in KryoSerializer
> ----------------------------------------
>
>                 Key: FLINK-18770
>                 URL: https://issues.apache.org/jira/browse/FLINK-18770
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Type Serialization System
>    Affects Versions: 1.11.1
>         Environment: Flink 1.11.1, Linux
>            Reporter: Leonid Ilyevsky
>            Priority: Major
>         Attachments: AppMain.java, FlinkTest.scala, KryoException.txt,
> SolaceSource.java, run_command.txt
>
>
> I wrote a simple Flink connector for Solace, see the attached Java file.
> It works fine under the local execution environment. However, when I deployed it
> in the real Flink cluster, it failed with the Kryo exception, see attached.
> After a few hours of searching and debugging, I can now see what is going on.
> The data I want to emit from this source is a simple byte array. In the
> exception stack you can see that when I call 'collect' on the context, it
> goes into OperatorChain.java:715, and then to KryoSerializer, where it
> ultimately fails. I didn't have a chance to learn what KryoSerializer is and
> why it would not know what to do with byte[], but that is not the point now.
> Then I used the debugger in my local test in order to figure out how it manages
> to work. I saw that after OperatorChain.java:715 it goes into
> BytePrimitiveArraySerializer, and then everything works as expected.
> Obviously BytePrimitiveArraySerializer makes sense for byte[] data.
> The question is, how can I configure the execution environment in the cluster
> so that it does serialization the same way as the local one? I looked at
> [https://ci.apache.org/projects/flink/flink-docs-stable/dev/execution_configuration.html]
> and I was thinking of setting disableForceKryo, but it says it is disabled
> by default anyway.
>
> Another question is, why does the cluster execution environment have different
> default settings compared to the local one? This makes it difficult to rely on
> local tests.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
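The question in the comment (how to make Flink use BytePrimitiveArraySerializer for the byte[] records) could be approached in part by declaring the source's output type explicitly instead of relying on type extraction. This is a minimal sketch, assuming the attached connector is a `SourceFunction<byte[]>` written against the Flink 1.11 DataStream API; `RawBytesSource`, its placeholder loop and the source name `solace-raw-bytes` are hypothetical, not taken from the attachments. `ResultTypeQueryable.getProducedType()` returns `PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO`, whose serializer is `BytePrimitiveArraySerializer`, and `ExecutionConfig.disableGenericTypes()` makes Flink report any type in the job that would otherwise be handled by Kryo, which may help locate the record type that actually fails.

```java
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

/**
 * Hypothetical stand-in for the attached SolaceSource (not the real connector):
 * it emits raw byte[] payloads and declares its output type explicitly.
 */
class RawBytesSource implements SourceFunction<byte[]>, ResultTypeQueryable<byte[]> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<byte[]> ctx) throws Exception {
        // Placeholder loop: the real connector would receive Solace messages here.
        while (running) {
            byte[] payload = "placeholder".getBytes(StandardCharsets.UTF_8);
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(payload);
            }
            Thread.sleep(1000L);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    /** Tells Flink to use the primitive byte[] type info instead of extracting one. */
    @Override
    public TypeInformation<byte[]> getProducedType() {
        return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
    }

    /** Builds the raw stream; addSource() picks up getProducedType() via ResultTypeQueryable. */
    static DataStream<byte[]> rawStream(StreamExecutionEnvironment env) {
        // Any type that would fall back to Kryo (GenericTypeInfo) now raises an explicit error.
        env.getConfig().disableGenericTypes();
        return env.addSource(new RawBytesSource(), "solace-raw-bytes");
    }

    /** True if the declared output type maps to BytePrimitiveArraySerializer. */
    static boolean usesBytePrimitiveArraySerializer() {
        TypeSerializer<byte[]> serializer =
                new RawBytesSource().getProducedType().createSerializer(new ExecutionConfig());
        return serializer instanceof BytePrimitiveArraySerializer;
    }
}
```

If the source type is already correct, the Kryo trace in the comment, which names `TcapMessages$Strategy` rather than `byte[]`, suggests (an assumption, not confirmed in this issue) that the failing record is a protobuf object produced by a later chained operator. For that case the Flink documentation on custom serializers describes registering chill-protobuf's `ProtobufSerializer`, e.g. `env.getConfig().registerTypeWithKryoSerializer(MyMessage.class, ProtobufSerializer.class)`, where `MyMessage` is a placeholder for each protobuf message type that flows between operators.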