Hi Dian,

The PyFlink version is 1.12.0 and the Flink version in the cluster nodes is
also 1.12.0

$ which flink
/data/apache/flink/flink-1.12.0/bin/flink
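
And on the PyFlink side (a quick double-check, assuming the package was
installed from PyPI as apache-flink):

$ python -c "import pkg_resources; print(pkg_resources.get_distribution('apache-flink').version)"
1.12.0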

Best,
Yik San

On Fri, Mar 19, 2021 at 10:26 AM Dian Fu <dian0511...@gmail.com> wrote:

> Hi,
>
> What’s the Flink version in the cluster nodes? It should match the
> PyFlink version.
>
> Regards,
> Dian
>
> On Mar 18, 2021, at 5:01 PM, Yik San Chan <evan.chanyik...@gmail.com> wrote:
>
> This question is cross-posted on StackOverflow
> https://stackoverflow.com/questions/66687797/pyflink-java-io-eofexception-at-java-io-datainputstream-readint
>
> I have a PyFlink job that reads from a Kafka source, transforms the data, and
> writes to a Kafka sink. This is a `tree` view of my working directory.
>
> ```
> > tree
> .
> ├── deps
> │   └── flink-sql-connector-kafka_2.12-1.12.0.jar
> ├── flink_run.sh
> ├── main.py
> ├── pyflink1.12.0.zip
> └── tasks
>     └── user_last_n_clicks
>         ├── sink_ddl.sql
>         ├── source_ddl.sql
>         └── transform_dml.sql
> ```
>
> This is the `flink_run.sh`:
>
> ```
> flink run \
> --yarnname test-pyflink \
> -m yarn-cluster \
> -yD yarn.application.queue=tech_platform \
> -pyarch pyflink1.12.0.zip \
> -pyexec /data/software/pyflink1.12.0/bin/python \
> -py main.py testing user_last_n_clicks
> ```
>
> This is the `main.py`. The key logic is in:
> - the `parse_content` UDF.
> - loading the SQL files from the tasks subfolder and running them with `execute_sql`.
>
> ```python
> import os
> from sys import argv
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes
> from pyflink.table.udf import udf
>
> def read_file_content(filepath):
>     with open(filepath) as f:
>         return f.read()
>
> # UDF that extracts item_id and tag from the JSON content string.
> @udf(input_types=[DataTypes.STRING()],
>      result_type=DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))
> def parse_content(content_str):
>     import json
>     res = {}
>     content = json.loads(content_str)
>     if 'postId' in content:
>         res['item_id'] = content['postId']
>     if 'lid' in content:
>         res['item_id'] = content['lid']
>     if 'param' in content and 'tag' in content['param']:
>         res['tag'] = content['param']['tag']
>     return res
>
> CWD = os.getcwd()
> _, palfish_env, task = argv
>
> VALID_PALFISH_ENVS = ['development', 'testing', 'production']
> if palfish_env not in VALID_PALFISH_ENVS:
>     raise Exception(f"{palfish_env} is not a valid env, should be one of [{', '.join(VALID_PALFISH_ENVS)}].")
>
> VALID_TASKS = os.listdir(f"{CWD}/tasks")
> if task not in VALID_TASKS:
>     raise Exception(f"{task} is not a valid task, should be one of [{', '.join(VALID_TASKS)}].")
>
> # Kafka bootstrap servers per environment, keyed by the placeholders used in the SQL files.
> config = {
>     "development": {
>         "${generation.kafka.source.servers}": "localhost:9094",
>         "${generation.kafka.sink.servers}": "localhost:9094"
>     },
>     "testing": {
>         "${generation.kafka.source.servers}": "10.111.135.233:9092,10.111.130.11:9092,10.111.130.12:9092",
>         "${generation.kafka.sink.servers}": "10.111.135.233:9092,10.111.130.11:9092,10.111.130.12:9092"
>     },
>     "production": {
>         "${generation.kafka.source.servers}": "10.111.203.9:9092,10.111.203.10:9092,10.111.203.13:9092,10.111.204.163:9092,10.111.204.164:9092,10.111.204.165:9092",
>         "${generation.kafka.sink.servers}": "10.111.209.219:9092,10.111.209.220:9092,10.111.209.221:9092"
>     }
> }
>
> FAT_JAR_PATH = f"{CWD}/deps/flink-sql-connector-kafka_2.12-1.12.0.jar"
>
> # Load the task's SQL files and substitute the bootstrap-servers placeholders.
> source_ddl = read_file_content(f'{CWD}/tasks/{task}/source_ddl.sql').replace(
>     '${generation.kafka.source.servers}',
>     config[palfish_env]['${generation.kafka.source.servers}'])
> sink_ddl = read_file_content(f'{CWD}/tasks/{task}/sink_ddl.sql').replace(
>     '${generation.kafka.sink.servers}',
>     config[palfish_env]['${generation.kafka.sink.servers}'])
> transform_dml = read_file_content(f'{CWD}/tasks/{task}/transform_dml.sql')
>
> exec_env = StreamExecutionEnvironment.get_execution_environment()
> env_settings = EnvironmentSettings.Builder().use_blink_planner().build()
> t_env = StreamTableEnvironment.create(
>     stream_execution_environment=exec_env,
>     environment_settings=env_settings)
>
> t_env.get_config().get_configuration().set_string("pipeline.jars", f"file://{FAT_JAR_PATH}")
> t_env.create_temporary_function("ParseContent", parse_content)
>
> t_env.execute_sql(source_ddl)
> t_env.execute_sql(sink_ddl)
> t_env.execute_sql(transform_dml).wait()
> ```
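>
> For what it's worth, the parse_content logic can also be exercised as plain Python,
> without Flink involved. Below is just a local sanity check with a made-up payload
> (note the declared result_type above is MAP(STRING(), STRING())):
>
> ```python
> # Local sanity check of the parse_content logic (plain Python, hypothetical payload).
> import json
>
> def parse_content_plain(content_str):
>     res = {}
>     content = json.loads(content_str)
>     if 'postId' in content:
>         res['item_id'] = content['postId']
>     if 'lid' in content:
>         res['item_id'] = content['lid']
>     if 'param' in content and 'tag' in content['param']:
>         res['tag'] = content['param']['tag']
>     return res
>
> print(parse_content_plain('{"postId": 123, "param": {"tag": "点击帖子卡片"}}'))
> # -> {'item_id': 123, 'tag': '点击帖子卡片'}
> ```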
>
> Here are my SQL files. Note the UDF `ParseContent` is used in `transform_dml.sql`.
>
> ```sql
> -- source_ddl.sql
> CREATE TABLE kafka_source (
> `body` ROW<`log` ROW<`uid` BIGINT, serverts BIGINT, `contentstr` STRING>>
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'data-report-stat-old-logtype7',
> 'properties.bootstrap.servers' = '${generation.kafka.source.servers}',
> 'properties.group.id' = 'flink-featurepipelines',
> 'format' = 'json'
> )
>
> -- transform_dml.sql
> INSERT INTO kafka_sink
> WITH t1 AS (
> SELECT body['log']['uid'] user_id, ParseContent(body['log']['contentstr'])
> content, body['log']['serverts'] server_ts
> FROM kafka_source
> ),
> t2 AS (
> SELECT user_id, content['item_id'] item_id, content['tag'] tag, server_ts
> FROM t1
> WHERE content['item_id'] IS NOT NULL
> AND content['tag'] = '点击帖子卡片'
> ),
> last_n AS (
> SELECT user_id, item_id, server_ts
> FROM (
> SELECT *,
> ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY server_ts DESC) as row_num
> FROM t2)
> WHERE row_num <= 5
> )
> SELECT user_id, CAST(MAX(server_ts / 1000) AS TIMESTAMP) datetime,
> LISTAGG(CAST(item_id AS STRING)) last_5_clicks
> FROM last_n
> GROUP BY user_id
>
> -- sink_ddl.sql
> CREATE TABLE kafka_sink (
>     user_id BIGINT,
>     datetime TIMESTAMP(3),
>     last_5_clicks STRING,
>     PRIMARY KEY (user_id) NOT ENFORCED
> ) WITH (
>     'connector' = 'upsert-kafka',
>     'topic' = 'aiinfra.fct.userfeature.0',
>     'properties.bootstrap.servers' = '${generation.kafka.sink.servers}',
>     'key.format' = 'json',
>     'value.format' = 'json'
> )
> ```
>
> I got the following error when running the PyFlink program on my testing
> environment machine.
>
> ```
> Caused by: java.io.EOFException
>     at java.io.DataInputStream.readInt(DataInputStream.java:392) ~[?:1.8.0_261]
>     at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserializeInternal(StringDataSerializer.java:91) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:87) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:36) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserializeInternal(MapDataSerializer.java:124) ~[flink-python_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:107) ~[flink-python_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:46) ~[flink-python_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:104) ~[flink-python_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:49) ~[flink-python_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.table.runtime.operators.python.scalar.RowDataPythonScalarFunctionOperator.emitResult(RowDataPythonScalarFunctionOperator.java:84) ~[flink-python_2.11-1.12.0.jar:1.12.0]
> ```
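>
> The stack trace suggests the failure happens while deserializing the MAP result of the
> ParseContent UDF back on the Java side. As a way to narrow it down, here is a minimal
> debugging sketch (assuming the rest of main.py stays unchanged, and replacing the three
> execute_sql calls at the end) that reads from kafka_source, applies ParseContent, and
> prints the result, skipping the sink entirely:
>
> ```python
> # Debugging sketch: only the source DDL plus a projection through ParseContent,
> # printed to stdout, to see whether deserializing the MAP result already fails here.
> t_env.execute_sql(source_ddl)
> t_env.execute_sql(
>     "SELECT ParseContent(body['log']['contentstr']) AS content FROM kafka_source"
> ).print()
> ```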
>
> Here are the full logs, see
> https://gist.github.com/YikSanChan/d3a5d25cdf2f3c1dc6b3dc93e48c4bbc.
>
> Any idea what might be causing this exception? Thanks.
>
> Yik San
>
>
>
