Hi Dian,

Thank you for your help!

Best,
Yik San
On Fri, Mar 19, 2021 at 9:33 PM Dian Fu <dian0511...@gmail.com> wrote:

Good finding!

I think we should handle this case more gracefully, as I guess this issue
will be very common for Python users since Python is a dynamic language. I
have created https://issues.apache.org/jira/browse/FLINK-21876 to follow up
on this issue.

Regards,
Dian

On Mar 19, 2021, at 6:57 PM, Xingbo Huang <hxbks...@gmail.com> wrote:

Yes, you need to ensure that the key and value types of the Map are
determined, i.e. they match the declared result type.

Best,
Xingbo

On Fri, Mar 19, 2021 at 3:41 PM, Yik San Chan <evan.chanyik...@gmail.com> wrote:

I got it now for the simplified question - the dummy parser should return
string keys and string values, otherwise it fails the declared result_type
spec.
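For example, a version of the dummy parser that satisfies that spec would
look roughly like this (a sketch; the only change is returning every value
as a Python str):

```python
from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(input_types=[DataTypes.STRING()],
     result_type=DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))
def parse(s):
    # every key and value is a Python str, matching the declared
    # MAP(STRING, STRING); returning 123 as an int is what broke the job
    res = {'item_id': str(123), 'tag': 'a'}
    return res
```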
"/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/util/exceptions.py", >>> line 147, in deco >>> return f(*a, **kw) >>> File >>> "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/protocol.py", >>> line 328, in get_return_value >>> format(target_id, ".", name), value) >>> py4j.protocol.Py4JJavaError: An error occurred while calling o53.await. >>> : java.util.concurrent.ExecutionException: >>> org.apache.flink.table.api.TableException: Failed to wait job finish >>> at >>> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) >>> at >>> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) >>> at >>> org.apache.flink.table.api.internal.TableResultImpl.awaitInternal(TableResultImpl.java:123) >>> at >>> org.apache.flink.table.api.internal.TableResultImpl.await(TableResultImpl.java:86) >>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >>> at >>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >>> at >>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >>> at java.lang.reflect.Method.invoke(Method.java:498) >>> at >>> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) >>> at >>> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) >>> at >>> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282) >>> at >>> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) >>> at >>> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79) >>> at >>> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238) >>> at java.lang.Thread.run(Thread.java:748) >>> Caused by: org.apache.flink.table.api.TableException: Failed to wait job >>> finish >>> at >>> org.apache.flink.table.api.internal.InsertResultIterator.hasNext(InsertResultIterator.java:56) >>> at >>> org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:350) >>> at >>> org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.isFirstRowReady(TableResultImpl.java:363) >>> at >>> org.apache.flink.table.api.internal.TableResultImpl.lambda$awaitInternal$1(TableResultImpl.java:110) >>> at >>> java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640) >>> at >>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) >>> at >>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) >>> ... 1 more >>> Caused by: java.util.concurrent.ExecutionException: >>> org.apache.flink.runtime.client.JobExecutionException: Job execution failed. >>> at >>> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) >>> at >>> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) >>> at >>> org.apache.flink.table.api.internal.InsertResultIterator.hasNext(InsertResultIterator.java:54) >>> ... 7 more >>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job >>> execution failed. 
Then I run `main.py` and I get the exception:

```
Traceback (most recent call last):
  File "udf_parse.py", line 53, in <module>
    t_env.execute_sql(my_transform_dml).wait()
  File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/table/table_result.py", line 76, in wait
    get_method(self._j_table_result, "await")()
  File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 147, in deco
    return f(*a, **kw)
  File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o53.await.
: java.util.concurrent.ExecutionException: org.apache.flink.table.api.TableException: Failed to wait job finish
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
    at org.apache.flink.table.api.internal.TableResultImpl.awaitInternal(TableResultImpl.java:123)
    at org.apache.flink.table.api.internal.TableResultImpl.await(TableResultImpl.java:86)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.table.api.TableException: Failed to wait job finish
    at org.apache.flink.table.api.internal.InsertResultIterator.hasNext(InsertResultIterator.java:56)
    at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:350)
    at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.isFirstRowReady(TableResultImpl.java:363)
    at org.apache.flink.table.api.internal.TableResultImpl.lambda$awaitInternal$1(TableResultImpl.java:110)
    at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
    at org.apache.flink.table.api.internal.InsertResultIterator.hasNext(InsertResultIterator.java:54)
    ... 7 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
    at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:117)
    at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
    at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:237)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
    at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1046)
    at akka.dispatch.OnComplete.internal(Future.scala:264)
    at akka.dispatch.OnComplete.internal(Future.scala:261)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
    at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
    at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
    at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
    at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
    at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
    at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
    at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
    at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:669)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    ... 4 more
Caused by: java.io.EOFException
    at java.io.DataInputStream.readFully(DataInputStream.java:197)
    at java.io.DataInputStream.readFully(DataInputStream.java:169)
    at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserializeInternal(StringDataSerializer.java:88)
    at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:82)
    at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:34)
    at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserializeInternal(MapDataSerializer.java:129)
    at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:110)
    at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:46)
    at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:106)
    at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:49)
    at org.apache.flink.table.runtime.operators.python.scalar.RowDataPythonScalarFunctionOperator.emitResult(RowDataPythonScalarFunctionOperator.java:81)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.emitResults(AbstractPythonFunctionOperator.java:250)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:273)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.processWatermark(AbstractPythonFunctionOperator.java:199)
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.emitWatermark(ChainingOutput.java:123)
    at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitWatermark(SourceOperatorStreamTask.java:170)
    at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.advanceToEndOfEventTime(SourceOperatorStreamTask.java:110)
    at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.afterInvoke(SourceOperatorStreamTask.java:116)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:589)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
    at java.lang.Thread.run(Thread.java:748)
```

The issue is probably related to the udf.

Any help? Thanks!

Best,
Yik San

On Fri, Mar 19, 2021 at 11:58 AM Yik San Chan <evan.chanyik...@gmail.com> wrote:

Hi Dian,

I am able to reproduce this issue in a much simpler setup. Let me update
with the simpler reproducible example shortly.

Best,
Yik San

On Fri, Mar 19, 2021 at 11:28 AM Yik San Chan <evan.chanyik...@gmail.com> wrote:

Hi Dian,

It is a good catch, though after changing to use
flink-sql-connector-kafka_2.11-1.12.0.jar I still get exactly the same
error.

Best,
Yik San

On Fri, Mar 19, 2021 at 11:02 AM Dian Fu <dian0511...@gmail.com> wrote:

I noticed that you use "flink-sql-connector-kafka_2.12-1.12.0.jar". Are the
jar files on the cluster nodes also built with Scala 2.12? The PyFlink
package bundles jar files with Scala 2.11 by default. I'm still not sure if
it's related to this issue. However, I think this is problematic. Could you
make sure that they are consistent?
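A rough way to check which Scala version the PyFlink-bundled jars use is to
list them from the installed package; this sketch assumes the wheel's
default `pyflink/lib` layout:

```python
import glob
import os

import pyflink

# the wheel ships its Flink jars next to the Python package (assumed: pyflink/lib)
lib_dir = os.path.join(os.path.dirname(pyflink.__file__), "lib")
for jar in sorted(glob.glob(os.path.join(lib_dir, "*.jar"))):
    # e.g. flink-dist_2.11-1.12.0.jar -> the _2.11 suffix should match the
    # Scala suffix of the Kafka connector jar you submit with the job
    print(os.path.basename(jar))
```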
On Mar 19, 2021, at 10:40 AM, Yik San Chan <evan.chanyik...@gmail.com> wrote:

Hi Dian,

The PyFlink version is 1.12.0 and the Flink version in the cluster nodes is
also 1.12.0.

$ which flink
/data/apache/flink/flink-1.12.0/bin/flink

Best,
Yik San

On Mar 19, 2021, at 10:26 AM, Dian Fu <dian0511...@gmail.com> wrote:

Hi,

What's the Flink version in the cluster nodes? It should match the PyFlink
version.

Regards,
Dian
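As a quick sanity check of the client side, the installed PyFlink version
can be read from the package metadata (this assumes it was installed from
PyPI as `apache-flink`); the cluster side is whatever distribution
`/data/apache/flink/flink-1.12.0` ships:

```python
import pkg_resources

# client-side PyFlink version; expect it to match the cluster's 1.12.0
print(pkg_resources.get_distribution("apache-flink").version)
```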
On Mar 18, 2021, at 5:01 PM, Yik San Chan <evan.chanyik...@gmail.com> wrote:

This question is cross-posted on StackOverflow
https://stackoverflow.com/questions/66687797/pyflink-java-io-eofexception-at-java-io-datainputstream-readint

I have a PyFlink job that reads from a Kafka source, transforms, and writes
to a Kafka sink. This is a `tree` view of my working directory.

```
> tree
.
├── deps
│   └── flink-sql-connector-kafka_2.12-1.12.0.jar
├── flink_run.sh
├── main.py
├── pyflink1.12.0.zip
└── tasks
    └── user_last_n_clicks
        ├── sink_ddl.sql
        ├── source_ddl.sql
        └── transform_dml.sql
```

This is the `flink_run.sh`:

```
flink run \
--yarnname test-pyflink \
-m yarn-cluster \
-yD yarn.application.queue=tech_platform \
-pyarch pyflink1.12.0.zip \
-pyexec /data/software/pyflink1.12.0/bin/python \
-py main.py testing user_last_n_clicks
```

This is the `main.py`. The key logic is in:
- the `parse_content` udf
- loading the sql files from the tasks subfolder and running them with execute_sql

```python
import os
from sys import argv
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes
from pyflink.table.udf import udf

def read_file_content(filepath):
    with open(filepath) as f:
        return f.read()

@udf(input_types=[DataTypes.STRING()],
     result_type=DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))
def parse_content(content_str):
    import json
    res = {}
    content = json.loads(content_str)
    if 'postId' in content:
        res['item_id'] = content['postId']
    if 'lid' in content:
        res['item_id'] = content['lid']
    if 'param' in content and 'tag' in content['param']:
        res['tag'] = content['param']['tag']
    return res

CWD = os.getcwd()
_, palfish_env, task = argv

VALID_PALFISH_ENVS = ['development', 'testing', 'production']
if palfish_env not in VALID_PALFISH_ENVS:
    raise Exception(f"{palfish_env} is not a valid env, should be one of [{', '.join(VALID_PALFISH_ENVS)}].")

VALID_TASKS = os.listdir(f"{CWD}/tasks")
if task not in VALID_TASKS:
    raise Exception(f"{task} is not a valid task, should be one of [{', '.join(VALID_TASKS)}].")

config = {
    "development": {
        "${generation.kafka.source.servers}": "localhost:9094",
        "${generation.kafka.sink.servers}": "localhost:9094"
    },
    "testing": {
        "${generation.kafka.source.servers}": "10.111.135.233:9092,10.111.130.11:9092,10.111.130.12:9092",
        "${generation.kafka.sink.servers}": "10.111.135.233:9092,10.111.130.11:9092,10.111.130.12:9092"
    },
    "production": {
        "${generation.kafka.source.servers}": "10.111.203.9:9092,10.111.203.10:9092,10.111.203.13:9092,10.111.204.163:9092,10.111.204.164:9092,10.111.204.165:9092",
        "${generation.kafka.sink.servers}": "10.111.209.219:9092,10.111.209.220:9092,10.111.209.221:9092"
    }
}

FAT_JAR_PATH = f"{CWD}/deps/flink-sql-connector-kafka_2.12-1.12.0.jar"

source_ddl = read_file_content(f'{CWD}/tasks/{task}/source_ddl.sql').replace(
    '${generation.kafka.source.servers}',
    config[palfish_env]['${generation.kafka.source.servers}'])
sink_ddl = read_file_content(f'{CWD}/tasks/{task}/sink_ddl.sql').replace(
    '${generation.kafka.sink.servers}',
    config[palfish_env]['${generation.kafka.sink.servers}'])
transform_dml = read_file_content(f'{CWD}/tasks/{task}/transform_dml.sql')

exec_env = StreamExecutionEnvironment.get_execution_environment()
env_settings = EnvironmentSettings.Builder().use_blink_planner().build()
t_env = StreamTableEnvironment.create(stream_execution_environment=exec_env, environment_settings=env_settings)

t_env.get_config().get_configuration().set_string("pipeline.jars", f"file://{FAT_JAR_PATH}")
t_env.create_temporary_function("ParseContent", parse_content)

t_env.execute_sql(source_ddl)
t_env.execute_sql(sink_ddl)
t_env.execute_sql(transform_dml).wait()
```
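For reference, a sketch of `parse_content` that always returns string values
(so the returned dict matches the declared MAP(STRING, STRING) result type;
`postId` and `lid` may arrive as JSON numbers) would look roughly like this:

```python
from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(input_types=[DataTypes.STRING()],
     result_type=DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))
def parse_content(content_str):
    import json
    res = {}
    content = json.loads(content_str)
    # cast the values to str so every entry matches MAP(STRING, STRING)
    if 'postId' in content:
        res['item_id'] = str(content['postId'])
    if 'lid' in content:
        res['item_id'] = str(content['lid'])
    if 'param' in content and 'tag' in content['param']:
        res['tag'] = str(content['param']['tag'])
    return res
```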
See my SQL files below; note the udf `ParseContent` is used in
`transform_dml.sql`.

```sql
-- source_ddl.sql
CREATE TABLE kafka_source (
    `body` ROW<`log` ROW<`uid` BIGINT, serverts BIGINT, `contentstr` STRING>>
) WITH (
    'connector' = 'kafka',
    'topic' = 'data-report-stat-old-logtype7',
    'properties.bootstrap.servers' = '${generation.kafka.source.servers}',
    'properties.group.id' = 'flink-featurepipelines',
    'format' = 'json'
)

-- transform_dml.sql
INSERT INTO kafka_sink
WITH t1 AS (
    SELECT body['log']['uid'] user_id,
        ParseContent(body['log']['contentstr']) content,
        body['log']['serverts'] server_ts
    FROM kafka_source
),
t2 AS (
    SELECT user_id, content['item_id'] item_id, content['tag'] tag, server_ts
    FROM t1
    WHERE content['item_id'] IS NOT NULL
    AND content['tag'] = '点击帖子卡片'
),
last_n AS (
    SELECT user_id, item_id, server_ts
    FROM (
        SELECT *,
            ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY server_ts DESC) as row_num
        FROM t2)
    WHERE row_num <= 5
)
SELECT user_id, CAST(MAX(server_ts / 1000) AS TIMESTAMP) datetime,
    LISTAGG(CAST(item_id AS STRING)) last_5_clicks
FROM last_n
GROUP BY user_id

-- sink_ddl.sql
CREATE TABLE kafka_sink (
    user_id BIGINT,
    datetime TIMESTAMP(3),
    last_5_clicks STRING,
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'aiinfra.fct.userfeature.0',
    'properties.bootstrap.servers' = '${generation.kafka.sink.servers}',
    'key.format' = 'json',
    'value.format' = 'json'
)
```

I got the error when running the PyFlink program on my testing environment
machine.
```
Caused by: java.io.EOFException
    at java.io.DataInputStream.readInt(DataInputStream.java:392) ~[?:1.8.0_261]
    at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserializeInternal(StringDataSerializer.java:91) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:87) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:36) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserializeInternal(MapDataSerializer.java:124) ~[flink-python_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:107) ~[flink-python_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:46) ~[flink-python_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:104) ~[flink-python_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:49) ~[flink-python_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.table.runtime.operators.python.scalar.RowDataPythonScalarFunctionOperator.emitResult(RowDataPythonScalarFunctionOperator.java:84) ~[flink-python_2.11-1.12.0.jar:1.12.0]
```

Here are the full logs, see
https://gist.github.com/YikSanChan/d3a5d25cdf2f3c1dc6b3dc93e48c4bbc.

Any idea why the exception? Thanks.

Yik San