Hi, 请附上完整的错误信息;如果没有报错而是结果不对,请贴出实际得到的结果。
Regards, Jark > 在 2019年4月15日,15:19,492341344 <[email protected]> 写道: > > import java.util.Collections > > import org.apache.flink.streaming.api.scala._ > import org.apache.flink.table.api.TableEnvironment > import org.apache.flink.table.api.scala._ > import org.apache.flink.table.api.types.DataTypes > import org.apache.flink.table.sources.csv.CsvTableSource > import org.apache.flink.types.Row > import scala.collection.mutable > > object LateralJoinList { > > def procLateralJoinPrint(sql: String) = { > val env = StreamExecutionEnvironment.getExecutionEnvironment > val tEnv = TableEnvironment.getTableEnvironment(env) > > // rates_data: > // currency,rate > // US Dollar,102 > // Euro,114 > // Yen,1 > // Euro,116 > val tableRateSource = CsvTableSource > .builder > .path("/Users/xxx/Desktop/csv") > .field("currency", DataTypes.STRING) > .field("rate", DataTypes.DOUBLE) > .uniqueKeys(Collections.singleton(Collections.singleton("currency"))) > .fieldDelimiter(",") > .ignoreFirstLine > .ignoreParseErrors > .build > > // 构造订单数据 > val ordersData = new mutable.MutableList[(Int, String)] > ordersData.+=((2, "Euro")) > ordersData.+=((1, "US Dollar")) > ordersData.+=((50, "Yen")) > ordersData.+=((3, "Euro")) > ordersData.+=((3, "Euroxxx")) // not emit > > val order_data = env > .fromCollection(ordersData) > .toTable(tEnv, 'amount, 'currency, 'proctime.proctime) > > tEnv.registerTableSource("LatestRates", tableRateSource) > tEnv.registerTable("Orders", order_data) > > tEnv.sqlQuery(sql).toRetractStream[Row].print() > env.execute() > } > > def main(args: Array[String]): Unit = { > val sql1 = > """ > |SELECT > | o.amount, o.currency, r.rate, o.amount * r.rate > |FROM > | Orders AS o > |LEFT OUTER JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r > |ON o.currency = r.currency > """.stripMargin > > procLateralJoinPrint(sql1) > } > }
