The window should be triggered once the watermark is greater than the window's end timestamp.
----------------- Original Message ------------------
From: "?? ??"<thinktothi...@yahoo.com.INVALID>;
Sent: March 7, 2019 (Thursday) ????8:35
To: "dev"<dev@flink.apache.org>;

Subject: Fwd: DataStream EventTime last data cannot be output?





> Begin forwarded message:
> 
> From: ?? ?? <thinktothi...@yahoo.com>
> Subject: DataStream EventTime last data cannot be output?
> Date: March 6, 2019, GMT+8 ????10:51:14
> To: u...@flink.apache.org
> 
> DataStream EventTime last data cannot be output?
> 
> 
> In verifying EventTime plus watermark processing, I found that the data 
> sent to the socket is either not output in time or not output at all.
> The verification found that the last window is only triggered once newly 
> sent data has a timestamp that pushes getCurrentWatermark() past the end of 
> the TimeWindow, i.e. timestamp > window end + maxOutOfOrderness.
> As a result, the latest record cannot be processed in time, or cannot be 
> processed at all.
> How can I deal with this problem?
> 
> 
> 
> The following is the Flink program (Flink 1.7.2):
> ---------------------------------------------------------------------------
> 
> 
> 
> package 
> com.opensourceteams.module.bigdata.flink.example.stream.worldcount.nc.eventtime
> 
> import java.util.{Date, Properties}
> 
> import com.alibaba.fastjson.JSON
> import com.opensourceteams.module.bigdata.flink.common.ConfigurationUtil
> import org.apache.flink.api.common.serialization.SimpleStringSchema
> import org.apache.flink.configuration.Configuration
> import org.apache.flink.streaming.api.TimeCharacteristic
> import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
> import org.apache.flink.streaming.api.watermark.Watermark
> import org.apache.flink.streaming.api.windowing.time.Time
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
> import org.apache.flink.util.Collector
> 
> 
> object SockWordCountRun {
> 
> 
> 
>   def main(args: Array[String]): Unit = {
> 
> 
>     // get the execution environment
>    // val env: StreamExecutionEnvironment = 
> StreamExecutionEnvironment.getExecutionEnvironment
> 
> 
>     val configuration : Configuration = 
> ConfigurationUtil.getConfiguration(true)
> 
>     val env:StreamExecutionEnvironment = 
> StreamExecutionEnvironment.createLocalEnvironment(1,configuration)
> 
> 
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> 
> 
> 
>     import org.apache.flink.streaming.api.scala._
>     val dataStream = env.socketTextStream("localhost", 1234, '\n')
> 
>      // .setParallelism(3)
> 
> 
>     dataStream.assignTimestampsAndWatermarks(new 
> AssignerWithPeriodicWatermarks[String] {
> 
>         val maxOutOfOrderness =  2 * 1000L // 2 seconds
>         var currentMaxTimestamp: Long = _
>         var currentTimestamp: Long = _
> 
>         override def getCurrentWatermark: Watermark =  new 
> Watermark(currentMaxTimestamp - maxOutOfOrderness)
> 
>         override def extractTimestamp(element: String, 
> previousElementTimestamp: Long): Long = {
>           val jsonObject = JSON.parseObject(element)
> 
>           val timestamp = jsonObject.getLongValue("extract_data_time")
>           currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)
>           currentTimestamp = timestamp
> 
>         /*  println("===========watermark begin===========")
>           println()
>           println(new Date(currentMaxTimestamp - 20 * 1000))
>           println(jsonObject)
>           println("===========watermark end===========")
>           println()*/
>           timestamp
>         }
> 
>       })
>       .timeWindowAll(Time.seconds(3))
> 
>       .process(new ProcessAllWindowFunction[String,String,TimeWindow]() {
>       override def process(context: Context, elements: Iterable[String], out: 
> Collector[String]): Unit = {
> 
> 
>         println()
>         println("????????window")
>         println(new Date())
>         for(e <- elements) out.collect(e)
>         println("????????window")
>         println(new Date())
>         println()
>       }
>     })
> 
>       .print()
>       //.setParallelism(3)
> 
> 
> 
> 
> 
>     
> println("==================================??????????????==================================")
>     println("????????(firefox????????):https://flink.apache.org/visualizer 
> <https://flink.apache.org/visualizer>")
>     //????????
>     println(env.getStreamGraph.getStreamingPlanAsJSON)
>     println("==================================?????????????? 
> JSON??==================================\n")
> 
> 
>     env.execute("Socket ????????")
> 
> 
> 
> 
> 
> 
>     println("????")
> 
>   }
> 
> 
>   // Data type for words with count
>   case class WordWithCount(word: String, count: Long){
>     //override def toString: String = Thread.currentThread().getName + word + 
> " : " + count
>   }
> 
> 
>   def getConfiguration(isDebug:Boolean = false):Configuration = {
> 
>     val configuration : Configuration = new Configuration()
> 
>     if(isDebug){
>       val timeout = "100000 s"
>       val timeoutHeartbeatPause = "1000000 s"
>       configuration.setString("akka.ask.timeout",timeout)
>       configuration.setString("akka.lookup.timeout",timeout)
>       configuration.setString("akka.tcp.timeout",timeout)
>       configuration.setString("akka.transport.heartbeat.interval",timeout)
>       
> configuration.setString("akka.transport.heartbeat.pause",timeoutHeartbeatPause)
>       configuration.setString("akka.watch.heartbeat.pause",timeout)
>       configuration.setInteger("heartbeat.interval",10000000)
>       configuration.setInteger("heartbeat.timeout",50000000)
>     }
> 
> 
>     configuration
>   }
> 
> 
> }
> 
> 
> 
> 
> 
> best   thinktothings
>

Reply via email to