Sorry for my late reply Gabor, here is the whole trace:

```
SLF4J(W): No SLF4J providers were found.
SLF4J(W): Defaulting to no-operation (NOP) logger implementation
SLF4J(W): See https://www.slf4j.org/codes.html#noProviders for further details.
SLF4J(W): Class path contains SLF4J bindings targeting slf4j-api versions 1.7.x or earlier.
SLF4J(W): Ignoring binding found at [jar:file:/private/var/tmp/_bazel_salva/d416ba92e8fb4416e3182f6204d8237a/external/maven/v1/https/repo1.maven.org/maven2/org/apache/logging/log4j/log4j-slf4j-impl/2.17.1/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J(W): See https://www.slf4j.org/codes.html#ignoredBindings for an explanation.
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by com.twitter.chill.java.ArraysAsListSerializer (file:/private/var/tmp/_bazel_salva/d416ba92e8fb4416e3182f6204d8237a/external/maven/v1/https/repo1.maven.org/maven2/com/twitter/chill-java/0.7.6/chill-java-0.7.6.jar) to field java.util.Arrays$ArrayList.a
WARNING: Please consider reporting this to the maintainers of com.twitter.chill.java.ArraysAsListSerializer
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Exception in thread "main" java.lang.RuntimeException: Failed to fetch next result
    at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
    at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
    at org.apache.flink.streaming.api.datastream.DataStream.executeAndCollect(DataStream.java:1365)
    at org.apache.flink.streaming.api.datastream.DataStream.executeAndCollect(DataStream.java:1348)
    at com.auvik.platform.triggering.statecli.savepoint.SavepointReader.run(SavepointReader.java:41)
    at com.auvik.platform.triggering.statecli.savepoint.SavepointReader.main(SavepointReader.java:33)
Caused by: java.io.IOException: Failed to fetch job execution result
    at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:185)
    at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:121)
    at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
    ... 5 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2022)
    at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:183)
    ... 7 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
    at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
    at java.base/java.util.concurrent.CompletableFuture.uniApplyNow(CompletableFuture.java:680)
    at java.base/java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:658)
    at java.base/java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:2094)
    at org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:138)
    at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:182)
    ... 7 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:176)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:107)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:285)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:276)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:269)
    at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:764)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:741)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:488)
    at jdk.internal.reflect.GeneratedMethodAccessor14.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309)
    at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307)
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222)
    at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
    at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
    at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
    at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
    at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
    at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
    at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
    at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
    at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
    at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
    at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
    at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
    at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
Caused by: java.lang.RuntimeException: Record size is too large for CollectSinkFunction. Record size is 7462652 bytes, but max bytes per batch is only 2097152 bytes. Please consider increasing max bytes per batch value by setting collect-sink.batch-size.max
    at org.apache.flink.streaming.api.operators.collect.CollectSinkFunction.invoke(CollectSinkFunction.java:285)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
    at java.base/java.lang.Thread.run(Thread.java:829)
```
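Note the root cause at the bottom: the collect sink is still running with the 2097152-byte default even though the job sets collect-sink.batch-size.max, which is consistent with your assumption. For illustration, here is a paraphrased sketch (not the literal Flink source; the wrapper class name is made up) of how DataStream.collectAsync appears to build the factory on the 1.15 branch:

```
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;

class CollectSinkDefaultsSketch {
    // The two-argument constructor falls back to the ConfigOption defaults
    // (2 MiB for MAX_BATCH_SIZE, i.e. "collect-sink.batch-size.max"), so a
    // value configured on the environment never reaches the sink.
    static <T> CollectSinkOperatorFactory<T> factoryAsBuiltByCollectAsync(
            TypeSerializer<T> serializer, String accumulatorName) {
        return new CollectSinkOperatorFactory<>(serializer, accumulatorName);
    }
}
```

If that reading is right, it would explain why every attempt below to raise the limit via configuration is ignored.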
On Mon, Jul 29, 2024 at 11:06 AM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

> I've double checked and I think that CollectSinkOperatorFactory is
> initialized in DataStream.collectAsync without the MAX_BATCH_SIZE
> and SOCKET_TIMEOUT values coming from the Flink config.
> Could you please share the whole stack trace so I can double-check my
> assumption?
>
> G
>
> On Tue, Jul 23, 2024 at 12:46 PM Salva Alcántara <salcantara...@gmail.com> wrote:
>
>> Hi all,
>>
>> Just to share my findings so far: tweaking the setting has proved
>> impossible, so the only way to work around this has been to duplicate
>> some Flink code directly. More precisely, this is how my code looks now
>> (kudos to my dear colleague Den!):
>>
>> ```
>> // SOCKET_TIMEOUT is statically imported from CollectSinkOperatorFactory
>> private static <T> List<T> executeAndCollect(DataStream<T> dataStream,
>>                                              StreamExecutionEnvironment env,
>>                                              String maxBatchSize,
>>                                              int limit) throws Exception {
>>
>>     TypeSerializer<T> serializer =
>>         dataStream.getType().createSerializer(env.getConfig());
>>     String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
>>
>>     CollectSinkOperatorFactory<T> factory =
>>         new CollectSinkOperatorFactory<>(serializer, accumulatorName,
>>             MemorySize.parse(maxBatchSize), SOCKET_TIMEOUT.defaultValue());
>>     CollectSinkOperator<T> operator =
>>         (CollectSinkOperator<T>) factory.getOperator();
>>     CollectResultIterator<T> iterator =
>>         new CollectResultIterator<>(
>>             operator.getOperatorIdFuture(),
>>             serializer,
>>             accumulatorName,
>>             env.getCheckpointConfig());
>>     CollectStreamSink<T> sink = new CollectStreamSink<>(dataStream, factory);
>>     sink.name("Data stream collect sink");
>>     env.addOperator(sink.getTransformation());
>>
>>     final JobClient jobClient = env.executeAsync("DataStream Collect");
>>     iterator.setJobClient(jobClient);
>>
>>     var clientAndIterator = new ClientAndIterator<>(jobClient, iterator);
>>     List<T> results = new ArrayList<>(limit);
>>     while (limit > 0 && clientAndIterator.iterator.hasNext()) {
>>         results.add(clientAndIterator.iterator.next());
>>         limit--;
>>     }
>>     return results;
>> }
>> ```
>>
>> Essentially, I'm just passing the extra batch-size argument to the
>> CollectSinkOperatorFactory constructor invoked here:
>> - https://github.com/apache/flink/blob/release-1.15.4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java#L1378
>>
>> This works, but it's obviously inconvenient for the user. If this
>> limitation is confirmed, Den and I will be glad to send an MR to fix it.
>>
>> Does that make sense?
>>
>> Regards,
>>
>> Salva
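For concreteness, a hypothetical call site for the executeAndCollect helper above, reusing the names from the snippets further down the thread (MatcherReader, MatcherState, "Raw Triggers"); a sketch, not a tested program:

```
// Hypothetical usage of the helper: read the keyed state from the savepoint
// and collect up to 1000 records with a 100 MiB batch ceiling instead of the
// hard-coded 2 MiB default.
private static List<MatcherState> readMatcherState(String savepointPath) throws Exception {
    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
    SavepointReader savepoint =
        SavepointReader.read(env, savepointPath, new HashMapStateBackend());
    DataStream<MatcherState> matcher =
        savepoint.readKeyedState("Raw Triggers", new MatcherReader());
    return executeAndCollect(matcher, env, "100mb", 1000);
}
```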
>> On Mon, Jul 22, 2024 at 10:04 AM Salva Alcántara <salcantara...@gmail.com> wrote:
>>
>>> The same happens with this slight variation:
>>>
>>> ```
>>> Configuration config = new Configuration();
>>> config.setString("collect-sink.batch-size.max", "100mb");
>>> StreamExecutionEnvironment env =
>>>     StreamExecutionEnvironment.getExecutionEnvironment();
>>> env.configure(config);
>>> SavepointReader savepoint =
>>>     SavepointReader.read(env, savepointPath, new HashMapStateBackend());
>>> ```
>>>
>>> Salva
>>>
>>> On Mon, Jul 22, 2024 at 10:00 AM Salva Alcántara <salcantara...@gmail.com> wrote:
>>>
>>>> Hi Zhanghao,
>>>>
>>>> Thanks for your suggestion. Unfortunately, it does not work; I still
>>>> get the same error message:
>>>>
>>>> ```
>>>> Record size is too large for CollectSinkFunction. Record size is
>>>> 9623137 bytes, but max bytes per batch is only 2097152 bytes.
>>>> Please consider increasing max bytes per batch value by setting
>>>> collect-sink.batch-size.max
>>>> ```
>>>>
>>>> The code looks like this now:
>>>>
>>>> ```
>>>> Configuration config = new Configuration();
>>>> config.setString("collect-sink.batch-size.max", "10mb");
>>>> StreamExecutionEnvironment env =
>>>>     StreamExecutionEnvironment.getExecutionEnvironment(config);
>>>> SavepointReader savepoint =
>>>>     SavepointReader.read(env, savepointPath, new HashMapStateBackend());
>>>>
>>>> var matcher = savepoint.readKeyedState("Raw Triggers", new MatcherReader());
>>>> var matcherState = matcher.executeAndCollect(1000);
>>>> ```
>>>>
>>>> I have tried other ways, but none has worked (the setting is always
>>>> ignored in the end).
>>>>
>>>> Regards,
>>>>
>>>> Salva
>>>>
>>>> On Sun, Jul 21, 2024 at 9:10 AM Zhanghao Chen <zhanghao.c...@outlook.com> wrote:
>>>>
>>>>> Hi, you could increase it as follows:
>>>>>
>>>>> Configuration config = new Configuration();
>>>>> config.setString("collect-sink.batch-size.max", "10mb");
>>>>> StreamExecutionEnvironment env =
>>>>>     StreamExecutionEnvironment.getExecutionEnvironment(config);
>>>>> ------------------------------
>>>>> *From:* Salva Alcántara <salcantara...@gmail.com>
>>>>> *Sent:* Saturday, July 20, 2024 15:05
>>>>> *To:* user <user@flink.apache.org>
>>>>> *Subject:* SavepointReader: Record size is too large for
>>>>> CollectSinkFunction
>>>>>
>>>>> Hi all!
>>>>>
>>>>> I'm trying to debug a job by inspecting its savepoints, but I'm
>>>>> getting this error message:
>>>>>
>>>>> ```
>>>>> Caused by: java.lang.RuntimeException: Record size is too large for
>>>>> CollectSinkFunction. Record size is 9627127 bytes, but max bytes per
>>>>> batch is only 2097152 bytes. Please consider increasing max bytes per
>>>>> batch value by setting collect-sink.batch-size.max
>>>>> ```
>>>>>
>>>>> My code looks like this:
>>>>>
>>>>> ```
>>>>> private static void run(String savepointPath) throws Exception {
>>>>>     StreamExecutionEnvironment env =
>>>>>         StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>     org.apache.flink.state.api.SavepointReader savepoint =
>>>>>         org.apache.flink.state.api.SavepointReader.read(env,
>>>>>             savepointPath, new HashMapStateBackend());
>>>>>
>>>>>     var operator = savepoint.readKeyedState("uuid", new MyKeyedOperatorReader());
>>>>>     var operatorState = operator.executeAndCollect(1000);
>>>>> }
>>>>> ```
>>>>>
>>>>> I haven't found a way to increase `collect-sink.batch-size.max` as
>>>>> suggested in the error message.
>>>>>
>>>>> Any help on this will be appreciated!
>>>>>
>>>>> Regards,
>>>>>
>>>>> Salva
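For readers joining later: the MR offered further up the thread would presumably thread the environment's configuration into the factory built by DataStream.collectAsync, along these lines. This is a sketch of the idea, not an actual Flink patch; the getConfiguration() accessor and the public MAX_BATCH_SIZE/SOCKET_TIMEOUT ConfigOptions are assumptions based on the workaround code above:

```
// Hypothetical shape of the fix: inside DataStream.collectAsync, read the
// two options from the effective configuration instead of relying on the
// ConfigOption defaults.
ReadableConfig conf = env.getConfiguration(); // assumed accessor
CollectSinkOperatorFactory<T> factory =
    new CollectSinkOperatorFactory<>(
        serializer,
        accumulatorName,
        conf.get(CollectSinkOperatorFactory.MAX_BATCH_SIZE),
        conf.get(CollectSinkOperatorFactory.SOCKET_TIMEOUT));
```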