Hi Yuval,
I ran a similar SQL query (without the `FIRST` aggregate function) and it
worked without any problems.
Is `FIRST` a custom aggregate function? Could you please check whether there
is a problem in `FIRST`, and whether the query runs without `FIRST`?
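
One way to run that check is to drop `FIRST(BAZ)` from the sketch and keep
everything else unchanged. A minimal version, assuming a hypothetical source
table `my_table` (the original sketch does not show the FROM clause):

```sql
-- Same windowed aggregation as the original sketch, with FIRST(BAZ) removed.
-- my_table is a hypothetical name; replace it with the real source table.
SELECT CAST(TUMBLE_START(event_time, INTERVAL '2' MINUTE) AS TIMESTAMP) AS START_TIME,
       CAST(TUMBLE_END(event_time, INTERVAL '2' MINUTE) AS TIMESTAMP) AS END_TIME,
       FOO,
       BAR
FROM my_table
WHERE QWAK = FALSE
GROUP BY TUMBLE(event_time, INTERVAL '2' MINUTE),
         FOO,
         BAR
HAVING COUNT(DISTINCT BUN) >= 10
```

If this variant still fails with the same exception, `FIRST` is probably not
the cause and the remaining aggregates are worth a closer look.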

Best,
JING ZHANG

Yuval Itzchakov <yuva...@gmail.com> wrote on Tue, Jul 27, 2021 at 12:29 AM:

> Hi,
>
> *Setup:*
>
> 1 JM,
> 1 TM,
> Flink 1.13.1
> RocksDBStateBackend.
>
> I have a query with the rough sketch of the following:
>
> SELECT CAST(TUMBLE_START(event_time, INTERVAL '2' MINUTE) AS TIMESTAMP) START_TIME,
>        CAST(TUMBLE_END(event_time, INTERVAL '2' MINUTE) AS TIMESTAMP) END_TIME,
>        FOO,
>        BAR,
>        FIRST(BAZ)
> WHERE QWAK = FALSE
> GROUP BY TUMBLE(event_time, INTERVAL '2' MINUTE),
>          FOO,
>          BAR
> HAVING COUNT(DISTINCT BUN) >= 10
>
> The query itself is a bit more complicated than that. When executing it in
> the cluster, I see the following error:
>
> java.lang.RuntimeException: No copy finished, this should be a bug, The remaining length is: 73728
> at org.apache.flink.table.data.binary.BinarySegmentUtils.copyToView(BinarySegmentUtils.java:236)
> at org.apache.flink.table.runtime.typeutils.StringDataSerializer.serialize(StringDataSerializer.java:76)
> at org.apache.flink.table.runtime.typeutils.StringDataSerializer.serialize(StringDataSerializer.java:34)
> at org.apache.flink.table.runtime.typeutils.ExternalSerializer.serialize(ExternalSerializer.java:150)
> at org.apache.flink.runtime.state.SerializedCompositeKeyBuilder.buildCompositeKeyNamesSpaceUserKey(SerializedCompositeKeyBuilder.java:152)
> at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeCurrentKeyWithGroupAndNamespacePlusUserKey(AbstractRocksDBState.java:152)
> at org.apache.flink.contrib.streaming.state.RocksDBMapState.get(RocksDBMapState.java:123)
> at org.apache.flink.table.runtime.dataview.StateMapView$StateMapViewWithKeysNullable.get(StateMapView.java:159)
> at org.apache.flink.table.runtime.dataview.StateMapView$NamespacedStateMapViewWithKeysNullable.get(StateMapView.java:392)
> at GroupingWindowAggsHandler$217.accumulate(Unknown Source)
> at org.apache.flink.table.runtime.operators.window.WindowOperator.processElement(WindowOperator.java:366)
> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
> at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
> at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
> at java.base/java.lang.Thread.run(Thread.java:829)
>
> I would appreciate any pointers on how to debug this issue, or to hear
> from anyone who has encountered it before.
>
>
> --
> Best Regards,
> Yuval Itzchakov.
>
