[ https://issues.apache.org/jira/browse/FLINK-25916?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17569746#comment-17569746 ]
Fu Kai commented on FLINK-25916:
--------------------------------

Hi team, we have also encountered this bug; it forces us to stay on 1.13 and blocks us from adopting the new features in 1.14+. May I know whether a fix for this issue is on the roadmap?

> Using upsert-kafka with a flush buffer results in Null Pointer Exception
> ------------------------------------------------------------------------
>
>                 Key: FLINK-25916
>                 URL: https://issues.apache.org/jira/browse/FLINK-25916
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka, Table SQL / Runtime
>    Affects Versions: 1.14.3, 1.15.0
>        Environment: CentOS 7.9 x64
>                     Intel Xeon Gold 6140 CPU
>            Reporter: Corey Shaw
>            Priority: Major
>
> Flink Version: 1.14.3
> upsert-kafka version: 1.14.3
>
> I have been trying to buffer output from the upsert-kafka connector using the documented parameters {{sink.buffer-flush.max-rows}} and {{sink.buffer-flush.interval}}. Whenever I attempt to run an INSERT query with buffering, I receive the following error (shortened for brevity):
> {code:java}
> Caused by: java.lang.NullPointerException
>     at org.apache.flink.streaming.connectors.kafka.table.ReducingUpsertWriter.flush(ReducingUpsertWriter.java:145)
>     at org.apache.flink.streaming.connectors.kafka.table.ReducingUpsertWriter.lambda$registerFlush$3(ReducingUpsertWriter.java:124)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1693)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$22(StreamTask.java:1684)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>     at java.lang.Thread.run(Thread.java:829) [?:?]
> {code}
>
> If I remove the parameters related to flush buffering, then everything works as expected with no problems at all.
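For what it's worth, the trace shows the NPE surfacing when flush() is invoked from the processing-time timer callback registered by registerFlush, rather than from a regular write. Below is a minimal, self-contained sketch of that buffer-plus-timer pattern; the class and field names are invented for illustration (this is not Flink's actual ReducingUpsertWriter code), and the null unboxing it demonstrates is only one plausible failure mode consistent with the trace: nullable per-record state captured in write() and dereferenced unconditionally by the timer-driven flush.

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;

// Hypothetical sketch only; NOT Flink's actual ReducingUpsertWriter.
// Records are reduced into a key -> latest-value buffer, and a processing-time
// timer periodically flushes the buffer, mirroring the write/registerFlush/flush
// shape visible in the stack trace above.
public class BufferedUpsertSketch {

    // key -> {timestamp, value}; the timestamp is nullable because a source
    // record is not guaranteed to carry one.
    private final Map<String, Object[]> buffer = new HashMap<>();
    private final Timer timer = new Timer(true);

    public BufferedUpsertSketch(long flushIntervalMillis) {
        // The analogue of registerFlush(): flush on a processing-time timer.
        timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                flush();
            }
        }, flushIntervalMillis, flushIntervalMillis);
    }

    public synchronized void write(String key, String value, Long timestamp) {
        // Reduce on key: only the latest value per key is retained, as an
        // upsert-style buffer would do.
        buffer.put(key, new Object[] {timestamp, value});
    }

    private synchronized void flush() {
        for (Object[] entry : buffer.values()) {
            // If the captured timestamp was null, unboxing it here throws
            // NullPointerException inside the timer-driven flush, analogous
            // to where the reporter's trace blows up.
            long ts = (Long) entry[0];
            System.out.println(ts + " -> " + entry[1]);
        }
        buffer.clear();
    }

    public static void main(String[] args) throws InterruptedException {
        BufferedUpsertSketch sink = new BufferedUpsertSketch(1000);
        sink.write("k1", "v1", null); // a record without a timestamp
        Thread.sleep(1500);           // let the timer-driven flush fire
    }
}
{code}

Running the sketch prints a NullPointerException from the timer thread after roughly one second, mirroring how the reporter's job only fails once buffering (and with it the timer-driven flush) is enabled.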
> For reference, here is the full setup with source, destination, and queries. Yes, I realize the INSERT could use an overhaul, but that's not the issue at hand :).
> {code:java}
> CREATE TABLE `source_topic` (
>     `timeGMT` INT,
>     `eventtime` AS TO_TIMESTAMP(FROM_UNIXTIME(`timeGMT`)),
>     `visIdHigh` BIGINT,
>     `visIdLow` BIGINT,
>     `visIdStr` AS CONCAT(IF(`visIdHigh` IS NULL, '', CAST(`visIdHigh` AS STRING)), IF(`visIdLow` IS NULL, '', CAST(`visIdLow` AS STRING))),
>     WATERMARK FOR eventtime AS eventtime - INTERVAL '25' SECONDS
> ) WITH (
>     'connector' = 'kafka',
>     'properties.group.id' = 'flink_metrics',
>     'properties.bootstrap.servers' = 'brokers.example.com:9093',
>     'topic' = 'source_topic',
>     'scan.startup.mode' = 'earliest-offset',
>     'value.format' = 'avro-confluent',
>     'value.avro-confluent.url' = 'http://schema.example.com',
>     'value.fields-include' = 'EXCEPT_KEY'
> );
>
> CREATE TABLE dest_topic (
>     `messageType` VARCHAR,
>     `observationID` BIGINT,
>     `obsYear` BIGINT,
>     `obsMonth` BIGINT,
>     `obsDay` BIGINT,
>     `obsHour` BIGINT,
>     `obsMinute` BIGINT,
>     `obsTz` VARCHAR(5),
>     `value` BIGINT,
>     PRIMARY KEY (observationID, messageType) NOT ENFORCED
> ) WITH (
>     'connector' = 'upsert-kafka',
>     'key.format' = 'json',
>     'properties.bootstrap.servers' = 'brokers.example.com:9092',
>     'sink.buffer-flush.max-rows' = '50000',
>     'sink.buffer-flush.interval' = '1000',
>     'topic' = 'dest_topic',
>     'value.format' = 'json'
> );
>
> INSERT INTO dest_topic
> SELECT `messageType`, `observationID`, obsYear, obsMonth, obsDay, obsHour, obsMinute, obsTz, SUM(`value`) AS `value` FROM (
>     SELECT `messageType`, `observationID`, obsYear, obsMonth, obsDay, obsHour, obsMinute, '-0000' AS obsTz, 1 AS `value`, `visIdStr` FROM (
>         SELECT
>             'visit' AS `messageType`,
>             CAST(DATE_FORMAT(window_start, 'yyyyMMddHHmm') AS BIGINT) AS `observationID`,
>             year(window_start) AS obsYear,
>             month(window_start) AS obsMonth,
>             dayofmonth(window_start) AS obsDay,
>             hour(window_start) AS obsHour,
>             minute(window_start) AS obsMinute,
>             '-0000' AS obsTz,
>             visIdStr
>         FROM TABLE(TUMBLE(TABLE `source_topic`, DESCRIPTOR(`eventtime`), INTERVAL '60' SECONDS))
>         WHERE visIdStr IS NOT NULL
>         GROUP BY window_start, window_end, visIdStr
>     )
>     GROUP BY messageType, observationID, obsYear, obsMonth, obsDay, obsHour, obsMinute, `visIdStr`
> )
> GROUP BY messageType, observationID, obsYear, obsMonth, obsDay, obsHour, obsMinute, obsTz;
> {code}
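As a stopgap until the bug is fixed, the workaround implied above (removing the buffering parameters) can be made explicit. According to the upsert-kafka connector documentation, {{sink.buffer-flush.max-rows}} and {{sink.buffer-flush.interval}} both default to '0', and both must be greater than zero for buffering to take effect. A minimal sketch of the sink DDL with buffering disabled, reusing the reporter's placeholder broker and topic names:

{code:java}
CREATE TABLE dest_topic (
    `messageType` VARCHAR,
    `observationID` BIGINT,
    `obsYear` BIGINT,
    `obsMonth` BIGINT,
    `obsDay` BIGINT,
    `obsHour` BIGINT,
    `obsMinute` BIGINT,
    `obsTz` VARCHAR(5),
    `value` BIGINT,
    PRIMARY KEY (observationID, messageType) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'key.format' = 'json',
    'properties.bootstrap.servers' = 'brokers.example.com:9092',
    -- Buffering disabled: '0' is the documented default for both options,
    -- so omitting them entirely has the same effect.
    'sink.buffer-flush.max-rows' = '0',
    'sink.buffer-flush.interval' = '0',
    'topic' = 'dest_topic',
    'value.format' = 'json'
);
{code}

This gives up the reduced Kafka write volume that the buffer provides, but it avoids the timer-driven flush path where the NPE occurs.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)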