Re: [DISCUSS] : KIP-410: Add metric for request handler thread pool utilization by request type

2018-12-26 Thread Stanislav Kozlovski
Hey there Mayuresh,

Thanks for the KIP! This will prove to be very useful.

I am wondering whether we should opt for the name
`RequestHandlerPoolUsagePercent`. We seem to use the "Percent" suffix to
denote fractions of time in other places - e.g.
`NetworkProcessorAvgIdlePercent`:

The average fraction of time the network processors are idle
kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent
between 0 and 1, ideally > 0.3


Whereas we use the "Rate" suffix to denote the number of events per second
- e.g. the clients' "connection-close-rate":


  connection-close-rate
  Connections closed per second in the window.
  
kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
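To make the distinction concrete, here is a rough sketch (Python, purely illustrative - the function names are mine, not Kafka's) of what each suffix convention measures:

```python
# Illustrative only -- not Kafka code. Shows the semantic difference
# between the two metric-suffix conventions discussed above.

def avg_idle_percent(idle_nanos, window_nanos):
    # "Percent" suffix: average fraction of time spent idle, between 0 and 1
    return idle_nanos / window_nanos

def connection_close_rate(closes_in_window, window_seconds):
    # "Rate" suffix: number of events per second in the window
    return closes_in_window / window_seconds

# A pool idle for 300 ms of a 1 s window reports 0.3 (ideally > 0.3):
print(avg_idle_percent(300_000_000, 1_000_000_000))  # 0.3
# 12 connections closed over a 60 s window report 0.2 closes/sec:
print(connection_close_rate(12, 60))  # 0.2
```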



A separate nit - you may want to update the discussion thread link in the
KIP as it is pointing to the default one.

Thanks,
Stanislav

On Thu, Dec 20, 2018 at 9:58 PM Mayuresh Gharat 
wrote:

> I would like to get feedback on the proposal to add a metric for request
> handler thread pool utilization by request type. Please find the KIP here :
> KIP-410: Add metric for request handler thread pool utilization by request
> type
> <
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-410%3A+Add+metric+for+request+handler+thread+pool+utilization+by+request+type
> >
>
>
> Thanks,
>
> Mayuresh
>


-- 
Best,
Stanislav


[jira] [Created] (KAFKA-7769) Illegal batch type class

2018-12-26 Thread Asaf Mesika (JIRA)
Asaf Mesika created KAFKA-7769:
--

 Summary:  Illegal batch type class
 Key: KAFKA-7769
 URL: https://issues.apache.org/jira/browse/KAFKA-7769
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 1.1.0
 Environment: Kafka 1.1.0 on client side, running on Java 8
Kafka 1.1.0 on broker side, running on Java 8
Reporter: Asaf Mesika


I get the following exception from Kafka Consumer version 1.1.0:
{code:java}
kafka.common.KafkaException: Error processing data for partition acmetopic-24 offset 1408742703
	at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:205)
	at scala.Option.foreach(Option.scala:257)
	at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6(AbstractFetcherThread.scala:169)
	at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted(AbstractFetcherThread.scala:166)
	at scala.collection.Iterator.foreach(Iterator.scala:929)
	at scala.collection.Iterator.foreach$(Iterator.scala:929)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1417)
	at scala.collection.IterableLike.foreach(IterableLike.scala:71)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$5(AbstractFetcherThread.scala:166)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
	at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
	at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:166)
	at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
	at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
Caused by: java.lang.IllegalArgumentException: Illegal batch type class org.apache.kafka.common.record.DefaultRecordBatch. The older message format classes only support conversion from class org.apache.kafka.common.record.AbstractLegacyRecordBatch, which is used for magic v0 and v1
	at kafka.message.MessageAndOffset$.fromRecordBatch(MessageAndOffset.scala:30)
	at kafka.message.ByteBufferMessageSet.$anonfun$internalIterator$1(ByteBufferMessageSet.scala:169)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:448)
	at scala.collection.Iterator.toStream(Iterator.scala:1403)
	at scala.collection.Iterator.toStream$(Iterator.scala:1402)
	at scala.collection.AbstractIterator.toStream(Iterator.scala:1417)
	at scala.collection.TraversableOnce.toSeq(TraversableOnce.scala:298)
	at scala.collection.TraversableOnce.toSeq$(TraversableOnce.scala:298)
	at scala.collection.AbstractIterator.toSeq(Iterator.scala:1417)
	at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:59)
	at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:87)
	at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:37)
	at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:183)
	... 15 common frames omitted{code}
 

It happens once in a while. I initialise the client with ZooKeeper connect, 
and not with bootstrap servers. 

Any idea what can cause this?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] KIP-382: MirrorMaker 2.0

2018-12-26 Thread Ryanne Dolan
Becket, this is great feedback, thanks.

> having a reserved character for the topics is probably something worth
doing in general

Agreed, but we shouldn't make this a requirement for MM2, or else it
wouldn't work with older versions of Kafka, complicating adoption, testing
etc. In particular, we'll want to prove MM2 against production workloads
without first upgrading Kafka brokers.

I think we should 1) make the separator configurable in MM2, defaulting to
a period for now, 2) in a separate KIP, propose a special separator
character as you suggest, 3) maybe update the default at some point.

We can endeavor to do this all in the same release, which would have the
effect you want.
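As a sketch of point 1 (illustrative only - the function and parameter names are mine, and this is not actual MM2 code):

```python
def remote_topic(source_cluster, topic, separator="."):
    # Point 1 above: name remote topics by prefixing the source cluster,
    # with the separator configurable and defaulting to a period.
    return f"{source_cluster}{separator}{topic}"

def source_cluster_of(topic, separator="."):
    # Best-effort parse of the prefix. This stays ambiguous until a
    # reserved separator character exists (point 2), because "." can
    # also appear in ordinary topic names.
    return topic.split(separator, 1)[0] if separator in topic else None

print(remote_topic("us-west", "orders"))    # us-west.orders
print(source_cluster_of("us-west.orders"))  # us-west
```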

> It might be better to add a config like allowAclMismatch to let users
decide what the right behavior should be, i.e. either fail a mirror if the
ACLs mismatch, or mirror it with different ACLs.

What would it mean to "fail a mirror"? I think it would be strange if
replication suddenly stopped after someone changes an ACL somewhere.

I think for users that want ACLs to mismatch, they'd just disable
sync.topic.acls and manage ACLs themselves. I want MM2 to do the right
thing by default, but I don't think it should be responsible for enforcing
policies or protecting against changes beyond its control and purview.

> seems possible to achieve per message granularity, given that there is a
single writer to the remote topic [...] if the returned target offset is
different from the expectation [...], MM2 emit a new mapping message to the
offset mapping topic

This is a fantastic idea. With this approach, we can reduce the recovery
period to a very small number of offsets -- effectively the same as the
latest commit. This isn't exactly one-to-one offset translation, but as I
said we don't really need that -- we just want to get as close to the
latest consumer commit as possible without going past it, which your
approach does very well. Thanks for this improvement.
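A rough sketch of that scheme (Python; the class and method names are illustrative, not MM2 code):

```python
import bisect

class OffsetMapper:
    # Sketch of the single-writer offset-mapping scheme described above;
    # illustrative only.

    def __init__(self):
        self.mappings = []  # sorted (source_offset, target_offset) pairs

    def record(self, source_offset, target_offset):
        # Called for each mirrored message. Step 1: always emit the first
        # mapping. Steps 2-3: afterwards, emit only when the target offset
        # diverges from the one-by-one expectation.
        if self.mappings:
            last_src, last_tgt = self.mappings[-1]
            if target_offset == last_tgt + (source_offset - last_src):
                return  # still in lockstep, nothing to emit
        self.mappings.append((source_offset, target_offset))

    def translate(self, committed_source_offset):
        # Find the closest entry at or below the committed source offset
        # and add the offset difference.
        i = bisect.bisect_right(
            self.mappings, (committed_source_offset, float("inf"))) - 1
        if i < 0:
            return None
        src, tgt = self.mappings[i]
        return tgt + (committed_source_offset - src)

m = OffsetMapper()
m.record(99, 199)        # first message: emit (99 -> 199)
m.record(100, 200)       # as expected, no new mapping needed
m.record(199, 300)       # diverges (expected 299): emit (199 -> 300)
print(m.translate(150))  # 199 + (150 - 99) = 250
print(m.translate(250))  # 300 + (250 - 199) = 351
```

Note the recovery gap shrinks to the distance between the committed offset and the nearest mapping at or below it, which is what makes this effectively as good as per-message translation.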

Ryanne


On Tue, Dec 25, 2018 at 1:38 AM Becket Qin  wrote:

> Hi Ryanne,
>
> Thanks for the reply. Please see a few more comments below.
>
> 1. Reserved character for topic names.
>
> > I think the limitations on topic names are imposed by Kafka itself
> (beyond
> > those from Zookeeper and the file system), so it might be possible to
> add a
> > reserved character for this purpose. This would essentially promote the
> > concept of "remote topics" from MirrorMaker to Kafka in general, which
> > might be nice. In particular, we could add native support for remote
> topics
> > to KafkaConsumer if we had a baked-in way to distinguish remote vs
> > non-remote topics. Definitely worth investigating for a future KIP :)
>
> I do not worry much about the consumer side because it is fairly simple to
> expand the subscription from a classic topic to a local + remote topic, as
> long as users know the rule. I am more concerned about having a way to
> ensure the users can safely use the remote topic without potential issues.
> There were some earlier discussions about hierarchical topic names /
> topic namespaces. Those are probably too much in our context, but having a
> reserved character for the topics is probably something worth doing in
> general. It seems simple enough and will help address the potential
> confusion between local / remote topic names.
>
> 2. ACLs
> I think in some cases users may want to have different ACLs in local and
> remote topics, but in other cases, users may want to make sure they are
> the same to avoid unexpected behavior. It might be better to add a config
> like allowAclMismatch to let users decide what the right behavior should
> be, i.e. either fail a mirror if the ACLs mismatch, or mirror it with
> different ACLs.
>
> 3. Offset mapping between source and destination Kafka cluster.
> I haven't thought about this thoroughly, but it seems possible to achieve
> per-message granularity, given that there is a single writer to the remote
> topic. What we can do is the following:
> 1. For the first message MM2 mirrors, it will always emit a [source offset,
> target offset] mapping to the offset mapping topic (e.g. 99 -> 199).
> 2. After that, MM2 expects the offset in the destination partition to
> increment one by one, corresponding to each message mirrored from source.
> (100 -> 200, 101 -> 201, etc...)
> 3. At any given point, if the returned target offset is different from the
> expectation (very likely larger; otherwise there is message loss), MM2 emits
> a new mapping message to the offset mapping topic. (Supposedly, if 99 ->
> 199, then MM2 expects 199 -> 299; but if 199 -> 300, MM2 emits the pair
> (199 -> 300).)
>
> In this case, for any committed source offset, the target offset can be
> determined by doing the following:
> 1. Find the offset mapping entry which contains the source offset that is
> closest but no larger than the committed source offset. (e.g. committed
> offsets 150 will be mapped to the entry (99 -> 199))
> 2. Add the offset difference (committed source offset minus the entry's
> source offset) to the entry's target offset (e.g. 150 translates to 199 +
> (150 - 99) = 250).

Re: [VOTE] KIP-349 Priorities for Source Topics

2018-12-26 Thread nick

Hi All,

Bumping this thread for more votes

  
https://cwiki.apache.org/confluence/display/KAFKA/KIP-349:+Priorities+for+Source+Topics
 


Cheers,
--
  Nick





Re: [DISCUSS] KIP-408: Add Asynchronous Processing to Kafka Streams

2018-12-26 Thread Richard Yu
Hi all,

Just changing the title of the KIP; discovered it wasn't right.
That's about it. :)

On Mon, Dec 24, 2018 at 7:57 PM Richard Yu 
wrote:

> Sorry, just making a correction.
>
> Even if we are processing records out of order, we will still have to
> checkpoint offset ranges.
> So it doesn't really change anything even if we are doing in-order
> processing.
>
> Thinking this over, I'm leaning slightly towards maintaining the ordering
> guarantee.
> Although when implementing this change, there might be some kinks that we
> have not thought about which could throw a monkey wrench into the works.
>
> But definitely worth trying out,
> Richard
>
> On Mon, Dec 24, 2018 at 6:51 PM Richard Yu 
> wrote:
>
>> Hi Boyang,
>>
>> I could see where you are going with this. Well, I suppose I should have
>> added this to alternatives, but I might as well mention it now.
>>
>> It had crossed my mind that we consider returning in-order even if there
>> are multiple threads processing on the same task. But for this to happen,
>> we must block for the offsets in-between which have not been processed yet.
>> For example, offsets 1-50 are being processed by thread1, while the offsets
>> 51 - 100 are being processed by thread2. We will have to wait for thread1
>> to finish processing its offsets first before we return the records
>> processed by thread2. So in other words, once thread1 is done, thread2's
>> work up to that point will be returned in one go, but not before that.
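The gating described above could be sketched as follows (Python; illustrative only, not Streams code, with dummy per-record work):

```python
from concurrent.futures import ThreadPoolExecutor
import random
import time

def process(segment):
    # Stand-in for asynchronous per-record processing of an offset range.
    time.sleep(random.uniform(0, 0.01))  # simulate variable work
    return [offset * 2 for offset in segment]

# thread1 handles offsets 1-50, thread2 handles 51-100, concurrently.
segments = [range(1, 51), range(51, 101)]

with ThreadPoolExecutor(max_workers=2) as pool:
    futures = [pool.submit(process, seg) for seg in segments]
    ordered = []
    # Iterating futures in submission order blocks on thread1's result
    # first, so thread2's finished work is only released after thread1
    # is done -- "returned in one go, but not before that".
    for f in futures:
        ordered.extend(f.result())

print(ordered[0], ordered[-1])  # 2 200
```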
>>
>> I suppose this could work, but the client will have to wait some time
>> before the advantages of multithreaded processing can be seen (i.e. the
>> first thread has to finish processing its segment of the records first
>> before any others are returned to guarantee ordering). Another point I
>> would like to make is that the threads are *asynchronous*. So for us to
>> know when a thread is done processing a certain segment, we will probably
>> have a similar policy to how getMetadataAsync() works (i.e. have a parent
>> thread be notified of when the children threads are done).
>> [image: image.png]
>> Just pulling this from the KIP. But instead, we would apply this to
>> metadata segments instead of just a callback.
>> I don't know whether or not the tradeoffs are acceptable to the client.
>> Ordering could be guaranteed, but it would be hard to do. For example, if
>> there was a crash, we might lose track of which offset numbers and ranges
>> we are processing for each child thread, so somehow we need to find a way
>> to checkpoint those as well (like committing them to a Kafka topic).
>>
>> Let me know your thoughts on this approach. It would work, but the
>> implementation details could be a mess.
>>
>> Cheers,
>> Richard
>>
>>
>>
>>
>>
>> On Mon, Dec 24, 2018 at 4:59 PM Boyang Chen  wrote:
>>
>>> Hey Richard,
>>>
>>> thanks for the explanation! After some thinking, I do understand more
>>> about this KIP. The motivation was to increase the throughput and put heavy
>>> lifting RPC calls or IO operations to the background. While I feel the
>>> ordering is hard to guarantee for async task, it is better to be
>>> configurable for the end users.
>>>
>>> An example use case I could think of is: for every 500 records
>>> processed, we need an RPC to external storage that takes non-trivial
>>> time, and until it finishes, the 499 records before it shouldn't be
>>> visible to the end user. In such a case, we need fine-grained control
>>> over visibility to the downstream consumer, so that our async task
>>> plants a barrier while still processing the 499 records in a
>>> non-blocking fashion and sending them downstream. Eventually, when the
>>> heavy RPC is done, we commit this record to remove the barrier and make
>>> all 500 records available for downstream. So here we still need to
>>> guarantee ordering within the 500 records, while at the same time the
>>> consumer semantics don't change.
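One way to model that barrier (Python sketch; the class and method names are mine, purely illustrative, not a proposed API):

```python
# Records keep being processed asynchronously, but the committable offset
# -- what downstream consumers may see -- is pinned until the barrier's
# slow RPC completes.

class BarrierCommit:
    def __init__(self):
        self.processed_up_to = -1  # highest contiguously processed offset
        self.barrier = None        # offset whose RPC is still in flight

    def processed(self, offset):
        self.processed_up_to = max(self.processed_up_to, offset)

    def plant_barrier(self, offset):
        self.barrier = offset

    def barrier_done(self):
        self.barrier = None

    def committable(self):
        # Offset safe to commit / expose downstream: held back while
        # the barrier's RPC is pending.
        if self.barrier is not None:
            return min(self.processed_up_to, self.barrier - 1)
        return self.processed_up_to

b = BarrierCommit()
b.plant_barrier(0)      # the whole 500-record batch waits on one RPC
for off in range(500):  # offsets 0..499 processed without blocking
    b.processed(off)
print(b.committable())  # -1: nothing visible until the RPC finishes
b.barrier_done()
print(b.committable())  # 499: all 500 records become visible at once
```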
>>>
>>> Am I making the point clear here? Just want to have more discussion on the
>>> ordering guarantee since I feel it wouldn't be a good idea to break
>>> consumer ordering guarantee by default.
>>>
>>> Best,
>>> Boyang
>>>
>>> 
>>> From: Richard Yu 
>>> Sent: Saturday, December 22, 2018 9:08 AM
>>> To: dev@kafka.apache.org
>>> Subject: Re: KIP-408: Add Asynchronous Processing to Kafka Streams
>>>
>>> Hi Boyang,
>>>
>>> Thanks for pointing out the possibility of skipping bad records (never
>>> crossed my mind). I suppose we could make it an option for the user if
>>> they
>>> could skip a bad record. It was never the intention of this KIP though on
>>> whether or not to do that. I could log a JIRA on such an issue, but I
>>> think
>>> this is out of the KIP's scope.
>>>
>>> As for the ordering guarantees, if you are using the standard Kafka
>>> design of one thread per task, then everything will pretty much remain
>>> the same.
>>> However, if we are talking about using multiple threads per task (which
>>> is
>>> something that this

Re: KIP-408: Add Asynchronous Processing to Kafka Streams

2018-12-26 Thread Richard Yu
Hi all, just saying.
We are migrating to a different discussion thread. (Forgot that the
discussion thread's name was incorrect.)
Sorry for the confusion.
