After changing the log level from INFO to TRACE, here is kafka server.log:

[2016-03-14 06:43:03,568] TRACE 156 bytes written. (kafka.network.BoundedByteBufferSend)
[2016-03-14 06:43:03,575] TRACE 68 bytes read. (kafka.network.BoundedByteBufferReceive)
[2016-03-14 06:43:03,575] TRACE [ReplicaFetcherThread-0-7], Follower 8 has replica log end offset 0 for partition [__consumer_offsets,20]. Received 0 messages and leader hw 0 (kafka.server.ReplicaFetcherThread)
[2016-03-14 06:43:03,575] TRACE [ReplicaFetcherThread-0-7], Follower 8 has replica log end offset 0 after appending 0 bytes of messages for partition [__consumer_offsets,20] (kafka.server.ReplicaFetcherThread)
[2016-03-14 06:43:03,575] TRACE Setting high watermark for replica 8 partition [__consumer_offsets,20] on broker 8 to [0 [-1 : -1]] (kafka.cluster.Replica)
[2016-03-14 06:43:03,575] TRACE [ReplicaFetcherThread-0-7], Follower 8 set replica high watermark for partition [__consumer_offsets,20] to 0 (kafka.server.ReplicaFetcherThread)
[2016-03-14 06:43:03,575] TRACE [ReplicaFetcherThread-0-7], Follower 8 has replica log end offset 0 for partition [__consumer_offsets,12]. Received 0 messages and leader hw 0 (kafka.server.ReplicaFetcherThread)
[2016-03-14 06:43:03,575] TRACE [ReplicaFetcherThread-0-7], Follower 8 has replica log end offset 0 after appending 0 bytes of messages for partition [__consumer_offsets,12] (kafka.server.ReplicaFetcherThread)
[2016-03-14 06:43:03,575] TRACE Setting high watermark for replica 8 partition [__consumer_offsets,12] on broker 8 to [0 [-1 : -1]] (kafka.cluster.Replica)
[2016-03-14 06:43:03,575] TRACE [ReplicaFetcherThread-0-7], Follower 8 set replica high watermark for partition [__consumer_offsets,12] to 0 (kafka.server.ReplicaFetcherThread)
[2016-03-14 06:43:03,575] TRACE [ReplicaFetcherThread-0-7], Issuing to broker 7 of fetch request Name: FetchRequest; Version: 0; CorrelationId: 397497; ClientId: ReplicaFetcherThread-0-7; ReplicaId: 8; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [__consumer_offsets,20] -> PartitionFetchInfo(0,1048576),[__consumer_offsets,12] -> PartitionFetchInfo(0,1048576) (kafka.server.ReplicaFetcherThread)
[2016-03-14 06:43:03,575] TRACE 110 bytes written. (kafka.network.BoundedByteBufferSend)
[2016-03-14 06:43:03,579] DEBUG Accepted connection from /10.225.36.226 on /10.224.146.63:9092. sendBufferSize [actual|requested]: [102400|102400] recvBufferSize [actual|requested]: [102400|102400] (kafka.network.Acceptor)
[2016-03-14 06:43:03,579] TRACE Processor id 2 selection time = 145604848 ns (kafka.network.Processor)
[2016-03-14 06:43:03,579] DEBUG Processor 2 listening to new connection from /10.225.36.226:36151 (kafka.network.Processor)
[2016-03-14 06:43:03,579] TRACE Processor id 2 selection time = 3588 ns (kafka.network.Processor)
[2016-03-14 06:43:03,579] INFO Closing socket connection to /10.225.36.226 due to invalid request: Request of length 808124929 is not valid, it is larger than the maximum size of 104857600 bytes. (kafka.network.Processor)
[2016-03-14 06:43:03,580] DEBUG Closing connection from /10.225.36.226:36151 (kafka.network.Processor)
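An aside on that last INFO line, assuming I am reading the 0.8.2 network code correctly: the broker takes the first four bytes of an incoming request as a size field and rejects anything above the 104857600-byte socket.request.max.bytes limit, so the rejected length is literally the first four bytes that connection sent. Here is a throwaway sketch (class name and output format are just illustrative) that turns the reported length back into those bytes, in case they hint at what actually hit port 9092:

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    public class DecodeReportedLength {
        public static void main(String[] args) {
            // The "Request of length ..." value from the INFO line above.
            int reported = 808124929;
            // Kafka frames every request with a 4-byte big-endian size, so these
            // are the first four bytes the client wrote on that connection.
            byte[] firstFour = ByteBuffer.allocate(4).putInt(reported).array();
            StringBuilder hex = new StringBuilder();
            for (byte b : firstFour) {
                hex.append(String.format("%02x ", b & 0xff));
            }
            System.out.println("hex: " + hex + "| ascii: "
                    + new String(firstFour, StandardCharsets.US_ASCII));
        }
    }

If those bytes spell out something readable (the start of an HTTP verb, for example), that would point at a non-Kafka client connecting to the broker port rather than at our producer.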
Here is kafka-request.log:

[2016-03-14 06:43:03,463] TRACE Completed request:Name: FetchRequest; Version: 0; CorrelationId: 399486; ClientId: ReplicaFetcherThread-0-8; ReplicaId: 6; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [__consumer_offsets,5] -> PartitionFetchInfo(0,1048576) from client /10.224.146.61:10716;totalTime:501,requestQueueTime:0,localTime:1,remoteTime:500,responseQueueTime:0,sendTime:0 (kafka.request.logger)
[2016-03-14 06:43:03,463] TRACE Processor 1 received request : Name: FetchRequest; Version: 0; CorrelationId: 399487; ClientId: ReplicaFetcherThread-0-8; ReplicaId: 6; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [__consumer_offsets,5] -> PartitionFetchInfo(0,1048576) (kafka.network.RequestChannel$)
[2016-03-14 06:43:03,744] TRACE Completed request:Name: FetchRequest; Version: 0; CorrelationId: 397091; ClientId: ReplicaFetcherThread-0-8; ReplicaId: 4; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [__consumer_offsets,37] -> PartitionFetchInfo(0,1048576),[__consumer_offsets,45] -> PartitionFetchInfo(0,1048576) from client /10.224.146.59:12535;totalTime:502,requestQueueTime:0,localTime:1,remoteTime:500,responseQueueTime:0,sendTime:1 (kafka.request.logger)
[2016-03-14 06:43:03,744] TRACE Processor 1 received request : Name: FetchRequest; Version: 0; CorrelationId: 397092; ClientId: ReplicaFetcherThread-0-8; ReplicaId: 4; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [__consumer_offsets,37] -> PartitionFetchInfo(0,1048576),[__consumer_offsets,45] -> PartitionFetchInfo(0,1048576) (kafka.network.RequestChannel$)

From the log, do you think it is still due to the batch size, and if so, which version is the fix in?
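For reference, here is a minimal sketch of how the payload size could be logged right before each send, along the lines of Guozhang's earlier suggestion to validate that the cumulated size stays under the limit. It mirrors the producer code quoted below (UTF-8 is assumed to be what our EncodingUtil constant resolves to), except that the writer is flushed before the byte array is taken so the measured size is complete; the class and method names are illustrative only:

    import com.google.gson.Gson;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.io.OutputStreamWriter;
    import java.nio.charset.StandardCharsets;

    public class PayloadSizeCheck {

        // Build the payload the same way as the quoted producer code, but flush the
        // writer so bos.toByteArray() holds the complete JSON before it is measured.
        static byte[] buildPayload(Gson gson, Object obj) throws IOException {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream dos = new DataOutputStream(bos);
            dos.writeLong(System.currentTimeMillis());
            OutputStreamWriter byteWriter = new OutputStreamWriter(bos, StandardCharsets.UTF_8);
            gson.toJson(obj, byteWriter);
            byteWriter.flush();
            return bos.toByteArray();
        }

        // Log the serialized size before handing the record to the producer.
        static void sendWithSizeLog(KafkaProducer<String, byte[]> producer,
                                    Gson gson, Object obj) throws IOException {
            byte[] payload = buildPayload(gson, obj);
            System.out.println("payload size = " + payload.length + " bytes");
            producer.send(new ProducerRecord<String, byte[]>("Topic", 0, null, payload));
        }
    }

As far as I understand, the new producer already rejects any single record above max.request.size (1048576 bytes by default) on the client side, so these numbers should stay far below the broker's 100 MB limit.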
Thanks,
Fang

On Tue, Mar 8, 2016 at 11:56 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> I cannot think of an encoding or partial message issue at top of my head
> (browsed through 0.8.2.2 tickets, none of them seems related either).
>
> Guozhang
>
> On Tue, Mar 8, 2016 at 11:45 AM, Fang Wong <fw...@salesforce.com> wrote:
>
> > Thanks Guozhang!
> >
> > No I don't have a way to reproduce this issue. It randomly happens, I am
> > changing the log level from INFO to trace to see if I can get the exact
> > message what was sent when this happens.
> >
> > Could it also be some encoding issue or partial message related?
> >
> > Thanks,
> > Fang
> >
> > On Mon, Mar 7, 2016 at 5:03 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > John,
> > >
> > > There is not a specific JIRA for this change as it is only implemented
> > > in the new Java producer:
> > >
> > > https://issues.apache.org/jira/browse/KAFKA-1239
> > >
> > > Related classes are RecordAccumulator and MemoryRecords:
> > >
> > > https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
> > >
> > > Fang,
> > >
> > > Do you have a way to re-produce the issue? I.e. if you have the exact
> > > same produce data at hand, could you validate that their cumulated size
> > > is less than the limit and then try sending them to Kafka and see if it
> > > always triggers the problem?
> > >
> > > Guozhang
> > >
> > > On Mon, Mar 7, 2016 at 10:23 AM, Fang Wong <fw...@salesforce.com> wrote:
> > >
> > > > No, we don't have compression turned on; the batch size is the
> > > > default: 16384.
> > > > But the message size is very small, even with that batch size, it is
> > > > impossible to exceed the size limit.
> > > >
> > > > Thanks,
> > > > Fang
> > > >
> > > > On Sun, Mar 6, 2016 at 6:09 PM, John Dennison <dennison.j...@gmail.com> wrote:
> > > >
> > > > > Guozhang,
> > > > >
> > > > > Do you know the ticket for changing the "batching criterion from
> > > > > #.messages to bytes." I am unable to find it. Working on porting
> > > > > a similar change to pykafka.
> > > > >
> > > > > John
> > > > >
> > > > > On Sat, Mar 5, 2016 at 4:29 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > >
> > > > > > Hello,
> > > > > >
> > > > > > Did you have compression turned on and batching (in terms of
> > > > > > #.messages)? In that case the whole compressed message set is
> > > > > > treated as a single message on the broker and hence could possibly
> > > > > > exceed the limit.
> > > > > >
> > > > > > In newer versions we have changed the batching criterion from
> > > > > > #.messages to bytes, which is aimed at resolving such issues.
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > > On Thu, Mar 3, 2016 at 1:04 PM, Fang Wong <fw...@salesforce.com> wrote:
> > > > > >
> > > > > > > Got the following error message with Kafka 0.8.2.1:
> > > > > > > [2016-02-26 20:33:43,025] INFO Closing socket connection to /x due to
> > > > > > > invalid request: Request of length 1937006964 is not valid, it is larger
> > > > > > > than the maximum size of 104857600 bytes. (kafka.network.Processor)
> > > > > > >
> > > > > > > Didn't send a large message at all, it seems like encoding issue or
> > > > > > > partial request, any suggestion how to fix it?
> > > > > > >
> > > > > > > The code is like below:
> > > > > > >
> > > > > > > ByteArrayOutputStream bos = new ByteArrayOutputStream();
> > > > > > > DataOutputStream dos = new DataOutputStream(bos);
> > > > > > > dos.writeLong(System.currentTimeMillis());
> > > > > > > OutputStreamWriter byteWriter = new OutputStreamWriter(bos,
> > > > > > >     com.force.commons.text.EncodingUtil.UTF_ENCODING);
> > > > > > > gson.toJson(obj, byteWriter);
> > > > > > > byte[] payload = bos.toByteArray();
> > > > > > > ProducerRecord<String, byte[]> data =
> > > > > > >     new ProducerRecord<String, byte[]>("Topic", 0, null, payload);
> > > > > > > kafkaProducer.send(data);
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > >
> > > --
> > > -- Guozhang
>
> --
> -- Guozhang