Fang,

From the logs you showed above, there is a single produce request with a very
large request size:

"[2016-03-14 06:43:03,579] INFO Closing socket connection to
/10.225.36.226 due to invalid request: Request of length *808124929* is
not valid, it is larger than the maximum size of 104857600 bytes.
(kafka.network.Processor)"

That is about 770 MB, while the maximum request size is configured as 100 MB.
It came from the client hosted at 10.225.36.226. If you can go to that server
and check the producer logs around that time, you may be able to discover why
a single produce request that large was sent.
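
As a quick sanity check, the Kafka wire protocol frames every request with a
4-byte, big-endian size prefix, so you can inspect which raw bytes would
produce a given length. A minimal sketch in plain Java (the two lengths are
the values from your reports):

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    public class SizePrefixDecoder {
        public static void main(String[] args) {
            // The two invalid lengths the broker reported.
            int[] lengths = {808124929, 1937006964};
            for (int length : lengths) {
                // Re-create the 4 bytes the broker read as a size prefix.
                byte[] raw = ByteBuffer.allocate(4).putInt(length).array();
                System.out.printf("%d = 0x%08X = \"%s\"%n",
                        length, length, new String(raw, StandardCharsets.US_ASCII));
            }
        }
    }

Interestingly, the length from your earlier report (1937006964, i.e.
0x73746174) decodes to the ASCII bytes "stat", so one possibility worth ruling
out is that something other than a Kafka client (for example a monitoring
probe) occasionally connects to the broker port, and its first bytes are read
as a size prefix.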

Guozhang


On Mon, Mar 14, 2016 at 1:59 PM, Fang Wong <fw...@salesforce.com> wrote:

> After changing the log level from INFO to TRACE, here is the Kafka server.log:
>
> [2016-03-14 06:43:03,568] TRACE 156 bytes written.
> (kafka.network.BoundedByteBufferSend)
>
> [2016-03-14 06:43:03,575] TRACE 68 bytes read.
> (kafka.network.BoundedByteBufferReceive)
>
> [2016-03-14 06:43:03,575] TRACE [ReplicaFetcherThread-0-7], Follower 8
> has replica log end offset 0 for partition [__consumer_offsets,20].
> Received 0 messages and leader hw 0
> (kafka.server.ReplicaFetcherThread)
>
> [2016-03-14 06:43:03,575] TRACE [ReplicaFetcherThread-0-7], Follower 8
> has replica log end offset 0 after appending 0 bytes of messages for
> partition [__consumer_offsets,20] (kafka.server.ReplicaFetcherThread)
>
> [2016-03-14 06:43:03,575] TRACE Setting high watermark for replica 8
> partition [__consumer_offsets,20] on broker 8 to [0 [-1 : -1]]
> (kafka.cluster.Replica)
>
> [2016-03-14 06:43:03,575] TRACE [ReplicaFetcherThread-0-7], Follower 8
> set replica high watermark for partition [__consumer_offsets,20] to 0
> (kafka.server.ReplicaFetcherThread)
>
> [2016-03-14 06:43:03,575] TRACE [ReplicaFetcherThread-0-7], Follower 8
> has replica log end offset 0 for partition [__consumer_offsets,12].
> Received 0 messages and leader hw 0
> (kafka.server.ReplicaFetcherThread)
>
> [2016-03-14 06:43:03,575] TRACE [ReplicaFetcherThread-0-7], Follower 8
> has replica log end offset 0 after appending 0 bytes of messages for
> partition [__consumer_offsets,12] (kafka.server.ReplicaFetcherThread)
>
> [2016-03-14 06:43:03,575] TRACE Setting high watermark for replica 8
> partition [__consumer_offsets,12] on broker 8 to [0 [-1 : -1]]
> (kafka.cluster.Replica)
>
> [2016-03-14 06:43:03,575] TRACE [ReplicaFetcherThread-0-7], Follower 8
> set replica high watermark for partition [__consumer_offsets,12] to 0
> (kafka.server.ReplicaFetcherThread)
>
> [2016-03-14 06:43:03,575] TRACE [ReplicaFetcherThread-0-7], Issuing to
> broker 7 of fetch request Name: FetchRequest; Version: 0;
> CorrelationId: 397497; ClientId: ReplicaFetcherThread-0-7; ReplicaId:
> 8; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo:
> [__consumer_offsets,20] ->
> PartitionFetchInfo(0,1048576),[__consumer_offsets,12] ->
> PartitionFetchInfo(0,1048576) (kafka.server.ReplicaFetcherThread)
>
> [2016-03-14 06:43:03,575] TRACE 110 bytes written.
> (kafka.network.BoundedByteBufferSend)
>
> [2016-03-14 06:43:03,579] DEBUG Accepted connection from
> /10.225.36.226 on /10.224.146.63:9092. sendBufferSize
> [actual|requested]: [102400|102400] recvBufferSize [actual|requested]:
> [102400|102400] (kafka.network.Acceptor)
>
> [2016-03-14 06:43:03,579] TRACE Processor id 2 selection time =
> 145604848 ns (kafka.network.Processor)
>
> [2016-03-14 06:43:03,579] DEBUG Processor 2 listening to new
> connection from /10.225.36.226:36151 (kafka.network.Processor)
>
> [2016-03-14 06:43:03,579] TRACE Processor id 2 selection time = 3588
> ns (kafka.network.Processor)
>
> [2016-03-14 06:43:03,579] INFO Closing socket connection to
> /10.225.36.226 due to invalid request: Request of length 808124929 is
> not valid, it is larger than the maximum size of 104857600 bytes.
> (kafka.network.Processor)
>
> [2016-03-14 06:43:03,580] DEBUG Closing connection from
> /10.225.36.226:36151 (kafka.network.Processor)
>
>
>
> Here is kafka-request.log:
>
>
> [2016-03-14 06:43:03,463] TRACE Completed request:Name: FetchRequest;
> Version: 0; CorrelationId: 399486; ClientId: ReplicaFetcherThread-0-8;
> ReplicaId: 6; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo:
> [__consumer_offsets,5] -> PartitionFetchInfo(0,1048576) from client
> /10.224.146.61:10716
> ;totalTime:501,requestQueueTime:0,localTime:1,remoteTime:500,responseQueueTime:0,sendTime:0
> (kafka.request.logger)
>
> [2016-03-14 06:43:03,463] TRACE Processor 1 received request : Name:
> FetchRequest; Version: 0; CorrelationId: 399487; ClientId:
> ReplicaFetcherThread-0-8; ReplicaId: 6; MaxWait: 500 ms; MinBytes: 1
> bytes; RequestInfo: [__consumer_offsets,5] ->
> PartitionFetchInfo(0,1048576) (kafka.network.RequestChannel$)
>
> [2016-03-14 06:43:03,744] TRACE Completed request:Name: FetchRequest;
> Version: 0; CorrelationId: 397091; ClientId: ReplicaFetcherThread-0-8;
> ReplicaId: 4; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo:
> [__consumer_offsets,37] ->
> PartitionFetchInfo(0,1048576),[__consumer_offsets,45] ->
> PartitionFetchInfo(0,1048576) from client
> /10.224.146.59:12535
> ;totalTime:502,requestQueueTime:0,localTime:1,remoteTime:500,responseQueueTime:0,sendTime:1
> (kafka.request.logger)
>
> [2016-03-14 06:43:03,744] TRACE Processor 1 received request : Name:
> FetchRequest; Version: 0; CorrelationId: 397092; ClientId:
> ReplicaFetcherThread-0-8; ReplicaId: 4; MaxWait: 500 ms; MinBytes: 1
> bytes; RequestInfo: [__consumer_offsets,37] ->
> PartitionFetchInfo(0,1048576),[__consumer_offsets,45] ->
> PartitionFetchInfo(0,1048576) (kafka.network.RequestChannel$)
>
>
> From the logs, do you think it is still due to the batch size? And which
> version contains the fix?
>
>
> Thanks
>
> Fang
>
>
> On Tue, Mar 8, 2016 at 11:56 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > I cannot think of an encoding or partial-message issue off the top of my
> > head (I browsed through the 0.8.2.2 tickets; none of them seems related
> > either).
> >
> > Guozhang
> >
> > On Tue, Mar 8, 2016 at 11:45 AM, Fang Wong <fw...@salesforce.com> wrote:
> >
> > > Thanks Guozhang!
> > >
> > > No, I don't have a way to reproduce this issue. It happens randomly. I
> > > am changing the log level from INFO to TRACE to see if I can capture the
> > > exact message that was sent when this happens.
> > >
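> > > For reference, this is roughly the change, in config/log4j.properties
> > > (a sketch; the appender setup may differ in your deployment):
> > >
> > >     # Raise the main broker loggers from INFO to TRACE.
> > >     log4j.logger.kafka=TRACE
> > >     # The request logger writes kafka-request.log.
> > >     log4j.logger.kafka.request.logger=TRACE, requestAppender
> > >     log4j.additivity.kafka.request.logger=false
> > >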
> > > Could it also be an encoding issue, or something related to a partial
> > > message?
> > >
> > > Thanks,
> > > Fang
> > >
> > > On Mon, Mar 7, 2016 at 5:03 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > >
> > > > John,
> > > >
> > > > There is not a specific JIRA for this change, as it is only
> > > > implemented in the new Java producer:
> > > >
> > > > https://issues.apache.org/jira/browse/KAFKA-1239
> > > >
> > > > Related classes are RecordAccumulator and MemoryRecords:
> > > >
> > > > https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
> > > >
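> > > > The core idea: instead of closing a batch after a fixed number of
> > > > messages, the accumulator tracks the byte size of the open batch and
> > > > closes it when the next record would not fit. A simplified sketch of
> > > > that logic (not the actual Kafka code):
> > > >
> > > >     import java.util.ArrayList;
> > > >     import java.util.List;
> > > >
> > > >     class ByteBatcher {
> > > >         private final int batchSizeBytes;  // e.g. 16384 (batch.size)
> > > >         private final List<byte[]> current = new ArrayList<>();
> > > >         private int currentBytes = 0;
> > > >
> > > >         ByteBatcher(int batchSizeBytes) {
> > > >             this.batchSizeBytes = batchSizeBytes;
> > > >         }
> > > >
> > > >         // Returns a batch that is ready to send, or null if the
> > > >         // record still fits into the open batch.
> > > >         List<byte[]> append(byte[] record) {
> > > >             List<byte[]> ready = null;
> > > >             if (!current.isEmpty()
> > > >                     && currentBytes + record.length > batchSizeBytes) {
> > > >                 ready = new ArrayList<>(current);  // close the batch
> > > >                 current.clear();
> > > >                 currentBytes = 0;
> > > >             }
> > > >             current.add(record);  // open a new batch or join the old
> > > >             currentBytes += record.length;
> > > >             return ready;
> > > >         }
> > > >     }
> > > >
> > > > Batched this way, the accumulated bytes can never silently grow past
> > > > the limit the way #.messages batching could once compression wraps the
> > > > whole set into a single message.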
> > > >
> > > > Fang,
> > > >
> > > > Do you have a way to reproduce the issue? I.e., if you have the exact
> > > > same produce data at hand, could you validate that its cumulative size
> > > > is less than the limit, then try sending it to Kafka and see whether it
> > > > always triggers the problem?
> > > >
> > > >
> > > >
> > > > Guozhang
> > > >
> > > > On Mon, Mar 7, 2016 at 10:23 AM, Fang Wong <fw...@salesforce.com> wrote:
> > > >
> > > > > No, we don't have compression turned on, and the batch size is the
> > > > > default: 16384. The messages are very small; even with that batch
> > > > > size, it is impossible to exceed the size limit.
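> > > > >
> > > > > (For scale: the broker limit of 104857600 bytes divided by the
> > > > > 16384-byte default batch.size is 6400, so even a completely full
> > > > > batch is thousands of times smaller than the limit.)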
> > > > >
> > > > > Thanks,
> > > > > Fang
> > > > >
> > > > > On Sun, Mar 6, 2016 at 6:09 PM, John Dennison <dennison.j...@gmail.com> wrote:
> > > > >
> > > > > > Guozhang,
> > > > > >
> > > > > > Do you know the ticket for changing the "batching criterion from
> > > > > > #.messages to bytes"? I am unable to find it. I am working on
> > > > > > porting a similar change to pykafka.
> > > > > >
> > > > > >
> > > > > > John
> > > > > >
> > > > > >
> > > > > > On Sat, Mar 5, 2016 at 4:29 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > >
> > > > > > > Hello,
> > > > > > >
> > > > > > > Did you have compression turned on and batching (in terms of
> > > > > > > #.messages)? In that case the whole compressed message set is
> > > > > > > treated as a single message on the broker, and hence could
> > > > > > > possibly exceed the limit.
> > > > > > >
> > > > > > > In newer versions we have changed the batching criterion from
> > > > > > > #.messages to bytes, which is aimed at resolving such issues.
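> > > > > > >
> > > > > > > With the new Java producer the batch size is therefore specified
> > > > > > > directly in bytes, for example (a sketch; the broker address and
> > > > > > > serializer choices are placeholders):
> > > > > > >
> > > > > > >     import java.util.Properties;
> > > > > > >     import org.apache.kafka.clients.producer.KafkaProducer;
> > > > > > >
> > > > > > >     Properties props = new Properties();
> > > > > > >     props.put("bootstrap.servers", "broker1:9092");  // placeholder
> > > > > > >     // Collect up to 16384 bytes per partition before sending.
> > > > > > >     props.put("batch.size", 16384);
> > > > > > >     props.put("key.serializer",
> > > > > > >         "org.apache.kafka.common.serialization.StringSerializer");
> > > > > > >     props.put("value.serializer",
> > > > > > >         "org.apache.kafka.common.serialization.ByteArraySerializer");
> > > > > > >     KafkaProducer<String, byte[]> producer =
> > > > > > >         new KafkaProducer<String, byte[]>(props);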
> > > > > > >
> > > > > > > Guozhang
> > > > > > >
> > > > > > > On Thu, Mar 3, 2016 at 1:04 PM, Fang Wong <fw...@salesforce.com> wrote:
> > > > > > >
> > > > > > > > Got the following error message with Kafka 0.8.2.1:
> > > > > > > > [2016-02-26 20:33:43,025] INFO Closing socket connection to /x
> > > > > > > > due to invalid request: Request of length 1937006964 is not
> > > > > > > > valid, it is larger than the maximum size of 104857600 bytes.
> > > > > > > > (kafka.network.Processor)
> > > > > > > >
> > > > > > > > We didn't send a large message at all; it seems like an
> > > > > > > > encoding issue or a partial request. Any suggestions on how to
> > > > > > > > fix it?
> > > > > > > >
> > > > > > > > The code is as below:
> > > > > > > >
> > > > > > > >     ByteArrayOutputStream bos = new ByteArrayOutputStream();
> > > > > > > >
> > > > > > > >     DataOutputStream dos = new DataOutputStream(bos);
> > > > > > > >
> > > > > > > >     // 8-byte timestamp header, written straight into the buffer.
> > > > > > > >     dos.writeLong(System.currentTimeMillis());
> > > > > > > >
> > > > > > > >     OutputStreamWriter byteWriter = new OutputStreamWriter(bos,
> > > > > > > >         com.force.commons.text.EncodingUtil.UTF_ENCODING);
> > > > > > > >
> > > > > > > >     gson.toJson(obj, byteWriter);
> > > > > > > >
> > > > > > > >     // The writer buffers internally; flush it so the JSON is
> > > > > > > >     // fully written into bos before the buffer is read.
> > > > > > > >     byteWriter.flush();
> > > > > > > >
> > > > > > > >     byte[] payload = bos.toByteArray();
> > > > > > > >
> > > > > > > >     ProducerRecord<String, byte[]> data =
> > > > > > > >         new ProducerRecord<String, byte[]>("Topic", 0, null, payload);
> > > > > > > >
> > > > > > > >     kafkaProducer.send(data);
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > -- Guozhang
> > > > > > >
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang
