Adding more info:

Hi Guozhang,
I was using exactly_once processing here; I can see this in the client
logs. However, I am not setting a transaction id.

application.id = c-7-e6
application.server =
bootstrap.servers = [172.29.65.190:9092, 172.29.65.192:9092, 172.29.65.193:9092]
buffered.records.per.partition = 10000
cache.max.bytes.buffering = 2097152000
client.id =
commit.interval.ms = 5000
connections.max.idle.ms = 540000
default.key.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
default.timestamp.extractor = class org.apache.kafka.streams.processor.FailOnInvalidTimestamp
default.value.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
key.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
metadata.max.age.ms = 60000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
num.standby.replicas = 0
num.stream.threads = 15
partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper
poll.ms = 100
processing.guarantee = exactly_once
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
replication.factor = 1
request.timeout.ms = 40000
retry.backoff.ms = 100
rocksdb.config.setter = null
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
state.cleanup.delay.ms = 4611686018427386903
state.dir = /data/streampoc/
timestamp.extractor = class org.apache.kafka.streams.processor.WallclockTimestampExtractor
value.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
windowstore.changelog.additional.retention.ms = 86400000
zookeeper.connect =

On Wed, Sep 13, 2017 at 12:16 PM, Sameer Kumar <sam.kum.w...@gmail.com> wrote:

> Hi Guozhang,
>
> The producer sending data to this topic is not running concurrently with
> the stream processing. I first ingested the data from another cluster and
> then ran the stream processing on it. The producer code was written by
> me, and it does not have transactions turned on by default.
>
> I will double-check whether someone else has transactions turned on, but
> this is quite unlikely. Is there some way to verify it through the logs?
>
> All of this works fine when the brokers run Kafka 10; this might be
> because transactions are only available in Kafka 11. I suspect there
> could be a case where too much processing is causing one of the brokers
> to crash. The timeouts indicate that it is taking a long time to send
> data.
>
> I have also tried this on another cluster, which I use exclusively for
> myself, and found the same behavior there as well.
>
> What do you think should be our next step so that we can get to the root
> of the issue?
>
> -Sameer.
>
> On Wed, Sep 13, 2017 at 6:14 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>
>> Hi Sameer,
>>
>> If no client has transactions turned on, the `__transaction_state`
>> internal topic would not be created at all. So I still suspect that some
>> of your clients (maybe not your Streams client, but the producer client
>> that is sending data to the source topic?) has transactions turned on.
>>
>> BTW, from your logs I saw lots of the following errors on the client
>> side:
>>
>> 2017-09-11 12:42:34 ERROR RecordCollectorImpl:113 - task [0_6] Error
>> sending record to topic c-7-e6-KSTREAM-BRANCHCHILD-0000000007-repartition.
>> No more offsets will be recorded for this task and the exception will
>> eventually be thrown.
>>
>> org.apache.kafka.common.errors.TimeoutException: Expiring 13 record(s)
>> for c-7-e6-KSTREAM-BRANCHCHILD-0000000007-repartition-3: 31174 ms has
>> passed since last append
>>
>> 2017-09-11 12:42:36 WARN Sender:511 - Got error produce response with
>> correlation id 82862 on topic-partition
>> c-7-e6-KSTREAM-JOINTHIS-0000000018-store-changelog-22, retrying
>> (2147483646 attempts left). Error: NETWORK_EXCEPTION
>>
>> 2017-09-11 12:42:36 ERROR RecordCollectorImpl:113 - task [0_22] Error
>> sending record to topic c-7-e6-KSTREAM-BRANCHCHILD-0000000007-repartition.
>> No more offsets will be recorded for this task and the exception will
>> eventually be thrown.
>>
>> org.apache.kafka.common.errors.TimeoutException: Expiring 13 record(s)
>> for c-7-e6-KSTREAM-BRANCHCHILD-0000000007-repartition-3: 31467 ms has
>> passed since last append
>>
>> Today, if the TimeoutException is thrown from the RecordCollector, it
>> will cause Streams to throw this exception all the way to the user
>> exception handler and then shut down the thread. This exception would be
>> thrown if the Kafka broker itself is not available (also, from your
>> previous logs it seems brokers 192 and 193 were unavailable and hence
>> were kicked out of the ISR by broker 109).
>>
>> Guozhang
>>
>> On Mon, Sep 11, 2017 at 3:40 AM, Sameer Kumar <sam.kum.w...@gmail.com>
>> wrote:
>>
>> > Hi Guozhang,
>> >
>> > Please find the relevant logs; see the folder for client logs as well.
>> > Things started getting awry at 12:42:05.
>> > Let me know if you need any more information.
>> >
>> > -Sameer.
>> >
>> > On Sun, Sep 10, 2017 at 5:06 PM, Sameer Kumar <sam.kum.w...@gmail.com>
>> > wrote:
>> >
>> >> Hi Guozhang,
>> >>
>> >> Nope, I was not using exactly-once mode. I don't have the client logs
>> >> with me right now; I will try to replicate it again and share the
>> >> other details with you.
>> >>
>> >> My concern was that it crashed my brokers as well.
>> >>
>> >> -Sameer.
>> >>
>> >> On Sat, Sep 9, 2017 at 1:51 AM, Guozhang Wang <wangg...@gmail.com>
>> >> wrote:
>> >>
>> >>> Hello Sameer,
>> >>>
>> >>> I looked through your code, and here is what I figured: in the 0.11
>> >>> version we added the exactly-once feature (
>> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
>> >>> ), which uses the transaction log (an internal topic named
>> >>> "__transaction_state") that has a default replication factor of 3
>> >>> (which overrides your global config value of 2).
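(For reference, the replication of this internal topic is controlled by
broker-side settings rather than by the application-level
replication.factor; a minimal server.properties sketch, with purely
illustrative values rather than anything taken from this cluster:)

    # Broker-side settings governing the __transaction_state topic
    transaction.state.log.replication.factor=3
    transaction.state.log.min.isr=2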
>> >>> Then, at around 12:30, the leader of the transaction log partitions
>> >>> kicked both replicas 190 and 192 out of the ISR:
>> >>>
>> >>> [2017-09-05 12:30:31,256] INFO [GroupCoordinator 193]: Preparing to rebalance group KafkaCache_TEST15 with old generation 14 (__consumer_offsets-27) (kafka.coordinator.group.GroupCoordinator)
>> >>>
>> >>> [2017-09-05 12:30:41,510] INFO Partition [__transaction_state,9] on broker 193: Shrinking ISR from 193,192,190 to 193 (kafka.cluster.Partition)
>> >>> [2017-09-05 12:30:41,513] INFO Partition [__transaction_state,6] on broker 193: Shrinking ISR from 193,190,192 to 193 (kafka.cluster.Partition)
>> >>> [2017-09-05 12:30:41,514] INFO Partition [__transaction_state,3] on broker 193: Shrinking ISR from 193,192,190 to 193 (kafka.cluster.Partition)
>> >>> [2017-09-05 12:30:41,515] INFO Partition [__transaction_state,18] on broker 193: Shrinking ISR from 193,190,192 to 193 (kafka.cluster.Partition)
>> >>> [2017-09-05 12:30:41,515] INFO Partition [__transaction_state,15] on broker 193: Shrinking ISR from 193,192,190 to 193 (kafka.cluster.Partition)
>> >>> [2017-09-05 12:30:41,516] INFO Partition [__transaction_state,12] on broker 193: Shrinking ISR from 193,190,192 to 193 (kafka.cluster.Partition)
>> >>> [2017-09-05 12:30:41,516] INFO Partition [__consumer_offsets,12] on broker 193: Shrinking ISR from 193,192,190 to 193 (kafka.cluster.Partition)
>> >>> [2017-09-05 12:30:41,517] INFO Partition [__consumer_offsets,15] on broker 193: Shrinking ISR from 193,192,190 to 193 (kafka.cluster.Partition)
>> >>> [2017-09-05 12:30:41,517] INFO Partition [__transaction_state,24] on broker 193: Shrinking ISR from 193,190,192 to 193 (kafka.cluster.Partition)
>> >>>
>> >>> In the meantime, both replicas 190 and 192 seem to have timed out on
>> >>> their fetch requests (note the big timestamp gap in the logs):
>> >>>
>> >>> [2017-09-05 12:26:21,130] INFO Rolled new log segment for 'AdServe-4' in 1 ms. (kafka.log.Log)
>> >>> [2017-09-05 12:30:59,046] WARN [ReplicaFetcherThread-2-193]: Error in fetch to broker 193, request (type=FetchRequest, replicaId=190, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={__consumer_offsets-21=(offset=0, logStartOffset=0, maxBytes=1048576)
>> >>>
>> >>> ...
>> >>>
>> >>> [2017-09-05 12:28:37,514] INFO Deleting index /data1/kafka/AdServe-5/00000000000405000294.timeindex.deleted (kafka.log.TimeIndex)
>> >>> [2017-09-05 12:30:59,042] WARN [ReplicaFetcherThread-2-193]: Error in fetch to broker 193, request (type=FetchRequest, replicaId=192, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={__consumer_offsets-21=(offset=0, logStartOffset=0, maxBytes=1048576)
>> >>>
>> >>> This caused the NotEnoughReplicasException, since any append to the
>> >>> transaction log requires "acks=all, and min.isr=num.replicas".
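(A quick way to double-check both that the `__transaction_state` topic
exists and what its current ISR looks like is to describe it with the
topics tool; the ZooKeeper address below is an assumption, based on the
broker hosts and the default port 2181:)

    bin/kafka-topics.sh --describe --topic __transaction_state \
      --zookeeper 172.29.65.190:2181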
>> >>> [2017-09-05 12:32:11,612] ERROR [Replica Manager on Broker 193]: Error processing append operation on partition __transaction_state-18 (kafka.server.ReplicaManager)
>> >>>
>> >>> org.apache.kafka.common.errors.NotEnoughReplicasException: Number of insync replicas for partition __transaction_state-18 is [1], below required minimum [3]
>> >>>
>> >>> Upon seeing this error, the transaction coordinator should retry the
>> >>> append, but if the retry never succeeds it will be blocked. I did not
>> >>> see the Streams API client-side logs, so I cannot tell for sure why
>> >>> this caused the Streams app to fail as well. A quick question: did
>> >>> you enable `processing.mode=exactly-once` on your Streams app?
>> >>>
>> >>> Guozhang
>> >>>
>> >>> On Fri, Sep 8, 2017 at 1:34 AM, Sameer Kumar <sam.kum.w...@gmail.com>
>> >>> wrote:
>> >>>
>> >>> > Hi All,
>> >>> >
>> >>> > Any thoughts on the mail below?
>> >>> >
>> >>> > -Sameer.
>> >>> >
>> >>> > On Wed, Sep 6, 2017 at 12:28 PM, Sameer Kumar <sam.kum.w...@gmail.com>
>> >>> > wrote:
>> >>> >
>> >>> > > Hi All,
>> >>> > >
>> >>> > > I want to report a scenario wherein running 2 different instances
>> >>> > > of my stream application caused my brokers to crash, and
>> >>> > > eventually my stream application as well. This scenario only
>> >>> > > happens when my brokers run on Kafka 11; everything works fine if
>> >>> > > my brokers are on Kafka 10.2 and the stream application is on
>> >>> > > Kafka 11.
>> >>> > >
>> >>> > > I am attaching the logs herewith in a zipped format.
>> >>> > >
>> >>> > > The cluster configuration:
>> >>> > > 3 nodes (190, 192, 193), Kafka 11
>> >>> > > Topic replication factor - 2
>> >>> > >
>> >>> > > App configuration:
>> >>> > > Kafka 11 Streams.
>> >>> > >
>> >>> > > The error I saw on the 193 server was
>> >>> > > org.apache.kafka.common.errors.NotEnoughReplicasException: Number
>> >>> > > of insync replicas for partition __transaction_state-18 is [1],
>> >>> > > below required minimum [3]. Both the 192 and 190 servers reported
>> >>> > > errors on failing to read information from 193.
>> >>> > >
>> >>> > > Please look for the time around 12:30-12:32 to find the relevant
>> >>> > > logs. Let me know if you need some other information.
>> >>> > >
>> >>> > > Regards,
>> >>> > > -Sameer.
>> >>>
>> >>> --
>> >>> -- Guozhang
>>
>> --
>> -- Guozhang
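(For completeness, a minimal sketch of what enabling exactly-once looks
like on the Streams side; the application id and bootstrap servers are
taken from the config dump above, everything else is illustrative. With
this guarantee, Streams manages transactional producers internally with
generated transactional ids, which is what makes use of the
__transaction_state topic even though no transaction id is set
explicitly in the application code.)

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class ExactlyOnceConfigSketch {
        public static Properties build() {
            Properties props = new Properties();
            // values taken from the config dump above
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "c-7-e6");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
                    "172.29.65.190:9092,172.29.65.192:9092,172.29.65.193:9092");
            // exactly-once makes Streams use transactional producers internally,
            // even though no transactional.id is configured by the application
            props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
                    StreamsConfig.EXACTLY_ONCE);
            return props;
        }
    }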