Hi Robert,

Thanks for your answer.

Indeed, you were right. The properties attribute has to be specified, and the 
Kafka keys inside it use the non-nested variant. In fact, this is documented 
for the Egress but not for the Ingress, although the same behaviour applies.
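
For the archive, here is a minimal sketch of the shape I mean, written for an 
egress since max.request.size is a producer setting. The egress id is a 
placeholder, the address is reused from my original module.yaml, and the list 
form of properties is my reading of the Egress documentation, so treat it as 
an assumption rather than a verified configuration:

```yaml
egresses:
  - egress:
      meta:
        type: io.statefun.kafka/egress
        id: project.A/output   # placeholder id, not from my real module
      spec:
        address: kafka:9092
        properties:
          # flat Kafka client key, not the nested max: / request: / size: form
          - max.request.size: 110000000
```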

Have a great day,
Jérémy
________________________________
From: Robert Metzger <metrob...@gmail.com>
Sent: Wednesday, December 8, 2021 8:04 AM
To: Jérémy Albrecht <jalbre...@skapane.ai>
Cc: user@flink.apache.org <user@flink.apache.org>
Subject: Re: Customize Kafka client (module.yaml)

Hi Jérémy,

In my understanding of the StateFun docs, you need to pass custom properties 
using "ingress.spec.properties".
For example:

ingresses:
  - ingress:
      meta:
        type: io.statefun.kafka/ingress
        id: project.A/input
      spec:
        properties:
          max.request.size: 110000000

(or the nested variant?)



On Tue, Dec 7, 2021 at 4:31 PM Jérémy Albrecht <jalbre...@skapane.ai> wrote:
Hi All,

I have run into a blocking problem when exchanging messages between Stateful 
Functions.
The context: I am sending a very large payload from a Stateful Function to a 
Kafka topic. I think the Kafka client is rejecting it, because this is the 
output of the statefun-manager container:
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message 
is 6660172 bytes when serialized which is larger than the maximum request size 
you have configured with the max.request.size configuration.

The documentation 
(https://nightlies.apache.org/flink/flink-statefun-docs-master/docs/modules/io/apache-kafka/)
 refers to the Confluent docs for customizing the configuration of the Kafka 
client, but it is unclear how to express this in the module.yaml file. I 
tried several ways:

ingresses:
  - ingress:
      meta:
        type: io.statefun.kafka/ingress
        id: project.A/input
      spec:
        max:
          request:
            size: 104857600
        max.request.size: 110000000
        message:
          max:
            bytes: 104857600
        address: kafka:9092
        consumerGroupId: my-consumer-group
        startupPosition:
          type: earliest
        topics:
          - topic: entry # used for retro-compatibility, to be removed in next release
            valueType: project.A/Message
            targets:
              - project.redacted/Entry

None of the above attempts seems to work.
Can anyone clarify what I am doing wrong?

Thanks in advance,
Jérémy
