Hi all,

Just to share my findings so far: I have not been able to change the
setting through configuration. The only workaround I have found is to
duplicate some Flink code so that I can pass the value in directly.
This is what my code looks like now (kudos to my dear colleague Den!):

```
  private static <T> List<T> executeAndCollect(
      DataStream<T> dataStream,
      StreamExecutionEnvironment env,
      String maxBatchSize,
      int limit) throws Exception {

    TypeSerializer<T> serializer =
        dataStream.getType().createSerializer(env.getConfig());
    String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();

    // Pass the max batch size explicitly instead of relying on the default.
    CollectSinkOperatorFactory<T> factory =
        new CollectSinkOperatorFactory<>(
            serializer,
            accumulatorName,
            MemorySize.parse(maxBatchSize),
            SOCKET_TIMEOUT.defaultValue());
    CollectSinkOperator<T> operator =
        (CollectSinkOperator<T>) factory.getOperator();
    CollectResultIterator<T> iterator =
        new CollectResultIterator<>(
            operator.getOperatorIdFuture(),
            serializer,
            accumulatorName,
            env.getCheckpointConfig());
    CollectStreamSink<T> sink = new CollectStreamSink<>(dataStream, factory);
    sink.name("Data stream collect sink");
    env.addOperator(sink.getTransformation());

    final JobClient jobClient = env.executeAsync("DataStream Collect");
    iterator.setJobClient(jobClient);

    var clientAndIterator = new ClientAndIterator<>(jobClient, iterator);
    List<T> results = new ArrayList<>(limit);
    // Collect at most `limit` records from the job's output.
    while (limit > 0 && clientAndIterator.iterator.hasNext()) {
      results.add(clientAndIterator.iterator.next());
      limit--;
    }
    return results;
  }
```

Essentially, the only change is that I pass the max batch size explicitly
to the CollectSinkOperatorFactory constructor, which Flink calls here:
https://github.com/apache/flink/blob/release-1.15.4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java#L1378
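
To pick a value for maxBatchSize, the sizes from the error message can be
checked with a few lines of plain Java. This is only a rough sketch with no
Flink dependency: parseMb and fits are hypothetical helpers written for this
email, parseMb only understands "<N>mb" strings (unlike Flink's
MemorySize.parse), and the comparison assumes a record is accepted when it
is no larger than the batch limit.

```java
class BatchSizeCheck {

    // Hypothetical helper: converts strings such as "10mb" to bytes,
    // assuming 1 mb = 1024 * 1024 bytes.
    static long parseMb(String size) {
        String s = size.trim().toLowerCase();
        if (!s.endsWith("mb")) {
            throw new IllegalArgumentException("Expected a value like '10mb': " + size);
        }
        long megabytes = Long.parseLong(s.substring(0, s.length() - 2).trim());
        return megabytes * 1024L * 1024L;
    }

    // Assumption: a record fits when it is no larger than the batch limit.
    static boolean fits(long recordBytes, String maxBatchSize) {
        return recordBytes <= parseMb(maxBatchSize);
    }

    public static void main(String[] args) {
        // Record size reported in the error message quoted in this thread.
        long recordBytes = 9_623_137L;

        System.out.println(parseMb("2mb"));            // 2097152, the limit in the error
        System.out.println(fits(recordBytes, "2mb"));  // false
        System.out.println(fits(recordBytes, "10mb")); // true
    }
}
```

With "10mb" the margin over this particular record is under 1 MB, so a
larger value (such as the "100mb" I tried earlier) leaves more headroom if
other records in the savepoint are bigger.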

This works, but it's obviously inconvenient for the user. If this limitation
is confirmed, Den and I will be glad to send a PR to fix it.

Does that make sense?

Regards,

Salva

On Mon, Jul 22, 2024 at 10:04 AM Salva Alcántara <salcantara...@gmail.com>
wrote:

> The same happens with this slight variation:
>
> ```
> Configuration config = new Configuration();
> config.setString("collect-sink.batch-size.max", "100mb");
> StreamExecutionEnvironment env =
>     StreamExecutionEnvironment.getExecutionEnvironment();
> env.configure(config);
> SavepointReader savepoint =
>     SavepointReader.read(env, savepointPath, new HashMapStateBackend());
> ```
>
> Salva
>
> On Mon, Jul 22, 2024 at 10:00 AM Salva Alcántara <salcantara...@gmail.com>
> wrote:
>
>> Hi Zhanghao,
>>
>> Thanks for your suggestion. Unfortunately, this does not work; I still
>> get the same error message:
>>
>> ```
>> Record size is too large for CollectSinkFunction. Record size is 9623137
>> bytes, but max bytes per batch is only 2097152 bytes.
>> Please consider increasing max bytes per batch value by setting
>> collect-sink.batch-size.max
>> ```
>>
>> The code looks like this now:
>>
>> ```
>> Configuration config = new Configuration();
>> config.setString("collect-sink.batch-size.max", "10mb");
>> StreamExecutionEnvironment env =
>>     StreamExecutionEnvironment.getExecutionEnvironment(config);
>> SavepointReader savepoint =
>>     SavepointReader.read(env, savepointPath, new HashMapStateBackend());
>>
>> var matcher = savepoint.readKeyedState("Raw Triggers", new MatcherReader());
>> var matcherState = matcher.executeAndCollect(1000);
>> ```
>>
>> I have tried other ways but none has worked (the setting is always
>> ignored in the end).
>>
>> Regards,
>>
>> Salva
>>
>>
>>
>> On Sun, Jul 21, 2024 at 9:10 AM Zhanghao Chen <zhanghao.c...@outlook.com>
>> wrote:
>>
>>> Hi, you could increase it as follows:
>>>
>>> Configuration config = new Configuration();
>>> config.setString("collect-sink.batch-size.max", "10mb");
>>> StreamExecutionEnvironment env =
>>> StreamExecutionEnvironment.getExecutionEnvironment(config);
>>> ------------------------------
>>> *From:* Salva Alcántara <salcantara...@gmail.com>
>>> *Sent:* Saturday, July 20, 2024 15:05
>>> *To:* user <user@flink.apache.org>
>>> *Subject:* SavepointReader: Record size is too large for
>>> CollectSinkFunction
>>>
>>> Hi all!
>>>
>>> I'm trying to debug a job via inspecting its savepoints but I'm getting
>>> this error message:
>>>
>>> ```
>>> Caused by: java.lang.RuntimeException: Record size is too large for
>>> CollectSinkFunction. Record size is 9627127 bytes, but max bytes per batch
>>> is only 2097152 bytes. Please consider increasing max bytes per batch value
>>> by setting collect-sink.batch-size.max
>>> ```
>>>
>>> My code looks like this:
>>>
>>> ```
>>>   private static void run(String savepointPath) throws Exception {
>>>     StreamExecutionEnvironment env =
>>>         StreamExecutionEnvironment.getExecutionEnvironment();
>>>     org.apache.flink.state.api.SavepointReader savepoint =
>>>         org.apache.flink.state.api.SavepointReader.read(
>>>             env, savepointPath, new HashMapStateBackend());
>>>
>>>     var operator =
>>>         savepoint.readKeyedState("uuid", new MyKeyedOperatorReader());
>>>     var operatorState = operator.executeAndCollect(1000);
>>>   }
>>> ```
>>>
>>> I haven't found a way to increase `collect-sink.batch-size.max` as
>>> suggested in the error message.
>>>
>>> Any help on this will be appreciated!
>>>
>>> Regards,
>>>
>>> Salva
>>>
>>
