Salva, I've tested the code myself, but would you be able to test
this[1] fix as well?
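
When re-testing, it may help to check up front which batch size the records mentioned in this thread need. Below is a minimal standalone sketch in plain Java with no Flink dependency. `BatchSizeCheck`, `parseBytes`, and `fits` are hypothetical names, the parser is only a simplified stand-in for Flink's `MemorySize.parse`, and the "no larger than the limit" rule is an assumption about how the sink compares sizes:

```java
import java.util.Locale;

// Standalone sketch: checks whether a record fits into a given max batch size.
// parseBytes is a simplified, hypothetical stand-in for Flink's MemorySize.parse.
final class BatchSizeCheck {

    // Parses strings such as "2mb" or "10mb" into bytes (units are powers of 1024).
    static long parseBytes(String text) {
        String s = text.trim().toLowerCase(Locale.ROOT);
        long multiplier = 1L;
        if (s.endsWith("kb")) {
            multiplier = 1024L;
            s = s.substring(0, s.length() - 2);
        } else if (s.endsWith("mb")) {
            multiplier = 1024L * 1024L;
            s = s.substring(0, s.length() - 2);
        } else if (s.endsWith("gb")) {
            multiplier = 1024L * 1024L * 1024L;
            s = s.substring(0, s.length() - 2);
        } else if (s.endsWith("b")) {
            s = s.substring(0, s.length() - 1);
        }
        return Long.parseLong(s.trim()) * multiplier;
    }

    // Assumption: a record is accepted when it is no larger than the batch limit.
    static boolean fits(long recordBytes, String maxBatchSize) {
        return recordBytes <= parseBytes(maxBatchSize);
    }

    public static void main(String[] args) {
        // Record sizes taken from the error messages quoted in this thread.
        long[] recordSizes = {9627127L, 9623137L, 7462652L};
        for (long size : recordSizes) {
            System.out.printf("%d bytes: fits in 2mb=%b, in 10mb=%b%n",
                    size, fits(size, "2mb"), fits(size, "10mb"));
        }
    }
}
```

With these numbers, 2mb (the 2097152 bytes from the error message) is too small for all three records, while 10mb would be enough once the configured value is actually picked up.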



On Thu, Dec 5, 2024 at 3:15 PM Gabor Somogyi <>
wrote:

> I intend to file a Jira ticket + PR.
> G
> On Thu, Dec 5, 2024 at 2:40 PM Salva Alcántara <>
> wrote:
>> So what's next? Do you want me to do something, Gabor?
>> Regards,
>> Salva
>> On Thu, Dec 5, 2024, 13:48 Gabor Somogyi <>
>> wrote:
>>> Based on this, I can confirm that the maxBatchSize and socketTimeout
>>> values come from the defaults; the Flink config is not considered.
>>> G
>>> On Thu, Dec 5, 2024 at 8:20 AM Salva Alcántara <>
>>> wrote:
>>>> Sorry for my late reply, Gabor. Here is the whole trace:
>>>> SLF4J(W): No SLF4J providers were found.
>>>> SLF4J(W): Defaulting to no-operation (NOP) logger implementation
>>>> SLF4J(W): See for further
>>>> details.
>>>> SLF4J(W): Class path contains SLF4J bindings targeting slf4j-api
>>>> versions 1.7.x or earlier.
>>>> SLF4J(W): Ignoring binding found at
>>>> [jar:file:/private/var/tmp/_bazel_salva/d416ba92e8fb4416e3182f6204d8237a/external/maven/v1/https/
>>>> ]
>>>> SLF4J(W): See for an
>>>> explanation.
>>>> WARNING: An illegal reflective access operation has occurred
>>>> WARNING: Illegal reflective access by
>>>> (file:/private/var/tmp/_bazel_salva/d416ba92e8fb4416e3182f6204d8237a/external/maven/v1/https/
>>>> to field java.util.Arrays$ArrayList.a
>>>> WARNING: Please consider reporting this to the maintainers of
>>>> WARNING: Use --illegal-access=warn to enable warnings of further
>>>> illegal reflective access operations
>>>> WARNING: All illegal access operations will be denied in a future
>>>> release
>>>> Exception in thread "main" java.lang.RuntimeException: Failed to fetch
>>>> next result
>>>>         at
>>>> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(
>>>>         at
>>>> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(
>>>>         at
>>>> org.apache.flink.streaming.api.datastream.DataStream.executeAndCollect(
>>>>         at
>>>> org.apache.flink.streaming.api.datastream.DataStream.executeAndCollect(
>>>>         at
>>>>         at
>>>> com.auvik.platform.triggering.statecli.savepoint.SavepointReader.main(
>>>> Caused by: Failed to fetch job execution result
>>>>         at
>>>> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(
>>>>         at
>>>>         at
>>>> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(
>>>>         ... 5 more
>>>> Caused by: java.util.concurrent.ExecutionException:
>>>> org.apache.flink.runtime.client.JobExecutionException: Job execution 
>>>> failed.
>>>>         at
>>>> java.base/java.util.concurrent.CompletableFuture.reportGet(
>>>>         at
>>>> java.base/java.util.concurrent.CompletableFuture.get(
>>>>         at
>>>> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(
>>>>         ... 7 more
>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
>>>> execution failed.
>>>>         at
>>>> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(
>>>>         at
>>>> org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(
>>>>         at
>>>> java.base/java.util.concurrent.CompletableFuture.uniApplyNow(
>>>>         at
>>>> java.base/java.util.concurrent.CompletableFuture.uniApplyStage(
>>>>         at
>>>> java.base/java.util.concurrent.CompletableFuture.thenApply(
>>>>         at
>>>> org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(
>>>>         at
>>>> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(
>>>>         ... 7 more
>>>> Caused by: org.apache.flink.runtime.JobException: Recovery is
>>>> suppressed by NoRestartBackoffTimeStrategy
>>>>         at
>>>> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(
>>>>         at
>>>> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(
>>>>         at
>>>> org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(
>>>>         at
>>>> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(
>>>>         at
>>>> org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(
>>>>         at
>>>> org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(
>>>>         at
>>>> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(
>>>>         at
>>>> org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(
>>>>         at
>>>> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(
>>>>         at
>>>> jdk.internal.reflect.GeneratedMethodAccessor14.invoke(Unknown Source)
>>>>         at
>>>> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(
>>>>         at java.base/java.lang.reflect.Method.invoke(
>>>>         at
>>>> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(
>>>>         at
>>>> org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(
>>>>         at
>>>> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(
>>>>         at
>>>> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(
>>>>         at
>>>> org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(
>>>>         at
>>>> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(
>>>>         at
>>>>         at
>>>>         at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
>>>>         at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
>>>>         at
>>>>         at
>>>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
>>>>         at
>>>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
>>>>         at
>>>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
>>>>         at
>>>>         at$(Actor.scala:545)
>>>>         at
>>>>         at
>>>>         at
>>>>         at
>>>> org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
>>>>         at
>>>>         at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
>>>>         at
>>>> java.base/java.util.concurrent.ForkJoinTask.doExec(
>>>>         at
>>>> java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(
>>>>         at
>>>> java.base/java.util.concurrent.ForkJoinPool.scan(
>>>>         at
>>>> java.base/java.util.concurrent.ForkJoinPool.runWorker(
>>>>         at
>>>> java.base/
>>>> Caused by: java.lang.RuntimeException: Record size is too large for
>>>> CollectSinkFunction. Record size is 7462652 bytes, but max bytes per batch
>>>> is only 2097152 bytes. Please consider increasing max bytes per batch value
>>>> by setting collect-sink.batch-size.max
>>>>         at
>>>> org.apache.flink.streaming.api.operators.collect.CollectSinkFunction.invoke(
>>>>         at
>>>> org.apache.flink.streaming.api.operators.StreamSink.processElement(
>>>>         at
>>>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(
>>>>         at
>>>>         at
>>>>         at
>>>>         at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
>>>>         at
>>>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(
>>>>         at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(
>>>>         at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>>>>         at
>>>> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(
>>>>         at
>>>> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(
>>>>         at
>>>> org.apache.flink.runtime.taskmanager.Task.doRun(
>>>>         at
>>>>         at java.base/
>>>> On Mon, Jul 29, 2024 at 11:06 AM Gabor Somogyi <
>>>>> wrote:
>>>>> I've double-checked, and I think that CollectSinkOperatorFactory is
>>>>> initialized in DataStream.collectAsync without the MAX_BATCH_SIZE
>>>>> and SOCKET_TIMEOUT values coming from the Flink config.
>>>>> Could you please share the whole stack trace so I can double-check my
>>>>> assumption?
>>>>> G
>>>>> On Tue, Jul 23, 2024 at 12:46 PM Salva Alcántara <
>>>>>> wrote:
>>>>>> Hi all,
>>>>>> Just to share my findings so far: it has been impossible for me to
>>>>>> change the setting through configuration. The only workaround I have
>>>>>> found is to duplicate some Flink code so that I can pass the value in
>>>>>> directly.
>>>>>> More precisely, this is what my code looks like now (kudos to my dear
>>>>>> colleague Den!):
>>>>>> ```
>>>>>>   private static <T> List<T> executeAndCollect(
>>>>>>       DataStream<T> dataStream, StreamExecutionEnvironment env,
>>>>>>       String maxBatchSize, int limit) throws Exception {
>>>>>>     TypeSerializer<T> serializer =
>>>>>>         dataStream.getType().createSerializer(env.getConfig());
>>>>>>     String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
>>>>>>     // Pass the max batch size explicitly instead of the default value.
>>>>>>     CollectSinkOperatorFactory<T> factory =
>>>>>>         new CollectSinkOperatorFactory<>(serializer, accumulatorName,
>>>>>>             MemorySize.parse(maxBatchSize), SOCKET_TIMEOUT.defaultValue());
>>>>>>     CollectSinkOperator<T> operator =
>>>>>>         (CollectSinkOperator<T>) factory.getOperator();
>>>>>>     CollectResultIterator<T> iterator =
>>>>>>         new CollectResultIterator<>(
>>>>>>             operator.getOperatorIdFuture(),
>>>>>>             serializer,
>>>>>>             accumulatorName,
>>>>>>             env.getCheckpointConfig());
>>>>>>     CollectStreamSink<T> sink = new CollectStreamSink<>(dataStream, factory);
>>>>>>     sink.name("Data stream collect sink");
>>>>>>     env.addOperator(sink.getTransformation());
>>>>>>     final JobClient jobClient = env.executeAsync("DataStream Collect");
>>>>>>     iterator.setJobClient(jobClient);
>>>>>>     var clientAndIterator = new ClientAndIterator<>(jobClient, iterator);
>>>>>>     // Collect at most `limit` records.
>>>>>>     List<T> results = new ArrayList<>(limit);
>>>>>>     while (limit > 0 && clientAndIterator.iterator.hasNext()) {
>>>>>>       results.add(clientAndIterator.iterator.next());
>>>>>>       limit--;
>>>>>>     }
>>>>>>     return results;
>>>>>>   }
>>>>>> ```
>>>>>> Essentially, I'm just adding a parameter to the
>>>>>> CollectSinkOperatorFactory constructor here:
>>>>>> -
>>>>>> This works, but it's obviously inconvenient for the user. If this
>>>>>> limitation is confirmed, Den and I will be glad to send an MR to fix it.
>>>>>> Does that make sense?
>>>>>> Regards,
>>>>>> Salva
>>>>>> On Mon, Jul 22, 2024 at 10:04 AM Salva Alcántara <
>>>>>>> wrote:
>>>>>>> The same happens with this slight variation:
>>>>>>> ```
>>>>>>> Configuration config = new Configuration();
>>>>>>> config.setString("collect-sink.batch-size.max", "100mb");
>>>>>>> StreamExecutionEnvironment env =
>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>> env.configure(config);
>>>>>>> SavepointReader savepoint = SavepointReader.read(env, savepointPath,
>>>>>>> new HashMapStateBackend());
>>>>>>> ```
>>>>>>> Salva
>>>>>>> On Mon, Jul 22, 2024 at 10:00 AM Salva Alcántara <
>>>>>>>> wrote:
>>>>>>>> Hi Zhanghao,
>>>>>>>> Thanks for your suggestion. Unfortunately, this does not work; I
>>>>>>>> still get the same error message:
>>>>>>>> ```
>>>>>>>> Record size is too large for CollectSinkFunction. Record size is
>>>>>>>> 9623137 bytes, but max bytes per batch is only 2097152 bytes.
>>>>>>>> Please consider increasing max bytes per batch value by setting
>>>>>>>> collect-sink.batch-size.max
>>>>>>>> ```
>>>>>>>> The code looks like this now:
>>>>>>>> ```
>>>>>>>> Configuration config = new Configuration();
>>>>>>>> config.setString("collect-sink.batch-size.max", "10mb");
>>>>>>>> StreamExecutionEnvironment env =
>>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment(config);
>>>>>>>> SavepointReader savepoint = SavepointReader.read(env,
>>>>>>>> savepointPath, new HashMapStateBackend());
>>>>>>>> var matcher = savepoint.readKeyedState("Raw Triggers", new
>>>>>>>> MatcherReader());
>>>>>>>> var matcherState = matcher.executeAndCollect(1000);
>>>>>>>> ```
>>>>>>>> I have tried other ways but none has worked (the setting is always
>>>>>>>> ignored in the end).
>>>>>>>> Regards,
>>>>>>>> Salva
>>>>>>>> On Sun, Jul 21, 2024 at 9:10 AM Zhanghao Chen <
>>>>>>>>> wrote:
>>>>>>>>> Hi, you could increase it as follows:
>>>>>>>>> Configuration config = new Configuration();
>>>>>>>>> config.setString("collect-sink.batch-size.max", "10mb");
>>>>>>>>> StreamExecutionEnvironment env =
>>>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment(config);
>>>>>>>>> ------------------------------
>>>>>>>>> *From:* Salva Alcántara <>
>>>>>>>>> *Sent:* Saturday, July 20, 2024 15:05
>>>>>>>>> *To:* user <>
>>>>>>>>> *Subject:* SavepointReader: Record size is too large for
>>>>>>>>> CollectSinkFunction
>>>>>>>>> Hi all!
>>>>>>>>> I'm trying to debug a job via inspecting its savepoints but I'm
>>>>>>>>> getting this error message:
>>>>>>>>> ```
>>>>>>>>> Caused by: java.lang.RuntimeException: Record size is too large
>>>>>>>>> for CollectSinkFunction. Record size is 9627127 bytes, but max
>>>>>>>>> bytes per batch is only 2097152 bytes. Please consider increasing
>>>>>>>>> max bytes per batch value by setting collect-sink.batch-size.max
>>>>>>>>> ```
>>>>>>>>> My code looks like this:
>>>>>>>>> ```
>>>>>>>>>   private static void run(String savepointPath) throws Exception {
>>>>>>>>>     StreamExecutionEnvironment env =
>>>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>     org.apache.flink.state.api.SavepointReader savepoint =
>>>>>>>>>         org.apache.flink.state.api.SavepointReader.read(
>>>>>>>>>             env, savepointPath, new HashMapStateBackend());
>>>>>>>>>     var operator =
>>>>>>>>>         savepoint.readKeyedState("uuid", new MyKeyedOperatorReader());
>>>>>>>>>     var operatorState = operator.executeAndCollect(1000);
>>>>>>>>>   }
>>>>>>>>> ```
>>>>>>>>> I haven't found a way to increase
>>>>>>>>> `collect-sink.batch-size.max` as suggested in the error message.
>>>>>>>>> Any help on this will be appreciated!
>>>>>>>>> Regards,
>>>>>>>>> Salva
