I've tried this, and there appears to be a bigger problem with PyFlink and calling set_parallelism():
    from pyflink.table.expressions import col

    events_table = table_env.from_path(MY_SOURCE_TABLE)

    filtered_table = events_table.filter(
        col("event_type") == "event_of_interest"
    )

    filtered_stream = table_env.to_data_stream(filtered_table)

    # fetch desired parallelism from config, don't hardcode
    filtered_stream.set_parallelism(int(table_env.get_config().get('my.custom.parallelism', 1)))

    table_env.create_temporary_view(MY_FILTERED_VIEW, filtered_stream)

    # now execute SQL on MY_FILTERED_VIEW
    table_env.execute_sql(...)

I now get an error:

    An error occurred when calling o68.setParallelism(). Trace:
    org.apache.flink.api.python.shaded.py4j.Py4JException: Method setParallelism([class java.lang.Integer]) does not exist

It looks like Python is converting the value to a java.lang.Integer object rather than an int primitive. I see the same error if I just call set_parallelism(1) without the call to get_config(). Is this a bug, or is there a workaround?

________________________________
From: John Tipper
Sent: 15 July 2022 16:44
To: user@flink.apache.org <user@flink.apache.org>
Subject: PyFlink and parallelism

Hi all,

I have a processing topology using PyFlink and SQL where there is data skew: I'm splitting a stream of heterogeneous data into separate streams based on the type of data in it, and some of these substreams have far more events than others. This is causing problems with checkpointing (checkpoints are timing out).

I'd like to increase the parallelism of just these problematic streams, but I'm not sure how to do that and target only those elements. Do I need to use the DataStream API here? If so, what would that look like?

I have a table defined, and I duplicate a stream from that table and then filter it so that my substream contains only the events I'm interested in:

    from pyflink.table.expressions import col

    events_table = table_env.from_path(MY_SOURCE_TABLE)

    filtered_table = events_table.filter(
        col("event_type") == "event_of_interest"
    )

    table_env.create_temporary_view(MY_FILTERED_VIEW, filtered_table)

    # now execute SQL on MY_FILTERED_VIEW
    table_env.execute_sql(...)
The default parallelism of the overall table environment is 1. Is there a way to increase the parallelism for just this stream?

Many thanks,

John
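A possible workaround, sketched under assumptions this thread does not confirm. In the Java API, setParallelism() is defined on SingleOutputStreamOperator rather than on the base DataStream class, and to_data_stream() appears to return a plain DataStream. If so, the Py4J "does not exist" error would mean the method is missing on that object, not that Integer fails to match int. Inserting an identity map() creates a real operator whose parallelism can be set. The sketch below is untested and assumes PyFlink 1.15-era APIs. The names parse_parallelism, register_filtered_view and row_type, and the example column names, are hypothetical.

```python
# Untested sketch; assumes PyFlink 1.15-era APIs and a StreamTableEnvironment.
# parse_parallelism, register_filtered_view and row_type are hypothetical names.


def parse_parallelism(raw, default=1):
    """Convert a config value (str, int or None) to a positive parallelism."""
    if raw is None or str(raw).strip() == "":
        return default
    value = int(raw)
    if value < 1:
        raise ValueError(f"parallelism must be >= 1, got {value}")
    return value


def register_filtered_view(table_env, source_table, view_name, row_type, parallelism):
    """Filter source_table, pass it through an identity map, expose it as a view.

    row_type must describe the filtered table's columns, e.g. (hypothetical):
        Types.ROW_NAMED(["event_type", "payload"], [Types.STRING(), Types.STRING()])
    """
    # Imported here so parse_parallelism() can be used without PyFlink or a JVM.
    from pyflink.table.expressions import col

    filtered_table = table_env.from_path(source_table).filter(
        col("event_type") == "event_of_interest"
    )
    filtered_stream = table_env.to_data_stream(filtered_table)

    # The identity map is a real operator, so set_parallelism() has a target.
    # Passing output_type keeps named Row fields instead of pickled bytes.
    mapped = filtered_stream.map(lambda row: row, output_type=row_type)
    mapped.set_parallelism(parallelism)

    table_env.create_temporary_view(view_name, mapped)
```

parse_parallelism() could wrap the value the original code reads from 'my.custom.parallelism'. Note that only the map runs at the higher parallelism: the source and filter before it, and the SQL operators planned on MY_FILTERED_VIEW after it, still use the table environment's default. The map also adds a Python worker hop and a network shuffle. If raising parallelism for every SQL operator in the job is acceptable, the table.exec.resource.default-parallelism option is the job-wide alternative rather than a per-stream one.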