Well, the easiest is to cherry-pick what you've built on top of the latest distro. When that's not possible, a local snapshot build + using it in local execution can be an option.

G
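(For the snapshot route, a common flow, assuming a standard Maven setup, is to install the locally built Flink modules into the local repository with `mvn clean install -DskipTests` from the Flink source root, then bump the Flink dependency in the local project to the matching -SNAPSHOT version so that a local run picks up the freshly installed artifacts.)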
On Mon, Dec 9, 2024 at 6:17 PM Salva Alcántara <salcantara...@gmail.com> wrote: > Hey Gabor! What would be the easiest way for me to test the change > locally? I guess now that it has been merged I could simply bump to the > current snapshot version, right? But what is the common approach for testing > a change in a local project before it gets approved/merged? > > BTW I was importing the Flink project into IntelliJ 2024.3 following this > guide: > > - > https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/ide_setup/#importing-flink > > but when I click on "Generate Sources and Update Folders" I get the > following errors: > > [INFO] Validation error: > [ERROR] Failed to execute goal org.apache.rat:apache-rat-plugin:0.13:check > (default) on project flink-parent: Cannot read header > [ERROR] Maven server structure problem > flink-rpc/flink-rpc-core/pom.xml (No such file or directory) > > however, the alternative command: > > mvn clean package -DskipTests > > runs fine in my terminal. > > Regards, > > Salva > > On Sun, Dec 8, 2024 at 12:06 PM Gabor Somogyi <gabor.g.somo...@gmail.com> > wrote: > >> It would be good to report back whether it's solved your issue, but since >> we've added an automated test + I've tested it locally, it's not urgent. >> >> On Sun, Dec 8, 2024 at 8:38 AM Salva Alcántara <salcantara...@gmail.com> >> wrote: >> >>> Gabor, I took a look at your PR and it looks good to me, but I didn't test >>> your changes locally. If still necessary, let me know and I'll find the >>> time! >>> >>> >>> On Fri, Dec 6, 2024 at 1:49 PM Gabor Somogyi <gabor.g.somo...@gmail.com> >>> wrote: >>> >>>> Salva, I've tested the code myself, but do you have the possibility >>>> to test this[1] fix? >>>> >>>> [1] https://github.com/apache/flink/pull/25755 >>>> >>>> G >>>> >>>> >>>> On Thu, Dec 5, 2024 at 3:15 PM Gabor Somogyi <gabor.g.somo...@gmail.com> >>>> wrote: >>>> >>>>> I intend to file a jira + PR. >>>>> >>>>> G >>>>> >>>>> >>>>> On Thu, Dec 5, 2024 at 2:40 PM Salva Alcántara < >>>>> salcantara...@gmail.com> wrote: >>>>> >>>>>> So what's next? Do you want me to do something, Gabor? >>>>>> >>>>>> Regards, >>>>>> >>>>>> Salva >>>>>> >>>>>> On Thu, Dec 5, 2024, 13:48 Gabor Somogyi <gabor.g.somo...@gmail.com> >>>>>> wrote: >>>>>> >>>>>>> Based on this I can confirm that the maxBatchSize and socketTimeout >>>>>>> values are coming from the default values; the Flink config is not considered. >>>>>>> >>>>>>> G >>>>>>> >>>>>>> >>>>>>> On Thu, Dec 5, 2024 at 8:20 AM Salva Alcántara < >>>>>>> salcantara...@gmail.com> wrote: >>>>>>> >>>>>>>> Sorry for my late reply, Gabor; here is the whole trace: >>>>>>>> >>>>>>>> SLF4J(W): No SLF4J providers were found. >>>>>>>> SLF4J(W): Defaulting to no-operation (NOP) logger implementation >>>>>>>> SLF4J(W): See https://www.slf4j.org/codes.html#noProviders for >>>>>>>> further details. >>>>>>>> SLF4J(W): Class path contains SLF4J bindings targeting slf4j-api >>>>>>>> versions 1.7.x or earlier. >>>>>>>> SLF4J(W): Ignoring binding found at >>>>>>>> [jar:file:/private/var/tmp/_bazel_salva/d416ba92e8fb4416e3182f6204d8237a/external/maven/v1/https/ >>>>>>>> repo1.maven.org/maven2/org/apache/logging/log4j/log4j-slf4j-impl/2.17.1/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class >>>>>>>> ] >>>>>>>> SLF4J(W): See https://www.slf4j.org/codes.html#ignoredBindings for >>>>>>>> an explanation. 
>>>>>>>> WARNING: An illegal reflective access operation has occurred >>>>>>>> WARNING: Illegal reflective access by >>>>>>>> com.twitter.chill.java.ArraysAsListSerializer >>>>>>>> (file:/private/var/tmp/_bazel_salva/d416ba92e8fb4416e3182f6204d8237a/external/maven/v1/https/ >>>>>>>> repo1.maven.org/maven2/com/twitter/chill-java/0.7.6/chill-java-0.7.6.jar) >>>>>>>> to field java.util.Arrays$ArrayList.a >>>>>>>> WARNING: Please consider reporting this to the maintainers of >>>>>>>> com.twitter.chill.java.ArraysAsListSerializer >>>>>>>> WARNING: Use --illegal-access=warn to enable warnings of further >>>>>>>> illegal reflective access operations >>>>>>>> WARNING: All illegal access operations will be denied in a future >>>>>>>> release >>>>>>>> Exception in thread "main" java.lang.RuntimeException: Failed to >>>>>>>> fetch next result >>>>>>>> at >>>>>>>> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109) >>>>>>>> at >>>>>>>> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80) >>>>>>>> at >>>>>>>> org.apache.flink.streaming.api.datastream.DataStream.executeAndCollect(DataStream.java:1365) >>>>>>>> at >>>>>>>> org.apache.flink.streaming.api.datastream.DataStream.executeAndCollect(DataStream.java:1348) >>>>>>>> at >>>>>>>> com.auvik.platform.triggering.statecli.savepoint.SavepointReader.run(SavepointReader.java:41) >>>>>>>> at >>>>>>>> com.auvik.platform.triggering.statecli.savepoint.SavepointReader.main(SavepointReader.java:33) >>>>>>>> Caused by: java.io.IOException: Failed to fetch job execution result >>>>>>>> at >>>>>>>> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:185) >>>>>>>> at >>>>>>>> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:121) >>>>>>>> at >>>>>>>> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106) >>>>>>>> ... 5 more >>>>>>>> Caused by: java.util.concurrent.ExecutionException: >>>>>>>> org.apache.flink.runtime.client.JobExecutionException: Job execution >>>>>>>> failed. >>>>>>>> at >>>>>>>> java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) >>>>>>>> at >>>>>>>> java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2022) >>>>>>>> at >>>>>>>> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:183) >>>>>>>> ... 7 more >>>>>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: >>>>>>>> Job execution failed. 
>>>>>>>> at >>>>>>>> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) >>>>>>>> at >>>>>>>> org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141) >>>>>>>> at >>>>>>>> java.base/java.util.concurrent.CompletableFuture.uniApplyNow(CompletableFuture.java:680) >>>>>>>> at >>>>>>>> java.base/java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:658) >>>>>>>> at >>>>>>>> java.base/java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:2094) >>>>>>>> at >>>>>>>> org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:138) >>>>>>>> at >>>>>>>> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:182) >>>>>>>> ... 7 more >>>>>>>> Caused by: org.apache.flink.runtime.JobException: Recovery is >>>>>>>> suppressed by NoRestartBackoffTimeStrategy >>>>>>>> at >>>>>>>> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:176) >>>>>>>> at >>>>>>>> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:107) >>>>>>>> at >>>>>>>> org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:285) >>>>>>>> at >>>>>>>> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:276) >>>>>>>> at >>>>>>>> org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:269) >>>>>>>> at >>>>>>>> org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:764) >>>>>>>> at >>>>>>>> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:741) >>>>>>>> at >>>>>>>> org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83) >>>>>>>> at >>>>>>>> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:488) >>>>>>>> at >>>>>>>> jdk.internal.reflect.GeneratedMethodAccessor14.invoke(Unknown Source) >>>>>>>> at >>>>>>>> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >>>>>>>> at >>>>>>>> java.base/java.lang.reflect.Method.invoke(Method.java:566) >>>>>>>> at >>>>>>>> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309) >>>>>>>> at >>>>>>>> org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) >>>>>>>> at >>>>>>>> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307) >>>>>>>> at >>>>>>>> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222) >>>>>>>> at >>>>>>>> org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85) >>>>>>>> at >>>>>>>> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168) >>>>>>>> at >>>>>>>> org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33) >>>>>>>> at >>>>>>>> org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29) >>>>>>>> at >>>>>>>> scala.PartialFunction.applyOrElse(PartialFunction.scala:127) >>>>>>>> at >>>>>>>> scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) >>>>>>>> at >>>>>>>> 
org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29) >>>>>>>> at >>>>>>>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) >>>>>>>> at >>>>>>>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) >>>>>>>> at >>>>>>>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) >>>>>>>> at >>>>>>>> org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547) >>>>>>>> at >>>>>>>> org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545) >>>>>>>> at >>>>>>>> org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229) >>>>>>>> at >>>>>>>> org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590) >>>>>>>> at >>>>>>>> org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557) >>>>>>>> at >>>>>>>> org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280) >>>>>>>> at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241) >>>>>>>> at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253) >>>>>>>> at >>>>>>>> java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290) >>>>>>>> at >>>>>>>> java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020) >>>>>>>> at >>>>>>>> java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656) >>>>>>>> at >>>>>>>> java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594) >>>>>>>> at >>>>>>>> java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183) >>>>>>>> Caused by: java.lang.RuntimeException: Record size is too large for >>>>>>>> CollectSinkFunction. Record size is 7462652 bytes, but max bytes per >>>>>>>> batch >>>>>>>> is only 2097152 bytes. Please consider increasing max bytes per batch >>>>>>>> value >>>>>>>> by setting collect-sink.batch-size.max >>>>>>>> at >>>>>>>> org.apache.flink.streaming.api.operators.collect.CollectSinkFunction.invoke(CollectSinkFunction.java:285) >>>>>>>> at >>>>>>>> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54) >>>>>>>> at >>>>>>>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237) >>>>>>>> at >>>>>>>> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146) >>>>>>>> at >>>>>>>> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110) >>>>>>>> at >>>>>>>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) >>>>>>>> at >>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562) >>>>>>>> at >>>>>>>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) >>>>>>>> at >>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858) >>>>>>>> at >>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807) >>>>>>>> at >>>>>>>> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953) >>>>>>>> at >>>>>>>> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932) >>>>>>>> at >>>>>>>> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746) >>>>>>>> at >>>>>>>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) >>>>>>>> at java.base/java.lang.Thread.run(Thread.java:829) >>>>>>>> >>>>>>>> On Mon, Jul 29, 2024 at 11:06 AM Gabor 
Somogyi < >>>>>>>> gabor.g.somo...@gmail.com> wrote: >>>>>>>> >>>>>>>>> I've double-checked and I think that CollectSinkOperatorFactory is >>>>>>>>> initialized in DataStream.collectAsync without the MAX_BATCH_SIZE >>>>>>>>> and SOCKET_TIMEOUT values coming from the Flink config. >>>>>>>>> Could you please share the whole stacktrace to double-check my >>>>>>>>> assumption? >>>>>>>>> >>>>>>>>> G >>>>>>>>> >>>>>>>>> >>>>>>>>> On Tue, Jul 23, 2024 at 12:46 PM Salva Alcántara < >>>>>>>>> salcantara...@gmail.com> wrote: >>>>>>>>> >>>>>>>>>> Hi all, >>>>>>>>>> >>>>>>>>>> Just to share my findings so far. Regarding tweaking the setting, >>>>>>>>>> it has been impossible for me to do so. So, the only way to work >>>>>>>>>> around >>>>>>>>>> this has been to duplicate some Flink code directly to allow me to >>>>>>>>>> do the >>>>>>>>>> tweak. More precisely, this is what my code looks like now (kudos to >>>>>>>>>> my dear >>>>>>>>>> colleague Den!): >>>>>>>>>> >>>>>>>>>> ``` >>>>>>>>>> private static <T> List<T> executeAndCollect(DataStream<T> >>>>>>>>>> dataStream, StreamExecutionEnvironment env, >>>>>>>>>> String >>>>>>>>>> maxBatchSize, int limit) throws Exception { >>>>>>>>>> >>>>>>>>>> TypeSerializer<T> serializer = >>>>>>>>>> dataStream.getType().createSerializer(env.getConfig()); >>>>>>>>>> String accumulatorName = "dataStreamCollect_" + >>>>>>>>>> UUID.randomUUID(); >>>>>>>>>> >>>>>>>>>> CollectSinkOperatorFactory<T> factory = >>>>>>>>>> new CollectSinkOperatorFactory<>(serializer, >>>>>>>>>> accumulatorName, MemorySize.parse(maxBatchSize), >>>>>>>>>> SOCKET_TIMEOUT.defaultValue()); >>>>>>>>>> CollectSinkOperator<T> operator = >>>>>>>>>> (CollectSinkOperator<T>) factory.getOperator(); >>>>>>>>>> CollectResultIterator<T> iterator = >>>>>>>>>> new CollectResultIterator<>( >>>>>>>>>> operator.getOperatorIdFuture(), >>>>>>>>>> serializer, >>>>>>>>>> accumulatorName, >>>>>>>>>> env.getCheckpointConfig()); >>>>>>>>>> CollectStreamSink<T> sink = new >>>>>>>>>> CollectStreamSink<>(dataStream, factory); >>>>>>>>>> sink.name("Data stream collect sink"); >>>>>>>>>> env.addOperator(sink.getTransformation()); >>>>>>>>>> >>>>>>>>>> final JobClient jobClient = env.executeAsync("DataStream >>>>>>>>>> Collect"); >>>>>>>>>> iterator.setJobClient(jobClient); >>>>>>>>>> >>>>>>>>>> var clientAndIterator = new ClientAndIterator<>(jobClient, >>>>>>>>>> iterator); >>>>>>>>>> List<T> results = new ArrayList<>(limit); >>>>>>>>>> while (limit > 0 && clientAndIterator.iterator.hasNext()) { >>>>>>>>>> results.add(clientAndIterator.iterator.next()); >>>>>>>>>> limit--; >>>>>>>>>> } >>>>>>>>>> return results; >>>>>>>>>> } >>>>>>>>>> ``` >>>>>>>>>> >>>>>>>>>> Essentially, I'm just adding a parameter to the >>>>>>>>>> CollectSinkOperatorFactory constructor here: >>>>>>>>>> - >>>>>>>>>> https://github.com/apache/flink/blob/release-1.15.4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java#L1378 >>>>>>>>>> >>>>>>>>>> This works but it's obviously inconvenient for the user. If this >>>>>>>>>> limitation is confirmed, Den and I will be glad to send an MR to fix >>>>>>>>>> that. >>>>>>>>>> >>>>>>>>>> Does that make sense? 
>>>>>>>>>> >>>>>>>>>> Regards, >>>>>>>>>> >>>>>>>>>> Salva >>>>>>>>>> >>>>>>>>>> On Mon, Jul 22, 2024 at 10:04 AM Salva Alcántara < >>>>>>>>>> salcantara...@gmail.com> wrote: >>>>>>>>>> >>>>>>>>>>> The same happens with this slight variation: >>>>>>>>>>> >>>>>>>>>>> ``` >>>>>>>>>>> Configuration config = new Configuration(); >>>>>>>>>>> config.setString("collect-sink.batch-size.max", "100mb"); >>>>>>>>>>> StreamExecutionEnvironment env = >>>>>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment(); >>>>>>>>>>> env.configure(config); >>>>>>>>>>> SavepointReader savepoint = SavepointReader.read(env, >>>>>>>>>>> savepointPath, new HashMapStateBackend()); >>>>>>>>>>> ``` >>>>>>>>>>> >>>>>>>>>>> Salva >>>>>>>>>>> >>>>>>>>>>> On Mon, Jul 22, 2024 at 10:00 AM Salva Alcántara < >>>>>>>>>>> salcantara...@gmail.com> wrote: >>>>>>>>>>> >>>>>>>>>>>> Hi Zhanghao, >>>>>>>>>>>> >>>>>>>>>>>> Thanks for your suggestion. Unfortunately, this does not work; >>>>>>>>>>>> I still get the same error message: >>>>>>>>>>>> >>>>>>>>>>>> ``` >>>>>>>>>>>> Record size is too large for CollectSinkFunction. Record size >>>>>>>>>>>> is 9623137 bytes, but max bytes per batch is only 2097152 bytes. >>>>>>>>>>>> Please consider increasing max bytes per batch value by setting >>>>>>>>>>>> collect-sink.batch-size.max >>>>>>>>>>>> ``` >>>>>>>>>>>> >>>>>>>>>>>> The code looks like this now: >>>>>>>>>>>> >>>>>>>>>>>> ``` >>>>>>>>>>>> Configuration config = new Configuration(); >>>>>>>>>>>> config.setString("collect-sink.batch-size.max", "10mb"); >>>>>>>>>>>> StreamExecutionEnvironment env = >>>>>>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment(config); >>>>>>>>>>>> SavepointReader savepoint = SavepointReader.read(env, >>>>>>>>>>>> savepointPath, new HashMapStateBackend()); >>>>>>>>>>>> >>>>>>>>>>>> var matcher = savepoint.readKeyedState("Raw Triggers", new >>>>>>>>>>>> MatcherReader()); >>>>>>>>>>>> var matcherState = matcher.executeAndCollect(1000); >>>>>>>>>>>> ``` >>>>>>>>>>>> >>>>>>>>>>>> I have tried other ways but none has worked (the setting is >>>>>>>>>>>> always ignored in the end). >>>>>>>>>>>> >>>>>>>>>>>> Regards, >>>>>>>>>>>> >>>>>>>>>>>> Salva >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> On Sun, Jul 21, 2024 at 9:10 AM Zhanghao Chen < >>>>>>>>>>>> zhanghao.c...@outlook.com> wrote: >>>>>>>>>>>> >>>>>>>>>>>>> Hi, you could increase it as follows: >>>>>>>>>>>>> >>>>>>>>>>>>> Configuration config = new Configuration(); >>>>>>>>>>>>> config.setString("collect-sink.batch-size.max", "10mb"); >>>>>>>>>>>>> StreamExecutionEnvironment env = >>>>>>>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment(config); >>>>>>>>>>>>> ------------------------------ >>>>>>>>>>>>> *From:* Salva Alcántara <salcantara...@gmail.com> >>>>>>>>>>>>> *Sent:* Saturday, July 20, 2024 15:05 >>>>>>>>>>>>> *To:* user <user@flink.apache.org> >>>>>>>>>>>>> *Subject:* SavepointReader: Record size is too large for >>>>>>>>>>>>> CollectSinkFunction >>>>>>>>>>>>> >>>>>>>>>>>>> Hi all! >>>>>>>>>>>>> >>>>>>>>>>>>> I'm trying to debug a job by inspecting its savepoints, but >>>>>>>>>>>>> I'm getting this error message: >>>>>>>>>>>>> >>>>>>>>>>>>> ``` >>>>>>>>>>>>> Caused by: java.lang.RuntimeException: Record size is too >>>>>>>>>>>>> large for CollectSinkFunction. Record size is 9627127 bytes, but >>>>>>>>>>>>> max bytes >>>>>>>>>>>>> per batch is only 2097152 bytes. 
Please consider increasing max >>>>>>>>>>>>> bytes per >>>>>>>>>>>>> batch value by setting collect-sink.batch-size.max >>>>>>>>>>>>> ``` >>>>>>>>>>>>> >>>>>>>>>>>>> My code looks like this: >>>>>>>>>>>>> >>>>>>>>>>>>> ``` >>>>>>>>>>>>> private static void run(String savepointPath) throws >>>>>>>>>>>>> Exception { >>>>>>>>>>>>> StreamExecutionEnvironment env = >>>>>>>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment(); >>>>>>>>>>>>> org.apache.flink.state.api.SavepointReader savepoint = >>>>>>>>>>>>> org.apache.flink.state.api.SavepointReader.read(env, >>>>>>>>>>>>> savepointPath, new HashMapStateBackend()); >>>>>>>>>>>>> >>>>>>>>>>>>> var operator = savepoint.readKeyedState("uuid", new >>>>>>>>>>>>> MyKeyedOperatorReader()); >>>>>>>>>>>>> var operatorState = operator.executeAndCollect(1000); >>>>>>>>>>>>> } >>>>>>>>>>>>> ``` >>>>>>>>>>>>> >>>>>>>>>>>>> I haven't found a way to increase >>>>>>>>>>>>> `collect-sink.batch-size.max` as suggested in the error message. >>>>>>>>>>>>> >>>>>>>>>>>>> Any help on this will be appreciated! >>>>>>>>>>>>> >>>>>>>>>>>>> Regards, >>>>>>>>>>>>> >>>>>>>>>>>>> Salva >>>>>>>>>>>>
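For readers who land on this thread later: with a Flink version that includes the fix from https://github.com/apache/flink/pull/25755, setting the option on the environment's configuration should be honored by executeAndCollect, so the route Zhanghao suggested should work as written. A minimal sketch under that assumption follows; the SavepointInspector scaffold is hypothetical, "Raw Triggers" and MatcherReader mirror the examples above and stand in for your own operator uid and KeyedStateReaderFunction, and the "32mb" value is illustrative:

```
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.state.api.SavepointReader;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SavepointInspector {
    public static void main(String[] args) throws Exception {
        // Raise the collect sink's per-batch limit; before the fix above this
        // setting never reached CollectSinkOperatorFactory, so the 2097152-byte
        // default was always used.
        Configuration config = new Configuration();
        config.setString("collect-sink.batch-size.max", "32mb"); // illustrative size

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(config);

        // args[0] is the savepoint path to inspect.
        SavepointReader savepoint =
                SavepointReader.read(env, args[0], new HashMapStateBackend());

        // Substitute your own operator uid and reader function here.
        var matcherState = savepoint
                .readKeyedState("Raw Triggers", new MatcherReader())
                .executeAndCollect(1000);
        matcherState.forEach(System.out::println);
    }
}
```

On versions that predate the fix, the duplicated executeAndCollect workaround shown earlier in the thread remains the fallback.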