I intend to file a Jira ticket and a PR.
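
For illustration, here is a minimal stand-alone sketch of the behaviour discussed in this thread: the batch limit should be resolved from the user-supplied config and fall back to the 2 MiB default only when the key is absent. It is not Flink code. The class `BatchSizeSketch`, its methods, and the simplified size parser are hypothetical names made up for this example; only the key `collect-sink.batch-size.max` and the byte counts come from the error message quoted below. It also assumes the sink rejects a record with a plain size comparison.

```java
import java.util.Locale;
import java.util.Map;

/** Stand-alone illustration only; none of these names are Flink API. */
final class BatchSizeSketch {
    // Key and default are taken from the error message in this thread
    // (2097152 bytes = 2 MiB).
    static final String MAX_BATCH_SIZE_KEY = "collect-sink.batch-size.max";
    static final long DEFAULT_MAX_BATCH_BYTES = 2L * 1024 * 1024;

    /** Simplified parser (assumption): plain bytes, or a number plus kb/mb/gb. */
    static long parseBytes(String text) {
        String s = text.trim().toLowerCase(Locale.ROOT);
        long multiplier = 1L;
        if (s.endsWith("kb")) {
            multiplier = 1024L;
        } else if (s.endsWith("mb")) {
            multiplier = 1024L * 1024;
        } else if (s.endsWith("gb")) {
            multiplier = 1024L * 1024 * 1024;
        }
        if (multiplier != 1L) {
            s = s.substring(0, s.length() - 2).trim();
        }
        return Long.parseLong(s) * multiplier;
    }

    /** Uses the configured value when present, otherwise the default. */
    static long resolveMaxBatchBytes(Map<String, String> userConfig) {
        String configured = userConfig.get(MAX_BATCH_SIZE_KEY);
        return configured == null ? DEFAULT_MAX_BATCH_BYTES : parseBytes(configured);
    }

    /** Assumed acceptance rule: a record fits if it is no larger than the limit. */
    static boolean fits(long recordBytes, long maxBatchBytes) {
        return recordBytes <= maxBatchBytes;
    }

    public static void main(String[] args) {
        long recordBytes = 7_462_652L; // record size reported in the stack trace

        long withDefault = resolveMaxBatchBytes(Map.of());
        long withConfig = resolveMaxBatchBytes(Map.of(MAX_BATCH_SIZE_KEY, "10mb"));

        System.out.println("default limit fits record:    " + fits(recordBytes, withDefault));
        System.out.println("configured limit fits record: " + fits(recordBytes, withConfig));
    }
}
```

With the numbers from the trace, the 7462652-byte record exceeds the 2 MiB default but would fit under a 10mb limit, which is why honouring the configured value matters here.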

G


On Thu, Dec 5, 2024 at 2:40 PM Salva Alcántara <salcantara...@gmail.com>
wrote:

> So what's next? Do you want me to do something Gabor?
>
> Regards,
>
> Salva
>
> On Thu, Dec 5, 2024, 13:48 Gabor Somogyi <gabor.g.somo...@gmail.com>
> wrote:
>
>> Based on this I can confirm that the maxBatchSize and socketTimeout
>> values come from the defaults; the Flink config is not considered.
>>
>> G
>>
>>
>> On Thu, Dec 5, 2024 at 8:20 AM Salva Alcántara <salcantara...@gmail.com>
>> wrote:
>>
>>> Sorry for my late reply Gabor, here is the whole trace:
>>>
>>> SLF4J(W): No SLF4J providers were found.
>>> SLF4J(W): Defaulting to no-operation (NOP) logger implementation
>>> SLF4J(W): See https://www.slf4j.org/codes.html#noProviders for further
>>> details.
>>> SLF4J(W): Class path contains SLF4J bindings targeting slf4j-api
>>> versions 1.7.x or earlier.
>>> SLF4J(W): Ignoring binding found at
>>> [jar:file:/private/var/tmp/_bazel_salva/d416ba92e8fb4416e3182f6204d8237a/external/maven/v1/https/
>>> repo1.maven.org/maven2/org/apache/logging/log4j/log4j-slf4j-impl/2.17.1/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class
>>> ]
>>> SLF4J(W): See https://www.slf4j.org/codes.html#ignoredBindings for an
>>> explanation.
>>> WARNING: An illegal reflective access operation has occurred
>>> WARNING: Illegal reflective access by
>>> com.twitter.chill.java.ArraysAsListSerializer
>>> (file:/private/var/tmp/_bazel_salva/d416ba92e8fb4416e3182f6204d8237a/external/maven/v1/https/
>>> repo1.maven.org/maven2/com/twitter/chill-java/0.7.6/chill-java-0.7.6.jar)
>>> to field java.util.Arrays$ArrayList.a
>>> WARNING: Please consider reporting this to the maintainers of
>>> com.twitter.chill.java.ArraysAsListSerializer
>>> WARNING: Use --illegal-access=warn to enable warnings of further illegal
>>> reflective access operations
>>> WARNING: All illegal access operations will be denied in a future release
>>> Exception in thread "main" java.lang.RuntimeException: Failed to fetch
>>> next result
>>>         at
>>> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
>>>         at
>>> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
>>>         at
>>> org.apache.flink.streaming.api.datastream.DataStream.executeAndCollect(DataStream.java:1365)
>>>         at
>>> org.apache.flink.streaming.api.datastream.DataStream.executeAndCollect(DataStream.java:1348)
>>>         at
>>> com.auvik.platform.triggering.statecli.savepoint.SavepointReader.run(SavepointReader.java:41)
>>>         at
>>> com.auvik.platform.triggering.statecli.savepoint.SavepointReader.main(SavepointReader.java:33)
>>> Caused by: java.io.IOException: Failed to fetch job execution result
>>>         at
>>> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:185)
>>>         at
>>> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:121)
>>>         at
>>> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
>>>         ... 5 more
>>> Caused by: java.util.concurrent.ExecutionException:
>>> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>>         at
>>> java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
>>>         at
>>> java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2022)
>>>         at
>>> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:183)
>>>         ... 7 more
>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
>>> execution failed.
>>>         at
>>> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
>>>         at
>>> org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
>>>         at
>>> java.base/java.util.concurrent.CompletableFuture.uniApplyNow(CompletableFuture.java:680)
>>>         at
>>> java.base/java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:658)
>>>         at
>>> java.base/java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:2094)
>>>         at
>>> org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:138)
>>>         at
>>> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:182)
>>>         ... 7 more
>>> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed
>>> by NoRestartBackoffTimeStrategy
>>>         at
>>> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:176)
>>>         at
>>> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:107)
>>>         at
>>> org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:285)
>>>         at
>>> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:276)
>>>         at
>>> org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:269)
>>>         at
>>> org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:764)
>>>         at
>>> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:741)
>>>         at
>>> org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83)
>>>         at
>>> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:488)
>>>         at jdk.internal.reflect.GeneratedMethodAccessor14.invoke(Unknown
>>> Source)
>>>         at
>>> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>         at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>>         at
>>> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309)
>>>         at
>>> org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
>>>         at
>>> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307)
>>>         at
>>> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222)
>>>         at
>>> org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
>>>         at
>>> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
>>>         at
>>> org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
>>>         at
>>> org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
>>>         at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
>>>         at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
>>>         at
>>> org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
>>>         at
>>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
>>>         at
>>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
>>>         at
>>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
>>>         at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
>>>         at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
>>>         at
>>> org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
>>>         at
>>> org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
>>>         at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
>>>         at
>>> org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
>>>         at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
>>>         at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
>>>         at
>>> java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
>>>         at
>>> java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
>>>         at
>>> java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
>>>         at
>>> java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
>>>         at
>>> java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
>>> Caused by: java.lang.RuntimeException: Record size is too large for
>>> CollectSinkFunction. Record size is 7462652 bytes, but max bytes per batch
>>> is only 2097152 bytes. Please consider increasing max bytes per batch value
>>> by setting collect-sink.batch-size.max
>>>         at
>>> org.apache.flink.streaming.api.operators.collect.CollectSinkFunction.invoke(CollectSinkFunction.java:285)
>>>         at
>>> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
>>>         at
>>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237)
>>>         at
>>> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146)
>>>         at
>>> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
>>>         at
>>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>>>         at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562)
>>>         at
>>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>>>         at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
>>>         at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
>>>         at
>>> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
>>>         at
>>> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
>>>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
>>>         at java.base/java.lang.Thread.run(Thread.java:829)
>>>
>>> On Mon, Jul 29, 2024 at 11:06 AM Gabor Somogyi <
>>> gabor.g.somo...@gmail.com> wrote:
>>>
>>>> I've double checked and I think that CollectSinkOperatorFactory is
>>>> initialized in DataStream.collectAsync without MAX_BATCH_SIZE
>>>> and SOCKET_TIMEOUT values coming from the Flink config.
>>>> Could you please share the whole stack trace so I can double-check my
>>>> assumption?
>>>>
>>>> G
>>>>
>>>>
>>>> On Tue, Jul 23, 2024 at 12:46 PM Salva Alcántara <
>>>> salcantara...@gmail.com> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> Just to share my findings so far: I have not been able to tweak the
>>>>> setting in any way. The only workaround has been to duplicate some
>>>>> Flink code so that I can pass the value myself.
>>>>> More precisely, this is what my code looks like now (kudos to my dear
>>>>> colleague Den!):
>>>>>
>>>>> ```
>>>>>   private static <T> List<T> executeAndCollect(DataStream<T>
>>>>> dataStream, StreamExecutionEnvironment env,
>>>>>                                                String maxBatchSize,
>>>>> int limit) throws Exception {
>>>>>
>>>>>     TypeSerializer<T> serializer =
>>>>> dataStream.getType().createSerializer(env.getConfig());
>>>>>     String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
>>>>>
>>>>>     CollectSinkOperatorFactory<T> factory =
>>>>>         new CollectSinkOperatorFactory<>(serializer, accumulatorName,
>>>>> MemorySize.parse(maxBatchSize), SOCKET_TIMEOUT.defaultValue());
>>>>>     CollectSinkOperator<T> operator =
>>>>> (CollectSinkOperator<T>) factory.getOperator();
>>>>>     CollectResultIterator<T> iterator =
>>>>>         new CollectResultIterator<>(
>>>>>             operator.getOperatorIdFuture(),
>>>>>             serializer,
>>>>>             accumulatorName,
>>>>>             env.getCheckpointConfig());
>>>>>     CollectStreamSink<T> sink = new CollectStreamSink<>(dataStream,
>>>>> factory);
>>>>>     sink.name("Data stream collect sink");
>>>>>     env.addOperator(sink.getTransformation());
>>>>>
>>>>>     final JobClient jobClient = env.executeAsync("DataStream Collect");
>>>>>     iterator.setJobClient(jobClient);
>>>>>
>>>>>     var clientAndIterator = new ClientAndIterator<>(jobClient,
>>>>> iterator);
>>>>>     List<T> results = new ArrayList<>(limit);
>>>>>     while (limit > 0 && clientAndIterator.iterator.hasNext()) {
>>>>>       results.add(clientAndIterator.iterator.next());
>>>>>       limit--;
>>>>>     }
>>>>>     return results;
>>>>>   }
>>>>> ```
>>>>>
>>>>> Essentially, I'm just adding a parameter to the
>>>>> CollectSinkOperatorFactory constructor here:
>>>>> -
>>>>> https://github.com/apache/flink/blob/release-1.15.4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java#L1378
>>>>>
>>>>> This works but it's obviously inconvenient for the user. If this
>>>>> limitation is confirmed, Den and I will be glad to send a MR to fix that.
>>>>>
>>>>> Makes sense?
>>>>>
>>>>> Regards,
>>>>>
>>>>> Salva
>>>>>
>>>>> On Mon, Jul 22, 2024 at 10:04 AM Salva Alcántara <
>>>>> salcantara...@gmail.com> wrote:
>>>>>
>>>>>> The same happens with this slight variation:
>>>>>>
>>>>>> ```
>>>>>> Configuration config = new Configuration();
>>>>>> config.setString("collect-sink.batch-size.max", "100mb");
>>>>>> StreamExecutionEnvironment env =
>>>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>> env.configure(config);
>>>>>> SavepointReader savepoint = SavepointReader.read(env, savepointPath,
>>>>>> new HashMapStateBackend());
>>>>>> ```
>>>>>>
>>>>>> Salva
>>>>>>
>>>>>> On Mon, Jul 22, 2024 at 10:00 AM Salva Alcántara <
>>>>>> salcantara...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Zhanghao,
>>>>>>>
>>>>>>> Thanks for your suggestion. Unfortunately, this does not work; I
>>>>>>> still get the same error message:
>>>>>>>
>>>>>>> ```
>>>>>>> Record size is too large for CollectSinkFunction. Record size is
>>>>>>> 9623137 bytes, but max bytes per batch is only 2097152 bytes.
>>>>>>> Please consider increasing max bytes per batch value by setting
>>>>>>> collect-sink.batch-size.max
>>>>>>> ```
>>>>>>>
>>>>>>> The code looks like this now:
>>>>>>>
>>>>>>> ```
>>>>>>> Configuration config = new Configuration();
>>>>>>> config.setString("collect-sink.batch-size.max", "10mb");
>>>>>>> StreamExecutionEnvironment env =
>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment(config);
>>>>>>> SavepointReader savepoint = SavepointReader.read(env, savepointPath,
>>>>>>> new HashMapStateBackend());
>>>>>>>
>>>>>>> var matcher = savepoint.readKeyedState("Raw Triggers", new
>>>>>>> MatcherReader());
>>>>>>> var matcherState = matcher.executeAndCollect(1000);
>>>>>>> ```
>>>>>>>
>>>>>>> I have tried other ways but none has worked (the setting is always
>>>>>>> ignored in the end).
>>>>>>>
>>>>>>> Regards,
>>>>>>>
>>>>>>> Salva
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Sun, Jul 21, 2024 at 9:10 AM Zhanghao Chen <
>>>>>>> zhanghao.c...@outlook.com> wrote:
>>>>>>>
>>>>>>>> Hi, you could increase it as follows:
>>>>>>>>
>>>>>>>> Configuration config = new Configuration();
>>>>>>>> config.setString("collect-sink.batch-size.max", "10mb");
>>>>>>>> StreamExecutionEnvironment env =
>>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment(config);
>>>>>>>> ------------------------------
>>>>>>>> *From:* Salva Alcántara <salcantara...@gmail.com>
>>>>>>>> *Sent:* Saturday, July 20, 2024 15:05
>>>>>>>> *To:* user <user@flink.apache.org>
>>>>>>>> *Subject:* SavepointReader: Record size is too large for
>>>>>>>> CollectSinkFunction
>>>>>>>>
>>>>>>>> Hi all!
>>>>>>>>
>>>>>>>> I'm trying to debug a job via inspecting its savepoints but I'm
>>>>>>>> getting this error message:
>>>>>>>>
>>>>>>>> ```
>>>>>>>> Caused by: java.lang.RuntimeException: Record size is too large for
>>>>>>>> CollectSinkFunction. Record size is 9627127 bytes, but max bytes per
>>>>>>>> batch is only 2097152 bytes. Please consider increasing max bytes per
>>>>>>>> batch value by setting collect-sink.batch-size.max
>>>>>>>> ```
>>>>>>>>
>>>>>>>> My code looks like this:
>>>>>>>>
>>>>>>>> ```
>>>>>>>>   private static void run(String savepointPath) throws Exception {
>>>>>>>>     StreamExecutionEnvironment env =
>>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>     org.apache.flink.state.api.SavepointReader savepoint =
>>>>>>>>         org.apache.flink.state.api.SavepointReader.read(env,
>>>>>>>> savepointPath, new HashMapStateBackend());
>>>>>>>>
>>>>>>>>     var operator = savepoint.readKeyedState("uuid", new
>>>>>>>> MyKeyedOperatorReader());
>>>>>>>>     var operatorState = operator.executeAndCollect(1000);
>>>>>>>>   }
>>>>>>>> ```
>>>>>>>>
>>>>>>>> I haven't found a way to increase
>>>>>>>> `collect-sink.batch-size.max` as suggested in the error message.
>>>>>>>>
>>>>>>>> Any help on this will be appreciated!
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>>
>>>>>>>> Salva
>>>>>>>>
>>>>>>>
