Hi Dian,

The PyFlink version is 1.12.0, and the Flink version on the cluster nodes is also 1.12.0.
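To double-check the PyFlink side, I can print the version from the interpreter that `-pyexec` points at. A minimal sketch, assuming the interpreter path from `flink_run.sh` and that the installed wheel exposes its version string via `pyflink/version.py`:

```python
# Run with the -pyexec interpreter from flink_run.sh (the path is an assumption):
#   /data/software/pyflink1.12.0/bin/python check_pyflink_version.py
from pyflink import version  # version string shipped with the apache-flink wheel

print(version.__version__)  # expected: 1.12.0, matching the Flink distribution on the cluster
```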
The Flink distribution on the node is:

$ which flink
/data/apache/flink/flink-1.12.0/bin/flink

Best,
Yik San

On Fri, Mar 19, 2021 at 10:26 AM Dian Fu <dian0511...@gmail.com> wrote:

> Hi,
>
> What’s the Flink version on the cluster nodes? It should match the PyFlink version.
>
> Regards,
> Dian
>
> On Mar 18, 2021, at 5:01 PM, Yik San Chan <evan.chanyik...@gmail.com> wrote:
>
> This question is cross-posted on StackOverflow:
> https://stackoverflow.com/questions/66687797/pyflink-java-io-eofexception-at-java-io-datainputstream-readint
>
> I have a PyFlink job that reads from a Kafka source, transforms the data, and writes to a Kafka sink. This is a `tree` view of my working directory:
>
> ```
> > tree
> .
> ├── deps
> │   └── flink-sql-connector-kafka_2.12-1.12.0.jar
> ├── flink_run.sh
> ├── main.py
> ├── pyflink1.12.0.zip
> └── tasks
>     └── user_last_n_clicks
>         ├── sink_ddl.sql
>         ├── source_ddl.sql
>         └── transform_dml.sql
> ```
>
> This is `flink_run.sh`:
>
> ```
> flink run \
>   --yarnname test-pyflink \
>   -m yarn-cluster \
>   -yD yarn.application.queue=tech_platform \
>   -pyarch pyflink1.12.0.zip \
>   -pyexec /data/software/pyflink1.12.0/bin/python \
>   -py main.py testing user_last_n_clicks
> ```
>
> This is `main.py`. The key logic is in:
>
> - the `parse_content` udf,
> - loading the sql files from the tasks subfolder and executing them via `execute_sql`.
>
> ```python
> import os
> from sys import argv
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes
> from pyflink.table.udf import udf
>
>
> def read_file_content(filepath):
>     with open(filepath) as f:
>         return f.read()
>
>
> @udf(input_types=[DataTypes.STRING()],
>      result_type=DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))
> def parse_content(content_str):
>     import json
>     res = {}
>     content = json.loads(content_str)
>     if 'postId' in content:
>         res['item_id'] = content['postId']
>     if 'lid' in content:
>         res['item_id'] = content['lid']
>     if 'param' in content and 'tag' in content['param']:
>         res['tag'] = content['param']['tag']
>     return res
>
>
> CWD = os.getcwd()
> _, palfish_env, task = argv
>
> VALID_PALFISH_ENVS = ['development', 'testing', 'production']
> if palfish_env not in VALID_PALFISH_ENVS:
>     raise Exception(f"{palfish_env} is not a valid env, should be one of [{', '.join(VALID_PALFISH_ENVS)}].")
>
> VALID_TASKS = os.listdir(f"{CWD}/tasks")
> if task not in VALID_TASKS:
>     raise Exception(f"{task} is not a valid task, should be one of [{', '.join(VALID_TASKS)}].")
>
> config = {
>     "development": {
>         "${generation.kafka.source.servers}": "localhost:9094",
>         "${generation.kafka.sink.servers}": "localhost:9094"
>     },
>     "testing": {
>         "${generation.kafka.source.servers}": "10.111.135.233:9092,10.111.130.11:9092,10.111.130.12:9092",
>         "${generation.kafka.sink.servers}": "10.111.135.233:9092,10.111.130.11:9092,10.111.130.12:9092"
>     },
>     "production": {
>         "${generation.kafka.source.servers}": "10.111.203.9:9092,10.111.203.10:9092,10.111.203.13:9092,10.111.204.163:9092,10.111.204.164:9092,10.111.204.165:9092",
>         "${generation.kafka.sink.servers}": "10.111.209.219:9092,10.111.209.220:9092,10.111.209.221:9092"
>     }
> }
>
> FAT_JAR_PATH = f"{CWD}/deps/flink-sql-connector-kafka_2.12-1.12.0.jar"
>
> source_ddl = read_file_content(f'{CWD}/tasks/{task}/source_ddl.sql').replace('${generation.kafka.source.servers}', config[palfish_env]['${generation.kafka.source.servers}'])
> sink_ddl = read_file_content(f'{CWD}/tasks/{task}/sink_ddl.sql').replace('${generation.kafka.sink.servers}', config[palfish_env]['${generation.kafka.sink.servers}'])
> transform_dml = read_file_content(f'{CWD}/tasks/{task}/transform_dml.sql')
>
> exec_env = StreamExecutionEnvironment.get_execution_environment()
> env_settings = EnvironmentSettings.Builder().use_blink_planner().build()
> t_env = StreamTableEnvironment.create(stream_execution_environment=exec_env, environment_settings=env_settings)
>
> t_env.get_config().get_configuration().set_string("pipeline.jars", f"file://{FAT_JAR_PATH}")
> t_env.create_temporary_function("ParseContent", parse_content)
>
> t_env.execute_sql(source_ddl)
> t_env.execute_sql(sink_ddl)
> t_env.execute_sql(transform_dml).wait()
> ```
>
> These are my SQL files. Note that the udf `ParseContent` is used in `transform_dml.sql`.
>
> ```sql
> -- source_ddl.sql
> CREATE TABLE kafka_source (
>     `body` ROW<`log` ROW<`uid` BIGINT, serverts BIGINT, `contentstr` STRING>>
> ) WITH (
>     'connector' = 'kafka',
>     'topic' = 'data-report-stat-old-logtype7',
>     'properties.bootstrap.servers' = '${generation.kafka.source.servers}',
>     'properties.group.id' = 'flink-featurepipelines',
>     'format' = 'json'
> )
>
> -- transform_dml.sql
> INSERT INTO kafka_sink
> WITH t1 AS (
>     SELECT body['log']['uid'] user_id, ParseContent(body['log']['contentstr']) content, body['log']['serverts'] server_ts
>     FROM kafka_source
> ),
> t2 AS (
>     SELECT user_id, content['item_id'] item_id, content['tag'] tag, server_ts
>     FROM t1
>     WHERE content['item_id'] IS NOT NULL
>     AND content['tag'] = '点击帖子卡片'
> ),
> last_n AS (
>     SELECT user_id, item_id, server_ts
>     FROM (
>         SELECT *,
>             ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY server_ts DESC) AS row_num
>         FROM t2)
>     WHERE row_num <= 5
> )
> SELECT user_id, CAST(MAX(server_ts / 1000) AS TIMESTAMP) datetime, LISTAGG(CAST(item_id AS STRING)) last_5_clicks
> FROM last_n
> GROUP BY user_id
>
> -- sink_ddl.sql
> CREATE TABLE kafka_sink (
>     user_id BIGINT,
>     datetime TIMESTAMP(3),
>     last_5_clicks STRING,
>     PRIMARY KEY (user_id) NOT ENFORCED
> ) WITH (
>     'connector' = 'upsert-kafka',
>     'topic' = 'aiinfra.fct.userfeature.0',
>     'properties.bootstrap.servers' = '${generation.kafka.sink.servers}',
>     'key.format' = 'json',
>     'value.format' = 'json'
> )
> ```
>
> I got the error below when running the PyFlink program on my testing environment machine.
>
> ```
> Caused by: java.io.EOFException
>     at java.io.DataInputStream.readInt(DataInputStream.java:392) ~[?:1.8.0_261]
>     at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserializeInternal(StringDataSerializer.java:91) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:87) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:36) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserializeInternal(MapDataSerializer.java:124) ~[flink-python_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:107) ~[flink-python_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:46) ~[flink-python_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:104) ~[flink-python_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:49) ~[flink-python_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.table.runtime.operators.python.scalar.RowDataPythonScalarFunctionOperator.emitResult(RowDataPythonScalarFunctionOperator.java:84) ~[flink-python_2.11-1.12.0.jar:1.12.0]
> ```
>
> For the full logs, see https://gist.github.com/YikSanChan/d3a5d25cdf2f3c1dc6b3dc93e48c4bbc.
>
> Any idea why the exception? Thanks.
>
> Yik San
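One thing I still want to rule out on my side: the UDF's declared result type is `MAP(DataTypes.STRING(), DataTypes.STRING())`, so every key and value it returns should be a Python string. A minimal sketch for checking the plain function outside Flink; the sample payload below is made up and not from the real topic:

```python
# Standalone sketch of the parse_content logic (same body as in main.py, without the
# @udf decorator), run against a made-up payload to see what value types come back.
import json

def parse_content(content_str):
    res = {}
    content = json.loads(content_str)
    if 'postId' in content:
        res['item_id'] = content['postId']
    if 'lid' in content:
        res['item_id'] = content['lid']
    if 'param' in content and 'tag' in content['param']:
        res['tag'] = content['param']['tag']
    return res

# Hypothetical sample: postId is numeric here, so item_id comes back as an int,
# while the UDF's declared result type is MAP(STRING(), STRING()).
sample = json.dumps({"postId": 12345, "param": {"tag": "点击帖子卡片"}})
print(parse_content(sample))  # {'item_id': 12345, 'tag': '点击帖子卡片'}
```

With this hypothetical payload, `item_id` comes back as an int rather than a string, which is the kind of mismatch I want to rule out before digging further into the serializer stack trace.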