Well, the easiest is to cherry-pick to the top of the latest distro what
you build. When not possible then local snapshot build + use it in local
execution can be an option.

G

On Mon, Dec 9, 2024 at 6:17 PM Salva Alcántara <salcantara...@gmail.com>
wrote:

> Hey Gabor! What would be the easiest way for me to test the change
> locally? I guess now that it has been merged I could simply bump to the
> current snapshot version right? But what is the common approach for testing
> a change in a local project before it gets approved/merged?
>
> BTW I was importing the Flink project into IntelliJ 2024.3 following this
> guide:
>
> -
> https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/ide_setup/#importing-flink
>
> but when I click on "Generate Sources and Update Folders" I get the
> following errors:
>
> [INFO] Validation error:
> [ERROR] Failed to execute goal org.apache.rat:apache-rat-plugin:0.13:check
> (default) on project flink-parent: Cannot read header
> [ERROR] Maven server structure problem
> flink-rpc/flink-rpc-core/pom.xml (No such file or directory)
>
> however, the alternative command:
>
> mvn clean package -DskipTests
>
> runs fine in my terminal.
>
> Regards,
>
> Salva
>
> On Sun, Dec 8, 2024 at 12:06 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
> wrote:
>
>> It would be good to report back whether it's solved your issue, but since
>> we've added automated test + I've tested it locally it's not urgent.
>>
>> On Sun, Dec 8, 2024 at 8:38 AM Salva Alcántara <salcantara...@gmail.com>
>> wrote:
>>
>>> Gabor, I took a look at your PR and it looks good to me but didn't test
>>> your changes locally. If still necessary, let me know and I'll find the
>>> time!
>>>
>>>
>>> On Fri, Dec 6, 2024 at 1:49 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
>>> wrote:
>>>
>>>> Salva, I've tested the code by myslelf but do you have the possibility
>>>> to test this[1] fix?
>>>>
>>>> [1] https://github.com/apache/flink/pull/25755
>>>>
>>>> G
>>>>
>>>>
>>>> On Thu, Dec 5, 2024 at 3:15 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
>>>> wrote:
>>>>
>>>>> I'm intended to file a jira + PR.
>>>>>
>>>>> G
>>>>>
>>>>>
>>>>> On Thu, Dec 5, 2024 at 2:40 PM Salva Alcántara <
>>>>> salcantara...@gmail.com> wrote:
>>>>>
>>>>>> So what's next? Do you want me to do something Gabor?
>>>>>>
>>>>>> Regards,
>>>>>>
>>>>>> Salva
>>>>>>
>>>>>> On Thu, Dec 5, 2024, 13:48 Gabor Somogyi <gabor.g.somo...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Based on this I can confirm that the maxBatchSize and socketTimeout
>>>>>>> values are coming from default value Flink config is not considered.
>>>>>>>
>>>>>>> G
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Dec 5, 2024 at 8:20 AM Salva Alcántara <
>>>>>>> salcantara...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Sorry for my late reply Gabor, here you are the whole trace:
>>>>>>>>
>>>>>>>> SLF4J(W): No SLF4J providers were found.
>>>>>>>> SLF4J(W): Defaulting to no-operation (NOP) logger implementation
>>>>>>>> SLF4J(W): See https://www.slf4j.org/codes.html#noProviders for
>>>>>>>> further details.
>>>>>>>> SLF4J(W): Class path contains SLF4J bindings targeting slf4j-api
>>>>>>>> versions 1.7.x or earlier.
>>>>>>>> SLF4J(W): Ignoring binding found at
>>>>>>>> [jar:file:/private/var/tmp/_bazel_salva/d416ba92e8fb4416e3182f6204d8237a/external/maven/v1/https/
>>>>>>>> repo1.maven.org/maven2/org/apache/logging/log4j/log4j-slf4j-impl/2.17.1/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class
>>>>>>>> ]
>>>>>>>> SLF4J(W): See https://www.slf4j.org/codes.html#ignoredBindings for
>>>>>>>> an explanation.
>>>>>>>> WARNING: An illegal reflective access operation has occurred
>>>>>>>> WARNING: Illegal reflective access by
>>>>>>>> com.twitter.chill.java.ArraysAsListSerializer
>>>>>>>> (file:/private/var/tmp/_bazel_salva/d416ba92e8fb4416e3182f6204d8237a/external/maven/v1/https/
>>>>>>>> repo1.maven.org/maven2/com/twitter/chill-java/0.7.6/chill-java-0.7.6.jar)
>>>>>>>> to field java.util.Arrays$ArrayList.a
>>>>>>>> WARNING: Please consider reporting this to the maintainers of
>>>>>>>> com.twitter.chill.java.ArraysAsListSerializer
>>>>>>>> WARNING: Use --illegal-access=warn to enable warnings of further
>>>>>>>> illegal reflective access operations
>>>>>>>> WARNING: All illegal access operations will be denied in a future
>>>>>>>> release
>>>>>>>> Exception in thread "main" java.lang.RuntimeException: Failed to
>>>>>>>> fetch next result
>>>>>>>>         at
>>>>>>>> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
>>>>>>>>         at
>>>>>>>> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
>>>>>>>>         at
>>>>>>>> org.apache.flink.streaming.api.datastream.DataStream.executeAndCollect(DataStream.java:1365)
>>>>>>>>         at
>>>>>>>> org.apache.flink.streaming.api.datastream.DataStream.executeAndCollect(DataStream.java:1348)
>>>>>>>>         at
>>>>>>>> com.auvik.platform.triggering.statecli.savepoint.SavepointReader.run(SavepointReader.java:41)
>>>>>>>>         at
>>>>>>>> com.auvik.platform.triggering.statecli.savepoint.SavepointReader.main(SavepointReader.java:33)
>>>>>>>> Caused by: java.io.IOException: Failed to fetch job execution result
>>>>>>>>         at
>>>>>>>> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:185)
>>>>>>>>         at
>>>>>>>> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:121)
>>>>>>>>         at
>>>>>>>> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
>>>>>>>>         ... 5 more
>>>>>>>> Caused by: java.util.concurrent.ExecutionException:
>>>>>>>> org.apache.flink.runtime.client.JobExecutionException: Job execution 
>>>>>>>> failed.
>>>>>>>>         at
>>>>>>>> java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
>>>>>>>>         at
>>>>>>>> java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2022)
>>>>>>>>         at
>>>>>>>> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:183)
>>>>>>>>         ... 7 more
>>>>>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException:
>>>>>>>> Job execution failed.
>>>>>>>>         at
>>>>>>>> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
>>>>>>>>         at
>>>>>>>> org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
>>>>>>>>         at
>>>>>>>> java.base/java.util.concurrent.CompletableFuture.uniApplyNow(CompletableFuture.java:680)
>>>>>>>>         at
>>>>>>>> java.base/java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:658)
>>>>>>>>         at
>>>>>>>> java.base/java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:2094)
>>>>>>>>         at
>>>>>>>> org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:138)
>>>>>>>>         at
>>>>>>>> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:182)
>>>>>>>>         ... 7 more
>>>>>>>> Caused by: org.apache.flink.runtime.JobException: Recovery is
>>>>>>>> suppressed by NoRestartBackoffTimeStrategy
>>>>>>>>         at
>>>>>>>> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:176)
>>>>>>>>         at
>>>>>>>> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:107)
>>>>>>>>         at
>>>>>>>> org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:285)
>>>>>>>>         at
>>>>>>>> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:276)
>>>>>>>>         at
>>>>>>>> org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:269)
>>>>>>>>         at
>>>>>>>> org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:764)
>>>>>>>>         at
>>>>>>>> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:741)
>>>>>>>>         at
>>>>>>>> org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83)
>>>>>>>>         at
>>>>>>>> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:488)
>>>>>>>>         at
>>>>>>>> jdk.internal.reflect.GeneratedMethodAccessor14.invoke(Unknown Source)
>>>>>>>>         at
>>>>>>>> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>>>         at
>>>>>>>> java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>>>>>>>         at
>>>>>>>> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309)
>>>>>>>>         at
>>>>>>>> org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
>>>>>>>>         at
>>>>>>>> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307)
>>>>>>>>         at
>>>>>>>> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222)
>>>>>>>>         at
>>>>>>>> org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
>>>>>>>>         at
>>>>>>>> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
>>>>>>>>         at
>>>>>>>> org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
>>>>>>>>         at
>>>>>>>> org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
>>>>>>>>         at
>>>>>>>> scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
>>>>>>>>         at
>>>>>>>> scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
>>>>>>>>         at
>>>>>>>> org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
>>>>>>>>         at
>>>>>>>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
>>>>>>>>         at
>>>>>>>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
>>>>>>>>         at
>>>>>>>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
>>>>>>>>         at
>>>>>>>> org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
>>>>>>>>         at
>>>>>>>> org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
>>>>>>>>         at
>>>>>>>> org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
>>>>>>>>         at
>>>>>>>> org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
>>>>>>>>         at
>>>>>>>> org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
>>>>>>>>         at
>>>>>>>> org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
>>>>>>>>         at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
>>>>>>>>         at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
>>>>>>>>         at
>>>>>>>> java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
>>>>>>>>         at
>>>>>>>> java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
>>>>>>>>         at
>>>>>>>> java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
>>>>>>>>         at
>>>>>>>> java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
>>>>>>>>         at
>>>>>>>> java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
>>>>>>>> Caused by: java.lang.RuntimeException: Record size is too large for
>>>>>>>> CollectSinkFunction. Record size is 7462652 bytes, but max bytes per 
>>>>>>>> batch
>>>>>>>> is only 2097152 bytes. Please consider increasing max bytes per batch 
>>>>>>>> value
>>>>>>>> by setting collect-sink.batch-size.max
>>>>>>>>         at
>>>>>>>> org.apache.flink.streaming.api.operators.collect.CollectSinkFunction.invoke(CollectSinkFunction.java:285)
>>>>>>>>         at
>>>>>>>> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
>>>>>>>>         at
>>>>>>>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237)
>>>>>>>>         at
>>>>>>>> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146)
>>>>>>>>         at
>>>>>>>> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
>>>>>>>>         at
>>>>>>>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>>>>>>>>         at
>>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562)
>>>>>>>>         at
>>>>>>>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>>>>>>>>         at
>>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
>>>>>>>>         at
>>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
>>>>>>>>         at
>>>>>>>> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
>>>>>>>>         at
>>>>>>>> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
>>>>>>>>         at
>>>>>>>> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
>>>>>>>>         at
>>>>>>>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
>>>>>>>>         at java.base/java.lang.Thread.run(Thread.java:829)
>>>>>>>>
>>>>>>>> On Mon, Jul 29, 2024 at 11:06 AM Gabor Somogyi <
>>>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I've double checked and I think that CollectSinkOperatorFactory is
>>>>>>>>> initialized in DataStream.collectAsync without MAX_BATCH_SIZE
>>>>>>>>> and SOCKET_TIMEOUT values coming from the Flink config.
>>>>>>>>> Could you plz share the whole stacktrace to double check my
>>>>>>>>> assumption?
>>>>>>>>>
>>>>>>>>> G
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Jul 23, 2024 at 12:46 PM Salva Alcántara <
>>>>>>>>> salcantara...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi all,
>>>>>>>>>>
>>>>>>>>>> Just to share my findings so far. Regarding tweaking the setting,
>>>>>>>>>> it has been impossible for me to do so. So, the only way to work 
>>>>>>>>>> around
>>>>>>>>>> this has been to duplicate some Flink code directly to allow me to 
>>>>>>>>>> do the
>>>>>>>>>> tweak. More precisely, this is how my code looks like now (kudos to 
>>>>>>>>>> my dear
>>>>>>>>>> colleague Den!):
>>>>>>>>>>
>>>>>>>>>> ```
>>>>>>>>>>   private static <T> List<T> executeAndCollect(DataStream<T>
>>>>>>>>>> dataStream, StreamExecutionEnvironment env,
>>>>>>>>>>                                                String
>>>>>>>>>> maxBatchSize, int limit) throws Exception {
>>>>>>>>>>
>>>>>>>>>>     TypeSerializer<T> serializer =
>>>>>>>>>> dataStream.getType().createSerializer(env.getConfig());
>>>>>>>>>>     String accumulatorName = "dataStreamCollect_" +
>>>>>>>>>> UUID.randomUUID();
>>>>>>>>>>
>>>>>>>>>>     CollectSinkOperatorFactory<T> factory =
>>>>>>>>>>         new CollectSinkOperatorFactory<>(serializer,
>>>>>>>>>> accumulatorName, MemorySize.parse(maxBatchSize),
>>>>>>>>>> SOCKET_TIMEOUT.defaultValue());
>>>>>>>>>>     CollectSinkOperator<MatcherState> operator =
>>>>>>>>>> (CollectSinkOperator<MatcherState>) factory.getOperator();
>>>>>>>>>>     CollectResultIterator<T> iterator =
>>>>>>>>>>         new CollectResultIterator<>(
>>>>>>>>>>             operator.getOperatorIdFuture(),
>>>>>>>>>>             serializer,
>>>>>>>>>>             accumulatorName,
>>>>>>>>>>             env.getCheckpointConfig());
>>>>>>>>>>     CollectStreamSink<T> sink = new
>>>>>>>>>> CollectStreamSink<>(dataStream, factory);
>>>>>>>>>>     sink.name("Data stream collect sink");
>>>>>>>>>>     env.addOperator(sink.getTransformation());
>>>>>>>>>>
>>>>>>>>>>     final JobClient jobClient = env.executeAsync("DataStream
>>>>>>>>>> Collect");
>>>>>>>>>>     iterator.setJobClient(jobClient);
>>>>>>>>>>
>>>>>>>>>>     var clientAndIterator = new ClientAndIterator<>(jobClient,
>>>>>>>>>> iterator);
>>>>>>>>>>     List<T> results = new ArrayList<>(limit);
>>>>>>>>>>     while (limit > 0 && clientAndIterator.iterator.hasNext()) {
>>>>>>>>>>       results.add(clientAndIterator.iterator.next());
>>>>>>>>>>       limit--;
>>>>>>>>>>     }
>>>>>>>>>>     return results;
>>>>>>>>>>   }
>>>>>>>>>> ```
>>>>>>>>>>
>>>>>>>>>> Essentially, I'm just adding a parameter to the
>>>>>>>>>> CollectSinkOperatorFactory constructor here:
>>>>>>>>>> -
>>>>>>>>>> https://github.com/apache/flink/blob/release-1.15.4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java#L1378
>>>>>>>>>>
>>>>>>>>>> This works but it's obviously inconvenient for the user. If this
>>>>>>>>>> limitation is confirmed, Den and I will be glad to send a MR to fix 
>>>>>>>>>> that.
>>>>>>>>>>
>>>>>>>>>> Makes sense?
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>>
>>>>>>>>>> Salva
>>>>>>>>>>
>>>>>>>>>> On Mon, Jul 22, 2024 at 10:04 AM Salva Alcántara <
>>>>>>>>>> salcantara...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> The same happens with this slight variation:
>>>>>>>>>>>
>>>>>>>>>>> ```
>>>>>>>>>>> Configuration config = new Configuration();
>>>>>>>>>>> config.setString("collect-sink.batch-size.max", "100mb");
>>>>>>>>>>> StreamExecutionEnvironment env =
>>>>>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>>> env.configure(config);
>>>>>>>>>>> SavepointReader savepoint = SavepointReader.read(env,
>>>>>>>>>>> savepointPath, new HashMapStateBackend());
>>>>>>>>>>> ```
>>>>>>>>>>>
>>>>>>>>>>> Salva
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Jul 22, 2024 at 10:00 AM Salva Alcántara <
>>>>>>>>>>> salcantara...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Zhanghao,
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for your suggestion. Unfortunately, this does not work,
>>>>>>>>>>>> I still get the same error message:
>>>>>>>>>>>>
>>>>>>>>>>>> ```
>>>>>>>>>>>> Record size is too large for CollectSinkFunction. Record size
>>>>>>>>>>>> is 9623137 bytes, but max bytes per batch is only 2097152 bytes.
>>>>>>>>>>>> Please consider increasing max bytes per batch value by setting
>>>>>>>>>>>> collect-sink.batch-size.max
>>>>>>>>>>>> ```
>>>>>>>>>>>>
>>>>>>>>>>>> The code looks like this now:
>>>>>>>>>>>>
>>>>>>>>>>>> ```
>>>>>>>>>>>> Configuration config = new Configuration();
>>>>>>>>>>>> config.setString("collect-sink.batch-size.max", "10mb");
>>>>>>>>>>>> StreamExecutionEnvironment env =
>>>>>>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment(config);
>>>>>>>>>>>> SavepointReader savepoint = SavepointReader.read(env,
>>>>>>>>>>>> savepointPath, new HashMapStateBackend());
>>>>>>>>>>>>
>>>>>>>>>>>> var matcher = savepoint.readKeyedState("Raw Triggers", new
>>>>>>>>>>>> MatcherReader());
>>>>>>>>>>>> var matcherState = matcher.executeAndCollect(1000);
>>>>>>>>>>>> ```
>>>>>>>>>>>>
>>>>>>>>>>>> I have tried other ways but none has worked (the setting is
>>>>>>>>>>>> always ignored in the end).
>>>>>>>>>>>>
>>>>>>>>>>>> Regards,
>>>>>>>>>>>>
>>>>>>>>>>>> Salva
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Sun, Jul 21, 2024 at 9:10 AM Zhanghao Chen <
>>>>>>>>>>>> zhanghao.c...@outlook.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi, you could increase it as follows:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Configuration config = new Configuration();
>>>>>>>>>>>>> config.setString(collect-sink.batch-size.max, "10mb");
>>>>>>>>>>>>> StreamExecutionEnvironment env =
>>>>>>>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment(config);
>>>>>>>>>>>>> ------------------------------
>>>>>>>>>>>>> *From:* Salva Alcántara <salcantara...@gmail.com>
>>>>>>>>>>>>> *Sent:* Saturday, July 20, 2024 15:05
>>>>>>>>>>>>> *To:* user <user@flink.apache.org>
>>>>>>>>>>>>> *Subject:* SavepointReader: Record size is too large for
>>>>>>>>>>>>> CollectSinkFunction
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hi all!
>>>>>>>>>>>>>
>>>>>>>>>>>>> I'm trying to debug a job via inspecting its savepoints but
>>>>>>>>>>>>> I'm getting this error message:
>>>>>>>>>>>>>
>>>>>>>>>>>>> ```
>>>>>>>>>>>>> Caused by: java.lang.RuntimeException: Record size is too
>>>>>>>>>>>>> large for CollectSinkFunction. Record size is 9627127 bytes, but 
>>>>>>>>>>>>> max bytes
>>>>>>>>>>>>> per batch is only 2097152 bytes. Please consider increasing max 
>>>>>>>>>>>>> bytes per
>>>>>>>>>>>>> batch value by setting collect-sink.batch-size.max
>>>>>>>>>>>>> ```
>>>>>>>>>>>>>
>>>>>>>>>>>>> My code looks like this:
>>>>>>>>>>>>>
>>>>>>>>>>>>> ```
>>>>>>>>>>>>>   private static void run(String savepointPath) throws
>>>>>>>>>>>>> Exception {
>>>>>>>>>>>>>     StreamExecutionEnvironment env =
>>>>>>>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>>>>>     org.apache.flink.state.api.SavepointReader savepoint =
>>>>>>>>>>>>>         org.apache.flink.state.api.SavepointReader.read(env,
>>>>>>>>>>>>> savepointPath, new HashMapStateBackend());
>>>>>>>>>>>>>
>>>>>>>>>>>>>     var operator = savepoint.readKeyedState("uuid", new
>>>>>>>>>>>>> MyKeyedOperatorReader());
>>>>>>>>>>>>>     var operatorState = matcher.executeAndCollect(1000);
>>>>>>>>>>>>>   }
>>>>>>>>>>>>> ```
>>>>>>>>>>>>>
>>>>>>>>>>>>> I haven't found the way to increase the
>>>>>>>>>>>>> `collect-sink.batch-size.max` as suggested in the error msg.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Any help on this will be appreciated!
>>>>>>>>>>>>>
>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Salva
>>>>>>>>>>>>>
>>>>>>>>>>>>

Reply via email to