[ https://issues.apache.org/jira/browse/FLINK-19255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17202772#comment-17202772 ]
Piotr Nowojski commented on FLINK-19255:
----------------------------------------

Yes, Kafka is among the first sources we want to port. It should make it to 1.12.

> Add configuration to make AsyncWaitOperation Chainable
> ------------------------------------------------------
>
>                 Key: FLINK-19255
>                 URL: https://issues.apache.org/jira/browse/FLINK-19255
>             Project: Flink
>          Issue Type: Task
>          Components: API / Core
>    Affects Versions: 1.10.2, 1.11.2
>         Environment: Any flink job using Async IO post this PR:
> [https://github.com/apache/flink/pull/11177/files#diff-00b95aec1fa804a531b553610ec74c27R117]
> (so I believe anything starting at either 1.9 or 1.10).
>
>            Reporter: Kyle Bendickson
>            Priority: Major
>
> Currently, we no longer chain Async IO calls. Instead, anything using AsyncIO
> starts a new head of an operator chain as a temporary workaround for this
> issue: https://issues.apache.org/jira/browse/FLINK-13063
>
> However, because this change can (and does in my customers' cases) have a
> very large impact on the job graph size, and because people were previously
> accepting of their results, in the 1.10 release it was made so that
> AsyncWaitOperator could be chained in this issue
> https://issues.apache.org/jira/browse/FLINK-16219.
>
> However, it's very complicated and not intuitive for users to call out to
> operator factory methods. I have users who would very much like to not have
> their AsyncIO calls generate a new chain, as it's ballooned the number of
> state stores they have and they were accepting of their previous results. The
> only example I could find was in the tests, and it's rather convoluted.
>
> My proposal would be to add a config check in AsyncWaitOperator.java that,
> when enabled, skips the following line, which is currently hardcoded into the
> operator and is what requires one to use the operator factory:
> {noformat}
> setChainingStrategy(ChainingStrategy.HEAD){noformat}
>
> Given that this is considered potentially unsafe / legacy behavior, I would
> suggest that we add a config, something that explicitly calls this out as
> unsafe / legacy, so that users do not have to go through the unintuitive
> process of using operator factories but new users know not to use this
> option, or to use it at their own risk. We could also document that it is not
> necessarily going to be supported in the future if need be.
>
> My suggestions for config names that would avoid that setChainingStrategy
> line include
> {noformat}
> taskmanager.async-io-operator.legacy-unsafe-chaining-strategy{noformat}
> which specifically calls this behavior out as legacy and unsafe.
>
> Another possible name could be
> {noformat}
> pipeline.operator-chaining.async-io.legacy-inconsistent-chaining-strategy-always{noformat}
> (which would be more in line with the existing config of
> pipeline.operator-chaining).
>
>
> Given that it is already possible to stop operator chaining, just in a very
> unintuitive way that requires using operator factories, I think that this
> configuration would be a good addition. I would be happy to submit a PR, with
> tests and updated documentation, so that power users who are looking to do
> this could enable / disable this behavior without having to change their code
> much.
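
A minimal sketch of the kind of config check proposed above, written without any Flink dependency so it runs on its own. Everything in it is an assumption for illustration: the `ChainingStrategy` enum is a local stand-in that only mirrors the names of Flink's enum, the config is modeled as a plain `Map`, the option key is one of the names suggested in the issue (it is not an existing Flink option), and "legacy" is assumed to mean the `ALWAYS` strategy, as the second suggested name hints.

```java
import java.util.Map;

public class AsyncChainingConfigSketch {

    // Local stand-in for Flink's ChainingStrategy; only the values discussed here.
    enum ChainingStrategy { ALWAYS, HEAD }

    // Option name proposed in the issue; hypothetical, not an existing Flink option.
    static final String LEGACY_CHAINING_KEY =
            "taskmanager.async-io-operator.legacy-unsafe-chaining-strategy";

    // Returns HEAD (the currently hardcoded behavior) unless the legacy
    // flag is explicitly set to "true" in the supplied config.
    static ChainingStrategy resolveChainingStrategy(Map<String, String> config) {
        boolean legacy =
                Boolean.parseBoolean(config.getOrDefault(LEGACY_CHAINING_KEY, "false"));
        return legacy ? ChainingStrategy.ALWAYS : ChainingStrategy.HEAD;
    }

    public static void main(String[] args) {
        // Default: the safe behavior, the operator starts a new chain.
        System.out.println(resolveChainingStrategy(Map.of()));
        // Opt-in: the legacy behavior, the operator may be chained.
        System.out.println(resolveChainingStrategy(Map.of(LEGACY_CHAINING_KEY, "true")));
    }
}
```

The point of defaulting to `HEAD` is that a job which never sets the flag keeps today's behavior, while jobs sharing the same operator code can opt in per cluster or per job through configuration alone.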
>
> I recognize that this might be an odd request as this has been deemed unsafe,
> but this change has made it very difficult for some of my users to use
> RocksDB, namely those with very large state that previously made very liberal
> use of Async IO (especially for things like analytics events which can be
> sent on a best effort basis) and who therefore have a very large job graph
> after this change.
>
> If anybody has any better suggestions for names, I'd be open to them. And
> then as mentioned, I'd be happy to submit a PR with tests etc.
>
> For reference, here are the tests where I found the ability to use the
> operator factory and here is the utility function which is needed to create a
> chained async io operator vertex. Note that this utility function is in the
> test and not part of the public facing API.
> [https://github.com/apache/flink/blob/3a04e179e09224b09c4ee656d31558844da83a26/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java#L880-L912]
> If there is a simpler way to handle this, I'd be happy to hear it. Otherwise,
> since this behavior is technically already specifically enabled (as called
> out in the changelog from Flink 1.11), I think it makes sense to add a config
> and document that it's legacy behavior, that it's unsafe (or inconsistent, up
> to you), and that it could go away at any time.
>
> But it seems unnecessary to require users to jump through so many extra hoops
> in the code, especially for users who share operators amongst different jobs
> which might be configured to use different state backends. Not to mention
> that some of these users want the legacy behavior and others would prefer to
> play it safe and accept the additional shuffle, so a code fix is not always
> feasible when code is shared, but the enabling / disabling of a cluster level
> config would still allow for shared code.
>
> I'd be happy to submit a patch once this issue is discussed.
>
> Thank you,
> Kyle B. - Data Services @ Tinder



--
This message was sent by Atlassian Jira
(v8.3.4#803005)