Hi,

I just watched the video on Robust Stream Processing. When we say that a window is a stateful operator, does that mean that if the TaskManager running the window operation fails, the window will pick up from the state it left off at once it comes back up? (I have not read more about state yet.)
Also, in one of our projects, when we deploy on the cluster and look at the job graph, everything is shown in one box. Why does this happen? Is it because of operator chaining? So the box here represents the function flow, right?

Regards,
Vinay Patil

On Thu, Jun 30, 2016 at 7:29 PM, Vinay Patil <vinay18.pa...@gmail.com> wrote:

Hi Aljoscha,

I just wanted to check whether it works with the DeltaTrigger.
Anyway, to solve the problem, we are thinking of pushing a heartbeat message to Kafka at a fixed interval, so that we always get a continuous stream and that edge case never occurs, right?

One more question regarding the failover case:
Let's say I have a 10-second window containing elements e0 to en. What if the node goes down during this time? When the node comes back up, will it resume from the same state or from the last checkpointed state?

Can we explicitly checkpoint inside the window, maybe at the start of the window or before we apply the window?

Regards,
Vinay Patil

On Thu, Jun 30, 2016 at 2:11 PM, Aljoscha Krettek <aljos...@apache.org> wrote:

Hi,
I think the problem is that the DeltaFunction needs to have this signature:

DeltaFunction<CoGroupedStreams.TaggedUnion<Tuple2<String,DTO>, Tuple2<String,DTO>>>

because the Trigger will see elements from both input streams, which are represented as a TaggedUnion that can contain an element from either side.

May I ask why you want to use the DeltaTrigger?

Cheers,
Aljoscha

On Wed, 29 Jun 2016 at 19:06 Vinay Patil <vinay18.pa...@gmail.com> wrote:

Hi,

Yes, the concepts are getting clear to me now.
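Aljoscha's point is about types: on a coGroup, the trigger sees each element wrapped in a union that holds a value from exactly one of the two inputs, so the delta function has to be written against that union type and unwrap whichever side is set. The sketch below illustrates this in plain Java without Flink. `Union` is a hypothetical, simplified stand-in for Flink's `CoGroupedStreams.TaggedUnion` (not the real class), and `Dto.reading` is an assumed numeric field used only to compute a delta.

```java
import java.util.function.ToDoubleBiFunction;

// Sketch only: plain Java, no Flink dependency.
final class TaggedUnionDeltaSketch {

    // Hypothetical stand-in for CoGroupedStreams.TaggedUnion:
    // exactly one of the two sides holds a value.
    static final class Union<T1, T2> {
        private final T1 one;
        private final T2 two;

        private Union(T1 one, T2 two) {
            this.one = one;
            this.two = two;
        }

        static <T1, T2> Union<T1, T2> one(T1 value) {
            return new Union<>(value, null);
        }

        static <T1, T2> Union<T1, T2> two(T2 value) {
            return new Union<>(null, value);
        }

        boolean isOne() {
            return one != null;
        }

        T1 getOne() {
            return one;
        }

        T2 getTwo() {
            return two;
        }
    }

    // Hypothetical DTO with an assumed numeric field to compute deltas on.
    static final class Dto {
        final double reading;

        Dto(double reading) {
            this.reading = reading;
        }
    }

    // Extract the payload from whichever input the element came from.
    static Dto unwrap(Union<Dto, Dto> element) {
        return element.isOne() ? element.getOne() : element.getTwo();
    }

    // The delta is typed over the union, not over one input's element type,
    // so it accepts elements from both streams.
    static final ToDoubleBiFunction<Union<Dto, Dto>, Union<Dto, Dto>> DELTA =
        (oldPoint, newPoint) -> Math.abs(unwrap(newPoint).reading - unwrap(oldPoint).reading);

    public static void main(String[] args) {
        Union<Dto, Dto> fromSource = Union.one(new Dto(10.0));
        Union<Dto, Dto> fromDest = Union.two(new Dto(12.5));
        System.out.println(DELTA.applyAsDouble(fromSource, fromDest)); // prints 2.5
    }
}
```

In the real job, following Aljoscha's reply, the anonymous DeltaFunction would be declared over CoGroupedStreams.TaggedUnion<Tuple2<String,DTO>, Tuple2<String,DTO>> and unwrap each argument the same way. Because DeltaTrigger.of takes the delta function and a state serializer for the same element type, the serializer argument in Vinay's snippet would presumably also have to be built for the union type rather than for Tuple2<String,DTO>.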
One last thing I want to try before writing a custom trigger: I want to try the DeltaTrigger, but I cannot get the syntax right. This is how I am trying it:

TypeInformation<Tuple2<String, DTO>> typeInfo = TypeInformation.of(new TypeHint<Tuple2<String, DTO>>() {
});
// sourceStream and destStream: Tuple2<String,DTO>
sourceStream.coGroup(destStream)
    .where(new ElementSelector())
    .equalTo(new ElementSelector())
    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
    .trigger(DeltaTrigger.of(triggerMeters,
        new DeltaFunction<Tuple2<String,DTO>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public double getDelta(
                    Tuple2<String,DTO> oldDataPoint,
                    Tuple2<String,DTO> newDataPoint) {
                return <some_val>;
            }
        },
        typeInfo.createSerializer(env.getConfig())))
    .apply(new JoinStreams());

I am getting the error "cannot convert from DeltaTrigger to Trigger<? super CoGroupedStreams...".
What am I doing wrong here? I have referred to the sample example.

Regards,
Vinay Patil

On Wed, Jun 29, 2016 at 7:15 PM, Aljoscha Krettek <aljos...@apache.org> wrote:

Hi,
yes, you can use ingestion time if you don't care about the timestamps in your events. If elements from the two streams happen to arrive at such times that they are not put into the same window, then you won't get a match, correct.

Regarding ingestion time and out-of-order events: I think this section just reiterates that when using ingestion time, the inherent timestamps in your events will not be considered and their order will not be respected.

Regarding late data: right now, Flink always processes late data, and it is up to the Trigger to decide what to do with late data.
You can implement your own custom trigger based on EventTimeTrigger that immediately purges a window when an element arrives that is later than an allowed amount of lateness. In Flink 1.1 we will introduce a setting for windows that allows you to specify an allowed lateness. With this, late elements will be dropped automatically. This feature is already available in master, by the way.

Cheers,
Aljoscha

On Wed, 29 Jun 2016 at 14:14 Vinay Patil <vinay18.pa...@gmail.com> wrote:

Hi,

OK.
Inside the checkAndGetNextWatermark(lastElement, extractedTimestamp) method, both parameters have the same (timestamp) value. I was expecting the last element's timestamp in the first parameter when I extract it.

Let's say I decide to use IngestionTime (since I am getting accurate results with it for now). If the joining elements of the two streams are assigned to different windows, then I will not get a match, right?

However, with event time they are guaranteed to land in the same window, since we are assigning the timestamps. Correct me here.

According to the documentation:
*Ingestion Time programs cannot handle any out-of-order events or late data*

In this context, what do we mean by out-of-order events? How does Flink know that the events are out of order, i.e., based on which parameter does it decide that they are out of order? In the case of event time, we can say that the received timestamps are out of order.

Late data: is there a threshold after which it no longer accepts late data?
Regards,
Vinay Patil

On Wed, Jun 29, 2016 at 5:15 PM, Aljoscha Krettek <aljos...@apache.org> wrote:

Hi,
the element will be kept around indefinitely if no new watermark arrives.

I think the same problem will persist with AssignerWithPunctuatedWatermarks, since there you also might not get the required "last watermark" to trigger processing of the last window.

Cheers,
Aljoscha

On Wed, 29 Jun 2016 at 13:18 Vinay Patil <vinay18.pa...@gmail.com> wrote:

Hi Aljoscha,

This clears up a lot of doubts.
Now let's say the stream pauses for a while or stops completely on Friday, and assume that the last message did not get processed and is kept in the internal buffers.

When the stream starts again on Monday, will it consider the last element in the internal buffer for processing?
How long can the internal buffer hold the data, or will it flush the data after a threshold?

I have tried using AssignerWithPunctuatedWatermarks and generated a watermark for each event, but I am still getting one record less.

Regards,
Vinay Patil

On Wed, Jun 29, 2016 at 2:21 PM, Aljoscha Krettek <aljos...@apache.org> wrote:

Hi,
the reason why the last element might never be emitted is the way the ascending timestamp extractor works. I'll try to explain with an example.
Let's say we have a window size of 2 milliseconds, elements arrive starting with timestamp 0, and the window begin timestamp is inclusive while the end timestamp is exclusive:

Element 0, Timestamp 0 (at this point the watermark is -1)
Element 1, Timestamp 1 (at this point the watermark is 0)
Element 2, Timestamp 1 (at this point the watermark is still 0)
Element 3, Timestamp 2 (at this point the watermark is 1)

Now we can process the window (0, 2) because we know from the watermark that no more elements can arrive for that window. The window contains elements 0, 1, 2.

Element 4, Timestamp 3 (at this point the watermark is 2)
Element 5, Timestamp 4 (at this point the watermark is 3)

Now we can process window (2, 4). The window contains elements 3, 4.

At this point, we have Element 5 sitting in internal buffers for window (4, 6), but if we don't receive further elements the watermark will never advance and we will never process that window.

If, however, we get new elements at some point, the watermark advances and we don't have a problem. That's what I meant when I said that you shouldn't have a problem if data keeps arriving continuously.

Cheers,
Aljoscha

On Tue, 28 Jun 2016 at 17:14 Vinay Patil <vinay18.pa...@gmail.com> wrote:

Hi Aljoscha,

Thanks a lot for your inputs.
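Aljoscha's worked example above can be replayed in a few lines of plain Java (no Flink involved). The sketch assumes, as in the example, 2 ms tumbling windows aligned to timestamp 0, a watermark of "highest timestamp seen - 1", and that a window (start, end) is processed once the watermark reaches end - 1. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Plain-Java replay of the example; not Flink's implementation.
final class AscendingWatermarkReplay {

    static final long WINDOW_SIZE = 2; // milliseconds, as in the example

    static final class Result {
        final List<List<Integer>> fired = new ArrayList<>();          // element ids per processed window
        final TreeMap<Long, List<Integer>> pending = new TreeMap<>(); // window start -> buffered element ids
    }

    static Result replay(long[] timestamps) {
        Result r = new Result();
        long watermark = Long.MIN_VALUE;
        for (int id = 0; id < timestamps.length; id++) {
            long ts = timestamps[id];
            long start = ts - (ts % WINDOW_SIZE); // assumes ts >= 0
            r.pending.computeIfAbsent(start, s -> new ArrayList<>()).add(id);

            // Ascending extractor: another element with timestamp ts may still arrive,
            // so the watermark can only advance to ts - 1.
            watermark = Math.max(watermark, ts - 1);

            // Process every buffered window whose last timestamp (end - 1) is covered.
            while (!r.pending.isEmpty() && r.pending.firstKey() + WINDOW_SIZE - 1 <= watermark) {
                r.fired.add(r.pending.pollFirstEntry().getValue());
            }
        }
        return r;
    }

    public static void main(String[] args) {
        Result r = replay(new long[] {0, 1, 1, 2, 3, 4});
        System.out.println("processed: " + r.fired);  // processed: [[0, 1, 2], [3, 4]]
        System.out.println("buffered: " + r.pending); // buffered: {4=[5]}
    }
}
```

Appending one more element with a later timestamp, for example replay(new long[] {0, 1, 1, 2, 3, 4, 6}), moves the watermark to 5, and the buffered window (4, 6) is then processed with element 5. This is the effect the heartbeat idea from the Jun 30 message relies on.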
I still don't understand what you mean when you say I will not face this issue with a continuous stream. Let's consider the following example:
Assume that the stream runs continuously from Monday to Friday, and on Friday it stops after 5:00 PM. Will I still face this issue?

I am actually not able to understand how it will differ for real-time streams.

Regards,
Vinay Patil

On Tue, Jun 28, 2016 at 5:07 PM, Aljoscha Krettek <aljos...@apache.org> wrote:

Hi,
ingestion time can only be used if you don't care about the timestamps in the elements. So if you have those, you should probably use event time.

If your timestamps really are strictly increasing, then the ascending extractor is good. And if you have a continuous stream of incoming elements, you will not see the behavior of the last elements not being emitted.

By the way, when using Kafka you can also embed the timestamp extractor directly in the Kafka consumer.
This is described here:

https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission

Cheers,
Aljoscha

On Tue, 28 Jun 2016 at 11:44 Vinay Patil <vinay18.pa...@gmail.com> wrote:

Hi Aljoscha,

Thank you for your response.
So do you suggest using a different approach for extracting timestamps (as described in the documentation) instead of the AscendingTimestampExtractor?
Is that the reason I am seeing this unexpected behaviour? And with a continuous stream I would not see any data loss?

Also, assuming that the records are always going to be in order, which is the better approach: ingestion time or event time?

Regards,
Vinay Patil

On Tue, Jun 28, 2016 at 2:41 PM, Aljoscha Krettek <aljos...@apache.org> wrote:

Hi,
first, regarding tumbling windows: even if you have 5-minute windows, it can happen that elements that are only seconds apart go into different windows.
Consider the following case:

|          x|x          |

These are two 5-minute windows, and the two elements are only seconds apart but go into different windows because windows are aligned to the epoch.

Now, for the ascending timestamp extractor: the reason it can behave in unexpected ways is that it emits a watermark of "last timestamp - 1", i.e. if it has seen timestamp t it can only emit watermark t-1, because there might be other elements with timestamp t arriving. If you have a continuous stream of elements you wouldn't notice this. Only in this constructed example does it become visible.
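The epoch alignment described above can be checked with plain arithmetic. This sketch assumes tumbling windows with no offset and non-negative millisecond timestamps; windowStart and sameWindow are hypothetical helpers, not Flink APIs, and the sample timestamps are made up for illustration.

```java
// Plain-Java sketch of epoch-aligned tumbling-window assignment (no offset, timestamps >= 0).
final class EpochAlignedWindows {

    static final long FIVE_MINUTES = 5 * 60 * 1000L;

    // Start of the window containing the timestamp; windows are [start, start + size).
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - (timestampMillis % sizeMillis);
    }

    static boolean sameWindow(long a, long b, long sizeMillis) {
        return windowStart(a, sizeMillis) == windowStart(b, sizeMillis);
    }

    public static void main(String[] args) {
        // 4 seconds apart, but on either side of the boundary at 300_000 ms.
        System.out.println(sameWindow(298_000L, 302_000L, FIVE_MINUTES)); // false
        // 4 minutes apart, but inside the same window [0, 300_000).
        System.out.println(sameWindow(10_000L, 250_000L, FIVE_MINUTES));  // true
    }
}
```

Because the boundaries depend only on the window size and the epoch, not on when the first element arrived, no window size can guarantee that two elements a few seconds apart share a window.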
Cheers,
Aljoscha

On Tue, 28 Jun 2016 at 06:04 Vinay Patil <vinay18.pa...@gmail.com> wrote:

Hi,

Following are the timestamps I am getting from the DTO for the two records:
1466115892162154279
1466116026233613409

So the time difference is roughly 3 minutes. Even if I apply a 5-minute window, I am not getting the last record (the last timestamp value above). I am using the ascending timestamp extractor to generate the timestamps (assuming that the timestamps are always in order).

I was at least expecting the data to reach the co-group function.
What could be the reason for the data loss?
The data we are getting is critical, so we cannot afford to lose any data.

Regards,
Vinay Patil

On Mon, Jun 27, 2016 at 11:31 PM, Vinay Patil <vinay18.pa...@gmail.com> wrote:

Just an update: when I use IngestionTime and remove the timestamp I am generating, I am getting all the records, but with event time I am getting one record less. I checked the time difference between the two records; it is 3 minutes. I tried setting the window time to 5 minutes, but even that did not work.

Even when I assign timestamps with IngestionTime, I get one record less. So should I safely use ingestion time, or is it always advisable to use event time?
Regards,
Vinay Patil

On Mon, Jun 27, 2016 at 8:16 PM, Vinay Patil <vinay18.pa...@gmail.com> wrote:

Hi,

Actually, I am only publishing 5 messages each to two different Kafka topics (using JUnit). Even if I set the window to 500 seconds, the result is the same.

I do not understand why it is not sending the 5th element to the co-group operator even though the keys are the same.

I cannot share the actual client code, but this is what the streams look like:

sourceStream.coGroup(destStream)

Here sourceStream and destStream are actually Tuple2<String,DTO>, and the ElementSelector returns tuple.f0, which is the key.

I am generating the timestamp based on a field from the DTO, which is guaranteed to be in order.

Will using triggers help here?
Regards,
Vinay Patil

On Mon, Jun 27, 2016 at 7:05 PM, Aljoscha Krettek <aljos...@apache.org> wrote:

Hi,
what timestamps are you assigning? Is it guaranteed that all of them would fall into the same 30-second window?

The issue with duplicate printing in the ElementSelector is strange. Could you post a more complete code example so that I can reproduce the problem?

Cheers,
Aljoscha

On Mon, 27 Jun 2016 at 13:21 Vinay Patil <vinay18.pa...@gmail.com> wrote:

Hi,

I am able to get the matching and non-matching elements.

However, when I am unit testing the code, I am getting one record less inside the overridden coGroup function.
I am testing it the following way:

1) Insert 5 messages into a local Kafka topic (test1)
2) Insert 5 different messages into a local Kafka topic (test2)
3) Consume 1) and 2), so I have two different Kafka streams
4) Generate ascending timestamps (using event time) for both streams and create the key (String)

Up to step 4) I am able to get all the records (checked by printing the stream to a text file).

However, when I send the stream to the co-group operator, I receive one record less, using the following syntax:

sourceStream.coGroup(destStream)
    .where(new ElementSelector())
    .equalTo(new ElementSelector())
    .window(TumblingEventTimeWindows.of(Time.seconds(30)))
    .apply(new JoinStreams());

Also, I have inserted a sysout in the ElementSelector, and I am getting 20 sysouts instead of 10 (10 for the source stream and 10 for the dest stream).

I am unable to understand why one record less is coming to the co-group.

Regards,
Vinay Patil

On Wed, Jun 15, 2016 at 1:39 PM, Fabian Hueske <fhue...@gmail.com> wrote:

Can you add a flag to each element emitted by the CoGroupFunction that indicates whether it was joined or not?
Then you can use split to distinguish between both cases and handle both streams differently.

Best, Fabian

2016-06-15 6:45 GMT+02:00 Vinay Patil <vinay18.pa...@gmail.com>:

Hi Jark,

I am able to get the non-matching elements in a stream.

Of course we can collect the matching elements in the same stream as well. However, I want to perform additional operations on the joined stream before writing it to S3, so I would have to include a separate join operator for the same two streams, right?
Correct me if I am wrong.

I have pasted the dummy code that collects the non-matching records (I have to perform this on the actual data; correct me if I am doing it wrong).

sourceStream.coGroup(destStream)
    .where(new ElementSelector())
    .equalTo(new ElementSelector())
    .window(TumblingEventTimeWindows.of(Time.seconds(30)))
    .apply(new CoGroupFunction<Integer, Integer, Integer>() {

        private static final long serialVersionUID = 6408179761497497475L;

        @Override
        public void coGroup(Iterable<Integer> paramIterable, Iterable<Integer> paramIterable1,
                Collector<Integer> paramCollector) throws Exception {
            long exactSizeIfKnown = paramIterable.spliterator().getExactSizeIfKnown();
            long exactSizeIfKnown2 = paramIterable1.spliterator().getExactSizeIfKnown();
            if (exactSizeIfKnown == 0) {
                paramCollector.collect(paramIterable1.iterator().next());
            } else if (exactSizeIfKnown2 == 0) {
                paramCollector.collect(paramIterable.iterator().next());
            }
        }
    }).print();

Regards,
Vinay Patil

On Tue, Jun 14, 2016 at 1:37 PM, Vinay Patil <vinay18.pa...@gmail.com> wrote:

You are right. I debugged it for all elements, and I can do that now.
Thanks a lot.

Regards,
Vinay Patil

On Tue, Jun 14, 2016 at 11:56 AM, Jark Wu <wuchong...@alibaba-inc.com> wrote:

In `coGroup(Iterable<Integer> iter1, Iterable<Integer> iter2, Collector<Integer> out)`, when both iter1 and iter2 are non-empty, it means they are matched elements from both streams.
When one of iter1 and iter2 is empty, it means that they are unmatched.

- Jark Wu (wuchong)

On Jun 14, 2016, at 12:46 PM, Vinay Patil <vinay18.pa...@gmail.com> wrote:

Hi Matthias,

I did not get you. Even if we use coGroup, we have to apply it on a key:

sourceStream.coGroup(destStream)
    .where(new ElementSelector())
    .equalTo(new ElementSelector())
    .window(TumblingEventTimeWindows.of(Time.seconds(30)))
    .apply(new CoGroupFunction<Integer, Integer, Integer>() {
        private static final long serialVersionUID = 6408179761497497475L;

        @Override
        public void coGroup(Iterable<Integer> paramIterable, Iterable<Integer> paramIterable1,
                Collector<Integer> paramCollector) throws Exception {
            Iterator<Integer> iterator = paramIterable.iterator();
            while (iterator.hasNext()) {
                Integer element = iterator.next(); // inspect each element here while debugging
            }
        }
    });

When I debug this, only the matched elements from both streams come into the coGroup function.

What I want to know is how to check for unmatched elements from both streams and write them to a sink.

Regards,
Vinay Patil

On Tue, Jun 14, 2016 at 2:07 AM, Matthias J. Sax <mj...@apache.org> wrote:

You need to do an outer join. However, there is no built-in support for outer joins yet.
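Jark's rule above (both iterables non-empty means a match, exactly one empty means the key is unmatched) and Fabian's suggestion of flagging each emitted element can be combined into a small outer-join sketch. It is plain Java that simulates one window of a keyed coGroup, using the integers from Vinay's dummy join example as both elements and keys; Tagged, coGroup and run are hypothetical names. In a real job, the coGroup body would live in a CoGroupFunction and the flag would drive the split.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Plain-Java simulation of an outer join built on a windowed coGroup (one window).
final class CoGroupOuterJoinSketch {

    // Output element carrying the "was it joined?" flag.
    static final class Tagged {
        final int value;
        final boolean matched;

        Tagged(int value, boolean matched) {
            this.value = value;
            this.matched = matched;
        }
    }

    // Body of a coGroup call for a single key: one iterable per input.
    static void coGroup(Iterable<Integer> first, Iterable<Integer> second, List<Tagged> out) {
        boolean firstEmpty = !first.iterator().hasNext();
        boolean secondEmpty = !second.iterator().hasNext();
        if (!firstEmpty && !secondEmpty) {
            for (int v : first) {
                out.add(new Tagged(v, true)); // key present in both inputs
            }
        } else {
            // Exactly one side has elements for this key: emit all of them as unmatched.
            for (int v : firstEmpty ? second : first) {
                out.add(new Tagged(v, false));
            }
        }
    }

    // Groups both inputs by key (the element itself) and calls coGroup once per key.
    static List<Tagged> run(List<Integer> source, List<Integer> dest) {
        Map<Integer, List<Integer>> left = new LinkedHashMap<>();
        Map<Integer, List<Integer>> right = new LinkedHashMap<>();
        for (int v : source) left.computeIfAbsent(v, k -> new ArrayList<>()).add(v);
        for (int v : dest) right.computeIfAbsent(v, k -> new ArrayList<>()).add(v);

        Set<Integer> keys = new LinkedHashSet<>(left.keySet());
        keys.addAll(right.keySet());

        List<Tagged> out = new ArrayList<>();
        List<Integer> none = Collections.emptyList();
        for (int key : keys) {
            coGroup(left.getOrDefault(key, none), right.getOrDefault(key, none), out);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Tagged> out = run(
            Arrays.asList(10, 20, 23, 25, 30, 33, 102, 18),
            Arrays.asList(20, 30, 40, 50, 60, 10));
        // "Split" on the flag into two separately handled outputs.
        List<Integer> joined = new ArrayList<>();
        List<Integer> unmatched = new ArrayList<>();
        for (Tagged t : out) {
            (t.matched ? joined : unmatched).add(t.value);
        }
        System.out.println("joined: " + joined);       // joined: [10, 20, 30]
        System.out.println("unmatched: " + unmatched); // unmatched: [23, 25, 33, 102, 18, 40, 50, 60]
    }
}
```

The emptiness check uses iterator().hasNext() rather than spliterator().getExactSizeIfKnown() as in Vinay's dummy code above: getExactSizeIfKnown() returns -1 when a spliterator does not report its size, and the default Iterable.spliterator() does not. The sketch also emits every unmatched element for a key, whereas the dummy code collects only the first one.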
You can use Window-CoGroup to implement the outer-join as your own operator.

-Matthias

On 06/13/2016 06:53 PM, Vinay Patil wrote:

Hi,

I have a question regarding the join operation. Consider the following dummy example:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
    DataStreamSource<Integer> sourceStream = env.fromElements(10, 20, 23, 25, 30, 33, 102, 18);
    DataStreamSource<Integer> destStream = env.fromElements(20, 30, 40, 50, 60, 10);

    sourceStream.join(destStream)
        .where(new ElementSelector())
        .equalTo(new ElementSelector())
        .window(TumblingEventTimeWindows.of(Time.milliseconds(10)))
        .apply(new JoinFunction<Integer, Integer, Integer>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Integer join(Integer paramIN1, Integer paramIN2) throws Exception {
                return paramIN1;
            }
        }).print();

    env.execute(); // the job only runs once execute() is called

I correctly get the elements that match in both streams. However, my requirement is to write these matched elements and also the unmatched elements to the sink (S3).

How do I get the unmatched elements from each stream?

Regards,
Vinay Patil
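The join in the question only reports matches because, inside a window, a join calls the JoinFunction once for each pair of elements that share a key. An element whose key has no partner on the other side is never passed to it. The following is a minimal plain-Java model of that per-window behaviour. It assumes (hypothetically) that `ElementSelector` returns the element itself and that all elements land in the same window. The class and method names are illustrative, not Flink API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Plain-Java model of inner-join semantics within a single window (no Flink dependency).
final class WindowInnerJoinModel {

    static <T, K, R> List<R> joinWindow(List<T> first, List<T> second,
                                        Function<T, K> key, BiFunction<T, T, R> join) {
        // Index the second input by key.
        Map<K, List<T>> secondByKey = new LinkedHashMap<>();
        for (T b : second) {
            secondByKey.computeIfAbsent(key.apply(b), k -> new ArrayList<>()).add(b);
        }
        List<R> results = new ArrayList<>();
        for (T a : first) {
            List<T> partners = secondByKey.get(key.apply(a));
            if (partners == null) {
                continue; // no partner in this window: the join function is never called
            }
            for (T b : partners) {
                results.add(join.apply(a, b)); // one call per matching pair
            }
        }
        // Elements of 'second' without a partner never reach 'join' either.
        return results;
    }
}
```

Use the question's data and a join function that returns its first argument, as `join` above does: `WindowInnerJoinModel.joinWindow(first, second, x -> x, (a, b) -> a)` returns `[10, 20, 30]`. The values 23, 25, 33, 102 and 18 from `sourceStream`, and 40, 50 and 60 from `destStream`, simply disappear.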