Adding more info:-

Hi Guozhang,

I was using exactly_once processing here, I can see this in the client
logs, however I am not setting transaction id though.

application.id = c-7-e6
application.server =
bootstrap.servers = [172.29.65.190:9092, 172.29.65.192:9092,
172.29.65.193:9092]
buffered.records.per.partition = 10000
cache.max.bytes.buffering = 2097152000
client.id =
commit.interval.ms = 5000
connections.max.idle.ms = 540000
default.key.serde = class
org.apache.kafka.common.serialization.Serdes$ByteArraySerde
default.timestamp.extractor = class
org.apache.kafka.streams.processor.FailOnInvalidTimestamp
default.value.serde = class
org.apache.kafka.common.serialization.Serdes$ByteArraySerde
key.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
metadata.max.age.ms = 60000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
num.standby.replicas = 0
num.stream.threads = 15
partition.grouper = class
org.apache.kafka.streams.processor.DefaultPartitionGrouper
poll.ms = 100
processing.guarantee = exactly_once
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
replication.factor = 1
request.timeout.ms = 40000
retry.backoff.ms = 100
rocksdb.config.setter = null
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
state.cleanup.delay.ms = 4611686018427386903
state.dir = /data/streampoc/
timestamp.extractor = class
org.apache.kafka.streams.processor.WallclockTimestampExtractor
value.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
windowstore.changelog.additional.retention.ms = 86400000
zookeeper.connect =


On Wed, Sep 13, 2017 at 12:16 PM, Sameer Kumar <sam.kum.w...@gmail.com>
wrote:

> Hi Guozhang,
>
> The producer sending data to this topic is not running concurrently with
> the stream processing. I had first ingested the data from another cluster
> and then have the stream processing ran on it. The producer code is written
> by me and it doesnt have transactions on by default.
>
> I will double check if someone else has transaction turned on, but this is
> quite unlikely. Is there someway to verify it through logs.
>
> All of this behavior works fine when brokers are run on Kafka 10, this
> might be because transactions are only available on Kafka11. I am
> suspecting would there be a case that too much processing is causing one of
> the brokers to crash. The timeouts are indicating that it is taking time to
> send data
>
> I have tried this behavior also on a another cluster which I exclusively
> use it for myself and found the same behavior there as well.
>
> What do you think should be our next step so that we can get to the root
> of the issue.
>
> -Sameer.
>
> On Wed, Sep 13, 2017 at 6:14 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>
>> Hi Sameer,
>>
>> If no clients has transactions turned on the `__transaction_state`
>> internal
>> topic would not be created at all. So I still suspect that some of your
>> clients (maybe not your Streams client, but your Producer client that is
>> sending data to the source topic?) has transactions turned on.
>>
>> BTW from your logs I saw lots of the following errors on client side:
>>
>> 2017-09-11 12:42:34 ERROR RecordCollectorImpl:113 - task [0_6] Error
>> sending record to topic c-7-e6-KSTREAM-BRANCHCHILD-000
>> 0000007-repartition.
>> No more offsets will be recorded for this task and the exception will
>> eventually be thrown
>>
>> org.apache.kafka.common.errors.*TimeoutException*: Expiring 13 record(s)
>> for c-7-e6-KSTREAM-BRANCHCHILD-0000000007-repartition-3: 31174 ms has
>> passed since last append
>>
>> 2017-09-11 12:42:36 WARN  Sender:511 - Got error produce response with
>> correlation id 82862 on topic-partition
>> c-7-e6-KSTREAM-JOINTHIS-0000000018-store-changelog-22, retrying
>> (2147483646
>> attempts left). *Error: NETWORK_EXCEPTION*
>>
>> 2017-09-11 12:42:36 ERROR RecordCollectorImpl:113 - task [0_22] Error
>> sending record to topic c-7-e6-KSTREAM-BRANCHCHILD-000
>> 0000007-repartition.
>> No more offsets will be recorded for this task and the exception will
>> eventually be thrown
>>
>> org.apache.kafka.common.errors.*TimeoutException*: Expiring 13 record(s)
>> for c-7-e6-KSTREAM-BRANCHCHILD-0000000007-repartition-3: 31467 ms has
>> passed since last append
>>
>>
>> Today if the TimeoutException is thrown from the recordCollector it will
>> cause the Streams to throw this exception all the way to the user
>> exception
>> handler and then shutdown the thread. And this exception would be thrown
>> if
>> the Kafka broker itself is not available (also from your previous logs it
>> seems broker 192 and 193 was unavailable and hence being kicked out by
>> broker 109 out of the IRS).
>>
>>
>> Guozhang
>>
>>
>>
>> On Mon, Sep 11, 2017 at 3:40 AM, Sameer Kumar <sam.kum.w...@gmail.com>
>> wrote:
>>
>> > Hi Guozhang,
>> >
>> > Please find the relevant logs, see a folder for client logs as well,
>> > things started getting awry at 12:42:05.
>> > Let me know if you need any more information.
>> >
>> > -Sameer.
>> >
>> > On Sun, Sep 10, 2017 at 5:06 PM, Sameer Kumar <sam.kum.w...@gmail.com>
>> > wrote:
>> >
>> >> Hi Guozhang,
>> >>
>> >> Nope, I was not using exactly-once mode. I dont have the client logs
>> with
>> >> me right now, I will try to replicate it again and share the other
>> details
>> >> with you.
>> >>
>> >> My concern was that it crashed my brokers as well.
>> >>
>> >> -Sameer.
>> >>
>> >> On Sat, Sep 9, 2017 at 1:51 AM, Guozhang Wang <wangg...@gmail.com>
>> wrote:
>> >>
>> >>> Hello Sameer,
>> >>>
>> >>> I looked through your code, and here is what I figured: in 0.11
>> version
>> >>> we
>> >>> added the exactly-once feature (
>> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+E
>> >>> xactly+Once+Delivery+and+Transactional+Messaging
>> >>> )
>> >>>
>> >>> Which uses the transaction log (internal topic named
>> >>> "__transaction_state")
>> >>> that has a default replication of 3 (that will overwrite your global
>> >>> config
>> >>> value of 2). Then at around 12:30, the leader of the transation log
>> >>> partition kicked both replicas of 190 and 192 out of the replica:
>> >>>
>> >>> [2017-09-05 12:30:31,256] INFO [GroupCoordinator 193]: Preparing to
>> >>> rebalance group KafkaCache_TEST15 with old generation 14
>> >>> (__consumer_offsets-27) (kafka.coordinator.group.GroupCoordinator)
>> >>>
>> >>> *[2017-09-05 12:30:41,510] INFO Partition [__transaction_state,9] on
>> >>> broker
>> >>> 193: Shrinking ISR from 193,192,190 to 193 (kafka.cluster.Partition)*
>> >>>
>> >>> *[2017-09-05 12:30:41,513] INFO Partition [__transaction_state,6] on
>> >>> broker
>> >>> 193: Shrinking ISR from 193,190,192 to 193 (kafka.cluster.Partition)*
>> >>>
>> >>> *[2017-09-05 12:30:41,514] INFO Partition [__transaction_state,3] on
>> >>> broker
>> >>> 193: Shrinking ISR from 193,192,190 to 193 (kafka.cluster.Partition)*
>> >>>
>> >>> *[2017-09-05 12:30:41,515] INFO Partition [__transaction_state,18] on
>> >>> broker 193: Shrinking ISR from 193,190,192 to 193
>> >>> (kafka.cluster.Partition)*
>> >>>
>> >>> *[2017-09-05 12:30:41,515] INFO Partition [__transaction_state,15] on
>> >>> broker 193: Shrinking ISR from 193,192,190 to 193
>> >>> (kafka.cluster.Partition)*
>> >>>
>> >>> *[2017-09-05 12:30:41,516] INFO Partition [__transaction_state,12] on
>> >>> broker 193: Shrinking ISR from 193,190,192 to 193
>> >>> (kafka.cluster.Partition)*
>> >>>
>> >>> *[2017-09-05 12:30:41,516] INFO Partition [__consumer_offsets,12] on
>> >>> broker
>> >>> 193: Shrinking ISR from 193,192,190 to 193 (kafka.cluster.Partition)*
>> >>>
>> >>> *[2017-09-05 12:30:41,517] INFO Partition [__consumer_offsets,15] on
>> >>> broker
>> >>> 193: Shrinking ISR from 193,192,190 to 193 (kafka.cluster.Partition)*
>> >>>
>> >>> *[2017-09-05 12:30:41,517] INFO Partition [__transaction_state,24] on
>> >>> broker 193: Shrinking ISR from 193,190,192 to 193
>> >>> (kafka.cluster.Partition)*
>> >>>
>> >>> At the mean time, both replicas of 190 and 192 seems to be timed out
>> on
>> >>> their fetch requests (note the big timestamp gap in the logs):
>> >>>
>> >>> [2017-09-05 12:26:21,130] INFO Rolled new log segment for 'AdServe-4'
>> in
>> >>> 1
>> >>> ms. (kafka.log.Log)
>> >>> [2017-09-05 12:30:59,046] WARN [ReplicaFetcherThread-2-193]: Error in
>> >>> fetch
>> >>> to broker 193, request (type=FetchRequest, replicaId=190, maxWait=500,
>> >>> minBytes=1, maxBytes=10485760, fetchData={__consumer_offsets-
>> >>> 21=(offset=0,
>> >>> logStartOffset=0, maxBytes=1048576)
>> >>>
>> >>> ...
>> >>>
>> >>> [2017-09-05 12:28:37,514] INFO Deleting index
>> >>> /data1/kafka/AdServe-5/00000000000405000294.timeindex.deleted
>> >>> (kafka.log.TimeIndex)
>> >>> [2017-09-05 12:30:59,042] WARN [ReplicaFetcherThread-2-193]: Error in
>> >>> fetch
>> >>> to broker 193, request (type=FetchRequest, replicaId=192, maxWait=500,
>> >>> minBytes=1, maxBytes=10485760, fetchData={__consumer_offsets-
>> >>> 21=(offset=0,
>> >>> logStartOffset=0, maxBytes=1048576)
>> >>>
>> >>>
>> >>>
>> >>> This caused the NotEnoughReplicasException since any appends to the
>> >>> transaction logs are required "acks=all, and min.isr=num.replicas".
>> >>>
>> >>> *[2017-09-05 12:32:11,612] ERROR [Replica Manager on Broker 193]:
>> Error
>> >>> processing append operation on partition __transaction_state-18
>> >>> (kafka.server.ReplicaManager)*
>> >>>
>> >>> *org.apache.kafka.common.errors.NotEnoughReplicasException: Number of
>> >>> insync replicas for partition __transaction_state-18 is [1], below
>> >>> required
>> >>> minimum [3]*
>> >>>
>> >>> Upon seeing this error, the transaction coordinator should retry
>> >>> appending,
>> >>> but if the retry never succeeds it will be blocked. I did not see the
>> >>> Streams API client-side logs and so cannot tell for sure, why this
>> caused
>> >>> the Streams app to fail as well. A quick question: did you enable
>> >>> `processing.mode=exactly-once` on your streams app?
>> >>>
>> >>>
>> >>> Guozhang
>> >>>
>> >>>
>> >>>
>> >>>
>> >>> On Fri, Sep 8, 2017 at 1:34 AM, Sameer Kumar <sam.kum.w...@gmail.com>
>> >>> wrote:
>> >>>
>> >>> > Hi All,
>> >>> >
>> >>> >
>> >>> > Any thoughts on the below mail.
>> >>> >
>> >>> > -Sameer.
>> >>> >
>> >>> > On Wed, Sep 6, 2017 at 12:28 PM, Sameer Kumar <
>> sam.kum.w...@gmail.com>
>> >>> > wrote:
>> >>> >
>> >>> > > Hi All,
>> >>> > >
>> >>> > > I want to report a scenario wherein my running 2 different
>> instances
>> >>> of
>> >>> > my
>> >>> > > stream application caused my brokers to crash and eventually my
>> >>> stream
>> >>> > > application as well. This scenario only happens when my brokers
>> run
>> >>> on
>> >>> > > Kafka11, everything works fine if my brokers are on Kafka 10..2
>> and
>> >>> > stream
>> >>> > > application on Kafka11.
>> >>> > >
>> >>> > > I am attaching herewith the logs in a zipped format.
>> >>> > >
>> >>> > > The cluster configuration
>> >>> > > 3 nodes(190,192,193) , Kafka 11
>> >>> > > Topic Replication Factor - 2
>> >>> > >
>> >>> > > App configuration
>> >>> > > Kafka 11 streams.
>> >>> > >
>> >>> > >
>> >>> > > The error I saw on 193 server was org.apache.kafka.common.errors.
>> >>> > NotEnoughReplicasException:
>> >>> > > Number of insync replicas for partition __transaction_state-18 is
>> >>> [1],
>> >>> > > below required minimum [3]. Both 192,190 servers reported errors
>> on
>> >>> > failure
>> >>> > > to read information from 193.
>> >>> > >
>> >>> > > Please look for the time around 12:30-12:32 to find the relevant
>> >>> logs.
>> >>> > Let
>> >>> > > me know if you need some other information.
>> >>> > >
>> >>> > >
>> >>> > > Regards,
>> >>> > > -Sameer.
>> >>> > >
>> >>> >
>> >>>
>> >>>
>> >>>
>> >>> --
>> >>> -- Guozhang
>> >>>
>> >>
>> >>
>> >
>>
>>
>> --
>> -- Guozhang
>>
>
>

Reply via email to