Hi,

Below are the timestamps I am getting from the DTO for the two records in question:

1466115892162154279
1466116026233613409

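As a quick sanity check on these two values, here is a minimal sketch of the arithmetic. It assumes the values are epoch nanoseconds (they have 19 digits) and that tumbling windows are aligned to the epoch with no offset. The class and method names are made up for illustration and are not part of the actual job.

```java
import java.util.concurrent.TimeUnit;

final class TimestampGap {
    // The two values quoted above; treating them as epoch nanoseconds is an assumption.
    static final long FIRST_NANOS = 1466115892162154279L;
    static final long SECOND_NANOS = 1466116026233613409L;

    // Convert epoch nanoseconds to epoch milliseconds (truncating).
    static long toMillis(long epochNanos) {
        return TimeUnit.NANOSECONDS.toMillis(epochNanos);
    }

    // Start of the epoch-aligned tumbling window (offset 0) that contains the timestamp.
    static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - (timestampMillis % windowSizeMillis);
    }

    public static void main(String[] args) {
        long gapSeconds = TimeUnit.NANOSECONDS.toSeconds(SECOND_NANOS - FIRST_NANOS);
        long fiveMinutes = TimeUnit.MINUTES.toMillis(5);
        System.out.println("gap in seconds: " + gapSeconds);
        System.out.println("window start of first record:  "
                + windowStart(toMillis(FIRST_NANOS), fiveMinutes));
        System.out.println("window start of second record: "
                + windowStart(toMillis(SECOND_NANOS), fiveMinutes));
    }
}
```

Under those assumptions the gap comes out at about 134 seconds, and the two records still land in different 5-minute windows, because the window boundaries are fixed multiples of the window size rather than being measured from the first record.
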
So the time difference is roughly 3 min. Even with a 5-minute window I am
not getting the last record (the last timestamp value above). I am using an
ascending timestamp extractor to generate the timestamps (assuming the
timestamps are always in order), so I was at least expecting the data to
reach the co-group function.

What could be the reason for the data loss? The data we are getting is
critical, so we cannot afford to lose any of it.

Regards,
Vinay Patil

On Mon, Jun 27, 2016 at 11:31 PM, Vinay Patil <vinay18.pa...@gmail.com> wrote:

> Just an update: when I keep IngestionTime and remove the timestamp I am
> generating, I get all the records, but with Event Time I get one record
> fewer. I checked the time difference between the two records; it is
> 3 min. I tried setting the window to 5 min, but even that did not work.
>
> Even when I try assigning a timestamp for IngestionTime, I get one record
> fewer, so can I safely use Ingestion Time, or is it always advisable to
> use EventTime?
>
> Regards,
> Vinay Patil
>
> On Mon, Jun 27, 2016 at 8:16 PM, Vinay Patil <vinay18.pa...@gmail.com> wrote:
>
>> Hi,
>>
>> Actually I am only publishing 5 messages each to two different Kafka
>> topics (using JUnit); even if I set the window to 500 seconds, the
>> result is the same.
>>
>> I do not understand why the 5th element is not sent to the co-group
>> operator even when the keys are the same.
>>
>> I cannot share the actual client code, but this is what the streams
>> look like:
>>
>> sourceStream.coGroup(destStream)
>>
>> Here sourceStream and destStream are actually Tuple2<String,DTO>, and
>> the ElementSelector returns tuple.f0, which is the key.
>>
>> I am generating the timestamp based on a field from the DTO which is
>> guaranteed to be in order.
>>
>> Will using triggers help here?
>>
>> Regards,
>> Vinay Patil
>>
>> On Mon, Jun 27, 2016 at 7:05 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>
>>> Hi,
>>> what timestamps are you assigning? Is it guaranteed that all of them
>>> would fall into the same 30 second window?
>>>
>>> The issue with duplicate printing in the ElementSelector is strange.
>>> Could you post a more complete code example so that I can reproduce
>>> the problem?
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>> On Mon, 27 Jun 2016 at 13:21 Vinay Patil <vinay18.pa...@gmail.com> wrote:
>>>
>>> > Hi,
>>> >
>>> > I am able to get the matching and non-matching elements.
>>> >
>>> > However, when I unit test the code, I get one record fewer inside
>>> > the overridden coGroup function.
>>> > I am testing the following way:
>>> >
>>> > 1) Insert 5 messages into a local Kafka topic (test1)
>>> > 2) Insert 5 different messages into a local Kafka topic (test2)
>>> > 3) Consume 1) and 2), which gives two different Kafka streams
>>> > 4) Generate ascending timestamps (using Event Time) for both streams
>>> >    and create the key (String)
>>> >
>>> > Up to 4) I am able to get all the records (checked by printing the
>>> > stream to a text file).
>>> >
>>> > However, when I send the stream to the co-group operator, I receive
>>> > one record fewer, using the following syntax:
>>> >
>>> > sourceStream.coGroup(destStream)
>>> >     .where(new ElementSelector())
>>> >     .equalTo(new ElementSelector())
>>> >     .window(TumblingEventTimeWindows.of(Time.seconds(30)))
>>> >     .apply(new JoinStreams());
>>> >
>>> > Also, I inserted a sysout in the ElementSelector, and I get 20
>>> > sysouts instead of 10 (10 sysouts for the source and 10 for the
>>> > dest stream).
>>> >
>>> > I am unable to understand why one record fewer reaches the co-group.
>>> >
>>> > Regards,
>>> > Vinay Patil
>>> >
>>> > On Wed, Jun 15, 2016 at 1:39 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>> >
>>> > > Can you add a flag to each element emitted by the CoGroupFunction
>>> > > that indicates whether it was joined or not?
>>> > > Then you can use split to distinguish between both cases and
>>> > > handle both streams differently.
>>> > >
>>> > > Best, Fabian
>>> > >
>>> > > 2016-06-15 6:45 GMT+02:00 Vinay Patil <vinay18.pa...@gmail.com>:
>>> > >
>>> > > > Hi Jark,
>>> > > >
>>> > > > I am able to get the non-matching elements in a stream.
>>> > > >
>>> > > > Of course we can collect the matching elements in the same
>>> > > > stream as well; however, I want to perform additional operations
>>> > > > on the joined stream before writing it to S3, so I would have to
>>> > > > include a separate join operator for the same two streams, right?
>>> > > > Correct me if I am wrong.
>>> > > >
>>> > > > I have pasted the dummy code which collects the non-matching
>>> > > > records (I have to perform this on the actual data; correct me
>>> > > > if I am doing it wrong).
>>> > > >
>>> > > > sourceStream.coGroup(destStream)
>>> > > >     .where(new ElementSelector())
>>> > > >     .equalTo(new ElementSelector())
>>> > > >     .window(TumblingEventTimeWindows.of(Time.seconds(30)))
>>> > > >     .apply(new CoGroupFunction<Integer, Integer, Integer>() {
>>> > > >
>>> > > >         private static final long serialVersionUID = 6408179761497497475L;
>>> > > >
>>> > > >         @Override
>>> > > >         public void coGroup(Iterable<Integer> paramIterable,
>>> > > >                 Iterable<Integer> paramIterable1,
>>> > > >                 Collector<Integer> paramCollector) throws Exception {
>>> > > >             // hasNext() is used instead of getExactSizeIfKnown(),
>>> > > >             // which returns -1 when the size is not known.
>>> > > >             boolean leftEmpty = !paramIterable.iterator().hasNext();
>>> > > >             boolean rightEmpty = !paramIterable1.iterator().hasNext();
>>> > > >             if (leftEmpty) {
>>> > > >                 // Nothing on the source side: emit every unmatched dest element.
>>> > > >                 for (Integer value : paramIterable1) {
>>> > > >                     paramCollector.collect(value);
>>> > > >                 }
>>> > > >             } else if (rightEmpty) {
>>> > > >                 // Nothing on the dest side: emit every unmatched source element.
>>> > > >                 for (Integer value : paramIterable) {
>>> > > >                     paramCollector.collect(value);
>>> > > >                 }
>>> > > >             }
>>> > > >         }
>>> > > >     }).print();
>>> > > >
>>> > > > Regards,
>>> > > > Vinay Patil
>>> > > >
>>> > > > On Tue, Jun 14, 2016 at 1:37 PM, Vinay Patil <vinay18.pa...@gmail.com> wrote:
>>> > > >
>>> > > > > You are right. I debugged it for all elements, and I can do
>>> > > > > that now.
>>> > > > > Thanks a lot.
>>> > > > >
>>> > > > > Regards,
>>> > > > > Vinay Patil
>>> > > > >
>>> > > > > On Tue, Jun 14, 2016 at 11:56 AM, Jark Wu <wuchong...@alibaba-inc.com> wrote:
>>> > > > >
>>> > > > >> In `coGroup(Iterable<Integer> iter1, Iterable<Integer> iter2,
>>> > > > >> Collector<Integer> out)`, when both iter1 and iter2 are not
>>> > > > >> empty, they hold matched elements from both streams.
>>> > > > >> When one of iter1 and iter2 is empty, the elements in the
>>> > > > >> other one are unmatched.
>>> > > > >>
>>> > > > >> - Jark Wu (wuchong)
>>> > > > >>
>>> > > > >> > On June 14, 2016, at 12:46 PM, Vinay Patil <vinay18.pa...@gmail.com> wrote:
>>> > > > >> >
>>> > > > >> > Hi Matthias,
>>> > > > >> >
>>> > > > >> > I did not get you; even if we use Co-Group, we have to
>>> > > > >> > apply it on a key:
>>> > > > >> >
>>> > > > >> > sourceStream.coGroup(destStream)
>>> > > > >> >     .where(new ElementSelector())
>>> > > > >> >     .equalTo(new ElementSelector())
>>> > > > >> >     .window(TumblingEventTimeWindows.of(Time.seconds(30)))
>>> > > > >> >     .apply(new CoGroupFunction<Integer, Integer, Integer>() {
>>> > > > >> >         private static final long serialVersionUID = 6408179761497497475L;
>>> > > > >> >
>>> > > > >> >         @Override
>>> > > > >> >         public void coGroup(Iterable<Integer> paramIterable,
>>> > > > >> >                 Iterable<Integer> paramIterable1,
>>> > > > >> >                 Collector<Integer> paramCollector) throws Exception {
>>> > > > >> >             Iterator<Integer> iterator = paramIterable.iterator();
>>> > > > >> >             while (iterator.hasNext()) {
>>> > > > >> >                 // Advance the iterator; without this the loop never ends.
>>> > > > >> >                 iterator.next();
>>> > > > >> >             }
>>> > > > >> >         }
>>> > > > >> >     });
>>> > > > >> >
>>> > > > >> > When I debug this, only the matched elements from both
>>> > > > >> > streams come into the coGroup function.
>>> > > > >> >
>>> > > > >> > What I want to know is how to check for unmatched elements
>>> > > > >> > from both streams and write them to the sink.
>>> > > > >> >
>>> > > > >> > Regards,
>>> > > > >> > Vinay Patil
>>> > > > >> >
>>> > > > >> > On Tue, Jun 14, 2016 at 2:07 AM, Matthias J. Sax <mj...@apache.org> wrote:
>>> > > > >> >
>>> > > > >> >> You need to do an outer-join. However, there is no
>>> > > > >> >> built-in support for outer-joins yet.
>>> > > > >> >>
>>> > > > >> >> You can use Window-CoGroup to implement the outer-join as
>>> > > > >> >> your own operator.
>>> > > > >> >>
>>> > > > >> >> -Matthias
>>> > > > >> >>
>>> > > > >> >> On 06/13/2016 06:53 PM, Vinay Patil wrote:
>>> > > > >> >>> Hi,
>>> > > > >> >>>
>>> > > > >> >>> I have a question regarding the join operation. Consider
>>> > > > >> >>> the following dummy example:
>>> > > > >> >>>
>>> > > > >> >>> StreamExecutionEnvironment env =
>>> > > > >> >>>     StreamExecutionEnvironment.getExecutionEnvironment();
>>> > > > >> >>> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
>>> > > > >> >>> DataStreamSource<Integer> sourceStream =
>>> > > > >> >>>     env.fromElements(10,20,23,25,30,33,102,18);
>>> > > > >> >>> DataStreamSource<Integer> destStream =
>>> > > > >> >>>     env.fromElements(20,30,40,50,60,10);
>>> > > > >> >>>
>>> > > > >> >>> sourceStream.join(destStream)
>>> > > > >> >>>     .where(new ElementSelector())
>>> > > > >> >>>     .equalTo(new ElementSelector())
>>> > > > >> >>>     .window(TumblingEventTimeWindows.of(Time.milliseconds(10)))
>>> > > > >> >>>     .apply(new JoinFunction<Integer, Integer, Integer>() {
>>> > > > >> >>>
>>> > > > >> >>>         private static final long serialVersionUID = 1L;
>>> > > > >> >>>
>>> > > > >> >>>         @Override
>>> > > > >> >>>         public Integer join(Integer paramIN1, Integer paramIN2) throws Exception {
>>> > > > >> >>>             return paramIN1;
>>> > > > >> >>>         }
>>> > > > >> >>>     }).print();
>>> > > > >> >>>
>>> > > > >> >>> I get exactly the elements that match in both streams;
>>> > > > >> >>> however, my requirement is to write these matched
>>> > > > >> >>> elements and also the unmatched elements to the sink (S3).
>>> > > > >> >>>
>>> > > > >> >>> How do I get the unmatched elements from each stream?
>>> > > > >> >>>
>>> > > > >> >>> Regards,
>>> > > > >> >>> Vinay Patil
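
For reference, the flag idea suggested in this thread (mark each element coming out of the co-group as joined or not, then route the two cases separately) can be sketched in plain Java as follows. This is only an illustration of the logic, not the actual job: `Tagged` and `OuterJoinSketch` are hypothetical names, and a plain `List` stands in for Flink's `Collector` so that the snippet runs without any Flink dependency.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical result type: a value plus a flag telling whether both sides had it. */
final class Tagged {
    final int value;
    final boolean joined;

    Tagged(int value, boolean joined) {
        this.value = value;
        this.joined = joined;
    }
}

final class OuterJoinSketch {
    // Same shape as a coGroup body: called once per key and window with the
    // elements of each side. A plain List stands in for Flink's Collector.
    static void coGroup(Iterable<Integer> left, Iterable<Integer> right, List<Tagged> out) {
        boolean leftEmpty = !left.iterator().hasNext();
        boolean rightEmpty = !right.iterator().hasNext();
        if (!leftEmpty && !rightEmpty) {
            // Both sides present: emit the left elements, flagged as joined.
            for (Integer value : left) {
                out.add(new Tagged(value, true));
            }
        } else {
            // One side is empty: whatever is present is unmatched.
            for (Integer value : left) {
                out.add(new Tagged(value, false));
            }
            for (Integer value : right) {
                out.add(new Tagged(value, false));
            }
        }
    }

    public static void main(String[] args) {
        List<Tagged> out = new ArrayList<>();
        coGroup(Arrays.asList(10), Arrays.asList(10), out);
        coGroup(Arrays.asList(23), Collections.<Integer>emptyList(), out);
        coGroup(Collections.<Integer>emptyList(), Arrays.asList(40), out);
        for (Tagged t : out) {
            System.out.println(t.value + " joined=" + t.joined);
        }
    }
}
```

With a result type like this, a single co-group operator can feed both the joined and the unmatched paths, so a second join over the same two streams should not be needed.
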