Hi,
yes, the window operator is stateful, which means that it will pick up where it left off and restore in case of a failure.

You're right about the graph: chained operators are shown as one box.

Cheers,
Aljoscha

On Fri, 1 Jul 2016 at 04:52 Vinay Patil <vinay18.pa...@gmail.com> wrote:

Hi,

Just watched the video on Robust Stream Processing. So when we say the Window is a stateful operator, does it mean that even if the task manager doing the window operation fails, it will pick up from the state left earlier when it comes back up? (Have not read more on state for now.)

Also, in one of our projects, when we deploy on a cluster and check the Job Graph, everything is shown in one box. Why does this happen? Is it because of chaining of streams? So the box here represents the function flow, right?

Regards,
Vinay Patil

On Thu, Jun 30, 2016 at 7:29 PM, Vinay Patil <vinay18.pa...@gmail.com> wrote:

Hi Aljoscha,

Just wanted to check if it works with it. Anyway, to solve the problem, what we have thought of is to push a heartbeat message to Kafka after a certain interval, so that we always get a continuous stream and that edge case will never occur, right?

One more question I have regarding the failover case: let's say I have a window of 10 secs, and in that there are elements e0 to en. What if during this time the node goes down? When the node comes up, will it resume from the same state or from the last checkpointed state?

Can we explicitly checkpoint inside the window, maybe at the start of the window or before we apply the window?

Regards,
Vinay Patil

On Thu, Jun 30, 2016 at 2:11 PM, Aljoscha Krettek <aljos...@apache.org> wrote:

Hi,
I think the problem is that the DeltaFunction needs to have this signature:

    DeltaFunction<CoGroupedStreams.TaggedUnion<Tuple2<String, DTO>, Tuple2<String, DTO>>>

because the Trigger will see elements from both input streams, which are represented as a TaggedUnion that can contain an element from either side.

May I ask why you want to use the DeltaTrigger?

Cheers,
Aljoscha

On Wed, 29 Jun 2016 at 19:06 Vinay Patil <vinay18.pa...@gmail.com> wrote:

Hi,

Yes, the concepts are getting clearer now. One last thing I want to try before going for a custom trigger is the DeltaTrigger, but I am not able to get the syntax right. This is how I am trying it:

    TypeInformation<Tuple2<String, DTO>> typeInfo = TypeInformation.of(new TypeHint<Tuple2<String, DTO>>() {});
    // sourceStream and destStream: Tuple2<String, DTO>
    sourceStream.coGroup(destStream)
        .where(new ElementSelector()).equalTo(new ElementSelector())
        .window(TumblingEventTimeWindows.of(Time.seconds(10)))
        .trigger(DeltaTrigger.of(triggerMeters,
            new DeltaFunction<Tuple2<String, DTO>>() {
                private static final long serialVersionUID = 1L;

                @Override
                public double getDelta(Tuple2<String, DTO> oldDataPoint,
                        Tuple2<String, DTO> newDataPoint) {
                    return <some_val>;
                }
            }, typeInfo.createSerializer(env.getConfig())))
        .apply(new JoinStreams());

I am getting the error "cannot convert from DeltaTrigger to Trigger<? super CoGroupedStreams...". What am I doing wrong here? I have referred to the sample example.

Regards,
Vinay Patil
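To make the point about the union type concrete: after a coGroup, the trigger sees a single stream whose elements may come from either input, so the delta function's type parameter must be the union type, not one side's tuple type. A plain-Java model of that idea (the `TaggedUnion` below is a simplified stand-in, not Flink's actual class):

```java
// Simplified stand-in for Flink's CoGroupedStreams.TaggedUnion:
// holds an element from exactly one of the two inputs.
final class TaggedUnion<T1, T2> {
    private final T1 one;
    private final T2 two;
    private TaggedUnion(T1 one, T2 two) { this.one = one; this.two = two; }
    static <T1, T2> TaggedUnion<T1, T2> one(T1 v) { return new TaggedUnion<>(v, null); }
    static <T1, T2> TaggedUnion<T1, T2> two(T2 v) { return new TaggedUnion<>(null, v); }
    boolean isOne() { return one != null; }
    T1 getOne() { return one; }
    T2 getTwo() { return two; }
}

public class DeltaOverUnion {
    // The delta function must accept the union, because the next element
    // handed to the trigger may come from either input stream.
    static double getDelta(TaggedUnion<Double, Double> oldP, TaggedUnion<Double, Double> newP) {
        double oldV = oldP.isOne() ? oldP.getOne() : oldP.getTwo();
        double newV = newP.isOne() ? newP.getOne() : newP.getTwo();
        return Math.abs(newV - oldV);
    }

    public static void main(String[] args) {
        TaggedUnion<Double, Double> fromSource = TaggedUnion.one(10.0);
        TaggedUnion<Double, Double> fromDest = TaggedUnion.two(14.5);
        System.out.println(getDelta(fromSource, fromDest)); // 4.5
    }
}
```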
On Wed, Jun 29, 2016 at 7:15 PM, Aljoscha Krettek <aljos...@apache.org> wrote:

Hi,
you can use ingestion time if you don't care about the timestamps in your events, yes. If elements from the two streams happen to arrive at such times that they are not put into the same window then you won't get a match, correct.

Regarding ingestion time and out-of-order events: I think this section just reiterates that when using ingestion time the inherent timestamps in your events will not be considered and their order will not be respected.

Regarding late data: right now, Flink always processes late data and it is up to the Trigger to decide what to do with it. You can implement your own trigger based on EventTimeTrigger that would immediately purge a window when an element arrives that is later than an allowed amount of lateness. In Flink 1.1 we will introduce a setting for windows that allows you to specify an allowed lateness. With this, late elements will be dropped automatically. This feature is already available in the master, by the way.

Cheers,
Aljoscha

On Wed, 29 Jun 2016 at 14:14 Vinay Patil <vinay18.pa...@gmail.com> wrote:

Hi,

Ok. Inside the checkAndGetNextWatermark(lastElement, extractedTimestamp) method, both parameters are coming in the same (the timestamp value); I was expecting the last element's timestamp value in the first parameter when I extract it.

Let's say I decide to use IngestionTime (since I am getting accurate results here for now). If the joining elements of both streams are assigned to different windows, then in that case I will not get the match, right? However, in the case of event time this guarantees they will be in the same window, since we are assigning the timestamp; correct me here.

According to the documentation: *Ingestion Time programs cannot handle any out-of-order events or late data.*

In this context, what do we mean by out-of-order events? How does it know that the events are out of order, i.e. on which parameter does it decide that the events are out of order? In the case of event time we can say the timestamps received are out of order.

Late data: is there a threshold after which it does not accept late data?

Regards,
Vinay Patil

On Wed, Jun 29, 2016 at 5:15 PM, Aljoscha Krettek <aljos...@apache.org> wrote:

Hi,
the element will be kept around indefinitely if no new watermark arrives.

I think the same problem will persist for AssignerWithPunctuatedWatermarks, since there you also might not get the required "last watermark" to trigger processing of the last window.

Cheers,
Aljoscha

On Wed, 29 Jun 2016 at 13:18 Vinay Patil <vinay18.pa...@gmail.com> wrote:

Hi Aljoscha,

This clears a lot of doubts now. So now let's say the stream paused for a while or stopped completely on Friday; let us assume that the last message did not get processed and is kept in the internal buffers.

So when the stream starts again on Monday, will it consider the last element that is in the internal buffer for processing? How much time can the internal buffer hold the data, or will it flush the data after a threshold?

I have tried using AssignerWithPunctuatedWatermarks and generated the watermark for each event; I am still getting one record less.

Regards,
Vinay Patil

On Wed, Jun 29, 2016 at 2:21 PM, Aljoscha Krettek <aljos...@apache.org> wrote:

Hi,
the reason why the last element might never be emitted is the way the ascending timestamp extractor works. I'll try to explain with an example.

Let's say we have a window size of 2 milliseconds, elements arrive starting with timestamp 0, the window begin timestamp is inclusive, and the end timestamp is exclusive:

    Element 0, Timestamp 0 (at this point the watermark is -1)
    Element 1, Timestamp 1 (at this point the watermark is 0)
    Element 2, Timestamp 1 (at this point the watermark is still 0)
    Element 3, Timestamp 2 (at this point the watermark is 1)

Now we can process the window (0, 2) because we know from the watermark that no elements can arrive for that window anymore. The window contains elements 0, 1, 2.

    Element 4, Timestamp 3 (at this point the watermark is 2)
    Element 5, Timestamp 4 (at this point the watermark is 3)

Now we can process window (2, 4). The window contains elements 3, 4.

At this point, we have Element 5 sitting in internal buffers for window (4, 6), but if we don't receive further elements the watermark will never advance and we will never process that window.

If, however, we get new elements at some point, the watermark advances and we don't have a problem. That's what I meant when I said that you shouldn't have a problem if data keeps continuously arriving.

Cheers,
Aljoscha
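The walkthrough above can be reproduced with a small plain-Java simulation (no Flink involved; the watermark rule of the ascending extractor, watermark = last timestamp - 1, and the 2 ms tumbling windows are taken from the example):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

public class WatermarkDemo {
    public static void main(String[] args) {
        long windowSize = 2;                    // 2 ms tumbling windows, as in the example
        long[] timestamps = {0, 1, 1, 2, 3, 4}; // Elements 0..5
        Map<Long, List<Integer>> windows = new TreeMap<>(); // window start -> element indices
        Set<Long> fired = new TreeSet<>();
        long watermark = Long.MIN_VALUE;

        for (int i = 0; i < timestamps.length; i++) {
            long ts = timestamps[i];
            long windowStart = ts - (ts % windowSize); // epoch-aligned tumbling window
            windows.computeIfAbsent(windowStart, k -> new ArrayList<>()).add(i);
            watermark = ts - 1; // ascending extractor: watermark trails the last timestamp by 1
            // A window [start, start + size) fires once the watermark covers its last timestamp.
            for (long start : windows.keySet()) {
                if (start + windowSize - 1 <= watermark) fired.add(start);
            }
        }
        System.out.println("fired windows: " + fired);                      // [0, 2]
        System.out.println("stuck in window (4, 6): " + windows.get(4L));   // [5]
    }
}
```

With the given input, windows (0, 2) and (2, 4) fire, and element 5 stays buffered in window (4, 6) because the watermark stops at 3, exactly as described above.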
On Tue, 28 Jun 2016 at 17:14 Vinay Patil <vinay18.pa...@gmail.com> wrote:

Hi Aljoscha,

Thanks a lot for your inputs.

I still did not get you when you say I will not face this issue in the case of a continuous stream. Let's consider the following example: assume that the stream runs continuously from Monday to Friday, and on Friday it stops after 5.00 PM. Will I still face this issue?

I am actually not able to understand how it will differ in real-time streams.

Regards,
Vinay Patil

On Tue, Jun 28, 2016 at 5:07 PM, Aljoscha Krettek <aljos...@apache.org> wrote:

Hi,
ingestion time can only be used if you don't care about the timestamp in the elements. So if you have those you should probably use event time.

If your timestamps really are strictly increasing then the ascending extractor is good. And if you have a continuous stream of incoming elements you will not see the behavior of not getting the last elements.

By the way, when using Kafka you can also embed the timestamp extractor directly in the Kafka consumer. This is described here:
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission

Cheers,
Aljoscha

On Tue, 28 Jun 2016 at 11:44 Vinay Patil <vinay18.pa...@gmail.com> wrote:

Hi Aljoscha,

Thank you for your response. So do you suggest using a different approach for extracting the timestamp (as given in the document) instead of the AscendingTimestampExtractor? Is that the reason I am seeing this unexpected behaviour? In the case of a continuous stream I would not see any data loss?

Also, assuming that the records are always going to be in order, which is the best approach: Ingestion Time or Event Time?

Regards,
Vinay Patil

On Tue, Jun 28, 2016 at 2:41 PM, Aljoscha Krettek <aljos...@apache.org> wrote:

Hi,
first, regarding tumbling windows: even if you have 5-minute windows it can happen that elements that are only seconds apart go into different windows. Consider the following case:

    |      x | x      |

These are two 5-minute windows, and the two elements are only seconds apart but go into different windows because windows are aligned to the epoch.

Now, for the ascending timestamp extractor. The reason this can behave in unexpected ways is that it emits a watermark that is "last timestamp - 1", i.e. if it has seen timestamp t it can only emit watermark t - 1, because there might be other elements with timestamp t arriving. If you have a continuous stream of elements you wouldn't notice this. Only in this constructed example does it become visible.

Cheers,
Aljoscha
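The epoch alignment mentioned above is plain arithmetic: a tumbling window's start is the timestamp rounded down to a multiple of the window size. A quick Java check (the timestamps are made-up epoch milliseconds straddling a 5-minute boundary):

```java
public class WindowAlignment {
    // Start of the epoch-aligned tumbling window containing ts.
    static long windowStart(long ts, long windowSizeMs) {
        return ts - (ts % windowSizeMs);
    }

    public static void main(String[] args) {
        long fiveMin = 5 * 60 * 1000;
        long a = 899_000; // 1 s before the 15-minute mark: window [600000, 900000)
        long b = 901_000; // 2 s later: window [900000, 1200000)
        System.out.println(windowStart(a, fiveMin)); // 600000
        System.out.println(windowStart(b, fiveMin)); // 900000
        // Only 2 seconds apart, yet the elements land in different 5-minute windows.
    }
}
```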
On Tue, 28 Jun 2016 at 06:04 Vinay Patil <vinay18.pa...@gmail.com> wrote:

Hi,

Following are the timestamps I am getting from the DTO; here is the timestamp difference between the two records:

    1466115892162154279
    1466116026233613409

So the time difference is roughly 3 min. Even if I apply a window of 5 min, I am not getting the last record (the last timestamp value above), using the ascending timestamp extractor for generating the timestamp (assuming the timestamps are always in order).

I was at least expecting the data to reach the co-group function. What could be the reason for the data loss? The data we are getting is critical, hence we cannot afford to lose any data.

Regards,
Vinay Patil

On Mon, Jun 27, 2016 at 11:31 PM, Vinay Patil <vinay18.pa...@gmail.com> wrote:

Just an update: when I keep IngestionTime and remove the timestamp I am generating, I get all the records, but for Event Time I get one record less. I checked the time difference between the two records; it is 3 min. I tried keeping the window time at 5 mins, but even that did not work.

Even when I try assigning a timestamp for IngestionTime, I get one record less. So should I safely use Ingestion Time, or is it always advisable to use Event Time?

Regards,
Vinay Patil

On Mon, Jun 27, 2016 at 8:16 PM, Vinay Patil <vinay18.pa...@gmail.com> wrote:

Hi,

Actually I am only publishing 5 messages each to two different Kafka topics (using JUnit); even if I keep the window at 500 seconds the result is the same.

I am not understanding why it is not sending the 5th element to the co-group operator even when the keys are the same.

I actually cannot share the actual client code, but this is what the streams look like: sourceStream.coGroup(destStream). Here sourceStream and destStream are actually Tuple2<String, DTO>, and the ElementSelector returns tuple.f0, which is the key.

I am generating the timestamp based on a field from the DTO which is guaranteed to be in order.

Will using the triggers help here?

Regards,
Vinay Patil

*+91-800-728-4749*

On Mon, Jun 27, 2016 at 7:05 PM, Aljoscha Krettek <aljos...@apache.org> wrote:

Hi,
what timestamps are you assigning? Is it guaranteed that all of them would fall into the same 30-second window?

The issue with duplicate printing in the ElementSelector is strange. Could you post a more complete code example so that I can reproduce the problem?

Cheers,
Aljoscha

On Mon, 27 Jun 2016 at 13:21 Vinay Patil <vinay18.pa...@gmail.com> wrote:

Hi,

I am able to get the matching and non-matching elements.

However, when I am unit testing the code, I am getting one record less inside the overridden coGroup function. I am testing the following way:

1) Insert 5 messages into a local Kafka topic (test1)
2) Insert a different 5 messages into a local Kafka topic (test2)
3) Consume 1) and 2), giving two different Kafka streams
4) Generate an ascending timestamp (using Event Time) for both streams and create the key (String)

Now, up to 4) I am able to get all the records (checked by printing the streams to a text file).

However, when I send the streams to the co-group operator, I am receiving one record less, using the following syntax:

    sourceStream.coGroup(destStream)
        .where(new ElementSelector())
        .equalTo(new ElementSelector())
        .window(TumblingEventTimeWindows.of(Time.seconds(30)))
        .apply(new JoinStreams());

Also, in the ElementSelector I have inserted a sysout; I am getting 20 sysouts instead of 10 (10 sysouts for the source and 10 for the dest stream).

Unable to understand why one record is coming less to co-group.

Regards,
Vinay Patil

On Wed, Jun 15, 2016 at 1:39 PM, Fabian Hueske <fhue...@gmail.com> wrote:

Can you add a flag to each element emitted by the CoGroupFunction that indicates whether it was joined or not? Then you can use split to distinguish between both cases and handle both streams differently.

Best,
Fabian
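The flag-and-split idea above can be modeled outside Flink with plain Java: tag each element the co-group emits with a joined/not-joined flag, then partition on that flag (the `Tagged` record and the stream partitioning below are stand-ins for the CoGroupFunction output and DataStream splitting, not Flink's API):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class FlagAndSplit {
    // Stand-in for what the CoGroupFunction would emit: the value plus a joined flag.
    record Tagged(String value, boolean joined) {}

    public static void main(String[] args) {
        // Pretend these came out of the co-group: some matched, some not.
        List<Tagged> emitted = List.of(
                new Tagged("a", true), new Tagged("b", false), new Tagged("c", true));

        // "Split" on the flag, like routing to two downstream streams.
        Map<Boolean, List<String>> byFlag = emitted.stream()
                .collect(Collectors.partitioningBy(Tagged::joined,
                        Collectors.mapping(Tagged::value, Collectors.toList())));

        System.out.println("joined:   " + byFlag.get(true));  // [a, c]
        System.out.println("unjoined: " + byFlag.get(false)); // [b]
    }
}
```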
2016-06-15 6:45 GMT+02:00 Vinay Patil <vinay18.pa...@gmail.com>:

Hi Jark,

I am able to get the non-matching elements in a stream.

Of course we can collect the matching elements in the same stream as well; however, I want to perform additional operations on the joined stream before writing it to S3, so I would have to include a separate join operator for the same two streams, right? Correct me if I am wrong.

I have pasted the dummy code which collects the non-matching records (I have to perform this on the actual data; correct me if I am doing it wrong):

    sourceStream.coGroup(destStream).where(new ElementSelector()).equalTo(new ElementSelector())
        .window(TumblingEventTimeWindows.of(Time.seconds(30)))
        .apply(new CoGroupFunction<Integer, Integer, Integer>() {

            private static final long serialVersionUID = 6408179761497497475L;

            @Override
            public void coGroup(Iterable<Integer> paramIterable, Iterable<Integer> paramIterable1,
                    Collector<Integer> paramCollector) throws Exception {
                long exactSizeIfKnown = paramIterable.spliterator().getExactSizeIfKnown();
                long exactSizeIfKnown2 = paramIterable1.spliterator().getExactSizeIfKnown();
                if (exactSizeIfKnown == 0) {
                    paramCollector.collect(paramIterable1.iterator().next());
                } else if (exactSizeIfKnown2 == 0) {
                    paramCollector.collect(paramIterable.iterator().next());
                }
            }
        }).print();

Regards,
Vinay Patil

On Tue, Jun 14, 2016 at 1:37 PM, Vinay Patil <vinay18.pa...@gmail.com> wrote:

You are right; I debugged it for all elements, and I can do that now. Thanks a lot.

Regards,
Vinay Patil

On Tue, Jun 14, 2016 at 11:56 AM, Jark Wu <wuchong...@alibaba-inc.com> wrote:

In coGroup(Iterable<Integer> iter1, Iterable<Integer> iter2, Collector<Integer> out), when both iter1 and iter2 are not empty, it means they are matched elements from both streams. When one of iter1 and iter2 is empty, it means they are unmatched.

- Jark Wu (wuchong)
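The matched/unmatched rule described above can be sketched with plain collections (a toy co-group by key, not Flink's implementation; the data values are illustrative):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class CoGroupSketch {
    public static void main(String[] args) {
        List<Integer> left  = List.of(10, 20, 30);
        List<Integer> right = List.of(20, 30, 40);

        // Collect every key seen on either side (here the key is the value itself).
        Set<Integer> keys = new TreeSet<>();
        keys.addAll(left);
        keys.addAll(right);

        List<Integer> matched = new ArrayList<>();
        List<Integer> unmatched = new ArrayList<>();
        for (int key : keys) {
            boolean inLeft = left.contains(key);
            boolean inRight = right.contains(key);
            if (inLeft && inRight) matched.add(key); // both iterables non-empty: matched
            else unmatched.add(key);                 // one side empty: unmatched
        }
        System.out.println("matched:   " + matched);   // [20, 30]
        System.out.println("unmatched: " + unmatched); // [10, 40]
    }
}
```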
On 14 June 2016 at 12:46, Vinay Patil <vinay18.pa...@gmail.com> wrote:

Hi Matthias,

I did not get you; even if we use co-group, we have to apply it on a key:

    sourceStream.coGroup(destStream)
        .where(new ElementSelector())
        .equalTo(new ElementSelector())
        .window(TumblingEventTimeWindows.of(Time.seconds(30)))
        .apply(new CoGroupFunction<Integer, Integer, Integer>() {
            private static final long serialVersionUID = 6408179761497497475L;

            @Override
            public void coGroup(Iterable<Integer> paramIterable, Iterable<Integer> paramIterable1,
                    Collector<Integer> paramCollector) throws Exception {
                Iterator<Integer> iterator = paramIterable.iterator();
                while (iterator.hasNext()) {
                }
            }
        });

When I debug this, only the matched elements from both streams come into the coGroup function.

What I want is: how do I check for unmatched elements from both streams and write them to a sink?

Regards,
Vinay Patil

On Tue, Jun 14, 2016 at 2:07 AM, Matthias J. Sax <mj...@apache.org> wrote:

You need to do an outer join. However, there is no built-in support for outer joins yet.

You can use Window-CoGroup to implement the outer join as an own operator.

-Matthias

On 06/13/2016 06:53 PM, Vinay Patil wrote:

Hi,

I have a question regarding the join operation. Consider the following dummy example:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
    DataStreamSource<Integer> sourceStream = env.fromElements(10, 20, 23, 25, 30, 33, 102, 18);
    DataStreamSource<Integer> destStream = env.fromElements(20, 30, 40, 50, 60, 10);

    sourceStream.join(destStream)
        .where(new ElementSelector())
.equalTo(new ElementSelector()) > >> > > > > > > > > > > > >>> > > > >> >>> > >> > > > > > > > > > > > > >> > > .window(TumblingEventTimeWindows.of(Time.milliseconds(10))) > >> > > > > > > > > > > > >>> > > > >> >>> .apply(new JoinFunction<Integer, > >> > > Integer, > >> > > > > > > > > Integer>() { > >> > > > > > > > > > > > >>> > > > >> >>> > >> > > > > > > > > > > > >>> > > > >> >>> private static final long > >> > > > serialVersionUID = > >> > > > > > 1L; > >> > > > > > > > > > > > >>> > > > >> >>> > >> > > > > > > > > > > > >>> > > > >> >>> @Override > >> > > > > > > > > > > > >>> > > > >> >>> public Integer join(Integer > >> paramIN1, > >> > > > > Integer > >> > > > > > > > > > paramIN2) > >> > > > > > > > > > > > >>> throws > >> > > > > > > > > > > > >>> > > > >> Exception > >> > > > > > > > > > > > >>> > > > >> >> { > >> > > > > > > > > > > > >>> > > > >> >>> return paramIN1; > >> > > > > > > > > > > > >>> > > > >> >>> } > >> > > > > > > > > > > > >>> > > > >> >>> }).print(); > >> > > > > > > > > > > > >>> > > > >> >>> > >> > > > > > > > > > > > >>> > > > >> >>> I perfectly get the elements that > >> are > >> > > > > matching > >> > > > > > > in > >> > > > > > > > > both > >> > > > > > > > > > > the > >> > > > > > > > > > > > >>> > > streams, > >> > > > > > > > > > > > >>> > > > >> >> however > >> > > > > > > > > > > > >>> > > > >> >>> my requirement is to write these > >> > matched > >> > > > > > > elements > >> > > > > > > > > and > >> > > > > > > > > > > also > >> > > > > > > > > > > > >>> the > >> > > > > > > > > > > > >>> > > > >> unmatched > >> > > > > > > > > > > > >>> > > > >> >>> elements to sink(S3) > >> > > > > > > > > > > > >>> > > > >> >>> > >> > > > > > > > > > > > >>> > > > >> >>> How do I get the unmatched > elements > >> > from > >> > > > > each > >> > > > > > > > > stream ? 
> >> > > > > > > > > > > > >>> > > > >> >>> > >> > > > > > > > > > > > >>> > > > >> >>> Regards, > >> > > > > > > > > > > > >>> > > > >> >>> Vinay Patil > >> > > > > > > > > > > > >>> > > > >> >>> > >> > > > > > > > > > > > >>> > > > >> >> > >> > > > > > > > > > > > >>> > > > >> >> > >> > > > > > > > > > > > >>> > > > >> > >> > > > > > > > > > > > >>> > > > >> > >> > > > > > > > > > > > >>> > > > > > >> > > > > > > > > > > > >>> > > > > >> > > > > > > > > > > > >>> > > > >> > > > > > > > > > > > >>> > > >> > > > > > > > > > > > >>> > >> > > > > > > > > > > > >> > >> > > > > > > > > > > > >> > >> > > > > > > > > > > > > > >> > > > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > >> > > > > > > > >> > > > > > > >> > > > > > >> > > > > >> > > > >> > > >> > > > > >
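[Editor's note] Jark's description of the CoGroupFunction contract (both iterables non-empty means the key matched in both streams; one empty means its elements are unmatched) can be sketched in plain Java, without a Flink runtime. The class and method names below are illustrative, not Flink API; the sketch only mirrors the per-key decision a `coGroup` body would make to tag matched and unmatched elements before routing them to a sink:

```java
import java.util.ArrayList;
import java.util.List;

public class OuterJoinSketch {
    // Mimics the per-key contract Jark describes for Flink's CoGroupFunction:
    // both sides non-empty -> the key matched; one side empty -> unmatched.
    static List<String> coGroup(Iterable<Integer> left, Iterable<Integer> right) {
        List<String> out = new ArrayList<>();
        boolean leftEmpty = !left.iterator().hasNext();
        boolean rightEmpty = !right.iterator().hasNext();
        if (!leftEmpty && !rightEmpty) {
            // Matched key: emit the left-side elements (a full join result
            // would pair them with the right side).
            for (Integer l : left) out.add("MATCHED:" + l);
        } else {
            // Unmatched key: exactly one side has elements; tag its origin.
            for (Integer l : left) out.add("LEFT_ONLY:" + l);
            for (Integer r : right) out.add("RIGHT_ONLY:" + r);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(coGroup(List.of(10), List.of(10))); // [MATCHED:10]
        System.out.println(coGroup(List.of(23), List.of()));   // [LEFT_ONLY:23]
        System.out.println(coGroup(List.of(), List.of(40)));   // [RIGHT_ONLY:40]
    }
}
```

Inside a real windowed `coGroup(...).apply(...)`, the same branching would call `paramCollector.collect(...)` with the tagged elements, which is how the matched and unmatched records from Vinay's example can both reach S3.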