This isn't an issue with Python-to-Java object conversion. After calling
`to_data_stream`, the object underlying the Python DataStream wrapper is a
plain Java DataStream rather than a SingleOutputStreamOperator, and
`setParallelism` is only defined on SingleOutputStreamOperator. As a
workaround, call `set_parallelism` on a processing operator instead, e.g.
`filtered_stream.map(...).set_parallelism(2)`.

John Tipper <john_tip...@hotmail.com> wrote on Sat, 16 Jul 2022 at 17:07:

> I've tried this, and there appears to be a bigger problem with PyFlink
> and a call to set_parallelism():
>
>
> events_table = table_env.from_path(MY_SOURCE_TABLE)
>
> filtered_table = events_table.filter(
>   col("event_type") == "event_of_interest"
> )
>
> filtered_stream = table_env.to_data_stream(filtered_table)
>
>
> # fetch desired parallelism from config, don't hardcode
>
> filtered_stream.set_parallelism(
>     int(table_env.get_config().get('my.custom.parallelism', 1))
> )
>
>
> table_env.create_temporary_view(MY_FILTERED_VIEW, filtered_stream)
>
> # now execute SQL on MY_FILTERED_VIEW
> table_env.execute_sql(...)
>
>
>
> I now get an error:
>
> An error occurred when calling o68.setParallelism(). Trace:
> org.apache.flink.api.python.shaded.py4j.Py4JException: Method
> setParallelism([class java.lang.Integer]) does not exist.
>
>
>
> Looks like Python is converting the argument to a Java Integer object
> rather than the int primitive. I actually see this if I just call
> set_parallelism(1) without the call to get_config(). Is this a bug, or
> is there a workaround?
>
> ------------------------------
> *From:* John Tipper
> *Sent:* 15 July 2022 16:44
> *To:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* PyFlink and parallelism
>
> Hi all,
>
> I have a processing topology using PyFlink and SQL where there is data
> skew: I'm splitting a stream of heterogeneous data into separate streams
> based on the type of data in each event. Some of these substreams have
> far more events than others, and this is causing issues when
> checkpointing (checkpoints are timing out). I'd like to increase the
> parallelism for these problematic streams, but I'm not sure how to do
> that and target just those elements. Do I need to use the DataStream API
> here? What would that look like, please?
>
>
> I have a table defined and I duplicate a stream from that table, then
> filter so that my substream has only the events I'm interested in:
>
>
> events_table = table_env.from_path(MY_SOURCE_TABLE)
>
> filtered_table = events_table.filter(
>     col("event_type") == "event_of_interest"
> )
>
> table_env.create_temporary_view(MY_FILTERED_VIEW, filtered_table)
>
> # now execute SQL on MY_FILTERED_VIEW
> table_env.execute_sql(...)
>
>
>
> The default parallelism of the overall table env is 1. Is there a way to
> increase the parallelism for just this stream?
>
> Many thanks,
>
> John
>
