It's not an issue with Python-to-Java object conversion: after calling `to_data_stream`, the Java object underlying the Python DataStream wrapper is a `DataStream`, not a `SingleOutputStreamOperator`, and `setParallelism` only exists on `SingleOutputStreamOperator`. To work around this, call `set_parallelism` on a processing operator instead, e.g. `filtered_stream.map(...).set_parallelism(...)`.
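A minimal sketch of that workaround (the helper name `with_parallelism` and the no-op `map` are illustrative, not part of the Flink API):

```python
def with_parallelism(stream, parallelism):
    """Return `stream` with the requested parallelism applied.

    table_env.to_data_stream(...) hands back a plain DataStream, whose
    underlying Java object has no setParallelism(); chaining an operator
    such as a no-op map() yields a SingleOutputStreamOperator, which does.
    You may need to pass output_type=... to map() if the downstream view
    must keep a typed schema rather than pickled rows.
    """
    return stream.map(lambda row: row).set_parallelism(parallelism)

# Usage sketch (filtered_stream as in the quoted message below):
# parallel_stream = with_parallelism(filtered_stream, 4)
# table_env.create_temporary_view(MY_FILTERED_VIEW, parallel_stream)
```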
John Tipper <john_tip...@hotmail.com> wrote on Sat, 16 Jul 2022 at 17:07:

> I've tried this and can see there appears to be a bigger problem with
> PyFlink and a call to set_parallelism():
>
> events_table = table_env.from_path(MY_SOURCE_TABLE)
>
> filtered_table = events_table.filter(
>     col("event_type") == "event_of_interest"
> )
>
> filtered_stream = table_env.to_data_stream(filtered_table)
>
> # fetch desired parallelism from config, don't hardcode
> filtered_stream.set_parallelism(int(table_env.get_config().get('my.custom.parallelism', 1)))
>
> table_env.create_temporary_view(MY_FILTERED_VIEW, filtered_stream)
>
> # now execute SQL on MY_FILTERED_VIEW
> table_env.execute_sql(...)
>
> I now get an error:
>
> An error occurred when calling o68.setParallelism(). Trace:
> org.apache.flink.api.python.shaded.py4j.Py4JException: Method
> setParallelism([class java.lang.Integer]) does not exist.
>
> Looks like Python is converting to the Integer object in Java and not the
> int primitive. I actually see this if I just call set_parallelism(1)
> without the call to get_config(). Is this a bug or is there a workaround?
>
> ------------------------------
> *From:* John Tipper
> *Sent:* 15 July 2022 16:44
> *To:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* PyFlink and parallelism
>
> Hi all,
>
> I have a processing topology using PyFlink and SQL where there is data
> skew: I'm splitting a stream of heterogeneous data into separate streams
> based on the type of data in each, and some of these substreams have very
> many more events than others; this is causing issues when checkpointing
> (checkpoints are timing out). I'd like to increase parallelism for these
> problematic streams, I'm just not sure how to do that and target just
> those elements. Do I need to use the DataStream API here? What does this
> look like, please?
>
> I have a table defined and I duplicate a stream from that table, then
> filter so that my substream has only the events I'm interested in:
>
> events_table = table_env.from_path(MY_SOURCE_TABLE)
>
> filtered_table = events_table.filter(
>     col("event_type") == "event_of_interest"
> )
>
> table_env.create_temporary_view(MY_FILTERED_VIEW, filtered_table)
>
> # now execute SQL on MY_FILTERED_VIEW
> table_env.execute_sql(...)
>
> The default parallelism of the overall table env is 1. Is there a way to
> increase the parallelism for just this stream?
>
> Many thanks,
>
> John