Hi Fabian, Guowei Thanks for the help. My flow is as the attached photo. Where (1) and (2) are the main data streams from file sources, while (3) and (4) are the enrichment data, also from file sources. <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1586/Screen_Shot_2019-05-01_at_08.png>
(5) is to merge-parse (1) and (2), which consists of: A tumbling window function, with early trigger (basing on the number of records in the window: FIRE when there have been at least one msg from each stream 1 & 2, not waiting for window end-time) A flat map function to parse the incoming msg A filter and a map (6) works as a data enricher, to enrich output of (5) with data from (3) and (4). As (4) is broadcasted, what My implementation for (6) is like: /stream5.union(stream3).keyBy(key2).connect(stream4).process(MyFunction6 extends KeyedBroadcastProcessFunction)/ In this KeyedBroadcastProcessFunction, one msg from (5) would trigger one output, while a msg from (3) or (4) doesn't send out any records, but update the States only. Regarding messages type: Outputs of (1) and (2) are of the same type EventType1. Output of (3) is of type EventType2_1 extends EventType2 Output of (5) is of type EventType2_2 extends EventType2 Input of (6) is of type EventType2 (from the unioned-keyed-stream), and of type Type3 (from the broadcast stream) Output of (6) is of the type EventType2_3, which is mapped from EvenType2_1 As seen on my screenshot, only (5) showed watermark, not (6) nor (7). I noticed that problem because my (7) didn't work as expected. And when I put an eventTimeExtractor between (6) and (7), then (7) worked. Typing all the way until now, I guess that I have known where my issue came from: I have not assign timestamp/watermark for (3) and (4) because I thought that they are just idle sources of enrichment data. /*Because of this, I have another question:*/ I read the text regarding Idling sources [1], but not sure how to implement that for my file sources. Could you please recommend a solution/good-practice here? I have one more question about the recommendation [2] to emit timestamp and watermark from within the source function. Is there any way to do that with the file sources? Thanks and best regards, Averell [1] https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/event_time.html#idling-sources [2] https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/event_timestamps_watermarks.html#source-functions-with-timestamps-and-watermarks -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/