I tried ExecutionConfig#disableGenericTypes, and I get this error when I execute my Flink job with StreamExecutionEnvironment#execute (the full stack trace is attached):

java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type.

I suppose there is nothing I can do at this time other than wait for the ticket I filed [1] to be resolved.

I am assuming that if generic types are disabled, an UnsupportedOperationException will be thrown for all generic types, regardless of whether the generic type is registered with Kryo.

[1] https://issues.apache.org/jira/browse/FLINK-25993

Best regards,
Shane

________________________________
From: Chesnay Schepler <ches...@apache.org>
Sent: February 7, 2022 3:08 AM
To: Shane Bishop <shane.bis...@outlook.com>; user@flink.apache.org <user@flink.apache.org>
Subject: Re: Questions about Kryo setRegistrationRequired(false)

There isn't any setting to control setRegistrationRequired().

You can, however, turn Kryo off via ExecutionConfig#disableGenericTypes, although this may require changes to your data types.

I'd recommend filing a ticket.

On 04/02/2022 20:12, Shane Bishop wrote:

Hi all,

TL;DR: I am concerned that kryo.setRegistrationRequired(false) in Apache Flink might introduce serialization/deserialization vulnerabilities, and I want to better understand the security implications of its use in Flink.

There is an issue on the Kryo GitHub repo (link<https://github.com/EsotericSoftware/kryo/issues/398>) regarding type registration. The "fix" the Kryo developers made was to make setRegistrationRequired(true) the default (comment on GitHub issue<https://github.com/EsotericSoftware/kryo/issues/398#issuecomment-371153541>, commit with this fix<https://github.com/EsotericSoftware/kryo/commit/fc7f0cc7037ff1384b4cdac5b7ada287c64f0a00> and the line in the commit that is the fix<https://github.com/EsotericSoftware/kryo/commit/fc7f0cc7037ff1384b4cdac5b7ada287c64f0a00#diff-6d4638ca49aa0d0d9171ff04a0faa22e241f8320fda4a8a12c95853188d055a0R130>).

This is not a true fix, because the default can still be overridden; it only sets a safe default.

In Flink, the default of true is overridden in the 1.14.3 release (see KryoSerializer.java<https://github.com/apache/flink/blob/release-1.14.3/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java#L492> and FlinkScalaKryoInstantiator.scala<https://github.com/apache/flink/blob/release-1.14.3/flink-scala/src/main/scala/org/apache/flink/runtime/types/FlinkScalaKryoInstantiator.scala#L46>).

I am not a Flink contributor, so I might be missing safety mechanisms that are in place to prevent the Kryo serialization/deserialization vulnerability even when registration required is set to false. Are there any such safety mechanisms in place?

Is there anything I can do as a user of Flink to protect myself against this Kryo vulnerability?

Best regards,
Shane Bishop
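
For readers following the registration question above, here is a minimal sketch of the general idea behind a "registration required" switch. It is not Kryo's or Flink's actual code: RegistrationDemo, ClassAllowlist, register and resolve are hypothetical names invented for this illustration. It assumes the serializer turns a class name found in its input into a Class object, which is the step an allowlist is meant to guard.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration only; this is NOT Kryo's or Flink's implementation.
class RegistrationDemo {

    /** Resolves class names taken from untrusted input, optionally against an allowlist. */
    static final class ClassAllowlist {
        private final Set<String> registered = new HashSet<>();
        private final boolean registrationRequired;

        ClassAllowlist(boolean registrationRequired) {
            this.registrationRequired = registrationRequired;
        }

        /** Adds a class to the allowlist. */
        void register(Class<?> type) {
            registered.add(type.getName());
        }

        /** Returns the class for the given name, or throws if it is not allowed. */
        Class<?> resolve(String className) {
            if (registrationRequired && !registered.contains(className)) {
                // Strict mode: refuse anything that was not registered up front.
                throw new IllegalArgumentException("Class is not registered: " + className);
            }
            try {
                // Lenient mode reaches this point for ANY name present in the input.
                // The class is loaded without running its static initializers.
                return Class.forName(className, false, RegistrationDemo.class.getClassLoader());
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException("Unknown class: " + className, e);
            }
        }
    }

    public static void main(String[] args) {
        ClassAllowlist strict = new ClassAllowlist(true);
        strict.register(java.util.ArrayList.class);
        System.out.println("strict resolved: " + strict.resolve("java.util.ArrayList"));

        try {
            strict.resolve("java.lang.ProcessBuilder");
        } catch (IllegalArgumentException e) {
            System.out.println("strict rejected: " + e.getMessage());
        }

        ClassAllowlist lenient = new ClassAllowlist(false);
        System.out.println("lenient resolved: " + lenient.resolve("java.lang.ProcessBuilder"));
    }
}
```

In this sketch the strict instance only resolves classes that were registered beforehand, while the lenient instance resolves whatever name the input supplies. That difference is the concern raised above about setting registration required to false.
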
org.apache.flink.runtime.rest.handler.RestHandlerException: Could not execute application.
    at org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler.lambda$handleRequest$4(JarRunOverrideHandler.java:247)
    at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930)
    at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1705)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.util.concurrent.CompletionException: org.apache.flink.util.FlinkRuntimeException: Could not execute application.
    at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
    at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)
    ... 6 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Could not execute application.
    at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:88)
    at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70)
    at org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler.lambda$handleRequest$3(JarRunOverrideHandler.java:238)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
    ... 6 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
    at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84)
    ... 9 more
Caused by: java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type.
    at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:87)
    at org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:346)
    at org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:338)
    at org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:346)
    at org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:338)
    at org.apache.flink.streaming.api.graph.StreamGraph.createSerializer(StreamGraph.java:947)
    at org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:379)
    at org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:351)
    at org.apache.flink.streaming.runtime.translators.AbstractOneInputTransformationTranslator.translateInternal(AbstractOneInputTransformationTranslator.java:63)
    at org.apache.flink.streaming.runtime.translators.OneInputTransformationTranslator.translateForStreamingInternal(OneInputTransformationTranslator.java:65)
    at org.apache.flink.streaming.runtime.translators.OneInputTransformationTranslator.translateForStreamingInternal(OneInputTransformationTranslator.java:37)
    at org.apache.flink.streaming.api.graph.SimpleTransformationTranslator.translateForStreaming(SimpleTransformationTranslator.java:62)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:721)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:453)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:303)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2010)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1995)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1834)
    at com.trendmicro.c1ws.tdax.App.main(App.java:69)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
    ... 12 more