Hi Aljoscha,

Thank you for your response. So do you suggest using a different approach for extracting timestamps (as given in the documentation) instead of the AscendingTimestampExtractor? Is that the reason I am seeing this unexpected behaviour? In the case of a continuous stream, would I not see any data loss?
Also, assuming that the records are always going to be in order, which is the better approach: Ingestion Time or Event Time?

Regards,
Vinay Patil

On Tue, Jun 28, 2016 at 2:41 PM, Aljoscha Krettek <aljos...@apache.org> wrote:

> Hi,
> first regarding tumbling windows: even if you have 5 minute windows it can happen that elements that are only seconds apart go into different windows. Consider the following case:
>
> |   x  |  x   |
>
> These are two 5-minute windows and the two elements are only seconds apart but go into different windows because windows are aligned to the epoch.
>
> Now, for the ascending timestamp extractor. The reason this can behave in unexpected ways is that it emits a watermark that is "last timestamp - 1", i.e. if it has seen timestamp t it can only emit watermark t-1 because there might be other elements with timestamp t arriving. If you have a continuous stream of elements you wouldn't notice this. Only in this constructed example does it become visible.
>
> Cheers,
> Aljoscha
>
> On Tue, 28 Jun 2016 at 06:04 Vinay Patil <vinay18.pa...@gmail.com> wrote:
>
> > Hi,
> >
> > Following is the timestamp I am getting from the DTO; here is the timestamp difference between the two records:
> > 1466115892162154279
> > 1466116026233613409
> >
> > So the time difference is roughly 3 min. Even if I apply a window of 5 min, I am not getting the last record (the last timestamp value above), using the ascending timestamp extractor to generate the timestamps (assuming that the timestamps are always in order).
> >
> > I was at least expecting the data to reach the co-group function. What could be the reason for the data loss? The data we are getting is critical, hence we cannot afford to lose any data.
> >
> > Regards,
> > Vinay Patil
> >
> > On Mon, Jun 27, 2016 at 11:31 PM, Vinay Patil <vinay18.pa...@gmail.com> wrote:
> >
> > > Just an update: when I keep IngestionTime and remove the timestamp I am generating, I am getting all the records, but for Event Time I am getting one record less. I checked the time difference between the two records, it is 3 min; I tried keeping the window time at 5 mins, but even that did not work.
> > >
> > > Even when I try assigning a timestamp for IngestionTime, I get one record less, so should I safely use Ingestion Time, or is it always advisable to use EventTime?
> > >
> > > Regards,
> > > Vinay Patil
> > >
> > > On Mon, Jun 27, 2016 at 8:16 PM, Vinay Patil <vinay18.pa...@gmail.com> wrote:
> > >
> > >> Hi,
> > >>
> > >> Actually I am only publishing 5 messages each to two different Kafka topics (using JUnit); even if I keep the window at 500 seconds the result is the same.
> > >>
> > >> I am not understanding why it is not sending the 5th element to the co-group operator even when the keys are the same.
> > >>
> > >> I actually cannot share the actual client code, but this is what the streams look like:
> > >> sourceStream.coGroup(destStream)
> > >> Here the sourceStream and destStream are actually Tuple2<String,DTO>, and the ElementSelector returns tuple.f0, which is the key.
> > >>
> > >> I am generating the timestamp based on a field from the DTO which is guaranteed to be in order.
> > >>
> > >> Will using triggers help here?
> > >>
> > >> Regards,
> > >> Vinay Patil
> > >>
> > >> *+91-800-728-4749*
> > >>
> > >> On Mon, Jun 27, 2016 at 7:05 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
> > >>
> > >>> Hi,
> > >>> what timestamps are you assigning? Is it guaranteed that all of them would fall into the same 30-second window?
> > >>>
> > >>> The issue with the duplicate printing in the ElementSelector is strange. Could you post a more complete code example so that I can reproduce the problem?
> > >>>
> > >>> Cheers,
> > >>> Aljoscha
> > >>>
> > >>> On Mon, 27 Jun 2016 at 13:21 Vinay Patil <vinay18.pa...@gmail.com> wrote:
> > >>>
> > >>> > Hi,
> > >>> >
> > >>> > I am able to get the matching and non-matching elements.
> > >>> >
> > >>> > However, when I am unit testing the code, I am getting one record less inside the overridden coGroup function. I am testing the following way:
> > >>> >
> > >>> > 1) Insert 5 messages into a local Kafka topic (test1)
> > >>> > 2) Insert 5 different messages into a local Kafka topic (test2)
> > >>> > 3) Consume 1) and 2) so that I have two different Kafka streams
> > >>> > 4) Generate ascending timestamps (using Event Time) for both streams and create the key (String)
> > >>> >
> > >>> > Up to 4) I am able to get all the records (checked by printing the streams to a text file).
> > >>> >
> > >>> > However, when I send the streams to the co-group operator, I am receiving one record less, using the following syntax:
> > >>> >
> > >>> > sourceStream.coGroup(destStream)
> > >>> >     .where(new ElementSelector())
> > >>> >     .equalTo(new ElementSelector())
> > >>> >     .window(TumblingEventTimeWindows.of(Time.seconds(30)))
> > >>> >     .apply(new JoinStreams());
> > >>> >
> > >>> > Also, in the ElementSelector I have inserted a sysout; I am getting 20 sysouts instead of 10 (10 sysouts for the source and 10 for the dest stream).
> > >>> >
> > >>> > I am unable to understand why one record is coming less to co-group.
> > >>> >
> > >>> > Regards,
> > >>> > Vinay Patil
> > >>> >
> > >>> > On Wed, Jun 15, 2016 at 1:39 PM, Fabian Hueske <fhue...@gmail.com> wrote:
> > >>> >
> > >>> > > Can you add a flag to each element emitted by the CoGroupFunction that indicates whether it was joined or not?
> > >>> > > Then you can use split to distinguish between both cases and handle both streams differently.
> > >>> > >
> > >>> > > Best, Fabian
> > >>> > >
> > >>> > > 2016-06-15 6:45 GMT+02:00 Vinay Patil <vinay18.pa...@gmail.com>:
> > >>> > >
> > >>> > > > Hi Jark,
> > >>> > > >
> > >>> > > > I am able to get the non-matching elements in a stream.
> > >>> > > >
> > >>> > > > Of course we can collect the matching elements in the same stream as well; however, I want to perform additional operations on the joined stream before writing it to S3, so I would have to include a separate join operator for the same two streams, right? Correct me if I am wrong.
> > >>> > > >
> > >>> > > > I have pasted the dummy code which collects the non-matching records (I have to perform this on the actual data, correct me if I am doing it wrong).
> > >>> > > >
> > >>> > > > sourceStream.coGroup(destStream)
> > >>> > > >     .where(new ElementSelector())
> > >>> > > >     .equalTo(new ElementSelector())
> > >>> > > >     .window(TumblingEventTimeWindows.of(Time.seconds(30)))
> > >>> > > >     .apply(new CoGroupFunction<Integer, Integer, Integer>() {
> > >>> > > >
> > >>> > > >         private static final long serialVersionUID = 6408179761497497475L;
> > >>> > > >
> > >>> > > >         @Override
> > >>> > > >         public void coGroup(Iterable<Integer> paramIterable, Iterable<Integer> paramIterable1,
> > >>> > > >                 Collector<Integer> paramCollector) throws Exception {
> > >>> > > >             // a size of 0 means that side has no elements for this key in this window
> > >>> > > >             long exactSizeIfKnown = paramIterable.spliterator().getExactSizeIfKnown();
> > >>> > > >             long exactSizeIfKnown2 = paramIterable1.spliterator().getExactSizeIfKnown();
> > >>> > > >             if (exactSizeIfKnown == 0) {
> > >>> > > >                 paramCollector.collect(paramIterable1.iterator().next());
> > >>> > > >             } else if (exactSizeIfKnown2 == 0) {
> > >>> > > >                 paramCollector.collect(paramIterable.iterator().next());
> > >>> > > >             }
> > >>> > > >         }
> > >>> > > >     }).print();
> > >>> > > >
> > >>> > > > Regards,
> > >>> > > > Vinay Patil
> > >>> > > >
> > >>> > > > On Tue, Jun 14, 2016 at 1:37 PM, Vinay Patil <vinay18.pa...@gmail.com> wrote:
> > >>> > > >
> > >>> > > > > You are right, I debugged it for all elements, I can do that now.
> > >>> > > > > Thanks a lot.
> > >>> > > > >
> > >>> > > > > Regards,
> > >>> > > > > Vinay Patil
> > >>> > > > >
> > >>> > > > > On Tue, Jun 14, 2016 at 11:56 AM, Jark Wu <wuchong...@alibaba-inc.com> wrote:
> > >>> > > > >
> > >>> > > > >> In `coGroup(Iterable<Integer> iter1, Iterable<Integer> iter2, Collector<Integer> out)`, when both iter1 and iter2 are not empty, it means they are matched elements from both streams.
> > >>> > > > >> When one of iter1 and iter2 is empty, it means that they are unmatched.
> > >>> > > > >>
> > >>> > > > >> - Jark Wu (wuchong)
> > >>> > > > >>
> > >>> > > > >> > On Jun 14, 2016, at 12:46 PM, Vinay Patil <vinay18.pa...@gmail.com> wrote:
> > >>> > > > >> >
> > >>> > > > >> > Hi Matthias,
> > >>> > > > >> >
> > >>> > > > >> > I did not get you; even if we use co-group we have to apply it on a key:
> > >>> > > > >> >
> > >>> > > > >> > sourceStream.coGroup(destStream)
> > >>> > > > >> >     .where(new ElementSelector())
> > >>> > > > >> >     .equalTo(new ElementSelector())
> > >>> > > > >> >     .window(TumblingEventTimeWindows.of(Time.seconds(30)))
> > >>> > > > >> >     .apply(new CoGroupFunction<Integer, Integer, Integer>() {
> > >>> > > > >> >         private static final long serialVersionUID = 6408179761497497475L;
> > >>> > > > >> >
> > >>> > > > >> >         @Override
> > >>> > > > >> >         public void coGroup(Iterable<Integer> paramIterable, Iterable<Integer> paramIterable1,
> > >>> > > > >> >                 Collector<Integer> paramCollector) throws Exception {
> > >>> > > > >> >             Iterator<Integer> iterator = paramIterable.iterator();
> > >>> > > > >> >             while (iterator.hasNext()) {
> > >>> > > > >> >                 iterator.next(); // inspect the matched elements while debugging
> > >>> > > > >> >             }
> > >>> > > > >> >         }
> > >>> > > > >> >     });
> > >>> > > > >> >
> > >>> > > > >> > When I debug this, only the matched elements from both streams come into the coGroup function.
> > >>> > > > >> >
> > >>> > > > >> > What I want is: how do I check for unmatched elements from both streams and write them to a sink?
> > >>> > > > >> >
> > >>> > > > >> > Regards,
> > >>> > > > >> > Vinay Patil
> > >>> > > > >> >
> > >>> > > > >> > *+91-800-728-4749*
> > >>> > > > >> >
> > >>> > > > >> > On Tue, Jun 14, 2016 at 2:07 AM, Matthias J. Sax <mj...@apache.org> wrote:
> > >>> > > > >> >
> > >>> > > > >> >> You need to do an outer-join. However, there is no built-in support for outer-joins yet.
> > >>> > > > >> >>
> > >>> > > > >> >> You can use Window-CoGroup to implement the outer-join as an own operator.
> > >>> > > > >> >>
> > >>> > > > >> >> -Matthias
> > >>> > > > >> >>
> > >>> > > > >> >> On 06/13/2016 06:53 PM, Vinay Patil wrote:
> > >>> > > > >> >>> Hi,
> > >>> > > > >> >>>
> > >>> > > > >> >>> I have a question regarding the join operation; consider the following dummy example:
> > >>> > > > >> >>>
> > >>> > > > >> >>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> > >>> > > > >> >>> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
> > >>> > > > >> >>> DataStreamSource<Integer> sourceStream = env.fromElements(10, 20, 23, 25, 30, 33, 102, 18);
> > >>> > > > >> >>> DataStreamSource<Integer> destStream = env.fromElements(20, 30, 40, 50, 60, 10);
> > >>> > > > >> >>>
> > >>> > > > >> >>> sourceStream.join(destStream)
> > >>> > > > >> >>>     .where(new ElementSelector())
> > >>> > > > >> >>>     .equalTo(new ElementSelector())
> > >>> > > > >> >>>     .window(TumblingEventTimeWindows.of(Time.milliseconds(10)))
> > >>> > > > >> >>>     .apply(new JoinFunction<Integer, Integer, Integer>() {
> > >>> > > > >> >>>
> > >>> > > > >> >>>         private static final long serialVersionUID = 1L;
> > >>> > > > >> >>>
> > >>> > > > >> >>>         @Override
> > >>> > > > >> >>>         public Integer join(Integer paramIN1, Integer paramIN2) throws Exception {
> > >>> > > > >> >>>             return paramIN1;
> > >>> > > > >> >>>         }
> > >>> > > > >> >>>     }).print();
> > >>> > > > >> >>>
> > >>> > > > >> >>> I get exactly the elements that match in both streams; however, my requirement is to write these matched elements and also the unmatched elements to a sink (S3).
> > >>> > > > >> >>>
> > >>> > > > >> >>> How do I get the unmatched elements from each stream?
> > >>> > > > >> >>>
> > >>> > > > >> >>> Regards,
> > >>> > > > >> >>> Vinay Patil
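
For reference, here is a minimal sketch of the flag-and-split approach Fabian describes above, written against the Flink 1.x DataStream API with plain Integer elements. The class names OuterJoinSketch and IdentityKey are illustrative only (IdentityKey plays the role of the thread's ElementSelector); this is one possible shape, not the thread's actual code.

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.util.Collections;

public class OuterJoinSketch {

    public static SplitStream<Tuple2<Boolean, Integer>> outerJoin(
            DataStream<Integer> sourceStream, DataStream<Integer> destStream) {

        DataStream<Tuple2<Boolean, Integer>> flagged = sourceStream
                .coGroup(destStream)
                .where(new IdentityKey())
                .equalTo(new IdentityKey())
                .window(TumblingEventTimeWindows.of(Time.seconds(30)))
                .apply(new CoGroupFunction<Integer, Integer, Tuple2<Boolean, Integer>>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public void coGroup(Iterable<Integer> left, Iterable<Integer> right,
                                        Collector<Tuple2<Boolean, Integer>> out) {
                        boolean leftEmpty = !left.iterator().hasNext();
                        boolean rightEmpty = !right.iterator().hasNext();
                        if (!leftEmpty && !rightEmpty) {
                            // Matched in this window: emit the left elements flagged true
                            // (a real job would emit a proper join result here).
                            for (Integer l : left) {
                                out.collect(Tuple2.of(true, l));
                            }
                        } else {
                            // Unmatched: emit whichever side is non-empty, flagged false.
                            for (Integer e : leftEmpty ? right : left) {
                                out.collect(Tuple2.of(false, e));
                            }
                        }
                    }
                });

        // split() routes matched and unmatched elements to named side streams.
        return flagged.split(new OutputSelector<Tuple2<Boolean, Integer>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<String> select(Tuple2<Boolean, Integer> value) {
                return Collections.singletonList(value.f0 ? "matched" : "unmatched");
            }
        });
    }

    /** Identity key selector, standing in for the thread's ElementSelector. */
    private static class IdentityKey implements KeySelector<Integer, Integer> {
        private static final long serialVersionUID = 1L;

        @Override
        public Integer getKey(Integer value) {
            return value;
        }
    }
}

The two cases can then be routed to different sinks, e.g. outerJoin(sourceStream, destStream).select("unmatched").print() for the unmatched side and select("matched") for the joined side.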
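
And a minimal sketch of a periodic watermark assigner that makes the behaviour Aljoscha describes explicit: the watermark trails the highest seen timestamp by 1 ms. It assumes Tuple2<String, Long> elements whose second field is an event time in milliseconds, which is only a stand-in for the Tuple2<String, DTO> used in the thread; the class name MaxTimestampAssigner is likewise illustrative.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class MaxTimestampAssigner
        implements AssignerWithPeriodicWatermarks<Tuple2<String, Long>> {

    private static final long serialVersionUID = 1L;

    // Highest event timestamp seen so far by this parallel instance.
    private long maxSeenTimestamp = Long.MIN_VALUE;

    @Override
    public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
        // f1 must be milliseconds since the epoch (Flink event time is in ms).
        long ts = element.f1;
        maxSeenTimestamp = Math.max(maxSeenTimestamp, ts);
        return ts;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // Like the ascending extractor, the watermark lags the highest seen
        // timestamp by 1 ms, because another element with that timestamp may
        // still arrive.
        return new Watermark(maxSeenTimestamp == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : maxSeenTimestamp - 1);
    }
}

It would be attached with stream.assignTimestampsAndWatermarks(new MaxTimestampAssigner()) before the coGroup. The point it illustrates is that an event-time window only fires once the watermark passes the end of the window, so with a bounded test input the last window needs either a later "flush" record to push the watermark forward or a source that terminates, since a finished source emits a final Long.MAX_VALUE watermark that fires the remaining windows.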