Good finding!

I think we should handle this case in a more user-friendly way, since I guess
this issue is very common for Python users, given that Python is a dynamically
typed language. I have created
https://issues.apache.org/jira/browse/FLINK-21876 to follow up on this issue.
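
Until that is addressed, one possible stopgap is to coerce the returned dict
to strings inside the UDF. The following is only a minimal sketch in plain
Python, not a PyFlink API: the helper name `to_string_map` is hypothetical,
and passing `None` values through unchanged is an assumption about what the
job wants.

```python
def to_string_map(d):
    """Return a copy of d whose keys and values are all str.

    Hypothetical helper (not part of PyFlink). Assumption: None values are
    passed through unchanged instead of becoming the string 'None'.
    """
    return {str(k): (v if v is None else str(v)) for k, v in d.items()}


# The dummy parser from the thread returns an int for 'item_id', which does
# not match the declared MAP<STRING, STRING> result type.
res = to_string_map({'item_id': 123, 'tag': 'a'})
print(res)
```

Inside the UDF, returning `to_string_map(res)` instead of `res` should keep
the returned value consistent with
`result_type=DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())`.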

Regards,
Dian

> On Mar 19, 2021, at 6:57 PM, Xingbo Huang <hxbks...@gmail.com> wrote:
> 
> Yes, you need to ensure that the key and value types of the Map are fixed
> and match the declared types.
> 
> Best,
> Xingbo
> 
> On Fri, Mar 19, 2021 at 3:41 PM Yik San Chan <evan.chanyik...@gmail.com>
> wrote:
> I figured out why the simplified example fails: the dummy parser must return
> a map with string keys and string values; otherwise it violates the declared
> result_type.
> 
> On Fri, Mar 19, 2021 at 3:37 PM Yik San Chan <evan.chanyik...@gmail.com>
> wrote:
> Hi Dian,
> 
> I simplified the question at
> https://stackoverflow.com/questions/66687797/pyflink-java-io-eofexception-at-java-io-datainputstream-readfully.
> You can also find the updated question below:
> 
> I have a PyFlink job that reads from a file, filters on a condition, and
> prints the result. This is the PyFlink script main.py:
> 
> ```python
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import StreamTableEnvironment, DataTypes
> from pyflink.table.udf import udf
> 
> # https://flink.apache.org/2020/04/09/pyflink-udf-support-flink.html
> # https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/python_udfs.html
> 
> @udf(input_types=[DataTypes.STRING()], 
> result_type=DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))
> def parse(s):
>   import json
>   # a dummy parser
>   res = {'item_id': 123, 'tag': 'a'}
>   return res
> 
> env = StreamExecutionEnvironment.get_execution_environment()
> t_env = StreamTableEnvironment.create(env)
> 
> t_env.register_function("parse", parse)
> 
> my_source_ddl = """
> create table mySource (
>     id BIGINT,
>     contentstr STRING
> ) with (
>     'connector' = 'filesystem',
>     'format' = 'json',
>     'path' = '/tmp/input'
> )
> """
> 
> my_sink_ddl = """
> create table mySink (
>     id BIGINT
> ) with (
>     'connector' = 'print'
> )
> """
> 
> my_transform_dml = """
> insert into mySink
> with t1 as (
>     select id, parse(contentstr) as content
>     from mySource
> )
> select id
> from t1
> where content['item_id'] is not null
> and content['tag'] = 'a'
> """
> 
> t_env.execute_sql(my_source_ddl)
> t_env.execute_sql(my_sink_ddl)
> t_env.execute_sql(my_transform_dml).wait()
> ```
> 
> To run `main.py`:
> - Make sure pyflink==1.12.0 is installed in the conda env
> - /tmp/input contains a single row: `{"id":1,"tag":"a"}`
> 
> Then I run `main.py` and get this exception:
> 
> ```
> Traceback (most recent call last):
>   File "udf_parse.py", line 53, in <module>
>     t_env.execute_sql(my_transform_dml).wait()
>   File 
> "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/table/table_result.py",
>  line 76, in wait
>     get_method(self._j_table_result, "await")()
>   File 
> "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_gateway.py",
>  line 1286, in __call__
>     answer, self.gateway_client, self.target_id, self.name)
>   File 
> "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/util/exceptions.py",
>  line 147, in deco
>     return f(*a, **kw)
>   File 
> "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/protocol.py",
>  line 328, in get_return_value
>     format(target_id, ".", name), value)
> py4j.protocol.Py4JJavaError: An error occurred while calling o53.await.
> : java.util.concurrent.ExecutionException: 
> org.apache.flink.table.api.TableException: Failed to wait job finish
> at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> at 
> org.apache.flink.table.api.internal.TableResultImpl.awaitInternal(TableResultImpl.java:123)
> at 
> org.apache.flink.table.api.internal.TableResultImpl.await(TableResultImpl.java:86)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> at 
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
> at 
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> at 
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
> at 
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.table.api.TableException: Failed to wait job 
> finish
> at 
> org.apache.flink.table.api.internal.InsertResultIterator.hasNext(InsertResultIterator.java:56)
> at 
> org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:350)
> at 
> org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.isFirstRowReady(TableResultImpl.java:363)
> at 
> org.apache.flink.table.api.internal.TableResultImpl.lambda$awaitInternal$1(TableResultImpl.java:110)
> at 
> java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> ... 1 more
> Caused by: java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> at 
> org.apache.flink.table.api.internal.InsertResultIterator.hasNext(InsertResultIterator.java:54)
> ... 7 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
> at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
> at 
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:117)
> at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
> at 
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
> at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:237)
> at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
> at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
> at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
> at 
> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1046)
> at akka.dispatch.OnComplete.internal(Future.scala:264)
> at akka.dispatch.OnComplete.internal(Future.scala:261)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
> at 
> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
> at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
> at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
> at 
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
> at 
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
> at 
> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
> at 
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
> at 
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
> at 
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
> at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
> at 
> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by 
> NoRestartBackoffTimeStrategy
> at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
> at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
> at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
> at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
> at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
> at 
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:669)
> at 
> org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
> at 
> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
> at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> ... 4 more
> Caused by: java.io.EOFException
> at java.io.DataInputStream.readFully(DataInputStream.java:197)
> at java.io.DataInputStream.readFully(DataInputStream.java:169)
> at 
> org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserializeInternal(StringDataSerializer.java:88)
> at 
> org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:82)
> at 
> org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:34)
> at 
> org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserializeInternal(MapDataSerializer.java:129)
> at 
> org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:110)
> at 
> org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:46)
> at 
> org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:106)
> at 
> org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:49)
> at 
> org.apache.flink.table.runtime.operators.python.scalar.RowDataPythonScalarFunctionOperator.emitResult(RowDataPythonScalarFunctionOperator.java:81)
> at 
> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.emitResults(AbstractPythonFunctionOperator.java:250)
> at 
> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:273)
> at 
> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.processWatermark(AbstractPythonFunctionOperator.java:199)
> at 
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.emitWatermark(ChainingOutput.java:123)
> at 
> org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitWatermark(SourceOperatorStreamTask.java:170)
> at 
> org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.advanceToEndOfEventTime(SourceOperatorStreamTask.java:110)
> at 
> org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.afterInvoke(SourceOperatorStreamTask.java:116)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:589)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> at java.lang.Thread.run(Thread.java:748)
> ```
> 
> The issue is probably related to the udf. 
> 
> Any help? Thanks!
> 
> Best,
> Yik San
> 
> On Fri, Mar 19, 2021 at 11:58 AM Yik San Chan <evan.chanyik...@gmail.com>
> wrote:
> Hi Dian,
> 
> I am able to reproduce this issue in a much simpler setup. Let me update with 
> the simpler reproducible example shortly.
> 
> Best,
> Yik San
> 
> On Fri, Mar 19, 2021 at 11:28 AM Yik San Chan <evan.chanyik...@gmail.com>
> wrote:
> Hi Dian,
> 
> It is a good catch, though after changing to use 
> flink-sql-connector-kafka_2.11-1.12.0.jar I still get exactly the same error.
> 
> Best,
> Yik San
> 
> On Fri, Mar 19, 2021 at 11:02 AM Dian Fu <dian0511...@gmail.com> wrote:
> 
> I noticed that you use "flink-sql-connector-kafka_2.12-1.12.0.jar". Are the
> jar files on the cluster nodes also built with Scala 2.12? The PyFlink
> package bundles jar files built with Scala 2.11 by default. I'm still not
> sure whether this is related to the issue, but I think the mismatch is
> problematic. Could you make sure that they are consistent?
> 
> 
>> On Mar 19, 2021, at 10:40 AM, Yik San Chan <evan.chanyik...@gmail.com>
>> wrote:
>> 
>> Hi Dian,
>> 
>> The PyFlink version is 1.12.0 and the Flink version in the cluster nodes is 
>> also 1.12.0
>> 
>> $ which flink
>> /data/apache/flink/flink-1.12.0/bin/flink
>> 
>> Best,
>> Yik San
>> 
>> On Fri, Mar 19, 2021 at 10:26 AM Dian Fu <dian0511...@gmail.com> wrote:
>> Hi,
>> 
>> What's the Flink version on the cluster nodes? It should match the PyFlink
>> version.
>> 
>> Regards,
>> Dian
>> 
>>> On Mar 18, 2021, at 5:01 PM, Yik San Chan <evan.chanyik...@gmail.com>
>>> wrote:
>>> 
>>> This question is cross-posted on StackOverflow:
>>> https://stackoverflow.com/questions/66687797/pyflink-java-io-eofexception-at-java-io-datainputstream-readint
>>> 
>>> I have a PyFlink job that reads from a Kafka source, transforms the data,
>>> and writes to a Kafka sink. This is a `tree` view of my working directory.
>>> 
>>> ```
>>> > tree
>>> .
>>> ├── deps
>>> │   └── flink-sql-connector-kafka_2.12-1.12.0.jar
>>> ├── flink_run.sh
>>> ├── main.py
>>> ├── pyflink1.12.0.zip
>>> └── tasks
>>>     └── user_last_n_clicks
>>>         ├── sink_ddl.sql
>>>         ├── source_ddl.sql
>>>         └── transform_dml.sql
>>> ```
>>> 
>>> This is the `flink_run.sh`:
>>> 
>>> ```
>>> flink run \
>>> --yarnname test-pyflink \
>>> -m yarn-cluster \
>>> -yD yarn.application.queue=tech_platform \
>>> -pyarch pyflink1.12.0.zip \
>>> -pyexec /data/software/pyflink1.12.0/bin/python \
>>> -py main.py testing user_last_n_clicks
>>> ```
>>> 
>>> This is the `main.py`. The key logic is in:
>>> - `parse_content` udf.
>>> - load sql files from tasks subfolder, and execute_sql
>>> 
>>> ```python
>>> import os
>>> from sys import argv
>>> from pyflink.datastream import StreamExecutionEnvironment
>>> from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes
>>> from pyflink.table.udf import udf
>>> 
>>> def read_file_content(filepath):
>>>     with open(filepath) as f:
>>>         return f.read()
>>> 
>>> @udf(input_types=[DataTypes.STRING()], 
>>> result_type=DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))
>>> def parse_content(content_str):
>>>     import json
>>>     res = {}
>>>     content = json.loads(content_str)
>>>     if 'postId' in content:
>>>         res['item_id'] = content['postId']
>>>     if 'lid' in content:
>>>         res['item_id'] = content['lid']
>>>     if 'param' in content and 'tag' in content['param']:
>>>         res['tag'] = content['param']['tag']
>>>     return res
>>> 
>>> CWD = os.getcwd()
>>> _, palfish_env, task = argv
>>> 
>>> VALID_PALFISH_ENVS = ['development', 'testing', 'production']
>>> if palfish_env not in VALID_PALFISH_ENVS:
>>>     raise Exception(f"{palfish_env} is not a valid env, should be one of [{', '.join(VALID_PALFISH_ENVS)}].")
>>> 
>>> VALID_TASKS = os.listdir(f"{CWD}/tasks")
>>> if task not in VALID_TASKS:
>>>     raise Exception(f"{task} is not a valid task, should be one of [{', '.join(VALID_TASKS)}].")
>>> 
>>> config = {
>>>     "development": {
>>>         "${generation.kafka.source.servers}": "localhost:9094",
>>>         "${generation.kafka.sink.servers}": "localhost:9094"
>>>     },
>>>     "testing": {
>>>         "${generation.kafka.source.servers}": "10.111.135.233:9092,10.111.130.11:9092,10.111.130.12:9092",
>>>         "${generation.kafka.sink.servers}": "10.111.135.233:9092,10.111.130.11:9092,10.111.130.12:9092"
>>>     },
>>>     "production": {
>>>         "${generation.kafka.source.servers}": "10.111.203.9:9092,10.111.203.10:9092,10.111.203.13:9092,10.111.204.163:9092,10.111.204.164:9092,10.111.204.165:9092",
>>>         "${generation.kafka.sink.servers}": "10.111.209.219:9092,10.111.209.220:9092,10.111.209.221:9092"
>>>     }
>>> }
>>> 
>>> FAT_JAR_PATH = f"{CWD}/deps/flink-sql-connector-kafka_2.12-1.12.0.jar"
>>> 
>>> source_ddl = read_file_content(f'{CWD}/tasks/{task}/source_ddl.sql').replace(
>>>     '${generation.kafka.source.servers}',
>>>     config[palfish_env]['${generation.kafka.source.servers}'])
>>> sink_ddl = read_file_content(f'{CWD}/tasks/{task}/sink_ddl.sql').replace(
>>>     '${generation.kafka.sink.servers}',
>>>     config[palfish_env]['${generation.kafka.sink.servers}'])
>>> transform_dml = read_file_content(f'{CWD}/tasks/{task}/transform_dml.sql')
>>> 
>>> exec_env = StreamExecutionEnvironment.get_execution_environment()
>>> env_settings = EnvironmentSettings.Builder().use_blink_planner().build()
>>> t_env = StreamTableEnvironment.create(
>>>     stream_execution_environment=exec_env,
>>>     environment_settings=env_settings)
>>> 
>>> t_env.get_config().get_configuration().set_string("pipeline.jars", 
>>> f"file://{FAT_JAR_PATH}")
>>> t_env.create_temporary_function("ParseContent", parse_content)
>>> 
>>> t_env.execute_sql(source_ddl)
>>> t_env.execute_sql(sink_ddl)
>>> t_env.execute_sql(transform_dml).wait()
>>> ```
>>> 
>>> See my sqls. Note the udf `ParseContent` is used in `transform_dml.sql`.
>>> 
>>> ```sql
>>> # source_ddl.sql
>>> CREATE TABLE kafka_source (
>>> `body` ROW<`log` ROW<`uid` BIGINT, serverts BIGINT, `contentstr` STRING>>
>>> ) WITH (
>>> 'connector' = 'kafka',
>>> 'topic' = 'data-report-stat-old-logtype7',
>>> 'properties.bootstrap.servers' = '${generation.kafka.source.servers}',
>>> 'properties.group.id' = 'flink-featurepipelines',
>>> 'format' = 'json'
>>> )
>>> 
>>> # transform_dml.sql
>>> INSERT INTO kafka_sink
>>> WITH t1 AS (
>>> SELECT body['log']['uid'] user_id, ParseContent(body['log']['contentstr']) 
>>> content, body['log']['serverts'] server_ts
>>> FROM kafka_source
>>> ),
>>> t2 AS (
>>> SELECT user_id, content['item_id'] item_id, content['tag'] tag, server_ts
>>> FROM t1
>>> WHERE content['item_id'] IS NOT NULL
>>> AND content['tag'] = '点击帖子卡片'
>>> ),
>>> last_n AS (
>>> SELECT user_id, item_id, server_ts
>>> FROM (
>>> SELECT *,
>>> ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY server_ts DESC) as row_num
>>> FROM t2)
>>> WHERE row_num <= 5
>>> )
>>> SELECT user_id, CAST(MAX(server_ts / 1000) AS TIMESTAMP) datetime, 
>>> LISTAGG(CAST(item_id AS STRING)) last_5_clicks
>>> FROM last_n
>>> GROUP BY user_id
>>> 
>>> # sink_ddl.sql
>>> CREATE TABLE kafka_sink (
>>>     user_id BIGINT,
>>>     datetime TIMESTAMP(3),
>>>     last_5_clicks STRING,
>>>     PRIMARY KEY (user_id) NOT ENFORCED
>>> ) WITH (
>>>     'connector' = 'upsert-kafka',
>>>     'topic' = 'aiinfra.fct.userfeature.0',
>>>     'properties.bootstrap.servers' = '${generation.kafka.sink.servers}',
>>>     'key.format' = 'json',
>>>     'value.format' = 'json'
>>> )
>>> ```
>>> 
>>> I got the error when running the PyFlink program in my testing environment 
>>> machine.
>>> 
>>> ```
>>> Caused by: java.io.EOFException
>>>     at java.io.DataInputStream.readInt(DataInputStream.java:392) 
>>> ~[?:1.8.0_261]
>>>     at 
>>> org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserializeInternal(StringDataSerializer.java:91)
>>>  ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
>>>     at 
>>> org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:87)
>>>  ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
>>>     at 
>>> org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:36)
>>>  ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
>>>     at 
>>> org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserializeInternal(MapDataSerializer.java:124)
>>>  ~[flink-python_2.11-1.12.0.jar:1.12.0]
>>>     at 
>>> org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:107)
>>>  ~[flink-python_2.11-1.12.0.jar:1.12.0]
>>>     at 
>>> org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:46)
>>>  ~[flink-python_2.11-1.12.0.jar:1.12.0]
>>>     at 
>>> org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:104)
>>>  ~[flink-python_2.11-1.12.0.jar:1.12.0]
>>>     at 
>>> org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:49)
>>>  ~[flink-python_2.11-1.12.0.jar:1.12.0]
>>>     at 
>>> org.apache.flink.table.runtime.operators.python.scalar.RowDataPythonScalarFunctionOperator.emitResult(RowDataPythonScalarFunctionOperator.java:84)
>>>  ~[flink-python_2.11-1.12.0.jar:1.12.0]
>>> ```
>>> 
>>> The full logs are at
>>> https://gist.github.com/YikSanChan/d3a5d25cdf2f3c1dc6b3dc93e48c4bbc.
>>> 
>>> Any idea what causes this exception? Thanks.
>>> 
>>> Yik San
>> 
> 
