When bounded Flink sources reach the end of their input, a special
watermark with the value Watermark.MAX_WATERMARK is emitted that will take
care of flushing all windows.

One approach is to use a DeserializationSchema or
KafkaDeserializationSchema with an implementation of isEndOfStream that
returns true when the end of the input stream has been reached; something
like this, perhaps:

public class TestDeserializer extends YourKafkaDeserializer<T> {
  public final static String END_APP_MARKER = "END_APP_MARKER"; // tests
send as last record

  @Override
  public boolean isEndOfStream(T nextElement) {
    if (END_APP_MARKER.equals(nextElement.getRawData()))
      return true;

    return false;
  }
}

Or with Flink 1.12, you could use the new KafkaSource with its setBounded
option.

Best,
David


On Mon, Mar 1, 2021 at 6:56 PM Rion Williams <rionmons...@gmail.com> wrote:

> Hey David et all,
>
> I had one follow up question for this as I've been putting together some
> integration/unit tests to verify that things are working as expected with
> finite datasets (e.g. a text file with several hundred records that are
> serialized, injected into Kafka, and processed through the pipeline). I'm
> wondering if there's a good strategy to handle these finite sets (i.e. when
> I'm done reading through all of the records that I care about, I'd need to
> trigger something to explicitly flush the windows / evict messages. I'm not
> sure what a great approach would be to handle here? I don't think there's
> an easy way to simulate processing time delays outside of an explicit
> Thread.sleep() call prior to injecting some messages into the running
> pipeline asynchronously.
>
> Any recommendations for handling something like this? I must imagine that
> it's a fairly common use-case for testing, but maybe not?
>
> Thanks much!
>
> Rion
>
> On Sat, Feb 27, 2021 at 10:56 AM Rion Williams <rionmons...@gmail.com>
> wrote:
>
>> Thanks David,
>>
>> I figured that the correct approach would obviously be to adopt a keying
>> strategy upstream to ensure the same data that I used as a key downstream
>> fell on the same partition (ensuring the ordering guarantees I’m looking
>> for).
>>
>> I’m guessing implementation-wise, when I would normally evict a window
>> after some event time and allowed lateness, I could set a timer or just
>> explicitly keep the window open for some additional time to allow for out
>> of order data to make its way into the window.
>>
>> Either way - I think the keying is probably the right approach, but I
>> wanted to consider any other options should that become an issue upstream.
>>
>> Thanks!
>>
>> Rion
>>
>> On Feb 27, 2021, at 10:21 AM, David Anderson <dander...@apache.org>
>> wrote:
>>
>> 
>> Rion,
>>
>> If you can arrange for each tenant's events to be in only one kafka
>> partition, that should be the best way to simplify the processing you need
>> to do. Otherwise, a simple change that may help would be to increase the
>> bounded delay you use in calculating your own per-tenant watermarks,
>> thereby making late events less likely.
>>
>> David
>>
>> On Sat, Feb 27, 2021 at 3:29 AM Rion Williams <rionmons...@gmail.com>
>> wrote:
>>
>>> David and Timo,
>>>
>>> Firstly, thank you both so much for your contributions and advice. I
>>> believe I’ve implemented things along the lines that you both detailed and
>>> things appear to work just as expected (e.g. I can see things arriving,
>>> being added to windows, discarding late records, and ultimately writing out
>>> files as expected).
>>>
>>> With that said, I have one question / issue that I’ve run into with
>>> handling the data coming my Kafka topic. Currently, my tenant/source (i.e.
>>> my key) may be distributed across the 10 partitions of my Kafka topic. With
>>> the way that I’m consuming from this topic (with a Kafka Consumer), it
>>> looks like my data is arriving in a mixed order which seems to be causing
>>> my own watermarks (those stored in my ValueState) to process as later data
>>> may arrive earlier than other data and cause my windows to be evicted.
>>>
>>> I’m currently using the `withNoWatermarks()` along with a custom
>>> timestamp assigned to handle all of my timestamping, but is there a
>>> mechanism to handle the mixed ordering across partitions in this scenario
>>> at the Flink level?
>>>
>>> I know the answer here likely lies with Kafka and adopting a better
>>> keying strategy to ensure the same tenant/source (my key) lands on the same
>>> partition, which by definition ensures ordering. I’m just wondering if
>>> there’s some mechanism to accomplish this post-reading from Kafka in Flink
>>> within my pipeline to handle things in a similar fashion?
>>>
>>> Again - thank you both so much, I’m loving the granularity and control
>>> that Flink has been providing me over other streaming technologies I’ve
>>> used in the past. I’m totally sold on it and am looking forward to doing
>>> more incredible things with it.
>>>
>>> Best regards,
>>>
>>> Rion
>>>
>>> On Feb 26, 2021, at 4:36 AM, David Anderson <dander...@apache.org>
>>> wrote:
>>>
>>> 
>>> Yes indeed, Timo is correct -- I am proposing that you not use timers at
>>> all. Watermarks and event-time timers go hand in hand -- and neither
>>> mechanism can satisfy your requirements.
>>>
>>> You can instead put all of the timing logic in the processElement method
>>> -- effectively emulating what you would get if Flink were to offer per-key
>>> watermarking.
>>>
>>> The reason that the PseudoWindow example is using MapState is that for
>>> each key/tenant, more than one window can be active simultaneously. This
>>> occurs because the event stream is out-of-order with respect to time, so
>>> events for the "next window" are probably being processed before "the
>>> previous" window is complete. And if you want to accommodate allowed
>>> lateness, the requirement to have several windows open at once becomes even
>>> more important.
>>>
>>> MapState gives you a per-tenant hashmap, where each entry in that map
>>> corresponds to an open window for some particular tenant, where the map's
>>> key is the timestamp for a window, and the value is whatever state you want
>>> that window to hold.
>>>
>>> Best regards,
>>> David
>>>
>>>
>>>
>>>
>>> On Fri, Feb 26, 2021 at 9:44 AM Timo Walther <twal...@apache.org> wrote:
>>>
>>>> Hi Rion,
>>>>
>>>> I think what David was refering to is that you do the entire time
>>>> handling yourself in process function. That means not using the
>>>> `context.timerService()` or `onTimer()` that Flink provides but calling
>>>> your own logic based on the timestamps that enter your process function
>>>> and the stored state.
>>>>
>>>> Regards,
>>>> Timo
>>>>
>>>>
>>>> On 26.02.21 00:29, Rion Williams wrote:
>>>> > 
>>>> > Hi David,
>>>> >
>>>> > Thanks for your prompt reply, it was very helpful and the
>>>> PseudoWindow
>>>> > example is excellent. I believe it closely aligns with an approach
>>>> that
>>>> > I was tinkering with but seemed to be missing a few key pieces. In my
>>>> > case, I'm essentially going to want to be aggregating the messages
>>>> that
>>>> > are coming into the window (a simple string-concatenation aggregation
>>>> > would work). Would I need another form of state to hold that, as
>>>> looking
>>>> > through this example with naive eyes, it seems that this function is
>>>> > currently storing multiple windows in state via the MapState provided:
>>>> >
>>>> > // Keyed, managed state, with an entry for each window, keyed by the
>>>> > window's end time.
>>>> > // There is a separate MapState object for each driver.
>>>> > private transient MapState<Long, Float> sumOfTips;
>>>> >
>>>> > If I wanted to perform an aggregation for each key/tenant, would a
>>>> > MapState be appropriate? Such as a MapState<Long, String> if I was
>>>> doing
>>>> > a string aggregation, so that within my processElement function I
>>>> could
>>>> > use something similar for building these aggregations and ultimately
>>>> > triggering them:
>>>> >
>>>> > // Keep track of a tenant/source specific watermark
>>>> > private lateinit var currentWatermark: ValueState<Long>
>>>> > // Keep track of the contents of each of the windows where the key
>>>> > represents the close
>>>> > // of the window and the contents represents an accumulation of the
>>>> > records for that window
>>>> > private lateinit var windowContents: MapState<Long, String>
>>>> >
>>>> > If that's the case, this is what I've thrown together thus far and I
>>>> > feel like it's moving in the right direction:
>>>> >
>>>> > class MagicWindow(private val duration: Long, private val lateness:
>>>> Long):
>>>> >      KeyedProcessFunction<String, Event, FileOutput>(){
>>>> >
>>>> >      // Keep track of a tenant/source specific watermark
>>>> >      private lateinit var currentWatermark: ValueState<Long>
>>>> >      // Keep track of the contents of each of the windows where the
>>>> key
>>>> > represents the close
>>>> >      // of the window and the contents represents an accumulation of
>>>> the
>>>> > records for that window
>>>> >      private lateinit var windowContents: MapState<Long, String>
>>>> >
>>>> >      override fun open(config: Configuration) {
>>>> >          currentWatermark = runtimeContext.getState(watermark)
>>>> >          currentWatermark.update(Long.MIN_VALUE)
>>>> >      }
>>>> >
>>>> >      override fun processElement(element: Event, context: Context,
>>>> out:
>>>> > Collector<FileOutput>) {
>>>> >          // Resolve the event time
>>>> >          val eventTime: Long = getEventTime(element)
>>>> >
>>>> >          // Update watermark (if applicable)
>>>> >          if (currentWatermark.value() < eventTime){
>>>> >              currentWatermark.update(eventTime)
>>>> >          }
>>>> >
>>>> >          // Define a timer for this window
>>>> >          val timerService = context.timerService()
>>>> >
>>>> >          if (eventTime <= timerService.currentWatermark()) {
>>>> >              // This event is late; its window has already been
>>>> triggered.
>>>> >          } else {
>>>> >              // Determine the "actual" window closure and start a
>>>> timer
>>>> > for it
>>>> >              // (eventTime + window
>>>> >              val endOfWindow= eventTime - (eventTime % duration) +
>>>> > duration - 1
>>>> >
>>>> >              // Schedule a callback for when the window has been
>>>> completed.
>>>> >              timerService.registerEventTimeTimer(endOfWindow)
>>>> >
>>>> >              // Add this element to the corresponding aggregation for
>>>> > this window
>>>> >              windowContents.put(endOfWindow,
>>>> windowContents[endOfWindow]
>>>> > + "$element")
>>>> >          }
>>>> >      }
>>>> >
>>>> >      override fun onTimer(timestamp: Long, context: OnTimerContext,
>>>> out:
>>>> > Collector<FileOutput>) {
>>>> >          val key = context.currentKey
>>>> >          val currentAggregation: String =
>>>> windowContents.get(timestamp)
>>>> >
>>>> >          // Output things here and clear the current aggregation for
>>>> this
>>>> >          // tenant/source combination in this window
>>>> >      }
>>>> >
>>>> >      companion object {
>>>> >          private val watermark = ValueStateDescriptor(
>>>> >              "watermark",
>>>> >              Long::class.java
>>>> >          )
>>>> >
>>>> >          private val windowContents = MapStateDescriptor(
>>>> >              "window-contents",
>>>> >              Long::class.java,
>>>> >              String::class.java
>>>> >          )
>>>> >
>>>> >          fun getEventTime(element: Event): Long {
>>>> >              return
>>>> Instant(element.`source$1`.createdTimestamp).millis
>>>> >          }
>>>> >      }
>>>> > }
>>>> >
>>>> > Is something glaringly off with this? I’ll need to do some
>>>> additionally
>>>> > reading on the timers, but any additional clarification would be
>>>> greatly
>>>> > appreciated.
>>>> >
>>>> > Thanks so much for your initial response again!
>>>> >
>>>> > Rion
>>>> >
>>>> >> On Feb 25, 2021, at 3:27 PM, David Anderson <dander...@apache.org>
>>>> wrote:
>>>> >>
>>>> >> 
>>>> >> Rion,
>>>> >>
>>>> >> What you want isn't really achievable with the APIs you are using.
>>>> >> Without some sort of per-key (per-tenant) watermarking -- which
>>>> Flink
>>>> >> doesn't offer -- the watermarks and windows for one tenant can be
>>>> held
>>>> >> up by the failure of another tenant's events to arrive in a timely
>>>> manner.
>>>> >>
>>>> >> However, your pipeline is pretty straightforward, and it shouldn't
>>>> be
>>>> >> particularly difficult to accomplish what you want. What you can do
>>>> is
>>>> >> to ignore the built-in watermarking and windowing APIs, and build
>>>> >> equivalent functionality in the form of a KeyedProcessFunction.
>>>> >>
>>>> >> The Flink docs include an example [1] showing how to implement your
>>>> >> own tumbling event time windows with a process function. That
>>>> >> implementation assumes you can rely on watermarks for triggering the
>>>> >> windows; you'll have to do that differently.
>>>> >>
>>>> >> What you can do instead is to track, in ValueState, the largest
>>>> >> timestamp you've seen so far (for each key/tenant). Whenever that
>>>> >> advances, you can subtract the bounded-out-of-orderness duration
>>>> from
>>>> >> that timestamp, and then check to see if the resulting value is now
>>>> >> large enough to trigger any of the windows for that key/tenant.
>>>> >>
>>>> >> Handling allowed lateness should be pretty straightforward.
>>>> >>
>>>> >> Hope this helps,
>>>> >> David
>>>> >>
>>>> >> [1]
>>>> >>
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/learn-flink/event_driven.html#example
>>>> >> <
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/learn-flink/event_driven.html#example
>>>> >
>>>> >>
>>>> >> On Thu, Feb 25, 2021 at 9:05 PM Rion Williams <rionmons...@gmail.com
>>>> >> <mailto:rionmons...@gmail.com>> wrote:
>>>> >>
>>>> >>     Hey folks, I have a somewhat high-level/advice question regarding
>>>> >>     Flink and if it has the mechanisms in place to accomplish what
>>>> I’m
>>>> >>     trying to do. I’ve spent a good bit of time using Apache Beam,
>>>> but
>>>> >>     recently pivoted over to native Flink simply because some of the
>>>> >>     connectors weren’t as mature or didn’t support some of the
>>>> >>     functionality that I needed.
>>>> >>
>>>> >>     Basically - I have a single Kafka topic with 10 partitions that
>>>> >>     I’m consuming from. This is a multi-tenant topic containing data
>>>> >>     that comes in at various times from various tenants and is not at
>>>> >>     all guaranteed to be in order, at least with regards to “event
>>>> >>     time”, which is what I care about.
>>>> >>
>>>> >>     What I’m trying to accomplish is this: *Given a multi-tenant
>>>> topic
>>>> >>     with records eventually distributed across partitions, is it
>>>> >>     possible to consume and window each of these records
>>>> independently
>>>> >>     of one another without one tenant potentially influencing another
>>>> >>     and write out to separate files per tenant/source (i.e. some
>>>> other
>>>> >>     defined property on the records)?”
>>>> >>     *
>>>> >>     My pipeline currently looks something like this:
>>>> >>
>>>> >>     @JvmStatic
>>>> >>     fun main(args: Array<String>) {
>>>> >>         val pipeline = StreamExecutionEnvironment
>>>> >>             .getExecutionEnvironment()
>>>> >>             //.createLocalEnvironmentWithWebUI(Configuration())
>>>> >>
>>>> >>         val properties = buildPropertiesFromArgs(args)
>>>> >>         val stream = pipeline
>>>> >>             .addSource(readFromKafka("events", properties))
>>>> >>             .assignTimestampsAndWatermarks(
>>>> >>                 WatermarkStrategy
>>>> >>
>>>> >>     .forBoundedOutOfOrderness<Event>(Duration.ofSeconds(...))
>>>> >>                     .withTimestampAssigner { event: Event, _: Long ->
>>>> >>                         // Assign the created timestamp as the event
>>>> >>     timestamp
>>>> >>                         Instant(event.createdTimestamp).millis
>>>> >>                     }
>>>> >>             )
>>>> >>
>>>> >>         // There are multiple data sources that each have their own
>>>> >>     windows and allowed lateness
>>>> >>         // so ensure that each source only handles records for it
>>>> >>         DataSources.forEach { source ->
>>>> >>             stream
>>>> >>                 .filter { event ->
>>>> >>                     event.source == source.name <http://source.name>
>>>> >>                 }
>>>> >>                 .keyBy { event ->
>>>> >>                     //print("Keying record with id ${record.`id$1`}
>>>> by
>>>> >>     tenant ${record.`source$1`.tenantName}")
>>>> >>                     event.tenant
>>>> >>                 }
>>>> >>                 .window(
>>>> >>
>>>> >>     TumblingEventTimeWindows.of(Time.minutes(source.windowDuration))
>>>> >>                 )
>>>> >>                 .allowedLateness(
>>>> >>                     Time.minutes(source.allowedLateness)
>>>> >>                 )
>>>> >>                 .process(
>>>> >>                     // This just contains some logic to take the
>>>> >>     existing windows and construct a file
>>>> >>                     // using the window range and keys
>>>> (tenant/source)
>>>> >>     with the values being
>>>> >>                     // an aggregation of all of the records
>>>> >>                     WindowedEventProcessFunction(source.name
>>>> >>     <http://source.name>)
>>>> >>                 )
>>>> >>                 .map { summary ->
>>>> >>                     // This would be a sink to write to a file
>>>> >>                 }
>>>> >>         }
>>>> >>         pipeline.execute("event-processor")
>>>> >>     }
>>>> >>
>>>> >>     My overarching question is really - *Can I properly separate the
>>>> >>     data with custom watermark strategies and ensure that keying (or
>>>> >>     some other construct) is enough to allow each tenant/source
>>>> >>     combination to be treated as it’s own stream with it’s own
>>>> >>     watermarking? *I know I could possibly break the single topic up
>>>> >>     into multiple disparate topics, however that level of granularity
>>>> >>     would likely result in several thousand (7000+) topics so I'm
>>>> >>     hoping that some of the constructs available within Flink may
>>>> help
>>>> >>     with this (WatermarkStrategies, etc.)
>>>> >>
>>>> >>     Any recommendations / advice would be extremely helpful as I'm
>>>> >>     quite new to the Flink world, however I have quite a bit of
>>>> >>     experience in Apache Beam, Kafka Streams, and a smattering of
>>>> >>     other streaming technologies.
>>>> >>
>>>> >>     Thanks much,
>>>> >>
>>>> >>     Rion
>>>> >>
>>>>
>>>>

Reply via email to