Hi ,

I am able to get the matching and non-matching elements.

However when I am unit testing the code , I am getting one record less
inside the overriden cogroup function.
Testing the following way :

1) Insert 5 messages into local kafka topic (test1)
2) Insert different 5 messages into local kafka topic (test2)
3) Consume 1) and 2) and I have two different kafka  streams
4) Generate ascending timestamp(using Event Time) for both streams and
create key(String)

Now till 4) I am able to get all the records (checked by printing the
stream in text file)

However when I send the stream to co-group operator, I am receiving one
less record, using the following syntax:

sourceStream.coGroup(destStream)
.where(new ElementSelector())
.equalTo(new ElementSelector())
.window(TumblingEventTimeWindows.of(Time.seconds(30)))
.apply(new JoinStreams);

Also in the Element Selector I have inserted a sysout, I am getting 20
sysouts instead of 10 (10 sysouts for source and 10 for dest stream)

Unable to understand why one record is coming less to co-group



Regards,
Vinay Patil

On Wed, Jun 15, 2016 at 1:39 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Can you add a flag to each element emitted by the CoGroupFunction that
> indicates whether it was joined or not?
> Then you can use split to distinguish between both cases and handle both
> streams differently.
>
> Best, Fabian
>
> 2016-06-15 6:45 GMT+02:00 Vinay Patil <vinay18.pa...@gmail.com>:
>
> > Hi Jark,
> >
> > I am able to get the non-matching elements in a stream :,
> >
> > Of-course we can collect the matching elements in the same stream as
> well,
> > however I want to perform additional operations on the joined stream
> before
> > writing it to S3, so I would have to include a separate join operator for
> > the same two streams, right ?
> > Correct me if I am wrong.
> >
> > I have pasted the dummy code which collects the non-matching records (i
> > have to perform this on the actual data, correct me if I am dong wrong).
> >
> > sourceStream.coGroup(destStream).where(new ElementSelector()).equalTo(new
> > ElementSelector())
> > .window(TumblingEventTimeWindows.of(Time.seconds(30)))
> > .apply(new CoGroupFunction<Integer, Integer, Integer>() {
> >
> > private static final long serialVersionUID = 6408179761497497475L;
> >
> > @Override
> > public void coGroup(Iterable<Integer> paramIterable, Iterable<Integer>
> > paramIterable1,
> > Collector<Integer> paramCollector) throws Exception {
> > long exactSizeIfKnown =
> paramIterable.spliterator().getExactSizeIfKnown();
> > long exactSizeIfKnown2 =
> > paramIterable1.spliterator().getExactSizeIfKnown();
> > if(exactSizeIfKnown == 0 ) {
> > paramCollector.collect(paramIterable1.iterator().next());
> > } else if (exactSizeIfKnown2 == 0) {
> > paramCollector.collect(paramIterable.iterator().next());
> > }
> > }
> > }).print();
> >
> > Regards,
> > Vinay Patil
> >
> >
> > On Tue, Jun 14, 2016 at 1:37 PM, Vinay Patil <vinay18.pa...@gmail.com>
> > wrote:
> >
> > > You are right, debugged it for all elements , I can do that now.
> > > Thanks a lot.
> > >
> > > Regards,
> > > Vinay Patil
> > >
> > > On Tue, Jun 14, 2016 at 11:56 AM, Jark Wu <wuchong...@alibaba-inc.com>
> > > wrote:
> > >
> > >> In `coGroup(Iterable<Integer> iter1, Iterable<Integer> iter2,
> > >> Collector<Integer> out)` ,   when both iter1 and iter2 are not empty,
> it
> > >> means they are matched elements from both stream.
> > >> When one of iter1 and iter2 is empty , it means that they are
> unmatched.
> > >>
> > >>
> > >> - Jark Wu (wuchong)
> > >>
> > >> > 在 2016年6月14日,下午12:46,Vinay Patil <vinay18.pa...@gmail.com> 写道:
> > >> >
> > >> > Hi Matthias ,
> > >> >
> > >> > I did not get you, even if we use Co-Group we have to apply it on a
> > key
> > >> >
> > >> > sourceStream.coGroup(destStream)
> > >> > .where(new ElementSelector())
> > >> > .equalTo(new ElementSelector())
> > >> > .window(TumblingEventTimeWindows.of(Time.seconds(30)))
> > >> > .apply(new CoGroupFunction<Integer, Integer, Integer>() {
> > >> > private static final long serialVersionUID = 6408179761497497475L;
> > >> >
> > >> > @Override
> > >> > public void coGroup(Iterable<Integer> paramIterable,
> Iterable<Integer>
> > >> > paramIterable1,
> > >> > Collector<Integer> paramCollector) throws Exception {
> > >> > Iterator<Integer> iterator = paramIterable.iterator();
> > >> > while(iterator.hasNext()) {
> > >> > }
> > >> > }
> > >> > });
> > >> >
> > >> > when I debug this ,only the matched element from both stream will
> come
> > >> in
> > >> > the coGroup function.
> > >> >
> > >> > What I want is how do I check for unmatched elements from both
> streams
> > >> and
> > >> > write it to sink.
> > >> >
> > >> > Regards,
> > >> > Vinay Patil
> > >> >
> > >> > *+91-800-728-4749*
> > >> >
> > >> > On Tue, Jun 14, 2016 at 2:07 AM, Matthias J. Sax <mj...@apache.org>
> > >> wrote:
> > >> >
> > >> >> You need to do an outer-join. However, there is no build-in support
> > for
> > >> >> outer-joins yet.
> > >> >>
> > >> >> You can use Window-CoGroup to implement the outer-join as an own
> > >> operator.
> > >> >>
> > >> >>
> > >> >> -Matthias
> > >> >>
> > >> >> On 06/13/2016 06:53 PM, Vinay Patil wrote:
> > >> >>> Hi,
> > >> >>>
> > >> >>> I have a question regarding the join operation, consider the
> > following
> > >> >>> dummy example:
> > >> >>>
> > >> >>> StreamExecutionEnvironment env =
> > >> >>> StreamExecutionEnvironment.getExecutionEnvironment();
> > >> >>> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
> > >> >>> DataStreamSource<Integer> sourceStream =
> > >> >>> env.fromElements(10,20,23,25,30,33,102,18);
> > >> >>> DataStreamSource<Integer> destStream =
> > >> >> env.fromElements(20,30,40,50,60,10);
> > >> >>>
> > >> >>> sourceStream.join(destStream)
> > >> >>> .where(new ElementSelector())
> > >> >>> .equalTo(new ElementSelector())
> > >> >>> .window(TumblingEventTimeWindows.of(Time.milliseconds(10)))
> > >> >>> .apply(new JoinFunction<Integer, Integer, Integer>() {
> > >> >>>
> > >> >>> private static final long serialVersionUID = 1L;
> > >> >>>
> > >> >>> @Override
> > >> >>> public Integer join(Integer paramIN1, Integer paramIN2) throws
> > >> Exception
> > >> >> {
> > >> >>> return paramIN1;
> > >> >>> }
> > >> >>> }).print();
> > >> >>>
> > >> >>> I perfectly get the elements that are matching in both the
> streams,
> > >> >> however
> > >> >>> my requirement is to write these matched elements and also the
> > >> unmatched
> > >> >>> elements to sink(S3)
> > >> >>>
> > >> >>> How do I get the unmatched elements from each stream ?
> > >> >>>
> > >> >>> Regards,
> > >> >>> Vinay Patil
> > >> >>>
> > >> >>
> > >> >>
> > >>
> > >>
> > >
> >
>

Reply via email to