Hi,

I think the problem is that the DeltaFunction needs to have this signature:

    DeltaFunction<CoGroupedStreams.TaggedUnion<Tuple2<String, DTO>, Tuple2<String, DTO>>>

because the Trigger sees elements from both input streams, and these are represented as a TaggedUnion that can hold an element from either side.

May I ask why you want to use the DeltaTrigger?

Cheers,
Aljoscha

On Wed, 29 Jun 2016 at 19:06 Vinay Patil <vinay18.pa...@gmail.com> wrote:

Hi,

Yes, the concepts are getting clearer to me now.
One last thing I want to try before going for a custom trigger is the DeltaTrigger, but I am not able to get the syntax right. This is how I am trying it:

    TypeInformation<Tuple2<String, DTO>> typeInfo = TypeInformation.of(new TypeHint<Tuple2<String, DTO>>() {
    });
    // sourceStream and destStream: Tuple2<String, DTO>
    sourceStream.coGroup(destStream).where(new ElementSelector()).equalTo(new ElementSelector())
        .window(TumblingEventTimeWindows.of(Time.seconds(10)))
        .trigger(DeltaTrigger.of(triggerMeters,
            new DeltaFunction<Tuple2<String, DTO>>() {
                private static final long serialVersionUID = 1L;

                @Override
                public double getDelta(
                        Tuple2<String, DTO> oldDataPoint,
                        Tuple2<String, DTO> newDataPoint) {
                    return <some_val>;
                }
            }, typeInfo.createSerializer(env.getConfig())))
        .apply(new JoinStreams());

I am getting the error "cannot convert from DeltaTrigger to Trigger<? super CoGroupedStreams...".
What am I doing wrong here? I have referred to the sample example.

Regards,
Vinay Patil

On Wed, Jun 29, 2016 at 7:15 PM, Aljoscha Krettek <aljos...@apache.org> wrote:

Hi,

you can use ingestion time if you don't care about the timestamps in your events, yes. If elements from the two streams happen to arrive at such times that they are not put into the same window, then you won't get a match, correct.

Regarding ingestion time and out-of-order events.
I think this section just reiterates that, when using ingestion time, the inherent timestamps in your events will not be considered and their order will not be respected.

Regarding late data: right now, Flink always processes late data, and it is up to the Trigger to decide what to do with it. You can implement a custom trigger based on EventTimeTrigger that immediately purges a window when an element arrives that is later than an allowed amount of lateness. In Flink 1.1 we will introduce a setting for windows that allows specifying an allowed lateness. With this, late elements will be dropped automatically. This feature is already available in master, by the way.

Cheers,
Aljoscha

On Wed, 29 Jun 2016 at 14:14 Vinay Patil <vinay18.pa...@gmail.com> wrote:

Hi,

Ok.
Inside the checkAndGetNextWatermark(lastElement, extractedTimestamp) method, both parameters are coming in with the same (timestamp) value. I was expecting the last element's timestamp value in the first parameter when I extract it.

Let's say I decide to use IngestionTime (since I am getting accurate results with it for now). If the joining elements of the two streams are assigned to different windows, then in that case I will not get the match, right?

However, in the case of event time, they are guaranteed to be in the same window since we are assigning the timestamp. Correct me here.

According to the documentation:
"Ingestion Time programs cannot handle any out-of-order events or late data."

In this context, what do we mean by out-of-order events? How does it know that the events are out of order, i.e. based on which parameter does it decide that the events are out of order? In the case of event time, we can say the timestamps received are out of order.
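
A minimal plain-Java sketch of the two ideas above, under stated assumptions. With event time, an element can be called out of order when its timestamp is lower than a timestamp already seen. With an allowed lateness, elements for a window are dropped once the watermark has passed the window's last timestamp (end - 1) plus that lateness. The class and method names are hypothetical. The drop rule only approximates the Flink 1.1 allowed-lateness behaviour Aljoscha mentions and is not Flink code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not a Flink class). It illustrates two assumptions:
// event-time "out of order" means a timestamp lower than one already seen,
// and an allowed-lateness rule drops elements for a window once the
// watermark has passed the window's last timestamp plus the allowed lateness.
class LatenessSketch {

    /** True if an element with a later timestamp has already been seen. */
    static boolean isOutOfOrder(long timestamp, long maxTimestampSeen) {
        return timestamp < maxTimestampSeen;
    }

    /**
     * True if an element for the window [start, windowEnd) would be dropped.
     * The window's last valid timestamp is windowEnd - 1; it keeps accepting
     * late elements until the watermark reaches that point plus the lateness.
     */
    static boolean isDroppedAsLate(long windowEnd, long allowedLateness, long watermark) {
        long cleanupTime = (windowEnd - 1) + allowedLateness;
        return cleanupTime <= watermark;
    }

    /** Indices of the elements that arrive out of order in a sequence of timestamps. */
    static List<Integer> outOfOrderIndices(long[] timestamps) {
        List<Integer> result = new ArrayList<>();
        long maxSeen = Long.MIN_VALUE;
        for (int i = 0; i < timestamps.length; i++) {
            if (isOutOfOrder(timestamps[i], maxSeen)) {
                result.add(i);
            }
            maxSeen = Math.max(maxSeen, timestamps[i]);
        }
        return result;
    }
}
```

With an allowed lateness of 0, an element is dropped as soon as the watermark has reached the window's last timestamp, i.e. once the window could already have fired. A larger lateness keeps the window open for stragglers.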

Late data: does it have a threshold after which it no longer accepts late data?

Regards,
Vinay Patil

On Wed, Jun 29, 2016 at 5:15 PM, Aljoscha Krettek <aljos...@apache.org> wrote:

Hi,

the element will be kept around indefinitely if no new watermark arrives.

I think the same problem will persist with AssignerWithPunctuatedWatermarks, since there you also might not get the required "last watermark" to trigger processing of the last window.

Cheers,
Aljoscha

On Wed, 29 Jun 2016 at 13:18 Vinay Patil <vinay18.pa...@gmail.com> wrote:

Hi Aljoscha,

This clears up a lot of doubts.
So now let's say the stream pauses for a while, or stops completely on Friday, and let us assume that the last message did not get processed and is kept in the internal buffers.

When the stream starts again on Monday, will it consider the last element that is in the internal buffer for processing?
How long can the internal buffer hold the data, or will it flush the data after a threshold?

I have tried using AssignerWithPunctuatedWatermarks and generated a watermark for each event, but I am still getting one record less.

Regards,
Vinay Patil

On Wed, Jun 29, 2016 at 2:21 PM, Aljoscha Krettek <aljos...@apache.org> wrote:

Hi,

the reason why the last element might never be emitted is the way the ascending timestamp extractor works. I'll try and explain with an example.

Let's say we have a window size of 2 milliseconds, elements arrive starting with timestamp 0, and the window begin timestamp is inclusive while the end timestamp is exclusive:

Element 0, Timestamp 0 (at this point the watermark is -1)
Element 1, Timestamp 1 (at this point the watermark is 0)
Element 2, Timestamp 1 (at this point the watermark is still 0)
Element 3, Timestamp 2 (at this point the watermark is 1)

Now we can process the window [0, 2), because we know from the watermark that no more elements can arrive for that window. The window contains elements 0, 1, 2.

Element 4, Timestamp 3 (at this point the watermark is 2)
Element 5, Timestamp 4 (at this point the watermark is 3)

Now we can process window [2, 4). The window contains elements 3, 4.

At this point, we have Element 5 sitting in internal buffers for window [4, 6). If we don't receive further elements, the watermark will never advance and we will never process that window.

If, however, we get new elements at some point, the watermark advances and we don't have a problem. That's what I meant when I said that you shouldn't have a problem if data keeps arriving continuously.

Cheers,
Aljoscha

On Tue, 28 Jun 2016 at 17:14 Vinay Patil <vinay18.pa...@gmail.com> wrote:

Hi Aljoscha,

Thanks a lot for your inputs.

I still don't understand what you mean when you say I will not face this issue in the case of a continuous stream. Let's consider the following example:
Assume that the stream runs continuously from Monday to Friday, and on Friday it stops after 5:00 PM. Will I still face this issue?

I am actually not able to understand how it will differ in real-time streams.

Regards,
Vinay Patil

On Tue, Jun 28, 2016 at 5:07 PM, Aljoscha Krettek <aljos...@apache.org> wrote:

Hi,

ingestion time can only be used if you don't care about the timestamps in the elements. So if you have those, you should probably use event time.

If your timestamps really are strictly increasing, then the ascending extractor is good. And if you have a continuous stream of incoming elements, you will not see the behavior of not getting the last elements.

By the way, when using Kafka you can also embed the timestamp extractor directly in the Kafka consumer. This is described here:

https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission

Cheers,
Aljoscha

On Tue, 28 Jun 2016 at 11:44 Vinay Patil <vinay18.pa...@gmail.com> wrote:

Hi Aljoscha,

Thank you for your response.

So do you suggest using a different approach for extracting the timestamp (as given in the documentation) instead of the AscendingTimestampExtractor?
Is that the reason I am seeing this unexpected behaviour? In the case of a continuous stream, would I not see any data loss?

Also, assuming that the records are always going to be in order, which is the better approach: ingestion time or event time?

Regards,
Vinay Patil

On Tue, Jun 28, 2016 at 2:41 PM, Aljoscha Krettek <aljos...@apache.org> wrote:

Hi,

first, regarding tumbling windows: even if you have 5-minute windows, it can happen that elements that are only seconds apart go into different windows. Consider the following case:

|          x|x          |

These are two 5-minute windows. The two elements are only seconds apart but go into different windows, because windows are aligned to the epoch.

Now, for the ascending timestamp extractor. The reason this can behave in unexpected ways is that it emits a watermark that is "last timestamp - 1". That is, if it has seen timestamp t, it can only emit watermark t-1, because there might be other elements with timestamp t arriving.
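
The "last timestamp - 1" watermark and the epoch-aligned windows can be replayed in a few lines of plain Java. This is a simplified simulation under stated assumptions, not Flink's implementation:

- timestamps are non-negative;
- window starts are computed as timestamp - timestamp % size;
- the watermark is "highest timestamp seen minus 1" and is updated after every element (Flink emits periodic watermarks on a timer instead);
- a window [start, end) fires once the watermark reaches end - 1.

The class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified simulation (not Flink code): an ascending-timestamp watermark
// driving epoch-aligned tumbling event-time windows.
class WatermarkSimulation {

    final long windowSize;
    long watermark = Long.MIN_VALUE;
    // Window start -> ids of the elements buffered for that window.
    final TreeMap<Long, List<Integer>> pending = new TreeMap<>();
    // Fired windows in firing order, formatted as "[start,end):ids".
    final List<String> fired = new ArrayList<>();

    WatermarkSimulation(long windowSize) {
        this.windowSize = windowSize;
    }

    /** Epoch-aligned window start; assumes a non-negative timestamp. */
    long windowStart(long timestamp) {
        return timestamp - (timestamp % windowSize);
    }

    void onElement(int id, long timestamp) {
        pending.computeIfAbsent(windowStart(timestamp), k -> new ArrayList<>()).add(id);
        // Having seen timestamp t, more elements with t may follow, so only t - 1 is safe.
        watermark = Math.max(watermark, timestamp - 1);
        fireReadyWindows();
    }

    private void fireReadyWindows() {
        while (!pending.isEmpty()) {
            Map.Entry<Long, List<Integer>> first = pending.firstEntry();
            long end = first.getKey() + windowSize;
            if (watermark < end - 1) {
                break; // the window's last timestamp (end - 1) is not yet covered
            }
            fired.add("[" + first.getKey() + "," + end + "):" + first.getValue());
            pending.pollFirstEntry();
        }
    }
}
```

Replaying the 2 ms example from Aljoscha's message of 29 Jun (timestamps 0, 1, 1, 2, 3, 4) gives the following:

- [0,2) fires with elements 0, 1, 2.
- [2,4) fires with elements 3, 4.
- Element 5 stays pending in [4,6).

A seventh element with timestamp 6 raises the watermark to 5 and releases element 5. This is why a continuous stream hides the effect.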

If you have a continuous stream of elements, you wouldn't notice this. Only in this constructed example does it become visible.

Cheers,
Aljoscha

On Tue, 28 Jun 2016 at 06:04 Vinay Patil <vinay18.pa...@gmail.com> wrote:

Hi,

Following are the timestamps I am getting from the DTO for two records; here is the difference between them:

1466115892162154279
1466116026233613409

So the time difference is roughly 3 min. Even if I apply a window of 5 min, I am not getting the last record (the last timestamp value above). I am using the ascending timestamp extractor to generate the timestamps (assuming that the timestamps are always in order).

I was at least expecting the data to reach the co-group function.
What could be the reason for the data loss?
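
A quick check of the two timestamps above. The values have 19 digits, which suggests nanoseconds since the epoch; this is an assumption, since the thread does not say. Flink event-time timestamps, by contrast, are milliseconds since the epoch. The hypothetical helper below converts the values and computes which epoch-aligned 5-minute window each one falls into.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helpers (not Flink API) for checking which epoch-aligned
// tumbling window a timestamp falls into. Assumes non-negative timestamps.
class TimestampCheck {

    /** Assumption: the DTO values are nanoseconds since the epoch. */
    static long nanosToMillis(long nanos) {
        return TimeUnit.NANOSECONDS.toMillis(nanos);
    }

    /** Start of the tumbling window containing the timestamp, aligned to the epoch. */
    static long windowStartMillis(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - (timestampMillis % windowSizeMillis);
    }

    static boolean sameWindow(long t1Millis, long t2Millis, long windowSizeMillis) {
        return windowStartMillis(t1Millis, windowSizeMillis)
                == windowStartMillis(t2Millis, windowSizeMillis);
    }
}
```

Under that assumption the records are 134,071 ms (about 2 min 14 s) apart. Even so, the first falls into the window starting at 1466115600000 and the second into the one starting at 1466115900000. This is the situation in the two-window diagram in the reply above, so a 5-minute window alone does not put them together.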

The data we are getting is critical, hence we cannot afford to lose any data.

Regards,
Vinay Patil

On Mon, Jun 27, 2016 at 11:31 PM, Vinay Patil <vinay18.pa...@gmail.com> wrote:

Just an update: when I use IngestionTime and remove the timestamp I am generating, I get all the records, but with EventTime I get one record less. I checked the time difference between two records; it is 3 min. I tried setting the window time to 5 mins, but even that did not work.

Even when I try assigning timestamps with IngestionTime, I get one record less. So should I safely use Ingestion Time, or is it always advisable to use EventTime?

Regards,
Vinay Patil

On Mon, Jun 27, 2016 at 8:16 PM, Vinay Patil <vinay18.pa...@gmail.com> wrote:

Hi,

Actually, I am only publishing 5 messages each to two different Kafka topics (using JUnit). Even if I set the window to 500 seconds, the result is the same.

I do not understand why it is not sending the 5th element to the co-group operator even when the keys are the same.

I cannot share the actual client code, but this is what the streams look like:

    sourceStream.coGroup(destStream)

Here sourceStream and destStream are actually Tuple2<String, DTO>, and the ElementSelector returns tuple.f0, which is the key.

I am generating the timestamp based on a field from the DTO, which is guaranteed to be in order.

Will using triggers help here?

Regards,
Vinay Patil

+91-800-728-4749

On Mon, Jun 27, 2016 at 7:05 PM, Aljoscha Krettek <aljos...@apache.org> wrote:

Hi,

what timestamps are you assigning? Is it guaranteed that all of them would fall into the same 30-second window?

The issue with duplicate printing in the ElementSelector is strange. Could you post a more complete code example so that I can reproduce the problem?

Cheers,
Aljoscha

On Mon, 27 Jun 2016 at 13:21 Vinay Patil <vinay18.pa...@gmail.com> wrote:

Hi,

I am able to get the matching and non-matching elements.

However, when I unit test the code, I am getting one record less inside the overridden coGroup function.

I am testing it the following way:

1) Insert 5 messages into a local Kafka topic (test1)
2) Insert 5 different messages into a local Kafka topic (test2)
3) Consume 1) and 2), so I have two different Kafka streams
4) Generate ascending timestamps (using Event Time) for both streams and create the key (String)

Up to step 4) I get all the records (checked by printing the stream to a text file).

However, when I send the streams to the co-group operator, I receive one record less, using the following syntax:

    sourceStream.coGroup(destStream)
        .where(new ElementSelector())
        .equalTo(new ElementSelector())
        .window(TumblingEventTimeWindows.of(Time.seconds(30)))
        .apply(new JoinStreams());

Also, I have inserted a sysout in the ElementSelector, and I am getting 20 sysouts instead of 10 (10 for the source stream and 10 for the dest stream).

I am unable to understand why one record less reaches the co-group.

Regards,
Vinay Patil

On Wed, Jun 15, 2016 at 1:39 PM, Fabian Hueske <fhue...@gmail.com> wrote:

Can you add a flag to each element emitted by the CoGroupFunction that indicates whether it was joined or not?
Then you can use split to distinguish between both cases and handle both streams differently.

Best, Fabian

2016-06-15 6:45 GMT+02:00 Vinay Patil <vinay18.pa...@gmail.com>:

Hi Jark,

I am able to get the non-matching elements in a stream.

Of course, we can collect the matching elements in the same stream as well. However, I want to perform additional operations on the joined stream before writing it to S3, so I would have to include a separate join operator for the same two streams, right?
Correct me if I am wrong.
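
Fabian's flag-and-split idea, sketched in plain Java. The coGroup output carries a flag saying whether the element was joined, and a single pass routes each record to one of two outputs. This is the role split/select plays on a DataStream. FlaggedRecord and FlagRouter are hypothetical stand-ins, not Flink classes; in Flink the two lists would be the two selected streams.

```java
import java.util.List;

// Hypothetical output type of a coGroup function: a value plus a flag
// recording whether it found a partner in the other stream.
class FlaggedRecord<T> {
    final T value;
    final boolean joined;

    FlaggedRecord(T value, boolean joined) {
        this.value = value;
        this.joined = joined;
    }
}

// Mimics split/select: one pass over the coGroup output, two outputs.
class FlagRouter {
    static <T> void route(List<FlaggedRecord<T>> records,
                          List<T> joinedOut, List<T> unmatchedOut) {
        for (FlaggedRecord<T> r : records) {
            if (r.joined) {
                joinedOut.add(r.value);     // e.g. extra processing, then S3
            } else {
                unmatchedOut.add(r.value);  // e.g. written to S3 directly
            }
        }
    }
}
```

Because one coGroup operator emits both kinds of record, the joined records can be post-processed on their own output without a second join over the same two streams.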

I have pasted the dummy code which collects the non-matching records (I have to perform this on the actual data; correct me if I am doing it wrong).

    sourceStream.coGroup(destStream).where(new ElementSelector()).equalTo(new ElementSelector())
        .window(TumblingEventTimeWindows.of(Time.seconds(30)))
        .apply(new CoGroupFunction<Integer, Integer, Integer>() {

            private static final long serialVersionUID = 6408179761497497475L;

            @Override
            public void coGroup(Iterable<Integer> paramIterable, Iterable<Integer> paramIterable1,
                    Collector<Integer> paramCollector) throws Exception {
                long exactSizeIfKnown = paramIterable.spliterator().getExactSizeIfKnown();
                long exactSizeIfKnown2 = paramIterable1.spliterator().getExactSizeIfKnown();
                if (exactSizeIfKnown == 0) {
                    paramCollector.collect(paramIterable1.iterator().next());
                } else if (exactSizeIfKnown2 == 0) {
                    paramCollector.collect(paramIterable.iterator().next());
                }
            }
        }).print();

Regards,
Vinay Patil

On Tue, Jun 14, 2016 at 1:37 PM, Vinay Patil <vinay18.pa...@gmail.com> wrote:

You are right. I debugged it for all elements, and I can do that now.
Thanks a lot.

Regards,
Vinay Patil

On Tue, Jun 14, 2016 at 11:56 AM, Jark Wu <wuchong...@alibaba-inc.com> wrote:

In `coGroup(Iterable<Integer> iter1, Iterable<Integer> iter2, Collector<Integer> out)`, when both iter1 and iter2 are non-empty, they hold matched elements from both streams.
When one of iter1 and iter2 is empty, the elements in the other one are unmatched.
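
Jark's rule can be written as a plain-Java stand-in for the body of a coGroup function; OuterJoinGroup and the output strings are hypothetical. It tests emptiness with iterator().hasNext() rather than spliterator().getExactSizeIfKnown(). The default Iterable.spliterator() reports an unknown size (-1) unless the iterable overrides it with a sized spliterator. It also emits every element of an unmatched group rather than only the first one.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java stand-in for a coGroup body implementing a full outer join for
// one key group: left/right hold that key's elements from each stream.
class OuterJoinGroup {

    static <L, R> List<String> coGroup(Iterable<L> left, Iterable<R> right) {
        List<String> out = new ArrayList<>();
        boolean leftEmpty = !left.iterator().hasNext();
        boolean rightEmpty = !right.iterator().hasNext();
        if (!leftEmpty && !rightEmpty) {
            // Both sides present: matched, emit every combination (inner-join part).
            for (L l : left) {
                for (R r : right) {
                    out.add("matched:" + l + "/" + r);
                }
            }
        } else if (leftEmpty) {
            // Only the right stream had this key: all of its elements are unmatched.
            for (R r : right) {
                out.add("unmatched-right:" + r);
            }
        } else {
            // Only the left stream had this key.
            for (L l : left) {
                out.add("unmatched-left:" + l);
            }
        }
        return out;
    }
}
```

In a real CoGroupFunction the strings would be replaced by whatever record type the sink expects, for example one carrying a joined/unjoined flag.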

- Jark Wu (wuchong)

On 14 Jun 2016, at 12:46 PM, Vinay Patil <vinay18.pa...@gmail.com> wrote:

Hi Matthias,

I did not get you. Even if we use CoGroup, we have to apply it on a key:

    sourceStream.coGroup(destStream)
        .where(new ElementSelector())
        .equalTo(new ElementSelector())
        .window(TumblingEventTimeWindows.of(Time.seconds(30)))
        .apply(new CoGroupFunction<Integer, Integer, Integer>() {
            private static final long serialVersionUID = 6408179761497497475L;

            @Override
            public void coGroup(Iterable<Integer> paramIterable, Iterable<Integer> paramIterable1,
                    Collector<Integer> paramCollector) throws Exception {
                Iterator<Integer> iterator = paramIterable.iterator();
                while (iterator.hasNext()) {
                    Integer element = iterator.next(); // advance, otherwise the loop never ends
                }
            }
        });

When I debug this, only the matched elements from both streams come into the coGroup function.

What I want is to check for unmatched elements from both streams and write them to the sink.

Regards,
Vinay Patil

On Tue, Jun 14, 2016 at 2:07 AM, Matthias J. Sax <mj...@apache.org> wrote:

You need to do an outer join. However, there is no built-in support for outer joins yet.

You can use Window-CoGroup to implement the outer join as your own operator.
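
This is why a CoGroup window can express the outer join. Within one window, it groups both inputs by key and calls the function once for every key seen on either side. The side that lacks that key gets an empty group. The plain-Java sketch below (not Flink code) shows this for the integers in the original question further down. It assumes that all of them land in the same window (with ingestion time and 10 ms windows, this depends on arrival timing). It also assumes that ElementSelector returns the element itself as the key.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical sketch of one window's coGroup: group both inputs by key,
// then visit the union of keys so one-sided keys get an empty group.
class WindowCoGroupSketch {

    static Map<Integer, List<Integer>> groupByKey(List<Integer> elements) {
        Map<Integer, List<Integer>> groups = new TreeMap<>();
        for (Integer e : elements) {
            // Assumed key selector: the element itself.
            groups.computeIfAbsent(e, k -> new ArrayList<>()).add(e);
        }
        return groups;
    }

    /** One entry per key, in key order, classifying the key as an outer join would. */
    static List<String> outerJoinWindow(List<Integer> source, List<Integer> dest) {
        Map<Integer, List<Integer>> s = groupByKey(source);
        Map<Integer, List<Integer>> d = groupByKey(dest);
        TreeSet<Integer> keys = new TreeSet<>(s.keySet());
        keys.addAll(d.keySet());
        List<String> result = new ArrayList<>();
        for (Integer key : keys) {
            List<Integer> sv = s.getOrDefault(key, Collections.emptyList());
            List<Integer> dv = d.getOrDefault(key, Collections.emptyList());
            // Here a CoGroupFunction would be called with (sv, dv).
            if (!sv.isEmpty() && !dv.isEmpty()) {
                result.add("matched:" + key);
            } else if (dv.isEmpty()) {
                result.add("source-only:" + key);
            } else {
                result.add("dest-only:" + key);
            }
        }
        return result;
    }
}
```

For this data, the keys split as follows:

- matched: 10, 20, 30
- source-only: 18, 23, 25, 33, 102
- destination-only: 40, 50, 60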

-Matthias

On 06/13/2016 06:53 PM, Vinay Patil wrote:

Hi,

I have a question regarding the join operation. Consider the following dummy example:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
    DataStreamSource<Integer> sourceStream = env.fromElements(10, 20, 23, 25, 30, 33, 102, 18);
    DataStreamSource<Integer> destStream = env.fromElements(20, 30, 40, 50, 60, 10);

    sourceStream.join(destStream)
        .where(new ElementSelector())
        .equalTo(new ElementSelector())
        .window(TumblingEventTimeWindows.of(Time.milliseconds(10)))
        .apply(new JoinFunction<Integer, Integer, Integer>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Integer join(Integer paramIN1, Integer paramIN2) throws Exception {
                return paramIN1;
            }
        }).print();

I get exactly the elements that match in both streams. However, my requirement is to write these matched elements and also the unmatched elements to a sink (S3).

How do I get the unmatched elements from each stream?

Regards,
Vinay Patil