I found the following link about this, and it still looks applicable. In my case, I don't need to do a broadcast join.

https://www.alibabacloud.com/blog/flink-how-to-optimize-sql-performance-using-multiple-input-operators_597839

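As a reading aid for the chaining discussion quoted below, the preconditions Thias lists can be pictured as one boolean check over the edge between two operators. This is only a plain-Java sketch: the `Edge` class, its fields, `forwardEdge`, and `isChainable` are hypothetical names made up for illustration, not Flink's StreamingJobGraphGenerator API.

```java
/** Illustrative model of the chaining preconditions from this thread; not Flink code. */
final class ChainingSketch {

    /** Hypothetical description of the edge between an upstream and a downstream operator. */
    static final class Edge {
        boolean upstreamAllowsChaining;   // upstream chaining strategy permits chaining
        boolean downstreamAllowsChaining; // downstream chaining strategy permits chaining
        boolean forwardPartitioner;       // no keyBy/rebalance/broadcast in between
        boolean streamingMode;            // job runs in streaming mode
        int upstreamParallelism;
        int downstreamParallelism;
        boolean chainingEnabled;          // chaining not disabled on the environment
        int downstreamInputCount;         // 1 for a map, 2 for a co-process function
    }

    /** Returns true only if every precondition from the list holds. */
    static boolean isChainable(Edge e) {
        return e.upstreamAllowsChaining
                && e.downstreamAllowsChaining
                && e.forwardPartitioner
                && e.streamingMode
                && e.upstreamParallelism == e.downstreamParallelism
                && e.chainingEnabled
                && e.downstreamInputCount == 1;
    }

    /** Builds an edge that satisfies everything except, possibly, the input count. */
    static Edge forwardEdge(int downstreamInputCount) {
        Edge e = new Edge();
        e.upstreamAllowsChaining = true;
        e.downstreamAllowsChaining = true;
        e.forwardPartitioner = true;
        e.streamingMode = true;
        e.upstreamParallelism = 4;
        e.downstreamParallelism = 4;
        e.chainingEnabled = true;
        e.downstreamInputCount = downstreamInputCount;
        return e;
    }

    public static void main(String[] args) {
        System.out.println(isChainable(forwardEdge(1))); // map -> map: true
        System.out.println(isChainable(forwardEdge(2))); // map -> two-input operator: false
    }
}
```

Under this reading, a KeyedCoProcessFunction downstream (a two-input operator) would fail the last condition on its own, which matches the guess discussed in the quoted messages.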
On Thu, Oct 21, 2021 at 9:51 PM Dan Hill <quietgol...@gmail.com> wrote:

> Interesting. Thanks, JING ZHANG!
>
> On Mon, Oct 18, 2021 at 12:16 AM JING ZHANG <beyond1...@gmail.com> wrote:
>
>> Hi Dan,
>>
>> > I'm guessing I violate the "The second operator needs to be
>> > single-input (i.e. no TwoInputOp nor union() before)" part.
>>
>> I think you are right.
>>
>> Do you want to remove the shuffle of the two inputs in your case? If so,
>> Flink has supported multiple-input operators since version 1.11, which
>> might satisfy your need. You can find more in [1].
>> However, at present this feature does not have a complete interface in
>> the DataStream API. To use it, you need to manually create a
>> MultipleInputTransformation and MultipleConnectedStreams:
>>
>>> // Build the multiple-input transformation by hand.
>>> MultipleInputTransformation<Long> transform = new MultipleInputTransformation<>(
>>>     "My Operator",
>>>     new SumAllInputOperatorFactory(),
>>>     BasicTypeInfo.LONG_TYPE_INFO,
>>>     1); // parallelism
>>>
>>> // Register it with the environment, wiring in the three inputs.
>>> env.addOperator(transform
>>>     .addInput(source1.getTransformation())
>>>     .addInput(source2.getTransformation())
>>>     .addInput(source3.getTransformation()));
>>> new MultipleConnectedStreams(env)
>>>     .transform(transform)
>>>     .addSink(resultSink);
>>
>> I would invite @Piotr to double-check this conclusion. He is more of an
>> expert on this topic.
>>
>> @Piotr, would you please check Dan's question? Please correct me if I'm
>> wrong.
>>
>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-92%3A+Add+N-Ary+Stream+Operator+in+Flink?spm=a2c65.11461447.0.0.786d2323FtzWaR
>>
>> Best,
>> JING ZHANG
>>
>> Dan Hill <quietgol...@gmail.com> wrote on Sat, Oct 16, 2021 at 6:28 AM:
>>
>>> Thanks Thias and JING ZHANG!
>>>
>>> Here's a Google drive folder link
>>> <https://drive.google.com/drive/folders/1chE0i3HyBWgREeRuM2qlGU8sK1jr4rLm?usp=sharing>
>>> with the execution plan and two screenshots from the job graph.
>>>
>>> I'm guessing I violate the "The second operator needs to be
>>> single-input (i.e. no TwoInputOp nor union() before)" part.
>>>
>>> After I do a transformation on a KeyedStream (so it goes back to a
>>> SingleOutputStreamOperator), even if I do a simple map, it usually
>>> disallows operator chaining. Even with reinterpretAsKeyedStream, it
>>> doesn't work.
>>>
>>> On Fri, Oct 15, 2021 at 12:34 AM JING ZHANG <beyond1...@gmail.com> wrote:
>>>
>>>> Hi Dan,
>>>> Sorry for the typos. I meant: please provide code that reproduces the
>>>> problem. If the current program is complex or confidential, maybe you
>>>> could try to simplify the code.
>>>> Besides, Matthias's guess is very reasonable. Could you please check
>>>> whether there is a network shuffle between your two operators? Were
>>>> those two operators chained into one vertex?
>>>>
>>>> Best,
>>>> JING ZHANG
>>>>
>>>> Schwalbe Matthias <matthias.schwa...@viseca.ch> wrote on Fri, Oct 15, 2021 at 2:57 PM:
>>>>
>>>>> … didn’t mean to hit the send button so soon 😊
>>>>>
>>>>> I guess we are getting closer to a solution
>>>>>
>>>>> Thias
>>>>>
>>>>> *From:* Schwalbe Matthias
>>>>> *Sent:* Friday, October 15, 2021 08:49
>>>>> *To:* 'Dan Hill' <quietgol...@gmail.com>; user <user@flink.apache.org>
>>>>> *Subject:* RE: Any issues with reinterpretAsKeyedStream when scaling
>>>>> partitions?
>>>>>
>>>>> Hi Dan again 😊,
>>>>>
>>>>> I took a second look … from what I see in your call stack, I conclude
>>>>> that you do indeed have a network shuffle between your two operators,
>>>>> in which case reinterpretAsKeyedStream wouldn’t work.
>>>>>
>>>>> ($StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:277)
>>>>> indicates that the two operators are not chained.)
>>>>>
>>>>> … just as a double-check, could you please share both your
>>>>>
>>>>> - Execution plan (call println(env.getExecutionPlan) right before
>>>>>   your call to env.execute) (JSON), and
>>>>> - Your job plan (screenshot from the Flink dashboard)
>>>>>
>>>>> There are a number of preconditions before two operators get chained,
>>>>> and probably one of them fails (see [1]):
>>>>>
>>>>> - The two operators each need to allow chaining with the other (see
>>>>>   [2] … chaining strategy)
>>>>> - We need a ForwardPartitioner in between
>>>>> - We need to be in streaming mode
>>>>> - Both operators need the same parallelism
>>>>> - Chaining needs to be enabled for the streaming environment
>>>>> - The second operator needs to be single-input (i.e. no TwoInputOp
>>>>>   nor union() before)
>>>>>
>>>>> [1]
>>>>> https://github.com/apache/flink/blob/2dabdd95c15ccae2a97a0e898d1acfc958a0f7f3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java#L861-L873
>>>>>
>>>>> [2]
>>>>> https://github.com/apache/flink/blob/2dabdd95c15ccae2a97a0e898d1acfc958a0f7f3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java#L903-L932
>>>>>
>>>>> *From:* Dan Hill <quietgol...@gmail.com>
>>>>> *Sent:* Thursday, October 14, 2021 17:50
>>>>> *To:* user <user@flink.apache.org>
>>>>> *Subject:* Any issues with reinterpretAsKeyedStream when scaling
>>>>> partitions?
>>>>>
>>>>> I have a job that uses reinterpretAsKeyedStream across a simple map to
>>>>> avoid a shuffle. When changing the number of partitions, I'm hitting an
>>>>> issue with registerEventTimeTimer complaining that "key group from 110 to
>>>>> 119 does not contain 186". I'm using Flink v1.12.3.
>>>>>
>>>>> Any thoughts on this? I don't know if there is a known issue
>>>>> with reinterpretAsKeyedStream.
>>>>>
>>>>> Rough steps:
>>>>>
>>>>> 1. I have a raw input stream of View records. I keyBy the View using
>>>>>    Tuple2<Long, String>(platform_id, log_user_id).
>>>>>
>>>>> 2. I do a small transformation of View to a TinyView. I
>>>>>    reinterpretAsKeyedStream the TinyView as a KeyedStream with the same key.
>>>>>    The keys are the same.
>>>>>
>>>>> 3. I use the TinyView in a KeyedCoProcessFunction.
>>>>>
>>>>> When I savepoint and start again with a different number of
>>>>> partitions, my KeyedCoProcessFunction hits an issue
>>>>> with registerEventTimeTimer and complains that "key group from 110 to 119
>>>>> does not contain 186". I verified that the key does not change and that
>>>>> we use Tuple2 with primitives Long and String.
>>>>>
>>>>> 2021-10-14 08:17:07
>>>>>
>>>>> java.lang.IllegalArgumentException: view x insertion issue with
>>>>> registerEventTimeTimer for key=(120,3bfd5b19-9d86-4455-a5a1-480f8596a174),
>>>>> flat=platform_id: 120
>>>>> log_user_id: "3bfd5b19-9d86-4455-a5a1-480f8596a174"
>>>>> log_timestamp: 1634224329606
>>>>> view_id: "8fcdf922-7c79-4902-9778-3f20f39b0bc2"
>>>>>
>>>>>     at ai.promoted.metrics.logprocessor.common.functions.inferred.BaseInferred.processElement1(BaseInferred.java:318)
>>>>>     at ai.promoted.metrics.logprocessor.common.functions.inferred.BaseInferred.processElement1(BaseInferred.java:59)
>>>>>     at ai.promoted.metrics.logprocessor.common.functions.LogSlowOnTimer.processElement1(LogSlowOnTimer.java:36)
>>>>>     at org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator.processElement1(KeyedCoProcessOperator.java:78)
>>>>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.processRecord1(StreamTwoInputProcessorFactory.java:199)
>>>>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.lambda$create$0(StreamTwoInputProcessorFactory.java:164)
>>>>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:277)
>>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
>>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
>>>>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>>>>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:95)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396)
>>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
>>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>> Caused by: java.lang.IllegalArgumentException: key group from 110 to
>>>>> 119 does not contain 186
>>>>>     at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:160)
>>>>>     at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.globalKeyGroupToLocalIndex(KeyGroupPartitionedPriorityQueue.java:191)
>>>>>     at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.computeKeyGroupIndex(KeyGroupPartitionedPriorityQueue.java:186)
>>>>>     at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.getKeyGroupSubHeapForElement(KeyGroupPartitionedPriorityQueue.java:179)
>>>>>     at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.add(KeyGroupPartitionedPriorityQueue.java:114)
>>>>>     at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerEventTimeTimer(InternalTimerServiceImpl.java:233)
>>>>>     at org.apache.flink.streaming.api.SimpleTimerService.registerEventTimeTimer(SimpleTimerService.java:52)
>>>>>     at ai.promoted.metrics.logprocessor.common.functions.inferred.BaseInferred.processElement1(BaseInferred.java:315)
>>>>>     ... 17 more
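A note on the "key group from 110 to 119 does not contain 186" message in the trace above: Flink hashes each key into one of maxParallelism key groups and gives every parallel subtask a contiguous range of them. The sketch below reproduces that range arithmetic in plain Java, as I understand it from Flink's KeyGroupRangeAssignment. The class and method names are hypothetical, and maxParallelism = 200 with parallelism = 20 is purely an assumption, picked because it yields the range 110 to 119 for one subtask; the thread does not state the job's real settings.

```java
/** Plain-Java sketch of key-group range arithmetic; not Flink code. */
final class KeyGroupSketch {

    /** First key group owned by the subtask at operatorIndex. */
    static int rangeStart(int maxParallelism, int parallelism, int operatorIndex) {
        return (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
    }

    /** Last key group (inclusive) owned by the subtask at operatorIndex. */
    static int rangeEnd(int maxParallelism, int parallelism, int operatorIndex) {
        return ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
    }

    /** Subtask index that owns the given key group. */
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 200; // assumption, not taken from the thread
        int parallelism = 20;     // assumption, not taken from the thread
        int subtask = 11;         // subtask whose range matches the error message

        int start = rangeStart(maxParallelism, parallelism, subtask);
        int end = rangeEnd(maxParallelism, parallelism, subtask);
        System.out.println("subtask " + subtask + " owns key groups " + start + " to " + end);

        int keyGroup = 186; // key group named in the error message
        int owner = operatorIndexForKeyGroup(maxParallelism, parallelism, keyGroup);
        System.out.println("key group " + keyGroup + " belongs to subtask " + owner);
    }
}
```

Under these assumed settings, a record in key group 186 belongs on subtask 18. If it reaches subtask 11 over an edge that was not partitioned by key, the timer queue there would reject it, which is consistent with the network shuffle Thias points out.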