Hi Guozhang,

The problem is that server 10.225.36.226 is not one of my Kafka clients; nslookup shows it is another internal server. My servers are like 10.224.146.63:9092, and I can't even log in to that server. All of my messages are at most a few KB.

Is it possible for anybody within the internal network to send any message to Kafka? How can I restrict requests to the Kafka server to a fixed list of hosts?
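(For reference: the broker reads the first four bytes of every incoming request as a big-endian size and rejects anything above socket.request.max.bytes, so any process that can reach port 9092, Kafka client or not, can trigger this "invalid request" log line. Below is a minimal sketch of that decoding; it is illustrative only, not Kafka's actual code, and the byte values are an assumption chosen to reproduce the logged number.)

    import java.nio.ByteBuffer;

    public class RequestSizePrefix {
        public static void main(String[] args) {
            // Hypothetical first four bytes sent by a non-Kafka client; the broker
            // would interpret them as the request length before reading anything else.
            byte[] firstFourBytes = {0x30, 0x2B, 0x02, 0x01};
            int decodedSize = ByteBuffer.wrap(firstFourBytes).getInt();
            System.out.println(decodedSize);              // 808124929, as in the log
            System.out.println(decodedSize > 104857600);  // true, so the connection is closed

            // For comparison, the length 1937006964 reported earlier in this thread is
            // 0x73746174, i.e. the ASCII bytes "stat", which looks like plain text
            // rather than a Kafka request.
        }
    }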
Thanks,
Fang

On Tue, Mar 15, 2016 at 5:31 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Fang,
>
> From the logs you showed above there is a single produce request with very
> large request size:
>
> "[2016-03-14 06:43:03,579] INFO Closing socket connection to
> /10.225.36.226 due to invalid request: Request of length *808124929* is
> not valid, it is larger than the maximum size of 104857600 bytes.
> (kafka.network.Processor)"
>
> Which is about 770MB while the maximum request size is configured as 100MB.
> It is from the client hosted at "10.225.36.226", if you can go to that
> server and checks the producer logs around that time, maybe you can
> discover why there comes a single big produce request.
>
> Guozhang
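(On the 770MB-vs-100MB point above: the 104857600-byte cap is the broker's socket.request.max.bytes, and a healthy Java producer additionally enforces its own max.request.size before anything goes on the wire, failing the send with RecordTooLargeException instead. A minimal sketch, assuming the new Java producer; the broker address, topic, and sizes are placeholders taken from this thread.)

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class BoundedProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "10.224.146.63:9092");  // placeholder broker address
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            // A record/request larger than this is rejected client-side with
            // RecordTooLargeException, so a well-behaved producer never gets near the
            // broker's socket.request.max.bytes limit quoted in the log line above.
            props.put("max.request.size", "1048576");  // 1 MB, the client default

            KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
            producer.send(new ProducerRecord<String, byte[]>("Topic", 0, null, "small payload".getBytes()));
            producer.close();
        }
    }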
> On Mon, Mar 14, 2016 at 1:59 PM, Fang Wong <fw...@salesforce.com> wrote:
>
> > After changing log level from INFO to TRACE, here is kafka server.log:
> >
> > [2016-03-14 06:43:03,568] TRACE 156 bytes written. (kafka.network.BoundedByteBufferSend)
> > [2016-03-14 06:43:03,575] TRACE 68 bytes read. (kafka.network.BoundedByteBufferReceive)
> > [2016-03-14 06:43:03,575] TRACE [ReplicaFetcherThread-0-7], Follower 8 has replica log end offset 0 for partition [__consumer_offsets,20]. Received 0 messages and leader hw 0 (kafka.server.ReplicaFetcherThread)
> > [2016-03-14 06:43:03,575] TRACE [ReplicaFetcherThread-0-7], Follower 8 has replica log end offset 0 after appending 0 bytes of messages for partition [__consumer_offsets,20] (kafka.server.ReplicaFetcherThread)
> > [2016-03-14 06:43:03,575] TRACE Setting high watermark for replica 8 partition [__consumer_offsets,20] on broker 8 to [0 [-1 : -1]] (kafka.cluster.Replica)
> > [2016-03-14 06:43:03,575] TRACE [ReplicaFetcherThread-0-7], Follower 8 set replica high watermark for partition [__consumer_offsets,20] to 0 (kafka.server.ReplicaFetcherThread)
> > [2016-03-14 06:43:03,575] TRACE [ReplicaFetcherThread-0-7], Follower 8 has replica log end offset 0 for partition [__consumer_offsets,12]. Received 0 messages and leader hw 0 (kafka.server.ReplicaFetcherThread)
> > [2016-03-14 06:43:03,575] TRACE [ReplicaFetcherThread-0-7], Follower 8 has replica log end offset 0 after appending 0 bytes of messages for partition [__consumer_offsets,12] (kafka.server.ReplicaFetcherThread)
> > [2016-03-14 06:43:03,575] TRACE Setting high watermark for replica 8 partition [__consumer_offsets,12] on broker 8 to [0 [-1 : -1]] (kafka.cluster.Replica)
> > [2016-03-14 06:43:03,575] TRACE [ReplicaFetcherThread-0-7], Follower 8 set replica high watermark for partition [__consumer_offsets,12] to 0 (kafka.server.ReplicaFetcherThread)
> > [2016-03-14 06:43:03,575] TRACE [ReplicaFetcherThread-0-7], Issuing to broker 7 of fetch request Name: FetchRequest; Version: 0; CorrelationId: 397497; ClientId: ReplicaFetcherThread-0-7; ReplicaId: 8; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [__consumer_offsets,20] -> PartitionFetchInfo(0,1048576),[__consumer_offsets,12] -> PartitionFetchInfo(0,1048576) (kafka.server.ReplicaFetcherThread)
> > [2016-03-14 06:43:03,575] TRACE 110 bytes written. (kafka.network.BoundedByteBufferSend)
> > [2016-03-14 06:43:03,579] DEBUG Accepted connection from /10.225.36.226 on /10.224.146.63:9092. sendBufferSize [actual|requested]: [102400|102400] recvBufferSize [actual|requested]: [102400|102400] (kafka.network.Acceptor)
> > [2016-03-14 06:43:03,579] TRACE Processor id 2 selection time = 145604848 ns (kafka.network.Processor)
> > [2016-03-14 06:43:03,579] DEBUG Processor 2 listening to new connection from /10.225.36.226:36151 (kafka.network.Processor)
> > [2016-03-14 06:43:03,579] TRACE Processor id 2 selection time = 3588 ns (kafka.network.Processor)
> > [2016-03-14 06:43:03,579] INFO Closing socket connection to /10.225.36.226 due to invalid request: Request of length 808124929 is not valid, it is larger than the maximum size of 104857600 bytes. (kafka.network.Processor)
> > [2016-03-14 06:43:03,580] DEBUG Closing connection from /10.225.36.226:36151 (kafka.network.Processor)
> >
> > Here is kafka-request.log:
> >
> > [2016-03-14 06:43:03,463] TRACE Completed request:Name: FetchRequest; Version: 0; CorrelationId: 399486; ClientId: ReplicaFetcherThread-0-8; ReplicaId: 6; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [__consumer_offsets,5] -> PartitionFetchInfo(0,1048576) from client /10.224.146.61:10716;totalTime:501,requestQueueTime:0,localTime:1,remoteTime:500,responseQueueTime:0,sendTime:0 (kafka.request.logger)
> > [2016-03-14 06:43:03,463] TRACE Processor 1 received request : Name: FetchRequest; Version: 0; CorrelationId: 399487; ClientId: ReplicaFetcherThread-0-8; ReplicaId: 6; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [__consumer_offsets,5] -> PartitionFetchInfo(0,1048576) (kafka.network.RequestChannel$)
> > [2016-03-14 06:43:03,744] TRACE Completed request:Name: FetchRequest; Version: 0; CorrelationId: 397091; ClientId: ReplicaFetcherThread-0-8; ReplicaId: 4; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [__consumer_offsets,37] -> PartitionFetchInfo(0,1048576),[__consumer_offsets,45] -> PartitionFetchInfo(0,1048576) from client /10.224.146.59:12535;totalTime:502,requestQueueTime:0,localTime:1,remoteTime:500,responseQueueTime:0,sendTime:1 (kafka.request.logger)
> > [2016-03-14 06:43:03,744] TRACE Processor 1 received request : Name: FetchRequest; Version: 0; CorrelationId: 397092; ClientId: ReplicaFetcherThread-0-8; ReplicaId: 4; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [__consumer_offsets,37] -> PartitionFetchInfo(0,1048576),[__consumer_offsets,45] -> PartitionFetchInfo(0,1048576) (kafka.network.RequestChannel$)
> >
> > From the log, do you think it is still due to the batch size and which
> > version the fix was in?
> >
> > Thanks
> >
> > Fang
> >
> > On Tue, Mar 8, 2016 at 11:56 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > I cannot think of an encoding or partial message issue at top of my head
> > > (browsed through 0.8.2.2 tickets, none of them seems related either).
> > >
> > > Guozhang
> > >
> > > On Tue, Mar 8, 2016 at 11:45 AM, Fang Wong <fw...@salesforce.com> wrote:
> > >
> > > > Thanks Guozhang!
> > > >
> > > > No I don't have a way to reproduce this issue. It randomly happens, I am
> > > > changing the log level from INFO to trace to see if I can get the exact
> > > > message what was sent when this happens.
> > > >
> > > > Could it also be some encoding issue or partial message related?
> > > > Thanks,
> > > > Fang
> > > >
> > > > On Mon, Mar 7, 2016 at 5:03 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > >
> > > > > John,
> > > > >
> > > > > There is not a specific JIRA for this change as it is only implemented in
> > > > > the new Java producer:
> > > > >
> > > > > https://issues.apache.org/jira/browse/KAFKA-1239
> > > > >
> > > > > Related classes are RecordAccumulator and MemoryRecords:
> > > > >
> > > > > https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
> > > > >
> > > > > Fang,
> > > > >
> > > > > Do you have a way to re-produce the issue? I.e. if you have the exact same
> > > > > produce data at hand, could you validate that their cumulated size is less
> > > > > than the limit and then try sending them to Kafka and see if it always
> > > > > triggers the problem?
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Mon, Mar 7, 2016 at 10:23 AM, Fang Wong <fw...@salesforce.com> wrote:
> > > > >
> > > > > > No, we don't have compression turned on the batch size is the default:
> > > > > > 16384.
> > > > > > But the message size is very small, even with that batch size, it is
> > > > > > impossible to exceed the size limit.
> > > > > >
> > > > > > Thanks,
> > > > > > Fang
> > > > > >
> > > > > > On Sun, Mar 6, 2016 at 6:09 PM, John Dennison <dennison.j...@gmail.com> wrote:
> > > > > >
> > > > > > > Guozhang,
> > > > > > >
> > > > > > > Do you know the ticket for for changing the "batching criterion from
> > > > > > > #.messages to bytes." I am unable to find it. Working on porting
> > > > > > > a similar change to pykafka.
> > > > > > >
> > > > > > > John
> > > > > > >
> > > > > > > On Sat, Mar 5, 2016 at 4:29 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hello,
> > > > > > > >
> > > > > > > > Did you have compression turned on and batching (in terms of #.messages)?
> > > > > > > > In that case the whole compressed message set is treated as a single
> > > > > > > > message on the broker and hence could possibly exceed the limit.
> > > > > > > >
> > > > > > > > In newer versions we have changed the batching criterion from #.messages to
> > > > > > > > bytes, which is aimed at resolving such issues.
> > > > > > > >
> > > > > > > > Guozhang
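(A sketch of what "batching by bytes" looks like from the client side, per the explanation above: the new Java producer's RecordAccumulator bounds each batch by batch.size in bytes rather than by a message count such as the old producer's batch.num.messages, which is what is meant to keep batches from growing past the limit. Broker address, topic, and values are placeholders; 16384 is the default batch.size mentioned earlier in this thread.)

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class ByteBatchedProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "10.224.146.63:9092");  // placeholder broker address
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            // Batching is expressed in bytes per partition batch, not in #.messages.
            props.put("batch.size", "16384");       // default: 16 KB
            props.put("linger.ms", "5");            // wait up to 5 ms to fill a batch
            props.put("compression.type", "none");  // compression is off in this thread

            KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
            for (int i = 0; i < 100; i++) {
                producer.send(new ProducerRecord<String, byte[]>("Topic", 0, null, ("msg-" + i).getBytes()));
            }
            producer.close();
        }
    }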
> > > > > > > > On Thu, Mar 3, 2016 at 1:04 PM, Fang Wong <fw...@salesforce.com> wrote:
> > > > > > > >
> > > > > > > > > Got the following error message with Kafka 0.8.2.1:
> > > > > > > > >
> > > > > > > > > [2016-02-26 20:33:43,025] INFO Closing socket connection to /x due to
> > > > > > > > > invalid request: Request of length 1937006964 is not valid, it is larger
> > > > > > > > > than the maximum size of 104857600 bytes. (kafka.network.Processor)
> > > > > > > > >
> > > > > > > > > Didn't send a large message at all, it seems like encoding issue or
> > > > > > > > > partial request, any suggestion how to fix it?
> > > > > > > > >
> > > > > > > > > The code is like below:
> > > > > > > > >
> > > > > > > > > ByteArrayOutputStream bos = new ByteArrayOutputStream();
> > > > > > > > > DataOutputStream dos = new DataOutputStream(bos);
> > > > > > > > > dos.writeLong(System.currentTimeMillis());
> > > > > > > > > OutputStreamWriter byteWriter = new OutputStreamWriter(bos,
> > > > > > > > >     com.force.commons.text.EncodingUtil.UTF_ENCODING);
> > > > > > > > > gson.toJson(obj, byteWriter);
> > > > > > > > > byte[] payload = bos.toByteArray();
> > > > > > > > > ProducerRecord<String, byte[]> data = new ProducerRecord<String, byte[]>("Topic", 0, null, payload);
> > > > > > > > > kafkaProducer.send(data);
> > > > > > > >
> > > > > > > > --
> > > > > > > > -- Guozhang
> > > > >
> > > > > --
> > > > > -- Guozhang
> > >
> > > --
> > > -- Guozhang
>
> --
> -- Guozhang
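(One detail worth double-checking in the snippet quoted above; this is an observation, not a confirmed cause of the oversized requests: gson.toJson writes through the OutputStreamWriter, which buffers bytes internally, and nothing flushes it before bos.toByteArray(), so the payload handed to the producer can be missing part or all of the JSON. Below is a sketch of the same serialization with explicit flushes; Gson, StandardCharsets.UTF_8, and the method name are stand-ins for the original helpers.)

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.io.OutputStreamWriter;
    import java.nio.charset.StandardCharsets;
    import com.google.gson.Gson;

    public class PayloadSerializer {
        // Same structure as the quoted snippet, but the writer is flushed so the
        // timestamp and the JSON are both guaranteed to be in the returned bytes.
        static byte[] serialize(Gson gson, Object obj) throws IOException {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream dos = new DataOutputStream(bos);
            dos.writeLong(System.currentTimeMillis());
            dos.flush();  // DataOutputStream does not buffer here, but being explicit is harmless

            OutputStreamWriter byteWriter =
                    new OutputStreamWriter(bos, StandardCharsets.UTF_8);  // stand-in for EncodingUtil.UTF_ENCODING
            gson.toJson(obj, byteWriter);
            byteWriter.flush();  // without this, JSON buffered in the writer never reaches bos

            return bos.toByteArray();
        }
    }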