Yes, you need to ensure that the keys and values of the Map your UDF actually returns match the declared result_type. Since the result_type is DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()), every key and every value must be a Python string; returning a non-string value such as the int 123 is what leads to the EOFException on the Java side.
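For example, here is a minimal sketch of the dummy parser from your simplified job, adjusted so that every key and value in the returned dict is a string (the key names and values are just the ones from your example):

```python
from pyflink.table import DataTypes
from pyflink.table.udf import udf

# The declared result_type is MAP(STRING, STRING), so both the keys and the
# values of the returned dict must be Python strings.
@udf(input_types=[DataTypes.STRING()],
     result_type=DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))
def parse(s):
    # a dummy parser: note '123' is a str here, not the int 123
    return {'item_id': '123', 'tag': 'a'}
```

The same applies to the parse_content UDF in your original Kafka job: the values taken from the JSON payload (e.g. content['postId']) would presumably need to be wrapped in str() before being put into the result dict, since a MAP(STRING, STRING) cannot carry integer values.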
Best,
Xingbo

On Fri, Mar 19, 2021 at 3:41 PM Yik San Chan <evan.chanyik...@gmail.com> wrote:

> I got why regarding the simplified question - the dummy parser should
> return key(string)-value(string), otherwise it fails the result_type spec
>
> On Fri, Mar 19, 2021 at 3:37 PM Yik San Chan <evan.chanyik...@gmail.com>
> wrote:
>
>> Hi Dian,
>>
>> I simplified the question in
>> https://stackoverflow.com/questions/66687797/pyflink-java-io-eofexception-at-java-io-datainputstream-readfully.
>> You can also find the updated question below:
>>
>> I have a PyFlink job that reads from a file, filters based on a condition,
>> and prints. This is the PyFlink script main.py:
>>
>> ```python
>> from pyflink.datastream import StreamExecutionEnvironment
>> from pyflink.table import StreamTableEnvironment, DataTypes
>> from pyflink.table.udf import udf
>>
>> # https://flink.apache.org/2020/04/09/pyflink-udf-support-flink.html
>> # https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/python_udfs.html
>>
>> @udf(input_types=[DataTypes.STRING()],
>>      result_type=DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))
>> def parse(s):
>>     import json
>>     # a dummy parser
>>     res = {'item_id': 123, 'tag': 'a'}
>>     return res
>>
>> env = StreamExecutionEnvironment.get_execution_environment()
>> t_env = StreamTableEnvironment.create(env)
>>
>> t_env.register_function("parse", parse)
>>
>> my_source_ddl = """
>> create table mySource (
>>     id BIGINT,
>>     contentstr STRING
>> ) with (
>>     'connector' = 'filesystem',
>>     'format' = 'json',
>>     'path' = '/tmp/input'
>> )
>> """
>>
>> my_sink_ddl = """
>> create table mySink (
>>     id BIGINT
>> ) with (
>>     'connector' = 'print'
>> )
>> """
>>
>> my_transform_dml = """
>> insert into mySink
>> with t1 as (
>>     select id, parse(contentstr) as content
>>     from mySource
>> )
>> select id
>> from t1
>> where content['item_id'] is not null
>> and content['tag'] = 'a'
>> """
>>
>> t_env.execute_sql(my_source_ddl)
>> t_env.execute_sql(my_sink_ddl)
>> t_env.execute_sql(my_transform_dml).wait()
>> ```
>>
>> To run `main.py`:
>> - pyflink==1.12.0 is installed in my conda env
>> - /tmp/input has a single row of content `{"id":1,"tag":"a"}`
>>
>> Then I run `main.py` and I get the exception:
>>
>> ```
>> Traceback (most recent call last):
>>   File "udf_parse.py", line 53, in <module>
>>     t_env.execute_sql(my_transform_dml).wait()
>>   File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/table/table_result.py", line 76, in wait
>>     get_method(self._j_table_result, "await")()
>>   File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__
>>     answer, self.gateway_client, self.target_id, self.name)
>>   File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 147, in deco
>>     return f(*a, **kw)
>>   File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
>>     format(target_id, ".", name), value)
>> py4j.protocol.Py4JJavaError: An error occurred while calling o53.await.
>> : java.util.concurrent.ExecutionException: org.apache.flink.table.api.TableException: Failed to wait job finish
>>     at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>>     at org.apache.flink.table.api.internal.TableResultImpl.awaitInternal(TableResultImpl.java:123)
>>     at org.apache.flink.table.api.internal.TableResultImpl.await(TableResultImpl.java:86)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>     at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>>     at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>>     at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>>     at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>>     at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>>     at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.flink.table.api.TableException: Failed to wait job finish
>>     at org.apache.flink.table.api.internal.InsertResultIterator.hasNext(InsertResultIterator.java:56)
>>     at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:350)
>>     at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.isFirstRowReady(TableResultImpl.java:363)
>>     at org.apache.flink.table.api.internal.TableResultImpl.lambda$awaitInternal$1(TableResultImpl.java:110)
>>     at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>     ... 1 more
>> Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>     at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>>     at org.apache.flink.table.api.internal.InsertResultIterator.hasNext(InsertResultIterator.java:54)
>>     ... 7 more
>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>     at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
>>     at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:117)
>>     at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
>>     at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
>>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>>     at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
>>     at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:237)
>>     at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>>     at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>>     at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
>>     at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1046)
>>     at akka.dispatch.OnComplete.internal(Future.scala:264)
>>     at akka.dispatch.OnComplete.internal(Future.scala:261)
>>     at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
>>     at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
>>     at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>>     at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
>>     at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
>>     at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
>>     at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
>>     at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
>>     at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
>>     at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
>>     at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
>>     at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>>     at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
>>     at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
>>     at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>>     at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>>     at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>>     at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
>>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
>>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
>>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
>>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
>>     at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:669)
>>     at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
>>     at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>     ... 4 more
>> Caused by: java.io.EOFException
>>     at java.io.DataInputStream.readFully(DataInputStream.java:197)
>>     at java.io.DataInputStream.readFully(DataInputStream.java:169)
>>     at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserializeInternal(StringDataSerializer.java:88)
>>     at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:82)
>>     at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:34)
>>     at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserializeInternal(MapDataSerializer.java:129)
>>     at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:110)
>>     at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:46)
>>     at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:106)
>>     at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:49)
>>     at org.apache.flink.table.runtime.operators.python.scalar.RowDataPythonScalarFunctionOperator.emitResult(RowDataPythonScalarFunctionOperator.java:81)
>>     at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.emitResults(AbstractPythonFunctionOperator.java:250)
>>     at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:273)
>>     at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.processWatermark(AbstractPythonFunctionOperator.java:199)
>>     at org.apache.flink.streaming.runtime.tasks.ChainingOutput.emitWatermark(ChainingOutput.java:123)
>>     at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitWatermark(SourceOperatorStreamTask.java:170)
>>     at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.advanceToEndOfEventTime(SourceOperatorStreamTask.java:110)
>>     at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.afterInvoke(SourceOperatorStreamTask.java:116)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:589)
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>>     at java.lang.Thread.run(Thread.java:748)
>> ```
>>
>> The issue is probably related to the udf.
>>
>> Any help? Thanks!
>>
>> Best,
>> Yik San
>>
>> On Fri, Mar 19, 2021 at 11:58 AM Yik San Chan <evan.chanyik...@gmail.com>
>> wrote:
>>
>>> Hi Dian,
>>>
>>> I am able to reproduce this issue in a much simpler setup. Let me update
>>> with the simpler reproducible example shortly.
>>>
>>> Best,
>>> Yik San
>>>
>>> On Fri, Mar 19, 2021 at 11:28 AM Yik San Chan <evan.chanyik...@gmail.com>
>>> wrote:
>>>
>>>> Hi Dian,
>>>>
>>>> It is a good catch, though after changing to use
>>>> flink-sql-connector-kafka_2.11-1.12.0.jar I still get exactly the same
>>>> error.
>>>>
>>>> Best,
>>>> Yik San
>>>>
>>>> On Fri, Mar 19, 2021 at 11:02 AM Dian Fu <dian0511...@gmail.com> wrote:
>>>>
>>>>> I noticed that you use "flink-sql-connector-kafka_2.12-1.12.0.jar".
>>>>> Are the jar files in the cluster nodes also built with Scala 2.12?
>>>>> The PyFlink package bundles jar files with Scala 2.11 by default.
>>>>> I'm still not sure if it's related to this issue. However, I think this is problematic.
>>>>> Could you make sure that they are consistent?
>>>>>
>>>>> On Mar 19, 2021, at 10:40 AM, Yik San Chan <evan.chanyik...@gmail.com> wrote:
>>>>>
>>>>> Hi Dian,
>>>>>
>>>>> The PyFlink version is 1.12.0 and the Flink version in the cluster
>>>>> nodes is also 1.12.0
>>>>>
>>>>> $ which flink
>>>>> /data/apache/flink/flink-1.12.0/bin/flink
>>>>>
>>>>> Best,
>>>>> Yik San
>>>>>
>>>>> On Fri, Mar 19, 2021 at 10:26 AM Dian Fu <dian0511...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> What's the Flink version in the cluster nodes? It should match the
>>>>>> PyFlink version.
>>>>>>
>>>>>> Regards,
>>>>>> Dian
>>>>>>
>>>>>> On Mar 18, 2021, at 5:01 PM, Yik San Chan <evan.chanyik...@gmail.com> wrote:
>>>>>>
>>>>>> This question is cross-posted on StackOverflow
>>>>>> https://stackoverflow.com/questions/66687797/pyflink-java-io-eofexception-at-java-io-datainputstream-readint
>>>>>>
>>>>>> I have a PyFlink job that reads from a Kafka source, transforms, and
>>>>>> writes to a Kafka sink. This is a `tree` view of my working directory.
>>>>>>
>>>>>> ```
>>>>>> > tree
>>>>>> .
>>>>>> ├── deps
>>>>>> │   └── flink-sql-connector-kafka_2.12-1.12.0.jar
>>>>>> ├── flink_run.sh
>>>>>> ├── main.py
>>>>>> ├── pyflink1.12.0.zip
>>>>>> └── tasks
>>>>>>     └── user_last_n_clicks
>>>>>>         ├── sink_ddl.sql
>>>>>>         ├── source_ddl.sql
>>>>>>         └── transform_dml.sql
>>>>>> ```
>>>>>>
>>>>>> This is the `flink_run.sh`:
>>>>>>
>>>>>> ```
>>>>>> flink run \
>>>>>>   --yarnname test-pyflink \
>>>>>>   -m yarn-cluster \
>>>>>>   -yD yarn.application.queue=tech_platform \
>>>>>>   -pyarch pyflink1.12.0.zip \
>>>>>>   -pyexec /data/software/pyflink1.12.0/bin/python \
>>>>>>   -py main.py testing user_last_n_clicks
>>>>>> ```
>>>>>>
>>>>>> This is the `main.py`. The key logic is in:
>>>>>> - the `parse_content` udf
>>>>>> - loading the sql files from the tasks subfolder, and execute_sql
>>>>>>
>>>>>> ```python
>>>>>> import os
>>>>>> from sys import argv
>>>>>> from pyflink.datastream import StreamExecutionEnvironment
>>>>>> from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes
>>>>>> from pyflink.table.udf import udf
>>>>>>
>>>>>> def read_file_content(filepath):
>>>>>>     with open(filepath) as f:
>>>>>>         return f.read()
>>>>>>
>>>>>> @udf(input_types=[DataTypes.STRING()],
>>>>>>      result_type=DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))
>>>>>> def parse_content(content_str):
>>>>>>     import json
>>>>>>     res = {}
>>>>>>     content = json.loads(content_str)
>>>>>>     if 'postId' in content:
>>>>>>         res['item_id'] = content['postId']
>>>>>>     if 'lid' in content:
>>>>>>         res['item_id'] = content['lid']
>>>>>>     if 'param' in content and 'tag' in content['param']:
>>>>>>         res['tag'] = content['param']['tag']
>>>>>>     return res
>>>>>>
>>>>>> CWD = os.getcwd()
>>>>>> _, palfish_env, task = argv
>>>>>>
>>>>>> VALID_PALFISH_ENVS = ['development', 'testing', 'production']
>>>>>> if palfish_env not in VALID_PALFISH_ENVS:
>>>>>>     raise Exception(f"{palfish_env} is not a valid env, should be one of [{', '.join(VALID_PALFISH_ENVS)}].")
>>>>>>
>>>>>> VALID_TASKS = os.listdir(f"{CWD}/tasks")
>>>>>> if task not in VALID_TASKS:
>>>>>>     raise Exception(f"{task} is not a valid task, should be one of [{', '.join(VALID_TASKS)}].")
>>>>>>
>>>>>> config = {
>>>>>>     "development": {
>>>>>>         "${generation.kafka.source.servers}": "localhost:9094",
>>>>>>         "${generation.kafka.sink.servers}": "localhost:9094"
>>>>>>     },
>>>>>>     "testing": {
>>>>>>         "${generation.kafka.source.servers}": "10.111.135.233:9092,10.111.130.11:9092,10.111.130.12:9092",
>>>>>>         "${generation.kafka.sink.servers}": "10.111.135.233:9092,10.111.130.11:9092,10.111.130.12:9092"
>>>>>>     },
>>>>>>     "production": {
>>>>>>         "${generation.kafka.source.servers}": "10.111.203.9:9092,10.111.203.10:9092,10.111.203.13:9092,10.111.204.163:9092,10.111.204.164:9092,10.111.204.165:9092",
>>>>>>         "${generation.kafka.sink.servers}": "10.111.209.219:9092,10.111.209.220:9092,10.111.209.221:9092"
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>> FAT_JAR_PATH = f"{CWD}/deps/flink-sql-connector-kafka_2.12-1.12.0.jar"
>>>>>>
>>>>>> source_ddl = read_file_content(f'{CWD}/tasks/{task}/source_ddl.sql').replace('${generation.kafka.source.servers}', config[palfish_env]['${generation.kafka.source.servers}'])
>>>>>> sink_ddl = read_file_content(f'{CWD}/tasks/{task}/sink_ddl.sql').replace('${generation.kafka.sink.servers}', config[palfish_env]['${generation.kafka.sink.servers}'])
>>>>>> transform_dml = read_file_content(f'{CWD}/tasks/{task}/transform_dml.sql')
>>>>>>
>>>>>> exec_env = StreamExecutionEnvironment.get_execution_environment()
>>>>>> env_settings = EnvironmentSettings.Builder().use_blink_planner().build()
>>>>>> t_env = StreamTableEnvironment.create(stream_execution_environment=exec_env, environment_settings=env_settings)
>>>>>>
>>>>>> t_env.get_config().get_configuration().set_string("pipeline.jars", f"file://{FAT_JAR_PATH}")
>>>>>> t_env.create_temporary_function("ParseContent", parse_content)
>>>>>>
>>>>>> t_env.execute_sql(source_ddl)
>>>>>> t_env.execute_sql(sink_ddl)
>>>>>> t_env.execute_sql(transform_dml).wait()
>>>>>> ```
>>>>>>
>>>>>> See my sqls. Note the udf `ParseContent` is used in
>>>>>> `transform_dml.sql`.
>>>>>>
>>>>>> ```sql
>>>>>> # source_ddl.sql
>>>>>> CREATE TABLE kafka_source (
>>>>>>     `body` ROW<`log` ROW<`uid` BIGINT, serverts BIGINT, `contentstr` STRING>>
>>>>>> ) WITH (
>>>>>>     'connector' = 'kafka',
>>>>>>     'topic' = 'data-report-stat-old-logtype7',
>>>>>>     'properties.bootstrap.servers' = '${generation.kafka.source.servers}',
>>>>>>     'properties.group.id' = 'flink-featurepipelines',
>>>>>>     'format' = 'json'
>>>>>> )
>>>>>>
>>>>>> # transform_dml.sql
>>>>>> INSERT INTO kafka_sink
>>>>>> WITH t1 AS (
>>>>>>     SELECT body['log']['uid'] user_id,
>>>>>>            ParseContent(body['log']['contentstr']) content,
>>>>>>            body['log']['serverts'] server_ts
>>>>>>     FROM kafka_source
>>>>>> ),
>>>>>> t2 AS (
>>>>>>     SELECT user_id, content['item_id'] item_id, content['tag'] tag, server_ts
>>>>>>     FROM t1
>>>>>>     WHERE content['item_id'] IS NOT NULL
>>>>>>     AND content['tag'] = '点击帖子卡片'
>>>>>> ),
>>>>>> last_n AS (
>>>>>>     SELECT user_id, item_id, server_ts
>>>>>>     FROM (
>>>>>>         SELECT *,
>>>>>>         ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY server_ts DESC) as row_num
>>>>>>         FROM t2)
>>>>>>     WHERE row_num <= 5
>>>>>> )
>>>>>> SELECT user_id, CAST(MAX(server_ts / 1000) AS TIMESTAMP) datetime, LISTAGG(CAST(item_id AS STRING)) last_5_clicks
>>>>>> FROM last_n
>>>>>> GROUP BY user_id
>>>>>>
>>>>>> # sink_ddl.sql
>>>>>> CREATE TABLE kafka_sink (
>>>>>>     user_id BIGINT,
>>>>>>     datetime TIMESTAMP(3),
>>>>>>     last_5_clicks STRING,
>>>>>>     PRIMARY KEY (user_id) NOT ENFORCED
>>>>>> ) WITH (
>>>>>>     'connector' = 'upsert-kafka',
>>>>>>     'topic' = 'aiinfra.fct.userfeature.0',
>>>>>>     'properties.bootstrap.servers' = '${generation.kafka.sink.servers}',
>>>>>>     'key.format' = 'json',
>>>>>>     'value.format' = 'json'
>>>>>> )
>>>>>> ```
>>>>>>
>>>>>> I got the error when running the PyFlink program in my testing
>>>>>> environment machine.
>>>>>>
>>>>>> ```
>>>>>> Caused by: java.io.EOFException
>>>>>>     at java.io.DataInputStream.readInt(DataInputStream.java:392) ~[?:1.8.0_261]
>>>>>>     at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserializeInternal(StringDataSerializer.java:91) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
>>>>>>     at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:87) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
>>>>>>     at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:36) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
>>>>>>     at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserializeInternal(MapDataSerializer.java:124) ~[flink-python_2.11-1.12.0.jar:1.12.0]
>>>>>>     at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:107) ~[flink-python_2.11-1.12.0.jar:1.12.0]
>>>>>>     at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:46) ~[flink-python_2.11-1.12.0.jar:1.12.0]
>>>>>>     at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:104) ~[flink-python_2.11-1.12.0.jar:1.12.0]
>>>>>>     at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:49) ~[flink-python_2.11-1.12.0.jar:1.12.0]
>>>>>>     at org.apache.flink.table.runtime.operators.python.scalar.RowDataPythonScalarFunctionOperator.emitResult(RowDataPythonScalarFunctionOperator.java:84) ~[flink-python_2.11-1.12.0.jar:1.12.0]
>>>>>> ```
>>>>>>
>>>>>> Here are the full logs, see
>>>>>> https://gist.github.com/YikSanChan/d3a5d25cdf2f3c1dc6b3dc93e48c4bbc.
>>>>>>
>>>>>> Any idea why the exception? Thanks.
>>>>>>
>>>>>> Yik San