Some jira cleanup

2019-01-28 Thread Sönke Liebau
All,

I left a few comments on some old but still open jiras in an attempt to
clean up a little bit.

Since probably no one would notice these comments I thought I'd quickly
list them here to give people a chance to check on them:

KAFKA-217: Client test suite
KAFKA-517: Ensure that we escape the metric names if they include user strings
KAFKA-659: Support request pipelining in the network server
KAFKA-817: Implement a zookeeper path-based controlled shutdown tool
KAFKA-859: Support basic auth protection of mx4j console
KAFKA-1015: Documentation for inbuilt offset management
KAFKA-1021: Write a tool to check replica lag for individual topic partitions


I'll wait a few days for objections and then close these issues.

Also, as a heads-up, I'll try to spend some time on this more or less
regularly in the future. Is the approach of notifying everybody on the
mailing list fine for everyone?

Best regards,
Sönke


Re: [VOTE] KIP-183 - Change PreferredReplicaLeaderElectionCommand to use AdminClient

2019-01-28 Thread Tom Bentley
Hi Folks,

It took a while, but the work for KIP-183 has now been merged. My thanks to
everyone involved.

A few details changed between what was voted on and what ultimately got
merged. I've updated the KIP to reflect what was actually merged. If anyone
is interested in the gory details they can look at
https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=73632065&selectedPageVersions=20&selectedPageVersions=18
and
https://github.com/apache/kafka/commit/269b65279c746bc54c611141a5a6509f9b310f11

Kind regards,

Tom
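
For anyone skimming the archive, here is a rough sketch of the accessor shape
described in the diff above, i.e. methods returning futures instead of an
exposed map (method names approximate; the KIP page has the exact merged API):

import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ElectPreferredLeadersResult;
import org.apache.kafka.common.TopicPartition;

public class ElectExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // null means "elect for all partitions", so the affected set is only
            // known once the broker responds: a future, not up-front map keys
            ElectPreferredLeadersResult result = admin.electPreferredLeaders(null);
            Set<TopicPartition> partitions = result.partitions().get();
            for (TopicPartition tp : partitions) {
                result.partitionResult(tp).get(); // throws if election failed for tp
            }
        }
    }
}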

On Fri, 8 Sep 2017 at 16:30, Tom Bentley  wrote:

> Since no one has objected, I conclude that this KIP is again accepted.
>
> Thanks,
>
> Tom
>
> On 7 September 2017 at 22:31, Guozhang Wang  wrote:
>
>> Hi Tom,
>>
>> The updated part in "AdminClient:electPreferredLeaders()" looks reasonable
>> to me. If there are no objections from the voting committers by the end of
>> the day, I think you can mark it as accepted.
>>
>>
>> Guozhang
>>
>>
>> On Wed, Sep 6, 2017 at 7:42 AM, Tom Bentley wrote:
>>
>> > Unfortunately I've had to make a small change to the
>> > ElectPreferredLeadersResult, because exposing a Map<TopicPartition,
>> > KafkaFuture<Void>> was incompatible with the case where
>> > electPreferredLeaders() was called with a null partitions argument. The
>> > change exposes methods to access the map which return futures, rather than
>> > exposing the map (and crucially its keys) directly.
>> >
>> > This is described in more detail in the [DISCUSS] thread.
>> >
>> > Please take a look and recast your votes:
>> >
>> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-183+-+Change+PreferredReplicaLeaderElectionCommand+to+use+AdminClient#KIP-183-ChangePreferredReplicaLeaderElectionCommandtouseAdminClient-AdminClient:electPreferredLeaders()
>> >
>> > Thanks,
>> >
>> > Tom
>> >
>> > On 4 September 2017 at 10:52, Ismael Juma  wrote:
>> >
>> > > Hi Tom,
>> > >
>> > > You can update the KIP for minor things like that. Worth updating the
>> > > thread if it's something that is done during the PR review.
>> > >
>> > > With regards to exceptions, yes, that's definitely desired. I filed a
>> > > JIRA a while back for this:
>> > >
>> > > https://issues.apache.org/jira/browse/KAFKA-5445
>> > >
>> > > Ideally, new methods that we add would have this so that we don't
>> > > increase the tech debt that already exists.
>> > >
>> > > Ismael
>> > >
>> > > On Mon, Sep 4, 2017 at 10:11 AM, Tom Bentley wrote:
>> > >
>> > > > Hi Jun,
>> > > >
>> > > > You're correct about those other expected errors. If it's OK to update
>> > > > the KIP after the vote I'll add those.
>> > > >
>> > > > But this makes me wonder about the value of documenting expected errors
>> > > > in the Javadocs for the AdminClient (on the Results class, to be
>> > > > specific). Currently we don't do this, but it would be helpful for
>> > > > people using the AdminClient to know the kinds of errors they should
>> > > > expect, for testing purposes for example. On the other hand it's a
>> > > > maintenance burden. Should we start documenting likely errors like this?
>> > > >
>> > > > Cheers,
>> > > >
>> > > > Tom
>> > > >
>> > > > > On 4 September 2017 at 10:10, Tom Bentley wrote:
>> > > >
>> > > > > I see three +1s, no +0s and no -1, so the vote passes.
>> > > > >
>> > > > > Thanks to those who voted and/or commented on the discussion thread.
>> > > > >
>> > > > > On 1 September 2017 at 07:36, Gwen Shapira wrote:
>> > > > >
>> > > > >> Thank you! +1 (binding).
>> > > > >>
>> > > > >> On Thu, Aug 31, 2017 at 9:48 AM Jun Rao wrote:
>> > > > >>
>> > > > >> > Hi, Tom,
>> > > > >> >
>> > > > >> > Thanks for the KIP. +1. Just one more minor comment. It seems that
>> > > > >> > the ElectPreferredLeadersResponse should expect at least 3 other
>> > > > >> > types of errors: (1) request timeout exception, (2) leader
>> > > > >> > rebalance in-progress exception, (3) can't move to the preferred
>> > > > >> > replica exception (i.e., preferred replica not in sync yet).
>> > > > >> >
>> > > > >> > Jun
>> > > > >> >
>> > > > >> > On Tue, Aug 29, 2017 at 8:56 AM, Tom Bentley <t.j.bent...@gmail.com> wrote:
>> > > > >> >
>> > > > >> > > Hi all,
>> > > > >> > >
>> > > > >> > > I would like to start the vote on KIP-183 which will provide an
>> > > > >> > > AdminClient interface for electing the preferred replica, and
>> > > > >> > > refactor the kafka-preferred-replica-election.sh tool to use
>> > > > >> > > this interface. More details here:
>> > > > >> > >
>> > > > >> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-183+-+Change+PreferredReplicaLeaderElectionCommand+to+use+AdminClient
>> > > > >> > >
>> > > > >> > >
>> > > > >> > > Regards,
>> > > > >> > >
>> > > > >> > > Tom
>> > > > >> > >
>> > > > >> >
>> > > > >>
>> > > > >

[jira] [Created] (KAFKA-7877) Connect DLQ not used in SinkTask put()

2019-01-28 Thread Andrew Bourgeois (JIRA)
Andrew Bourgeois created KAFKA-7877:
---

 Summary: Connect DLQ not used in SinkTask put()
 Key: KAFKA-7877
 URL: https://issues.apache.org/jira/browse/KAFKA-7877
 Project: Kafka
  Issue Type: Bug
Reporter: Andrew Bourgeois






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message Consumption Across Partitions in KafkaConsumer

2019-01-28 Thread ChienHsing Wu
So... Does this non-response mean I should drop this topic after almost one 
month, folks?

-Original Message-
From: ChienHsing Wu  
Sent: Monday, January 21, 2019 12:47 PM
To: dev@kafka.apache.org
Subject: RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message Consumption 
Across Partitions in KafkaConsumer

Hi all, not sure what to do next as weeks have gone by, guys. --CH

-Original Message-
From: ChienHsing Wu 
Sent: Monday, January 14, 2019 9:13 AM
To: dev@kafka.apache.org
Subject: RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message Consumption 
Across Partitions in KafkaConsumer

Hi,

I know everyone is busy. But I would appreciate someone letting me know what to 
do next. I started this effort back in early November last year...

Thanks, CH

-Original Message-
From: ChienHsing Wu 
Sent: Monday, January 07, 2019 9:24 AM
To: dev@kafka.apache.org
Subject: RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message Consumption 
Across Partitions in KafkaConsumer

Hi guys,

I am not sure what to do next in this KIP process. Could anyone please 
help/advise me on what to do next? 

Thanks, CH

-Original Message-
From: ChienHsing Wu 
Sent: Wednesday, January 02, 2019 12:55 PM
To: dev@kafka.apache.org
Subject: RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message Consumption 
Across Partitions in KafkaConsumer

Hi Colin,

Setting max.partition.fetch.bytes was discussed in the ticket. It's not as 
desirable if the message size is highly variable. Also, this decreases the 
efficiency of network communication.

In the case you mentioned below, where a consumer can get messages from A, B, C 
and D but currently only has messages from A, B and C, the proposed change will 
NOT wait until some messages from D arrive to start returning messages; it will 
just serve those from A, B and C, and will include those from D when they are 
available. That IS the current behavior. The proposed change does not impose a 
strict round-robin pattern.

The original KIP-41 discussed "Ensuring Fair Consumption", which means it 
originally intended to take fairness into account in the consumer code; the 
proposed change takes the current algorithm closer to that goal, IMHO. I could 
implement that logic at the caller side, but that would mean each library user 
needs to know the inner workings of the consumer code and to implement the 
logic on their own. Though as a first-timer here, I do appreciate the 
complexity and functionality in the client library, and feel that we'd be 
better off as a community implementing the logic in the library so the 
complexity is hidden from library users.

Thanks, CH

-Original Message-
From: Colin McCabe 
Sent: Saturday, December 22, 2018 3:53 AM
To: dev@kafka.apache.org
Subject: Re: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message Consumption 
Across Partitions in KafkaConsumer

Hi ChienHsing Wu,

Maybe I'm misunderstanding something, but I'm not sure I see the need for a KIP 
here.  You can just set max.partition.fetch.bytes to a very small value.  That 
will cause Kafka to fetch only one message from each partition.  This will give 
you the round robin behavior you want.
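
For illustration, that is a plain consumer config setting; the right value
depends on your typical message size (note the broker still returns at least
one record per partition so the consumer can make progress):

# roughly one average-sized message per partition per fetch
max.partition.fetch.bytes=1024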

Alternately, if you don't want to change max.partition.fetch.bytes, you could 
do your own buffering to get round robin behavior.  Keep a buffer of messages 
from partition A, B, C, and D and hold back the messages from A, B, and C until 
one from D arrives, so that the A B C D A B C D... etc. order always repeats.

best,
Colin
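
A minimal sketch of that client-side buffering (looser than the strict
A B C D ordering described above: it drains whatever is buffered, at most one
record per partition per pass, rather than holding records back):

import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class RoundRobinBuffer<K, V> {
    // one queue of pending records per partition
    private final Map<TopicPartition, Queue<ConsumerRecord<K, V>>> buffers = new HashMap<>();

    public List<ConsumerRecord<K, V>> poll(KafkaConsumer<K, V> consumer) {
        ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(100));
        for (TopicPartition tp : records.partitions()) {
            buffers.computeIfAbsent(tp, p -> new ArrayDeque<>()).addAll(records.records(tp));
        }
        List<ConsumerRecord<K, V>> batch = new ArrayList<>();
        for (Queue<ConsumerRecord<K, V>> q : buffers.values()) {
            if (!q.isEmpty()) {
                batch.add(q.poll()); // at most one record per partition per pass
            }
        }
        return batch;
    }
}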


On Wed, Dec 19, 2018, at 09:00, ChienHsing Wu wrote:
> Looking back over the email thread, I think one of the comments from
> Mayuresh was the question of whether this change needs a KIP or not, as
> the KafkaConsumer does not guarantee the end user any order, and so there
> are no changes to the contracts with users.
> 
> I entered the KIP based on suggestions from the attached email when going
> through the code contribution process. I am not sure what to do next in
> this KIP process. Could anyone please help/advise me on what to do next?
> 
> Thanks!
> 
> CH
> 
> -Original Message-
> From: ChienHsing Wu 
> Sent: Wednesday, December 12, 2018 1:05 PM
> To: dev@kafka.apache.org
> Subject: RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> Consumption Across Partitions in KafkaConsumer
> 
> Good to know that, Thanks! 
> 
> Nonetheless, that introduces additional complexity at the client side
> for what is a common expectation: to receive records in a more or less
> fair fashion.
> 
> CH
> 
> -Original Message-
> From: Mayuresh Gharat 
> Sent: Wednesday, December 12, 2018 12:55 PM
> To: dev@kafka.apache.org
> Subject: Re: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> Consumption Across Partitions in KafkaConsumer
> 
> Hi ChienHsing,
> 
> We are actually working on buffering the already fetched data for 
> paused topicPartitions, so ideally it should not have any effect on 
> performance.
> Associated jira : 
> https://urldefense.proofpoint.com/v2/url?u=https-3A__issue

Re: [DISCUSS] KIP-415: Incremental Cooperative Rebalancing in Kafka Connect

2019-01-28 Thread Konstantine Karantasis
Jason, Ewen, great discussion w.r.t. the proposed config. I agree that
making 'compatible' the new default is a nice suggestion and makes sense.
The cost in metadata is acceptable.

However, while I also like being as conservative as possible in suggesting
addition of new configs, I feel that this is an important enough item in
group membership protocols to justify a new config property. I'm inclined
to say it's justified here, even if some of the values become obsolete
sooner or later and even if most people will rely on the default
'compatible' option most of the time.

Allow me to list a few pros and cons regarding 'connect.protocol' that I
can think of:

Pros:
* Flexibility. While the majority of users will be happy with 'compatible' as
the default, we'll be able to cover specific cases where users definitely need
to pin their clusters to one or the other option explicitly. Allowing users to
be explicit here might be preferable, given that a sensible default is in
place for the rest.
* We are set up for more flexible evolution too. Indeed, the flatbuffers
proposition is included here in order to avoid a proliferation of config
updates with every single improvement in the protocol. However, the overall
Incremental Cooperative Rebalancing proposition already has three policies,
and some other flavors might come up in the near future, depending on how
conservative we are with V1.
* We rely on the robustness of the current implementation. Even though the
release of resources can probably happen when handling the JoinResponse
message (at which point we know which protocol has been selected) instead of
at the very beginning of the rebalance process, I believe it's valuable to
keep the old path as unchanged as possible, even at the expense of some code
expansion. This way, I believe, we move forward in a more confident way.
* The config is general enough for the long term. What would be deprecated is
certain values, besides the default. Maybe that's slightly better than
deprecating the config property altogether.

Cons:
* Initially we'll have more code to maintain, because we'll preserve more
paths depending on the protocol used.
* Some config values will become obsolete eventually.
* An additional config option to maintain

Given the above, I'm leaning towards introducing the config
'connect.protocol' with 'compatible' as default. But I'll also be happy if we
conclude to skip the addition in KIP-415 and reconsider it in a later version.

Konstantine
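
For concreteness, the proposed knob would sit in the worker configuration,
along these lines (a sketch; the exact value names are whatever the KIP
finally specifies):

# eager = v0 only; compatible = advertise both v0 and v1 to the coordinator
connect.protocol=compatible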


On Fri, Jan 25, 2019 at 2:04 PM Jason Gustafson  wrote:

> Yeah, my expectation was that the use of flatbuffers in v1 was giving us
> another option to evolve the protocol, so I was hoping it would be some
> time before we needed v2. Maybe we could delay the introduction of
> `connect.protocol` until we're convinced it's necessary? The cost of
> supporting both v0 and v1 seems small in any case. Always easier to add
> configs than remove them ;)
>
> -Jason
>
> On Fri, Jan 25, 2019 at 11:04 AM Ewen Cheslack-Postava 
> wrote:
>
> > I was going to make a related comment about connect.protocol. Even if we
> > have the config, we should default it to compatible given v0 state is small
> > and we believe v1 is better and people should migrate to it.
> >
> > While I like getting rid of configs, not sure whether we should remove it
> > here. If we think the protocol might continue to evolve, just setting us up
> > with a method to do this cleanly without just defaulting to including all
> > versions would be better. That said, it took 3+ years to get to v1 and I'm
> > not sure we're aware of any blatant deficiencies in this version, so maybe
> > we won't ever get to v2 anyway...
> >
> > -Ewen
> >
> > On Fri, Jan 25, 2019 at 9:38 AM Jason Gustafson 
> > wrote:
> >
> > > Hey Konstantine,
> > >
> > > Thanks for the reply. Just one response below:
> > >
> > > > In 'compatible' mode, the worker sends both protocols to the broker
> > > > coordinator during the Join request. The field is already a list of
> > > > ProtocolMetadata. The broker, after gathering all the requests, follows
> > > > a process of selecting the first choice that is common among all the
> > > > workers.
> > >
> > >
> > > Discussed briefly offline. The point I was trying to make is that the
> > > consumer doesn't know when a rebalance begins whether the coordinator
> > > will decide to use the "eager" or "cooperative" protocol. The question is
> > > how that affects cooperative semantics. In other words, is it safe to
> > > allow tasks to continue executing when a rebalance begins even if the
> > > coordinator ultimately decides to use the "eager" protocol? If it is,
> > > then I think the value of the `connect.protocol` configuration is
> > > diminished. We can just implement the "compatible" behavior and we won't
> > > need the two-rolling-restart upgrade. The size of the join state in v0 is
> > > small, so I don't see much downside to retaining compatibility and our
> > > users will thank us for it.
> > >
>

Kafka Version Detection for Rolling Upgrade

2019-01-28 Thread M. Manna
Hello,

I am struggling a little bit to do the rolling upgrade. My confusion is
mainly about where the version numbers should come from. I am verifying my
steps against the main site guidelines. Any help would be appreciated.

https://kafka.apache.org/documentation/#ecosystem

I am upgrading from *2.11-1.1.0* to *2.12-2.1.0*. I have 3 brokers. Could
someone confirm that my versions are correct?

Current version: 1.1
New version: 2.1 (obtained from kafka-version.sh --version).

Also, I don't need to override the log message format since it's 1.1.0 to
2.1.0; is that still correct?

Thanks,
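
For reference, the documented rolling-upgrade procedure pins the inter-broker
protocol in server.properties before swapping binaries; a sketch of the two
passes, assuming the versions above:

# pass 1: set on each broker before installing the 2.1.0 binaries, then roll
inter.broker.protocol.version=1.1
# pass 2: once every broker runs 2.1.0, bump and roll again
inter.broker.protocol.version=2.1

Per the upgrade notes, log.message.format.version only needs to stay pinned
while consumers older than the current message format are still in use.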


[jira] [Created] (KAFKA-7878) Connect Task already exists in this worker when failed to create consumer

2019-01-28 Thread JIRA
Loïc Monney created KAFKA-7878:
--

 Summary: Connect Task already exists in this worker when failed to 
create consumer
 Key: KAFKA-7878
 URL: https://issues.apache.org/jira/browse/KAFKA-7878
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.0.1, 1.0.1
Reporter: Loïc Monney


*Assumption*
1. DNS is not available for a few minutes
2. The consumer group rebalances
3. The client is not able to resolve DNS entries anymore and fails
4. The task seems to remain registered, so at the next rebalance it will fail 
with *Task already exists in this worker*, and the only way to recover is to 
restart the Connect process

*Real log entries*
* Distributed cluster running one connector on top of Kubernetes
* Connect 2.0.1
* kafka-connect-hdfs 5.0.1
{noformat}
[2019-01-28 13:31:25,914] WARN Removing server kafka.xxx.net:9093 from 
bootstrap.servers as DNS resolution failed for kafka.xxx.net 
(org.apache.kafka.clients.ClientUtils:56)
[2019-01-28 13:31:25,915] ERROR WorkerSinkTask\{id=xxx-22} Task failed 
initialization and will not be started. 
(org.apache.kafka.connect.runtime.WorkerSinkTask:142)
org.apache.kafka.connect.errors.ConnectException: Failed to create consumer
 at 
org.apache.kafka.connect.runtime.WorkerSinkTask.createConsumer(WorkerSinkTask.java:476)
 at 
org.apache.kafka.connect.runtime.WorkerSinkTask.initialize(WorkerSinkTask.java:139)
 at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:452)
 at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:873)
 at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1600(DistributedHerder.java:111)
 at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:888)
 at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:884)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka 
consumer
 at 
org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:799)
 at 
org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:615)
 at 
org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:596)
 at 
org.apache.kafka.connect.runtime.WorkerSinkTask.createConsumer(WorkerSinkTask.java:474)
 ... 10 more
Caused by: org.apache.kafka.common.config.ConfigException: No resolvable 
bootstrap urls given in bootstrap.servers
 at 
org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:66)
 at 
org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:709)
 ... 13 more
[2019-01-28 13:31:25,925] INFO Finished starting connectors and tasks 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:868)
[2019-01-28 13:31:25,926] INFO Rebalance started 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:1239)
[2019-01-28 13:31:25,927] INFO Stopping task xxx-22 
(org.apache.kafka.connect.runtime.Worker:555)
[2019-01-28 13:31:26,021] INFO Finished stopping tasks in preparation for 
rebalance (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1269)
[2019-01-28 13:31:26,021] INFO [Worker clientId=connect-1, groupId=xxx-cluster] 
(Re-)joining group 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator:509)
[2019-01-28 13:31:30,746] INFO [Worker clientId=connect-1, groupId=xxx-cluster] 
Successfully joined group with generation 29 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator:473)
[2019-01-28 13:31:30,746] INFO Joined group and got assignment: 
Assignment\{error=0, leader='connect-1-05961f03-52a7-4c02-acc2-0f1fb021692e', 
leaderUrl='http://192.168.46.59:8083/', offset=32, connectorIds=[], 
taskIds=[xxx-22]} 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:1217)
[2019-01-28 13:31:30,747] INFO Starting connectors and tasks using config 
offset 32 (org.apache.kafka.connect.runtime.distributed.DistributedHerder:858)
[2019-01-28 13:31:30,747] INFO Starting task xxx-22 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:872)
[2019-01-28 13:31:30,747] INFO Creating task xxx-22 
(org.apache.kafka.connect.runtime.Worker:396)
[2019-01-28 13:31:30,748] ERROR Couldn't instantiate task xxx-22 because it has 
an invalid task configuration. This task will not execute until reconfigured. 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:890)
org.apache.kafka.connect.errors.ConnectException: Task already exists in this 
worker: xxx-22
 at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:399)
 at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHe

Re: [DISCUSS] KIP-415: Incremental Cooperative Rebalancing in Kafka Connect

2019-01-28 Thread Konstantine Karantasis
Hi Ismael,
thanks for bringing up serialization in the discussion!

Indeed, JSON was considered given it's the prevalent text-based
serialization option.

In the comparison with flatbuffers, most of the generic pros and cons are
valid in this context too: higher performance during serde, small size,
optional fields, strong typing, and others.

Specifically, for Connect's use case, flatbuffers serialization, although it
introduces a single new dependency, appears more appealing for the following
reasons:

* The protocol is evolving from one binary format to another binary one.
* Although new fields, nested or not, are expected to be introduced (as in
KIP-415) or old fields may get deprecated, the protocol schemas are
expected to be simple, mostly flat and manageable. We won't need to process
arbitrarily nested structures during runtime, for which JSON would be a
better fit. The current proposal aims to make the current append only
format a bit more flexible.
* It's good to keep performance tight because the loop that includes
subprotocol serde will need to accommodate resource release and assignment.
Also, rebalancing in its incremental cooperative form, which is expected to
be lighter, has the potential to start happening more frequently. Parsing
JSON with Jackson has been a hotspot on certain occasions in the past, if I
remember correctly.
* Evolution will be facilitated by handling or ignoring optional fields
easily. The protocol may evolve with fewer hard version bumps like the one
proposed here from V0 to V1.
* Optional fields are omitted, not just compressed.
* Unpacking of fields does not require deserialization of the whole
message, making transition between versions or flavors of the protocol easy
and performant.
* Flatbuffers' specification is simple and can be implemented, even in the
absence of appropriate clients.

I hope the above highlight why flatbuffers is a good candidate for this use
case and, thus, worth adding as a dependency.
Strictly speaking, yes, they introduce a new compile-time dependency. But
during runtime, such a dependency seems equivalent to introducing a JSON
parser (such as Jackson that is already being used in AK).

Your question is very valid. It's probably worth adding an item under
rejected alternatives, once we agree how we want to move forward.

Best,
Konstantine



On Fri, Jan 25, 2019 at 11:13 PM Ismael Juma  wrote:

> Thanks for the KIP Konstantine. Quick question: introducing a new
> serialization format (ie flatbuffers) has major implications. Have we
> considered json? If so, why did we reject it?
>
> Ismael
>
> On Fri, Jan 11, 2019, 3:44 PM Konstantine Karantasis <konstant...@confluent.io> wrote:
>
> > Hi all,
> >
> > I just published KIP-415: Incremental Cooperative Rebalancing in Kafka
> > Connect
> > on the wiki here:
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
> >
> > This is the first KIP to suggest an implementation of incremental and
> > cooperative rebalancing in the context of Kafka Connect. It aims to provide
> > an adequate solution to the stop-the-world effect that occurs in a Connect
> > cluster whenever a new connector configuration is submitted or a Connect
> > Worker is added or removed from the cluster.
> >
> > Looking forward to your insightful feedback!
> >
> > Regards,
> > Konstantine
> >
>


kafka stream depends on it's own derived table

2019-01-28 Thread Nan Xu
Hi,
I was writing a simple stream app. All it does is have a producer send a
sequence of paths and values, for example:
path /0, value 1
path /0/1, value 2
path /0/1/2, value 3
and a Kafka Streams app takes that input and produces a KTable store.

There is a rule: if the parent path does not exist, the child cannot be
inserted. So if /0/1 is not there, inserting /0/1/2 should be filtered out.

I use the following program to process it. Paths are sent as /0, /0/1,
/0/1/2, ..., /0/1/.../9.

Because the filter depends on the KTable store, which is built after the
filter in the stream, when the filter checks whether a path's parent exists,
the parent may already have passed the filter but not yet reached the store,
so the filter concludes the parent does not exist. This is really a problem
of async processing: the parent is not fully done (written to the store)
before the next element starts processing (the filter).

Another problem is that the parent key and child key are different, so the
arrival sequence of paths can differ from the producer's send sequence, which
also causes children to be filtered out. The producer sends /0, /0/1,
/0/1/2..., but the broker may get them as /0, /0/1/2, /0/1, ...; then all the
following paths are filtered out, because /0/1/2 never got a chance to be
created.

Any thoughts on how to solve this?

Thanks,
Nan


import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.common.serialization.Serdes.{IntegerSerde, StringSerde}
import org.apache.kafka.common.serialization.{IntegerSerializer, StringSerializer}
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.kstream.{Materialized, Reducer}
import org.apache.kafka.streams.state.{KeyValueStore, QueryableStoreTypes}
import org.apache.kafka.streams.{KafkaStreams, StreamsBuilder, StreamsConfig}

val streamProperties = new Properties()
  streamProperties.put(StreamsConfig.APPLICATION_ID_CONFIG,
"my-first-streams-application1")
  streamProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092")
  streamProperties.put(StreamsConfig.CLIENT_ID_CONFIG,
"important-test-client")
  streamProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
classOf[StringSerde].getName)
  streamProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
classOf[IntegerSerde].getName)

  val streamBuilder = new StreamsBuilder()
  val topic = "input"

  val inputStream = streamBuilder.stream[String, Integer](topic)

  val materialized = Materialized.as[String, Integer, KeyValueStore[Bytes,
Array[Byte]]](topic + "_store")
.withKeySerde(Serdes.String()).withValueSerde(Serdes.Integer())

  val reducer = new Reducer[Integer](){
override def apply(value1: Integer, value2: Integer): Integer = {
  value2
}
  }

  //value is not important, only care key.
  val ktable = inputStream.filter(filter).groupByKey().reduce(reducer,
materialized)

  // make sure parent exist.
  def filter(key: String, value: Integer): Boolean = {
println("===current store===, checking key " + key + " value: " + value
)
store.all().forEachRemaining(x => println(x.key))
val parent = key.trim().substring(0, key.lastIndexOf("/"))
if(parent == "") {
  true
} else {
  if (store.get(parent) == null) {
println("not found parent" + parent)
false
  } else {
true
  }
}
  }

  val topology = streamBuilder.build()
  val streams = new KafkaStreams(topology, streamProperties)
  streams.start()

  Thread.sleep(6000)
  val storeName = ktable.queryableStoreName()
  val store = streams.store(storeName,
QueryableStoreTypes.keyValueStore[String, Integer])


val senderProperties = new Properties
  senderProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092")
  senderProperties.put(ProducerConfig.CLIENT_ID_CONFIG, "producer")
  senderProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
classOf[StringSerializer].getName)
  senderProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
classOf[IntegerSerializer].getName)
  val producer = new KafkaProducer[String, Integer](senderProperties)


  for(j <- 1 until 10) {
val path = for(i <- 0 until j) yield {
  "/" + i
}
producer.send(new ProducerRecord(topic, path.mkString(""), j))
  }

  Thread.sleep(3000)
  println("show final store state")
  store.all().forEachRemaining(x => println(x.key))

Thread.sleep(1000)


output:
===current store===, checking key /0 value: 1
===current store===, checking key /0/1 value: 2
/0
===current store===, checking key /0/1/2/3 value: 4
/0/1
/0
not found parent/0/1/2
===current store===, checking key /0/1/2 value: 3
/0/1
/0
===current store===, checking key /0/1/2/3/4/5 value: 6
/0/1
/0
/0/1/2
not found parent/0/1/2/3/4
===current store===, checking key /0/1/2/3/4 value: 5
/0/1
/0
/0/1/2
not found parent/0/1/2/3
===current store===, checking key /0/1/2/3/4/5/6 value: 7
/0/1
/0
/0/1/2
not found parent/0/1/2/3/4/5
===current store===, checking key /0/1/2/3/4/5/6/7/8 value: 9
/0/1
/0
/0/1/2
not found parent/0/1/2/3/4/5/6/7
===current store===, checking key /0/1/2/3/4/5/6/7 value: 8
/0/1
/0
/0/1/2
not found parent/0/1/2/3/4/5/6
show final store state
/0/1
/0
/0/1/2
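
One direction for the first problem (a sketch, untested): do the parent check
and the write against the same store inside a single Processor, so
check-then-insert is serialized within the task instead of racing between the
filter and the downstream reduce. It does not address the second problem,
cross-key arrival order, which is a partitioning question (parent and child
keys only keep producer order if they land in the same partition).

import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

public class PathInsertProcessor extends AbstractProcessor<String, Integer> {
    private final String storeName;
    private KeyValueStore<String, Integer> store;

    public PathInsertProcessor(String storeName) {
        this.storeName = storeName;
    }

    @SuppressWarnings("unchecked")
    @Override
    public void init(ProcessorContext context) {
        super.init(context);
        store = (KeyValueStore<String, Integer>) context.getStateStore(storeName);
    }

    @Override
    public void process(String key, Integer value) {
        String parent = key.substring(0, key.lastIndexOf('/'));
        // root entries like "/0" have an empty parent; everything else must
        // find its parent in the store, which reflects all accepted records
        if (parent.isEmpty() || store.get(parent) != null) {
            store.put(key, value);         // make the path visible to later checks
            context().forward(key, value); // emit only accepted paths
        }
    }
}

The processor would be wired with Topology#addProcessor, with a key-value
store built via Stores#keyValueStoreBuilder and connected to it.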


[VOTE] KIP-417: Allow JmxTool to connect to a secured RMI port

2019-01-28 Thread Fangbin Sun
Hi, All:
I would like to start a vote on KIP-417, which aims to allow JmxTool to 
connect to a secured RMI port.


The KIP: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-417%3A+Allow+JmxTool+to+connect+to+a+secured+RMI+port


Thanks!
Fangbin
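
Background for the thread: connecting to a JMX endpoint whose RMI registry is
secured with SSL requires an SSL-aware RMI client socket factory in the
connection environment. A sketch in plain JDK JMX (host/port hypothetical;
not the exact options the KIP adds to JmxTool):

import java.util.HashMap;
import java.util.Map;
import javax.management.MBeanServerConnection;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import javax.rmi.ssl.SslRMIClientSocketFactory;

public class SecuredJmxClient {
    public static void main(String[] args) throws Exception {
        JMXServiceURL url =
            new JMXServiceURL("service:jmx:rmi:///jndi/rmi://broker-host:9999/jmxrmi");
        Map<String, Object> env = new HashMap<>();
        // tells the JNDI/RMI lookup to use SSL sockets for the registry
        env.put("com.sun.jndi.rmi.factory.socket", new SslRMIClientSocketFactory());
        JMXConnector connector = JMXConnectorFactory.connect(url, env);
        MBeanServerConnection conn = connector.getMBeanServerConnection();
        System.out.println("MBean count: " + conn.getMBeanCount());
        connector.close();
    }
}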

Re: [VOTE] KIP-258: Allow to Store Record Timestamps in RocksDB

2019-01-28 Thread Matthias J. Sax
Hi,

during PR reviews, we discovered a couple of opportunities to simplify and
improve the KIP and code. Thus, the following minor changes to the
public API are done (the KIP is already updated). A revote is not
necessary as the changes are minor.

 - interface `ValueAndTimestamp` is going to be a class

 - interface `RecordConverter` is renamed to `TimestampedBytesStore` and
we add a static method that converts values from old to new format

 - the three new interfaces `TimestampedXxxStore` don't add any new methods
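
For context, the first item means callers construct and read the pair through
the class directly; roughly (a sketch, final names per the updated KIP page):

import org.apache.kafka.streams.state.ValueAndTimestamp;

public class ValueAndTimestampExample {
    public static void main(String[] args) {
        // a value paired with its record timestamp, as held by the new stores
        ValueAndTimestamp<String> vt = ValueAndTimestamp.make("some-value", 1548633600000L);
        System.out.println(vt.value() + " @ " + vt.timestamp());
    }
}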



Let us know if there are any objections. I can also provide more details
why those changes make sense.

Thanks a lot!


-Matthias


On 1/18/19 10:00 PM, Matthias J. Sax wrote:
> +1 from myself.
> 
> 
> I am also closing this vote. The KIP is accepted with
> 
> - 3 binding votes (Damian, Guozhang, Matthias)
> - 3 non-binding votes (Bill, Patrik, John)
> 
> 
> Thanks for the discussion and voting.
> 
> 
> -Matthias
> 
> 
> On 1/16/19 10:35 AM, John Roesler wrote:
>> +1 (nonbinding) from me.
>>
>> Thanks for the KIP, Matthias.
>>
>> -John
>>
>> On Wed, Jan 16, 2019 at 12:01 PM Guozhang Wang  wrote:
>>
>>> Thanks Matthias, I left some minor comments, but since they do not involve
>>> any major architectural changes and I did not feel strongly about the
>>> naming etc., I'd +1 on the proposal as well.
>>>
>>> Feel free to reply / accept or reject my suggestions on the other DISCUSS
>>> thread.
>>>
>>>
>>> Guozhang
>>>
>>> On Wed, Jan 16, 2019 at 6:38 AM Damian Guy  wrote:
>>>
 +1

 On Wed, 16 Jan 2019 at 05:09, Patrik Kleindl  wrote:

> +1 (non-binding)
> Thanks too
> Best regards
> Patrik
>
>> Am 16.01.2019 um 03:30 schrieb Bill Bejeck :
>>
>> Thanks for the KIP Matthias.
>>
>> +1
>>
>> -Bill
>>
>> On Tue, Jan 15, 2019 at 7:33 PM Matthias J. Sax <matth...@confluent.io> wrote:
>>
>>> Hi,
>>>
>>> I would like to start the vote for KIP-258:
>>>
>>>
>>>
>

>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB
>>>
>>> The KIP adds new stores that allow to store record timestamps next to
>>> key and value. Additionally, we will allow to upgrade existing stores to
>>> the new stores; this will allow us to use the new stores in the DSL with
>>> a smooth upgrade path.
>>>
>>>
>>> -Matthias
>>>
>>>
>

>>>
>>>
>>> --
>>> -- Guozhang
>>>
>>
> 





[jira] [Resolved] (KAFKA-7876) Broker suddenly got disconnected

2019-01-28 Thread Anthony Lazam (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anthony Lazam resolved KAFKA-7876.
--
Resolution: Duplicate

> Broker suddenly got disconnected 
> -
>
> Key: KAFKA-7876
> URL: https://issues.apache.org/jira/browse/KAFKA-7876
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, network
>Affects Versions: 2.1.0
>Reporter: Anthony Lazam
>Priority: Critical
> Fix For: 2.2.0, 2.1.1
>
> Attachments: kafka-issue.png
>
>
>  
> We have a 3-node cluster setup. There are scenarios where one of the brokers 
> suddenly gets disconnected from the cluster but no underlying system issue is 
> found. The node that got disconnected wasn't able to release the partitions it 
> holds as the leader, hence clients (spring-boot) were unable to send/receive 
> data from the affected broker.
> We noticed that it always happens to the broker that is the active controller.
> Environment details:
> Provider: AWS
>  Kernel: 3.10.0-693.21.1.el7.x86_64
>  OS: CentOS Linux release 7.5.1804 (Core)
>  Scala version: 2.11
>  Kafka version: 2.1.0
>  Kafka config:
> {code:java}
> # Socket Server Settings 
> #
> num.network.threads=3
> num.io.threads=8
> socket.send.buffer.bytes=102400
> socket.receive.buffer.bytes=102400
> socket.request.max.bytes=104857600
> # Log Basics #
> num.partitions=1
> num.recovery.threads.per.data.dir=1
> # Internal Topic Settings  
> #
> offsets.topic.replication.factor=3
> transaction.state.log.replication.factor=3
> transaction.state.log.min.isr=2
> # Log Retention Policy 
> #
> log.retention.hours=168
> log.segment.bytes=1073741824
> log.retention.check.interval.ms=30
> # Group Coordinator Settings 
> #
> group.initial.rebalance.delay.ms=0
> # Zookeeper #
> zookeeper.connection.timeout.ms=6000
> broker.id=1
> zookeeper.connect=zk1:2181,zk2:2181,zk3:2181
> log.dirs=/data/kafka-node
> advertised.listeners=PLAINTEXT://node1:9092
> {code}
> Broker disconnected controller log:
> {code:java}
> [2019-01-26 05:03:52,512] TRACE [Controller id=2] Checking need to trigger 
> auto leader balancing (kafka.controller.KafkaController)
> [2019-01-26 05:03:52,513] DEBUG [Controller id=2] Preferred replicas by 
> broker Map(TOPICS->MAP) (kafka.controller.KafkaController)
> [2019-01-26 05:03:52,513] DEBUG [Controller id=2] Topics not in preferred 
> replica for broker 2 Map() (kafka.controller.KafkaController)
> [2019-01-26 05:03:52,513] TRACE [Controller id=2] Leader imbalance ratio for 
> broker 2 is 0.0 (kafka.controller.KafkaController)
> [2019-01-26 05:03:52,513] DEBUG [Controller id=2] Topics not in preferred 
> replica for broker 1 Map() (kafka.controller.KafkaController)
> [2019-01-26 05:03:52,513] TRACE [Controller id=2] Leader imbalance ratio for 
> broker 1 is 0.0 (kafka.controller.KafkaController)
> [2019-01-26 05:03:52,513] DEBUG [Controller id=2] Topics not in preferred 
> replica for broker 3 Map() (kafka.controller.KafkaController)
> [2019-01-26 05:03:52,513] TRACE [Controller id=2] Leader imbalance ratio for 
> broker 3 is 0.0 (kafka.controller.KafkaController)
> [2019-01-26 05:08:52,513] TRACE [Controller id=2] Checking need to trigger 
> auto leader balancing (kafka.controller.KafkaController)
> {code}
> Broker working server.log:
> {code:java}
> [2019-01-26 05:02:05,564] INFO [ReplicaFetcher replicaId=3, leaderId=2, 
> fetcherId=0] Error sending fetch request (sessionId=1637095899, 
> epoch=21379644) to node 2: java.io.IOException: Connection to 2 was 
> disconnected before the response was read. 
> (org.apache.kafka.clients.FetchSessionHandler)
> [2019-01-26 05:02:05,573] WARN [ReplicaFetcher replicaId=3, leaderId=2, 
> fetcherId=0] Error in response for fetch request (type=FetchRequest, 
> replicaId=3, maxWait=500, minBytes=1, maxBytes=10485760, 
> fetchData={PlayerGameRounds-8=(offset=2171960, logStartOffset=1483356, 
> maxBytes=1048576, currentLeaderEpoch=Optional[2])}, 
> isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessio
> nId=1637095899, epoch=21379644)) (kafka.server.ReplicaFetcherThread)
> java.io.IOException: Connection to 2 was disconnected before the response was 
> read
> at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:97)
> at 
> kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:97)
> at 
> kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:190)
> at 
> kafka.server.AbstractFetcherThread.kaf

[jira] [Created] (KAFKA-7879) Data directory size decreases every few minutes when producer is sending large amount of data

2019-01-28 Thread Pradeep Bansal (JIRA)
Pradeep Bansal created KAFKA-7879:
-

 Summary: Data directory size decreases every few minutes when 
producer is sending large amount of data
 Key: KAFKA-7879
 URL: https://issues.apache.org/jira/browse/KAFKA-7879
 Project: Kafka
  Issue Type: Improvement
Reporter: Pradeep Bansal


I am running a Kafka cluster with 6 broker nodes and have set retention hours 
to 24 and retention bytes to 5 GB.

I have set retention bytes to 250 GB in the topic configuration.

Now I am producing 1000-byte messages in async mode at a very high frequency. 
I am seeing that the Kafka data directory size increases, but every 5 minutes 
it decreases by some percentage (in my observation it increases by 40G and 
then reduces to 20G, so every 5 minutes we see an increase of 20G instead of 
40G).

Is there any extra configuration we need to set to avoid this data loss, or is 
there some sort of compression going on here?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7880) KafkaConnect should standardize worker thread name

2019-01-28 Thread YeLiang (JIRA)
YeLiang created KAFKA-7880:
--

 Summary: KafkaConnect should standardize worker thread name
 Key: KAFKA-7880
 URL: https://issues.apache.org/jira/browse/KAFKA-7880
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Affects Versions: 2.1.0
Reporter: YeLiang
 Fix For: 2.1.1


KafkaConnect will create a WorkerTask for each task assigned to it and then 
submit the tasks to a thread pool.

However, the 
[Worker|https://github.com/apache/kafka/blob/2.1.0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java]
 class initializes its thread pool using a default ThreadFactory, so the thread 
names follow the pattern pool-[0-9]-thread-[0-9].

When we are running KafkaConnect and find that one of the task threads is under 
high CPU usage, it is difficult to find out which task is under high load, 
because when we print out the stack of KafkaConnect we can only see a list of 
threads named pool-[0-9]-thread-[0-9], even if we know the exact pid of the 
high-CPU thread.

If worker threads were named like connectorName-taskId, it would be very 
helpful.
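
A sketch of the suggestion (hypothetical naming scheme, not an actual patch):
{code:java}
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Names every thread it creates after the connector task it serves,
// e.g. "task-my-connector-3-0", instead of "pool-1-thread-7".
public class TaskThreadFactory implements ThreadFactory {
    private final String connectorTaskId;
    private final AtomicInteger counter = new AtomicInteger(0);

    public TaskThreadFactory(String connectorTaskId) {
        this.connectorTaskId = connectorTaskId;
    }

    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r, "task-" + connectorTaskId + "-" + counter.getAndIncrement());
    }
}
{code}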



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)