??????????Blinkplanner??????????????????????????????oldplanner??????????????????1.10




package test.table.sql


import java.util.Properties


import com.souhu.msns.huyou.PublicParams
import com.souhu.msns.huyou.utils.KafkaPbSchema
import org.apache.flink.api.common.time.Time
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.TimeCharacteristic
import 
org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala._
import org.apache.flink.streaming.api.windowing.time.{Time => WindowTime}
import org.apache.flink.types.Row




object test {


  def main(args: Array[String]): Unit = {


    
//----------------------------????????????------------------------------------------------
    val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    bsEnv.setNumberOfExecutionRetries(1)
    bsEnv.setParallelism(1)
    //bsEnv.getConfig.setAutoWatermarkInterval(10000)
    bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    bsEnv.setStateBackend(new 
FsStateBackend("hdfs://dc1:8020/user/msns/streaming/checkpoint/flink/Circ", 
true))
    bsEnv.getCheckpointConfig.setCheckpointInterval(300000)
    bsEnv.getCheckpointConfig.setMinPauseBetweenCheckpoints(60000)
    bsEnv.setParallelism(3)
    bsEnv.setNumberOfExecutionRetries(1)


    
//----------------------------????TABLE????------------------------------------------------


    val setting = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val bstEnv = StreamTableEnvironment.create(bsEnv,setting)
    val tConfig = bstEnv.getConfig
    
tConfig.setIdleStateRetentionTime(Time.minutes(10),Time.minutes(20))
    val config = bstEnv.getConfig.getConfiguration()
    config.setString("table.exec.mini-batch.enabled", "true") // 
local-global aggregation depends on mini-batch is enabled
    config.setString("table.exec.mini-batch.allow-latency", "5 s")
    config.setString("table.exec.mini-batch.size", "5000")
    config.setString("table.optimizer.agg-phase-strategy", 
"TWO_PHASE") // enable two-phase, i.e. local-global aggregation
    config.setString("table.optimizer.distinct-agg.split.enabled", 
"true")
    //bstEnv.getConfig.setLocalTimeZone(ZoneId.of("Etc/GMT+8"))
  


    
//----------------------------??????????????------------------------------------------------
    val kafkaProps = new Properties()
    kafkaProps.setProperty("bootstrap.servers", PublicParams.brokers)
    val source = ....
                      
.toTable(bstEnv,'userId,'createTime.rowtime,'action,'circleName,'flowName,'ts,'content,'feedid,'postfeedid,'sessionId)


    bstEnv.createTemporaryView("source",source)


    val q1=bstEnv.sqlQuery(
      """select sessionId from source
        |where sessionId is not null
        |and action='P_TIMELINE'""".stripMargin)
      q1.toAppendStream[Row].print("source")
     bstEnv.createTemporaryView("sourcefeed",q1)
         
    val q2=bstEnv.sqlQuery(
      """select sessionId from source
        |where sessionId is not null
        |and action='V_TIMELINE_FEED'""".stripMargin)
    bstEnv.createTemporaryView("postfeed",q2)
        
    bstEnv.sqlQuery(
      """
        |select count(b.sessionId) from
        |sourcefeed a
        |join postfeed b
        |on a.sessionId=b.sessionId
      """.stripMargin).toRetractStream[Row].print("")




    bstEnv.execute("")
  }
}









------------------ ???????? ------------------
??????:&nbsp;"Leonard Xu"<[email protected]&gt;;
????????:&nbsp;2020??6??11??(??????) ????2:40
??????:&nbsp;"user-zh"<[email protected]&gt;;

????:&nbsp;Re: BLinkPlanner sql join????????



Hi??

????????????????????????flink??????????case, ????????????????????

Best??
Leonard Xu
&gt; ?? 2020??6??11????14:30??op <[email protected]&gt; ??????
&gt; 
&gt; ????????????????????????
&gt; 
??????????????????oldPlanner????IdleStateRetentionTime??join??????????????????????????????????blinkplanner????????????????????????????????????????????bug????

回复