Gabor, I took a look at your PR and it looks good to me, but I didn't test
your changes locally. If that's still necessary, let me know and I'll find
the time!


On Fri, Dec 6, 2024 at 1:49 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
wrote:

> Salva, I've tested the code myself, but do you have the possibility to
> test this[1] fix?
>
> [1] https://github.com/apache/flink/pull/25755
>
> G
>
>
> On Thu, Dec 5, 2024 at 3:15 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
> wrote:
>
>> I intend to file a Jira + PR.
>>
>> G
>>
>>
>> On Thu, Dec 5, 2024 at 2:40 PM Salva Alcántara <salcantara...@gmail.com>
>> wrote:
>>
>>> So what's next? Do you want me to do something Gabor?
>>>
>>> Regards,
>>>
>>> Salva
>>>
>>> On Thu, Dec 5, 2024, 13:48 Gabor Somogyi <gabor.g.somo...@gmail.com>
>>> wrote:
>>>
>>>> Based on this I can confirm that the maxBatchSize and socketTimeout
>>>> values are coming from the defaults; the Flink config is not considered.
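
A minimal plain-Java sketch of this failure mode, not actual Flink code:
the class and method names are hypothetical, and only the option key and
the 2097152-byte default are taken from this thread. It contrasts a lookup
that always returns the default with one that consults the user config
first.

```java
import java.util.HashMap;
import java.util.Map;

public class ConfigLookupSketch {
    // Key and default value as they appear in the error message.
    static final String MAX_BATCH_SIZE_KEY = "collect-sink.batch-size.max";
    static final long MAX_BATCH_SIZE_DEFAULT = 2_097_152L; // bytes

    // Behaviour described above: the user config is never consulted.
    static long maxBatchSizeIgnoringConfig(Map<String, Long> userConfig) {
        return MAX_BATCH_SIZE_DEFAULT;
    }

    // Expected behaviour: use the configured value, else the default.
    static long maxBatchSizeFromConfig(Map<String, Long> userConfig) {
        return userConfig.getOrDefault(MAX_BATCH_SIZE_KEY, MAX_BATCH_SIZE_DEFAULT);
    }

    public static void main(String[] args) {
        Map<String, Long> userConfig = new HashMap<>();
        // Stand-in for "10mb", assuming 1 mb = 1024 * 1024 bytes.
        userConfig.put(MAX_BATCH_SIZE_KEY, 10L * 1024 * 1024);

        System.out.println(maxBatchSizeIgnoringConfig(userConfig)); // 2097152
        System.out.println(maxBatchSizeFromConfig(userConfig));     // 10485760
    }
}
```
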
>>>>
>>>> G
>>>>
>>>>
>>>> On Thu, Dec 5, 2024 at 8:20 AM Salva Alcántara <salcantara...@gmail.com>
>>>> wrote:
>>>>
>>>>> Sorry for my late reply Gabor, here is the whole trace:
>>>>>
>>>>> SLF4J(W): No SLF4J providers were found.
>>>>> SLF4J(W): Defaulting to no-operation (NOP) logger implementation
>>>>> SLF4J(W): See https://www.slf4j.org/codes.html#noProviders for
>>>>> further details.
>>>>> SLF4J(W): Class path contains SLF4J bindings targeting slf4j-api
>>>>> versions 1.7.x or earlier.
>>>>> SLF4J(W): Ignoring binding found at
>>>>> [jar:file:/private/var/tmp/_bazel_salva/d416ba92e8fb4416e3182f6204d8237a/external/maven/v1/https/
>>>>> repo1.maven.org/maven2/org/apache/logging/log4j/log4j-slf4j-impl/2.17.1/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class
>>>>> ]
>>>>> SLF4J(W): See https://www.slf4j.org/codes.html#ignoredBindings for an
>>>>> explanation.
>>>>> WARNING: An illegal reflective access operation has occurred
>>>>> WARNING: Illegal reflective access by
>>>>> com.twitter.chill.java.ArraysAsListSerializer
>>>>> (file:/private/var/tmp/_bazel_salva/d416ba92e8fb4416e3182f6204d8237a/external/maven/v1/https/
>>>>> repo1.maven.org/maven2/com/twitter/chill-java/0.7.6/chill-java-0.7.6.jar)
>>>>> to field java.util.Arrays$ArrayList.a
>>>>> WARNING: Please consider reporting this to the maintainers of
>>>>> com.twitter.chill.java.ArraysAsListSerializer
>>>>> WARNING: Use --illegal-access=warn to enable warnings of further
>>>>> illegal reflective access operations
>>>>> WARNING: All illegal access operations will be denied in a future
>>>>> release
>>>>> Exception in thread "main" java.lang.RuntimeException: Failed to fetch
>>>>> next result
>>>>>         at
>>>>> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
>>>>>         at
>>>>> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
>>>>>         at
>>>>> org.apache.flink.streaming.api.datastream.DataStream.executeAndCollect(DataStream.java:1365)
>>>>>         at
>>>>> org.apache.flink.streaming.api.datastream.DataStream.executeAndCollect(DataStream.java:1348)
>>>>>         at
>>>>> com.auvik.platform.triggering.statecli.savepoint.SavepointReader.run(SavepointReader.java:41)
>>>>>         at
>>>>> com.auvik.platform.triggering.statecli.savepoint.SavepointReader.main(SavepointReader.java:33)
>>>>> Caused by: java.io.IOException: Failed to fetch job execution result
>>>>>         at
>>>>> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:185)
>>>>>         at
>>>>> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:121)
>>>>>         at
>>>>> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
>>>>>         ... 5 more
>>>>> Caused by: java.util.concurrent.ExecutionException:
>>>>> org.apache.flink.runtime.client.JobExecutionException: Job execution 
>>>>> failed.
>>>>>         at
>>>>> java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
>>>>>         at
>>>>> java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2022)
>>>>>         at
>>>>> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:183)
>>>>>         ... 7 more
>>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
>>>>> execution failed.
>>>>>         at
>>>>> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
>>>>>         at
>>>>> org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
>>>>>         at
>>>>> java.base/java.util.concurrent.CompletableFuture.uniApplyNow(CompletableFuture.java:680)
>>>>>         at
>>>>> java.base/java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:658)
>>>>>         at
>>>>> java.base/java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:2094)
>>>>>         at
>>>>> org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:138)
>>>>>         at
>>>>> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:182)
>>>>>         ... 7 more
>>>>> Caused by: org.apache.flink.runtime.JobException: Recovery is
>>>>> suppressed by NoRestartBackoffTimeStrategy
>>>>>         at
>>>>> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:176)
>>>>>         at
>>>>> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:107)
>>>>>         at
>>>>> org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:285)
>>>>>         at
>>>>> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:276)
>>>>>         at
>>>>> org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:269)
>>>>>         at
>>>>> org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:764)
>>>>>         at
>>>>> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:741)
>>>>>         at
>>>>> org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83)
>>>>>         at
>>>>> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:488)
>>>>>         at
>>>>> jdk.internal.reflect.GeneratedMethodAccessor14.invoke(Unknown Source)
>>>>>         at
>>>>> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>         at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>>>>         at
>>>>> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309)
>>>>>         at
>>>>> org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
>>>>>         at
>>>>> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307)
>>>>>         at
>>>>> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222)
>>>>>         at
>>>>> org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
>>>>>         at
>>>>> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
>>>>>         at
>>>>> org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
>>>>>         at
>>>>> org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
>>>>>         at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
>>>>>         at
>>>>> scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
>>>>>         at
>>>>> org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
>>>>>         at
>>>>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
>>>>>         at
>>>>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
>>>>>         at
>>>>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
>>>>>         at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
>>>>>         at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
>>>>>         at
>>>>> org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
>>>>>         at
>>>>> org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
>>>>>         at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
>>>>>         at
>>>>> org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
>>>>>         at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
>>>>>         at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
>>>>>         at
>>>>> java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
>>>>>         at
>>>>> java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
>>>>>         at
>>>>> java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
>>>>>         at
>>>>> java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
>>>>>         at
>>>>> java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
>>>>> Caused by: java.lang.RuntimeException: Record size is too large for
>>>>> CollectSinkFunction. Record size is 7462652 bytes, but max bytes per batch
>>>>> is only 2097152 bytes. Please consider increasing max bytes per batch
>>>>> value by setting collect-sink.batch-size.max
>>>>>         at
>>>>> org.apache.flink.streaming.api.operators.collect.CollectSinkFunction.invoke(CollectSinkFunction.java:285)
>>>>>         at
>>>>> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
>>>>>         at
>>>>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237)
>>>>>         at
>>>>> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146)
>>>>>         at
>>>>> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
>>>>>         at
>>>>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>>>>>         at
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562)
>>>>>         at
>>>>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>>>>>         at
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
>>>>>         at
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
>>>>>         at
>>>>> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
>>>>>         at
>>>>> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
>>>>>         at
>>>>> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
>>>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
>>>>>         at java.base/java.lang.Thread.run(Thread.java:829)
>>>>>
>>>>> On Mon, Jul 29, 2024 at 11:06 AM Gabor Somogyi <
>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>
>>>>>> I've double checked and I think that CollectSinkOperatorFactory is
>>>>>> initialized in DataStream.collectAsync without MAX_BATCH_SIZE
>>>>>> and SOCKET_TIMEOUT values coming from the Flink config.
>>>>>> Could you plz share the whole stacktrace to double check my
>>>>>> assumption?
>>>>>>
>>>>>> G
>>>>>>
>>>>>>
>>>>>> On Tue, Jul 23, 2024 at 12:46 PM Salva Alcántara <
>>>>>> salcantara...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> Just to share my findings so far. It has been impossible for me to
>>>>>>> tweak the setting, so the only way to work around this has been to
>>>>>>> duplicate some Flink code so that I can pass the value myself.
>>>>>>> More precisely, this is what my code looks like now (kudos to my dear
>>>>>>> colleague Den!):
>>>>>>>
>>>>>>> ```
>>>>>>>   private static <T> List<T> executeAndCollect(DataStream<T>
>>>>>>> dataStream, StreamExecutionEnvironment env,
>>>>>>>                                                String maxBatchSize,
>>>>>>> int limit) throws Exception {
>>>>>>>
>>>>>>>     TypeSerializer<T> serializer =
>>>>>>> dataStream.getType().createSerializer(env.getConfig());
>>>>>>>     String accumulatorName = "dataStreamCollect_" +
>>>>>>> UUID.randomUUID();
>>>>>>>
>>>>>>>     CollectSinkOperatorFactory<T> factory =
>>>>>>>         new CollectSinkOperatorFactory<>(serializer,
>>>>>>> accumulatorName, MemorySize.parse(maxBatchSize),
>>>>>>> SOCKET_TIMEOUT.defaultValue());
>>>>>>>     CollectSinkOperator<T> operator =
>>>>>>>         (CollectSinkOperator<T>) factory.getOperator();
>>>>>>>     CollectResultIterator<T> iterator =
>>>>>>>         new CollectResultIterator<>(
>>>>>>>             operator.getOperatorIdFuture(),
>>>>>>>             serializer,
>>>>>>>             accumulatorName,
>>>>>>>             env.getCheckpointConfig());
>>>>>>>     CollectStreamSink<T> sink = new CollectStreamSink<>(dataStream,
>>>>>>> factory);
>>>>>>>     sink.name("Data stream collect sink");
>>>>>>>     env.addOperator(sink.getTransformation());
>>>>>>>
>>>>>>>     final JobClient jobClient =
>>>>>>>         env.executeAsync("DataStream Collect");
>>>>>>>     iterator.setJobClient(jobClient);
>>>>>>>
>>>>>>>     var clientAndIterator = new ClientAndIterator<>(jobClient,
>>>>>>> iterator);
>>>>>>>     List<T> results = new ArrayList<>(limit);
>>>>>>>     while (limit > 0 && clientAndIterator.iterator.hasNext()) {
>>>>>>>       results.add(clientAndIterator.iterator.next());
>>>>>>>       limit--;
>>>>>>>     }
>>>>>>>     return results;
>>>>>>>   }
>>>>>>> ```
>>>>>>>
>>>>>>> Essentially, I'm just adding a parameter to the
>>>>>>> CollectSinkOperatorFactory constructor here:
>>>>>>> -
>>>>>>> https://github.com/apache/flink/blob/release-1.15.4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java#L1378
>>>>>>>
>>>>>>> This works but it's obviously inconvenient for the user. If this
>>>>>>> limitation is confirmed, Den and I will be glad to send an MR to fix
>>>>>>> that.
>>>>>>>
>>>>>>> Makes sense?
>>>>>>>
>>>>>>> Regards,
>>>>>>>
>>>>>>> Salva
>>>>>>>
>>>>>>> On Mon, Jul 22, 2024 at 10:04 AM Salva Alcántara <
>>>>>>> salcantara...@gmail.com> wrote:
>>>>>>>
>>>>>>>> The same happens with this slight variation:
>>>>>>>>
>>>>>>>> ```
>>>>>>>> Configuration config = new Configuration();
>>>>>>>> config.setString("collect-sink.batch-size.max", "100mb");
>>>>>>>> StreamExecutionEnvironment env =
>>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>> env.configure(config);
>>>>>>>> SavepointReader savepoint = SavepointReader.read(env,
>>>>>>>> savepointPath, new HashMapStateBackend());
>>>>>>>> ```
>>>>>>>>
>>>>>>>> Salva
>>>>>>>>
>>>>>>>> On Mon, Jul 22, 2024 at 10:00 AM Salva Alcántara <
>>>>>>>> salcantara...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Zhanghao,
>>>>>>>>>
>>>>>>>>> Thanks for your suggestion. Unfortunately, this does not work; I
>>>>>>>>> still get the same error message:
>>>>>>>>>
>>>>>>>>> ```
>>>>>>>>> Record size is too large for CollectSinkFunction. Record size is
>>>>>>>>> 9623137 bytes, but max bytes per batch is only 2097152 bytes.
>>>>>>>>> Please consider increasing max bytes per batch value by setting
>>>>>>>>> collect-sink.batch-size.max
>>>>>>>>> ```
>>>>>>>>>
>>>>>>>>> The code looks like this now:
>>>>>>>>>
>>>>>>>>> ```
>>>>>>>>> Configuration config = new Configuration();
>>>>>>>>> config.setString("collect-sink.batch-size.max", "10mb");
>>>>>>>>> StreamExecutionEnvironment env =
>>>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment(config);
>>>>>>>>> SavepointReader savepoint = SavepointReader.read(env,
>>>>>>>>> savepointPath, new HashMapStateBackend());
>>>>>>>>>
>>>>>>>>> var matcher = savepoint.readKeyedState("Raw Triggers", new
>>>>>>>>> MatcherReader());
>>>>>>>>> var matcherState = matcher.executeAndCollect(1000);
>>>>>>>>> ```
>>>>>>>>>
>>>>>>>>> I have tried other ways but none has worked (the setting is always
>>>>>>>>> ignored in the end).
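
As a sanity check on the numbers, here is a small plain-Java sketch (the
class and method names are made up for illustration). It assumes "mb"
means 1024 * 1024 bytes, which matches the 2097152-byte limit in the error
being exactly 2 such units. Under that assumption a 10mb limit would have
been enough for the 9623137-byte record, which suggests the configured
value never reached the sink.

```java
public class BatchSizeCheck {
    // Assumed binary unit: 1 mb = 1024 * 1024 bytes.
    static final long BYTES_PER_MB = 1024L * 1024L;

    // True if a single record fits in one batch of the given size.
    static boolean fits(long recordBytes, long maxBatchMb) {
        return recordBytes <= maxBatchMb * BYTES_PER_MB;
    }

    // Smallest whole number of "mb" that can hold the record.
    static long minWholeMb(long recordBytes) {
        return (recordBytes + BYTES_PER_MB - 1) / BYTES_PER_MB;
    }

    public static void main(String[] args) {
        long recordBytes = 9_623_137L; // record size from the error above

        System.out.println(fits(recordBytes, 2));    // false
        System.out.println(fits(recordBytes, 10));   // true
        System.out.println(minWholeMb(recordBytes)); // 10
    }
}
```
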
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>>
>>>>>>>>> Salva
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Sun, Jul 21, 2024 at 9:10 AM Zhanghao Chen <
>>>>>>>>> zhanghao.c...@outlook.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi, you could increase it as follows:
>>>>>>>>>>
>>>>>>>>>> Configuration config = new Configuration();
>>>>>>>>>> config.setString("collect-sink.batch-size.max", "10mb");
>>>>>>>>>> StreamExecutionEnvironment env =
>>>>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment(config);
>>>>>>>>>> ------------------------------
>>>>>>>>>> *From:* Salva Alcántara <salcantara...@gmail.com>
>>>>>>>>>> *Sent:* Saturday, July 20, 2024 15:05
>>>>>>>>>> *To:* user <user@flink.apache.org>
>>>>>>>>>> *Subject:* SavepointReader: Record size is too large for
>>>>>>>>>> CollectSinkFunction
>>>>>>>>>>
>>>>>>>>>> Hi all!
>>>>>>>>>>
>>>>>>>>>> I'm trying to debug a job via inspecting its savepoints but I'm
>>>>>>>>>> getting this error message:
>>>>>>>>>>
>>>>>>>>>> ```
>>>>>>>>>> Caused by: java.lang.RuntimeException: Record size is too large
>>>>>>>>>> for CollectSinkFunction. Record size is 9627127 bytes, but max bytes
>>>>>>>>>> per batch is only 2097152 bytes. Please consider increasing max bytes
>>>>>>>>>> per batch value by setting collect-sink.batch-size.max
>>>>>>>>>> ```
>>>>>>>>>>
>>>>>>>>>> My code looks like this:
>>>>>>>>>>
>>>>>>>>>> ```
>>>>>>>>>>   private static void run(String savepointPath) throws Exception {
>>>>>>>>>>     StreamExecutionEnvironment env =
>>>>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>>     org.apache.flink.state.api.SavepointReader savepoint =
>>>>>>>>>>         org.apache.flink.state.api.SavepointReader.read(env,
>>>>>>>>>> savepointPath, new HashMapStateBackend());
>>>>>>>>>>
>>>>>>>>>>     var operator = savepoint.readKeyedState("uuid", new
>>>>>>>>>> MyKeyedOperatorReader());
>>>>>>>>>>     var operatorState = operator.executeAndCollect(1000);
>>>>>>>>>>   }
>>>>>>>>>> ```
>>>>>>>>>>
>>>>>>>>>> I haven't found a way to increase
>>>>>>>>>> `collect-sink.batch-size.max` as suggested in the error msg.
>>>>>>>>>>
>>>>>>>>>> Any help on this will be appreciated!
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>>
>>>>>>>>>> Salva
>>>>>>>>>>
>>>>>>>>>
