Interesting. Thanks, JING ZHANG!

On Mon, Oct 18, 2021 at 12:16 AM JING ZHANG <beyond1...@gmail.com> wrote:
> Hi Dan,
>
> I'm guessing I violate the "The second operator needs to be single-input
> (i.e. no TwoInputOp nor union() before)" part.
> I think you are right.
>
> Do you want to remove the shuffle of the two inputs in your case? If yes,
> Flink has supported multiple-input operators since version 1.11. I think
> that might satisfy your need. You can find more in [1].
> However, at present this feature is not fully exposed through the
> DataStream API. Users who want it need to manually create a
> MultipleInputTransformation and MultipleConnectedStreams.
>
>> MultipleInputTransformation<Long> transform = new
>>     MultipleInputTransformation<>(
>>         "My Operator",
>>         new SumAllInputOperatorFactory(),
>>         BasicTypeInfo.LONG_TYPE_INFO,
>>         1);
>>
>> env.addOperator(transform
>>     .addInput(source1.getTransformation())
>>     .addInput(source2.getTransformation())
>>     .addInput(source3.getTransformation()));
>> new MultipleConnectedStreams(env)
>>     .transform(transform)
>>     .addSink(resultSink);
>>
> I would invite @Piotr to double check this conclusion. He knows this
> topic better than I do.
>
> @Piotr, would you please check Dan's question? Please correct me if I'm
> wrong.
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-92%3A+Add+N-Ary+Stream+Operator+in+Flink?spm=a2c65.11461447.0.0.786d2323FtzWaR
>
> Best,
> JING ZHANG
>
> Dan Hill <quietgol...@gmail.com> wrote on Sat, Oct 16, 2021 at 6:28 AM:
>
>> Thanks Thias and JING ZHANG!
>>
>> Here's a Google drive folder link
>> <https://drive.google.com/drive/folders/1chE0i3HyBWgREeRuM2qlGU8sK1jr4rLm?usp=sharing>
>> with the execution plan and two screenshots from the job graph.
>>
>> I'm guessing I violate the "The second operator needs to be single-input
>> (i.e. no TwoInputOp nor union() before)" part.
>>
>> After I do a transformation on a KeyedStream (so it goes back to a
>> SingleOutputStreamOperator), even if I do a simple map, it usually
>> disallows operator chaining.
>> Even with reinterpretAsKeyedStream, it doesn't work.
>>
>>
>> On Fri, Oct 15, 2021 at 12:34 AM JING ZHANG <beyond1...@gmail.com> wrote:
>>
>>> Hi Dan,
>>> Sorry for the typos, I meant to ask you to provide code that reproduces
>>> the problem. If the current program is complex and confidential, maybe
>>> you could try to simplify the code.
>>> Besides, Matthias's guess is very reasonable. Could you please check
>>> whether there is a network shuffle between your two operators? Were
>>> those two operators chained into one vertex?
>>>
>>> Best,
>>> JING ZHANG
>>>
>>> Schwalbe Matthias <matthias.schwa...@viseca.ch> wrote on Fri, Oct 15, 2021 at 2:57 PM:
>>>
>>>> … didn’t mean to hit the send button so soon 😊
>>>>
>>>> I guess we are getting closer to a solution
>>>>
>>>> Thias
>>>>
>>>>
>>>> *From:* Schwalbe Matthias
>>>> *Sent:* Friday, October 15, 2021 08:49
>>>> *To:* 'Dan Hill' <quietgol...@gmail.com>; user <user@flink.apache.org>
>>>> *Subject:* RE: Any issues with reinterpretAsKeyedStream when scaling
>>>> partitions?
>>>>
>>>> Hi Dan again 😊,
>>>>
>>>> I took a second look … from what I see in your call stack I conclude
>>>> that you do indeed have a network shuffle between your two operators,
>>>> in which case reinterpretAsKeyedStream wouldn’t work
>>>>
>>>> ($StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:277)
>>>> indicates that the two operators are not chained)
>>>>
>>>> … just as a double-check could you please share both your
>>>>
>>>>    - Execution plan (call println(env.getExecutionPlan) right before
>>>>    your call to env.execute) (json), and
>>>>    - Your job plan (screenshot from the Flink dashboard)
>>>>
>>>> There are a number of preconditions before two operators get chained,
>>>> and probably one of them fails (see [1]):
>>>>
>>>>    - The two operators need to allow chaining with each other (see [2]
>>>>    … chaining strategy)
>>>>    - We need a ForwardPartitioner in between
>>>>    - We need to be in streaming mode
>>>>    - Both operators need the same parallelism
>>>>    - Chaining needs to be enabled for the streaming environment
>>>>    - The second operator needs to be single-input (i.e. no TwoInputOp
>>>>    nor union() before)
>>>>
>>>> [1]
>>>> https://github.com/apache/flink/blob/2dabdd95c15ccae2a97a0e898d1acfc958a0f7f3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java#L861-L873
>>>>
>>>> [2]
>>>> https://github.com/apache/flink/blob/2dabdd95c15ccae2a97a0e898d1acfc958a0f7f3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java#L903-L932
>>>>
>>>>
>>>> *From:* Dan Hill <quietgol...@gmail.com>
>>>> *Sent:* Thursday, October 14, 2021 17:50
>>>> *To:* user <user@flink.apache.org>
>>>> *Subject:* Any issues with reinterpretAsKeyedStream when scaling
>>>> partitions?
>>>>
>>>> I have a job that uses reinterpretAsKeyedStream across a simple map to
>>>> avoid a shuffle. When changing the number of partitions, I'm hitting an
>>>> issue with registerEventTimeTimer complaining that "key group from 110 to
>>>> 119 does not contain 186". I'm using Flink v1.12.3.
>>>>
>>>> Any thoughts on this? I don't know if there is a known issue
>>>> with reinterpretAsKeyedStream.
>>>>
>>>> Rough steps:
>>>>
>>>> 1. I have a raw input stream of View records. I keyBy the View using
>>>> Tuple2<Long, String>(platform_id, log_user_id).
>>>>
>>>> 2. I do a small transformation of View to a TinyView. I
>>>> reinterpretAsKeyedStream the TinyView as a KeyedStream with the same key.
>>>> The keys are the same.
>>>>
>>>> 3. I use the TinyView in a KeyedCoProcessFunction.
>>>>
>>>> When I savepoint and start again with a different number of partitions,
>>>> my KeyedCoProcessFunction hits an issue with registerEventTimeTimer and
>>>> complains that "key group from 110 to 119 does not contain 186". I
>>>> verified that the key does not change and that we use Tuple2 with
>>>> primitives Long and String.
>>>>
>>>> 2021-10-14 08:17:07
>>>>
>>>> java.lang.IllegalArgumentException: view x insertion issue with registerEventTimeTimer for key=(120,3bfd5b19-9d86-4455-a5a1-480f8596a174), flat=platform_id: 120
>>>> log_user_id: "3bfd5b19-9d86-4455-a5a1-480f8596a174"
>>>> log_timestamp: 1634224329606
>>>> view_id: "8fcdf922-7c79-4902-9778-3f20f39b0bc2"
>>>>
>>>>     at ai.promoted.metrics.logprocessor.common.functions.inferred.BaseInferred.processElement1(BaseInferred.java:318)
>>>>     at ai.promoted.metrics.logprocessor.common.functions.inferred.BaseInferred.processElement1(BaseInferred.java:59)
>>>>     at ai.promoted.metrics.logprocessor.common.functions.LogSlowOnTimer.processElement1(LogSlowOnTimer.java:36)
>>>>     at org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator.processElement1(KeyedCoProcessOperator.java:78)
>>>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.processRecord1(StreamTwoInputProcessorFactory.java:199)
>>>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.lambda$create$0(StreamTwoInputProcessorFactory.java:164)
>>>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:277)
>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
>>>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>>>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:95)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396)
>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>>>>     at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: java.lang.IllegalArgumentException: key group from 110 to 119 does not contain 186
>>>>     at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:160)
>>>>     at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.globalKeyGroupToLocalIndex(KeyGroupPartitionedPriorityQueue.java:191)
>>>>     at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.computeKeyGroupIndex(KeyGroupPartitionedPriorityQueue.java:186)
>>>>     at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.getKeyGroupSubHeapForElement(KeyGroupPartitionedPriorityQueue.java:179)
>>>>     at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.add(KeyGroupPartitionedPriorityQueue.java:114)
>>>>     at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerEventTimeTimer(InternalTimerServiceImpl.java:233)
>>>>     at org.apache.flink.streaming.api.SimpleTimerService.registerEventTimeTimer(SimpleTimerService.java:52)
>>>>     at ai.promoted.metrics.logprocessor.common.functions.inferred.BaseInferred.processElement1(BaseInferred.java:315)
>>>>     ... 17 more
>>>>
>>>> This message is intended only for the named recipient and may contain
>>>> confidential or privileged information. As the confidentiality of email
>>>> communication cannot be guaranteed, we do not accept any responsibility for
>>>> the confidentiality and the intactness of this message. If you have
>>>> received it in error, please advise the sender by return e-mail and delete
>>>> this message and any attachments. Any unauthorised use or dissemination of
>>>> this information is strictly prohibited.
>>>>
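
For readers puzzling over the "key group from 110 to 119 does not contain 186" message, here is a rough, dependency-free sketch of the arithmetic that, as far as I understand it, Flink uses to split key groups into contiguous ranges per parallel subtask. The class and method names are made up for illustration, and the values maxParallelism = 240 and parallelism = 24 are assumptions picked only because they reproduce the 110..119 range; the thread never states the job's real settings.

```java
// Illustration only: plain-Java arithmetic, not a Flink class.
final class KeyGroupSketch {

    // Inclusive [start, end] range of key groups owned by one parallel subtask.
    public static int[] keyGroupRange(int maxParallelism, int parallelism, int subtaskIndex) {
        int start = (subtaskIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((subtaskIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    // Index of the subtask that owns the given key group.
    public static int subtaskForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 240; // hypothetical, not taken from the thread
        int parallelism = 24;     // hypothetical, not taken from the thread

        int[] range = keyGroupRange(maxParallelism, parallelism, 11);
        System.out.println("subtask 11 owns key groups " + range[0] + ".." + range[1]);

        // A record whose key hashes to key group 186 belongs to a different subtask.
        System.out.println("key group 186 belongs to subtask "
                + subtaskForKeyGroup(maxParallelism, parallelism, 186));

        // After rescaling, the same key group maps to yet another subtask index.
        System.out.println("at parallelism 16 it belongs to subtask "
                + subtaskForKeyGroup(maxParallelism, 16, 186));
    }
}
```

With these assumed numbers, subtask 11 owns key groups 110..119 while key group 186 belongs to subtask 18 (or subtask 12 at parallelism 16). The precondition in the stack trace fails when a record arrives at a subtask that does not own its key group, which is consistent with the diagnosis above that the records cross a network exchange that is not partitioned by key group.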