Hi,
yes, the window operator is stateful, which means that after a failure
it will restore its state and pick up where it left off.
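
If it helps, a minimal sketch of enabling that behavior (the 5 second
interval is just an example value):

    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
    // snapshot the state of all operators (including window contents)
    // every 5 seconds; after a failure the job resumes from the last
    // completed checkpoint
    env.enableCheckpointing(5000);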

You're right about the graph, chained operators are shown as one box.
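
If you want to see an operator as its own box you can break the chain;
a quick sketch (disableChaining() is the operator-level API, the map
function is just a placeholder):

    stream
        .map(new MyMapFunction())  // placeholder function
        .disableChaining()         // this operator gets its own box
        .print();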

Cheers,
Aljoscha

On Fri, 1 Jul 2016 at 04:52 Vinay Patil <vinay18.pa...@gmail.com> wrote:

> Hi,
>
> Just watched the video on Robust Stream Processing.
> So when we say the window is a stateful operator, does it mean that
> even if the task manager doing the window operation fails, it will pick
> up from the state left earlier when it comes back up? (Have not read
> more on state for now)
>
>
> Also, in one of our projects, when we deploy on the cluster and check
> the Job Graph, everything is shown in one box. Why does this happen? Is
> it because of chaining of streams?
> So the box here represents the function flow, right?
>
>
>
> Regards,
> Vinay Patil
>
> On Thu, Jun 30, 2016 at 7:29 PM, Vinay Patil <vinay18.pa...@gmail.com>
> wrote:
>
> > Hi Aljoscha,
> >
> > Just wanted to check if it works with it.
> > Anyway, to solve the problem, what we have thought of is to push a
> > heartbeat message to Kafka after a certain interval, so that we
> > always get a continuous stream and that edge case will never occur,
> > right?
> >
> > One more question I have regarding the failover case:
> > Let's say I have a window of 10 secs, and in that window there are
> > elements e0 to en. What if the node goes down during this time?
> > When the node comes up, will it resume from the same state or will it
> > resume from the last checkpointed state?
> >
> > Can we explicitly checkpoint inside the window, maybe at the start of
> > the window or before we apply the window?
> >
> >
> > Regards,
> > Vinay Patil
> >
> > On Thu, Jun 30, 2016 at 2:11 PM, Aljoscha Krettek <aljos...@apache.org>
> > wrote:
> >
> >> Hi,
> >> I think the problem is that the DeltaFunction needs to have this
> >> signature:
> >>
> >> DeltaFunction<CoGroupedStreams.TaggedUnion<Tuple2<String, DTO>,
> >>     Tuple2<String, DTO>>>
> >>
> >> because the Trigger will see elements from both input streams, which
> >> are represented as a TaggedUnion that can contain an element from
> >> either side.
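> >>
> >> A sketch of the shape this would take, using the types from your
> >> example (I believe TaggedUnion has isOne()/getOne()/getTwo()
> >> accessors, but please double-check against your Flink version):
> >>
> >> new DeltaFunction<CoGroupedStreams.TaggedUnion<Tuple2<String, DTO>,
> >>     Tuple2<String, DTO>>>() {
> >>   @Override
> >>   public double getDelta(
> >>       CoGroupedStreams.TaggedUnion<Tuple2<String, DTO>, Tuple2<String, DTO>> oldPoint,
> >>       CoGroupedStreams.TaggedUnion<Tuple2<String, DTO>, Tuple2<String, DTO>> newPoint) {
> >>     // unwrap whichever input each element came from
> >>     Tuple2<String, DTO> oldVal = oldPoint.isOne() ? oldPoint.getOne() : oldPoint.getTwo();
> >>     Tuple2<String, DTO> newVal = newPoint.isOne() ? newPoint.getOne() : newPoint.getTwo();
> >>     return 0.0; // your delta computation on oldVal / newVal
> >>   }
> >> }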
> >>
> >> May I ask why you want to use the DeltaTrigger?
> >>
> >> Cheers,
> >> Aljoscha
> >>
> >> On Wed, 29 Jun 2016 at 19:06 Vinay Patil <vinay18.pa...@gmail.com> wrote:
> >>
> >> > Hi,
> >> >
> >> > Yes, now I am getting clear with the concepts here.
> >> > One last thing I want to try before going for a custom trigger: I
> >> > want to try the DeltaTrigger, but I am not able to get the syntax
> >> > right. This is how I am trying it:
> >> >
> >> > TypeInformation<Tuple2<String, DTO>> typeInfo = TypeInformation.of(
> >> >     new TypeHint<Tuple2<String, DTO>>() {});
> >> > // sourceStream and destStream: Tuple2<String, DTO>
> >> > sourceStream.coGroup(destStream)
> >> >     .where(new ElementSelector()).equalTo(new ElementSelector())
> >> >     .window(TumblingEventTimeWindows.of(Time.seconds(10)))
> >> >     .trigger(DeltaTrigger.of(triggerMeters,
> >> >         new DeltaFunction<Tuple2<String, DTO>>() {
> >> >           private static final long serialVersionUID = 1L;
> >> >
> >> >           @Override
> >> >           public double getDelta(
> >> >               Tuple2<String, DTO> oldDataPoint,
> >> >               Tuple2<String, DTO> newDataPoint) {
> >> >             return <some_val>;
> >> >           }
> >> >         }, typeInfo.createSerializer(env.getConfig())))
> >> >     .apply(new JoinStreams());
> >> >
> >> > I am getting the error "cannot convert from DeltaTrigger to
> >> > Trigger<? super CoGroupedStreams...".
> >> > What am I doing wrong here? I have referred to the sample example.
> >> >
> >> > Regards,
> >> > Vinay Patil
> >> >
> >> > On Wed, Jun 29, 2016 at 7:15 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
> >> >
> >> > > Hi,
> >> > > you can use ingestion time if you don't care about the
> >> > > timestamps in your events, yes. If elements from the two streams
> >> > > happen to arrive at such times that they are not put into the
> >> > > same window then you won't get a match, correct.
> >> > >
> >> > > Regarding ingestion time and out-of-order events: I think this
> >> > > section just reiterates that when using ingestion time the
> >> > > inherent timestamps in your events will not be considered and
> >> > > their order will not be respected.
> >> > >
> >> > > Regarding late data: right now, Flink always processes late data
> >> > > and it is up to the Trigger to decide what to do with it. You can
> >> > > implement your own trigger based on EventTimeTrigger that would
> >> > > immediately purge a window when an element arrives that is later
> >> > > than an allowed amount of lateness. In Flink 1.1 we will
> >> > > introduce a setting for windows that allows specifying an allowed
> >> > > lateness. With this, late elements will be dropped automatically.
> >> > > This feature is already available in the master, by the way.
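> >> > >
> >> > > A rough sketch of such a trigger (untested; it assumes the
> >> > > TriggerContext exposes the current watermark via
> >> > > getCurrentWatermark(), which you should double-check for your
> >> > > Flink version):
> >> > >
> >> > > public class EventTimeTriggerWithLateness
> >> > >     extends Trigger<Object, TimeWindow> {
> >> > >
> >> > >   private final long allowedLatenessMs;
> >> > >
> >> > >   public EventTimeTriggerWithLateness(long allowedLatenessMs) {
> >> > >     this.allowedLatenessMs = allowedLatenessMs;
> >> > >   }
> >> > >
> >> > >   @Override
> >> > >   public TriggerResult onElement(Object element, long timestamp,
> >> > >       TimeWindow window, TriggerContext ctx) throws Exception {
> >> > >     if (ctx.getCurrentWatermark()
> >> > >         > window.maxTimestamp() + allowedLatenessMs) {
> >> > >       // element is later than the allowed lateness:
> >> > >       // drop the window state instead of firing again
> >> > >       return TriggerResult.PURGE;
> >> > >     }
> >> > >     ctx.registerEventTimeTimer(window.maxTimestamp());
> >> > >     return TriggerResult.CONTINUE;
> >> > >   }
> >> > >
> >> > >   @Override
> >> > >   public TriggerResult onEventTime(long time, TimeWindow window,
> >> > >       TriggerContext ctx) {
> >> > >     return time == window.maxTimestamp()
> >> > >         ? TriggerResult.FIRE_AND_PURGE
> >> > >         : TriggerResult.CONTINUE;
> >> > >   }
> >> > >
> >> > >   @Override
> >> > >   public TriggerResult onProcessingTime(long time,
> >> > >       TimeWindow window, TriggerContext ctx) {
> >> > >     return TriggerResult.CONTINUE;
> >> > >   }
> >> > > }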
> >> > >
> >> > > Cheers,
> >> > > Aljoscha
> >> > >
> >> > > On Wed, 29 Jun 2016 at 14:14 Vinay Patil <vinay18.pa...@gmail.com> wrote:
> >> > >
> >> > > > Hi,
> >> > > >
> >> > > > Ok.
> >> > > > Inside the checkAndGetNextWatermark(lastElement,
> >> > > > extractedTimestamp) method both these parameters come out the
> >> > > > same (the timestamp value); I was expecting the last element's
> >> > > > timestamp value in the 1st param when I extract it.
> >> > > >
> >> > > > Let's say I decide to use IngestionTime (since I am getting
> >> > > > accurate results here for now). If the joining elements of both
> >> > > > streams are assigned to different windows, then in that case I
> >> > > > will not get the match, right?
> >> > > >
> >> > > > However, in case of event time this guarantees they are in the
> >> > > > same window since we are assigning the timestamp; correct me
> >> > > > here.
> >> > > >
> >> > > > According to the documentation:
> >> > > > *Ingestion Time programs cannot handle any out-of-order events
> >> > > > or late data*
> >> > > >
> >> > > > In this context, what do we mean by out-of-order events? How
> >> > > > does it know that the events are out of order, I mean on which
> >> > > > parameter does it decide that the events are out-of-order? As
> >> > > > in case of event time we can say the timestamps received are
> >> > > > out of order.
> >> > > >
> >> > > > Late data: does it have a threshold after which it does not
> >> > > > accept late data?
> >> > > >
> >> > > >
> >> > > > Regards,
> >> > > > Vinay Patil
> >> > > >
> >> > > > On Wed, Jun 29, 2016 at 5:15 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
> >> > > >
> >> > > > > Hi,
> >> > > > > the element will be kept around indefinitely if no new
> >> > > > > watermark arrives.
> >> > > > >
> >> > > > > I think the same problem will persist for
> >> > > > > AssignerWithPunctuatedWatermarks, since there you also might
> >> > > > > not get the required "last watermark" to trigger processing
> >> > > > > of the last window.
> >> > > > >
> >> > > > > Cheers,
> >> > > > > Aljoscha
> >> > > > >
> >> > > > > On Wed, 29 Jun 2016 at 13:18 Vinay Patil <vinay18.pa...@gmail.com> wrote:
> >> > > > >
> >> > > > > > Hi Aljoscha,
> >> > > > > >
> >> > > > > > This clears a lot of doubts now.
> >> > > > > > So now let's say the stream pauses for a while or it stops
> >> > > > > > completely on Friday; let us assume that the last message
> >> > > > > > did not get processed and is kept in the internal buffers.
> >> > > > > >
> >> > > > > > So when the stream starts again on Monday, will it consider
> >> > > > > > the last element that is in the internal buffer for
> >> > > > > > processing?
> >> > > > > > How much time can the internal buffer hold the data, or
> >> > > > > > will it flush the data after a threshold?
> >> > > > > >
> >> > > > > > I have tried using AssignerWithPunctuatedWatermarks and
> >> > > > > > generated the watermark for each event; I am still getting
> >> > > > > > one record less.
> >> > > > > >
> >> > > > > >
> >> > > > > > Regards,
> >> > > > > > Vinay Patil
> >> > > > > >
> >> > > > > > On Wed, Jun 29, 2016 at 2:21 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
> >> > > > > >
> >> > > > > > > Hi,
> >> > > > > > > the reason why the last element might never be emitted
> >> > > > > > > is the way the ascending timestamp extractor works. I'll
> >> > > > > > > try and explain with an example.
> >> > > > > > >
> >> > > > > > > Let's say we have a window size of 2 milliseconds,
> >> > > > > > > elements arrive starting with timestamp 0, window begin
> >> > > > > > > timestamp is inclusive, end timestamp is exclusive:
> >> > > > > > >
> >> > > > > > > Element 0, Timestamp 0 (at this point the watermark is -1)
> >> > > > > > > Element 1, Timestamp 1 (at this point the watermark is 0)
> >> > > > > > > Element 2, Timestamp 1 (at this point the watermark is still 0)
> >> > > > > > > Element 3, Timestamp 2 (at this point the watermark is 1)
> >> > > > > > >
> >> > > > > > > now we can process the window (0, 2) because we know from
> >> > > > > > > the watermark that no elements can arrive for that window
> >> > > > > > > anymore. The window contains elements 0,1,2.
> >> > > > > > >
> >> > > > > > > Element 4, Timestamp 3 (at this point the watermark is 2)
> >> > > > > > > Element 5, Timestamp 4 (at this point the watermark is 3)
> >> > > > > > >
> >> > > > > > > now we can process window (2, 4). The window contains
> >> > > > > > > elements 3,4.
> >> > > > > > >
> >> > > > > > > At this point, we have Element 5 sitting in internal
> >> > > > > > > buffers for window (4, 6) but if we don't receive further
> >> > > > > > > elements the watermark will never advance and we will
> >> > > > > > > never process that window.
> >> > > > > > >
> >> > > > > > > If, however, we get new elements at some point, the
> >> > > > > > > watermark advances and we don't have a problem. That's
> >> > > > > > > what I meant when I said that you shouldn't have a
> >> > > > > > > problem if data keeps continuously arriving.
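> >> > > > > > >
> >> > > > > > > In code the extractor in question is roughly this (the
> >> > > > > > > exact method signature differs slightly between Flink
> >> > > > > > > versions; getTimestamp() is a placeholder getter):
> >> > > > > > >
> >> > > > > > > stream.assignTimestampsAndWatermarks(
> >> > > > > > >     new AscendingTimestampExtractor<MyEvent>() {
> >> > > > > > >       @Override
> >> > > > > > >       public long extractAscendingTimestamp(MyEvent element) {
> >> > > > > > >         // after seeing timestamp t the emitted watermark
> >> > > > > > >         // is t - 1, because more elements with timestamp
> >> > > > > > >         // t may still arrive
> >> > > > > > >         return element.getTimestamp();
> >> > > > > > >       }
> >> > > > > > >     });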
> >> > > > > > >
> >> > > > > > > Cheers,
> >> > > > > > > Aljoscha
> >> > > > > > >
> >> > > > > > >
> >> > > > > > > On Tue, 28 Jun 2016 at 17:14 Vinay Patil <vinay18.pa...@gmail.com> wrote:
> >> > > > > > >
> >> > > > > > > > Hi Aljoscha,
> >> > > > > > > >
> >> > > > > > > > Thanks a lot for your inputs.
> >> > > > > > > >
> >> > > > > > > > I still did not get you when you say I will not face
> >> > > > > > > > this issue in case of a continuous stream; let's
> >> > > > > > > > consider the following example:
> >> > > > > > > > Assume that the stream runs continuously from Monday to
> >> > > > > > > > Friday, and on Friday it stops after 5.00 PM. Will I
> >> > > > > > > > still face this issue?
> >> > > > > > > >
> >> > > > > > > > I am actually not able to understand how it will differ
> >> > > > > > > > in real-time streams.
> >> > > > > > > >
> >> > > > > > > > Regards,
> >> > > > > > > > Vinay Patil
> >> > > > > > > >
> >> > > > > > > > On Tue, Jun 28, 2016 at 5:07 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
> >> > > > > > > >
> >> > > > > > > > > Hi,
> >> > > > > > > > > ingestion time can only be used if you don't care
> >> > > > > > > > > about the timestamp in the elements. So if you have
> >> > > > > > > > > those you should probably use event time.
> >> > > > > > > > >
> >> > > > > > > > > If your timestamps really are strictly increasing
> >> > > > > > > > > then the ascending extractor is good. And if you have
> >> > > > > > > > > a continuous stream of incoming elements you will not
> >> > > > > > > > > see the behavior of not getting the last elements.
> >> > > > > > > > >
> >> > > > > > > > > By the way, when using Kafka you can also embed the
> >> > > > > > > > > timestamp extractor directly in the Kafka consumer.
> >> > > > > > > > > This is described here:
> >> > > > > > > > > https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
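> >> > > > > > > > >
> >> > > > > > > > > Roughly like this (the consumer class name depends on
> >> > > > > > > > > your Kafka version, e.g. FlinkKafkaConsumer09;
> >> > > > > > > > > MyAssigner is your timestamp/watermark assigner):
> >> > > > > > > > >
> >> > > > > > > > > FlinkKafkaConsumer09<MyEvent> consumer =
> >> > > > > > > > >     new FlinkKafkaConsumer09<>("topic", schema, props);
> >> > > > > > > > > // watermarks are then generated per Kafka partition
> >> > > > > > > > > consumer.assignTimestampsAndWatermarks(new MyAssigner());
> >> > > > > > > > > DataStream<MyEvent> stream = env.addSource(consumer);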
> >> > > > > > > > >
> >> > > > > > > > > Cheers,
> >> > > > > > > > > Aljoscha
> >> > > > > > > > >
> >> > > > > > > > > On Tue, 28 Jun 2016 at 11:44 Vinay Patil <vinay18.pa...@gmail.com> wrote:
> >> > > > > > > > >
> >> > > > > > > > > > Hi Aljoscha,
> >> > > > > > > > > >
> >> > > > > > > > > > Thank you for your response.
> >> > > > > > > > > > So do you suggest using a different approach for
> >> > > > > > > > > > extracting the timestamp (as given in the document)
> >> > > > > > > > > > instead of the AscendingTimestampExtractor?
> >> > > > > > > > > > Is that the reason I am seeing this unexpected
> >> > > > > > > > > > behaviour? In case of a continuous stream I would
> >> > > > > > > > > > not see any data loss?
> >> > > > > > > > > >
> >> > > > > > > > > > Also, assuming that the records are always going to
> >> > > > > > > > > > be in order, which is the best approach: Ingestion
> >> > > > > > > > > > Time or Event Time?
> >> > > > > > > > > >
> >> > > > > > > > > >
> >> > > > > > > > > >
> >> > > > > > > > > > Regards,
> >> > > > > > > > > > Vinay Patil
> >> > > > > > > > > >
> >> > > > > > > > > > On Tue, Jun 28, 2016 at 2:41 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
> >> > > > > > > > > >
> >> > > > > > > > > > > Hi,
> >> > > > > > > > > > > first regarding tumbling windows: even if you
> >> > > > > > > > > > > have 5 minute windows it can happen that elements
> >> > > > > > > > > > > that are only seconds apart go into different
> >> > > > > > > > > > > windows. Consider the following case:
> >> > > > > > > > > > >
> >> > > > > > > > > > > |                x | x                 |
> >> > > > > > > > > > >
> >> > > > > > > > > > > These are two 5-minute windows and the two
> >> > > > > > > > > > > elements are only seconds apart but go into
> >> > > > > > > > > > > different windows because windows are aligned to
> >> > > > > > > > > > > the epoch.
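> >> > > > > > > > > > >
> >> > > > > > > > > > > A simplified sketch of how a tumbling window is
> >> > > > > > > > > > > assigned (illustrative arithmetic only):
> >> > > > > > > > > > >
> >> > > > > > > > > > > long size = Time.minutes(5).toMilliseconds();
> >> > > > > > > > > > > // the start is aligned to the epoch, not to the
> >> > > > > > > > > > > // first element that arrives
> >> > > > > > > > > > > long windowStart = timestamp - (timestamp % size);
> >> > > > > > > > > > > long windowEnd = windowStart + size; // exclusive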
> >> > > > > > > > > > >
> >> > > > > > > > > > > Now, for the ascending timestamp extractor. The
> >> > > > > > > > > > > reason this can behave in unexpected ways is that
> >> > > > > > > > > > > it emits a watermark that is "last timestamp -
> >> > > > > > > > > > > 1", i.e. if it has seen timestamp t it can only
> >> > > > > > > > > > > emit watermark t-1 because there might be other
> >> > > > > > > > > > > elements with timestamp t arriving. If you have a
> >> > > > > > > > > > > continuous stream of elements you wouldn't notice
> >> > > > > > > > > > > this. Only in this constructed example does it
> >> > > > > > > > > > > become visible.
> >> > > > > > > > > > >
> >> > > > > > > > > > > Cheers,
> >> > > > > > > > > > > Aljoscha
> >> > > > > > > > > > >
> >> > > > > > > > > > > On Tue, 28 Jun 2016 at 06:04 Vinay Patil <vinay18.pa...@gmail.com> wrote:
> >> > > > > > > > > > >
> >> > > > > > > > > > > > Hi,
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > Following is the timestamp I am getting from
> >> > > > > > > > > > > > the DTO; these are the timestamps of the two
> >> > > > > > > > > > > > records:
> >> > > > > > > > > > > > 1466115892162154279
> >> > > > > > > > > > > > 1466116026233613409
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > So the time difference is roughly 3 min. Even
> >> > > > > > > > > > > > if I apply a window of 5 min, I am not getting
> >> > > > > > > > > > > > the last record (last timestamp value above),
> >> > > > > > > > > > > > using the ascending timestamp extractor for
> >> > > > > > > > > > > > generating the timestamp (assuming that the
> >> > > > > > > > > > > > timestamps are always in order).
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > I was at least expecting the data to reach the
> >> > > > > > > > > > > > co-group function.
> >> > > > > > > > > > > > What could be the reason for the data loss? The
> >> > > > > > > > > > > > data we are getting is critical, hence we
> >> > > > > > > > > > > > cannot afford to lose any data.
> >> > > > > > > > > > > >
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > Regards,
> >> > > > > > > > > > > > Vinay Patil
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > On Mon, Jun 27, 2016 at 11:31 PM, Vinay Patil <vinay18.pa...@gmail.com> wrote:
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > > Just an update: when I keep IngestionTime and
> >> > > > > > > > > > > > > remove the timestamp I am generating, I am
> >> > > > > > > > > > > > > getting all the records, but for Event Time I
> >> > > > > > > > > > > > > am getting one record less. I checked the
> >> > > > > > > > > > > > > time difference between the two records, it
> >> > > > > > > > > > > > > is 3 min; I tried keeping the window time at
> >> > > > > > > > > > > > > 5 mins, but even that did not work.
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > > > Even when I try assigning the timestamp for
> >> > > > > > > > > > > > > IngestionTime, I get one record less, so
> >> > > > > > > > > > > > > should I safely use Ingestion Time or is it
> >> > > > > > > > > > > > > always advisable to use EventTime?
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > > > Regards,
> >> > > > > > > > > > > > > Vinay Patil
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > > > On Mon, Jun 27, 2016 at 8:16 PM, Vinay Patil <vinay18.pa...@gmail.com> wrote:
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > > >> Hi,
> >> > > > > > > > > > > > >>
> >> > > > > > > > > > > > >> Actually I am only publishing 5 messages
> >> > > > > > > > > > > > >> each to two different Kafka topics (using
> >> > > > > > > > > > > > >> JUnit); even if I keep the window at 500
> >> > > > > > > > > > > > >> seconds the result is the same.
> >> > > > > > > > > > > > >>
> >> > > > > > > > > > > > >> I do not understand why it is not sending
> >> > > > > > > > > > > > >> the 5th element to the co-group operator
> >> > > > > > > > > > > > >> even when the keys are the same.
> >> > > > > > > > > > > > >>
> >> > > > > > > > > > > > >> I actually cannot share the actual client
> >> > > > > > > > > > > > >> code, but this is what the streams look
> >> > > > > > > > > > > > >> like:
> >> > > > > > > > > > > > >> sourceStream.coGroup(destStream)
> >> > > > > > > > > > > > >> Here the sourceStream and destStream are
> >> > > > > > > > > > > > >> actually Tuple2<String, DTO>, and the
> >> > > > > > > > > > > > >> ElementSelector returns tuple.f0, which is
> >> > > > > > > > > > > > >> the key.
> >> > > > > > > > > > > > >>
> >> > > > > > > > > > > > >> I am generating the timestamp based on a
> >> > > > > > > > > > > > >> field from the DTO which is guaranteed to be
> >> > > > > > > > > > > > >> in order.
> >> > > > > > > > > > > > >>
> >> > > > > > > > > > > > >> Will using triggers help here?
> >> > > > > > > > > > > > >>
> >> > > > > > > > > > > > >>
> >> > > > > > > > > > > > >> Regards,
> >> > > > > > > > > > > > >> Vinay Patil
> >> > > > > > > > > > > > >>
> >> > > > > > > > > > > > >> *+91-800-728-4749*
> >> > > > > > > > > > > > >>
> >> > > > > > > > > > > > >> On Mon, Jun 27, 2016 at 7:05 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
> >> > > > > > > > > > > > >>
> >> > > > > > > > > > > > >>> Hi,
> >> > > > > > > > > > > > >>> what timestamps are you assigning? Is it
> >> > > > > > > > > > > > >>> guaranteed that all of them would fall into
> >> > > > > > > > > > > > >>> the same 30 second window?
> >> > > > > > > > > > > > >>>
> >> > > > > > > > > > > > >>> The issue with duplicate printing in the
> >> > > > > > > > > > > > >>> ElementSelector is strange. Could you post
> >> > > > > > > > > > > > >>> a more complete code example so that I can
> >> > > > > > > > > > > > >>> reproduce the problem?
> >> > > > > > > > > > > > >>>
> >> > > > > > > > > > > > >>> Cheers,
> >> > > > > > > > > > > > >>> Aljoscha
> >> > > > > > > > > > > > >>>
> >> > > > > > > > > > > > >>> On Mon, 27 Jun 2016 at 13:21 Vinay Patil <vinay18.pa...@gmail.com> wrote:
> >> > > > > > > > > > > > >>>
> >> > > > > > > > > > > > >>> > Hi,
> >> > > > > > > > > > > > >>> >
> >> > > > > > > > > > > > >>> > I am able to get the matching and
> >> > > > > > > > > > > > >>> > non-matching elements.
> >> > > > > > > > > > > > >>> >
> >> > > > > > > > > > > > >>> > However, when I am unit testing the code,
> >> > > > > > > > > > > > >>> > I am getting one record less inside the
> >> > > > > > > > > > > > >>> > overridden coGroup function.
> >> > > > > > > > > > > > >>> > Testing the following way:
> >> > > > > > > > > > > > >>> >
> >> > > > > > > > > > > > >>> > 1) Insert 5 messages into local kafka
> >> > > > > > > > > > > > >>> > topic (test1)
> >> > > > > > > > > > > > >>> > 2) Insert 5 different messages into local
> >> > > > > > > > > > > > >>> > kafka topic (test2)
> >> > > > > > > > > > > > >>> > 3) Consume 1) and 2) so I have two
> >> > > > > > > > > > > > >>> > different kafka streams
> >> > > > > > > > > > > > >>> > 4) Generate ascending timestamps (using
> >> > > > > > > > > > > > >>> > Event Time) for both streams and create
> >> > > > > > > > > > > > >>> > the key (String)
> >> > > > > > > > > > > > >>> >
> >> > > > > > > > > > > > >>> > Now, up to 4) I am able to get all the
> >> > > > > > > > > > > > >>> > records (checked by printing the stream
> >> > > > > > > > > > > > >>> > to a text file).
> >> > > > > > > > > > > > >>> >
> >> > > > > > > > > > > > >>> > However, when I send the stream to the
> >> > > > > > > > > > > > >>> > co-group operator, I am receiving one
> >> > > > > > > > > > > > >>> > record less, using the following syntax:
> >> > > > > > > > > > > > >>> >
> >> > > > > > > > > > > > >>> > sourceStream.coGroup(destStream)
> >> > > > > > > > > > > > >>> >     .where(new ElementSelector())
> >> > > > > > > > > > > > >>> >     .equalTo(new ElementSelector())
> >> > > > > > > > > > > > >>> >     .window(TumblingEventTimeWindows.of(Time.seconds(30)))
> >> > > > > > > > > > > > >>> >     .apply(new JoinStreams());
> >> > > > > > > > > > > > >>> >
> >> > > > > > > > > > > > >>> > Also, in the ElementSelector I have
> >> > > > > > > > > > > > >>> > inserted a sysout; I am getting 20
> >> > > > > > > > > > > > >>> > sysouts instead of 10 (10 sysouts for the
> >> > > > > > > > > > > > >>> > source and 10 for the dest stream).
> >> > > > > > > > > > > > >>> >
> >> > > > > > > > > > > > >>> > Unable to understand why one record is
> >> > > > > > > > > > > > >>> > coming less to co-group.
> >> > > > > > > > > > > > >>> >
> >> > > > > > > > > > > > >>> >
> >> > > > > > > > > > > > >>> >
> >> > > > > > > > > > > > >>> > Regards,
> >> > > > > > > > > > > > >>> > Vinay Patil
> >> > > > > > > > > > > > >>> >
> >> > > > > > > > > > > > >>> > On Wed, Jun 15, 2016 at 1:39 PM, Fabian Hueske <fhue...@gmail.com> wrote:
> >> > > > > > > > > > > > >>> >
> >> > > > > > > > > > > > >>> > > Can you add a flag to each element
> >> > > > > > > > > > > > >>> > > emitted by the CoGroupFunction that
> >> > > > > > > > > > > > >>> > > indicates whether it was joined or not?
> >> > > > > > > > > > > > >>> > > Then you can use split to distinguish
> >> > > > > > > > > > > > >>> > > between both cases and handle both
> >> > > > > > > > > > > > >>> > > streams differently; see the sketch
> >> > > > > > > > > > > > >>> > > below.
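> >> > > > > > > > > > > > >>> > >
> >> > > > > > > > > > > > >>> > > A sketch of that split (using a
> >> > > > > > > > > > > > >>> > > Tuple2<Boolean, MyResult> with f0 as
> >> > > > > > > > > > > > >>> > > the "joined" flag is just a convention
> >> > > > > > > > > > > > >>> > > for the example):
> >> > > > > > > > > > > > >>> > >
> >> > > > > > > > > > > > >>> > > SplitStream<Tuple2<Boolean, MyResult>> split =
> >> > > > > > > > > > > > >>> > >     joined.split(new OutputSelector<Tuple2<Boolean, MyResult>>() {
> >> > > > > > > > > > > > >>> > >       @Override
> >> > > > > > > > > > > > >>> > >       public Iterable<String> select(Tuple2<Boolean, MyResult> value) {
> >> > > > > > > > > > > > >>> > >         // route each element by its joined/unjoined flag
> >> > > > > > > > > > > > >>> > >         return Collections.singletonList(value.f0 ? "joined" : "unjoined");
> >> > > > > > > > > > > > >>> > >       }
> >> > > > > > > > > > > > >>> > >     });
> >> > > > > > > > > > > > >>> > > DataStream<Tuple2<Boolean, MyResult>> matched = split.select("joined");
> >> > > > > > > > > > > > >>> > > DataStream<Tuple2<Boolean, MyResult>> unmatched = split.select("unjoined");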
> >> > > > > > > > > > > > >>> > >
> >> > > > > > > > > > > > >>> > > Best, Fabian
> >> > > > > > > > > > > > >>> > >
> >> > > > > > > > > > > > >>> > > 2016-06-15 6:45 GMT+02:00 Vinay Patil <vinay18.pa...@gmail.com>:
> >> > > > > > > > > > > > >>> > >
> >> > > > > > > > > > > > >>> > > > Hi Jark,
> >> > > > > > > > > > > > >>> > > >
> >> > > > > > > > > > > > >>> > > > I am able to get the non-matching
> >> > > > > > > > > > > > >>> > > > elements in a stream.
> >> > > > > > > > > > > > >>> > > >
> >> > > > > > > > > > > > >>> > > > Of course we can collect the matching
> >> > > > > > > > > > > > >>> > > > elements in the same stream as well;
> >> > > > > > > > > > > > >>> > > > however, I want to perform additional
> >> > > > > > > > > > > > >>> > > > operations on the joined stream
> >> > > > > > > > > > > > >>> > > > before writing it to S3, so I would
> >> > > > > > > > > > > > >>> > > > have to include a separate join
> >> > > > > > > > > > > > >>> > > > operator for the same two streams,
> >> > > > > > > > > > > > >>> > > > right?
> >> > > > > > > > > > > > >>> > > > Correct me if I am wrong.
> >> > > > > > > > > > > > >>> > > >
> >> > > > > > > > > > > > >>> > > > I have pasted the dummy code which
> >> > > > > > > > > > > > >>> > > > collects the non-matching records (I
> >> > > > > > > > > > > > >>> > > > have to perform this on the actual
> >> > > > > > > > > > > > >>> > > > data; correct me if I am doing it
> >> > > > > > > > > > > > >>> > > > wrong):
> >> > > > > > > > > > > > >>> > > >
> >> > > > > > > > > > > > >>> > > > sourceStream.coGroup(destStream)
> >> > > > > > > > > > > > >>> > > >     .where(new ElementSelector())
> >> > > > > > > > > > > > >>> > > >     .equalTo(new ElementSelector())
> >> > > > > > > > > > > > >>> > > >     .window(TumblingEventTimeWindows.of(Time.seconds(30)))
> >> > > > > > > > > > > > >>> > > >     .apply(new CoGroupFunction<Integer, Integer, Integer>() {
> >> > > > > > > > > > > > >>> > > >
> >> > > > > > > > > > > > >>> > > >       private static final long serialVersionUID = 6408179761497497475L;
> >> > > > > > > > > > > > >>> > > >
> >> > > > > > > > > > > > >>> > > >       @Override
> >> > > > > > > > > > > > >>> > > >       public void coGroup(Iterable<Integer> paramIterable,
> >> > > > > > > > > > > > >>> > > >           Iterable<Integer> paramIterable1,
> >> > > > > > > > > > > > >>> > > >           Collector<Integer> paramCollector) throws Exception {
> >> > > > > > > > > > > > >>> > > >         long exactSizeIfKnown =
> >> > > > > > > > > > > > >>> > > >             paramIterable.spliterator().getExactSizeIfKnown();
> >> > > > > > > > > > > > >>> > > >         long exactSizeIfKnown2 =
> >> > > > > > > > > > > > >>> > > >             paramIterable1.spliterator().getExactSizeIfKnown();
> >> > > > > > > > > > > > >>> > > >         if (exactSizeIfKnown == 0) {
> >> > > > > > > > > > > > >>> > > >           paramCollector.collect(paramIterable1.iterator().next());
> >> > > > > > > > > > > > >>> > > >         } else if (exactSizeIfKnown2 == 0) {
> >> > > > > > > > > > > > >>> > > >           paramCollector.collect(paramIterable.iterator().next());
> >> > > > > > > > > > > > >>> > > >         }
> >> > > > > > > > > > > > >>> > > >       }
> >> > > > > > > > > > > > >>> > > >     }).print();
> >> > > > > > > > > > > > >>> > > >
> >> > > > > > > > > > > > >>> > > > Regards,
> >> > > > > > > > > > > > >>> > > > Vinay Patil
> >> > > > > > > > > > > > >>> > > >
> >> > > > > > > > > > > > >>> > > >
> >> > > > > > > > > > > > >>> > > > On Tue, Jun 14, 2016 at 1:37 PM, Vinay Patil <vinay18.pa...@gmail.com> wrote:
> >> > > > > > > > > > > > >>> > > >
> >> > > > > > > > > > > > >>> > > > > You are right; I debugged it for
> >> > > > > > > > > > > > >>> > > > > all elements, and I can do that now.
> >> > > > > > > > > > > > >>> > > > > Thanks a lot.
> >> > > > > > > > > > > > >>> > > > >
> >> > > > > > > > > > > > >>> > > > > Regards,
> >> > > > > > > > > > > > >>> > > > > Vinay Patil
> >> > > > > > > > > > > > >>> > > > >
> >> > > > > > > > > > > > >>> > > > > On Tue, Jun 14, 2016 at 11:56 AM, Jark Wu <wuchong...@alibaba-inc.com> wrote:
> >> > > > > > > > > > > > >>> > > > >
> >> > > > > > > > > > > > >>> > > > >> In `coGroup(Iterable<Integer>
> >> > > > > > > > > > > > >>> > > > >> iter1, Iterable<Integer> iter2,
> >> > > > > > > > > > > > >>> > > > >> Collector<Integer> out)`, when
> >> > > > > > > > > > > > >>> > > > >> both iter1 and iter2 are not
> >> > > > > > > > > > > > >>> > > > >> empty, they are matched elements
> >> > > > > > > > > > > > >>> > > > >> from both streams.
> >> > > > > > > > > > > > >>> > > > >> When one of iter1 and iter2 is
> >> > > > > > > > > > > > >>> > > > >> empty, it means that the elements
> >> > > > > > > > > > > > >>> > > > >> are unmatched.
> >> > > > > > > > > > > > >>> > > > >>
> >> > > > > > > > > > > > >>> > > > >>
> >> > > > > > > > > > > > >>> > > > >> - Jark Wu (wuchong)
> >> > > > > > > > > > > > >>> > > > >>
> >> > > > > > > > > > > > >>> > > > >> > On 14 Jun 2016, at 12:46, Vinay Patil <vinay18.pa...@gmail.com> wrote:
> >> > > > > > > > > > > > >>> > > > >> >
> >> > > > > > > > > > > > >>> > > > >> > Hi Matthias,
> >> > > > > > > > > > > > >>> > > > >> >
> >> > > > > > > > > > > > >>> > > > >> > I did not get you; even if we
> >> > > > > > > > > > > > >>> > > > >> > use Co-Group we have to apply it
> >> > > > > > > > > > > > >>> > > > >> > on a key:
> >> > > > > > > > > > > > >>> > > > >> >
> >> > > > > > > > > > > > >>> > > > >> > sourceStream.coGroup(destStream)
> >> > > > > > > > > > > > >>> > > > >> >     .where(new ElementSelector())
> >> > > > > > > > > > > > >>> > > > >> >     .equalTo(new ElementSelector())
> >> > > > > > > > > > > > >>> > > > >> >     .window(TumblingEventTimeWindows.of(Time.seconds(30)))
> >> > > > > > > > > > > > >>> > > > >> >     .apply(new CoGroupFunction<Integer, Integer, Integer>() {
> >> > > > > > > > > > > > >>> > > > >> >       private static final long serialVersionUID = 6408179761497497475L;
> >> > > > > > > > > > > > >>> > > > >> >
> >> > > > > > > > > > > > >>> > > > >> >       @Override
> >> > > > > > > > > > > > >>> > > > >> >       public void coGroup(Iterable<Integer> paramIterable,
> >> > > > > > > > > > > > >>> > > > >> >           Iterable<Integer> paramIterable1,
> >> > > > > > > > > > > > >>> > > > >> >           Collector<Integer> paramCollector) throws Exception {
> >> > > > > > > > > > > > >>> > > > >> >         Iterator<Integer> iterator = paramIterable.iterator();
> >> > > > > > > > > > > > >>> > > > >> >         while (iterator.hasNext()) {
> >> > > > > > > > > > > > >>> > > > >> >         }
> >> > > > > > > > > > > > >>> > > > >> >       }
> >> > > > > > > > > > > > >>> > > > >> >     });
> >> > > > > > > > > > > > >>> > > > >> >
> >> > > > > > > > > > > > >>> > > > >> > When I debug this, only the
> >> > > > > > > > > > > > >>> > > > >> > matched elements from both
> >> > > > > > > > > > > > >>> > > > >> > streams come into the coGroup
> >> > > > > > > > > > > > >>> > > > >> > function.
> >> > > > > > > > > > > > >>> > > > >> >
> >> > > > > > > > > > > > >>> > > > >> > What I want is: how do I check
> >> > > > > > > > > > > > >>> > > > >> > for unmatched elements from both
> >> > > > > > > > > > > > >>> > > > >> > streams and write them to the
> >> > > > > > > > > > > > >>> > > > >> > sink?
> >> > > > > > > > > > > > >>> > > > >> >
> >> > > > > > > > > > > > >>> > > > >> > Regards,
> >> > > > > > > > > > > > >>> > > > >> > Vinay Patil
> >> > > > > > > > > > > > >>> > > > >> >
> >> > > > > > > > > > > > >>> > > > >> > *+91-800-728-4749*
> >> > > > > > > > > > > > >>> > > > >> >
> >> > > > > > > > > > > > >>> > > > >> > On Tue, Jun 14, 2016 at 2:07 AM, Matthias J. Sax <mj...@apache.org> wrote:
> >> > > > > > > > > > > > >>> > > > >> >
> >> > > > > > > > > > > > >>> > > > >> >> You need to do an outer-join.
> >> > > > > > > > > > > > >>> > > > >> >> However, there is no built-in
> >> > > > > > > > > > > > >>> > > > >> >> support for outer-joins yet.
> >> > > > > > > > > > > > >>> > > > >> >>
> >> > > > > > > > > > > > >>> > > > >> >> You can use Window-CoGroup to
> >> > > > > > > > > > > > >>> > > > >> >> implement the outer-join as
> >> > > > > > > > > > > > >>> > > > >> >> your own operator.
> >> > > > > > > > > > > > >>> > > > >> >>
> >> > > > > > > > > > > > >>> > > > >> >>
> >> > > > > > > > > > > > >>> > > > >> >> -Matthias
> >> > > > > > > > > > > > >>> > > > >> >>
> >> > > > > > > > > > > > >>> > > > >> >> On 06/13/2016 06:53 PM, Vinay Patil wrote:
> >> > > > > > > > > > > > >>> > > > >> >>> Hi,
> >> > > > > > > > > > > > >>> > > > >> >>>
> >> > > > > > > > > > > > >>> > > > >> >>> I have a question regarding
> >> > > > > > > > > > > > >>> > > > >> >>> the join operation; consider
> >> > > > > > > > > > > > >>> > > > >> >>> the following dummy example:
> >> > > > > > > > > > > > >>> > > > >> >>>
> >> > > > > > > > > > > > >>> > > > >> >>> StreamExecutionEnvironment env =
> >> > > > > > > > > > > > >>> > > > >> >>>     StreamExecutionEnvironment.getExecutionEnvironment();
> >> > > > > > > > > > > > >>> > > > >> >>> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
> >> > > > > > > > > > > > >>> > > > >> >>> DataStreamSource<Integer> sourceStream =
> >> > > > > > > > > > > > >>> > > > >> >>>     env.fromElements(10,20,23,25,30,33,102,18);
> >> > > > > > > > > > > > >>> > > > >> >>> DataStreamSource<Integer> destStream =
> >> > > > > > > > > > > > >>> > > > >> >>>     env.fromElements(20,30,40,50,60,10);
> >> > > > > > > > > > > > >>> > > > >> >>>
> >> > > > > > > > > > > > >>> > > > >> >>> sourceStream.join(destStream)
> >> > > > > > > > > > > > >>> > > > >> >>>     .where(new ElementSelector())
> >> > > > > > > > > > > > >>> > > > >> >>>     .equalTo(new ElementSelector())
> >> > > > > > > > > > > > >>> > > > >> >>>     .window(TumblingEventTimeWindows.of(Time.milliseconds(10)))
> >> > > > > > > > > > > > >>> > > > >> >>>     .apply(new JoinFunction<Integer, Integer, Integer>() {
> >> > > > > > > > > > > > >>> > > > >> >>>
> >> > > > > > > > > > > > >>> > > > >> >>>       private static final long serialVersionUID = 1L;
> >> > > > > > > > > > > > >>> > > > >> >>>
> >> > > > > > > > > > > > >>> > > > >> >>>       @Override
> >> > > > > > > > > > > > >>> > > > >> >>>       public Integer join(Integer paramIN1,
> >> > > > > > > > > > > > >>> > > > >> >>>           Integer paramIN2) throws Exception {
> >> > > > > > > > > > > > >>> > > > >> >>>         return paramIN1;
> >> > > > > > > > > > > > >>> > > > >> >>>       }
> >> > > > > > > > > > > > >>> > > > >> >>>     }).print();
> >> > > > > > > > > > > > >>> > > > >> >>>
> >> > > > > > > > > > > > >>> > > > >> >>> I perfectly get the elements
> >> > > > > > > > > > > > >>> > > > >> >>> that are matching in both the
> >> > > > > > > > > > > > >>> > > > >> >>> streams; however, my
> >> > > > > > > > > > > > >>> > > > >> >>> requirement is to write these
> >> > > > > > > > > > > > >>> > > > >> >>> matched elements and also the
> >> > > > > > > > > > > > >>> > > > >> >>> unmatched elements to the sink
> >> > > > > > > > > > > > >>> > > > >> >>> (S3).
> >> > > > > > > > > > > > >>> > > > >> >>>
> >> > > > > > > > > > > > >>> > > > >> >>> How do I get the unmatched
> >> > > > > > > > > > > > >>> > > > >> >>> elements from each stream?
> >> > > > > > > > > > > > >>> > > > >> >>>
> >> > > > > > > > > > > > >>> > > > >> >>> Regards,
> >> > > > > > > > > > > > >>> > > > >> >>> Vinay Patil
