I've tried this, and there appears to be a bigger problem with PyFlink and a
call to set_parallelism():


events_table = table_env.from_path(MY_SOURCE_TABLE)

filtered_table = events_table.filter(
    col("event_type") == "event_of_interest"
)

filtered_stream = table_env.to_data_stream(filtered_table)

# fetch desired parallelism from config, don't hardcode
filtered_stream.set_parallelism(
    int(table_env.get_config().get('my.custom.parallelism', 1))
)

table_env.create_temporary_view(MY_FILTERED_VIEW, filtered_stream)

# now execute SQL on MY_FILTERED_VIEW
table_env.execute_sql(...)


I now get an error:

An error occurred when calling o68.setParallelism(). Trace:
org.apache.flink.api.python.shaded.py4j.Py4JException: Method 
setParallelism([class java.lang.Integer]) does not exist.


It looks like Python is converting the argument to a Java Integer object rather
than the int primitive. I see the same error if I just call set_parallelism(1)
without the call to get_config(). Is this a bug, or is there a workaround?
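
For what it's worth, the config lookup itself can be checked without Flink. A
minimal sketch in plain Python, where read_parallelism and the dict standing in
for the table config are hypothetical (only the key my.custom.parallelism comes
from the snippet above):

```python
def read_parallelism(config, key="my.custom.parallelism", default=1):
    """Return the configured parallelism as a plain Python int (hypothetical helper)."""
    raw = config.get(key, default)
    value = int(raw)  # accepts 4 or "4"; raises ValueError on non-numeric input
    if value < 1:
        raise ValueError(f"{key} must be >= 1, got {value}")
    return value


# A plain dict stands in for table_env.get_config() here (assumption).
parallelism = read_parallelism({"my.custom.parallelism": "4"})
print(type(parallelism).__name__, parallelism)
```

Either way the value handed to set_parallelism() is an ordinary Python int,
which fits with a literal 1 failing in the same way.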

________________________________
From: John Tipper
Sent: 15 July 2022 16:44
To: user@flink.apache.org <user@flink.apache.org>
Subject: PyFlink and parallelism

Hi all,

I have a processing topology using PyFlink and SQL where there is data skew.
I'm splitting a stream of heterogeneous data into separate streams based on the
type of data in it. Some of these substreams have many more events than others,
and this is causing issues when checkpointing (checkpoints are timing out). I'd
like to increase parallelism for these problematic streams, but I'm not sure
how to do that and target just those elements. Do I need to use the DataStream
API here? What does this look like, please?
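
To make the skew concrete, here is a rough sketch in plain Python (not
PyFlink). The event types and counts are made up, and suggest_parallelism is a
hypothetical helper that simply scales parallelism with each substream's share
of the traffic:

```python
import math
from collections import Counter


def suggest_parallelism(counts, total_slots=8):
    """Scale each event type's parallelism with its share of events (minimum 1).

    Rounding up means the suggestions can add up to more than total_slots.
    """
    total = sum(counts.values())
    return {
        event_type: max(1, math.ceil(total_slots * n / total))
        for event_type, n in counts.items()
    }


# Made-up sample: one event type dominates the stream.
events = ["event_of_interest"] * 900 + ["other_a"] * 60 + ["other_b"] * 40
counts = Counter(events)
plan = suggest_parallelism(counts)
print(plan)
```

The hot substream ends up with most of the slots while the quiet ones stay at
1, which is the kind of per-substream setting I'm after.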


I have a table defined and I duplicate a stream from that table, then filter so 
that my substream has only the events I'm interested in:


events_table = table_env.from_path(MY_SOURCE_TABLE)

filtered_table = events_table.filter(
    col("event_type") == "event_of_interest"
)

table_env.create_temporary_view(MY_FILTERED_VIEW, filtered_table)

# now execute SQL on MY_FILTERED_VIEW
table_env.execute_sql(...)


The default parallelism of the overall table env is 1. Is there a way to 
increase the parallelism for just this stream?

Many thanks,

John
