Hi Dian,

I have simplified the question in
https://stackoverflow.com/questions/66687797/pyflink-java-io-eofexception-at-java-io-datainputstream-readfully.
You can also find the updated question below:
I have a PyFlink job that reads from a file, filters on a condition, and prints. This is a `tree` view of my working directory. This is the PyFlink script main.py:

```python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.udf import udf

# https://flink.apache.org/2020/04/09/pyflink-udf-support-flink.html
# https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/python_udfs.html
@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))
def parse(s):
    import json
    # a dummy parser
    res = {'item_id': 123, 'tag': 'a'}
    return res

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
t_env.register_function("parse", parse)

my_source_ddl = """
create table mySource (
    id BIGINT,
    contentstr STRING
) with (
    'connector' = 'filesystem',
    'format' = 'json',
    'path' = '/tmp/input'
)
"""

my_sink_ddl = """
create table mySink (
    id BIGINT
) with (
    'connector' = 'print'
)
"""

my_transform_dml = """
insert into mySink
with t1 as (
    select id, parse(contentstr) as content from mySource
)
select id from t1
where content['item_id'] is not null and content['tag'] = 'a'
"""

t_env.execute_sql(my_source_ddl)
t_env.execute_sql(my_sink_ddl)
t_env.execute_sql(my_transform_dml).wait()
```

To run the `main.py`:

- Ensure pyflink==1.12.0 is installed in my conda env
- `/tmp/input` has a single row of content `{"id":1,"tag":"a"}`

Then I run `main.py` and I get the exception:

```
Traceback (most recent call last):
  File "udf_parse.py", line 53, in <module>
    t_env.execute_sql(my_transform_dml).wait()
  File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/table/table_result.py", line 76, in wait
    get_method(self._j_table_result, "await")()
  File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 147, in deco
    return f(*a, **kw)
  File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o53.await.
: java.util.concurrent.ExecutionException: org.apache.flink.table.api.TableException: Failed to wait job finish
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
    at org.apache.flink.table.api.internal.TableResultImpl.awaitInternal(TableResultImpl.java:123)
    at org.apache.flink.table.api.internal.TableResultImpl.await(TableResultImpl.java:86)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.table.api.TableException: Failed to wait job finish
    at org.apache.flink.table.api.internal.InsertResultIterator.hasNext(InsertResultIterator.java:56)
    at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:350)
    at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.isFirstRowReady(TableResultImpl.java:363)
    at org.apache.flink.table.api.internal.TableResultImpl.lambda$awaitInternal$1(TableResultImpl.java:110)
    at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
    at org.apache.flink.table.api.internal.InsertResultIterator.hasNext(InsertResultIterator.java:54)
    ... 7 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
    at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:117)
    at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
    at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:237)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
    at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1046)
    at akka.dispatch.OnComplete.internal(Future.scala:264)
    at akka.dispatch.OnComplete.internal(Future.scala:261)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
    at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
    at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
    at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
    at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
    at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
    at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
    at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
    at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:669)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    ... 4 more
Caused by: java.io.EOFException
    at java.io.DataInputStream.readFully(DataInputStream.java:197)
    at java.io.DataInputStream.readFully(DataInputStream.java:169)
    at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserializeInternal(StringDataSerializer.java:88)
    at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:82)
    at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:34)
    at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserializeInternal(MapDataSerializer.java:129)
    at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:110)
    at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:46)
    at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:106)
    at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:49)
    at org.apache.flink.table.runtime.operators.python.scalar.RowDataPythonScalarFunctionOperator.emitResult(RowDataPythonScalarFunctionOperator.java:81)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.emitResults(AbstractPythonFunctionOperator.java:250)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:273)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.processWatermark(AbstractPythonFunctionOperator.java:199)
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.emitWatermark(ChainingOutput.java:123)
    at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitWatermark(SourceOperatorStreamTask.java:170)
    at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.advanceToEndOfEventTime(SourceOperatorStreamTask.java:110)
    at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.afterInvoke(SourceOperatorStreamTask.java:116)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:589)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
    at java.lang.Thread.run(Thread.java:748)
```

The issue is probably related to the udf. Any help? Thanks!

Best,
Yik San

On Fri, Mar 19, 2021 at 11:58 AM Yik San Chan <evan.chanyik...@gmail.com> wrote:
> Hi Dian,
>
> I am able to reproduce this issue in a much simpler setup. Let me update
> with the simpler reproducible example shortly.
>
> Best,
> Yik San
>
> On Fri, Mar 19, 2021 at 11:28 AM Yik San Chan <evan.chanyik...@gmail.com>
> wrote:
>
>> Hi Dian,
>>
>> It is a good catch, though after changing to use
>> flink-sql-connector-kafka_2.11-1.12.0.jar I still get exactly the same
>> error.
>>
>> Best,
>> Yik San
>>
>> On Fri, Mar 19, 2021 at 11:02 AM Dian Fu <dian0511...@gmail.com> wrote:
>>
>>>
>>> I noticed that you use "flink-sql-connector-kafka_2.12-1.12.0.jar". Are
>>> the jar files in the cluster nodes also built with Scala 2.12? The PyFlink
>>> package bundles jar files with Scala 2.11 by default. I’m still not sure if
>>> it’s related to this issue. However, I think this is problematic. Could you
>>> make sure that they are consistent?
>>>
>>>
>>> On Mar 19, 2021, at 10:40 AM, Yik San Chan <evan.chanyik...@gmail.com> wrote:
>>>
>>> Hi Dian,
>>>
>>> The PyFlink version is 1.12.0 and the Flink version in the cluster nodes
>>> is also 1.12.0
>>>
>>> $ which flink
>>> /data/apache/flink/flink-1.12.0/bin/flink
>>>
>>> Best,
>>> Yik San
>>>
>>> On Fri, Mar 19, 2021 at 10:26 AM Dian Fu <dian0511...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> What’s the Flink version in the cluster nodes? It should match the
>>>> PyFlink version.
>>>>
>>>> Regards,
>>>> Dian
>>>>
>>>> On Mar 18, 2021, at 5:01 PM, Yik San Chan <evan.chanyik...@gmail.com> wrote:
>>>>
>>>> This question is cross-posted on StackOverflow
>>>> https://stackoverflow.com/questions/66687797/pyflink-java-io-eofexception-at-java-io-datainputstream-readint
>>>>
>>>> I have a PyFlink job that reads from a Kafka source, transforms, and writes
>>>> to a Kafka sink. This is a `tree` view of my working directory.
>>>>
>>>> ```
>>>> > tree
>>>> .
>>>> ├── deps
>>>> │   └── flink-sql-connector-kafka_2.12-1.12.0.jar
>>>> ├── flink_run.sh
>>>> ├── main.py
>>>> ├── pyflink1.12.0.zip
>>>> └── tasks
>>>>     └── user_last_n_clicks
>>>>         ├── sink_ddl.sql
>>>>         ├── source_ddl.sql
>>>>         └── transform_dml.sql
>>>> ```
>>>>
>>>> This is the `flink_run.sh`:
>>>>
>>>> ```
>>>> flink run \
>>>>   --yarnname test-pyflink \
>>>>   -m yarn-cluster \
>>>>   -yD yarn.application.queue=tech_platform \
>>>>   -pyarch pyflink1.12.0.zip \
>>>>   -pyexec /data/software/pyflink1.12.0/bin/python \
>>>>   -py main.py testing user_last_n_clicks
>>>> ```
>>>>
>>>> This is the `main.py`. The key logic is in:
>>>> - the `parse_content` udf.
>>>> - loading the sql files from the tasks subfolder and running them with execute_sql
>>>>
>>>> ```python
>>>> import os
>>>> from sys import argv
>>>> from pyflink.datastream import StreamExecutionEnvironment
>>>> from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes
>>>> from pyflink.table.udf import udf
>>>>
>>>> def read_file_content(filepath):
>>>>     with open(filepath) as f:
>>>>         return f.read()
>>>>
>>>> @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))
>>>> def parse_content(content_str):
>>>>     import json
>>>>     res = {}
>>>>     content = json.loads(content_str)
>>>>     if 'postId' in content:
>>>>         res['item_id'] = content['postId']
>>>>     if 'lid' in content:
>>>>         res['item_id'] = content['lid']
>>>>     if 'param' in content and 'tag' in content['param']:
>>>>         res['tag'] = content['param']['tag']
>>>>     return res
>>>>
>>>> CWD = os.getcwd()
>>>> _, palfish_env, task = argv
>>>>
>>>> VALID_PALFISH_ENVS = ['development', 'testing', 'production']
>>>> if palfish_env not in VALID_PALFISH_ENVS:
>>>>     raise Exception(f"{palfish_env} is not a valid env, should be one of [{', '.join(VALID_PALFISH_ENVS)}].")
>>>>
>>>> VALID_TASKS = os.listdir(f"{CWD}/tasks")
>>>> if task not in VALID_TASKS:
>>>>     raise Exception(f"{task} is not a valid task, should be one of [{', '.join(VALID_TASKS)}].")
>>>>
>>>> config = {
>>>>     "development": {
>>>>         "${generation.kafka.source.servers}": "localhost:9094",
>>>>         "${generation.kafka.sink.servers}": "localhost:9094"
>>>>     },
>>>>     "testing": {
>>>>         "${generation.kafka.source.servers}": "10.111.135.233:9092,10.111.130.11:9092,10.111.130.12:9092",
>>>>         "${generation.kafka.sink.servers}": "10.111.135.233:9092,10.111.130.11:9092,10.111.130.12:9092"
>>>>     },
>>>>     "production": {
>>>>         "${generation.kafka.source.servers}": "10.111.203.9:9092,10.111.203.10:9092,10.111.203.13:9092,10.111.204.163:9092,10.111.204.164:9092,10.111.204.165:9092",
>>>>         "${generation.kafka.sink.servers}": "10.111.209.219:9092,10.111.209.220:9092,10.111.209.221:9092"
>>>>     }
>>>> }
>>>>
>>>> FAT_JAR_PATH = f"{CWD}/deps/flink-sql-connector-kafka_2.12-1.12.0.jar"
>>>>
>>>> source_ddl = read_file_content(f'{CWD}/tasks/{task}/source_ddl.sql').replace('${generation.kafka.source.servers}', config[palfish_env]['${generation.kafka.source.servers}'])
>>>> sink_ddl = read_file_content(f'{CWD}/tasks/{task}/sink_ddl.sql').replace('${generation.kafka.sink.servers}', config[palfish_env]['${generation.kafka.sink.servers}'])
>>>> transform_dml = read_file_content(f'{CWD}/tasks/{task}/transform_dml.sql')
>>>>
>>>> exec_env = StreamExecutionEnvironment.get_execution_environment()
>>>> env_settings = EnvironmentSettings.Builder().use_blink_planner().build()
>>>> t_env = StreamTableEnvironment.create(stream_execution_environment=exec_env, environment_settings=env_settings)
>>>>
>>>> t_env.get_config().get_configuration().set_string("pipeline.jars", f"file://{FAT_JAR_PATH}")
>>>> t_env.create_temporary_function("ParseContent", parse_content)
>>>>
>>>> t_env.execute_sql(source_ddl)
>>>> t_env.execute_sql(sink_ddl)
>>>> t_env.execute_sql(transform_dml).wait()
>>>> ```
>>>>
>>>> See my SQL files. Note the udf `ParseContent` is used in `transform_dml.sql`.
>>>>
>>>> ```sql
>>>> # source_ddl.sql
>>>> CREATE TABLE kafka_source (
>>>>     `body` ROW<`log` ROW<`uid` BIGINT, serverts BIGINT, `contentstr` STRING>>
>>>> ) WITH (
>>>>     'connector' = 'kafka',
>>>>     'topic' = 'data-report-stat-old-logtype7',
>>>>     'properties.bootstrap.servers' = '${generation.kafka.source.servers}',
>>>>     'properties.group.id' = 'flink-featurepipelines',
>>>>     'format' = 'json'
>>>> )
>>>>
>>>> # transform_dml.sql
>>>> INSERT INTO kafka_sink
>>>> WITH t1 AS (
>>>>     SELECT body['log']['uid'] user_id, ParseContent(body['log']['contentstr']) content, body['log']['serverts'] server_ts
>>>>     FROM kafka_source
>>>> ),
>>>> t2 AS (
>>>>     SELECT user_id, content['item_id'] item_id, content['tag'] tag, server_ts
>>>>     FROM t1
>>>>     WHERE content['item_id'] IS NOT NULL
>>>>     AND content['tag'] = '点击帖子卡片'
>>>> ),
>>>> last_n AS (
>>>>     SELECT user_id, item_id, server_ts
>>>>     FROM (
>>>>         SELECT *,
>>>>         ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY server_ts DESC) as row_num
>>>>         FROM t2)
>>>>     WHERE row_num <= 5
>>>> )
>>>> SELECT user_id, CAST(MAX(server_ts / 1000) AS TIMESTAMP) datetime, LISTAGG(CAST(item_id AS STRING)) last_5_clicks
>>>> FROM last_n
>>>> GROUP BY user_id
>>>>
>>>> # sink_ddl.sql
>>>> CREATE TABLE kafka_sink (
>>>>     user_id BIGINT,
>>>>     datetime TIMESTAMP(3),
>>>>     last_5_clicks STRING,
>>>>     PRIMARY KEY (user_id) NOT ENFORCED
>>>> ) WITH (
>>>>     'connector' = 'upsert-kafka',
>>>>     'topic' = 'aiinfra.fct.userfeature.0',
>>>>     'properties.bootstrap.servers' = '${generation.kafka.sink.servers}',
>>>>     'key.format' = 'json',
>>>>     'value.format' = 'json'
>>>> )
>>>> ```
>>>>
>>>> I got the error when running the PyFlink program in my testing
>>>> environment machine.
>>>>
>>>> ```
>>>> Caused by: java.io.EOFException
>>>>     at java.io.DataInputStream.readInt(DataInputStream.java:392) ~[?:1.8.0_261]
>>>>     at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserializeInternal(StringDataSerializer.java:91) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
>>>>     at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:87) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
>>>>     at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:36) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
>>>>     at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserializeInternal(MapDataSerializer.java:124) ~[flink-python_2.11-1.12.0.jar:1.12.0]
>>>>     at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:107) ~[flink-python_2.11-1.12.0.jar:1.12.0]
>>>>     at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:46) ~[flink-python_2.11-1.12.0.jar:1.12.0]
>>>>     at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:104) ~[flink-python_2.11-1.12.0.jar:1.12.0]
>>>>     at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:49) ~[flink-python_2.11-1.12.0.jar:1.12.0]
>>>>     at org.apache.flink.table.runtime.operators.python.scalar.RowDataPythonScalarFunctionOperator.emitResult(RowDataPythonScalarFunctionOperator.java:84) ~[flink-python_2.11-1.12.0.jar:1.12.0]
>>>> ```
>>>>
>>>> Here are the full logs, see
>>>> https://gist.github.com/YikSanChan/d3a5d25cdf2f3c1dc6b3dc93e48c4bbc.
>>>>
>>>> Any idea why the exception? Thanks.
>>>>
>>>> Yik San
>>>>
>>>>
>>>>
>>>
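P.S. Two small notes on the simplified job above, in case they help narrow things down.

First, the single row I put in `/tmp/input` only has `id` and `tag`, while `mySource` declares `(id BIGINT, contentstr STRING)`, so `contentstr` should come through as NULL under the json format defaults. For illustration only, a hypothetical helper that writes a row filling both declared columns (the `contentstr` value is arbitrary, since the dummy `parse` ignores its argument anyway):

```python
import json

# Recreate the one-row input file read by the filesystem connector.
# mySource declares (id BIGINT, contentstr STRING); the contentstr value
# below is illustrative, as the dummy parse() does not actually use it.
row = {"id": 1, "contentstr": "{}"}
with open("/tmp/input", "w") as f:
    f.write(json.dumps(row) + "\n")
```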
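Second, the dummy parser returns an int (`123`) for `item_id`, although the declared result_type is `MAP(DataTypes.STRING(), DataTypes.STRING())`. I am not sure whether that mismatch is what trips the serializer, but for reference, a variant of the udf where every map value is a string:

```python
from pyflink.table import DataTypes
from pyflink.table.udf import udf


# Same dummy parser, but every value is a str so the returned dict matches
# the declared MAP(STRING, STRING) result type.
@udf(input_types=[DataTypes.STRING()],
     result_type=DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))
def parse(s):
    return {'item_id': str(123), 'tag': 'a'}
```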