Hi Jing, Yes, FIRST is a UDAF.
I've been trying to reproduce this locally without success so far. The query itself has more fields and aggregates. Once I can reproduce this locally I'll try to narrow down the problematic field and share more information. On Tue, Jul 27, 2021, 05:17 JING ZHANG <beyond1...@gmail.com> wrote: > Hi Yuval, > I run a similar SQL (without `FIRST` aggregate function), there is nothing > wrong. > `FIRST` is a custom aggregate function? Would you please check if there is > a drawback in `FIRST`? Whether the query could run without `FIRST`? > > Best, > JING ZHANG > > Yuval Itzchakov <yuva...@gmail.com> 于2021年7月27日周二 上午12:29写道: > >> Hi, >> >> *Setup:* >> >> 1 JM, >> 1 TM, >> Flink 1.13.1 >> RocksDBStateBackend. >> >> I have a query with the rough sketch of the following: >> >> SELECT CAST(TUMBLE_START(event_time, INTERVAL '2' MINUTE) AS TIMESTAMP) >> START_TIME >> CAST(TUMBLE_END(event_time, INTERVAL '2' MINUTE) AS >> TIMESTAMP) END_TIME >> FOO, >> BAR, >> FIRST(BAZ) >> WHERE QWAK = FALSE >> GROUP BY TUMBLE(event_time, INTERVAL '2' MINUTE), >> FOO, >> BAR >> HAVING COUNT(DISTINCT BUN) >= 10 >> >> The query itself is a bit more complicated than that. When executing it >> in the cluster, I see the following error: >> >> java.lang.RuntimeException: No copy finished, this should be a bug, The >> remaining length is: 73728 >> at >> org.apache.flink.table.data.binary.BinarySegmentUtils.copyToView(BinarySegmentUtils.java:236) >> at >> org.apache.flink.table.runtime.typeutils.StringDataSerializer.serialize(StringDataSerializer.java:76) >> at >> org.apache.flink.table.runtime.typeutils.StringDataSerializer.serialize(StringDataSerializer.java:34) >> at >> org.apache.flink.table.runtime.typeutils.ExternalSerializer.serialize(ExternalSerializer.java:150) >> at >> org.apache.flink.runtime.state.SerializedCompositeKeyBuilder.buildCompositeKeyNamesSpaceUserKey(SerializedCompositeKeyBuilder.java:152) >> at >> org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeCurrentKeyWithGroupAndNamespacePlusUserKey(AbstractRocksDBState.java:152) >> at >> org.apache.flink.contrib.streaming.state.RocksDBMapState.get(RocksDBMapState.java:123) >> at >> org.apache.flink.table.runtime.dataview.StateMapView$StateMapViewWithKeysNullable.get(StateMapView.java:159) >> at >> org.apache.flink.table.runtime.dataview.StateMapView$NamespacedStateMapViewWithKeysNullable.get(StateMapView.java:392) >> at GroupingWindowAggsHandler$217.accumulate(Unknown Source) >> at >> org.apache.flink.table.runtime.operators.window.WindowOperator.processElement(WindowOperator.java:366) >> at >> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205) >> at >> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) >> at >> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) >> at >> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66) >> at >> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423) >> at >> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204) >> at >> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681) >> at >> org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636) >> at >> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647) >> at >> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620) >> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) >> at java.base/java.lang.Thread.run(Thread.java:829) >> >> Would appreciate help in the direction of how to debug this issue, or if >> anyone has encountered this before. >> >> >> -- >> Best Regards, >> Yuval Itzchakov. >> >