[ 
https://issues.apache.org/jira/browse/FLINK-19255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17202786#comment-17202786
 ] 

Arvid Heise commented on FLINK-19255:
-------------------------------------

The new source interface was released in 1.11 and is described in the 
[docs|https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/sources.html].
 That information is valuable for users with custom source implementations as 
they can already port their sources in 1.11.

For the bundled sources: all implemented sources will be usable from 
{{DataStream}} and, as [~pnowojski] said, the Kafka and File source 
implementations have the highest priority and are on track for release 1.12, 
planned for November 2020.

> Add configuration to make AsyncWaitOperation Chainable
> ------------------------------------------------------
>
>                 Key: FLINK-19255
>                 URL: https://issues.apache.org/jira/browse/FLINK-19255
>             Project: Flink
>          Issue Type: Task
>          Components: API / Core
>    Affects Versions: 1.10.2, 1.11.2
>         Environment: Any flink job using Async IO post this PR: 
> [https://github.com/apache/flink/pull/11177/files#diff-00b95aec1fa804a531b553610ec74c27R117]
> (so I believe anything starting at either 1.9 or 1.10).
>  
>            Reporter: Kyle Bendickson
>            Priority: Major
>
> Currently, we no longer chain Async IO calls. Instead, anything using AsyncIO 
> starts the new head of an operator chain as a temporary workaround for this 
> issue: https://issues.apache.org/jira/browse/FLINK-13063
>  
> However, because this change can (and does in my customers' cases) have a very 
> large impact on the job graph size, and because people were previously 
> accepting of their results, in the 1.10 release AsyncWaitOperator was made 
> chainable in this issue: 
> https://issues.apache.org/jira/browse/FLINK-16219.
>  
> However, it's very complicated and not intuitive for users to call out to 
> operator factory methods. I have users who would very much like to not have 
> their AsyncIO calls generate a new chain, as it's ballooned the number of 
> state stores they have and they were accepting of their previous results. The 
> only example I could find was in the tests, and it's rather convoluted.
>  
> My proposal would be to add a config check in AsyncWaitOperator.java that, 
> when enabled, skips the following line, which is currently hardcoded into the 
> operator and is what requires one to use the operator factory:
> {noformat}
> setChainingStrategy(ChainingStrategy.HEAD){noformat}
>   
> Given that this is considered potentially unsafe / legacy behavior, I would 
> suggest that we add a config, something that explicitly calls this out as 
> unsafe / legacy, so that users do not have to go through the unintuitive 
> process of using operator factories but that new users know not to use this 
> option or to use it at their own risk. We could also document that it is not 
> necessarily going to be supported in the future if need be.
>  
> One suggestion for a config name that would skip that setChainingStrategy 
> line is
> {noformat}
> taskmanager.async-io-operator.legacy-unsafe-chaining-strategy{noformat}
> which specifically calls this behavior out as legacy and unsafe.
>  
> Another possible name could be
> {noformat}
> pipeline.operator-chaining.async-io.legacy-inconsistent-chaining-strategy-always{noformat}
> (which would be more in line with the existing config of 
> pipeline.operator-chaining).
>  
>  
> Given that it is possible to stop operator chaining, although doing so is 
> very unintuitive and requires using operator factories, I think that this 
> configuration would be a good addition. I would be happy to submit a PR, with 
> tests, and updated documentation, so that power users who are looking to do 
> this could enable / disable this behavior without having to change their code 
> much.
>  
> I recognize that this might be an odd request as this has been deemed unsafe, 
> but this change has made it very difficult for some of my users to use 
> rocksdb, namely those with very large state that previously made very liberal 
> use of Async IO (especially for things like analytics events which can be 
> sent on a best effort basis) and who therefore have a very large job graph 
> after this change.
>  
> If anybody has any better suggestions for names, I'd be open to them. And 
> then as mentioned, I'd be happy to submit a PR with tests etc.
>  
> For reference, here are the tests where I found the ability to use the 
> operator factory and here is the utility function which is needed to create a 
> chained async io operator vertex. Note that this utility function is in the 
> test and not part of the public facing API. 
> [https://github.com/apache/flink/blob/3a04e179e09224b09c4ee656d31558844da83a26/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java#L880-L912]
> If there is a simpler way to handle this, I'd be happy to hear it. Otherwise, 
> since this behavior is technically already specifically enabled (as called 
> out in the changelog from Flink 1.11), I think it makes sense to add a config 
> and document that it's legacy behavior, unsafe (or inconsistent, up to 
> you), and that it could go away at any time.
>  
> But it seems unnecessary to require users to go through so many extra hoops 
> in the code, especially for users who share operators amongst different jobs 
> which might be configured to use different state backends. Not to mention 
> that some of these users want the legacy behavior and others would prefer to 
> play it safe and accept the additional shuffle, so a code fix is not always 
> feasible when code is shared, but the enabling / disabling of a cluster level 
> config would still allow for shared code.
>  
> I'd be happy to submit a patch once this issue is discussed.
>  
> Thank you,
> Kyle B. - Data Services @ Tinder 
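
For illustration only, the config check proposed in the description could look roughly like the sketch below. It does not use any Flink classes: {{ChainingStrategy}} here is a local stand-in enum, {{resolveChainingStrategy}} is a hypothetical helper, and the option key is just one of the names suggested in the issue, not an existing Flink option. It also assumes that the legacy behavior corresponds to {{ALWAYS}}, as the proposed option name implies.

```java
import java.util.Map;

public class ChainingSketch {
    // Local stand-in for Flink's ChainingStrategy so the sketch compiles on its own.
    enum ChainingStrategy { ALWAYS, NEVER, HEAD }

    // Hypothetical option key, copied from one of the names suggested in the issue.
    static final String LEGACY_KEY =
        "pipeline.operator-chaining.async-io.legacy-inconsistent-chaining-strategy-always";

    // Hypothetical helper: returns HEAD (the current hardcoded behavior) unless
    // the legacy flag is explicitly set to "true" in the given configuration.
    static ChainingStrategy resolveChainingStrategy(Map<String, String> config) {
        boolean legacy = Boolean.parseBoolean(config.getOrDefault(LEGACY_KEY, "false"));
        return legacy ? ChainingStrategy.ALWAYS : ChainingStrategy.HEAD;
    }

    public static void main(String[] args) {
        // Default configuration: the async operator still starts a new chain.
        System.out.println(resolveChainingStrategy(Map.of()));
        // Legacy flag enabled: the operator may be chained to its predecessor.
        System.out.println(resolveChainingStrategy(Map.of(LEGACY_KEY, "true")));
    }
}
```

Defaulting to {{HEAD}} keeps the safe behavior for anyone who does not set the flag, so shared operator code would only change behavior on clusters that explicitly opt in.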



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
