Hi Dian,

It is a good catch. However, after changing to flink-sql-connector-kafka_2.11-1.12.0.jar, I still get exactly the same error.
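For reference, here is a minimal sketch of how the Scala build on both sides can be compared. The cluster path comes from the `which flink` output quoted below; the assumption that the PyFlink package keeps its bundled jars under `pyflink/lib` is mine, not something stated in this thread.

```python
import glob
import os

import pyflink

# Jars bundled with the locally installed PyFlink package
# (assumed to live under pyflink/lib inside the package).
pyflink_lib = os.path.join(os.path.dirname(pyflink.__file__), "lib")

# Jars shipped with the cluster's Flink distribution;
# path taken from the `which flink` output quoted below.
cluster_lib = "/data/apache/flink/flink-1.12.0/lib"

print(glob.glob(os.path.join(pyflink_lib, "flink-dist_*.jar")))
print(glob.glob(os.path.join(cluster_lib, "flink-dist_*.jar")))

# The Scala suffix in the jar names (e.g. flink-dist_2.11-1.12.0.jar vs
# flink-dist_2.12-1.12.0.jar) shows which Scala version each side is built
# with; they should agree, and the connector jar should use the same suffix.
```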
Best,
Yik San

On Fri, Mar 19, 2021 at 11:02 AM Dian Fu <dian0511...@gmail.com> wrote:

I noticed that you use "flink-sql-connector-kafka_2.12-1.12.0.jar". Are the jar files on the cluster nodes also built with Scala 2.12? The PyFlink package bundles jar files built with Scala 2.11 by default. I'm still not sure whether this is related to the issue, but I think it is problematic. Could you make sure that they are consistent?

On Mar 19, 2021, at 10:40 AM, Yik San Chan <evan.chanyik...@gmail.com> wrote:

Hi Dian,

The PyFlink version is 1.12.0, and the Flink version on the cluster nodes is also 1.12.0.

```
$ which flink
/data/apache/flink/flink-1.12.0/bin/flink
```

Best,
Yik San

On Fri, Mar 19, 2021 at 10:26 AM Dian Fu <dian0511...@gmail.com> wrote:

Hi,

What's the Flink version on the cluster nodes? It should match the PyFlink version.

Regards,
Dian

On Mar 18, 2021, at 5:01 PM, Yik San Chan <evan.chanyik...@gmail.com> wrote:

This question is cross-posted on StackOverflow:
https://stackoverflow.com/questions/66687797/pyflink-java-io-eofexception-at-java-io-datainputstream-readint

I have a PyFlink job that reads from a Kafka source, transforms the data, and writes to a Kafka sink. This is a `tree` view of my working directory:

```
> tree
.
├── deps
│   └── flink-sql-connector-kafka_2.12-1.12.0.jar
├── flink_run.sh
├── main.py
├── pyflink1.12.0.zip
└── tasks
    └── user_last_n_clicks
        ├── sink_ddl.sql
        ├── source_ddl.sql
        └── transform_dml.sql
```

This is `flink_run.sh`:

```
flink run \
--yarnname test-pyflink \
-m yarn-cluster \
-yD yarn.application.queue=tech_platform \
-pyarch pyflink1.12.0.zip \
-pyexec /data/software/pyflink1.12.0/bin/python \
-py main.py testing user_last_n_clicks
```

This is `main.py`. The key logic is in:
- the `parse_content` UDF (see the small sketch after the code below)
- loading the SQL files from the `tasks` subfolder and running them with `execute_sql`

```python
import os
from sys import argv
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes
from pyflink.table.udf import udf

def read_file_content(filepath):
    with open(filepath) as f:
        return f.read()

@udf(input_types=[DataTypes.STRING()],
     result_type=DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))
def parse_content(content_str):
    import json
    res = {}
    content = json.loads(content_str)
    if 'postId' in content:
        res['item_id'] = content['postId']
    if 'lid' in content:
        res['item_id'] = content['lid']
    if 'param' in content and 'tag' in content['param']:
        res['tag'] = content['param']['tag']
    return res

CWD = os.getcwd()
_, palfish_env, task = argv

VALID_PALFISH_ENVS = ['development', 'testing', 'production']
if palfish_env not in VALID_PALFISH_ENVS:
    raise Exception(f"{palfish_env} is not a valid env, should be one of [{', '.join(VALID_PALFISH_ENVS)}].")

VALID_TASKS = os.listdir(f"{CWD}/tasks")
if task not in VALID_TASKS:
    raise Exception(f"{task} is not a valid task, should be one of [{', '.join(VALID_TASKS)}].")

config = {
    "development": {
        "${generation.kafka.source.servers}": "localhost:9094",
        "${generation.kafka.sink.servers}": "localhost:9094"
    },
    "testing": {
        "${generation.kafka.source.servers}": "10.111.135.233:9092,10.111.130.11:9092,10.111.130.12:9092",
        "${generation.kafka.sink.servers}": "10.111.135.233:9092,10.111.130.11:9092,10.111.130.12:9092"
    },
    "production": {
        "${generation.kafka.source.servers}": "10.111.203.9:9092,10.111.203.10:9092,10.111.203.13:9092,10.111.204.163:9092,10.111.204.164:9092,10.111.204.165:9092",
        "${generation.kafka.sink.servers}": "10.111.209.219:9092,10.111.209.220:9092,10.111.209.221:9092"
    }
}

FAT_JAR_PATH = f"{CWD}/deps/flink-sql-connector-kafka_2.12-1.12.0.jar"

source_ddl = read_file_content(f'{CWD}/tasks/{task}/source_ddl.sql').replace('${generation.kafka.source.servers}', config[palfish_env]['${generation.kafka.source.servers}'])
sink_ddl = read_file_content(f'{CWD}/tasks/{task}/sink_ddl.sql').replace('${generation.kafka.sink.servers}', config[palfish_env]['${generation.kafka.sink.servers}'])
transform_dml = read_file_content(f'{CWD}/tasks/{task}/transform_dml.sql')

exec_env = StreamExecutionEnvironment.get_execution_environment()
env_settings = EnvironmentSettings.Builder().use_blink_planner().build()
t_env = StreamTableEnvironment.create(stream_execution_environment=exec_env, environment_settings=env_settings)

t_env.get_config().get_configuration().set_string("pipeline.jars", f"file://{FAT_JAR_PATH}")
t_env.create_temporary_function("ParseContent", parse_content)

t_env.execute_sql(source_ddl)
t_env.execute_sql(sink_ddl)
t_env.execute_sql(transform_dml).wait()
```
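To make the UDF concrete, here is a small standalone sketch of the same parsing logic applied to a hypothetical `contentstr` payload. The keys mirror the ones `parse_content` handles above; the sample values are made up for illustration.

```python
import json

# Hypothetical example of what body['log']['contentstr'] might contain.
sample = json.dumps({"postId": 12345, "param": {"tag": "点击帖子卡片"}})

# Same logic as parse_content above, run outside of Flink.
content = json.loads(sample)
res = {}
if 'postId' in content:
    res['item_id'] = content['postId']
if 'lid' in content:
    res['item_id'] = content['lid']
if 'param' in content and 'tag' in content['param']:
    res['tag'] = content['param']['tag']

print(res)  # {'item_id': 12345, 'tag': '点击帖子卡片'}
```

Note that the UDF is declared with `result_type=DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())`, so every value placed into the returned map is expected to be a string.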
See my SQL files below. Note that the UDF `ParseContent` is used in `transform_dml.sql`.

```sql
-- source_ddl.sql
CREATE TABLE kafka_source (
    `body` ROW<`log` ROW<`uid` BIGINT, serverts BIGINT, `contentstr` STRING>>
) WITH (
    'connector' = 'kafka',
    'topic' = 'data-report-stat-old-logtype7',
    'properties.bootstrap.servers' = '${generation.kafka.source.servers}',
    'properties.group.id' = 'flink-featurepipelines',
    'format' = 'json'
)

-- transform_dml.sql
INSERT INTO kafka_sink
WITH t1 AS (
    SELECT body['log']['uid'] user_id,
           ParseContent(body['log']['contentstr']) content,
           body['log']['serverts'] server_ts
    FROM kafka_source
),
t2 AS (
    SELECT user_id, content['item_id'] item_id, content['tag'] tag, server_ts
    FROM t1
    WHERE content['item_id'] IS NOT NULL
    AND content['tag'] = '点击帖子卡片'
),
last_n AS (
    SELECT user_id, item_id, server_ts
    FROM (
        SELECT *,
               ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY server_ts DESC) as row_num
        FROM t2)
    WHERE row_num <= 5
)
SELECT user_id,
       CAST(MAX(server_ts / 1000) AS TIMESTAMP) datetime,
       LISTAGG(CAST(item_id AS STRING)) last_5_clicks
FROM last_n
GROUP BY user_id

-- sink_ddl.sql
CREATE TABLE kafka_sink (
    user_id BIGINT,
    datetime TIMESTAMP(3),
    last_5_clicks STRING,
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'aiinfra.fct.userfeature.0',
    'properties.bootstrap.servers' = '${generation.kafka.sink.servers}',
    'key.format' = 'json',
    'value.format' = 'json'
)
```

I got the error below when running the PyFlink program on my testing environment machine.

```
Caused by: java.io.EOFException
    at java.io.DataInputStream.readInt(DataInputStream.java:392) ~[?:1.8.0_261]
    at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserializeInternal(StringDataSerializer.java:91) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:87) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:36) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserializeInternal(MapDataSerializer.java:124) ~[flink-python_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:107) ~[flink-python_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:46) ~[flink-python_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:104) ~[flink-python_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:49) ~[flink-python_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.table.runtime.operators.python.scalar.RowDataPythonScalarFunctionOperator.emitResult(RowDataPythonScalarFunctionOperator.java:84) ~[flink-python_2.11-1.12.0.jar:1.12.0]
```

For the full logs, see https://gist.github.com/YikSanChan/d3a5d25cdf2f3c1dc6b3dc93e48c4bbc.

Any idea why this exception occurs? Thanks.

Yik San