Good finding! I think we should handle this case more gracefully, as I guess this issue is very common for Python users since Python is a dynamically typed language. I have created https://issues.apache.org/jira/browse/FLINK-21876 to follow up on this issue.

Regards,
Dian
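As Xingbo and Yik San point out below, the values returned by the Python UDF must match the declared result_type. A minimal sketch of a type-consistent version of the dummy parser from the thread below; the str() cast is the assumed fix, not code taken from the original messages:

```python
from pyflink.table import DataTypes
from pyflink.table.udf import udf

# result_type is MAP(STRING, STRING), so every key *and* value returned from
# Python must be a str; returning an int (e.g. 123) for 'item_id' is what makes
# the Java side fail with an EOFException while deserializing the result.
@udf(input_types=[DataTypes.STRING()],
     result_type=DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))
def parse(s):
    # dummy parser, now returning string values only
    return {'item_id': str(123), 'tag': 'a'}
```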
On Fri, Mar 19, 2021 at 6:57 PM, Xingbo Huang <hxbks...@gmail.com> wrote:

Yes, you need to ensure that the key and value types of the Map are consistent with the declared result type.

Best,
Xingbo

On Fri, Mar 19, 2021 at 3:41 PM, Yik San Chan <evan.chanyik...@gmail.com> wrote:

I figured out why for the simplified question - the dummy parser should return key (string) - value (string) pairs, otherwise it fails the result_type spec.

On Fri, Mar 19, 2021 at 3:37 PM Yik San Chan <evan.chanyik...@gmail.com> wrote:

Hi Dian,

I simplified the question in https://stackoverflow.com/questions/66687797/pyflink-java-io-eofexception-at-java-io-datainputstream-readfully. You can also find the updated question below:

I have a PyFlink job that reads from a file, filters based on a condition, and prints. This is the PyFlink script main.py:

```python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.udf import udf

# https://flink.apache.org/2020/04/09/pyflink-udf-support-flink.html
# https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/python_udfs.html

@udf(input_types=[DataTypes.STRING()],
     result_type=DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))
def parse(s):
    import json
    # a dummy parser
    res = {'item_id': 123, 'tag': 'a'}
    return res

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

t_env.register_function("parse", parse)

my_source_ddl = """
create table mySource (
    id BIGINT,
    contentstr STRING
) with (
    'connector' = 'filesystem',
    'format' = 'json',
    'path' = '/tmp/input'
)
"""

my_sink_ddl = """
create table mySink (
    id BIGINT
) with (
    'connector' = 'print'
)
"""

my_transform_dml = """
insert into mySink
with t1 as (
    select id, parse(contentstr) as content
    from mySource
)
select id
from t1
where content['item_id'] is not null
and content['tag'] = 'a'
"""

t_env.execute_sql(my_source_ddl)
t_env.execute_sql(my_sink_ddl)
t_env.execute_sql(my_transform_dml).wait()
```

To run `main.py`:
- Make sure pyflink==1.12.0 is installed in my conda env
- /tmp/input has a single row of content `{"id":1,"tag":"a"}`

Then I run `main.py` and I get the exception:

```
Traceback (most recent call last):
  File "udf_parse.py", line 53, in <module>
    t_env.execute_sql(my_transform_dml).wait()
  File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/table/table_result.py", line 76, in wait
    get_method(self._j_table_result, "await")()
  File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 147, in deco
    return f(*a, **kw)
  File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o53.await.
: java.util.concurrent.ExecutionException: org.apache.flink.table.api.TableException: Failed to wait job finish
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
    at org.apache.flink.table.api.internal.TableResultImpl.awaitInternal(TableResultImpl.java:123)
    at org.apache.flink.table.api.internal.TableResultImpl.await(TableResultImpl.java:86)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.table.api.TableException: Failed to wait job finish
    at org.apache.flink.table.api.internal.InsertResultIterator.hasNext(InsertResultIterator.java:56)
    at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:350)
    at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.isFirstRowReady(TableResultImpl.java:363)
    at org.apache.flink.table.api.internal.TableResultImpl.lambda$awaitInternal$1(TableResultImpl.java:110)
    at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
    at org.apache.flink.table.api.internal.InsertResultIterator.hasNext(InsertResultIterator.java:54)
    ... 7 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
    at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:117)
    at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
    at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:237)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
    at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1046)
    at akka.dispatch.OnComplete.internal(Future.scala:264)
    at akka.dispatch.OnComplete.internal(Future.scala:261)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
    at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
    at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
    at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
    at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
    at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
    at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
    at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
    at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:669)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    ... 4 more
Caused by: java.io.EOFException
    at java.io.DataInputStream.readFully(DataInputStream.java:197)
    at java.io.DataInputStream.readFully(DataInputStream.java:169)
    at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserializeInternal(StringDataSerializer.java:88)
    at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:82)
    at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:34)
    at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserializeInternal(MapDataSerializer.java:129)
    at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:110)
    at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:46)
    at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:106)
    at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:49)
    at org.apache.flink.table.runtime.operators.python.scalar.RowDataPythonScalarFunctionOperator.emitResult(RowDataPythonScalarFunctionOperator.java:81)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.emitResults(AbstractPythonFunctionOperator.java:250)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:273)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.processWatermark(AbstractPythonFunctionOperator.java:199)
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.emitWatermark(ChainingOutput.java:123)
    at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitWatermark(SourceOperatorStreamTask.java:170)
    at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.advanceToEndOfEventTime(SourceOperatorStreamTask.java:110)
    at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.afterInvoke(SourceOperatorStreamTask.java:116)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:589)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
    at java.lang.Thread.run(Thread.java:748)
```

The issue is probably related to the udf.

Any help? Thanks!

Best,
Yik San

On Fri, Mar 19, 2021 at 11:58 AM Yik San Chan <evan.chanyik...@gmail.com> wrote:

Hi Dian,

I am able to reproduce this issue in a much simpler setup. Let me update with a simpler reproducible example shortly.

Best,
Yik San

On Fri, Mar 19, 2021 at 11:28 AM Yik San Chan <evan.chanyik...@gmail.com> wrote:

Hi Dian,

It is a good catch, though after changing to use flink-sql-connector-kafka_2.11-1.12.0.jar I still get exactly the same error.

Best,
Yik San

On Fri, Mar 19, 2021 at 11:02 AM Dian Fu <dian0511...@gmail.com> wrote:

I noticed that you use "flink-sql-connector-kafka_2.12-1.12.0.jar". Are the jar files on the cluster nodes also built with Scala 2.12? The PyFlink package bundles jar files built with Scala 2.11 by default. I'm still not sure whether it's related to this issue.
However, I think this is problematic. Could you make sure that they are consistent?

On Fri, Mar 19, 2021 at 10:40 AM, Yik San Chan <evan.chanyik...@gmail.com> wrote:

Hi Dian,

The PyFlink version is 1.12.0 and the Flink version on the cluster nodes is also 1.12.0.

$ which flink
/data/apache/flink/flink-1.12.0/bin/flink

Best,
Yik San

On Fri, Mar 19, 2021 at 10:26 AM Dian Fu <dian0511...@gmail.com> wrote:

Hi,

What's the Flink version on the cluster nodes? It should match the PyFlink version.

Regards,
Dian

On Thu, Mar 18, 2021 at 5:01 PM, Yik San Chan <evan.chanyik...@gmail.com> wrote:

This question is cross-posted on StackOverflow: https://stackoverflow.com/questions/66687797/pyflink-java-io-eofexception-at-java-io-datainputstream-readint

I have a PyFlink job that reads from a Kafka source, transforms, and writes to a Kafka sink. This is a `tree` view of my working directory:

```
> tree
.
├── deps
│   └── flink-sql-connector-kafka_2.12-1.12.0.jar
├── flink_run.sh
├── main.py
├── pyflink1.12.0.zip
└── tasks
    └── user_last_n_clicks
        ├── sink_ddl.sql
        ├── source_ddl.sql
        └── transform_dml.sql
```

This is the `flink_run.sh`:

```
flink run \
  --yarnname test-pyflink \
  -m yarn-cluster \
  -yD yarn.application.queue=tech_platform \
  -pyarch pyflink1.12.0.zip \
  -pyexec /data/software/pyflink1.12.0/bin/python \
  -py main.py testing user_last_n_clicks
```

This is the `main.py`. The key logic is in:
- the `parse_content` udf
- load sql files from the `tasks` subfolder, and execute_sql

```python
import os
from sys import argv
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes
from pyflink.table.udf import udf

def read_file_content(filepath):
    with open(filepath) as f:
        return f.read()

@udf(input_types=[DataTypes.STRING()],
     result_type=DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))
def parse_content(content_str):
    import json
    res = {}
    content = json.loads(content_str)
    if 'postId' in content:
        res['item_id'] = content['postId']
    if 'lid' in content:
        res['item_id'] = content['lid']
    if 'param' in content and 'tag' in content['param']:
        res['tag'] = content['param']['tag']
    return res

CWD = os.getcwd()
_, palfish_env, task = argv

VALID_PALFISH_ENVS = ['development', 'testing', 'production']
if palfish_env not in VALID_PALFISH_ENVS:
    raise Exception(f"{palfish_env} is not a valid env, should be one of [{', '.join(VALID_PALFISH_ENVS)}].")

VALID_TASKS = os.listdir(f"{CWD}/tasks")
if task not in VALID_TASKS:
    raise Exception(f"{task} is not a valid task, should be one of [{', '.join(VALID_TASKS)}].")

config = {
    "development": {
        "${generation.kafka.source.servers}": "localhost:9094",
        "${generation.kafka.sink.servers}": "localhost:9094"
    },
    "testing": {
        "${generation.kafka.source.servers}": "10.111.135.233:9092,10.111.130.11:9092,10.111.130.12:9092",
        "${generation.kafka.sink.servers}": "10.111.135.233:9092,10.111.130.11:9092,10.111.130.12:9092"
    },
    "production": {
        "${generation.kafka.source.servers}": "10.111.203.9:9092,10.111.203.10:9092,10.111.203.13:9092,10.111.204.163:9092,10.111.204.164:9092,10.111.204.165:9092",
        "${generation.kafka.sink.servers}": "10.111.209.219:9092,10.111.209.220:9092,10.111.209.221:9092"
    }
}

FAT_JAR_PATH = f"{CWD}/deps/flink-sql-connector-kafka_2.12-1.12.0.jar"

source_ddl = read_file_content(f'{CWD}/tasks/{task}/source_ddl.sql').replace('${generation.kafka.source.servers}', config[palfish_env]['${generation.kafka.source.servers}'])
sink_ddl = read_file_content(f'{CWD}/tasks/{task}/sink_ddl.sql').replace('${generation.kafka.sink.servers}', config[palfish_env]['${generation.kafka.sink.servers}'])
transform_dml = read_file_content(f'{CWD}/tasks/{task}/transform_dml.sql')

exec_env = StreamExecutionEnvironment.get_execution_environment()
env_settings = EnvironmentSettings.Builder().use_blink_planner().build()
t_env = StreamTableEnvironment.create(stream_execution_environment=exec_env, environment_settings=env_settings)

t_env.get_config().get_configuration().set_string("pipeline.jars", f"file://{FAT_JAR_PATH}")
t_env.create_temporary_function("ParseContent", parse_content)

t_env.execute_sql(source_ddl)
t_env.execute_sql(sink_ddl)
t_env.execute_sql(transform_dml).wait()
```

See my sqls below. Note the udf `ParseContent` is used in `transform_dml.sql`.

```sql
# source_ddl.sql
CREATE TABLE kafka_source (
    `body` ROW<`log` ROW<`uid` BIGINT, serverts BIGINT, `contentstr` STRING>>
) WITH (
    'connector' = 'kafka',
    'topic' = 'data-report-stat-old-logtype7',
    'properties.bootstrap.servers' = '${generation.kafka.source.servers}',
    'properties.group.id' = 'flink-featurepipelines',
    'format' = 'json'
)

# transform_dml.sql
INSERT INTO kafka_sink
WITH t1 AS (
    SELECT body['log']['uid'] user_id, ParseContent(body['log']['contentstr']) content, body['log']['serverts'] server_ts
    FROM kafka_source
),
t2 AS (
    SELECT user_id, content['item_id'] item_id, content['tag'] tag, server_ts
    FROM t1
    WHERE content['item_id'] IS NOT NULL
    AND content['tag'] = '点击帖子卡片'
),
last_n AS (
    SELECT user_id, item_id, server_ts
    FROM (
        SELECT *,
        ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY server_ts DESC) as row_num
        FROM t2)
    WHERE row_num <= 5
)
SELECT user_id, CAST(MAX(server_ts / 1000) AS TIMESTAMP) datetime, LISTAGG(CAST(item_id AS STRING)) last_5_clicks
FROM last_n
GROUP BY user_id

# sink_ddl.sql
CREATE TABLE kafka_sink (
    user_id BIGINT,
    datetime TIMESTAMP(3),
    last_5_clicks STRING,
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'aiinfra.fct.userfeature.0',
    'properties.bootstrap.servers' = '${generation.kafka.sink.servers}',
    'key.format' = 'json',
    'value.format' = 'json'
)
```

I got the error when running the PyFlink program on my testing environment machine.
```
Caused by: java.io.EOFException
    at java.io.DataInputStream.readInt(DataInputStream.java:392) ~[?:1.8.0_261]
    at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserializeInternal(StringDataSerializer.java:91) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:87) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:36) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserializeInternal(MapDataSerializer.java:124) ~[flink-python_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:107) ~[flink-python_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:46) ~[flink-python_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:104) ~[flink-python_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:49) ~[flink-python_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.table.runtime.operators.python.scalar.RowDataPythonScalarFunctionOperator.emitResult(RowDataPythonScalarFunctionOperator.java:84) ~[flink-python_2.11-1.12.0.jar:1.12.0]
```

Here are the full logs, see https://gist.github.com/YikSanChan/d3a5d25cdf2f3c1dc6b3dc93e48c4bbc.

Any idea why the exception? Thanks.

Yik San
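Following the resolution at the top of the thread (the UDF must return values matching its declared MAP(STRING, STRING) result type), a minimal sketch of how the `parse_content` udf above could be made type-consistent; the explicit str() casts are an assumption, not code from the original messages:

```python
from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(input_types=[DataTypes.STRING()],
     result_type=DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))
def parse_content(content_str):
    import json
    content = json.loads(content_str)
    res = {}
    # postId / lid may be numbers in the JSON payload; casting them to str
    # keeps the returned dict consistent with MAP(STRING, STRING), avoiding
    # the EOFException raised while deserializing the UDF result.
    if 'postId' in content:
        res['item_id'] = str(content['postId'])
    if 'lid' in content:
        res['item_id'] = str(content['lid'])
    if 'param' in content and 'tag' in content['param']:
        res['tag'] = str(content['param']['tag'])
    return res
```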