Hi John,

You could try
`filtered_stream._j_data_stream.getTransformation().setParallelism(int(table_env.get_config().get('my.custom.parallelism', 1)))`.
Usually you don't need to do this; however, converting a Table to a DataStream returns a wrapper around a plain Java DataStream object, which has no setParallelism method.
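To make the workaround concrete, here is a minimal sketch of the two routes described in this thread (reaching into the underlying Java Transformation, or setting parallelism on a downstream operator). `my.custom.parallelism` is the config key from your example; the `resolve_parallelism` helper is hypothetical and only makes the int conversion explicit. The actual PyFlink calls are left as comments since they need a running TableEnvironment.

```python
def resolve_parallelism(config, key, default=1):
    """Read the desired parallelism from a config-like object (anything
    with dict-style .get), falling back to a default, and return it as
    a plain Python int."""
    return int(config.get(key, default))

# --- PyFlink usage (sketch, not executed here) ---
# filtered_stream = table_env.to_data_stream(filtered_table)
# p = resolve_parallelism(table_env.get_config(), 'my.custom.parallelism')
#
# Option 1: set parallelism on the underlying Java Transformation,
# since the converted stream is a plain DataStream on the Java side:
# filtered_stream._j_data_stream.getTransformation().setParallelism(p)
#
# Option 2: set parallelism on an operator applied after the
# conversion instead:
# filtered_stream.map(lambda e: e).set_parallelism(p)
```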
Besides, for your use case, I guess you want to set the parallelism of an operator after filtered_stream, e.g.

```
filtered_stream.map(xxx).set_parallelism(xxx)
```

Regards,
Dian

On Sat, Jul 16, 2022 at 5:07 PM John Tipper <john_tip...@hotmail.com> wrote:
> I've tried this and can see there appears to be a bigger problem with
> PyFlink and a call to set_parallelism():
>
> events_table = table_env.from_path(MY_SOURCE_TABLE)
>
> filtered_table = events_table.filter(
>     col("event_type") == "event_of_interest"
> )
>
> filtered_stream = table_env.to_data_stream(filtered_table)
>
> # fetch desired parallelism from config, don't hardcode
> filtered_stream.set_parallelism(int(table_env.get_config().get('my.custom.parallelism', 1)))
>
> table_env.create_temporary_view(MY_FILTERED_VIEW, filtered_stream)
>
> # now execute SQL on MY_FILTERED_VIEW
> table_env.execute_sql(...)
>
> I now get an error:
>
> An error occurred when calling o68.setParallelism(). Trace:
> org.apache.flink.api.python.shaded.py4j.Py4JException: Method
> setParallelism([class java.lang.Integer]) does not exist.
>
> Looks like Python is converting to the Integer object in Java and not the
> int primitive. I actually see this if I just call set_parallelism(1)
> without the call to get_config(). Is this a bug or is there a workaround?
>
> ------------------------------
> *From:* John Tipper
> *Sent:* 15 July 2022 16:44
> *To:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* PyFlink and parallelism
>
> Hi all,
>
> I have a processing topology using PyFlink and SQL where there is data
> skew: I'm splitting a stream of heterogeneous data into separate streams
> based on the type of data in each event, and some of these substreams
> have far more events than others, which is causing issues when
> checkpointing (checkpoints are timing out). I'd like to increase the
> parallelism for these problematic streams; I'm just not sure how to do
> that and target only those elements.
> Do I need to use the DataStream API here? What does this look like,
> please?
>
> I have a table defined and I duplicate a stream from that table, then
> filter so that my substream has only the events I'm interested in:
>
> events_table = table_env.from_path(MY_SOURCE_TABLE)
>
> filtered_table = events_table.filter(
>     col("event_type") == "event_of_interest"
> )
>
> table_env.create_temporary_view(MY_FILTERED_VIEW, filtered_table)
>
> # now execute SQL on MY_FILTERED_VIEW
> table_env.execute_sql(...)
>
> The default parallelism of the overall table env is 1. Is there a way to
> increase the parallelism for just this stream?
>
> Many thanks,
>
> John