Hi,

I am able to get the matching and non-matching elements.

However, when I unit test the code, I get one record fewer inside the overridden coGroup function. I am testing as follows:

1) Insert 5 messages into a local Kafka topic (test1)
2) Insert 5 different messages into a local Kafka topic (test2)
3) Consume 1) and 2), which gives me two different Kafka streams
4) Generate ascending timestamps (using event time) for both streams and create the key (String)

Up to step 4) I get all the records (checked by printing the stream to a text file).

However, when I send the streams to the co-group operator, I receive one record fewer, using the following syntax:

sourceStream.coGroup(destStream)
    .where(new ElementSelector())
    .equalTo(new ElementSelector())
    .window(TumblingEventTimeWindows.of(Time.seconds(30)))
    .apply(new JoinStreams());

Also, I have inserted a sysout in the ElementSelector, and I am getting 20 sysouts instead of 10 (10 sysouts for the source stream and 10 for the dest stream).

I am unable to understand why one record fewer reaches the co-group.

Regards,
Vinay Patil

On Wed, Jun 15, 2016 at 1:39 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Can you add a flag to each element emitted by the CoGroupFunction that
> indicates whether it was joined or not?
> Then you can use split to distinguish between both cases and handle both
> streams differently.
>
> Best, Fabian
>
> 2016-06-15 6:45 GMT+02:00 Vinay Patil <vinay18.pa...@gmail.com>:
>
> > Hi Jark,
> >
> > I am able to get the non-matching elements in a stream.
> >
> > Of course we can collect the matching elements in the same stream as well;
> > however, I want to perform additional operations on the joined stream before
> > writing it to S3, so I would have to include a separate join operator for
> > the same two streams, right?
> > Correct me if I am wrong.
> >
> > I have pasted the dummy code which collects the non-matching records (I
> > have to perform this on the actual data; correct me if I am doing it wrong).
> >
> > sourceStream.coGroup(destStream).where(new ElementSelector()).equalTo(new ElementSelector())
> >     .window(TumblingEventTimeWindows.of(Time.seconds(30)))
> >     .apply(new CoGroupFunction<Integer, Integer, Integer>() {
> >
> >         private static final long serialVersionUID = 6408179761497497475L;
> >
> >         @Override
> >         public void coGroup(Iterable<Integer> paramIterable, Iterable<Integer> paramIterable1,
> >                 Collector<Integer> paramCollector) throws Exception {
> >             long exactSizeIfKnown = paramIterable.spliterator().getExactSizeIfKnown();
> >             long exactSizeIfKnown2 = paramIterable1.spliterator().getExactSizeIfKnown();
> >             if (exactSizeIfKnown == 0) {
> >                 paramCollector.collect(paramIterable1.iterator().next());
> >             } else if (exactSizeIfKnown2 == 0) {
> >                 paramCollector.collect(paramIterable.iterator().next());
> >             }
> >         }
> >     }).print();
> >
> > Regards,
> > Vinay Patil
> >
> >
> > On Tue, Jun 14, 2016 at 1:37 PM, Vinay Patil <vinay18.pa...@gmail.com>
> > wrote:
> >
> > > You are right, I debugged it for all elements, and I can do that now.
> > > Thanks a lot.
> > >
> > > Regards,
> > > Vinay Patil
> > >
> > > On Tue, Jun 14, 2016 at 11:56 AM, Jark Wu <wuchong...@alibaba-inc.com>
> > > wrote:
> > >
> > >> In `coGroup(Iterable<Integer> iter1, Iterable<Integer> iter2,
> > >> Collector<Integer> out)`, when both iter1 and iter2 are non-empty, it
> > >> means they hold matched elements from both streams.
> > >> When one of iter1 and iter2 is empty, it means the elements are unmatched.
> > >>
> > >>
> > >> - Jark Wu (wuchong)
> > >>
> > >> > On June 14, 2016, at 12:46 PM, Vinay Patil <vinay18.pa...@gmail.com> wrote:
> > >> >
> > >> > Hi Matthias,
> > >> >
> > >> > I did not get you; even if we use co-group, we have to apply it on a key:
> > >> >
> > >> > sourceStream.coGroup(destStream)
> > >> >     .where(new ElementSelector())
> > >> >     .equalTo(new ElementSelector())
> > >> >     .window(TumblingEventTimeWindows.of(Time.seconds(30)))
> > >> >     .apply(new CoGroupFunction<Integer, Integer, Integer>() {
> > >> >         private static final long serialVersionUID = 6408179761497497475L;
> > >> >
> > >> >         @Override
> > >> >         public void coGroup(Iterable<Integer> paramIterable, Iterable<Integer> paramIterable1,
> > >> >                 Collector<Integer> paramCollector) throws Exception {
> > >> >             Iterator<Integer> iterator = paramIterable.iterator();
> > >> >             while (iterator.hasNext()) {
> > >> >                 iterator.next(); // advance; elements are only inspected in the debugger
> > >> >             }
> > >> >         }
> > >> >     });
> > >> >
> > >> > When I debug this, only the matched elements from both streams come into
> > >> > the coGroup function.
> > >> >
> > >> > What I want to know is how to check for unmatched elements from both
> > >> > streams and write them to the sink.
> > >> >
> > >> > Regards,
> > >> > Vinay Patil
> > >> >
> > >> > On Tue, Jun 14, 2016 at 2:07 AM, Matthias J. Sax <mj...@apache.org> wrote:
> > >> >
> > >> >> You need to do an outer join. However, there is no built-in support for
> > >> >> outer joins yet.
> > >> >>
> > >> >> You can use Window-CoGroup to implement the outer join as your own operator.
> > >> >>
> > >> >>
> > >> >> -Matthias
> > >> >>
> > >> >> On 06/13/2016 06:53 PM, Vinay Patil wrote:
> > >> >>> Hi,
> > >> >>>
> > >> >>> I have a question regarding the join operation; consider the following
> > >> >>> dummy example:
> > >> >>>
> > >> >>> StreamExecutionEnvironment env =
> > >> >>>     StreamExecutionEnvironment.getExecutionEnvironment();
> > >> >>> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
> > >> >>> DataStreamSource<Integer> sourceStream =
> > >> >>>     env.fromElements(10,20,23,25,30,33,102,18);
> > >> >>> DataStreamSource<Integer> destStream = env.fromElements(20,30,40,50,60,10);
> > >> >>>
> > >> >>> sourceStream.join(destStream)
> > >> >>>     .where(new ElementSelector())
> > >> >>>     .equalTo(new ElementSelector())
> > >> >>>     .window(TumblingEventTimeWindows.of(Time.milliseconds(10)))
> > >> >>>     .apply(new JoinFunction<Integer, Integer, Integer>() {
> > >> >>>
> > >> >>>         private static final long serialVersionUID = 1L;
> > >> >>>
> > >> >>>         @Override
> > >> >>>         public Integer join(Integer paramIN1, Integer paramIN2) throws Exception {
> > >> >>>             return paramIN1;
> > >> >>>         }
> > >> >>>     }).print();
> > >> >>>
> > >> >>> I get exactly the elements that match in both streams; however,
> > >> >>> my requirement is to write these matched elements and also the unmatched
> > >> >>> elements to the sink (S3).
> > >> >>>
> > >> >>> How do I get the unmatched elements from each stream?
> > >> >>>
> > >> >>> Regards,
> > >> >>> Vinay Patil
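
For reference, the idea discussed in this thread (co-group used as an outer join, with the matched/unmatched flag Fabian suggests) can be sketched in plain Java without any Flink dependency. This is only an illustration under assumptions: `OuterJoinSketch`, `Tagged`, and `toList` are made-up names, and a `List` stands in for Flink's `Collector`. Unlike the code quoted above, the sketch copies each side into a list before testing for emptiness, because `Spliterator.getExactSizeIfKnown()` returns -1 for an `Iterable` whose spliterator is not sized, and it emits every element of a group rather than only the first one returned by `iterator().next()`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Plain-Java stand-in for the body of a co-group function; not Flink code. */
class OuterJoinSketch {

    /** Hypothetical output type: an element plus a flag telling whether its key matched. */
    static final class Tagged {
        final int value;
        final boolean matched;

        Tagged(int value, boolean matched) {
            this.value = value;
            this.matched = matched;
        }

        @Override
        public String toString() {
            return value + (matched ? ":matched" : ":unmatched");
        }
    }

    /** Copies an Iterable into a list so that size checks never depend on the spliterator. */
    static List<Integer> toList(Iterable<Integer> items) {
        List<Integer> copy = new ArrayList<>();
        for (Integer item : items) {
            copy.add(item);
        }
        return copy;
    }

    /**
     * Handles the two groups that share one key in one window.
     * The 'out' list plays the role of the Collector in this sketch.
     */
    static void coGroup(Iterable<Integer> left, Iterable<Integer> right, List<Tagged> out) {
        List<Integer> leftItems = toList(left);
        List<Integer> rightItems = toList(right);

        // Both sides non-empty means the key was seen in both streams.
        boolean matched = !leftItems.isEmpty() && !rightItems.isEmpty();

        // Emit every element from both sides, so duplicates of a key are kept.
        for (Integer value : leftItems) {
            out.add(new Tagged(value, matched));
        }
        for (Integer value : rightItems) {
            out.add(new Tagged(value, matched));
        }
    }

    public static void main(String[] args) {
        List<Tagged> out = new ArrayList<>();
        coGroup(Arrays.asList(10), Arrays.asList(10), out);                    // key in both streams
        coGroup(Arrays.asList(23, 23), Collections.<Integer>emptyList(), out); // key only in the source
        coGroup(Collections.<Integer>emptyList(), Arrays.asList(40), out);     // key only in the destination
        System.out.println(out);
    }
}
```

A downstream step could then route records on the `matched` flag, which is the role the split suggested earlier in the thread would play; how that is wired up in an actual Flink job is not shown here.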