Hi,

请带上完整的错误信息,或者错误的结果。


Regards,
Jark

> 在 2019年4月15日,15:19,492341344 <[email protected]> 写道:
> 
> import java.util.Collections
> 
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.table.api.TableEnvironment
> import org.apache.flink.table.api.scala._
> import org.apache.flink.table.api.types.DataTypes
> import org.apache.flink.table.sources.csv.CsvTableSource
> import org.apache.flink.types.Row
> import scala.collection.mutable
> 
> object LateralJoinList { // Demo job: an in-memory Orders stream joined via SQL with a CSV-backed LatestRates table.
>  // Registers "LatestRates" and "Orders", runs `sql`, prints the retract stream and executes the job; the result type is inferred from env.execute().
>  def procLateralJoinPrint(sql: String) = {
>    val env = StreamExecutionEnvironment.getExecutionEnvironment
>    val tEnv = TableEnvironment.getTableEnvironment(env)
>    // CSV-backed rates table; the sample rows listed below are presumably the file's contents (header line first).
>    // rates_data:
>    //    currency,rate
>    //    US Dollar,102
>    //    Euro,114
>    //    Yen,1
>    //    Euro,116
>    val tableRateSource = CsvTableSource
>      .builder
>      .path("/Users/xxx/Desktop/csv")
>      .field("currency", DataTypes.STRING)
>      .field("rate", DataTypes.DOUBLE)
>      .uniqueKeys(Collections.singleton(Collections.singleton("currency"))) // one unique key made of the single column "currency"
>      .fieldDelimiter(",")
>      .ignoreFirstLine
>      .ignoreParseErrors
>      .build
> 
>    // Build the order data as (amount, currency) tuples
>    val ordersData = new mutable.MutableList[(Int, String)]
>    ordersData.+=((2, "Euro"))
>    ordersData.+=((1, "US Dollar"))
>    ordersData.+=((50, "Yen"))
>    ordersData.+=((3, "Euro"))
>    ordersData.+=((3, "Euroxxx")) // author's note: not emitted. NOTE(review): sql1 uses LEFT OUTER JOIN, so an unmatched row may still appear with a null rate — confirm
>    // Turn the collection into a Table with fields amount, currency, plus a proctime field declared via `.proctime`.
>    val order_data = env
>      .fromCollection(ordersData)
>      .toTable(tEnv, 'amount, 'currency, 'proctime.proctime)
>    // The names registered here are the ones sql1 in main refers to (LatestRates, Orders).
>    tEnv.registerTableSource("LatestRates", tableRateSource)
>    tEnv.registerTable("Orders", order_data)
>    // Run the query, print its retract stream of Rows, then start the job.
>    tEnv.sqlQuery(sql).toRetractStream[Row].print()
>    env.execute()
>  }
>  // Entry point: runs sql1, a LEFT OUTER JOIN of Orders with LatestRates FOR SYSTEM_TIME AS OF o.proctime, keyed on currency.
>  def main(args: Array[String]): Unit = {
>    val sql1 =
>      """
>        |SELECT
>        |  o.amount, o.currency, r.rate, o.amount * r.rate
>        |FROM
>        |  Orders AS o
>        |LEFT OUTER JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
>        |ON o.currency = r.currency
>      """.stripMargin
> 
>    procLateralJoinPrint(sql1)
>  }
> }

回复