After changing the log level from INFO to TRACE, here is the Kafka server.log:

[2016-03-14 06:43:03,568] TRACE 156 bytes written.
(kafka.network.BoundedByteBufferSend)

[2016-03-14 06:43:03,575] TRACE 68 bytes read.
(kafka.network.BoundedByteBufferReceive)

[2016-03-14 06:43:03,575] TRACE [ReplicaFetcherThread-0-7], Follower 8
has replica log end offset 0 for partition [__consumer_offsets,20].
Received 0 messages and leader hw 0
(kafka.server.ReplicaFetcherThread)

[2016-03-14 06:43:03,575] TRACE [ReplicaFetcherThread-0-7], Follower 8
has replica log end offset 0 after appending 0 bytes of messages for
partition [__consumer_offsets,20] (kafka.server.ReplicaFetcherThread)

[2016-03-14 06:43:03,575] TRACE Setting high watermark for replica 8
partition [__consumer_offsets,20] on broker 8 to [0 [-1 : -1]]
(kafka.cluster.Replica)

[2016-03-14 06:43:03,575] TRACE [ReplicaFetcherThread-0-7], Follower 8
set replica high watermark for partition [__consumer_offsets,20] to 0
(kafka.server.ReplicaFetcherThread)

[2016-03-14 06:43:03,575] TRACE [ReplicaFetcherThread-0-7], Follower 8
has replica log end offset 0 for partition [__consumer_offsets,12].
Received 0 messages and leader hw 0
(kafka.server.ReplicaFetcherThread)

[2016-03-14 06:43:03,575] TRACE [ReplicaFetcherThread-0-7], Follower 8
has replica log end offset 0 after appending 0 bytes of messages for
partition [__consumer_offsets,12] (kafka.server.ReplicaFetcherThread)

[2016-03-14 06:43:03,575] TRACE Setting high watermark for replica 8
partition [__consumer_offsets,12] on broker 8 to [0 [-1 : -1]]
(kafka.cluster.Replica)

[2016-03-14 06:43:03,575] TRACE [ReplicaFetcherThread-0-7], Follower 8
set replica high watermark for partition [__consumer_offsets,12] to 0
(kafka.server.ReplicaFetcherThread)

[2016-03-14 06:43:03,575] TRACE [ReplicaFetcherThread-0-7], Issuing to
broker 7 of fetch request Name: FetchRequest; Version: 0;
CorrelationId: 397497; ClientId: ReplicaFetcherThread-0-7; ReplicaId:
8; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo:
[__consumer_offsets,20] ->
PartitionFetchInfo(0,1048576),[__consumer_offsets,12] ->
PartitionFetchInfo(0,1048576) (kafka.server.ReplicaFetcherThread)

[2016-03-14 06:43:03,575] TRACE 110 bytes written.
(kafka.network.BoundedByteBufferSend)

[2016-03-14 06:43:03,579] DEBUG Accepted connection from
/10.225.36.226 on /10.224.146.63:9092. sendBufferSize
[actual|requested]: [102400|102400] recvBufferSize [actual|requested]:
[102400|102400] (kafka.network.Acceptor)

[2016-03-14 06:43:03,579] TRACE Processor id 2 selection time =
145604848 ns (kafka.network.Processor)

[2016-03-14 06:43:03,579] DEBUG Processor 2 listening to new
connection from /10.225.36.226:36151 (kafka.network.Processor)

[2016-03-14 06:43:03,579] TRACE Processor id 2 selection time = 3588
ns (kafka.network.Processor)

[2016-03-14 06:43:03,579] INFO Closing socket connection to
/10.225.36.226 due to invalid request: Request of length 808124929 is
not valid, it is larger than the maximum size of 104857600 bytes.
(kafka.network.Processor)

[2016-03-14 06:43:03,580] DEBUG Closing connection from
/10.225.36.226:36151 (kafka.network.Processor)
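One way to inspect the rejected length: a Kafka request starts with a 4-byte big-endian size field, so if a client that is not speaking the Kafka protocol connects to port 9092, the broker will read that client's first four bytes as the request size. The sketch below (the class and method names are made up for illustration) prints those four bytes for the two lengths reported in this thread. 1937006964 decodes to the ASCII text "stat", and 808124929 decodes to the bytes 30 2b 02 01. As an assumption, not a confirmed diagnosis: "stat" would be consistent with a ZooKeeper four-letter command sent to the wrong port, and 30 2b 02 01 resembles the start of a BER-encoded message such as LDAP or SNMP, which might point to a monitoring or scanning tool on 10.225.36.226.

```java
import java.nio.ByteBuffer;

public class LengthPrefix {
    /** Returns the four big-endian bytes of the size field as hex, e.g. "73 74 61 74". */
    public static String toHex(int length) {
        // ByteBuffer is big-endian by default, matching the wire order of the size field.
        byte[] bytes = ByteBuffer.allocate(4).putInt(length).array();
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            if (sb.length() > 0) {
                sb.append(' ');
            }
            sb.append(String.format("%02x", b & 0xff));
        }
        return sb.toString();
    }

    /** Renders the same four bytes as ASCII, replacing non-printable bytes with '.'. */
    public static String toAscii(int length) {
        byte[] bytes = ByteBuffer.allocate(4).putInt(length).array();
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            int v = b & 0xff;
            sb.append(v >= 0x20 && v < 0x7f ? (char) v : '.');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // The two "Request of length ..." values quoted in this thread.
        for (int length : new int[] {1937006964, 808124929}) {
            System.out.println(length + " -> " + toHex(length) + " \"" + toAscii(length) + "\"");
        }
    }
}
```

If the decoded bytes look like text or another protocol's header, checking what runs on the connecting host is probably more productive than tuning the producer batch size.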



Here is kafka-request.log:


[2016-03-14 06:43:03,463] TRACE Completed request:Name: FetchRequest;
Version: 0; CorrelationId: 399486; ClientId: ReplicaFetcherThread-0-8;
ReplicaId: 6; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo:
[__consumer_offsets,5] -> PartitionFetchInfo(0,1048576) from client
/10.224.146.61:10716;totalTime:501,requestQueueTime:0,localTime:1,remoteTime:500,responseQueueTime:0,sendTime:0
(kafka.request.logger)

[2016-03-14 06:43:03,463] TRACE Processor 1 received request : Name:
FetchRequest; Version: 0; CorrelationId: 399487; ClientId:
ReplicaFetcherThread-0-8; ReplicaId: 6; MaxWait: 500 ms; MinBytes: 1
bytes; RequestInfo: [__consumer_offsets,5] ->
PartitionFetchInfo(0,1048576) (kafka.network.RequestChannel$)

[2016-03-14 06:43:03,744] TRACE Completed request:Name: FetchRequest;
Version: 0; CorrelationId: 397091; ClientId: ReplicaFetcherThread-0-8;
ReplicaId: 4; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo:
[__consumer_offsets,37] ->
PartitionFetchInfo(0,1048576),[__consumer_offsets,45] ->
PartitionFetchInfo(0,1048576) from client
/10.224.146.59:12535;totalTime:502,requestQueueTime:0,localTime:1,remoteTime:500,responseQueueTime:0,sendTime:1
(kafka.request.logger)

[2016-03-14 06:43:03,744] TRACE Processor 1 received request : Name:
FetchRequest; Version: 0; CorrelationId: 397092; ClientId:
ReplicaFetcherThread-0-8; ReplicaId: 4; MaxWait: 500 ms; MinBytes: 1
bytes; RequestInfo: [__consumer_offsets,37] ->
PartitionFetchInfo(0,1048576),[__consumer_offsets,45] ->
PartitionFetchInfo(0,1048576) (kafka.network.RequestChannel$)
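To read the timing fields in the "Completed request" lines more easily, a small parser like the sketch below may help (the class name and regular expression are illustrative and not part of Kafka). In both completed requests above, remoteTime is 500 ms, which equals the MaxWait of 500 ms, so these look like ordinary replica fetches that waited for data rather than oversized requests.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RequestTimings {
    // Matches fields such as "totalTime:501" or "remoteTime:500".
    private static final Pattern FIELD = Pattern.compile("(\\w+Time):(\\d+)");

    /** Extracts every "<name>Time:<millis>" pair from a request log line, in order. */
    public static Map<String, Long> parse(String line) {
        Map<String, Long> timings = new LinkedHashMap<>();
        Matcher m = FIELD.matcher(line);
        while (m.find()) {
            timings.put(m.group(1), Long.parseLong(m.group(2)));
        }
        return timings;
    }

    public static void main(String[] args) {
        // Tail of the first "Completed request" entry from kafka-request.log above.
        String line = "from client /10.224.146.61:10716;totalTime:501,requestQueueTime:0,"
                + "localTime:1,remoteTime:500,responseQueueTime:0,sendTime:0";
        System.out.println(parse(line));
    }
}
```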


From the log, do you think it is still due to the batch size? And which
version was the fix in?


Thanks

Fang


On Tue, Mar 8, 2016 at 11:56 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> I cannot think of an encoding or partial message issue off the top of my head
> (I browsed through the 0.8.2.2 tickets; none of them seems related either).
>
> Guozhang
>
> On Tue, Mar 8, 2016 at 11:45 AM, Fang Wong <fw...@salesforce.com> wrote:
>
> > Thanks Guozhang!
> >
> > No, I don't have a way to reproduce this issue. It happens randomly. I am
> > changing the log level from INFO to TRACE to see if I can capture the exact
> > message that was sent when this happens.
> >
> > Could it also be related to an encoding issue or a partial message?
> >
> > Thanks,
> > Fang
> >
> > On Mon, Mar 7, 2016 at 5:03 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > John,
> > >
> > > There is not a specific JIRA for this change, as it is only implemented in
> > > the new Java producer:
> > >
> > > https://issues.apache.org/jira/browse/KAFKA-1239
> > >
> > > Related classes are RecordAccumulator and MemoryRecords:
> > >
> > > https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
> > >
> > >
> > > Fang,
> > >
> > > Do you have a way to reproduce the issue? I.e., if you have the exact same
> > > produce data at hand, could you validate that its cumulative size is less
> > > than the limit, then try sending it to Kafka and see if it always
> > > triggers the problem?
> > >
> > >
> > >
> > > Guozhang
> > >
> > > On Mon, Mar 7, 2016 at 10:23 AM, Fang Wong <fw...@salesforce.com> wrote:
> > >
> > > > No, we don't have compression turned on. The batch size is the default,
> > > > 16384. The messages are very small, so even with that batch size it is
> > > > impossible to exceed the size limit.
> > > >
> > > > Thanks,
> > > > Fang
> > > >
> > > > On Sun, Mar 6, 2016 at 6:09 PM, John Dennison <dennison.j...@gmail.com> wrote:
> > > >
> > > > > Guozhang,
> > > > >
> > > > > Do you know the ticket for changing the "batching criterion from
> > > > > #.messages to bytes"? I am unable to find it. I am working on porting
> > > > > a similar change to pykafka.
> > > > >
> > > > >
> > > > > John
> > > > >
> > > > >
> > > > > On Sat, Mar 5, 2016 at 4:29 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > >
> > > > > > Hello,
> > > > > >
> > > > > > Did you have compression turned on and batching (in terms of #.messages)?
> > > > > > In that case the whole compressed message set is treated as a single
> > > > > > message on the broker and hence could possibly exceed the limit.
> > > > > >
> > > > > > In newer versions we have changed the batching criterion from #.messages
> > > > > > to bytes, which is aimed at resolving such issues.
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > > On Thu, Mar 3, 2016 at 1:04 PM, Fang Wong <fw...@salesforce.com> wrote:
> > > > > >
> > > > > > > Got the following error message with Kafka 0.8.2.1:
> > > > > > > [2016-02-26 20:33:43,025] INFO Closing socket connection to /x due to
> > > > > > > invalid request: Request of length 1937006964 is not valid, it is larger
> > > > > > > than the maximum size of 104857600 bytes. (kafka.network.Processor)
> > > > > > >
> > > > > > > We didn't send a large message at all. It seems like an encoding issue or
> > > > > > > a partial request. Any suggestions on how to fix it?
> > > > > > >
> > > > > > > The code is like below:
> > > > > > >
> > > > > > >     ByteArrayOutputStream bos = new ByteArrayOutputStream();
> > > > > > >     DataOutputStream dos = new DataOutputStream(bos);
> > > > > > >     // Prefix the payload with an 8-byte timestamp.
> > > > > > >     dos.writeLong(System.currentTimeMillis());
> > > > > > >     OutputStreamWriter byteWriter = new OutputStreamWriter(bos,
> > > > > > >         com.force.commons.text.EncodingUtil.UTF_ENCODING);
> > > > > > >     gson.toJson(obj, byteWriter);
> > > > > > >     // Flush so any buffered JSON reaches bos before it is copied.
> > > > > > >     byteWriter.flush();
> > > > > > >     byte[] payload = bos.toByteArray();
> > > > > > >     ProducerRecord<String, byte[]> data =
> > > > > > >         new ProducerRecord<String, byte[]>("Topic", 0, null, payload);
> > > > > > >     kafkaProducer.send(data);
> > > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > > >
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>
