I think the issue is that t2 is not registered as keyed state, so it is being shared across all of the keys handled by the same parallel instance of the function. Take a look at this article:
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/state.html#using-managed-keyed-state

Basically you need to change t2 to be a ValueState<Tuple2<TrackEvent, RulesEvent>> and register it with a ValueStateDescriptor in the function's open method (which means extending RichCoFlatMapFunction, since the plain CoFlatMapFunction interface has no open method). Then access it using t2.value() and t2.update().

Hopefully that helps.

On Thu, May 4, 2017 at 9:17 AM, Tarek khal <tarek.khal.leta...@gmail.com> wrote:

> Hi,
>
> I have two Kafka topics (tracking and rules) and I would like to join
> "tracking" datastream with "rules" datastream as the data arrives in the
> "tracking" datastream.
>
> The problem with a join is that the rules only “survive” for the length of
> the window while I suspect that I want them to survive longer than that so
> that they can be applied to events arriving in the future.
>
> I tested ConnectedStream and CoFlatMapFunction but the result is not what I
> expect.
>
> *For the execution:*
>
> 1) I added 3 rules on "rules" topic (imei: "01","02","03")
> 2) Perform 15 events with different imei but I guess I have a problem with
> "keyby"
>
> *Result:*
>
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n12999/2222222.jpg>
>
> *Code:*
>
> ConnectedStreams<TrackEvent, RulesEvent> connectedStreams =
>     inputEventStream.connect(inputRulesStream).keyBy("imei","imei");
> DataStream<Tuple2<TrackEvent, RulesEvent>> ds =
>     connectedStreams.flatMap(new CoFlatMapFunction<TrackEvent, RulesEvent,
>             Tuple2<TrackEvent, RulesEvent>>() {
>         Tuple2<TrackEvent, RulesEvent> t2 = new Tuple2<TrackEvent, RulesEvent>();
>
>         @Override
>         public void flatMap1(TrackEvent trackEvent,
>                 Collector<Tuple2<TrackEvent, RulesEvent>> collector) throws Exception {
>             t2.f0 = trackEvent;
>             collector.collect(t2);
>             // t2=new Tuple2<TrackEvent, RulesEvent>();
>         }
>
>         @Override
>         public void flatMap2(RulesEvent rulesEvent,
>                 Collector<Tuple2<TrackEvent, RulesEvent>> collector) throws Exception {
>             t2.f1 = rulesEvent;
>             //collector.collect(t2);
>         }
>     });
> ds.printToErr();
>
> Best,
>
> --
> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ConnectedStream-keyby-issues-tp12999.html
> Sent from the Apache Flink User Mailing List archive at Nabble.com.

--
*Jason Brelloch* | Product Developer
3405 Piedmont Rd. NE, Suite 325, Atlanta, GA 30305
<http://www.bettercloud.com/>
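
P.S. In case it helps to see why a plain field mixes up the keys, here is a small plain-Java sketch of the difference. It deliberately does not use Flink: the class KeyedStateSketch, the Event type, and both method names are made up for illustration, and the HashMap is only a rough stand-in for what keyed state does for you (one slot per imei instead of one slot for everything). Treat it as a model of the behaviour under those assumptions, not as the code to paste into the job.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in, not Flink code: models one parallel function instance
// that receives rules and tracking events for several imei keys.
public class KeyedStateSketch {

    // Simplified input element; isRule tells the two streams apart.
    public static final class Event {
        public final boolean isRule;
        public final String imei;
        public final String payload;

        public Event(boolean isRule, String imei, String payload) {
            this.isRule = isRule;
            this.imei = imei;
            this.payload = payload;
        }
    }

    // Mirrors the original code: a single field holds the rule for every key,
    // so each tracking event is paired with whichever rule arrived last.
    public static List<String> joinWithSharedField(List<Event> events) {
        String lastRule = null;
        List<String> out = new ArrayList<>();
        for (Event e : events) {
            if (e.isRule) {
                lastRule = e.payload;
            } else {
                out.add(e.imei + ":" + e.payload + "->" + lastRule);
            }
        }
        return out;
    }

    // Models keyed state: one slot per imei. The put/get calls play the role
    // that update() and value() play on a keyed ValueState.
    public static List<String> joinWithKeyedState(List<Event> events) {
        Map<String, String> ruleByImei = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (Event e : events) {
            if (e.isRule) {
                ruleByImei.put(e.imei, e.payload);
            } else {
                out.add(e.imei + ":" + e.payload + "->" + ruleByImei.get(e.imei));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event(true, "01", "rule01"),
                new Event(true, "02", "rule02"),
                new Event(true, "03", "rule03"),
                new Event(false, "01", "t1"),
                new Event(false, "02", "t2"));
        System.out.println(joinWithSharedField(events)); // [01:t1->rule03, 02:t2->rule03]
        System.out.println(joinWithKeyedState(events));  // [01:t1->rule01, 02:t2->rule02]
    }
}
```

With the shared field, both tracking events end up paired with the rule for imei 03, simply because it arrived last. With one slot per key, each event sees the rule for its own imei, and that per-key lookup is what registering t2 as keyed state gives you inside the Flink job.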