I had this issue myself. Your timestamp assigner only advances the watermark as it receives data. So when you reach the end of the data, the newest records sit in a window whose end the watermark never passes, and that window is never emitted.
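To make this concrete, here is a small simulation in plain Scala, with no Flink dependency, using the settings from your program (3-second tumbling windows, maxOutOfOrderness = 2 s). It is a sketch under simplifying assumptions: a watermark is computed after every element (Flink really calls getCurrentWatermark periodically), timestamps are non-negative, and a window [start, end) fires once the watermark reaches end - 1, which is how Flink's event-time trigger behaves. The object and method names are made up for illustration.

```scala
// Hypothetical, Flink-free model of the quoted program's windowing.
// Assumption: a watermark is computed after every element; Flink instead
// polls getCurrentWatermark every auto-watermark interval.
object LastWindowStuck {
  val windowSize = 3000L        // timeWindowAll(Time.seconds(3))
  val maxOutOfOrderness = 2000L // same value as in the quoted assigner

  // Start of the tumbling window containing ts (non-negative ts, offset 0).
  def windowStart(ts: Long): Long = ts - (ts % windowSize)

  /** Returns the start of every window that fires, in firing order. */
  def firedWindows(timestamps: Seq[Long]): Seq[Long] = {
    var maxTs = Long.MinValue
    var pending = Set.empty[Long] // windows holding data that have not fired yet
    val fired = scala.collection.mutable.ArrayBuffer.empty[Long]
    for (ts <- timestamps) {
      pending += windowStart(ts)
      maxTs = math.max(maxTs, ts)
      val watermark = maxTs - maxOutOfOrderness
      // A window [start, start + size) fires once watermark >= start + size - 1.
      val ready = pending.filter(s => watermark >= s + windowSize - 1).toSeq.sorted
      fired ++= ready
      pending --= ready
    }
    fired.toList
  }

  def main(args: Array[String]): Unit = {
    // Events at 1 s, 2 s, 4 s and 5.5 s; nothing arrives afterwards.
    println(firedWindows(Seq(1000L, 2000L, 4000L, 5500L))) // prints List(0)
  }
}
```

Only the window starting at 0 fires. The records at 4 s and 5.5 s stay in the window [3000, 6000) until a record with a timestamp of at least 7999 ms pushes the watermark to 5999.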
One solution is to have the source signal that there will be no more data. If you can do this, it is the best solution: when a finite source finishes, Flink emits a final watermark that flushes all remaining windows (with the socket source in your program, closing the connection should end the source).

Another solution is to mix event time and wall-clock time when deciding the window, so that even if no new data arrives the watermark keeps moving and the window eventually outputs its data. Note that if you use this approach and you reprocess the data, the wall clock will be different, so your data may be grouped differently and you could see different results depending on what kind of computation you are doing.

The next gotcha I hit was parallelism. If you are assigning timestamps in a parallel task (say after a keyBy), each parallel task has its own timestamp assigner, and a downstream window operator only advances to the minimum of the watermarks from all of its inputs. If your data is poorly distributed by your key function, one of those parallel assigners might get only one or zero data points, its watermark never advances, and all output is blocked forever!

This is all hinted at in https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html, but it could be more explicit. https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html#idling-sources describes your exact issue.

HTH

On Wed, 6 Mar 2019 at 14:51, 刘 文 <thinktothi...@yahoo.com> wrote:

> DataStream EventTime: the last data cannot be output?
>
> While verifying EventTime plus watermark processing, I found that the data
> sent to the socket is not output in time, or is not output at all.
> Testing showed that the end of the last window is only triggered once the
> timestamp of newly sent data (as used by getCurrentWatermark()) is greater
> than TimeWindow + maxOutOfOrderness.
> So the latest records cannot be processed in time, or cannot be processed at all.
> How can I deal with this problem?
>
> The following is the Flink program, Flink 1.7.2
> ---------------------------------------------------------------------------
>
> package com.opensourceteams.module.bigdata.flink.example.stream.worldcount.nc.eventtime
>
> import java.util.{Date, Properties}
>
> import com.alibaba.fastjson.JSON
> import com.opensourceteams.module.bigdata.flink.common.ConfigurationUtil
> import org.apache.flink.api.common.serialization.SimpleStringSchema
> import org.apache.flink.configuration.Configuration
> import org.apache.flink.streaming.api.TimeCharacteristic
> import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
> import org.apache.flink.streaming.api.watermark.Watermark
> import org.apache.flink.streaming.api.windowing.time.Time
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
> import org.apache.flink.util.Collector
>
> object SockWordCountRun {
>
>   def main(args: Array[String]): Unit = {
>
>     // get the execution environment
>     // val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
>
>     val configuration: Configuration = ConfigurationUtil.getConfiguration(true)
>
>     val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment(1, configuration)
>
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>
>     import org.apache.flink.streaming.api.scala._
>     val dataStream = env.socketTextStream("localhost", 1234, '\n')
>     // .setParallelism(3)
>
>     dataStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[String] {
>
>       val maxOutOfOrderness = 2 * 1000L // 2 seconds
>       var currentMaxTimestamp: Long = _
>       var currentTimestamp: Long = _
>
>       override def getCurrentWatermark: Watermark = new Watermark(currentMaxTimestamp - maxOutOfOrderness)
>
>       override def extractTimestamp(element: String, previousElementTimestamp: Long): Long = {
>         val jsonObject = JSON.parseObject(element)
>
>         val timestamp = jsonObject.getLongValue("extract_data_time")
>         currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)
>         currentTimestamp = timestamp
>
>         /* println("===========watermark begin===========")
>         println()
>         println(new Date(currentMaxTimestamp - 20 * 1000))
>         println(jsonObject)
>         println("===========watermark end===========")
>         println() */
>         timestamp
>       }
>     })
>       .timeWindowAll(Time.seconds(3))
>       .process(new ProcessAllWindowFunction[String, String, TimeWindow]() {
>         override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {
>           println()
>           println("start submitting window")
>           println(new Date())
>           for (e <- elements) out.collect(e)
>           println("finished submitting window")
>           println(new Date())
>           println()
>         }
>       })
>       .print()
>     //.setParallelism(3)
>
>     println("================================== Execution plan below ==================================")
>     println("Execution plan visualizer (works best in Firefox): https://flink.apache.org/visualizer")
>     // execution plan
>     println(env.getStreamGraph.getStreamingPlanAsJSON)
>     println("================================== Execution plan JSON above ==================================\n")
>
>     env.execute("Socket watermark job")
>
>     println("Done")
>   }
>
>   // Data type for words with count
>   case class WordWithCount(word: String, count: Long) {
>     //override def toString: String = Thread.currentThread().getName + word + " : " + count
>   }
>
>   def getConfiguration(isDebug: Boolean = false): Configuration = {
>     val configuration: Configuration = new Configuration()
>
>     if (isDebug) {
>       val timeout = "100000 s"
>       val timeoutHeartbeatPause = "1000000 s"
>       configuration.setString("akka.ask.timeout", timeout)
>       configuration.setString("akka.lookup.timeout", timeout)
>       configuration.setString("akka.tcp.timeout", timeout)
>       configuration.setString("akka.transport.heartbeat.interval", timeout)
>       configuration.setString("akka.transport.heartbeat.pause", timeoutHeartbeatPause)
>       configuration.setString("akka.watch.heartbeat.pause", timeout)
>       configuration.setInteger("heartbeat.interval", 10000000)
>       configuration.setInteger("heartbeat.timeout", 50000000)
>     }
>
>     configuration
>   }
> }
>
> best thinktothings
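For the assigner in your program, the "mix event time and wall-clock time" idea could look roughly like the sketch below. To keep it self-contained and testable, the class only mimics the two methods of AssignerWithPeriodicWatermarks (it takes the timestamp directly instead of parsing JSON, and returns a plain Long instead of a Watermark), and it takes an injectable clock. The class name IdleAwareWatermarks and the idleTimeout parameter are hypothetical, not part of Flink.

```scala
// Hypothetical helper mimicking AssignerWithPeriodicWatermarks' two methods,
// without Flink types, so the logic can run and be tested on its own.
class IdleAwareWatermarks(
    maxOutOfOrderness: Long, // same role as in the quoted assigner
    idleTimeout: Long,       // hypothetical: wall-clock ms without data before advancing
    clock: () => Long = () => System.currentTimeMillis()) {

  // Start low enough that the first watermark is Long.MinValue.
  private var currentMaxTimestamp = Long.MinValue + maxOutOfOrderness
  private var lastEventWallClock = clock()
  private var lastWatermark = Long.MinValue

  // Called per element, like extractTimestamp in the quoted code.
  def extractTimestamp(timestamp: Long): Long = {
    currentMaxTimestamp = math.max(currentMaxTimestamp, timestamp)
    lastEventWallClock = clock()
    timestamp
  }

  // Called periodically, like getCurrentWatermark in the quoted code.
  def getCurrentWatermark: Long = {
    val eventTimeWatermark = currentMaxTimestamp - maxOutOfOrderness
    val idleFor = clock() - lastEventWallClock
    // After idleTimeout ms without data, let the watermark follow the wall clock.
    val candidate =
      if (idleFor > idleTimeout) eventTimeWatermark + (idleFor - idleTimeout)
      else eventTimeWatermark
    lastWatermark = math.max(lastWatermark, candidate) // never move backwards
    lastWatermark
  }
}
```

In a real job, extractTimestamp would still read extract_data_time from the JSON and getCurrentWatermark would wrap the value in new Watermark(...). Flink calls getCurrentWatermark on a timer every auto-watermark interval even when no elements arrive, which is what lets the last window close. This is also why results can change between runs: once the watermark has moved ahead on wall-clock time, records that arrive later with older timestamps are late, and a window drops them by default.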