[ https://issues.apache.org/jira/browse/FLINK-7999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16247674#comment-16247674 ]
Seth Wiesman commented on FLINK-7999:
-------------------------------------

Sure, I will use my own use case as an example. I am using Flink to aggregate spend for campaigns that I am running. Each record in the main stream contains both a campaign id and information about a single financial transaction; this stream is analogous to the fact table in a traditional data warehouse. However, each campaign runs under its own currency, so I need to join with metadata containing the currency code for that campaign. Campaigns have both start and end dates, which can span anything from one day to several months.

Events:
{noformat}
timestamp        | id | spend
-----------------------------
2017-11-11 03:00 | 1  | 0.25
2017-11-11 03:02 | 2  | 0.03
2017-11-11 03:05 | 1  | 0.11
{noformat}

Metadata:
{noformat}
id | start_date | end_date   | currency_code
--------------------------------------------
1  | 2017-11-10 | 2017-11-12 | "USD"
2  | 2017-04-02 | 2019-12-31 | "EUR"
{noformat}

Each event has a valid window during which it can be joined, but that window varies by id. Today I implement this join with the following `CoProcessFunction`:
{code:scala}
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Event (which carries a `meta` field) and CampaignMetadata (whose `start` and `end`
// are assumed to be java.util.Date values) are case classes defined elsewhere.
// Keyed state is used, so both inputs must be keyed by campaign id.
class CampaignJoin(allowedLateness: Long) extends CoProcessFunction[Event, CampaignMetadata, Event] {

  @transient private lazy val descriptor =
    new ValueStateDescriptor[CampaignMetadata]("campaign", createTypeInformation[CampaignMetadata])

  // Fact side: enrich the event with its campaign's metadata once the campaign has started.
  override def processElement1(
      value: Event,
      ctx: CoProcessFunction[Event, CampaignMetadata, Event]#Context,
      out: Collector[Event]): Unit = {
    val campaign = getRuntimeContext.getState(descriptor).value()
    if (campaign != null && campaign.start.getTime <= ctx.timestamp()) {
      out.collect(value.copy(meta = campaign))
    }
  }

  // Metadata side: store the campaign and schedule cleanup at end + allowedLateness.
  override def processElement2(
      value: CampaignMetadata,
      ctx: CoProcessFunction[Event, CampaignMetadata, Event]#Context,
      out: Collector[Event]): Unit = {
    val end = value.end.getTime + allowedLateness
    if (end < ctx.timerService().currentWatermark()) {
      return // the campaign's window has already closed; drop the metadata
    }
    ctx.timerService().registerEventTimeTimer(end)
    getRuntimeContext.getState(descriptor).update(value)
  }

  // Clear the state only if this timer belongs to the currently stored campaign;
  // an update with a different end date registers its own timer.
  override def onTimer(
      timestamp: Long,
      ctx: CoProcessFunction[Event, CampaignMetadata, Event]#OnTimerContext,
      out: Collector[Event]): Unit = {
    val state = getRuntimeContext.getState(descriptor)
    val campaign = state.value()
    if (campaign != null) {
      val end = campaign.end.getTime + allowedLateness
      if (end == timestamp) {
        state.clear()
      }
    }
  }
}
{code}

> Variable Join Window Boundaries
> -------------------------------
>
>                 Key: FLINK-7999
>                 URL: https://issues.apache.org/jira/browse/FLINK-7999
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Seth Wiesman
>
> Allow window joins with variable length based on row attributes.
> Consider two streams joined on an id, where one has start and end dates; it
> would be useful to be able to join each row during its live duration. Today
> this can be expressed in the DataStream API using a CoProcessFunction.
> left.id = right.id AND (left.time > right.start and left.time < right.end)
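The join condition from the issue description can be tried against the sample Events and Metadata tables with a small, Flink-free Scala sketch. Everything in it is an assumption for illustration: the case classes `Txn` and `Campaign`, the object `VariableWindowJoin` with its `variableWindowJoin` helper, and reading `start_date`/`end_date` as midnight UTC. It is a batch nested-loop model of the predicate, not a Flink operator or an existing Table API feature.

```scala
import java.time.{Instant, LocalDate, LocalDateTime, ZoneOffset}

// Hypothetical row types mirroring the Events and Metadata tables.
final case class Txn(time: Instant, id: Int, spend: BigDecimal)
final case class Campaign(id: Int, start: Instant, end: Instant, currency: String)

object VariableWindowJoin {
  // Assumption: dates denote midnight UTC.
  private def day(s: String): Instant =
    LocalDate.parse(s).atStartOfDay(ZoneOffset.UTC).toInstant

  // Assumption: event timestamps are UTC, formatted "yyyy-MM-dd HH:mm".
  private def ts(s: String): Instant =
    LocalDateTime.parse(s.replace(' ', 'T')).toInstant(ZoneOffset.UTC)

  val events: Seq[Txn] = Seq(
    Txn(ts("2017-11-11 03:00"), 1, BigDecimal("0.25")),
    Txn(ts("2017-11-11 03:02"), 2, BigDecimal("0.03")),
    Txn(ts("2017-11-11 03:05"), 1, BigDecimal("0.11"))
  )

  val metadata: Seq[Campaign] = Seq(
    Campaign(1, day("2017-11-10"), day("2017-11-12"), "USD"),
    Campaign(2, day("2017-04-02"), day("2019-12-31"), "EUR")
  )

  // left.id = right.id AND left.time > right.start AND left.time < right.end,
  // where each right row carries its own window bounds.
  def variableWindowJoin(left: Seq[Txn], right: Seq[Campaign]): Seq[(Txn, Campaign)] =
    for {
      l <- left
      r <- right
      if l.id == r.id && l.time.isAfter(r.start) && l.time.isBefore(r.end)
    } yield (l, r)

  def main(args: Array[String]): Unit =
    variableWindowJoin(events, metadata).foreach { case (t, c) =>
      println(s"${t.time} | ${t.id} | ${t.spend} ${c.currency}")
    }
}
```

All three sample events fall inside their campaign's range, so each is paired with a currency (USD, EUR, USD in input order). The bounds are strict, as in the issue; replacing `l.time.isAfter(r.start)` with `!l.time.isBefore(r.start)` makes the start inclusive, matching the start check in `processElement1`.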