Hi,
What timestamps are you assigning? Is it guaranteed that all of them
fall into the same 30-second window?
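
To make the window question concrete, here is a minimal sketch in plain Java
(no Flink dependency) of how a timestamp maps to a tumbling window. It assumes
windows aligned to the epoch with no offset, which is my understanding of the
default for TumblingEventTimeWindows. The class and method names are
hypothetical, and the timestamps are made up.

```java
import java.util.Arrays;
import java.util.List;

/** Sketch: checks whether event timestamps land in the same tumbling window. */
final class TumblingWindowCheck {

    /** Start of the epoch-aligned tumbling window that contains the timestamp. */
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    /** True if every timestamp falls into the same window of the given size. */
    static boolean sameWindow(List<Long> timestampsMillis, long sizeMillis) {
        if (timestampsMillis.isEmpty()) {
            return true;
        }
        long first = windowStart(timestampsMillis.get(0), sizeMillis);
        for (long ts : timestampsMillis) {
            if (windowStart(ts, sizeMillis) != first) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        long size = 30_000L; // 30 seconds, as in Time.seconds(30)
        List<Long> timestamps = Arrays.asList(28_000L, 29_000L, 29_999L, 30_000L, 31_000L);
        for (long ts : timestamps) {
            System.out.println(ts + " ms -> window starting at " + windowStart(ts, size) + " ms");
        }
        System.out.println("all in one window: " + sameWindow(timestamps, size));
    }
}
```

If the timestamps straddle a 30-second boundary, the records end up in two
windows, and the later window only fires once the watermark passes its end.
In a short test that may look like a missing record.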

The issue with duplicate printing in the ElementSelector is strange. Could
you post a more complete code example so that I can reproduce the problem?

Cheers,
Aljoscha

On Mon, 27 Jun 2016 at 13:21 Vinay Patil <vinay18.pa...@gmail.com> wrote:

> Hi ,
>
> I am able to get the matching and non-matching elements.
>
> However, when I unit test the code, I get one record fewer
> inside the overridden coGroup function.
> I am testing it the following way:
>
> 1) Insert 5 messages into a local Kafka topic (test1)
> 2) Insert 5 different messages into a local Kafka topic (test2)
> 3) Consume 1) and 2), which gives me two different Kafka streams
> 4) Generate ascending timestamps (using event time) for both streams and
> create a key (String)
>
> Up to step 4) I am able to get all the records (checked by printing the
> stream to a text file).
>
> However, when I send the streams to the co-group operator, I receive one
> record fewer, using the following syntax:
>
> sourceStream.coGroup(destStream)
> .where(new ElementSelector())
> .equalTo(new ElementSelector())
> .window(TumblingEventTimeWindows.of(Time.seconds(30)))
> .apply(new JoinStreams());
>
> Also, I have inserted a sysout in the ElementSelector, and I am getting 20
> sysouts instead of 10 (10 sysouts for the source and 10 for the dest stream).
>
> I am unable to understand why the co-group receives one record fewer.
>
>
>
> Regards,
> Vinay Patil
>
> On Wed, Jun 15, 2016 at 1:39 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
> > Can you add a flag to each element emitted by the CoGroupFunction that
> > indicates whether it was joined or not?
> > Then you can use split to distinguish between both cases and handle both
> > streams differently.
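
A minimal sketch of the flag idea in plain Java, without Flink on the
classpath. The Tagged wrapper and the select helper are hypothetical names;
in the actual job the CoGroupFunction would emit such a wrapper and the
split would route on the flag. The sample values are made up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Sketch: tag each emitted element as joined or not, then separate the two cases. */
final class JoinFlagSketch {

    /** Hypothetical wrapper: the emitted value plus a flag saying whether it was joined. */
    static final class Tagged {
        final int value;
        final boolean joined;

        Tagged(int value, boolean joined) {
            this.value = value;
            this.joined = joined;
        }
    }

    /** Returns the values whose flag equals wantJoined; stands in for picking one side of a split. */
    static List<Integer> select(List<Tagged> emitted, boolean wantJoined) {
        List<Integer> out = new ArrayList<>();
        for (Tagged t : emitted) {
            if (t.joined == wantJoined) {
                out.add(t.value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Tagged> emitted = Arrays.asList(
                new Tagged(10, true),
                new Tagged(23, false),
                new Tagged(20, true),
                new Tagged(40, false));
        System.out.println("joined:   " + select(emitted, true));
        System.out.println("unjoined: " + select(emitted, false));
    }
}
```

With the flag in place, a single coGroup is enough: the joined side can go
through the additional operations before S3, and the unjoined side can be
written out separately.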
> >
> > Best, Fabian
> >
> > 2016-06-15 6:45 GMT+02:00 Vinay Patil <vinay18.pa...@gmail.com>:
> >
> > > Hi Jark,
> > >
> > > I am able to get the non-matching elements in a stream.
> > >
> > > Of course, we can collect the matching elements in the same stream as
> > > well. However, I want to perform additional operations on the joined
> > > stream before writing it to S3, so I would have to include a separate
> > > join operator for the same two streams, right?
> > > Correct me if I am wrong.
> > >
> > > I have pasted the dummy code which collects the non-matching records (I
> > > have to perform this on the actual data; correct me if I am doing it
> > > wrong).
> > >
> > > sourceStream.coGroup(destStream)
> > >     .where(new ElementSelector())
> > >     .equalTo(new ElementSelector())
> > >     .window(TumblingEventTimeWindows.of(Time.seconds(30)))
> > >     .apply(new CoGroupFunction<Integer, Integer, Integer>() {
> > >
> > >         private static final long serialVersionUID = 6408179761497497475L;
> > >
> > >         @Override
> > >         public void coGroup(Iterable<Integer> paramIterable, Iterable<Integer> paramIterable1,
> > >                 Collector<Integer> paramCollector) throws Exception {
> > >             // An empty side means the other side's elements found no match in this window.
> > >             // hasNext() is used because spliterator().getExactSizeIfKnown() may return -1.
> > >             boolean sourceEmpty = !paramIterable.iterator().hasNext();
> > >             boolean destEmpty = !paramIterable1.iterator().hasNext();
> > >             if (sourceEmpty) {
> > >                 // Emit every unmatched dest element, not only the first one.
> > >                 for (Integer element : paramIterable1) {
> > >                     paramCollector.collect(element);
> > >                 }
> > >             } else if (destEmpty) {
> > >                 // Emit every unmatched source element.
> > >                 for (Integer element : paramIterable) {
> > >                     paramCollector.collect(element);
> > >                 }
> > >             }
> > >         }
> > >     }).print();
> > >
> > > Regards,
> > > Vinay Patil
> > >
> > >
> > > On Tue, Jun 14, 2016 at 1:37 PM, Vinay Patil <vinay18.pa...@gmail.com>
> > > wrote:
> > >
> > > > You are right. I debugged it for all elements, and I can do that now.
> > > > Thanks a lot.
> > > >
> > > > Regards,
> > > > Vinay Patil
> > > >
> > > > On Tue, Jun 14, 2016 at 11:56 AM, Jark Wu <wuchong...@alibaba-inc.com> wrote:
> > > >
> > > > >> In `coGroup(Iterable<Integer> iter1, Iterable<Integer> iter2,
> > > > >> Collector<Integer> out)`, when both iter1 and iter2 are non-empty,
> > > > >> they hold the matched elements from both streams.
> > > > >> When one of iter1 and iter2 is empty, the elements in the other one
> > > > >> are unmatched.
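
The rule above can be sketched as a plain Java method that mirrors the body
of one coGroup call for a single key and window. This is only an
illustration under stated assumptions: the class name and the string labels
are hypothetical, results are returned as a list instead of going through a
Collector, and both inputs are assumed to be iterable more than once.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Sketch: classify one coGroup invocation as matched or unmatched. */
final class CoGroupBodySketch {

    /** Mirrors the body of a coGroup call for one key in one window. */
    static List<String> coGroup(Iterable<Integer> iter1, Iterable<Integer> iter2) {
        List<String> out = new ArrayList<>();
        boolean has1 = iter1.iterator().hasNext();
        boolean has2 = iter2.iterator().hasNext();
        if (has1 && has2) {
            // Both sides present: emit every pairing, like an inner join would.
            for (Integer left : iter1) {
                for (Integer right : iter2) {
                    out.add("matched(" + left + "," + right + ")");
                }
            }
        } else if (has1) {
            // Only the first input has elements for this key.
            for (Integer left : iter1) {
                out.add("unmatched-left(" + left + ")");
            }
        } else {
            // Only the second input has elements (or both are empty: nothing is emitted).
            for (Integer right : iter2) {
                out.add("unmatched-right(" + right + ")");
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(coGroup(Arrays.asList(20), Arrays.asList(20)));
        System.out.println(coGroup(Arrays.asList(23), Collections.<Integer>emptyList()));
        System.out.println(coGroup(Collections.<Integer>emptyList(), Arrays.asList(40)));
    }
}
```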
> > > >>
> > > >>
> > > >> - Jark Wu (wuchong)
> > > >>
> > > > >> > On Jun 14, 2016, at 12:46 PM, Vinay Patil <vinay18.pa...@gmail.com> wrote:
> > > >> >
> > > >> > Hi Matthias ,
> > > >> >
> > > > >> > I did not get you. Even if we use Co-Group, we have to apply it
> > > > >> > on a key:
> > > >> >
> > > >> > sourceStream.coGroup(destStream)
> > > >> > .where(new ElementSelector())
> > > >> > .equalTo(new ElementSelector())
> > > >> > .window(TumblingEventTimeWindows.of(Time.seconds(30)))
> > > >> > .apply(new CoGroupFunction<Integer, Integer, Integer>() {
> > > >> > private static final long serialVersionUID = 6408179761497497475L;
> > > >> >
> > > >> > @Override
> > > > >> > public void coGroup(Iterable<Integer> paramIterable, Iterable<Integer> paramIterable1,
> > > > >> > Collector<Integer> paramCollector) throws Exception {
> > > > >> > Iterator<Integer> iterator = paramIterable.iterator();
> > > > >> > while(iterator.hasNext()) {
> > > > >> > iterator.next(); // advance, otherwise the loop never terminates
> > > > >> > }
> > > >> > }
> > > >> > });
> > > >> >
> > > > >> > When I debug this, only the matched elements from both streams
> > > > >> > reach the coGroup function.
> > > > >> >
> > > > >> > What I want to know is how to check for unmatched elements from
> > > > >> > both streams and write them to a sink.
> > > >> >
> > > >> > Regards,
> > > >> > Vinay Patil
> > > >> >
> > > >> > *+91-800-728-4749*
> > > >> >
> > > > >> > On Tue, Jun 14, 2016 at 2:07 AM, Matthias J. Sax <mj...@apache.org> wrote:
> > > >> >
> > > > >> >> You need to do an outer join. However, there is no built-in
> > > > >> >> support for outer joins yet.
> > > > >> >>
> > > > >> >> You can use Window-CoGroup to implement the outer join as your
> > > > >> >> own operator.
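
As a rough illustration of what such an operator computes, here is a plain
Java sketch of a full outer join over two batches that are assumed to fall
into the same window. It reuses the sample values from the original question
and assumes the key is the element itself (the real ElementSelector is not
shown in this thread). The class name and the "left|right" output format are
hypothetical, and none of this is Flink API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Sketch: full outer join of two batches that share one window. */
final class WindowOuterJoinSketch {

    /** Groups elements by key; the key is assumed to be the element itself. */
    static Map<Integer, List<Integer>> groupByKey(List<Integer> elements) {
        Map<Integer, List<Integer>> groups = new HashMap<>();
        for (Integer e : elements) {
            groups.computeIfAbsent(e, k -> new ArrayList<>()).add(e);
        }
        return groups;
    }

    /** Emits "left|right" per match and uses "null" for the missing side. */
    static List<String> outerJoin(List<Integer> source, List<Integer> dest) {
        Map<Integer, List<Integer>> left = groupByKey(source);
        Map<Integer, List<Integer>> right = groupByKey(dest);
        Set<Integer> keys = new TreeSet<>(left.keySet()); // sorted union of keys
        keys.addAll(right.keySet());

        List<String> out = new ArrayList<>();
        for (Integer key : keys) {
            List<Integer> l = left.getOrDefault(key, Collections.<Integer>emptyList());
            List<Integer> r = right.getOrDefault(key, Collections.<Integer>emptyList());
            if (!l.isEmpty() && !r.isEmpty()) {
                for (Integer a : l) {
                    for (Integer b : r) {
                        out.add(a + "|" + b);
                    }
                }
            } else if (r.isEmpty()) {
                for (Integer a : l) {
                    out.add(a + "|null");
                }
            } else {
                for (Integer b : r) {
                    out.add("null|" + b);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(10, 20, 23, 25, 30, 33, 102, 18);
        List<Integer> dest = Arrays.asList(20, 30, 40, 50, 60, 10);
        for (String row : outerJoin(source, dest)) {
            System.out.println(row);
        }
    }
}
```

In the streaming job the grouping per key and window is done by the coGroup
operator itself, so only the per-key branch logic would live in the
CoGroupFunction.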
> > > >> >>
> > > >> >>
> > > >> >> -Matthias
> > > >> >>
> > > >> >> On 06/13/2016 06:53 PM, Vinay Patil wrote:
> > > >> >>> Hi,
> > > >> >>>
> > > > >> >>> I have a question regarding the join operation. Consider the
> > > > >> >>> following dummy example:
> > > >> >>>
> > > > >> >>> StreamExecutionEnvironment env =
> > > > >> >>>     StreamExecutionEnvironment.getExecutionEnvironment();
> > > > >> >>>
> > > > >> >>> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
> > > > >> >>> DataStreamSource<Integer> sourceStream =
> > > > >> >>>     env.fromElements(10, 20, 23, 25, 30, 33, 102, 18);
> > > > >> >>> DataStreamSource<Integer> destStream =
> > > > >> >>>     env.fromElements(20, 30, 40, 50, 60, 10);
> > > > >> >>>
> > > > >> >>> sourceStream.join(destStream)
> > > > >> >>>     .where(new ElementSelector())
> > > > >> >>>     .equalTo(new ElementSelector())
> > > > >> >>>     .window(TumblingEventTimeWindows.of(Time.milliseconds(10)))
> > > > >> >>>     .apply(new JoinFunction<Integer, Integer, Integer>() {
> > > > >> >>>
> > > > >> >>>         private static final long serialVersionUID = 1L;
> > > > >> >>>
> > > > >> >>>         @Override
> > > > >> >>>         public Integer join(Integer paramIN1, Integer paramIN2) throws Exception {
> > > > >> >>>             return paramIN1;
> > > > >> >>>         }
> > > > >> >>>     }).print();
> > > >> >>>
> > > > >> >>> I get the elements that match in both streams without any
> > > > >> >>> problem. However, my requirement is to write these matched
> > > > >> >>> elements and also the unmatched elements to a sink (S3).
> > > > >> >>>
> > > > >> >>> How do I get the unmatched elements from each stream?
> > > >> >>>
> > > >> >>> Regards,
> > > >> >>> Vinay Patil
> > > >> >>>
> > > >> >>
> > > >> >>
> > > >>
> > > >>
> > > >
> > >
> >
>
