
Seth Wiesman edited comment on FLINK-7999 at 11/10/17 3:47 PM:

Sure, I will use my own use case as an example.

I am using Flink to aggregate spend for the campaigns I am running. Each record in the main stream contains a campaign id along with information about a single financial transaction; this stream is analogous to the fact table in a traditional data warehouse. However, each campaign runs under a different currency, so I need to join with metadata containing that campaign's currency code. Campaigns have start and end dates, and a campaign can last anywhere from one day to several months.

Events:
timestamp        | id | spend
2017-11-11 03:00 | 1  | 0.25
2017-11-11 03:02 | 2  | 0.03
2017-11-11 03:05 | 1  | 0.11

Metadata:
id | start_date | end_date   | currency_code
1  | 2017-11-10 | 2017-11-12 | "USD"
2  | 2017-04-02 | 2019-12-31 | "EUR"
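To make the per-id windows concrete, here is a minimal plain-Scala sketch (no Flink involved) that enriches the sample events with their campaign's currency. The names `SpendEvent`, `Campaign`, and `CampaignLookup` are hypothetical, and treating `end_date` as inclusive through the end of that day is an assumption about the data, not something stated above.

```scala
import java.time.{LocalDate, LocalDateTime}

// Hypothetical in-memory models of the two sample tables.
final case class SpendEvent(timestamp: LocalDateTime, id: Int, spend: BigDecimal)
final case class Campaign(id: Int, startDate: LocalDate, endDate: LocalDate, currencyCode: String)

object CampaignLookup {
  // Currency of the event's campaign, but only if the event falls inside that
  // campaign's own window. Assumption: end_date is inclusive (whole day).
  def currencyFor(event: SpendEvent, campaigns: Map[Int, Campaign]): Option[String] =
    campaigns
      .get(event.id)
      .filter { c =>
        val windowStart = c.startDate.atStartOfDay()
        val windowEnd   = c.endDate.plusDays(1).atStartOfDay() // exclusive bound
        !event.timestamp.isBefore(windowStart) && event.timestamp.isBefore(windowEnd)
      }
      .map(_.currencyCode)

  val sampleCampaigns: Map[Int, Campaign] = Map(
    1 -> Campaign(1, LocalDate.parse("2017-11-10"), LocalDate.parse("2017-11-12"), "USD"),
    2 -> Campaign(2, LocalDate.parse("2017-04-02"), LocalDate.parse("2019-12-31"), "EUR")
  )

  val sampleEvents: Seq[SpendEvent] = Seq(
    SpendEvent(LocalDateTime.parse("2017-11-11T03:00"), 1, BigDecimal("0.25")),
    SpendEvent(LocalDateTime.parse("2017-11-11T03:02"), 2, BigDecimal("0.03")),
    SpendEvent(LocalDateTime.parse("2017-11-11T03:05"), 1, BigDecimal("0.11"))
  )

  // Prints each event with the currency it joins to (or a marker if it does not join).
  def main(args: Array[String]): Unit =
    sampleEvents.foreach { e =>
      val currency = currencyFor(e, sampleCampaigns).getOrElse("<no match>")
      println(s"${e.timestamp} | ${e.id} | ${e.spend} | $currency")
    }
}
```

All three sample events fall inside their campaign's window, so ids 1 resolve to "USD" and id 2 to "EUR"; an id-1 event on 2017-11-13 would find no match even though the id exists.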

Each event can only be joined within a valid window, and that window varies by id. Today I implement this join with the following `CoProcessFunction`:

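import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.util.Collector

// Assumed record shapes; start and end are java.sql.Timestamp values.
case class CampaignMetadata(id: Long, start: java.sql.Timestamp, end: java.sql.Timestamp, currencyCode: String)
case class Event(id: Long, spend: Double, meta: CampaignMetadata = null)

class CampaignJoin(allowedLateness: Long) extends CoProcessFunction[Event, CampaignMetadata, Event] {

  // Keyed state holding the current metadata for this campaign id.
  @transient private lazy val descriptor =
    new ValueStateDescriptor[CampaignMetadata]("campaign", classOf[CampaignMetadata])

  override def processElement1(value: Event, ctx: CoProcessFunction[Event, CampaignMetadata, Event]#Context, out: Collector[Event]): Unit = {
    val campaign = getRuntimeContext.getState(descriptor).value()

    // Enrich the event once its campaign's metadata is known and the campaign has started.
    if (campaign != null && campaign.start.getTime <= ctx.timestamp()) {
      out.collect(value.copy(meta = campaign))
    }
  }

  override def processElement2(value: CampaignMetadata, ctx: CoProcessFunction[Event, CampaignMetadata, Event]#Context, out: Collector[Event]): Unit = {
    val end = value.end.getTime + allowedLateness

    // Metadata whose end (plus lateness) is already behind the watermark is dropped;
    // otherwise store it and schedule a cleanup timer at end + allowedLateness.
    if (end >= ctx.timerService().currentWatermark()) {
      getRuntimeContext.getState(descriptor).update(value)
      ctx.timerService().registerEventTimeTimer(end)
    }
  }

  override def onTimer(timestamp: Long, ctx: CoProcessFunction[Event, CampaignMetadata, Event]#OnTimerContext, out: Collector[Event]): Unit = {
    val state    = getRuntimeContext.getState(descriptor)
    val campaign = state.value()

    if (campaign != null) {
      val end = campaign.end.getTime + allowedLateness

      // Only clear state for the timer belonging to the current metadata;
      // timers left over from earlier metadata for the same id are ignored.
      if (end == timestamp) {
        state.clear()
      }
    }
  }
}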
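The state lifecycle managed by the function above is easier to follow outside the Flink API, so here is a single-threaded plain-Scala model of it (no Flink dependency). `Meta`, `Ev`, and `CampaignJoinModel` are hypothetical stand-ins, timestamps are epoch millis, and keeping only one pending timer per id is a simplification: in Flink, each metadata update for the same id registers another timer, which is why `onTimer` compares `end == timestamp` before clearing state. As in Flink, an event-time timer fires once the watermark reaches its timestamp.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for CampaignMetadata and Event; times are epoch millis.
final case class Meta(id: Int, start: Long, end: Long, currency: String)
final case class Ev(id: Int, ts: Long, spend: Double)

// Single-threaded model of the keyed CoProcessFunction: one metadata slot per id
// (like ValueState) plus an event-time cleanup timer at end + allowedLateness.
final class CampaignJoinModel(allowedLateness: Long) {
  private val state     = mutable.Map.empty[Int, Meta] // models ValueState per key
  private val timers    = mutable.Map.empty[Int, Long] // simplification: one pending timer per key
  private var watermark = Long.MinValue                // Flink's initial watermark

  // processElement1: join only if metadata is present and the campaign has started.
  def onEvent(e: Ev): Option[(Ev, String)] =
    state.get(e.id).filter(_.start <= e.ts).map(m => (e, m.currency))

  // processElement2: drop already-expired metadata, otherwise store it and schedule cleanup.
  def onMeta(m: Meta): Unit = {
    val cleanupAt = m.end + allowedLateness
    if (cleanupAt >= watermark) {
      state(m.id) = m
      timers(m.id) = cleanupAt
    }
  }

  // Advancing the watermark fires every timer whose timestamp is <= the watermark;
  // firing mirrors onTimer, which clears the state for that key.
  def advanceWatermark(wm: Long): Unit = {
    watermark = math.max(watermark, wm) // watermarks never move backwards
    val due = timers.collect { case (id, t) if t <= watermark => id }.toList
    due.foreach { id =>
      timers.remove(id)
      state.remove(id)
    }
  }

  def hasState(id: Int): Boolean = state.contains(id)
}
```

Like the function it models, the sketch only checks `start` when joining, so an event timestamped after `end` still joins until the cleanup timer fires at `end + allowedLateness`.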


> Variable Join Window Boundaries
> -------------------------------
>                 Key: FLINK-7999
>                 URL: https://issues.apache.org/jira/browse/FLINK-7999
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Seth Wiesman
> Allow window joins with variable length based on row attributes. 
> Consider two streams joined on an id, where one of them has start and end dates;
> it would be useful to be able to join each row during its live duration. Today
> this can be expressed in the DataStream API using a CoProcessFunction.
>  left.id = right.id AND (left.time > right.start AND left.time < right.end)
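For reference, the predicate above can be written as a plain Scala function and evaluated as a nested-loop join over two finite collections. `LeftRow`, `RightRow`, and `VariableWindowPredicate` are hypothetical names, and this is a batch sketch of the semantics only, not how Flink would execute such a join. Because both bounds are strict, a left row exactly at `right.start` or `right.end` does not match, whereas the `CoProcessFunction` in the comment above accepts an event whose timestamp equals the campaign start.

```scala
// Hypothetical row types; time, start, and end are epoch millis.
final case class LeftRow(id: Int, time: Long)
final case class RightRow(id: Int, start: Long, end: Long)

object VariableWindowPredicate {
  // left.id = right.id AND (left.time > right.start AND left.time < right.end)
  def matches(l: LeftRow, r: RightRow): Boolean =
    l.id == r.id && l.time > r.start && l.time < r.end

  // Batch (nested-loop) evaluation: every left row is paired with every right
  // row whose own [start, end] window contains it, with both bounds exclusive.
  def join(left: Seq[LeftRow], right: Seq[RightRow]): Seq[(LeftRow, RightRow)] =
    for {
      l <- left
      r <- right
      if matches(l, r)
    } yield (l, r)
}
```

The window bounds come from each right row rather than from a fixed interval, which is what distinguishes this from a regular time-windowed join.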

This message was sent by Atlassian JIRA
