[ https://issues.apache.org/jira/browse/FLINK-18652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jark Wu closed FLINK-18652.
---------------------------
    Resolution: Not A Problem

> JDBCAppendTableSink to ClickHouse (data always repeating)
> ---------------------------------------------------------
>
>                 Key: FLINK-18652
>                 URL: https://issues.apache.org/jira/browse/FLINK-18652
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / JDBC, Table SQL / Ecosystem
>    Affects Versions: 1.10.0
>            Reporter: mzz
>            Priority: Not a Priority
>              Labels: auto-deprioritized-critical, auto-deprioritized-major, auto-deprioritized-minor
>         Attachments: FLINK-UI.png, checkpoint-failed.png
>
>
> Hi all,
> The data stream is: Kafka -> Flink SQL -> ClickHouse.
> The window is 15 minutes, but starting 15 minutes after the first run, the same data keeps being written to ClickHouse repeatedly. Please help, thanks.
> {code:java}
> // data source from kafka
> streamTableEnvironment.sqlUpdate(createTableSql)
> LOG.info("kafka source table has created !")
> val groupTable = streamTableEnvironment.sqlQuery(tempSql)
> streamTableEnvironment.createTemporaryView("aggs_temp_table", groupTable)
>
> // this is the window SQL; it uses processing time
> val re_table = streamTableEnvironment.sqlQuery(windowSql)
> re_table.printSchema()
> // groupTable.printSchema()
> val rr = streamTableEnvironment.toAppendStream[Result](re_table)
> // the data printed here looks normal
> rr.print()
> streamTableEnvironment.createTemporaryView("result_table", rr)
> val s = streamTableEnvironment.sqlQuery(sql)
>
> // sink to ClickHouse
> val sink: JDBCAppendTableSink = JDBCAppendTableSink.builder()
>   .setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
>   .setDBUrl(URL)
>   .setQuery(insertCKSql)
>   .setUsername(USERNAME)
>   .setPassword(PASSWORD)
>   .setBatchSize(10000)
>   .setParameterTypes(
>     Types.LONG, Types.LONG, Types.STRING, Types.STRING, Types.STRING, Types.STRING,
>     Types.STRING, Types.STRING, Types.STRING, Types.LONG, Types.LONG, Types.FLOAT,
>     Types.LONG, Types.FLOAT, Types.LONG, Types.FLOAT, Types.FLOAT, Types.FLOAT, Types.LONG
>   )
>   .build()
>
> // register the sink: 19 field names matched by 19 field types
> streamTableEnvironment.registerTableSink("ckResult",
>   Array[String]("data_date", "point", "platform", "page_name",
>     "component_name", "booth_name", "position1", "advertiser",
>     "adv_code", "request_num", "return_num", "fill_rate", "expose_num",
>     "expose_rate", "click_num", "click_rate", "ecpm", "income", "created_at"),
>   Array[TypeInformation[_]](Types.LONG, Types.LONG, Types.STRING,
>     Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING,
>     Types.STRING, Types.LONG, Types.LONG, Types.FLOAT, Types.LONG, Types.FLOAT,
>     Types.LONG, Types.FLOAT, Types.FLOAT, Types.FLOAT, Types.LONG),
>   sink)
>
> // insert into TableSink
> s.insertInto("ckResult")
> {code}

--
This message was sent by Atlassian Jira
(v8.20.1#820001)