Hi,

These are the timestamps I am getting from the DTO for two consecutive
records:
1466115892162154279
1466116026233613409

So the time difference is roughly 3 min. Even when I apply a window of 5 min,
I am not getting the last record (the last timestamp value above). I am
using an ascending timestamp extractor to generate the timestamps (assuming
that the timestamps are always in order).
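
A quick way to check which tumbling window each of the two records lands in
is to redo the window arithmetic by hand. The sketch below is only an
illustration under stated assumptions: it assumes the two values above are
nanoseconds since the epoch (they have 19 digits) and converts them to the
epoch milliseconds that Flink uses for event timestamps. The class and method
names are made up for this example, and the window start is computed as the
timestamp rounded down to a multiple of the window size, which is how
epoch-aligned tumbling windows without an offset behave.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of Flink: recomputes tumbling-window starts by hand.
public class WindowAlignmentCheck {

    // Assumption: the DTO values are nanoseconds since the epoch.
    static long nanosToMillis(long epochNanos) {
        return TimeUnit.NANOSECONDS.toMillis(epochNanos);
    }

    // Start of the epoch-aligned tumbling window (no offset) that contains timestampMillis.
    static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - (timestampMillis % windowSizeMillis);
    }

    public static void main(String[] args) {
        long first = nanosToMillis(1466115892162154279L);
        long second = nanosToMillis(1466116026233613409L);
        long fiveMinutes = TimeUnit.MINUTES.toMillis(5);

        System.out.println("difference in ms: " + (second - first));
        System.out.println("window start of first record:  " + windowStart(first, fiveMinutes));
        System.out.println("window start of second record: " + windowStart(second, fiveMinutes));
    }
}
```

Under these assumptions the two records are about 134 seconds apart, yet they
fall into different 5-minute windows, because tumbling windows are aligned to
the epoch rather than to the first record. If the nanosecond values were
instead handed to the extractor unchanged, they would be read as milliseconds
and the gap would look far larger than any of the window sizes tried here.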

I was at least expecting the data to reach the co-group function.
What could be the reason for the data loss? The data we are getting is
critical, hence we cannot afford to lose any data.
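
One thing worth ruling out is that the last window simply never fires. The
sketch below is a plain-Java simulation, not Flink code: it assumes the
watermark trails the highest timestamp seen so far by 1 ms (as an ascending
timestamp extractor does) and that an event-time window fires once the
watermark reaches the window's last millisecond. The class name, method name,
and sample timestamps are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical simulation, not Flink code: shows which tumbling windows have fired
// after a finite, ascending sequence of event timestamps (in milliseconds).
public class LastWindowSimulation {

    // Returns the start timestamps of all windows that fired.
    static List<Long> firedWindows(long[] ascendingTimestamps, long windowSizeMillis) {
        TreeMap<Long, Integer> open = new TreeMap<>(); // window start -> element count
        List<Long> fired = new ArrayList<>();
        for (long ts : ascendingTimestamps) {
            long start = ts - (ts % windowSizeMillis);
            open.merge(start, 1, Integer::sum);
            long watermark = ts - 1; // assumption: watermark trails the max timestamp by 1 ms
            // A window fires once the watermark reaches its last millisecond (end - 1).
            while (!open.isEmpty() && open.firstKey() + windowSizeMillis - 1 <= watermark) {
                fired.add(open.pollFirstEntry().getKey());
            }
        }
        return fired; // whatever is still in 'open' never fired
    }

    public static void main(String[] args) {
        long[] timestamps = {1_000L, 5_000L, 12_000L, 29_000L, 31_000L};
        System.out.println(firedWindows(timestamps, 30_000L)); // prints [0]
    }
}
```

In this run the window starting at 0 fires with four elements, while the
window holding the fifth element stays open because no later timestamp
arrives to move the watermark past its end. Whether this is what happens in
the actual job depends on how the source and the watermarks behave there.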


Regards,
Vinay Patil

On Mon, Jun 27, 2016 at 11:31 PM, Vinay Patil <vinay18.pa...@gmail.com>
wrote:

> Just an update: when I keep IngestionTime and remove the timestamps I am
> generating, I get all the records, but with Event Time I get one record
> fewer. I checked the time difference between the two records; it is 3 min.
> I tried setting the window size to 5 min, but even that did not work.
>
> Even when I try assigning timestamps for IngestionTime, I get one record
> fewer. So can I safely use Ingestion Time, or is it always advisable to
> use EventTime?
>
> Regards,
> Vinay Patil
>
> On Mon, Jun 27, 2016 at 8:16 PM, Vinay Patil <vinay18.pa...@gmail.com>
> wrote:
>
>> Hi,
>>
>> Actually I am only publishing 5 messages each to two different Kafka
>> topics (using JUnit). Even if I set the window to 500 seconds, the result
>> is the same.
>>
>> I do not understand why it is not sending the 5th element to the co-group
>> operator even when the keys are the same.
>>
>> I cannot share the actual client code, but this is what the streams
>> look like:
>> sourceStream.coGroup(destStream)
>> Here sourceStream and destStream are streams of Tuple2<String, DTO>, and
>> the ElementSelector returns tuple.f0, which is the key.
>>
>> I am generating the timestamp based on a field from the DTO which is
>> guaranteed to be in order.
>>
>> Will using triggers help here?
>>
>>
>> Regards,
>> Vinay Patil
>>
>> *+91-800-728-4749*
>>
>> On Mon, Jun 27, 2016 at 7:05 PM, Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>>
>>> Hi,
>>> what timestamps are you assigning? Is it guaranteed that all of them
>>> would fall into the same 30 second window?
>>>
>>> The issue with duplicate printing in the ElementSelector is strange.
>>> Could you post a more complete code example so that I can reproduce the
>>> problem?
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>> On Mon, 27 Jun 2016 at 13:21 Vinay Patil <vinay18.pa...@gmail.com>
>>> wrote:
>>>
>>> > Hi ,
>>> >
>>> > I am able to get the matching and non-matching elements.
>>> >
>>> > However, when I unit test the code, I get one record fewer inside the
>>> > overridden coGroup function.
>>> > I am testing it the following way:
>>> >
>>> > 1) Insert 5 messages into a local Kafka topic (test1)
>>> > 2) Insert 5 different messages into a local Kafka topic (test2)
>>> > 3) Consume 1) and 2), which gives two different Kafka streams
>>> > 4) Generate ascending timestamps (using Event Time) for both streams and
>>> > create the key (String)
>>> >
>>> > Up to 4) I am able to get all the records (checked by printing the
>>> > stream to a text file).
>>> >
>>> > However, when I send the streams to the co-group operator, I receive one
>>> > record fewer, using the following syntax:
>>> >
>>> > sourceStream.coGroup(destStream)
>>> > .where(new ElementSelector())
>>> > .equalTo(new ElementSelector())
>>> > .window(TumblingEventTimeWindows.of(Time.seconds(30)))
>>> > .apply(new JoinStreams());
>>> >
>>> > Also, I have inserted a sysout in the ElementSelector, and I am getting 20
>>> > sysouts instead of 10 (10 sysouts for the source and 10 for the dest
>>> > stream).
>>> >
>>> > I am unable to understand why one record fewer reaches the co-group.
>>> >
>>> >
>>> >
>>> > Regards,
>>> > Vinay Patil
>>> >
>>> > On Wed, Jun 15, 2016 at 1:39 PM, Fabian Hueske <fhue...@gmail.com>
>>> > wrote:
>>> >
>>> > > Can you add a flag to each element emitted by the CoGroupFunction
>>> > > that indicates whether it was joined or not?
>>> > > Then you can use split to distinguish between both cases and handle
>>> > > both streams differently.
>>> > >
>>> > > Best, Fabian
>>> > >
>>> > > 2016-06-15 6:45 GMT+02:00 Vinay Patil <vinay18.pa...@gmail.com>:
>>> > >
>>> > > > Hi Jark,
>>> > > >
>>> > > > I am able to get the non-matching elements in a stream.
>>> > > >
>>> > > > Of course we can collect the matching elements in the same stream
>>> > > > as well. However, I want to perform additional operations on the
>>> > > > joined stream before writing it to S3, so I would have to include a
>>> > > > separate join operator for the same two streams, right?
>>> > > > Correct me if I am wrong.
>>> > > >
>>> > > > I have pasted the dummy code which collects the non-matching
>>> > > > records (I have to perform this on the actual data; correct me if I
>>> > > > am doing it wrong).
>>> > > >
>>> > > > sourceStream.coGroup(destStream)
>>> > > >     .where(new ElementSelector())
>>> > > >     .equalTo(new ElementSelector())
>>> > > >     .window(TumblingEventTimeWindows.of(Time.seconds(30)))
>>> > > >     .apply(new CoGroupFunction<Integer, Integer, Integer>() {
>>> > > >
>>> > > >         private static final long serialVersionUID = 6408179761497497475L;
>>> > > >
>>> > > >         @Override
>>> > > >         public void coGroup(Iterable<Integer> paramIterable,
>>> > > >                 Iterable<Integer> paramIterable1,
>>> > > >                 Collector<Integer> paramCollector) throws Exception {
>>> > > >             // hasNext() is used instead of spliterator().getExactSizeIfKnown(),
>>> > > >             // which returns -1 when the size of the Iterable is not known.
>>> > > >             boolean leftEmpty = !paramIterable.iterator().hasNext();
>>> > > >             boolean rightEmpty = !paramIterable1.iterator().hasNext();
>>> > > >             if (leftEmpty) {
>>> > > >                 // Only the second stream has elements for this key: unmatched.
>>> > > >                 paramCollector.collect(paramIterable1.iterator().next());
>>> > > >             } else if (rightEmpty) {
>>> > > >                 // Only the first stream has elements for this key: unmatched.
>>> > > >                 paramCollector.collect(paramIterable.iterator().next());
>>> > > >             }
>>> > > >         }
>>> > > >     }).print();
>>> > > >
>>> > > > Regards,
>>> > > > Vinay Patil
>>> > > >
>>> > > >
>>> > > > On Tue, Jun 14, 2016 at 1:37 PM, Vinay Patil <vinay18.pa...@gmail.com>
>>> > > > wrote:
>>> > > >
>>> > > > > You are right. I debugged it for all elements, and I can do that now.
>>> > > > > Thanks a lot.
>>> > > > >
>>> > > > > Regards,
>>> > > > > Vinay Patil
>>> > > > >
>>> > > > > On Tue, Jun 14, 2016 at 11:56 AM, Jark Wu <wuchong...@alibaba-inc.com>
>>> > > > > wrote:
>>> > > > >
>>> > > > >> In `coGroup(Iterable<Integer> iter1, Iterable<Integer> iter2,
>>> > > > >> Collector<Integer> out)`, when both iter1 and iter2 are non-empty,
>>> > > > >> they hold the matched elements from both streams.
>>> > > > >> When one of iter1 and iter2 is empty, the elements in the other
>>> > > > >> are unmatched.
>>> > > > >>
>>> > > > >>
>>> > > > >> - Jark Wu (wuchong)
>>> > > > >>
>>> > > > >> > On June 14, 2016, at 12:46 PM, Vinay Patil <vinay18.pa...@gmail.com>
>>> > > > >> > wrote:
>>> > > > >> >
>>> > > > >> > Hi Matthias ,
>>> > > > >> >
>>> > > > >> > I did not get you. Even if we use Co-Group, we have to apply
>>> > > > >> > it on a key:
>>> > > > >> >
>>> > > > >> > sourceStream.coGroup(destStream)
>>> > > > >> >     .where(new ElementSelector())
>>> > > > >> >     .equalTo(new ElementSelector())
>>> > > > >> >     .window(TumblingEventTimeWindows.of(Time.seconds(30)))
>>> > > > >> >     .apply(new CoGroupFunction<Integer, Integer, Integer>() {
>>> > > > >> >         private static final long serialVersionUID = 6408179761497497475L;
>>> > > > >> >
>>> > > > >> >         @Override
>>> > > > >> >         public void coGroup(Iterable<Integer> paramIterable,
>>> > > > >> >                 Iterable<Integer> paramIterable1,
>>> > > > >> >                 Collector<Integer> paramCollector) throws Exception {
>>> > > > >> >             Iterator<Integer> iterator = paramIterable.iterator();
>>> > > > >> >             while (iterator.hasNext()) {
>>> > > > >> >                 // Advance the iterator; without next() the loop never ends.
>>> > > > >> >                 iterator.next();
>>> > > > >> >             }
>>> > > > >> >         }
>>> > > > >> >     });
>>> > > > >> >
>>> > > > >> > When I debug this, only the matched elements from both streams
>>> > > > >> > come into the coGroup function.
>>> > > > >> >
>>> > > > >> > What I want to know is how to check for unmatched elements from
>>> > > > >> > both streams and write them to the sink.
>>> > > > >> >
>>> > > > >> > Regards,
>>> > > > >> > Vinay Patil
>>> > > > >> >
>>> > > > >> > *+91-800-728-4749*
>>> > > > >> >
>>> > > > >> > On Tue, Jun 14, 2016 at 2:07 AM, Matthias J. Sax <mj...@apache.org>
>>> > > > >> > wrote:
>>> > > > >> >
>>> > > > >> >> You need to do an outer join. However, there is no built-in
>>> > > > >> >> support for outer joins yet.
>>> > > > >> >>
>>> > > > >> >> You can use Window-CoGroup to implement the outer join as your
>>> > > > >> >> own operator.
>>> > > > >> >>
>>> > > > >> >>
>>> > > > >> >> -Matthias
>>> > > > >> >>
>>> > > > >> >> On 06/13/2016 06:53 PM, Vinay Patil wrote:
>>> > > > >> >>> Hi,
>>> > > > >> >>>
>>> > > > >> >>> I have a question regarding the join operation. Consider the
>>> > > > >> >>> following dummy example:
>>> > > > >> >>>
>>> > > > >> >>> StreamExecutionEnvironment env =
>>> > > > >> >>>     StreamExecutionEnvironment.getExecutionEnvironment();
>>> > > > >> >>> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
>>> > > > >> >>> DataStreamSource<Integer> sourceStream =
>>> > > > >> >>>     env.fromElements(10,20,23,25,30,33,102,18);
>>> > > > >> >>> DataStreamSource<Integer> destStream =
>>> > > > >> >>>     env.fromElements(20,30,40,50,60,10);
>>> > > > >> >>>
>>> > > > >> >>> sourceStream.join(destStream)
>>> > > > >> >>>     .where(new ElementSelector())
>>> > > > >> >>>     .equalTo(new ElementSelector())
>>> > > > >> >>>     .window(TumblingEventTimeWindows.of(Time.milliseconds(10)))
>>> > > > >> >>>     .apply(new JoinFunction<Integer, Integer, Integer>() {
>>> > > > >> >>>
>>> > > > >> >>>         private static final long serialVersionUID = 1L;
>>> > > > >> >>>
>>> > > > >> >>>         @Override
>>> > > > >> >>>         public Integer join(Integer paramIN1, Integer paramIN2)
>>> > > > >> >>>                 throws Exception {
>>> > > > >> >>>             return paramIN1;
>>> > > > >> >>>         }
>>> > > > >> >>>     }).print();
>>> > > > >> >>>
>>> > > > >> >>> I get the elements that match in both streams perfectly.
>>> > > > >> >>> However, my requirement is to write these matched elements and
>>> > > > >> >>> also the unmatched elements to the sink (S3).
>>> > > > >> >>>
>>> > > > >> >>> How do I get the unmatched elements from each stream?
>>> > > > >> >>>
>>> > > > >> >>> Regards,
>>> > > > >> >>> Vinay Patil
>>> > > > >> >>>
