mzz created FLINK-18652: --------------------------- Summary: JDBCAppendTableSink to ClickHouse (data always repeating) Key: FLINK-18652 URL: https://issues.apache.org/jira/browse/FLINK-18652 Project: Flink Issue Type: Bug Components: Table SQL / API Affects Versions: 1.10.0 Reporter: mzz
Hi all, data stream is : kafka->flinkSQL->clickhouse。 The window is 15 min,but,15 minutes after the first time, the data kepping repeat sink to ClickHouse, plz help me ,thx。 {code:java} *// data source from kafka * streamTableEnvironment.sqlUpdate(createTableSql) LOG.info("kafka source table has created !") val groupTable = streamTableEnvironment.sqlQuery(tempSql) streamTableEnvironment.createTemporaryView("aggs_temp_table", groupTable) *// this is window sql ,use ProcessingTime * val re_table = streamTableEnvironment.sqlQuery(windowSql) re_table.printSchema() // groupTable.printSchema() val rr = streamTableEnvironment.toAppendStream[Result](re_table) * // The data here is printed normally * rr.print() streamTableEnvironment.createTemporaryView("result_table", rr) val s = streamTableEnvironment.sqlQuery(sql) *// sink to clickhouse* val sink: JDBCAppendTableSink = JDBCAppendTableSink.builder() .setDrivername("ru.yandex.clickhouse.ClickHouseDriver") .setDBUrl(URL) .setQuery(insertCKSql) .setUsername(USERNAME) .setPassword(PASSWORD) .setBatchSize(10000) .setParameterTypes( Types.LONG, Types.LONG, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.LONG, Types.LONG, Types.FLOAT, Types.LONG, Types.FLOAT, Types.LONG, Types.FLOAT, Types.FLOAT, Types.FLOAT, Types.LONG() ) .build() streamTableEnvironment.registerTableSink("ckResult", Array[String]("data_date", "point", "platform", "page_name", "component_name", "booth_name", "position1", "advertiser", "adv_code", "request_num", "return_num", "fill_rate", "expose_num", "expose_rate", "click_num", "click_rate", "ecpm", "income", "created_at"), Array[TypeInformation[_]](Types.LONG, Types.LONG, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.LONG, Types.LONG, Types.FLOAT, Types.LONG, Types.FLOAT, Types.LONG, Types.FLOAT, Types.FLOAT, Types.FLOAT, Types.LONG()), sink) // insert into TableSink s.insertInto("ckResult") {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)