So, what's next? Do you want me to do anything, Gabor?

Regards,

Salva
On Thu, Dec 5, 2024, 13:48 Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

> Based on this I can confirm that the maxBatchSize and socketTimeout values
> are coming from their default values; the Flink config is not considered.
>
> G
>
> On Thu, Dec 5, 2024 at 8:20 AM Salva Alcántara <salcantara...@gmail.com> wrote:
>
>> Sorry for my late reply, Gabor. Here is the whole trace:
>>
>> SLF4J(W): No SLF4J providers were found.
>> SLF4J(W): Defaulting to no-operation (NOP) logger implementation
>> SLF4J(W): See https://www.slf4j.org/codes.html#noProviders for further details.
>> SLF4J(W): Class path contains SLF4J bindings targeting slf4j-api versions 1.7.x or earlier.
>> SLF4J(W): Ignoring binding found at [jar:file:/private/var/tmp/_bazel_salva/d416ba92e8fb4416e3182f6204d8237a/external/maven/v1/https/repo1.maven.org/maven2/org/apache/logging/log4j/log4j-slf4j-impl/2.17.1/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J(W): See https://www.slf4j.org/codes.html#ignoredBindings for an explanation.
>> WARNING: An illegal reflective access operation has occurred
>> WARNING: Illegal reflective access by com.twitter.chill.java.ArraysAsListSerializer (file:/private/var/tmp/_bazel_salva/d416ba92e8fb4416e3182f6204d8237a/external/maven/v1/https/repo1.maven.org/maven2/com/twitter/chill-java/0.7.6/chill-java-0.7.6.jar) to field java.util.Arrays$ArrayList.a
>> WARNING: Please consider reporting this to the maintainers of com.twitter.chill.java.ArraysAsListSerializer
>> WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
>> WARNING: All illegal access operations will be denied in a future release
>>
>> Exception in thread "main" java.lang.RuntimeException: Failed to fetch next result
>>     at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
>>     at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
>>     at org.apache.flink.streaming.api.datastream.DataStream.executeAndCollect(DataStream.java:1365)
>>     at org.apache.flink.streaming.api.datastream.DataStream.executeAndCollect(DataStream.java:1348)
>>     at com.auvik.platform.triggering.statecli.savepoint.SavepointReader.run(SavepointReader.java:41)
>>     at com.auvik.platform.triggering.statecli.savepoint.SavepointReader.main(SavepointReader.java:33)
>> Caused by: java.io.IOException: Failed to fetch job execution result
>>     at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:185)
>>     at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:121)
>>     at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
>>     ... 5 more
>> Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>     at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
>>     at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2022)
>>     at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:183)
>>     ... 7 more
>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>     at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
>>     at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
>>     at java.base/java.util.concurrent.CompletableFuture.uniApplyNow(CompletableFuture.java:680)
>>     at java.base/java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:658)
>>     at java.base/java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:2094)
>>     at org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:138)
>>     at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:182)
>>     ... 7 more
>> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
>>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:176)
>>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:107)
>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:285)
>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:276)
>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:269)
>>     at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:764)
>>     at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:741)
>>     at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83)
>>     at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:488)
>>     at jdk.internal.reflect.GeneratedMethodAccessor14.invoke(Unknown Source)
>>     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>     at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309)
>>     at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
>>     at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307)
>>     at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222)
>>     at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
>>     at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
>>     at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
>>     at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
>>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
>>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
>>     at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
>>     at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
>>     at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
>>     at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
>>     at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
>>     at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
>>     at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
>>     at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
>>     at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
>>     at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
>>     at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
>>     at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
>>     at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
>>     at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
>> Caused by: java.lang.RuntimeException: Record size is too large for CollectSinkFunction.
>> Record size is 7462652 bytes, but max bytes per batch is only 2097152 bytes.
>> Please consider increasing max bytes per batch value by setting collect-sink.batch-size.max
>>     at org.apache.flink.streaming.api.operators.collect.CollectSinkFunction.invoke(CollectSinkFunction.java:285)
>>     at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237)
>>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146)
>>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562)
>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
>>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
>>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
>>     at java.base/java.lang.Thread.run(Thread.java:829)
>>
>> On Mon, Jul 29, 2024 at 11:06 AM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>
>>> I've double-checked and I think that CollectSinkOperatorFactory is
>>> initialized in DataStream.collectAsync without the MAX_BATCH_SIZE
>>> and SOCKET_TIMEOUT values coming from the Flink config.
>>> Could you please share the whole stack trace so I can double-check my assumption?
>>>
>>> G
>>>
>>> On Tue, Jul 23, 2024 at 12:46 PM Salva Alcántara <salcantara...@gmail.com> wrote:
>>>
>>>> Hi all,
>>>>
>>>> Just to share my findings so far: it has been impossible for me to tweak
>>>> the setting, so the only way to work around this has been to duplicate
>>>> some Flink code directly and make the tweak there (see the sketch below
>>>> for what I believe is the root cause).
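>>>>
>>>> For context, here is my reading of the relevant lines in the 1.15 sources
>>>> (a paraphrased sketch, not a verbatim copy): DataStream.collectAsync builds
>>>> the collect sink factory from the option defaults only, so nothing set in
>>>> the environment's Configuration ever reaches it.
>>>>
>>>> ```
>>>> // Paraphrased from DataStream#collectAsync (Flink 1.15.x); exact shape
>>>> // may differ slightly from the real source.
>>>> TypeSerializer<T> serializer =
>>>>     getType().createSerializer(getExecutionEnvironment().getConfig());
>>>> String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
>>>>
>>>> // The two-argument constructor falls back to MAX_BATCH_SIZE.defaultValue()
>>>> // and SOCKET_TIMEOUT.defaultValue() rather than reading
>>>> // collect-sink.batch-size.max from the Flink config, which is why the
>>>> // setting is silently ignored.
>>>> CollectSinkOperatorFactory<T> factory =
>>>>     new CollectSinkOperatorFactory<>(serializer, accumulatorName);
>>>> ```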
>>>>
>>>> As for the workaround itself, this is what my code looks like now (kudos
>>>> to my dear colleague Den!):
>>>>
>>>> ```
>>>> private static <T> List<T> executeAndCollect(DataStream<T> dataStream,
>>>>                                              StreamExecutionEnvironment env,
>>>>                                              String maxBatchSize,
>>>>                                              int limit) throws Exception {
>>>>     TypeSerializer<T> serializer =
>>>>         dataStream.getType().createSerializer(env.getConfig());
>>>>     String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
>>>>
>>>>     // The only change w.r.t. DataStream.collectAsync: the max batch size
>>>>     // is passed in explicitly instead of MAX_BATCH_SIZE.defaultValue().
>>>>     // SOCKET_TIMEOUT comes from CollectSinkOperatorFactory (static import).
>>>>     CollectSinkOperatorFactory<T> factory =
>>>>         new CollectSinkOperatorFactory<>(serializer, accumulatorName,
>>>>             MemorySize.parse(maxBatchSize), SOCKET_TIMEOUT.defaultValue());
>>>>     CollectSinkOperator<T> operator =
>>>>         (CollectSinkOperator<T>) factory.getOperator();
>>>>     CollectResultIterator<T> iterator =
>>>>         new CollectResultIterator<>(
>>>>             operator.getOperatorIdFuture(),
>>>>             serializer,
>>>>             accumulatorName,
>>>>             env.getCheckpointConfig());
>>>>     CollectStreamSink<T> sink = new CollectStreamSink<>(dataStream, factory);
>>>>     sink.name("Data stream collect sink");
>>>>     env.addOperator(sink.getTransformation());
>>>>
>>>>     final JobClient jobClient = env.executeAsync("DataStream Collect");
>>>>     iterator.setJobClient(jobClient);
>>>>
>>>>     var clientAndIterator = new ClientAndIterator<>(jobClient, iterator);
>>>>     List<T> results = new ArrayList<>(limit);
>>>>     while (limit > 0 && clientAndIterator.iterator.hasNext()) {
>>>>         results.add(clientAndIterator.iterator.next());
>>>>         limit--;
>>>>     }
>>>>     return results;
>>>> }
>>>> ```
>>>>
>>>> Essentially, I'm just adding a parameter to the CollectSinkOperatorFactory
>>>> constructor here:
>>>> - https://github.com/apache/flink/blob/release-1.15.4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java#L1378
>>>>
>>>> This works, but it's obviously inconvenient for the user. If this
>>>> limitation is confirmed, Den and I will be glad to send an MR to fix it.
>>>>
>>>> Does that make sense?
>>>>
>>>> Regards,
>>>>
>>>> Salva
>>>>
>>>> On Mon, Jul 22, 2024 at 10:04 AM Salva Alcántara <salcantara...@gmail.com> wrote:
>>>>
>>>>> The same happens with this slight variation:
>>>>>
>>>>> ```
>>>>> Configuration config = new Configuration();
>>>>> config.setString("collect-sink.batch-size.max", "100mb");
>>>>> StreamExecutionEnvironment env =
>>>>>     StreamExecutionEnvironment.getExecutionEnvironment();
>>>>> env.configure(config);
>>>>> SavepointReader savepoint = SavepointReader.read(env, savepointPath,
>>>>>     new HashMapStateBackend());
>>>>> ```
>>>>>
>>>>> Salva
>>>>>
>>>>> On Mon, Jul 22, 2024 at 10:00 AM Salva Alcántara <salcantara...@gmail.com> wrote:
>>>>>
>>>>>> Hi Zhanghao,
>>>>>>
>>>>>> Thanks for your suggestion. Unfortunately, it does not work; I still
>>>>>> get the same error message:
>>>>>>
>>>>>> ```
>>>>>> Record size is too large for CollectSinkFunction. Record size is
>>>>>> 9623137 bytes, but max bytes per batch is only 2097152 bytes.
>>>>>> Please consider increasing max bytes per batch value by setting
>>>>>> collect-sink.batch-size.max
>>>>>> ```
>>>>>>
>>>>>> The code looks like this now:
>>>>>>
>>>>>> ```
>>>>>> Configuration config = new Configuration();
>>>>>> config.setString("collect-sink.batch-size.max", "10mb");
>>>>>> StreamExecutionEnvironment env =
>>>>>>     StreamExecutionEnvironment.getExecutionEnvironment(config);
>>>>>> SavepointReader savepoint = SavepointReader.read(env, savepointPath,
>>>>>>     new HashMapStateBackend());
>>>>>>
>>>>>> var matcher = savepoint.readKeyedState("Raw Triggers", new MatcherReader());
>>>>>> var matcherState = matcher.executeAndCollect(1000);
>>>>>> ```
>>>>>>
>>>>>> I have tried other ways, but none have worked (the setting is always
>>>>>> ignored in the end).
>>>>>>
>>>>>> Regards,
>>>>>>
>>>>>> Salva
>>>>>>
>>>>>> On Sun, Jul 21, 2024 at 9:10 AM Zhanghao Chen <zhanghao.c...@outlook.com> wrote:
>>>>>>
>>>>>>> Hi, you could increase it as follows:
>>>>>>>
>>>>>>> Configuration config = new Configuration();
>>>>>>> config.setString("collect-sink.batch-size.max", "10mb");
>>>>>>> StreamExecutionEnvironment env =
>>>>>>>     StreamExecutionEnvironment.getExecutionEnvironment(config);
>>>>>>> ------------------------------
>>>>>>> *From:* Salva Alcántara <salcantara...@gmail.com>
>>>>>>> *Sent:* Saturday, July 20, 2024 15:05
>>>>>>> *To:* user <user@flink.apache.org>
>>>>>>> *Subject:* SavepointReader: Record size is too large for CollectSinkFunction
>>>>>>>
>>>>>>> Hi all!
>>>>>>>
>>>>>>> I'm trying to debug a job by inspecting its savepoints, but I'm
>>>>>>> getting this error message:
>>>>>>>
>>>>>>> ```
>>>>>>> Caused by: java.lang.RuntimeException: Record size is too large for
>>>>>>> CollectSinkFunction. Record size is 9627127 bytes, but max bytes per batch
>>>>>>> is only 2097152 bytes. Please consider increasing max bytes per batch value
>>>>>>> by setting collect-sink.batch-size.max
>>>>>>> ```
>>>>>>>
>>>>>>> My code looks like this:
>>>>>>>
>>>>>>> ```
>>>>>>> private static void run(String savepointPath) throws Exception {
>>>>>>>     StreamExecutionEnvironment env =
>>>>>>>         StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>     org.apache.flink.state.api.SavepointReader savepoint =
>>>>>>>         org.apache.flink.state.api.SavepointReader.read(env,
>>>>>>>             savepointPath, new HashMapStateBackend());
>>>>>>>
>>>>>>>     var operator = savepoint.readKeyedState("uuid", new MyKeyedOperatorReader());
>>>>>>>     var operatorState = operator.executeAndCollect(1000);
>>>>>>> }
>>>>>>> ```
>>>>>>>
>>>>>>> I haven't found a way to increase `collect-sink.batch-size.max` as
>>>>>>> suggested in the error message.
>>>>>>>
>>>>>>> Any help on this will be appreciated!
>>>>>>>
>>>>>>> Regards,
>>>>>>>
>>>>>>> Salva