Yes, you need to ensure that the key and value types of the Map returned by
the UDF match the declared result type.
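
For example, a minimal sketch of the dummy parser from the question below,
adjusted so that every value is a string (the str() cast is the assumed fix,
not part of the original code):

```python
from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(input_types=[DataTypes.STRING()],
     result_type=DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))
def parse(s):
    # every key and value must be a Python str to match MAP(STRING(), STRING())
    return {'item_id': str(123), 'tag': 'a'}
```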

Best,
Xingbo

Yik San Chan <evan.chanyik...@gmail.com> wrote on Friday, March 19, 2021 at 3:41 PM:

> I figured out why for the simplified question - the dummy parser should
> return string keys and string values, otherwise it violates the result_type
> spec
>
> On Fri, Mar 19, 2021 at 3:37 PM Yik San Chan <evan.chanyik...@gmail.com>
> wrote:
>
>> Hi Dian,
>>
>> I simplified the question in
>> https://stackoverflow.com/questions/66687797/pyflink-java-io-eofexception-at-java-io-datainputstream-readfully.
>> You can also find the updated question below:
>>
>> I have a PyFlink job that reads from a file, filters based on a condition,
>> and prints. This is the PyFlink script main.py:
>>
>> ```python
>> from pyflink.datastream import StreamExecutionEnvironment
>> from pyflink.table import StreamTableEnvironment, DataTypes
>> from pyflink.table.udf import udf
>>
>> # https://flink.apache.org/2020/04/09/pyflink-udf-support-flink.html
>> #
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/python_udfs.html
>>
>> @udf(input_types=[DataTypes.STRING()],
>> result_type=DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))
>> def parse(s):
>>   import json
>>   # a dummy parser
>>   res = {'item_id': 123, 'tag': 'a'}
>>   return res
>>
>> env = StreamExecutionEnvironment.get_execution_environment()
>> t_env = StreamTableEnvironment.create(env)
>>
>> t_env.register_function("parse", parse)
>>
>> my_source_ddl = """
>> create table mySource (
>>     id BIGINT,
>>     contentstr STRING
>> ) with (
>>     'connector' = 'filesystem',
>>     'format' = 'json',
>>     'path' = '/tmp/input'
>> )
>> """
>>
>> my_sink_ddl = """
>> create table mySink (
>>     id BIGINT
>> ) with (
>>     'connector' = 'print'
>> )
>> """
>>
>> my_transform_dml = """
>> insert into mySink
>> with t1 as (
>>     select id, parse(contentstr) as content
>>     from mySource
>> )
>> select id
>> from t1
>> where content['item_id'] is not null
>> and content['tag'] = 'a'
>> """
>>
>> t_env.execute_sql(my_source_ddl)
>> t_env.execute_sql(my_sink_ddl)
>> t_env.execute_sql(my_transform_dml).wait()
>> ```
>>
>> To run `main.py`:
>> - ensure pyflink==1.12.0 is installed in my conda env
>> - /tmp/input has a single row of content `{"id":1,"tag":"a"}` (see the
>> sketch after this list)
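>>
>> A small sketch of how the input could be prepared, assuming any file name
>> under /tmp/input works for the filesystem connector (data.json is an
>> arbitrary choice):
>>
>> ```python
>> import json
>> import os
>>
>> # Assumption: the filesystem connector picks up any file under the
>> # configured 'path' directory; the name data.json is arbitrary here.
>> os.makedirs("/tmp/input", exist_ok=True)
>> with open("/tmp/input/data.json", "w") as f:
>>     f.write(json.dumps({"id": 1, "tag": "a"}) + "\n")
>> ```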
>>
>> Then I run `main.py` and I get the exception:
>>
>> ```
>> Traceback (most recent call last):
>>   File "udf_parse.py", line 53, in <module>
>>     t_env.execute_sql(my_transform_dml).wait()
>>   File
>> "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/table/table_result.py",
>> line 76, in wait
>>     get_method(self._j_table_result, "await")()
>>   File
>> "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_gateway.py",
>> line 1286, in __call__
>>     answer, self.gateway_client, self.target_id, self.name)
>>   File
>> "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/util/exceptions.py",
>> line 147, in deco
>>     return f(*a, **kw)
>>   File
>> "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/protocol.py",
>> line 328, in get_return_value
>>     format(target_id, ".", name), value)
>> py4j.protocol.Py4JJavaError: An error occurred while calling o53.await.
>> : java.util.concurrent.ExecutionException:
>> org.apache.flink.table.api.TableException: Failed to wait job finish
>> at
>> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>> at
>> org.apache.flink.table.api.internal.TableResultImpl.awaitInternal(TableResultImpl.java:123)
>> at
>> org.apache.flink.table.api.internal.TableResultImpl.await(TableResultImpl.java:86)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at
>> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>> at
>> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>> at
>> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>> at
>> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>> at
>> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>> at
>> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.flink.table.api.TableException: Failed to wait job
>> finish
>> at
>> org.apache.flink.table.api.internal.InsertResultIterator.hasNext(InsertResultIterator.java:56)
>> at
>> org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:350)
>> at
>> org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.isFirstRowReady(TableResultImpl.java:363)
>> at
>> org.apache.flink.table.api.internal.TableResultImpl.lambda$awaitInternal$1(TableResultImpl.java:110)
>> at
>> java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> ... 1 more
>> Caused by: java.util.concurrent.ExecutionException:
>> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>> at
>> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>> at
>> org.apache.flink.table.api.internal.InsertResultIterator.hasNext(InsertResultIterator.java:54)
>> ... 7 more
>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
>> execution failed.
>> at
>> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
>> at
>> org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:117)
>> at
>> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
>> at
>> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
>> at
>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>> at
>> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:237)
>> at
>> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>> at
>> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>> at
>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>> at
>> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
>> at
>> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1046)
>> at akka.dispatch.OnComplete.internal(Future.scala:264)
>> at akka.dispatch.OnComplete.internal(Future.scala:261)
>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>> at
>> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
>> at
>> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
>> at
>> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
>> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
>> at
>> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
>> at
>> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
>> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
>> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>> at
>> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
>> at
>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
>> at
>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>> at
>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>> at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>> at
>> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>> at
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
>> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at
>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at
>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed
>> by NoRestartBackoffTimeStrategy
>> at
>> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
>> at
>> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
>> at
>> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
>> at
>> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
>> at
>> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
>> at
>> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:669)
>> at
>> org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
>> at
>> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
>> at
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>> ... 4 more
>> Caused by: java.io.EOFException
>> at java.io.DataInputStream.readFully(DataInputStream.java:197)
>> at java.io.DataInputStream.readFully(DataInputStream.java:169)
>> at
>> org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserializeInternal(StringDataSerializer.java:88)
>> at
>> org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:82)
>> at
>> org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:34)
>> at
>> org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserializeInternal(MapDataSerializer.java:129)
>> at
>> org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:110)
>> at
>> org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:46)
>> at
>> org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:106)
>> at
>> org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:49)
>> at
>> org.apache.flink.table.runtime.operators.python.scalar.RowDataPythonScalarFunctionOperator.emitResult(RowDataPythonScalarFunctionOperator.java:81)
>> at
>> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.emitResults(AbstractPythonFunctionOperator.java:250)
>> at
>> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:273)
>> at
>> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.processWatermark(AbstractPythonFunctionOperator.java:199)
>> at
>> org.apache.flink.streaming.runtime.tasks.ChainingOutput.emitWatermark(ChainingOutput.java:123)
>> at
>> org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitWatermark(SourceOperatorStreamTask.java:170)
>> at
>> org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.advanceToEndOfEventTime(SourceOperatorStreamTask.java:110)
>> at
>> org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.afterInvoke(SourceOperatorStreamTask.java:116)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:589)
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>> at java.lang.Thread.run(Thread.java:748)
>> ```
>>
>> The issue is probably related to the udf.
>>
>> Any help? Thanks!
>>
>> Best,
>> Yik San
>>
>> On Fri, Mar 19, 2021 at 11:58 AM Yik San Chan <evan.chanyik...@gmail.com>
>> wrote:
>>
>>> Hi Dian,
>>>
>>> I am able to reproduce this issue in a much simpler setup. Let me update
>>> with the simpler reproducible example shortly.
>>>
>>> Best,
>>> Yik San
>>>
>>> On Fri, Mar 19, 2021 at 11:28 AM Yik San Chan <evan.chanyik...@gmail.com>
>>> wrote:
>>>
>>>> Hi Dian,
>>>>
>>>> It is a good catch, though after changing to use
>>>> flink-sql-connector-kafka_2.11-1.12.0.jar I still get exactly the same
>>>> error.
>>>>
>>>> Best,
>>>> Yik San
>>>>
>>>> On Fri, Mar 19, 2021 at 11:02 AM Dian Fu <dian0511...@gmail.com> wrote:
>>>>
>>>>>
>>>>> I noticed that you use "flink-sql-connector-kafka_2.12-1.12.0.jar”.
>>>>> Are the jar files on the cluster nodes also built with Scala 2.12? The
>>>>> PyFlink package bundles jar files built with Scala 2.11 by default. I'm
>>>>> still not sure whether it's related to this issue, but I think it is
>>>>> problematic. Could you make sure that they are consistent?
>>>>>
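>>>>> One way to check which Scala version the jars bundled with PyFlink use is
>>>>> to list them and look at the _2.11 / _2.12 suffix in the jar names; a
>>>>> small sketch (the pyflink/lib location is an assumption about the
>>>>> installed package layout):
>>>>>
>>>>> ```python
>>>>> import glob
>>>>> import os
>>>>>
>>>>> import pyflink
>>>>>
>>>>> # Assumption: the installed PyFlink package ships its Flink jars under
>>>>> # pyflink/lib; compare the Scala suffix with the jars on the cluster.
>>>>> lib_dir = os.path.join(os.path.dirname(pyflink.__file__), "lib")
>>>>> for jar in sorted(glob.glob(os.path.join(lib_dir, "*.jar"))):
>>>>>     print(os.path.basename(jar))
>>>>> ```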
>>>>>
>>>>> On March 19, 2021, at 10:40 AM, Yik San Chan <evan.chanyik...@gmail.com> wrote:
>>>>>
>>>>> Hi Dian,
>>>>>
>>>>> The PyFlink version is 1.12.0 and the Flink version in the cluster
>>>>> nodes is also 1.12.0
>>>>>
>>>>> $ which flink
>>>>> /data/apache/flink/flink-1.12.0/bin/flink
>>>>>
>>>>> Best,
>>>>> Yik San
>>>>>
>>>>> On Fri, Mar 19, 2021 at 10:26 AM Dian Fu <dian0511...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> What’s the Flink version on the cluster nodes? It should match the
>>>>>> PyFlink version.
>>>>>>
>>>>>> Regards,
>>>>>> Dian
>>>>>>
>>>>>> On March 18, 2021, at 5:01 PM, Yik San Chan <evan.chanyik...@gmail.com> wrote:
>>>>>>
>>>>>> This question is cross-posted on StackOverflow
>>>>>> https://stackoverflow.com/questions/66687797/pyflink-java-io-eofexception-at-java-io-datainputstream-readint
>>>>>>
>>>>>> I have a PyFlink job that reads from a Kafka source, transforms, and
>>>>>> writes to a Kafka sink. This is a `tree` view of my working directory.
>>>>>>
>>>>>> ```
>>>>>> > tree
>>>>>> .
>>>>>> ├── deps
>>>>>> │   └── flink-sql-connector-kafka_2.12-1.12.0.jar
>>>>>> ├── flink_run.sh
>>>>>> ├── main.py
>>>>>> ├── pyflink1.12.0.zip
>>>>>> └── tasks
>>>>>>     └── user_last_n_clicks
>>>>>>         ├── sink_ddl.sql
>>>>>>         ├── source_ddl.sql
>>>>>>         └── transform_dml.sql
>>>>>> ```
>>>>>>
>>>>>> This is the `flink_run.sh`:
>>>>>>
>>>>>> ```
>>>>>> flink run \
>>>>>> --yarnname test-pyflink \
>>>>>> -m yarn-cluster \
>>>>>> -yD yarn.application.queue=tech_platform \
>>>>>> -pyarch pyflink1.12.0.zip \
>>>>>> -pyexec /data/software/pyflink1.12.0/bin/python \
>>>>>> -py main.py testing user_last_n_clicks
>>>>>> ```
>>>>>>
>>>>>> This is the `main.py`. The key logic is:
>>>>>> - the `parse_content` udf
>>>>>> - loading the sql files from the tasks subfolder and executing them with execute_sql
>>>>>>
>>>>>> ```python
>>>>>> import os
>>>>>> from sys import argv
>>>>>> from pyflink.datastream import StreamExecutionEnvironment
>>>>>> from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes
>>>>>> from pyflink.table.udf import udf
>>>>>>
>>>>>> def read_file_content(filepath):
>>>>>>     with open(filepath) as f:
>>>>>>         return f.read()
>>>>>>
>>>>>> @udf(input_types=[DataTypes.STRING()],
>>>>>> result_type=DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))
>>>>>> def parse_content(content_str):
>>>>>>     import json
>>>>>>     res = {}
>>>>>>     content = json.loads(content_str)
>>>>>>     if 'postId' in content:
>>>>>>         res['item_id'] = content['postId']
>>>>>>     if 'lid' in content:
>>>>>>         res['item_id'] = content['lid']
>>>>>>     if 'param' in content and 'tag' in content['param']:
>>>>>>         res['tag'] = content['param']['tag']
>>>>>>     return res
>>>>>>
>>>>>> CWD = os.getcwd()
>>>>>> _, palfish_env, task = argv
>>>>>>
>>>>>> VALID_PALFISH_ENVS = ['development', 'testing', 'production']
>>>>>> if palfish_env not in VALID_PALFISH_ENVS:
>>>>>>     raise Exception(f"{palfish_env} is not a valid env, should be one of [{', '.join(VALID_PALFISH_ENVS)}].")
>>>>>>
>>>>>> VALID_TASKS = os.listdir(f"{CWD}/tasks")
>>>>>> if task not in VALID_TASKS:
>>>>>>     raise Exception(f"{task} is not a valid task, should be one of [{', '.join(VALID_TASKS)}].")
>>>>>>
>>>>>> config = {
>>>>>>     "development": {
>>>>>>         "${generation.kafka.source.servers}": "localhost:9094",
>>>>>>         "${generation.kafka.sink.servers}": "localhost:9094"
>>>>>>     },
>>>>>>     "testing": {
>>>>>>         "${generation.kafka.source.servers}": "10.111.135.233:9092,10.111.130.11:9092,10.111.130.12:9092",
>>>>>>         "${generation.kafka.sink.servers}": "10.111.135.233:9092,10.111.130.11:9092,10.111.130.12:9092"
>>>>>>     },
>>>>>>     "production": {
>>>>>>         "${generation.kafka.source.servers}": "10.111.203.9:9092,10.111.203.10:9092,10.111.203.13:9092,10.111.204.163:9092,10.111.204.164:9092,10.111.204.165:9092",
>>>>>>         "${generation.kafka.sink.servers}": "10.111.209.219:9092,10.111.209.220:9092,10.111.209.221:9092"
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>> FAT_JAR_PATH = f"{CWD}/deps/flink-sql-connector-kafka_2.12-1.12.0.jar"
>>>>>>
>>>>>> source_ddl = read_file_content(f'{CWD}/tasks/{task}/source_ddl.sql').replace('${generation.kafka.source.servers}', config[palfish_env]['${generation.kafka.source.servers}'])
>>>>>> sink_ddl = read_file_content(f'{CWD}/tasks/{task}/sink_ddl.sql').replace('${generation.kafka.sink.servers}', config[palfish_env]['${generation.kafka.sink.servers}'])
>>>>>> transform_dml = read_file_content(f'{CWD}/tasks/{task}/transform_dml.sql')
>>>>>>
>>>>>> exec_env = StreamExecutionEnvironment.get_execution_environment()
>>>>>> env_settings = EnvironmentSettings.Builder().use_blink_planner().build()
>>>>>> t_env = StreamTableEnvironment.create(stream_execution_environment=exec_env, environment_settings=env_settings)
>>>>>>
>>>>>> t_env.get_config().get_configuration().set_string("pipeline.jars",
>>>>>> f"file://{FAT_JAR_PATH}")
>>>>>> t_env.create_temporary_function("ParseContent", parse_content)
>>>>>>
>>>>>> t_env.execute_sql(source_ddl)
>>>>>> t_env.execute_sql(sink_ddl)
>>>>>> t_env.execute_sql(transform_dml).wait()
>>>>>> ```
>>>>>>
>>>>>> See my SQL files below. Note that the udf `ParseContent` is used in
>>>>>> `transform_dml.sql`.
>>>>>>
>>>>>> ```sql
>>>>>> # source_ddl.sql
>>>>>> CREATE TABLE kafka_source (
>>>>>> `body` ROW<`log` ROW<`uid` BIGINT, serverts BIGINT, `contentstr`
>>>>>> STRING>>
>>>>>> ) WITH (
>>>>>> 'connector' = 'kafka',
>>>>>> 'topic' = 'data-report-stat-old-logtype7',
>>>>>> 'properties.bootstrap.servers' = '${generation.kafka.source.servers}',
>>>>>> 'properties.group.id' = 'flink-featurepipelines',
>>>>>> 'format' = 'json'
>>>>>> )
>>>>>>
>>>>>> # transform_dml.sql
>>>>>> INSERT INTO kafka_sink
>>>>>> WITH t1 AS (
>>>>>> SELECT body['log']['uid'] user_id,
>>>>>> ParseContent(body['log']['contentstr']) content, body['log']['serverts']
>>>>>> server_ts
>>>>>> FROM kafka_source
>>>>>> ),
>>>>>> t2 AS (
>>>>>> SELECT user_id, content['item_id'] item_id, content['tag'] tag,
>>>>>> server_ts
>>>>>> FROM t1
>>>>>> WHERE content['item_id'] IS NOT NULL
>>>>>> AND content['tag'] = '点击帖子卡片'
>>>>>> ),
>>>>>> last_n AS (
>>>>>> SELECT user_id, item_id, server_ts
>>>>>> FROM (
>>>>>> SELECT *,
>>>>>> ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY server_ts DESC) as
>>>>>> row_num
>>>>>> FROM t2)
>>>>>> WHERE row_num <= 5
>>>>>> )
>>>>>> SELECT user_id, CAST(MAX(server_ts / 1000) AS TIMESTAMP) datetime,
>>>>>> LISTAGG(CAST(item_id AS STRING)) last_5_clicks
>>>>>> FROM last_n
>>>>>> GROUP BY user_id
>>>>>>
>>>>>> # sink_ddl.sql
>>>>>> CREATE TABLE kafka_sink (
>>>>>>     user_id BIGINT,
>>>>>>     datetime TIMESTAMP(3),
>>>>>>     last_5_clicks STRING,
>>>>>>     PRIMARY KEY (user_id) NOT ENFORCED
>>>>>> ) WITH (
>>>>>>     'connector' = 'upsert-kafka',
>>>>>>     'topic' = 'aiinfra.fct.userfeature.0',
>>>>>>     'properties.bootstrap.servers' =
>>>>>> '${generation.kafka.sink.servers}',
>>>>>>     'key.format' = 'json',
>>>>>>     'value.format' = 'json'
>>>>>> )
>>>>>> ```
>>>>>>
>>>>>> I got the error when running the PyFlink program on a machine in my
>>>>>> testing environment.
>>>>>>
>>>>>> ```
>>>>>> Caused by: java.io.EOFException
>>>>>>     at java.io.DataInputStream.readInt(DataInputStream.java:392)
>>>>>> ~[?:1.8.0_261]
>>>>>>     at
>>>>>> org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserializeInternal(StringDataSerializer.java:91)
>>>>>> ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
>>>>>>     at
>>>>>> org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:87)
>>>>>> ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
>>>>>>     at
>>>>>> org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:36)
>>>>>> ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
>>>>>>     at
>>>>>> org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserializeInternal(MapDataSerializer.java:124)
>>>>>> ~[flink-python_2.11-1.12.0.jar:1.12.0]
>>>>>>     at
>>>>>> org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:107)
>>>>>> ~[flink-python_2.11-1.12.0.jar:1.12.0]
>>>>>>     at
>>>>>> org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:46)
>>>>>> ~[flink-python_2.11-1.12.0.jar:1.12.0]
>>>>>>     at
>>>>>> org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:104)
>>>>>> ~[flink-python_2.11-1.12.0.jar:1.12.0]
>>>>>>     at
>>>>>> org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:49)
>>>>>> ~[flink-python_2.11-1.12.0.jar:1.12.0]
>>>>>>     at
>>>>>> org.apache.flink.table.runtime.operators.python.scalar.RowDataPythonScalarFunctionOperator.emitResult(RowDataPythonScalarFunctionOperator.java:84)
>>>>>> ~[flink-python_2.11-1.12.0.jar:1.12.0]
>>>>>> ```
>>>>>>
>>>>>> Here are the full logs, see
>>>>>> https://gist.github.com/YikSanChan/d3a5d25cdf2f3c1dc6b3dc93e48c4bbc.
>>>>>>
>>>>>> Any idea why the exception? Thanks.
>>>>>>
>>>>>> Yik San
>>>>>>
>>>>>>
>>>>>>
>>>>>
