[ https://issues.apache.org/jira/browse/KAFKA-6667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16402435#comment-16402435 ]
Matthias J. Sax commented on KAFKA-6667:
----------------------------------------

As the error message indicates, you are trying to write a message to Kafka that is larger than the configured maximum request size:
{quote}The message is 59787680 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration
{quote}
Thus, you need to change the producer config and increase max.request.size. Look into the docs for details: [https://kafka.apache.org/0110/documentation/streams/developer-guide#streams_client_config]

As you mention that it works without encryption, I assume that encryption increases the message size beyond the limit.

I am closing this because it's not a bug but a misconfiguration. Feel free to follow up if you have further questions.

> Issues in kafka Streaming when using encryption with custom serializers and deserializers
> -----------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6667
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6667
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.11.0.1
>            Reporter: Amandeep Singh
>            Priority: Major
>
> I am using Kafka Streams to do some aggregations. The data comes in at a very rapid rate in my source topics.
> I am using custom serialization and deserialization classes that encrypt/decrypt the bytes that are written to Kafka. It works nicely without encryption in serialization and deserialization.
> The encryption used is AES-256.
> This is the stack trace with encryption. I am also not able to figure out at which step this breaks.
> Exception in thread "prod-streamer-recording-partitioner-test-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=2_0, processor=KSTREAM-SOURCE-0000000009, topic=recording-aggregator-partitioner-temp-analyze_recording-aggregation-repartition, partition=0, offset=16971
>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:202)
>     at org.apache.kafka.streams.processor.internals.AssignedTasks$2.apply(AssignedTasks.java:342)
>     at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
>     at org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:334)
>     at org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:624)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:513)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
> Caused by: org.apache.kafka.streams.errors.StreamsException: task [2_0] exception caught when producing
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:136)
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:87)
>     at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:59)
>     at org.apache.kafka.streams.state.internals.ChangeLoggingSegmentedBytesStore.put(ChangeLoggingSegmentedBytesStore.java:59)
>     at org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore.put(MeteredSegmentedBytesStore.java:112)
>     at org.apache.kafka.streams.state.internals.RocksDBWindowStore$RocksDBWindowBytesStore.put(RocksDBWindowStore.java:43)
>     at org.apache.kafka.streams.state.internals.RocksDBWindowStore$RocksDBWindowBytesStore.put(RocksDBWindowStore.java:34)
>     at org.apache.kafka.streams.state.internals.CachingWindowStore$1.apply(CachingWindowStore.java:95)
>     at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141)
>     at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:232)
>     at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:247)
>     at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:155)
>     at org.apache.kafka.streams.state.internals.CachingWindowStore.put(CachingWindowStore.java:148)
>     at org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate$KStreamWindowAggregateProcessor.process(KStreamWindowAggregate.java:111)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:45)
>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:131)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:188)
>     ... 7 more
> Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 59787680 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
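
The configuration change suggested in the comment could look roughly like the sketch below. It assumes the Streams application is configured through a plain java.util.Properties object and uses string keys so that it compiles without the Kafka jars. Kafka Streams passes settings carrying the "producer." prefix to its internal producer, and StreamsConfig.producerPrefix("max.request.size") builds the same key. The class name, application id, bootstrap address, and the 64 MiB limit are hypothetical values chosen for illustration; the only requirement taken from the report is that the limit exceed the 59787680 bytes in the exception.

```java
import java.util.Properties;

// Hypothetical helper; not part of Kafka or of the reporter's application.
class MaxRequestSizeSketch {

    // Keys prefixed with "producer." are handed to the producer that
    // Kafka Streams creates internally.
    static final String PRODUCER_MAX_REQUEST_SIZE = "producer.max.request.size";

    // Builds a Streams configuration with a raised producer request limit.
    static Properties streamsProps(int maxRequestSizeBytes) {
        Properties props = new Properties();
        props.setProperty("application.id", "recording-aggregator-example"); // assumed name
        props.setProperty("bootstrap.servers", "localhost:9092");            // assumed address
        props.setProperty(PRODUCER_MAX_REQUEST_SIZE, Integer.toString(maxRequestSizeBytes));
        return props;
    }

    public static void main(String[] args) {
        // 64 MiB, chosen only because it is above the 59787680 bytes reported.
        Properties props = streamsProps(64 * 1024 * 1024);
        System.out.println(PRODUCER_MAX_REQUEST_SIZE + "="
                + props.getProperty(PRODUCER_MAX_REQUEST_SIZE));
    }
}
```

The resulting Properties would be passed to the KafkaStreams constructor along with the application's existing settings. Raising the producer limit alone may not be enough: the broker enforces its own limit through message.max.bytes (or max.message.bytes at the topic level), so that value may need to be raised as well.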