When exactly_once is turned on the transactional id would be set
automatically by the Streams client.

What I'd inspect is the healthiness of the brokers since the "
*TimeoutException*", if you have metrics on the broker servers regarding
request handler thread idleness / request queue length / request rate etc,
you can monitor that and see what could be the possible causes of the
broker unavailability.


Guozhang


On Wed, Sep 13, 2017 at 8:26 AM, Sameer Kumar <sam.kum.w...@gmail.com>
wrote:

> Adding more info:-
>
> Hi Guozhang,
>
> I was using exactly_once processing here, I can see this in the client
> logs, however I am not setting transaction id though.
>
> application.id = c-7-e6
> application.server =
> bootstrap.servers = [172.29.65.190:9092, 172.29.65.192:9092,
> 172.29.65.193:9092]
> buffered.records.per.partition = 10000
> cache.max.bytes.buffering = 2097152000
> client.id =
> commit.interval.ms = 5000
> connections.max.idle.ms = 540000
> default.key.serde = class
> org.apache.kafka.common.serialization.Serdes$ByteArraySerde
> default.timestamp.extractor = class
> org.apache.kafka.streams.processor.FailOnInvalidTimestamp
> default.value.serde = class
> org.apache.kafka.common.serialization.Serdes$ByteArraySerde
> key.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
> metadata.max.age.ms = 60000
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = INFO
> metrics.sample.window.ms = 30000
> num.standby.replicas = 0
> num.stream.threads = 15
> partition.grouper = class
> org.apache.kafka.streams.processor.DefaultPartitionGrouper
> poll.ms = 100
> processing.guarantee = exactly_once
> receive.buffer.bytes = 32768
> reconnect.backoff.max.ms = 1000
> reconnect.backoff.ms = 50
> replication.factor = 1
> request.timeout.ms = 40000
> retry.backoff.ms = 100
> rocksdb.config.setter = null
> security.protocol = PLAINTEXT
> send.buffer.bytes = 131072
> state.cleanup.delay.ms = 4611686018427386903
> state.dir = /data/streampoc/
> timestamp.extractor = class
> org.apache.kafka.streams.processor.WallclockTimestampExtractor
> value.serde = class org.apache.kafka.common.serialization.Serdes$
> StringSerde
> windowstore.changelog.additional.retention.ms = 86400000
> zookeeper.connect =
>
>
> On Wed, Sep 13, 2017 at 12:16 PM, Sameer Kumar <sam.kum.w...@gmail.com>
> wrote:
>
> > Hi Guozhang,
> >
> > The producer sending data to this topic is not running concurrently with
> > the stream processing. I had first ingested the data from another cluster
> > and then have the stream processing ran on it. The producer code is
> written
> > by me and it doesnt have transactions on by default.
> >
> > I will double check if someone else has transaction turned on, but this
> is
> > quite unlikely. Is there someway to verify it through logs.
> >
> > All of this behavior works fine when brokers are run on Kafka 10, this
> > might be because transactions are only available on Kafka11. I am
> > suspecting would there be a case that too much processing is causing one
> of
> > the brokers to crash. The timeouts are indicating that it is taking time
> to
> > send data
> >
> > I have tried this behavior also on a another cluster which I exclusively
> > use it for myself and found the same behavior there as well.
> >
> > What do you think should be our next step so that we can get to the root
> > of the issue.
> >
> > -Sameer.
> >
> > On Wed, Sep 13, 2017 at 6:14 AM, Guozhang Wang <wangg...@gmail.com>
> wrote:
> >
> >> Hi Sameer,
> >>
> >> If no clients has transactions turned on the `__transaction_state`
> >> internal
> >> topic would not be created at all. So I still suspect that some of your
> >> clients (maybe not your Streams client, but your Producer client that is
> >> sending data to the source topic?) has transactions turned on.
> >>
> >> BTW from your logs I saw lots of the following errors on client side:
> >>
> >> 2017-09-11 12:42:34 ERROR RecordCollectorImpl:113 - task [0_6] Error
> >> sending record to topic c-7-e6-KSTREAM-BRANCHCHILD-000
> >> 0000007-repartition.
> >> No more offsets will be recorded for this task and the exception will
> >> eventually be thrown
> >>
> >> org.apache.kafka.common.errors.*TimeoutException*: Expiring 13
> record(s)
> >> for c-7-e6-KSTREAM-BRANCHCHILD-0000000007-repartition-3: 31174 ms has
> >> passed since last append
> >>
> >> 2017-09-11 12:42:36 WARN  Sender:511 - Got error produce response with
> >> correlation id 82862 on topic-partition
> >> c-7-e6-KSTREAM-JOINTHIS-0000000018-store-changelog-22, retrying
> >> (2147483646
> >> attempts left). *Error: NETWORK_EXCEPTION*
> >>
> >> 2017-09-11 12:42:36 ERROR RecordCollectorImpl:113 - task [0_22] Error
> >> sending record to topic c-7-e6-KSTREAM-BRANCHCHILD-000
> >> 0000007-repartition.
> >> No more offsets will be recorded for this task and the exception will
> >> eventually be thrown
> >>
> >> org.apache.kafka.common.errors.*TimeoutException*: Expiring 13
> record(s)
> >> for c-7-e6-KSTREAM-BRANCHCHILD-0000000007-repartition-3: 31467 ms has
> >> passed since last append
> >>
> >>
> >> Today if the TimeoutException is thrown from the recordCollector it will
> >> cause the Streams to throw this exception all the way to the user
> >> exception
> >> handler and then shutdown the thread. And this exception would be thrown
> >> if
> >> the Kafka broker itself is not available (also from your previous logs
> it
> >> seems broker 192 and 193 was unavailable and hence being kicked out by
> >> broker 109 out of the IRS).
> >>
> >>
> >> Guozhang
> >>
> >>
> >>
> >> On Mon, Sep 11, 2017 at 3:40 AM, Sameer Kumar <sam.kum.w...@gmail.com>
> >> wrote:
> >>
> >> > Hi Guozhang,
> >> >
> >> > Please find the relevant logs, see a folder for client logs as well,
> >> > things started getting awry at 12:42:05.
> >> > Let me know if you need any more information.
> >> >
> >> > -Sameer.
> >> >
> >> > On Sun, Sep 10, 2017 at 5:06 PM, Sameer Kumar <sam.kum.w...@gmail.com
> >
> >> > wrote:
> >> >
> >> >> Hi Guozhang,
> >> >>
> >> >> Nope, I was not using exactly-once mode. I dont have the client logs
> >> with
> >> >> me right now, I will try to replicate it again and share the other
> >> details
> >> >> with you.
> >> >>
> >> >> My concern was that it crashed my brokers as well.
> >> >>
> >> >> -Sameer.
> >> >>
> >> >> On Sat, Sep 9, 2017 at 1:51 AM, Guozhang Wang <wangg...@gmail.com>
> >> wrote:
> >> >>
> >> >>> Hello Sameer,
> >> >>>
> >> >>> I looked through your code, and here is what I figured: in 0.11
> >> version
> >> >>> we
> >> >>> added the exactly-once feature (
> >> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+E
> >> >>> xactly+Once+Delivery+and+Transactional+Messaging
> >> >>> )
> >> >>>
> >> >>> Which uses the transaction log (internal topic named
> >> >>> "__transaction_state")
> >> >>> that has a default replication of 3 (that will overwrite your global
> >> >>> config
> >> >>> value of 2). Then at around 12:30, the leader of the transation log
> >> >>> partition kicked both replicas of 190 and 192 out of the replica:
> >> >>>
> >> >>> [2017-09-05 12:30:31,256] INFO [GroupCoordinator 193]: Preparing to
> >> >>> rebalance group KafkaCache_TEST15 with old generation 14
> >> >>> (__consumer_offsets-27) (kafka.coordinator.group.GroupCoordinator)
> >> >>>
> >> >>> *[2017-09-05 12:30:41,510] INFO Partition [__transaction_state,9] on
> >> >>> broker
> >> >>> 193: Shrinking ISR from 193,192,190 to 193
> (kafka.cluster.Partition)*
> >> >>>
> >> >>> *[2017-09-05 12:30:41,513] INFO Partition [__transaction_state,6] on
> >> >>> broker
> >> >>> 193: Shrinking ISR from 193,190,192 to 193
> (kafka.cluster.Partition)*
> >> >>>
> >> >>> *[2017-09-05 12:30:41,514] INFO Partition [__transaction_state,3] on
> >> >>> broker
> >> >>> 193: Shrinking ISR from 193,192,190 to 193
> (kafka.cluster.Partition)*
> >> >>>
> >> >>> *[2017-09-05 12:30:41,515] INFO Partition [__transaction_state,18]
> on
> >> >>> broker 193: Shrinking ISR from 193,190,192 to 193
> >> >>> (kafka.cluster.Partition)*
> >> >>>
> >> >>> *[2017-09-05 12:30:41,515] INFO Partition [__transaction_state,15]
> on
> >> >>> broker 193: Shrinking ISR from 193,192,190 to 193
> >> >>> (kafka.cluster.Partition)*
> >> >>>
> >> >>> *[2017-09-05 12:30:41,516] INFO Partition [__transaction_state,12]
> on
> >> >>> broker 193: Shrinking ISR from 193,190,192 to 193
> >> >>> (kafka.cluster.Partition)*
> >> >>>
> >> >>> *[2017-09-05 12:30:41,516] INFO Partition [__consumer_offsets,12] on
> >> >>> broker
> >> >>> 193: Shrinking ISR from 193,192,190 to 193
> (kafka.cluster.Partition)*
> >> >>>
> >> >>> *[2017-09-05 12:30:41,517] INFO Partition [__consumer_offsets,15] on
> >> >>> broker
> >> >>> 193: Shrinking ISR from 193,192,190 to 193
> (kafka.cluster.Partition)*
> >> >>>
> >> >>> *[2017-09-05 12:30:41,517] INFO Partition [__transaction_state,24]
> on
> >> >>> broker 193: Shrinking ISR from 193,190,192 to 193
> >> >>> (kafka.cluster.Partition)*
> >> >>>
> >> >>> At the mean time, both replicas of 190 and 192 seems to be timed out
> >> on
> >> >>> their fetch requests (note the big timestamp gap in the logs):
> >> >>>
> >> >>> [2017-09-05 12:26:21,130] INFO Rolled new log segment for
> 'AdServe-4'
> >> in
> >> >>> 1
> >> >>> ms. (kafka.log.Log)
> >> >>> [2017-09-05 12:30:59,046] WARN [ReplicaFetcherThread-2-193]: Error
> in
> >> >>> fetch
> >> >>> to broker 193, request (type=FetchRequest, replicaId=190,
> maxWait=500,
> >> >>> minBytes=1, maxBytes=10485760, fetchData={__consumer_offsets-
> >> >>> 21=(offset=0,
> >> >>> logStartOffset=0, maxBytes=1048576)
> >> >>>
> >> >>> ...
> >> >>>
> >> >>> [2017-09-05 12:28:37,514] INFO Deleting index
> >> >>> /data1/kafka/AdServe-5/00000000000405000294.timeindex.deleted
> >> >>> (kafka.log.TimeIndex)
> >> >>> [2017-09-05 12:30:59,042] WARN [ReplicaFetcherThread-2-193]: Error
> in
> >> >>> fetch
> >> >>> to broker 193, request (type=FetchRequest, replicaId=192,
> maxWait=500,
> >> >>> minBytes=1, maxBytes=10485760, fetchData={__consumer_offsets-
> >> >>> 21=(offset=0,
> >> >>> logStartOffset=0, maxBytes=1048576)
> >> >>>
> >> >>>
> >> >>>
> >> >>> This caused the NotEnoughReplicasException since any appends to the
> >> >>> transaction logs are required "acks=all, and min.isr=num.replicas".
> >> >>>
> >> >>> *[2017-09-05 12:32:11,612] ERROR [Replica Manager on Broker 193]:
> >> Error
> >> >>> processing append operation on partition __transaction_state-18
> >> >>> (kafka.server.ReplicaManager)*
> >> >>>
> >> >>> *org.apache.kafka.common.errors.NotEnoughReplicasException: Number
> of
> >> >>> insync replicas for partition __transaction_state-18 is [1], below
> >> >>> required
> >> >>> minimum [3]*
> >> >>>
> >> >>> Upon seeing this error, the transaction coordinator should retry
> >> >>> appending,
> >> >>> but if the retry never succeeds it will be blocked. I did not see
> the
> >> >>> Streams API client-side logs and so cannot tell for sure, why this
> >> caused
> >> >>> the Streams app to fail as well. A quick question: did you enable
> >> >>> `processing.mode=exactly-once` on your streams app?
> >> >>>
> >> >>>
> >> >>> Guozhang
> >> >>>
> >> >>>
> >> >>>
> >> >>>
> >> >>> On Fri, Sep 8, 2017 at 1:34 AM, Sameer Kumar <
> sam.kum.w...@gmail.com>
> >> >>> wrote:
> >> >>>
> >> >>> > Hi All,
> >> >>> >
> >> >>> >
> >> >>> > Any thoughts on the below mail.
> >> >>> >
> >> >>> > -Sameer.
> >> >>> >
> >> >>> > On Wed, Sep 6, 2017 at 12:28 PM, Sameer Kumar <
> >> sam.kum.w...@gmail.com>
> >> >>> > wrote:
> >> >>> >
> >> >>> > > Hi All,
> >> >>> > >
> >> >>> > > I want to report a scenario wherein my running 2 different
> >> instances
> >> >>> of
> >> >>> > my
> >> >>> > > stream application caused my brokers to crash and eventually my
> >> >>> stream
> >> >>> > > application as well. This scenario only happens when my brokers
> >> run
> >> >>> on
> >> >>> > > Kafka11, everything works fine if my brokers are on Kafka 10..2
> >> and
> >> >>> > stream
> >> >>> > > application on Kafka11.
> >> >>> > >
> >> >>> > > I am attaching herewith the logs in a zipped format.
> >> >>> > >
> >> >>> > > The cluster configuration
> >> >>> > > 3 nodes(190,192,193) , Kafka 11
> >> >>> > > Topic Replication Factor - 2
> >> >>> > >
> >> >>> > > App configuration
> >> >>> > > Kafka 11 streams.
> >> >>> > >
> >> >>> > >
> >> >>> > > The error I saw on 193 server was org.apache.kafka.common.
> errors.
> >> >>> > NotEnoughReplicasException:
> >> >>> > > Number of insync replicas for partition __transaction_state-18
> is
> >> >>> [1],
> >> >>> > > below required minimum [3]. Both 192,190 servers reported errors
> >> on
> >> >>> > failure
> >> >>> > > to read information from 193.
> >> >>> > >
> >> >>> > > Please look for the time around 12:30-12:32 to find the relevant
> >> >>> logs.
> >> >>> > Let
> >> >>> > > me know if you need some other information.
> >> >>> > >
> >> >>> > >
> >> >>> > > Regards,
> >> >>> > > -Sameer.
> >> >>> > >
> >> >>> >
> >> >>>
> >> >>>
> >> >>>
> >> >>> --
> >> >>> -- Guozhang
> >> >>>
> >> >>
> >> >>
> >> >
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >
> >
>



-- 
-- Guozhang

Reply via email to