Thanks David,

I figured that the correct approach would obviously be to adopt a keying 
strategy upstream to ensure the same data that I used as a key downstream fell 
on the same partition (ensuring the ordering guarantees I’m looking for).

I’m guessing implementation-wise, when I would normally evict a window after 
some event time and allowed lateness, I could set a timer or just explicitly 
keep the window open for some additional time to allow for out of order data to 
make its way into the window.

Either way - I think the keying is probably the right approach, but I wanted to 
consider any other options should that become an issue upstream.

Thanks!

Rion

> On Feb 27, 2021, at 10:21 AM, David Anderson <dander...@apache.org> wrote:
> 
> 
> Rion,
> 
> If you can arrange for each tenant's events to be in only one kafka 
> partition, that should be the best way to simplify the processing you need to 
> do. Otherwise, a simple change that may help would be to increase the bounded 
> delay you use in calculating your own per-tenant watermarks, thereby making 
> late events less likely.
> 
> David
> 
>> On Sat, Feb 27, 2021 at 3:29 AM Rion Williams <rionmons...@gmail.com> wrote:
>> David and Timo,
>> 
>> Firstly, thank you both so much for your contributions and advice. I believe 
>> I’ve implemented things along the lines that you both detailed and things 
>> appear to work just as expected (e.g. I can see things arriving, being added 
>> to windows, discarding late records, and ultimately writing out files as 
>> expected).
>> 
>> With that said, I have one question / issue that I’ve run into with handling 
>> the data coming my Kafka topic. Currently, my tenant/source (i.e. my key) 
>> may be distributed across the 10 partitions of my Kafka topic. With the way 
>> that I’m consuming from this topic (with a Kafka Consumer), it looks like my 
>> data is arriving in a mixed order which seems to be causing my own 
>> watermarks (those stored in my ValueState) to process as later data may 
>> arrive earlier than other data and cause my windows to be evicted.
>> 
>> I’m currently using the `withNoWatermarks()` along with a custom timestamp 
>> assigned to handle all of my timestamping, but is there a mechanism to 
>> handle the mixed ordering across partitions in this scenario at the Flink 
>> level?
>> 
>> I know the answer here likely lies with Kafka and adopting a better keying 
>> strategy to ensure the same tenant/source (my key) lands on the same 
>> partition, which by definition ensures ordering. I’m just wondering if 
>> there’s some mechanism to accomplish this post-reading from Kafka in Flink 
>> within my pipeline to handle things in a similar fashion?
>> 
>> Again - thank you both so much, I’m loving the granularity and control that 
>> Flink has been providing me over other streaming technologies I’ve used in 
>> the past. I’m totally sold on it and am looking forward to doing more 
>> incredible things with it.
>> 
>> Best regards,
>> 
>> Rion
>> 
>>>> On Feb 26, 2021, at 4:36 AM, David Anderson <dander...@apache.org> wrote:
>>>> 
>>> 
>>> Yes indeed, Timo is correct -- I am proposing that you not use timers at 
>>> all. Watermarks and event-time timers go hand in hand -- and neither 
>>> mechanism can satisfy your requirements.
>>> 
>>> You can instead put all of the timing logic in the processElement method -- 
>>> effectively emulating what you would get if Flink were to offer per-key 
>>> watermarking.
>>> 
>>> The reason that the PseudoWindow example is using MapState is that for each 
>>> key/tenant, more than one window can be active simultaneously. This occurs 
>>> because the event stream is out-of-order with respect to time, so events 
>>> for the "next window" are probably being processed before "the previous" 
>>> window is complete. And if you want to accommodate allowed lateness, the 
>>> requirement to have several windows open at once becomes even more 
>>> important. 
>>> 
>>> MapState gives you a per-tenant hashmap, where each entry in that map 
>>> corresponds to an open window for some particular tenant, where the map's 
>>> key is the timestamp for a window, and the value is whatever state you want 
>>> that window to hold.
>>> 
>>> Best regards,
>>> David
>>> 
>>> 
>>> 
>>> 
>>>> On Fri, Feb 26, 2021 at 9:44 AM Timo Walther <twal...@apache.org> wrote:
>>>> Hi Rion,
>>>> 
>>>> I think what David was refering to is that you do the entire time 
>>>> handling yourself in process function. That means not using the 
>>>> `context.timerService()` or `onTimer()` that Flink provides but calling 
>>>> your own logic based on the timestamps that enter your process function 
>>>> and the stored state.
>>>> 
>>>> Regards,
>>>> Timo
>>>> 
>>>> 
>>>> On 26.02.21 00:29, Rion Williams wrote:
>>>> > 
>>>> > Hi David,
>>>> > 
>>>> > Thanks for your prompt reply, it was very helpful and the PseudoWindow 
>>>> > example is excellent. I believe it closely aligns with an approach that 
>>>> > I was tinkering with but seemed to be missing a few key pieces. In my 
>>>> > case, I'm essentially going to want to be aggregating the messages that 
>>>> > are coming into the window (a simple string-concatenation aggregation 
>>>> > would work). Would I need another form of state to hold that, as looking 
>>>> > through this example with naive eyes, it seems that this function is 
>>>> > currently storing multiple windows in state via the MapState provided:
>>>> > 
>>>> > // Keyed, managed state, with an entry for each window, keyed by the 
>>>> > window's end time.
>>>> > // There is a separate MapState object for each driver.
>>>> > private transient MapState<Long, Float> sumOfTips;
>>>> > 
>>>> > If I wanted to perform an aggregation for each key/tenant, would a 
>>>> > MapState be appropriate? Such as a MapState<Long, String> if I was doing 
>>>> > a string aggregation, so that within my processElement function I could 
>>>> > use something similar for building these aggregations and ultimately 
>>>> > triggering them:
>>>> > 
>>>> > // Keep track of a tenant/source specific watermark
>>>> > private lateinit var currentWatermark: ValueState<Long>
>>>> > // Keep track of the contents of each of the windows where the key 
>>>> > represents the close
>>>> > // of the window and the contents represents an accumulation of the 
>>>> > records for that window
>>>> > private lateinit var windowContents: MapState<Long, String>
>>>> > 
>>>> > If that's the case, this is what I've thrown together thus far and I 
>>>> > feel like it's moving in the right direction:
>>>> > 
>>>> > class MagicWindow(private val duration: Long, private val lateness: 
>>>> > Long):
>>>> >      KeyedProcessFunction<String, Event, FileOutput>(){
>>>> > 
>>>> >      // Keep track of a tenant/source specific watermark
>>>> >      private lateinit var currentWatermark: ValueState<Long>
>>>> >      // Keep track of the contents of each of the windows where the key 
>>>> > represents the close
>>>> >      // of the window and the contents represents an accumulation of the 
>>>> > records for that window
>>>> >      private lateinit var windowContents: MapState<Long, String>
>>>> > 
>>>> >      override fun open(config: Configuration) {
>>>> >          currentWatermark = runtimeContext.getState(watermark)
>>>> >          currentWatermark.update(Long.MIN_VALUE)
>>>> >      }
>>>> > 
>>>> >      override fun processElement(element: Event, context: Context, out: 
>>>> > Collector<FileOutput>) {
>>>> >          // Resolve the event time
>>>> >          val eventTime: Long = getEventTime(element)
>>>> > 
>>>> >          // Update watermark (if applicable)
>>>> >          if (currentWatermark.value() < eventTime){
>>>> >              currentWatermark.update(eventTime)
>>>> >          }
>>>> > 
>>>> >          // Define a timer for this window
>>>> >          val timerService = context.timerService()
>>>> > 
>>>> >          if (eventTime <= timerService.currentWatermark()) {
>>>> >              // This event is late; its window has already been 
>>>> > triggered.
>>>> >          } else {
>>>> >              // Determine the "actual" window closure and start a timer 
>>>> > for it
>>>> >              // (eventTime + window
>>>> >              val endOfWindow= eventTime - (eventTime % duration) + 
>>>> > duration - 1
>>>> > 
>>>> >              // Schedule a callback for when the window has been 
>>>> > completed.
>>>> >              timerService.registerEventTimeTimer(endOfWindow)
>>>> > 
>>>> >              // Add this element to the corresponding aggregation for 
>>>> > this window
>>>> >              windowContents.put(endOfWindow, windowContents[endOfWindow] 
>>>> > + "$element")
>>>> >          }
>>>> >      }
>>>> > 
>>>> >      override fun onTimer(timestamp: Long, context: OnTimerContext, out: 
>>>> > Collector<FileOutput>) {
>>>> >          val key = context.currentKey
>>>> >          val currentAggregation: String = windowContents.get(timestamp)
>>>> > 
>>>> >          // Output things here and clear the current aggregation for this
>>>> >          // tenant/source combination in this window
>>>> >      }
>>>> > 
>>>> >      companion object {
>>>> >          private val watermark = ValueStateDescriptor(
>>>> >              "watermark",
>>>> >              Long::class.java
>>>> >          )
>>>> > 
>>>> >          private val windowContents = MapStateDescriptor(
>>>> >              "window-contents",
>>>> >              Long::class.java,
>>>> >              String::class.java
>>>> >          )
>>>> > 
>>>> >          fun getEventTime(element: Event): Long {
>>>> >              return Instant(element.`source$1`.createdTimestamp).millis
>>>> >          }
>>>> >      }
>>>> > }
>>>> > 
>>>> > Is something glaringly off with this? I’ll need to do some additionally 
>>>> > reading on the timers, but any additional clarification would be greatly 
>>>> > appreciated.
>>>> > 
>>>> > Thanks so much for your initial response again!
>>>> > 
>>>> > Rion
>>>> > 
>>>> >> On Feb 25, 2021, at 3:27 PM, David Anderson <dander...@apache.org> 
>>>> >> wrote:
>>>> >>
>>>> >> 
>>>> >> Rion,
>>>> >>
>>>> >> What you want isn't really achievable with the APIs you are using. 
>>>> >> Without some sort of per-key (per-tenant) watermarking -- which Flink 
>>>> >> doesn't offer -- the watermarks and windows for one tenant can be held 
>>>> >> up by the failure of another tenant's events to arrive in a timely 
>>>> >> manner.
>>>> >>
>>>> >> However, your pipeline is pretty straightforward, and it shouldn't be 
>>>> >> particularly difficult to accomplish what you want. What you can do is 
>>>> >> to ignore the built-in watermarking and windowing APIs, and build 
>>>> >> equivalent functionality in the form of a KeyedProcessFunction.
>>>> >>
>>>> >> The Flink docs include an example [1] showing how to implement your 
>>>> >> own tumbling event time windows with a process function. That 
>>>> >> implementation assumes you can rely on watermarks for triggering the 
>>>> >> windows; you'll have to do that differently.
>>>> >>
>>>> >> What you can do instead is to track, in ValueState, the largest 
>>>> >> timestamp you've seen so far (for each key/tenant). Whenever that 
>>>> >> advances, you can subtract the bounded-out-of-orderness duration from 
>>>> >> that timestamp, and then check to see if the resulting value is now 
>>>> >> large enough to trigger any of the windows for that key/tenant.
>>>> >>
>>>> >> Handling allowed lateness should be pretty straightforward.
>>>> >>
>>>> >> Hope this helps,
>>>> >> David
>>>> >>
>>>> >> [1] 
>>>> >> https://ci.apache.org/projects/flink/flink-docs-release-1.12/learn-flink/event_driven.html#example
>>>> >>  
>>>> >> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/learn-flink/event_driven.html#example>
>>>> >>
>>>> >> On Thu, Feb 25, 2021 at 9:05 PM Rion Williams <rionmons...@gmail.com 
>>>> >> <mailto:rionmons...@gmail.com>> wrote:
>>>> >>
>>>> >>     Hey folks, I have a somewhat high-level/advice question regarding
>>>> >>     Flink and if it has the mechanisms in place to accomplish what I’m
>>>> >>     trying to do. I’ve spent a good bit of time using Apache Beam, but
>>>> >>     recently pivoted over to native Flink simply because some of the
>>>> >>     connectors weren’t as mature or didn’t support some of the
>>>> >>     functionality that I needed.
>>>> >>
>>>> >>     Basically - I have a single Kafka topic with 10 partitions that
>>>> >>     I’m consuming from. This is a multi-tenant topic containing data
>>>> >>     that comes in at various times from various tenants and is not at
>>>> >>     all guaranteed to be in order, at least with regards to “event
>>>> >>     time”, which is what I care about.
>>>> >>
>>>> >>     What I’m trying to accomplish is this: *Given a multi-tenant topic
>>>> >>     with records eventually distributed across partitions, is it
>>>> >>     possible to consume and window each of these records independently
>>>> >>     of one another without one tenant potentially influencing another
>>>> >>     and write out to separate files per tenant/source (i.e. some other
>>>> >>     defined property on the records)?”
>>>> >>     *
>>>> >>     My pipeline currently looks something like this:
>>>> >>
>>>> >>     @JvmStatic
>>>> >>     fun main(args: Array<String>) {
>>>> >>         val pipeline = StreamExecutionEnvironment
>>>> >>             .getExecutionEnvironment()
>>>> >>             //.createLocalEnvironmentWithWebUI(Configuration())
>>>> >>
>>>> >>         val properties = buildPropertiesFromArgs(args)
>>>> >>         val stream = pipeline
>>>> >>             .addSource(readFromKafka("events", properties))
>>>> >>             .assignTimestampsAndWatermarks(
>>>> >>                 WatermarkStrategy
>>>> >>                    
>>>> >>     .forBoundedOutOfOrderness<Event>(Duration.ofSeconds(...))
>>>> >>                     .withTimestampAssigner { event: Event, _: Long ->
>>>> >>                         // Assign the created timestamp as the event
>>>> >>     timestamp
>>>> >>                         Instant(event.createdTimestamp).millis
>>>> >>                     }
>>>> >>             )
>>>> >>
>>>> >>         // There are multiple data sources that each have their own
>>>> >>     windows and allowed lateness
>>>> >>         // so ensure that each source only handles records for it
>>>> >>         DataSources.forEach { source ->
>>>> >>             stream
>>>> >>                 .filter { event ->
>>>> >>                     event.source == source.name <http://source.name>
>>>> >>                 }
>>>> >>                 .keyBy { event ->
>>>> >>                     //print("Keying record with id ${record.`id$1`} by
>>>> >>     tenant ${record.`source$1`.tenantName}")
>>>> >>                     event.tenant
>>>> >>                 }
>>>> >>                 .window(
>>>> >>                    
>>>> >>     TumblingEventTimeWindows.of(Time.minutes(source.windowDuration))
>>>> >>                 )
>>>> >>                 .allowedLateness(
>>>> >>                     Time.minutes(source.allowedLateness)
>>>> >>                 )
>>>> >>                 .process(
>>>> >>                     // This just contains some logic to take the
>>>> >>     existing windows and construct a file
>>>> >>                     // using the window range and keys (tenant/source)
>>>> >>     with the values being
>>>> >>                     // an aggregation of all of the records
>>>> >>                     WindowedEventProcessFunction(source.name
>>>> >>     <http://source.name>)
>>>> >>                 )
>>>> >>                 .map { summary ->
>>>> >>                     // This would be a sink to write to a file
>>>> >>                 }
>>>> >>         }
>>>> >>         pipeline.execute("event-processor")
>>>> >>     }
>>>> >>
>>>> >>     My overarching question is really - *Can I properly separate the
>>>> >>     data with custom watermark strategies and ensure that keying (or
>>>> >>     some other construct) is enough to allow each tenant/source
>>>> >>     combination to be treated as it’s own stream with it’s own
>>>> >>     watermarking? *I know I could possibly break the single topic up
>>>> >>     into multiple disparate topics, however that level of granularity
>>>> >>     would likely result in several thousand (7000+) topics so I'm
>>>> >>     hoping that some of the constructs available within Flink may help
>>>> >>     with this (WatermarkStrategies, etc.)
>>>> >>
>>>> >>     Any recommendations / advice would be extremely helpful as I'm
>>>> >>     quite new to the Flink world, however I have quite a bit of
>>>> >>     experience in Apache Beam, Kafka Streams, and a smattering of
>>>> >>     other streaming technologies.
>>>> >>
>>>> >>     Thanks much,
>>>> >>
>>>> >>     Rion
>>>> >>
>>>> 

Reply via email to