Hi Dan,

> I'm guessing I violate the "The second operator needs to be single-input
> (i.e. no TwoInputOp nor union() before)" part.

I think you are right.
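The chaining preconditions Matthias lists further down in this thread can be made concrete with a small model. What follows is a simplified, hypothetical Java sketch. `ChainingCheck`, `Op`, `Strategy` and `Partitioner` are invented for illustration and are not Flink's API. Flink's real check lives in `StreamingJobGraphGenerator` and may consider conditions this sketch ignores. Even so, it shows why a `map` feeding a two-input operator such as a `KeyedCoProcessFunction` cannot be chained when every other condition holds.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, simplified model of the operator-chaining preconditions discussed in
 * this thread. None of these types are Flink classes.
 */
final class ChainingCheck {

    /** Named after Flink's ChainingStrategy values, for illustration only. */
    enum Strategy { ALWAYS, HEAD, NEVER }

    /** How records travel from the upstream to the downstream operator. */
    enum Partitioner { FORWARD, HASH, REBALANCE }

    /** Minimal description of an operator node in this model. */
    static final class Op {
        final String name;
        final int parallelism;
        final int numInEdges; // > 1 for a two-input operator or a union() in front of it
        final Strategy strategy;

        Op(String name, int parallelism, int numInEdges, Strategy strategy) {
            this.name = name;
            this.parallelism = parallelism;
            this.numInEdges = numInEdges;
            this.strategy = strategy;
        }
    }

    /** Returns every violated precondition; an empty list means the pair is chainable here. */
    static List<String> violations(Op up, Op down, Partitioner edge,
                                   boolean streamingMode, boolean chainingEnabled) {
        List<String> failed = new ArrayList<>();
        // Upstream must not forbid chaining; downstream must accept being chained to its input.
        if (up.strategy == Strategy.NEVER || down.strategy != Strategy.ALWAYS) {
            failed.add("chaining strategy");
        }
        if (edge != Partitioner.FORWARD) {
            failed.add("not a forward edge");
        }
        if (!streamingMode) {
            failed.add("not streaming mode");
        }
        if (up.parallelism != down.parallelism) {
            failed.add("different parallelism");
        }
        if (!chainingEnabled) {
            failed.add("chaining disabled");
        }
        if (down.numInEdges != 1) {
            failed.add("downstream is not single-input");
        }
        return failed;
    }
}
```

In this model, a single-input `map` followed by a two-input co-process operator, both with the same parallelism and joined by a forward edge, fails only `downstream is not single-input`. The two operators would therefore run as separate tasks connected over the network.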
Do you want to remove the shuffle of the two inputs in your case? If so, Flink
has supported multiple-input operators since version 1.11. I think that might
meet your needs; you can find more in [1]. However, this feature does not yet
have a complete DataStream API. Users who want to use it need to manually
create a MultipleInputTransformation and a MultipleConnectedStreams, for
example:

> MultipleInputTransformation<Long> transform = new MultipleInputTransformation<>(
>         "My Operator",
>         new SumAllInputOperatorFactory(),
>         BasicTypeInfo.LONG_TYPE_INFO,
>         1);
>
> env.addOperator(transform
>         .addInput(source1.getTransformation())
>         .addInput(source2.getTransformation())
>         .addInput(source3.getTransformation()));
> new MultipleConnectedStreams(env)
>         .transform(transform)
>         .addSink(resultSink);

I would like to invite @Piotr to double-check this conclusion; he is more of an
expert on this topic. @Piotr, would you please check Dan's question? Please
correct me if I'm wrong.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-92%3A+Add+N-Ary+Stream+Operator+in+Flink?spm=a2c65.11461447.0.0.786d2323FtzWaR

Best,
JING ZHANG

Dan Hill <quietgol...@gmail.com> wrote on Sat, Oct 16, 2021 at 6:28 AM:

> Thanks Thias and JING ZHANG!
>
> Here's a Google Drive folder link
> <https://drive.google.com/drive/folders/1chE0i3HyBWgREeRuM2qlGU8sK1jr4rLm?usp=sharing>
> with the execution plan and two screenshots from the job graph.
>
> I'm guessing I violate the "The second operator needs to be single-input
> (i.e. no TwoInputOp nor union() before)" part.
>
> After I do a transformation on a KeyedStream (so it goes back to a
> SingleOutputStreamOperator), even if I do a simple map, it usually
> disallows operator chaining. Even with reinterpretAsKeyedStream, it
> doesn't work.
>
> On Fri, Oct 15, 2021 at 12:34 AM JING ZHANG <beyond1...@gmail.com> wrote:
>
>> Hi Dan,
>> Sorry for the typos; I meant to ask for the code that reproduces the problem.
>> If the current program is complex and confidential, maybe you could try to
>> simplify the code.
>> Besides, Matthias's guess is very reasonable. Could you please check whether
>> there is a network shuffle between your two operators? Were those two
>> operators chained into one vertex?
>>
>> Best,
>> JING ZHANG
>>
>> Schwalbe Matthias <matthias.schwa...@viseca.ch> wrote on Fri, Oct 15, 2021 at 2:57 PM:
>>
>>> … didn’t mean to hit the send button so soon 😊
>>>
>>> I guess we are getting closer to a solution
>>>
>>> Thias
>>>
>>> *From:* Schwalbe Matthias
>>> *Sent:* Friday, 15 October 2021 08:49
>>> *To:* 'Dan Hill' <quietgol...@gmail.com>; user <user@flink.apache.org>
>>> *Subject:* RE: Any issues with reinterpretAsKeyedStream when scaling
>>> partitions?
>>>
>>> Hi Dan again 😊,
>>>
>>> I took a second look … from your call stack I conclude that you indeed
>>> have a network shuffle between your two operators, in which case
>>> reinterpretAsKeyedStream wouldn’t work
>>> ($StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:277)
>>> indicates that the two operators are not chained).
>>>
>>> … just as a double-check, could you please share both your
>>>
>>> - Execution plan (call println(env.getExecutionPlan) right before
>>>   your call to env.execute) (JSON), and
>>> - Job plan (screenshot from the Flink dashboard)
>>>
>>> There are a number of preconditions for two operators to get chained, and
>>> probably one of them fails (see [1]):
>>>
>>> - Both operators need to allow chaining with each other (see [2]
>>>   … chaining strategy)
>>> - We need a ForwardPartitioner in between
>>> - We need to be in streaming mode
>>> - Both operators need the same parallelism
>>> - Chaining needs to be enabled for the streaming environment
>>> - The second operator needs to be single-input (i.e. no TwoInputOp
>>>   nor union() before)
>>>
>>> [1] https://github.com/apache/flink/blob/2dabdd95c15ccae2a97a0e898d1acfc958a0f7f3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java#L861-L873
>>> [2] https://github.com/apache/flink/blob/2dabdd95c15ccae2a97a0e898d1acfc958a0f7f3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java#L903-L932
>>>
>>> *From:* Dan Hill <quietgol...@gmail.com>
>>> *Sent:* Thursday, 14 October 2021 17:50
>>> *To:* user <user@flink.apache.org>
>>> *Subject:* Any issues with reinterpretAsKeyedStream when scaling
>>> partitions?
>>>
>>> I have a job that uses reinterpretAsKeyedStream across a simple map to
>>> avoid a shuffle. When changing the number of partitions, I'm hitting an
>>> issue with registerEventTimeTimer complaining that "key group from 110 to
>>> 119 does not contain 186". I'm using Flink v1.12.3.
>>>
>>> Any thoughts on this? I don't know if there is a known issue
>>> with reinterpretAsKeyedStream.
>>>
>>> Rough steps:
>>>
>>> 1. I have a raw input stream of View records. I keyBy the View using
>>>    Tuple2<Long, String>(platform_id, log_user_id).
>>> 2. I do a small transformation of View to a TinyView. I
>>>    reinterpretAsKeyedStream the TinyView as a KeyedStream with the same
>>>    key. The keys are the same.
>>> 3. I use the TinyView in a KeyedCoProcessFunction.
>>>
>>> When I savepoint and start again with a different number of partitions,
>>> my KeyedCoProcessFunction hits an issue with registerEventTimeTimer and
>>> complains that "key group from 110 to 119 does not contain 186". I
>>> verified that the key does not change and that we use Tuple2 with
>>> primitives Long and String.
>>> >>> >>> >>> >>> >>> >>> >>> 2021-10-14 08:17:07 >>> >>> java.lang.IllegalArgumentException: view x insertion issue with >>> registerEventTimeTimer for key=(120,3bfd5b19-9d86-4455-a5a1-480f8596a174), >>> flat=platform_id: 120 >>> >>> log_user_id: "3bfd5b19-9d86-4455-a5a1-480f8596a174" >>> >>> log_timestamp: 1634224329606 >>> >>> view_id: "8fcdf922-7c79-4902-9778-3f20f39b0bc2" >>> >>> >>> >>> at >>> ai.promoted.metrics.logprocessor.common.functions.inferred.BaseInferred.processElement1(BaseInferred.java:318) >>> >>> at >>> ai.promoted.metrics.logprocessor.common.functions.inferred.BaseInferred.processElement1(BaseInferred.java:59) >>> >>> at >>> ai.promoted.metrics.logprocessor.common.functions.LogSlowOnTimer.processElement1(LogSlowOnTimer.java:36) >>> >>> at >>> org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator.processElement1(KeyedCoProcessOperator.java:78) >>> >>> at >>> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.processRecord1(StreamTwoInputProcessorFactory.java:199) >>> >>> at >>> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.lambda$create$0(StreamTwoInputProcessorFactory.java:164) >>> >>> at >>> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:277) >>> >>> at >>> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204) >>> >>> at >>> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174) >>> >>> at >>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) >>> >>> at >>> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:95) >>> >>> at >>> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396) >>> >>> at >>> 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191) >>> >>> at >>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617) >>> >>> at >>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581) >>> >>> at >>> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) >>> >>> at >>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) >>> >>> at java.lang.Thread.run(Thread.java:748) >>> >>> Caused by: java.lang.IllegalArgumentException: key group from 110 to 119 >>> does not contain 186 >>> >>> at >>> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:160) >>> >>> at >>> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.globalKeyGroupToLocalIndex(KeyGroupPartitionedPriorityQueue.java:191) >>> >>> at >>> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.computeKeyGroupIndex(KeyGroupPartitionedPriorityQueue.java:186) >>> >>> at >>> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.getKeyGroupSubHeapForElement(KeyGroupPartitionedPriorityQueue.java:179) >>> >>> at >>> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.add(KeyGroupPartitionedPriorityQueue.java:114) >>> >>> at >>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerEventTimeTimer(InternalTimerServiceImpl.java:233) >>> >>> at >>> org.apache.flink.streaming.api.SimpleTimerService.registerEventTimeTimer(SimpleTimerService.java:52) >>> >>> at >>> ai.promoted.metrics.logprocessor.common.functions.inferred.BaseInferred.processElement1(BaseInferred.java:315) >>> >>> ... 17 more >>> Diese Nachricht ist ausschliesslich für den Adressaten bestimmt und >>> beinhaltet unter Umständen vertrauliche Mitteilungen. 
Da die >>> Vertraulichkeit von e-Mail-Nachrichten nicht gewährleistet werden kann, >>> übernehmen wir keine Haftung für die Gewährung der Vertraulichkeit und >>> Unversehrtheit dieser Mitteilung. Bei irrtümlicher Zustellung bitten wir >>> Sie um Benachrichtigung per e-Mail und um Löschung dieser Nachricht sowie >>> eventueller Anhänge. Jegliche unberechtigte Verwendung oder Verbreitung >>> dieser Informationen ist streng verboten. >>> >>> This message is intended only for the named recipient and may contain >>> confidential or privileged information. As the confidentiality of email >>> communication cannot be guaranteed, we do not accept any responsibility for >>> the confidentiality and the intactness of this message. If you have >>> received it in error, please advise the sender by return e-mail and delete >>> this message and any attachments. Any unauthorised use or dissemination of >>> this information is strictly prohibited. >>> >>
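The root exception in the trace, "key group from 110 to 119 does not contain 186", comes from Flink's key-group bookkeeping. Each keyed subtask owns a contiguous range of key groups and may only register state or timers for keys that fall into that range. What follows is a minimal, hypothetical Java sketch of that arithmetic, modeled on the formulas in Flink's `KeyGroupRangeAssignment` class. The class `KeyGroups` and its methods are invented for illustration. `keyGroupForHash` uses a plain modulus as a stand-in for Flink's murmur-based hash, so its numbers will not match a real job. The sketch shows that routing by key group, which is what `keyBy` does, always reaches the owning subtask. A shuffle that ignores the key (round-robin here) does not. `reinterpretAsKeyedStream` relies on the input already being partitioned exactly as `keyBy` would partition it.

```java
/**
 * Hypothetical sketch of Flink-style key-group arithmetic (modeled on
 * KeyGroupRangeAssignment). Not Flink's API; the hash is a stand-in.
 */
final class KeyGroups {

    /** Stand-in for Flink's murmur-based key-group hash: maps into [0, maxParallelism). */
    static int keyGroupForHash(int keyHash, int maxParallelism) {
        return Math.floorMod(keyHash, maxParallelism);
    }

    /** First key group (inclusive) owned by subtask operatorIndex. */
    static int rangeStart(int maxParallelism, int parallelism, int operatorIndex) {
        return (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
    }

    /** Last key group (inclusive) owned by subtask operatorIndex. */
    static int rangeEnd(int maxParallelism, int parallelism, int operatorIndex) {
        return ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
    }

    /** Subtask that key-based routing (as keyBy does) sends a key group to. */
    static int subtaskForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    /** Models the ownership check behind "key group from X to Y does not contain Z". */
    static boolean owns(int maxParallelism, int parallelism, int operatorIndex, int keyGroup) {
        return keyGroup >= rangeStart(maxParallelism, parallelism, operatorIndex)
                && keyGroup <= rangeEnd(maxParallelism, parallelism, operatorIndex);
    }

    /**
     * Sends key groups 0..maxParallelism-1 round-robin (ignoring the key) and counts
     * how many arrive at a subtask that does not own them.
     */
    static int misroutedRoundRobin(int maxParallelism, int parallelism) {
        int misrouted = 0;
        for (int keyGroup = 0; keyGroup < maxParallelism; keyGroup++) {
            int target = keyGroup % parallelism;
            if (!owns(maxParallelism, parallelism, target, keyGroup)) {
                misrouted++;
            }
        }
        return misrouted;
    }
}
```

Take illustrative values of maxParallelism 128 and parallelism 4. Subtask 1 then owns key groups 32 to 63, and key group 40 is routed to it. After rescaling to parallelism 3, the same key group belongs to subtask 0. Sending the 128 key groups round-robin over 4 subtasks places 96 of them on a subtask that does not own them.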