Re: [DISCUSS] KIP-208: Add SSL support to Kafka Connect REST interface

2018-01-10 Thread Jakub Scholz
I'm sorry guys, I'm a bit busy with something else this week. But I will
get back to this by the end of the week.

Thanks & Regards
Jakub

On Tue, Jan 9, 2018 at 1:19 AM, Ewen Cheslack-Postava 
wrote:

> On Mon, Jan 8, 2018 at 11:39 AM, Randall Hauch  wrote:
>
> > Nice feedback, Ewen. Thanks!
> >
> > On Thu, Jan 4, 2018 at 5:11 PM, Ewen Cheslack-Postava  >
> > wrote:
> >
> > > Hey Jakub,
> > >
> > > Sorry for not getting to this sooner. Overall the proposal looks good
> to
> > > me, I just had a couple of questions.
> > >
> > > 1. For the configs/overrides, does this happen on a per-setting basis
> or
> > if
> > > one override is included do we not use any of the original settings? I
> > > suspect that if you need to override one setting, it probably means
> > you're
> > > using an entirely different config and so the latter behavior seems
> > better
> > > to me. We've talked a bit about doing something similar for the
> > > producer/consumer security settings as well so you don't have to
> specify
> > > security configs in 3 places in your worker config.
> > >
> >
> > Not sure if you were referring to
> > https://issues.apache.org/jira/browse/KAFKA-6387, but I just withdrew
> that
> > proposal (and the corresponding KIP-246) because behavior with existing
> > configurations was not backward compatible, so existing configs might
> have
> > very different behavior after the "inheritance" was implemented.
> >
> > But regardless, I do think that in this case if you have to override one
> of
> > the settings you probably need to override multiple. So I'd be in favor
> of
> > requiring all configs to be specified in the overridden `listeners.*`
> > properties.
> >
>
> Yeah, a related case I was thinking of is how key.converter and
> value.converter overrides work in Connectors. It's not exactly the same,
> but in that case, if you include the key.converter setting in the connector
> config, then nothing with the key.converter prefix from the worker is
> passed along. It might just be worth clarifying the all-or-nothing behavior,
> and also how we apply it in this case (e.g. is there one key setting whose
> presence means we do not inherit any security configs from the worker?)
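
(For illustration, a minimal sketch of the all-or-nothing override being
discussed, built as a worker config in Java. The listeners.https.* property
names follow the KIP-208 proposal and are not final; paths and passwords are
placeholders.)

{code:java}
import java.util.Properties;

public class RestSslOverrideSketch {
    // Sketch only: shows the all-or-nothing override semantics discussed above.
    public static Properties workerConfig() {
        Properties worker = new Properties();
        worker.setProperty("listeners", "https://connect-worker:8443");
        // Worker-level SSL settings, also used for the Kafka connections:
        worker.setProperty("ssl.keystore.location", "/var/private/ssl/kafka.keystore.jks");
        worker.setProperty("ssl.keystore.password", "kafka-secret");
        // REST-specific overrides: once any listeners.https.* key is present, the REST
        // listener would use only these and inherit none of the worker-level ssl.* values.
        worker.setProperty("listeners.https.ssl.keystore.location", "/var/private/ssl/rest.keystore.jks");
        worker.setProperty("listeners.https.ssl.keystore.password", "rest-secret");
        worker.setProperty("listeners.https.ssl.key.password", "rest-key-secret");
        return worker;
    }
}
{code}
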
>
>
> >
> >
> > >
> > > 2. For using default values from the worker config, I am wondering how
> > > convinced we are that it will be common for them to be the same? I
> really
> > > don't have enough experience w/ these setups to know, so just a
> question
> > > here. I think the other thing to take into account here is that even
> > though
> > > we're not dealing with authorization in this KIP, we will eventually
> want
> > > it for these APIs. Would we expect to be using the same principal for
> > Kafka
> > > and the Connect REST API? In a case where a company has a Connect
> cluster
> > > that, e.g., an ops team manages and they are the only ones that are
> > > supposed to make changes, that would make sense to me. But for a setup
> > > where some dev team is allowed to use the REST API to create new
> > connectors
> > > but the cluster is managed by an ops team, I would think the Kafka
> > > credentials would be different. I'm not sure how frequent each case
> would
> > > be, so I'm a bit unsure about the default of using the worker security
> > > configs by default. Thoughts?
> > >
> > > 3. We should probably specify the default in the table for
> > > rest.advertised.security.protocol because in ConfigDef if you don't
> > > specify
> > > a default value it becomes a required config. The HTTP default will
> > > probably need to be in there anyway.
> > >
> > > 4. Do we want to list the existing settings as deprecated and just move
> > to
> > > using listeners for consistency? We don't need to remove them anytime
> > soon,
> > > but given that the broker is doing the same, maybe we should just do
> that
> > > in this KIP?
> > >
> >
> > Marking them as deprecated in this KIP sounds good to me.
> >
> > >
> > > I think these are mostly small details, overall it looks like a good
> > plan!
> > >
> >
> > +1
> >
> > Randall
> >
> >
> > >
> > > Thanks,
> > > Ewen
> > >
> > > On Tue, Oct 24, 2017 at 5:19 AM, Jakub Scholz  wrote:
> > >
> > > > There has been no discussion since my last update a week ago. Unless
> > > someone
> > > > has some further comments in the next 48 hours, I will start the
> voting
> > > for
> > > > this KIP.
> > > >
> > > > Thanks & Regards
> > > > Jakub
> > > >
> > > > On Tue, Oct 17, 2017 at 5:54 PM, Jakub Scholz 
> wrote:
> > > >
> > > > > Ok, so I updated the KIP according to what we discussed. Please
> have
> > a
> > > > > look at the updates. Two points I'm not 100% sure about:
> > > > >
> > > > > 1) Should we mark the rest.host.name and rest.port options as
> > > > deprecated?
> > > > >
> > > > > 2) I needed to also address the advertised hostname / port. With
> > > multiple
> > > > > listeners it is not clear anymore which one should be used. I saw
> as
> > > one
> > > > > option to add advertised.listeners 

[jira] [Resolved] (KAFKA-6412) Improve synchronization in CachingKeyValueStore methods

2018-01-10 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6412?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy resolved KAFKA-6412.
---
   Resolution: Fixed
Fix Version/s: 1.1.0

Issue resolved by pull request 4372
[https://github.com/apache/kafka/pull/4372]

> Improve synchronization in CachingKeyValueStore methods
> ---
>
> Key: KAFKA-6412
> URL: https://issues.apache.org/jira/browse/KAFKA-6412
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Ted Yu
> Fix For: 1.1.0
>
> Attachments: 6412-jmh.v1.txt, k-6412.v1.txt
>
>
> Currently CachingKeyValueStore methods are synchronized at the method level.
> It seems we can use a read lock for the getters and a write lock for the
> put / delete methods.
> For getInternal(), if the calling thread is the stream thread, getInternal()
> may trigger eviction. This can be handled by obtaining the write lock at the
> beginning of the method for the stream thread.
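
(For illustration, a minimal sketch of the locking scheme described above,
assuming a simplified in-memory store rather than the actual
CachingKeyValueStore code:)

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Sketch: read lock for plain gets, write lock for put/delete and for gets issued
// by the stream thread, since those may trigger cache eviction.
public class ReadWriteCachedStore {
    private final Map<String, byte[]> cache = new HashMap<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    public byte[] get(String key, boolean isStreamThread) {
        // A stream-thread get may evict entries, so it must take the write lock.
        Lock l = isStreamThread ? lock.writeLock() : lock.readLock();
        l.lock();
        try {
            return cache.get(key);
        } finally {
            l.unlock();
        }
    }

    public void put(String key, byte[] value) {
        lock.writeLock().lock();
        try {
            cache.put(key, value);
        } finally {
            lock.writeLock().unlock();
        }
    }
}
{code}
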



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-1229) Reload broker config without a restart

2018-01-10 Thread Manikumar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-1229.
--
Resolution: Duplicate

Resolving as a duplicate of KIP-226/KAFKA-6240. Please reopen if there are any concerns.

> Reload broker config without a restart
> --
>
> Key: KAFKA-1229
> URL: https://issues.apache.org/jira/browse/KAFKA-1229
> Project: Kafka
>  Issue Type: Wish
>  Components: config
>Affects Versions: 0.8.0
>Reporter: Carlo Cabanilla
>Priority: Minor
>
> In order to minimize client disruption, ideally you'd be able to reload the
> broker config without having to restart the server. On *nix systems the
> convention is to have the process reread its configuration when it receives
> a SIGHUP signal.
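
(For context, a minimal sketch of the KIP-226 route referenced in the
resolution: updating a broker config at runtime through the AdminClient rather
than via a SIGHUP. Broker id "0" and the log.cleaner.threads setting are just
examples.)

{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class DynamicBrokerConfigSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Set a dynamic config on broker 0 without restarting it.
            ConfigResource broker0 = new ConfigResource(ConfigResource.Type.BROKER, "0");
            Config update = new Config(Collections.singleton(new ConfigEntry("log.cleaner.threads", "2")));
            admin.alterConfigs(Collections.singletonMap(broker0, update)).all().get();
        }
    }
}
{code}
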



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Re: [DISCUSS] KIP 226 - Dynamic Broker Configuration

2018-01-10 Thread Rajini Sivaram
Hi Ismael,

I have updated the KIP to use AES-256 if available and AES-128 otherwise
for password encryption. Looking at GCM, it looks like GCM is typically
used with a variable initialization vector, while we are using a random,
but constant IV per password. Also, AES/GCM is not supported by Java 7.
Since the authentication and performance benefits of GCM are not required
for this scenario, I am thinking I will leave the default as CBC, but make
sure we test GCM as well so that users have the choice.
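
(For illustration, a minimal sketch of CBC-mode encryption with a per-password
random IV, roughly the pattern described above; this is not the actual broker
implementation and the key-derivation parameters are placeholders.)

{code:java}
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
import java.util.Base64;
import javax.crypto.Cipher;
import javax.crypto.SecretKeyFactory;
import javax.crypto.spec.IvParameterSpec;
import javax.crypto.spec.PBEKeySpec;
import javax.crypto.spec.SecretKeySpec;

public class PasswordEncoderSketch {
    // Derive an AES key from the encoder secret, then encrypt with AES/CBC using an IV
    // that is generated once per password and stored alongside the ciphertext.
    public static String encode(String secret, String password) throws Exception {
        SecureRandom random = new SecureRandom();
        byte[] salt = new byte[16];
        byte[] iv = new byte[16];
        random.nextBytes(salt);
        random.nextBytes(iv);

        SecretKeyFactory factory = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256");
        byte[] keyBytes = factory.generateSecret(
                new PBEKeySpec(secret.toCharArray(), salt, 4096, 128)).getEncoded();
        Cipher cipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
        cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(keyBytes, "AES"), new IvParameterSpec(iv));
        byte[] encrypted = cipher.doFinal(password.getBytes(StandardCharsets.UTF_8));

        // Salt and IV are not secret; persist them with the ciphertext so the value can be decoded later.
        Base64.Encoder b64 = Base64.getEncoder();
        return b64.encodeToString(salt) + ":" + b64.encodeToString(iv) + ":" + b64.encodeToString(encrypted);
    }
}
{code}
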

On Wed, Jan 10, 2018 at 1:01 AM, Colin McCabe  wrote:

> Thanks, Rajini.  That makes sense.
>
> regards,
> Colin
>
> On Tue, Jan 9, 2018, at 14:38, Rajini Sivaram wrote:
> > Hi Colin,
> >
> > Thank you for reviewing.
> >
> > Yes, validation is done on the broker, not the client.
> >
> > All configs from ZooKeeper are processed and any config that could not be
> > applied are logged as warnings. This includes any configs that are not
> > dynamic in the broker version or any configs that are not supported in
> the
> > broker version. If you downgrade to a version that is older than this KIP
> > (1.0 for example), then you don't get any warnings however.
> >
> >
> > On Tue, Jan 9, 2018 at 9:38 PM, Colin McCabe  wrote:
> >
> > > On Mon, Dec 18, 2017, at 13:40, Jason Gustafson wrote:
> > > > Hi Rajini,
> > > >
> > > > Looking good. Just a few questions.
> > > >
> > > > 1. (Related to Jay's comment) Is the validate() method on
> Reconfigurable
> > > > necessary? I would have thought we'd validate using the ConfigDef.
> Do you
> > > > have a use case in mind in which the reconfigurable component only
> > > permits
> > > > certain reconfigurations?
> > >
> > > Hi,
> > >
> > > Sorry if this is a dumb question, but when we talk about validating on
> the
> > > ConfigDef, we're talking about validating on the server side, right?
> The
> > > software on the client side might be older or newer than the software
> on
> > > the broker side, so it seems inadvisable to do the validation there.
> > >
> > > Also, after a software downgrade, when the broker is restarted, it
> might
> > > find that there is a configuration key that is stored in ZK that is not
> > > dynamic in its (older) software version.  It seems like, with the
> current
> > > proposal, the broker will use the value found in the local
> configuration
> > > (config file) rather than the new ZK version.  Should the broker print
> out
> > > a WARN message in that scenario?
> > >
> > > best,
> > > Colin
> > >
> > > > 2. Should Reconfigurable extend Configurable or is the initial
> > > > configuration also done through reconfigure()? I ask because not all
> > > > plugins interfaces currently extend Configurable (e.g.
> > > > KafkaPrincipalBuilder).
> > > > 3. You mentioned a couple changes to DescribeConfigsOptions and
> > > > DescribeConfigsResult. Perhaps we should list the changes
> explicitly? One
> > > > not totally obvious case is what the synonyms() getter would return
> if
> > > the
> > > > option is not specified (i.e. should it raise an exception or return
> an
> > > > empty list?).
> > > > 4. Config entries in the DescribeConfigs response have an is_default
> > > flag.
> > > > Could that be replaced with the more general config_source?
> > > > 5. Bit of an internal question, but how do you handle config
> > > dependencies?
> > > > For example, suppose I want to add a listener and configure its
> principal
> > > > builder at once. You'd have to validate the principal builder config
> in
> > > the
> > > > context of the listener config, so I guess the order of the entries
> in
> > > > AlterConfigs is significant?
> > > > 6. KIP-48 (delegation tokens) gives us a master secret which is
> shared by
> > > > all brokers. Do you think we would make this dynamically
> configurable?
> > > > Alternatively, it might be possible to use it to encrypt the other
> > > > passwords we store in zookeeper.
> > > >
> > > > Thanks,
> > > > Jason
> > > >
> > > >
> > > >
> > > > On Mon, Dec 18, 2017 at 10:16 AM, Rajini Sivaram <
> > > rajinisiva...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi Jay,
> > > > >
> > > > > Thank you for reviewing the KIP.
> > > > >
> > > > > 1) Yes, makes sense. I will update the PR. There are some config
> > > updates
> > > > > that may be allowed depending on the context (e.g. some security
> > > configs
> > > > > can be updated for new listeners, but not existing listeners).
> Perhaps
> > > it
> > > > > is ok to mark them dynamic in the documentation. AdminClient would
> give
> > > > > appropriate error messages if the update is not allowed.
> > > > > 2) Internally, in the implementation, a mixture of direct config
> > > updates
> > > > > (e.g log config as you have pointed out) and reconfigure method
> > > invocations
> > > > > (e.g. SslFactory) are used. For configurable plugins (e.g. metrics
> > > > > reporter), we require the Reconfigurable interface to ensure that
> we
> > > can
> > > > > validate any custom configs and avoid reconfiguration 

Jenkins build is back to normal : kafka-trunk-jdk7 #3087

2018-01-10 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP 226 - Dynamic Broker Configuration

2018-01-10 Thread Ismael Juma
Thanks Rajini. Sounds good.

Ismael

On Wed, Jan 10, 2018 at 11:41 AM, Rajini Sivaram 
wrote:

> Hi Ismael,
>
> I have updated the KIP to use AES-256 if available and AES-128 otherwise
> for password encryption. Looking at GCM, it looks like GCM is typically
> used with a variable initialization vector, while we are using a random,
> but constant IV per-password. Also, AES/GCM is not supported by Java7.
> Since the authentication and performance benefits of GCM are not required
> for this scenario, I am thinking I will leave the default as CBC, but make
> sure we test GCM as well so that users have the choice.
>
> On Wed, Jan 10, 2018 at 1:01 AM, Colin McCabe  wrote:
>
> > Thanks, Rajini.  That makes sense.
> >
> > regards,
> > Colin
> >
> > On Tue, Jan 9, 2018, at 14:38, Rajini Sivaram wrote:
> > > Hi Colin,
> > >
> > > Thank you for reviewing.
> > >
> > > Yes, validation is done on the broker, not the client.
> > >
> > > All configs from ZooKeeper are processed and any config that could not
> be
> > > applied are logged as warnings. This includes any configs that are not
> > > dynamic in the broker version or any configs that are not supported in
> > the
> > > broker version. If you downgrade to a version that is older than this
> KIP
> > > (1.0 for example), then you don't get any warnings however.
> > >
> > >
> > > On Tue, Jan 9, 2018 at 9:38 PM, Colin McCabe 
> wrote:
> > >
> > > > On Mon, Dec 18, 2017, at 13:40, Jason Gustafson wrote:
> > > > > Hi Rajini,
> > > > >
> > > > > Looking good. Just a few questions.
> > > > >
> > > > > 1. (Related to Jay's comment) Is the validate() method on
> > Reconfigurable
> > > > > necessary? I would have thought we'd validate using the ConfigDef.
> > Do you
> > > > > have a use case in mind in which the reconfigurable component only
> > > > permits
> > > > > certain reconfigurations?
> > > >
> > > > Hi,
> > > >
> > > > Sorry if this is a dumb question, but when we talk about validating
> on
> > the
> > > > ConfigDef, we're talking about validating on the server side, right?
> > The
> > > > software on the client side might be older or newer than the software
> > on
> > > > the broker side, so it seems inadvisable to do the validation there.
> > > >
> > > > Also, after a software downgrade, when the broker is restarted, it
> > might
> > > > find that there is a configuration key that is stored in ZK that is
> not
> > > > dynamic in its (older) software version.  It seems like, with the
> > current
> > > > proposal, the broker will use the value found in the local
> > configuration
> > > > (config file) rather than the new ZK version.  Should the broker
> print
> > out
> > > > a WARN message in that scenario?
> > > >
> > > > best,
> > > > Colin
> > > >
> > > > > 2. Should Reconfigurable extend Configurable or is the initial
> > > > > configuration also done through reconfigure()? I ask because not
> all
> > > > > plugins interfaces currently extend Configurable (e.g.
> > > > > KafkaPrincipalBuilder).
> > > > > 3. You mentioned a couple changes to DescribeConfigsOptions and
> > > > > DescribeConfigsResult. Perhaps we should list the changes
> > explicitly? One
> > > > > not totally obvious case is what the synonyms() getter would return
> > if
> > > > the
> > > > > option is not specified (i.e. should it raise an exception or
> return
> > an
> > > > > empty list?).
> > > > > 4. Config entries in the DescribeConfigs response have an
> is_default
> > > > flag.
> > > > > Could that be replaced with the more general config_source?
> > > > > 5. Bit of an internal question, but how do you handle config
> > > > dependencies?
> > > > > For example, suppose I want to add a listener and configure its
> > principal
> > > > > builder at once. You'd have to validate the principal builder
> config
> > in
> > > > the
> > > > > context of the listener config, so I guess the order of the entries
> > in
> > > > > AlterConfigs is significant?
> > > > > 6. KIP-48 (delegation tokens) gives us a master secret which is
> > shared by
> > > > > all brokers. Do you think we would make this dynamically
> > configurable?
> > > > > Alternatively, it might be possible to use it to encrypt the other
> > > > > passwords we store in zookeeper.
> > > > >
> > > > > Thanks,
> > > > > Jason
> > > > >
> > > > >
> > > > >
> > > > > On Mon, Dec 18, 2017 at 10:16 AM, Rajini Sivaram <
> > > > rajinisiva...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi Jay,
> > > > > >
> > > > > > Thank you for reviewing the KIP.
> > > > > >
> > > > > > 1) Yes, makes sense. I will update the PR. There are some config
> > > > updates
> > > > > > that may be allowed depending on the context (e.g. some security
> > > > configs
> > > > > > can be updated for new listeners, but not existing listeners).
> > Perhaps
> > > > it
> > > > > > is ok to mark them dynamic in the documentation. AdminClient
> would
> > give
> > > > > > appropriate error messages if the update is not allowed.
> > > > > > 2) Internally, in the implementat

[jira] [Resolved] (KAFKA-2025) In multi-consumer setup - explicit commit, commits on all partitions

2018-01-10 Thread Manikumar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2025?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-2025.
--
Resolution: Auto Closed

Closing inactive issue. The old consumer is no longer supported, please upgrade 
to the Java consumer whenever possible.

> In multi-consumer setup - explicit commit, commits on all partitions
> 
>
> Key: KAFKA-2025
> URL: https://issues.apache.org/jira/browse/KAFKA-2025
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.8.2.0
> Environment: 1. Tested in Windows
> 2. Not tested on Linux
>Reporter: Pradeep G
>Assignee: Neha Narkhede
>Priority: Critical
>
> In a setup with two consumers C1 & C2 belonging to consumer group CG, two
> partitions P1 & P2, and auto-commit disabled:
> An explicit commit on the ConsumerConnector commits for all the consumers,
> i.e. a commit called by C1 also commits the messages being processed by the
> other consumer, here C2.
> Ideally C1 should be able to commit only the messages it has consumed, not
> what is being processed by C2. The effect of this behavior is that if C2
> crashes while processing message M after C1 commits, M is not available on
> recovery and is lost forever, even though Kafka marks it as consumed.
> I read that this would be addressed in the rewrite -
> https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite#ClientRewrite-ConsumerAPI
> Any thoughts on which release this would be addressed in? A quick response
> would be greatly appreciated.
> Thanks,
> Pradeep
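
(For reference, the Java consumer mentioned in the resolution supports exactly
this: each consumer commits only its own position, per partition. A minimal
sketch, with placeholder bootstrap servers and topic name:)

{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class PerPartitionCommitSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "CG");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("mytopic"));
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                // ... process the record, then commit only this consumer's position on
                // this partition; messages in flight on other group members are unaffected.
                TopicPartition tp = new TopicPartition(record.topic(), record.partition());
                consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(record.offset() + 1)));
            }
        }
    }
}
{code}
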



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-2329) Consumers balance fails when multiple consumers are started simultaneously.

2018-01-10 Thread Manikumar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2329?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-2329.
--
Resolution: Auto Closed

Closing inactive issue. The old consumer is no longer supported, please upgrade 
to the Java consumer whenever possible.

> Consumers balance fails when multiple consumers are started simultaneously.
> ---
>
> Key: KAFKA-2329
> URL: https://issues.apache.org/jira/browse/KAFKA-2329
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.8.1.1, 0.8.2.1
>Reporter: Ze'ev Eli Klapow
>Assignee: Ze'ev Eli Klapow
>  Labels: consumer, patch
> Attachments: zookeeper-consumer-connector-epoch-node.patch
>
>
> During consumer startup a race condition can occur if multiple consumers are 
> started (nearly) simultaneously. 
> If a second consumer is started while the first consumer is in the middle of 
> {{zkClient.subscribeChildChanges}} the first consumer will never see the 
> registration of the second consumer, because the consumer registration node 
> for the second consumer will be unwatched, and no new child will be 
> registered later. This causes the first consumer to own all partitions, and 
> then never release ownership causing the second consumer to fail rebalancing.
> The attached patch solves this by using an "epoch" node which all consumers 
> watch and update to trigger  a rebalance. When a rebalance is triggered we 
> check the consumer registrations against a cached state, to avoid unnecessary 
> rebalances. For safety, we also periodically check the consumer registrations 
> and rebalance. We have been using this patch in production at HubSpot for a 
> while and it has eliminated all rebalance issues.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-2331) Kafka does not spread partitions in a topic among all consumers evenly

2018-01-10 Thread Manikumar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-2331.
--
Resolution: Auto Closed

Closing inactive issue. The old consumer is no longer supported, please upgrade 
to the Java consumer whenever possible.

> Kafka does not spread partitions in a topic among all consumers evenly
> --
>
> Key: KAFKA-2331
> URL: https://issues.apache.org/jira/browse/KAFKA-2331
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 0.8.1.1
>Reporter: Stefan Miklosovic
>
> I want to have 1 topic with 10 partitions. I am using default configuration 
> of Kafka. I create 1 topic with 10 partitions by that helper script and now I 
> am about to produce messages to it.
> The thing is that even though all partitions are indeed consumed, some
> consumers have more than 1 partition assigned even though I have as many
> consumer threads as partitions in the topic, hence some threads are idle.
> Let's describe it in more detail.
> I know the common guidance that you need one consumer thread per partition. I
> want to be able to commit offsets per partition, and this is possible only
> when I have 1 thread per consumer connector per partition (I am using the high
> level consumer).
> So I create 10 threads, in each thread I am calling 
> Consumer.createJavaConsumerConnector() where I am doing this
> topicCountMap.put("mytopic", 1);
> and in the end I have 1 iterator which consumes messages from 1 partition.
> When I do this 10 times, I have 10 consumers, one consumer per thread per
> partition, where I can commit offsets independently per partition. If I put a
> number other than 1 in the topic map, I would end up with more than 1
> consumer thread for that topic for a given consumer instance, so committing
> offsets with that consumer instance would commit them for all of its threads,
> which is not desired.
> But the thing is that when I use consumers, only 7 consumers are involved and 
> it seems that other consumer threads are idle but I do not know why.
> The thing is that I am creating these consumer threads in a loop. So I start 
> first thread (submit to executor service), then another, then another and so 
> on.
> So the scenario is that the first consumer gets all 10 partitions, then the
> 2nd connects so the partitions are split between the two, 5 and 5 (or
> something similar), then the other threads connect.
> I understand this as a partition rebalancing among all consumers so it 
> behaves well in such sense that if more consumers are being created, 
> partition rebalancing occurs between these consumers so every consumer should 
> have some partitions to operate upon.
> But from the results I see that there are only 7 consumers, and judging by the
> consumed messages they are split like 3,2,1,1,1,1,1 partition-wise.
> Yes, these 7 consumers covered all 10 partitions, but why don't consumers with
> more than 1 partition split off and give partitions to the remaining 3 consumers?
> I am pretty much wondering what is happening with the remaining 3 threads and
> why they do not "grab" partitions from consumers which have more than 1
> partition assigned.
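
(For reference, a minimal sketch of how the Java consumer can sidestep this:
with manual assignment each thread owns exactly one partition and commits
independently, without depending on the rebalance outcome. Illustrative only.)

{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class OnePartitionPerThreadSketch {
    // Each of the 10 threads would call this with its own partition id (0..9).
    public static KafkaConsumer<byte[], byte[]> consumerFor(int partition) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "mygroup");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
        // Manual assignment: no group rebalancing is involved, so every partition is
        // guaranteed an owner and the owning thread commits its offsets independently.
        consumer.assign(Collections.singletonList(new TopicPartition("mytopic", partition)));
        return consumer;
    }
}
{code}
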



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6437) Streams does not warn about missing input topics, but hangs

2018-01-10 Thread Chris Schwarzfischer (JIRA)
Chris Schwarzfischer created KAFKA-6437:
---

 Summary: Streams does not warn about missing input topics, but 
hangs
 Key: KAFKA-6437
 URL: https://issues.apache.org/jira/browse/KAFKA-6437
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 1.0.0
 Environment: Single client on single node broker
Reporter: Chris Schwarzfischer
Priority: Minor


*Case*
Streams application with two input topics being used for a left join.
When the left-side topic is missing when the streams application starts, it
hangs "in the middle" of the topology (at …9, see below). Only some of the
intermediate topics are created (up to …9).
When the missing input topic is created, the streams application resumes 
processing.

{noformat}
Topology:
StreamsTask taskId: 2_0
ProcessorTopology:
KSTREAM-SOURCE-11:
topics: 
[mystreams_app-KTABLE-AGGREGATE-STATE-STORE-09-repartition]
children:   [KTABLE-AGGREGATE-12]
KTABLE-AGGREGATE-12:
states: 
[KTABLE-AGGREGATE-STATE-STORE-09]
children:   [KTABLE-TOSTREAM-20]
KTABLE-TOSTREAM-20:
children:   [KSTREAM-SINK-21]
KSTREAM-SINK-21:
topic:  faxout_udr_month_customer_aggregration
KSTREAM-SOURCE-17:
topics: 
[mystreams_app-KSTREAM-MAP-14-repartition]
children:   [KSTREAM-LEFTJOIN-18]
KSTREAM-LEFTJOIN-18:
states: 
[KTABLE-AGGREGATE-STATE-STORE-09]
children:   [KSTREAM-SINK-19]
KSTREAM-SINK-19:
topic:  data_UDR_joined
Partitions [mystreams_app-KSTREAM-MAP-14-repartition-0, 
mystreams_app-KTABLE-AGGREGATE-STATE-STORE-09-repartition-0]
{noformat}

*Why this matters*
The application does quite a lot of preprocessing before joining with the 
missing input topic. This preprocessing won't happen without the topic, 
creating a huge backlog of data.

*Fix*
Issue a `warn`- or `error`-level message at startup to inform about the missing
topic and its consequences.
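
(As a workaround until Streams itself warns, a minimal sketch of checking the
input topics up front with the AdminClient and failing fast if one is missing.
The topic names are placeholders and this is not part of Streams.)

{code:java}
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.AdminClient;

public class InputTopicCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        List<String> required = Arrays.asList("left_input_topic", "right_input_topic");
        try (AdminClient admin = AdminClient.create(props)) {
            Set<String> existing = new HashSet<>(admin.listTopics().names().get());
            for (String topic : required) {
                if (!existing.contains(topic)) {
                    // Fail fast (or log an error) instead of letting the topology hang silently.
                    throw new IllegalStateException("Missing input topic: " + topic);
                }
            }
        }
        // ... then build and start the KafkaStreams instance as usual.
    }
}
{code}
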



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6438) NSEE while concurrently creating and deleting a topic

2018-01-10 Thread Adam Kotwasinski (JIRA)
Adam Kotwasinski created KAFKA-6438:
---

 Summary: NSEE while concurrently creating and deleting a topic
 Key: KAFKA-6438
 URL: https://issues.apache.org/jira/browse/KAFKA-6438
 Project: Kafka
  Issue Type: Bug
  Components: controller
Affects Versions: 1.0.0
 Environment: kafka_2.11-1.0.0.jar
OpenJDK Runtime Environment (build 1.8.0_102-b14), OpenJDK 64-Bit Server VM 
(build 25.102-b14, mixed mode)
CentOS Linux release 7.3.1611 (Core)
Reporter: Adam Kotwasinski


It appears that deleting a topic and creating it at the same time can cause an
NSEE, which later results in a forced controller shutdown.

Most probably the topics are being re-created because consumers/producers are
still active (yes, this means the deletion is happening blindly).

The main problem here (for me) is the controller switch; the data loss and the
following unclean election are acceptable (as we admit to deleting blindly).

Environment description:
20 Kafka brokers
80k partitions (20k topics, 4 partitions each)
3-node ZK

Incident:
{code:java}
[2018-01-09 11:19:05,912] INFO [Topic Deletion Manager 6], Partition deletion 
callback for mytopic-2,mytopic-0,mytopic-1,mytopic-3 
(kafka.controller.TopicDeletionManager)
[2018-01-09 11:19:06,237] INFO [Controller id=6] New leader and ISR for 
partition mytopic-0 is {"leader":-1,"leader_epoch":1,"isr":[]} 
(kafka.controller.KafkaController)
[2018-01-09 11:19:06,412] INFO [Topic Deletion Manager 6], Deletion for 
replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of 
topic mytopic in progress (kafka.controller.TopicDeletionManager)
[2018-01-09 11:19:07,218] INFO [Topic Deletion Manager 6], Deletion for 
replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of 
topic mytopic in progress (kafka.controller.TopicDeletionManager)
[2018-01-09 11:19:07,304] INFO [Topic Deletion Manager 6], Deletion for 
replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of 
topic mytopic in progress (kafka.controller.TopicDeletionManager)
[2018-01-09 11:19:07,383] INFO [Topic Deletion Manager 6], Deletion for 
replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of 
topic mytopic in progress (kafka.controller.TopicDeletionManager)
[2018-01-09 11:19:07,510] INFO [Topic Deletion Manager 6], Deletion for 
replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of 
topic mytopic in progress (kafka.controller.TopicDeletionManager)
[2018-01-09 11:19:07,661] INFO [Topic Deletion Manager 6], Deletion for 
replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of 
topic mytopic in progress (kafka.controller.TopicDeletionManager)
[2018-01-09 11:19:07,728] INFO [Topic Deletion Manager 6], Deletion for 
replicas 9,10,11 for partition mytopic-0,mytopic-1,mytopic-2 of topic mytopic 
in progress (kafka.controller.TopicDeletionManager)
[2018-01-09 11:19:07,924] INFO [PartitionStateMachine controllerId=6] Invoking 
state change to OfflinePartition for partitions 
mytopic-2,mytopic-0,mytopic-1,mytopic-3 (kafka.controller.PartitionStateMachine)
[2018-01-09 11:19:07,924] INFO [PartitionStateMachine controllerId=6] Invoking 
state change to NonExistentPartition for partitions 
mytopic-2,mytopic-0,mytopic-1,mytopic-3 (kafka.controller.PartitionStateMachine)
[2018-01-09 11:19:08,592] INFO [Controller id=6] New topics: [Set(mytopic, 
other, querytickle_WD2-SALES1_espgms0202v29)], deleted topics: [Set()], new 
partition replica assignment [Map(other-0 -> Vector(8), mytopic-2 -> Vector(6), 
mytopic-0 -> Vector(4), other-2 -> Vector(10), mytopic-1 -> Vector(5), 
mytopic-3 -> Vector(7), other-1 -> Vector(9), other-3 -> Vector(11))] 
(kafka.controller.KafkaController)
[2018-01-09 11:19:08,593] INFO [Controller id=6] New topic creation callback 
for other-0,mytopic-2,mytopic-0,other-2,mytopic-1,mytopic-3,other-1,other-3 
(kafka.controller.KafkaController)
[2018-01-09 11:19:08,596] INFO [Controller id=6] New partition creation 
callback for 
other-0,mytopic-2,mytopic-0,other-2,mytopic-1,mytopic-3,other-1,other-3 
(kafka.controller.KafkaController)
[2018-01-09 11:19:08,596] INFO [PartitionStateMachine controllerId=6] Invoking 
state change to NewPartition for partitions 
other-0,mytopic-2,mytopic-0,other-2,mytopic-1,mytopic-3,other-1,other-3 
(kafka.controller.PartitionStateMachine)
[2018-01-09 11:19:08,642] INFO [PartitionStateMachine controllerId=6] Invoking 
state change to OnlinePartition for partitions 
other-0,mytopic-2,mytopic-0,other-2,mytopic-1,mytopic-3,other-1,other-3 
(kafka.controller.PartitionStateMachine)
[2018-01-09 11:19:08,828] INFO [Topic Deletion Manager 6], Partition deletion 
callback for mytopic-2,mytopic-0,mytopic-1,mytopic-3 
(kafka.controller.TopicDeletionManager)
[2018-01-09 11:19:09,127] INFO [Controller id=6] New leader and ISR for 
partition mytopic-0 is {"leader":-1,"leader_epoch"

[jira] [Created] (KAFKA-6439) We are using streamset to produce data into a Kafka topic (3 node cluster). We are facing the following error frequently in production.

2018-01-10 Thread srithar durairaj (JIRA)
srithar durairaj created KAFKA-6439:
---

 Summary: We are using streamset to produce data into a Kafka topic 
(3 node cluster). We are facing the following error frequently in production.
 Key: KAFKA-6439
 URL: https://issues.apache.org/jira/browse/KAFKA-6439
 Project: Kafka
  Issue Type: Bug
Reporter: srithar durairaj






--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Re: [VOTE] KIP-226 - Dynamic Broker Configuration

2018-01-10 Thread Rajini Sivaram
The vote has passed with 4 binding votes (Gwen, Jun, Jason, me) and 2
non-binding votes (Ted Yu, Tom Bentley).

Many thanks for the reviews and votes. I will update the KIP page.

Regards,

Rajini

On Tue, Jan 9, 2018 at 11:00 AM, Rajini Sivaram 
wrote:

>
> Thank you, Jun! I have updated the KIP.
>
> If there are no other comments or concerns, I will close the vote later
> today.
>
> Thanks,
>
> Rajini
>
> On Mon, Jan 8, 2018 at 10:57 PM, Jun Rao  wrote:
>
>> Hi, Rajini,
>>
>> Thanks for the explanation. Then your suggestion sounds good to me.
>>
>> Jun
>>
>> On Mon, Jan 8, 2018 at 1:32 PM, Rajini Sivaram 
>> wrote:
>>
>> > Hi Jun,
>> >
>> > No,  password.encoder.secret cannot be updated dynamically at the
>> moment.
>> > Dynamic configs are stored in ZooKeeper and since ZK is not secure, all
>> > password configs in ZK are encrypted using password.encoder.secret. We
>> > cannot make password.encoder.secret dynamic since it would need another
>> > secret to encrypt it for storing in ZK and that secret would need to be
>> > static and cannot be rotated.
>> >
>> > On Mon, Jan 8, 2018 at 6:33 PM, Jun Rao  wrote:
>> >
>> > > Hi, Rajini,
>> > >
>> > > Could password.encoder.secret be updated dynamically? If so, each
>> broker
>> > > will still have access to the old secret when password.encoder.secret
>> is
>> > > updated. Perhaps that's a simpler way to handle changing secret than
>> > > introducing an extra config.
>> > >
>> > > Thanks,
>> > >
>> > > Jun
>> > >
>> > > On Fri, Jan 5, 2018 at 3:09 AM, Rajini Sivaram <
>> rajinisiva...@gmail.com>
>> > > wrote:
>> > >
>> > > > Hi Jun,
>> > > >
>> > > > We are using 2-way encryption. The password configs encoded are
>> > > > keystore/truststore passwords and JAAS configuration. We need to be
>> > able
>> > > to
>> > > > extract the actual values for these, so we cannot use 1-way hash.
>> So if
>> > > we
>> > > > have the old secret, we can decrypt and get the original values.
>> > > >
>> > > > Thank you,
>> > > >
>> > > > Rajini
>> > > >
>> > > > On Fri, Jan 5, 2018 at 12:11 AM, Jun Rao  wrote:
>> > > >
>> > > > > Hi, Rajini,
>> > > > >
>> > > > > Does providing the old-secret help? My understanding is that the
>> > > encoded
>> > > > > passwd is the result of a 1-way hash with the secret. So, one
>> can't
>> > > > decode
>> > > > > the passwd with old-secret. If that's the case, one still needs to
>> > > > provide
>> > > > > the unencrypted passwd to re-encode with the new secret?
>> > > > >
>> > > > > Thanks,
>> > > > >
>> > > > > Jun
>> > > > >
>> > > > > On Thu, Jan 4, 2018 at 1:28 AM, Rajini Sivaram <
>> > > rajinisiva...@gmail.com>
>> > > > > wrote:
>> > > > >
>> > > > > > Hi Jun/Jason,
>> > > > > >
>> > > > > > I was wondering whether it is worth adding a new property
>> (static
>> > > > config
>> > > > > in
>> > > > > > server.properties) to pass in the previous encoder password as
>> well
>> > > > when
>> > > > > > changing encoder password. So you would set:
>> > > > > >
>> > > > > >- password.encoder.secret=new-password
>> > > > > >- password.encoder.old.secret=old-password
>> > > > > >
>> > > > > > When the broker starts up and loads passwords from ZK, it would
>> > check
>> > > > if
>> > > > > > old-password is being used. If so, it would re-encode all
>> passwords
>> > > in
>> > > > ZK
>> > > > > > using new-password and store them back in ZK. If the
>> new-password
>> > is
>> > > > > > already in use in ZK, the old one will be ignored. This needs an
>> > > extra
>> > > > > > property, but makes it simpler for the user since all other
>> > passwords
>> > > > can
>> > > > > > be used from ZK.
>> > > > > >
>> > > > > > What do you think?
>> > > > > >
>> > > > > >
>> > > > > >
>> > > > > > On Wed, Jan 3, 2018 at 6:01 PM, Rajini Sivaram <
>> > > > rajinisiva...@gmail.com>
>> > > > > > wrote:
>> > > > > >
>> > > > > > > Hi Jason,
>> > > > > > >
>> > > > > > > Thank you for reviewing and voting.
>> > > > > > >
>> > > > > > > Thanks, I had missed the rename. Have updated the KIP.
>> > > > > > >
>> > > > > > > The configs can be defined in the static server.properties or
>> in
>> > > > > > > ZooKeeper. If a ZK config cannot be decoded (or is not
>> valid), we
>> > > log
>> > > > > an
>> > > > > > > error and revert to the static config or default. When
>> updating
>> > the
>> > > > > > secret
>> > > > > > used by the encoder, we expect all password values to be
>> specified
>> > > in
>> > > > > > > server.properties. And the decoding or sanity check of the
>> > password
>> > > > in
>> > > > > ZK
>> > > > > > > would fail with the new secret, so we would use the password
>> > values
>> > > > > from
>> > > > > > > server.properties. Once the broker starts up, the values can
>> be
>> > > reset
>> > > > > in
>> > > > > > ZK
>> > > > > > > using AdminClient and they will be encoded using the new
>> secret.
>> > > > > > >
>> > > > > > >
>> > > > > > > On Wed, Jan 3, 2018 at 5:34 PM, Jason Gustafson <
>> > > ja...@confluent.io>
>> > > > > > > wrote:
>> > 

[jira] [Created] (KAFKA-6440) Expose Connect leader via REST

2018-01-10 Thread Ryan P (JIRA)
Ryan P created KAFKA-6440:
-

 Summary: Expose Connect leader via REST
 Key: KAFKA-6440
 URL: https://issues.apache.org/jira/browse/KAFKA-6440
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Affects Versions: 1.0.0
Reporter: Ryan P
Priority: Minor


[KIP-196|https://cwiki.apache.org/confluence/display/KAFKA/KIP-196%3A+Add+metrics+to+Kafka+Connect+framework]
 adds a metric to expose the current leader of a Connect cluster. It would be
helpful to make this information available via REST as well, since that would
not require the use of a JMX client.
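
(For illustration, a sketch of what a client could do if such an endpoint
existed; the /leader path and the response shape below are hypothetical and
not part of the current Connect REST API.)

{code:java}
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class ConnectLeaderLookupSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical endpoint: KAFKA-6440 only proposes exposing the leader via REST,
        // so this path is illustrative, not an existing API.
        URL url = new URL("http://connect-worker:8083/leader");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");
        try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
            System.out.println(in.readLine()); // e.g. {"leader":"http://worker-2:8083"} (hypothetical)
        }
    }
}
{code}
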



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Build failed in Jenkins: kafka-trunk-jdk7 #3088

2018-01-10 Thread Apache Jenkins Server
See 


Changes:

[wangguoz] KAFKA-6398: fix KTable.filter that does not include its parent's

--
[...truncated 398.83 KB...]
kafka.server.epoch.LeaderEpochFileCacheTest > shouldClearEarliestOnEmptyCache 
STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldClearEarliestOnEmptyCache 
PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldPreserveResetOffsetOnClearEarliestIfOneExists STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldPreserveResetOffsetOnClearEarliestIfOneExists PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldUpdateOffsetBetweenEpochBoundariesOnClearEarliest STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldUpdateOffsetBetweenEpochBoundariesOnClearEarliest PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldReturnInvalidOffsetIfEpochIsRequestedWhichIsNotCurrentlyTracked STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldReturnInvalidOffsetIfEpochIsRequestedWhichIsNotCurrentlyTracked PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldFetchEndOffsetOfEmptyCache 
STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldFetchEndOffsetOfEmptyCache 
PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldRetainLatestEpochOnClearAllEarliestAndUpdateItsOffset STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldRetainLatestEpochOnClearAllEarliestAndUpdateItsOffset PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldClearAllEntries STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldClearAllEntries PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldClearLatestOnEmptyCache 
STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldClearLatestOnEmptyCache 
PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldNotResetEpochHistoryHeadIfUndefinedPassed STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldNotResetEpochHistoryHeadIfUndefinedPassed PASSED

kafka.server.epoch.LeaderEpochIntegrationTest > 
shouldIncreaseLeaderEpochBetweenLeaderRestarts STARTED

kafka.server.epoch.LeaderEpochIntegrationTest > 
shouldIncreaseLeaderEpochBetweenLeaderRestarts PASSED

kafka.server.epoch.LeaderEpochIntegrationTest > 
shouldAddCurrentLeaderEpochToMessagesAsTheyAreWrittenToLeader STARTED

kafka.server.epoch.LeaderEpochIntegrationTest > 
shouldAddCurrentLeaderEpochToMessagesAsTheyAreWrittenToLeader PASSED

kafka.server.epoch.LeaderEpochIntegrationTest > 
shouldSendLeaderEpochRequestAndGetAResponse STARTED

kafka.server.epoch.LeaderEpochIntegrationTest > 
shouldSendLeaderEpochRequestAndGetAResponse PASSED

kafka.server.epoch.OffsetsForLeaderEpochTest > shouldGetEpochsFromReplica 
STARTED

kafka.server.epoch.OffsetsForLeaderEpochTest > shouldGetEpochsFromReplica PASSED

kafka.server.epoch.OffsetsForLeaderEpochTest > 
shouldReturnUnknownTopicOrPartitionIfThrown STARTED

kafka.server.epoch.OffsetsForLeaderEpochTest > 
shouldReturnUnknownTopicOrPartitionIfThrown PASSED

kafka.server.epoch.OffsetsForLeaderEpochTest > 
shouldReturnNoLeaderForPartitionIfThrown STARTED

kafka.server.epoch.OffsetsForLeaderEpochTest > 
shouldReturnNoLeaderForPartitionIfThrown PASSED

kafka.server.epoch.EpochDrivenReplicationProtocolAcceptanceTest > 
shouldSurviveFastLeaderChange STARTED

kafka.server.epoch.EpochDrivenReplicationProtocolAcceptanceTest > 
shouldSurviveFastLeaderChange PASSED

kafka.server.epoch.EpochDrivenReplicationProtocolAcceptanceTest > 
offsetsShouldNotGoBackwards STARTED

kafka.server.epoch.EpochDrivenReplicationProtocolAcceptanceTest > 
offsetsShouldNotGoBackwards PASSED

kafka.server.epoch.EpochDrivenReplicationProtocolAcceptanceTest > 
shouldFollowLeaderEpochBasicWorkflow STARTED
ERROR: No tool found matching GRADLE_3_4_RC_2_HOME
ERROR: Could not install GRADLE_3_5_HOME
java.lang.NullPointerException
at 
hudson.plugins.toolenv.ToolEnvBuildWrapper$1.buildEnvVars(ToolEnvBuildWrapper.java:46)
at hudson.model.AbstractBuild.getEnvironment(AbstractBuild.java:895)
at hudson.plugins.git.GitSCM.getParamExpandedRepos(GitSCM.java:421)
at 
hudson.plugins.git.GitSCM.compareRemoteRevisionWithImpl(GitSCM.java:629)
at hudson.plugins.git.GitSCM.compareRemoteRevisionWith(GitSCM.java:594)
at hudson.scm.SCM.compareRemoteRevisionWith(SCM.java:391)
at hudson.scm.SCM.poll(SCM.java:408)
at hudson.model.AbstractProject._poll(AbstractProject.java:1384)
at hudson.model.AbstractProject.poll(AbstractProject.java:1287)
at hudson.triggers.SCMTrigger$Runner.runPolling(SCMTrigger.java:594)
at hudson.triggers.SCMTrigger$Runner.run(SCMTrigger.java:640)
at 
hudson.util.SequentialExecutionQueue$QueueEntry.run(SequentialExecutionQueue.java:119)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 

Documentation changes

2018-01-10 Thread Ivan Babrou
What does it take to merge a documentation change?

* https://github.com/apache/kafka/pull/3567

Seeing zero activity is very discouraging.


Re: Documentation changes

2018-01-10 Thread Jun Rao
Hi, Ivan,

Thanks for the PR. Sorry for the delay. Just merged it.

Jun

On Wed, Jan 10, 2018 at 12:56 PM, Ivan Babrou  wrote:

> What does it take to merge a documentation change?
>
> * https://github.com/apache/kafka/pull/3567
>
> Seeing zero activity is very discouraging.
>


Re: [DISCUSS] KIP-232: Detect outdated metadata by adding ControllerMetadataEpoch field

2018-01-10 Thread Jun Rao
Hi, Dong,

My replies are the following.

60. What you described could also work. The drawback is that we will be
unnecessarily changing the partition epoch when a partition hasn't really
changed. I was imagining that the partition epoch will be stored in
/brokers/topics/[topic]/partitions/[partitionId], instead of at the topic
level. So, not sure if ZK size limit is an issue.

61, 62 and 65. To me, the offset + offset_epoch is a unique identifier for
a message. So, if a message hasn't changed, the offset and the associated
offset_epoch ideally should remain the same (it will be kind of weird if
two consumer apps save the offset on the same message, but the offset_epochs
are different). partition_epoch + leader_epoch give us that. global_epoch +
leader_epoch don't. If we use this approach, we can solve not only the
problem that you have identified, but also other problems when there is
data loss or topic re-creation more reliably. For example, in the future,
if we include the partition_epoch and leader_epoch in the fetch request,
the server can do a more reliable check of whether that offset is valid or
not. I am not sure that we can rely upon all external offsets to be removed
on topic deletion. For example, a topic may be deleted by an admin who may
not know all the applications.
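
(To make the comparison concrete, a rough sketch of the offset_epoch as
(partition_epoch, leader_epoch) and how a client might compare it against the
broker's current epochs to detect a stale or invalid committed offset. This is
purely illustrative and not the KIP's wire format.)

{code:java}
// Illustrative sketch only: offset_epoch modeled as (partition_epoch, leader_epoch).
public class OffsetEpoch implements Comparable<OffsetEpoch> {
    final int partitionEpoch; // bumped on topic re-creation / partition changes
    final int leaderEpoch;    // taken from the message set containing the committed offset

    OffsetEpoch(int partitionEpoch, int leaderEpoch) {
        this.partitionEpoch = partitionEpoch;
        this.leaderEpoch = leaderEpoch;
    }

    @Override
    public int compareTo(OffsetEpoch other) {
        // A committed offset whose epoch is older than the broker's current epoch for the
        // partition is suspect (e.g. the topic was re-created or data was lost).
        int cmp = Integer.compare(partitionEpoch, other.partitionEpoch);
        return cmp != 0 ? cmp : Integer.compare(leaderEpoch, other.leaderEpoch);
    }
}
{code}
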

If we agree on the above, the second question is then how to reliably
propagate the partition_epoch and the leader_epoch to the consumer when
there are leader or partition changes. The leader_epoch comes from the
message, which is reliable. So, I was suggesting that when we store an
offset, we can just store the leader_epoch from the message set containing
that offset. Similarly, I was thinking that if the partition_epoch is in
the fetch response, we can propagate the partition_epoch reliably when the
partition_epoch changes.

63. My point is that once a leader is producing a message in the new
partition_epoch, ideally, we should associate the new offsets with the new
partition_epoch. Otherwise, the offset_epoch won't be the correct unique
identifier (useful for solving other problems mentioned above). I was
originally thinking that the leader will include the partition_epoch in the
metadata cache in the fetch response. It's just that right now, metadata
cache is updated on UpdateMetadataRequest, which typically happens after
the LeaderAndIsrRequest. Another approach is for the leader to cache the
partition_epoch in the Partition object and return that (instead of the one
in metadata cache) in the fetch response.

65. It seems to me that the global_epoch and the partition_epoch have
different purposes. A partition_epoch has the benefit that it (1) can be
used to form a unique identifier for a message and (2) can be used to
solve other
corner case problems in the future. I am not sure having just a
global_epoch can achieve these. global_epoch is useful to determine which
version of the metadata is newer, especially with topic deletion.

Thanks,

Jun

On Tue, Jan 9, 2018 at 11:34 PM, Dong Lin  wrote:

> Regarding the use of the global epoch in 65), it is very similar to the
> proposal of the metadata_epoch we discussed earlier. The main difference is
> that this epoch is incremented when we create/expand/delete a topic and does
> not change when the controller re-sends metadata.
>
> I looked at our previous discussion. It seems that we prefer
> partition_epoch over the metadata_epoch because 1) we prefer not to have an
> ever growing metadata_epoch and 2) we can reset offset better when topic is
> re-created. The use of a global topic_epoch avoids the drawback of a quickly
> growing metadata_epoch. Though the global epoch does not allow
> us to recognize the invalid offset committed before the topic re-creation,
> we can probably just delete the offset when we delete a topic. Thus I am
> not very sure whether it is still worthwhile to have a per-partition
> partition_epoch if the metadata already has the global epoch.
>
>
> On Tue, Jan 9, 2018 at 6:58 PM, Dong Lin  wrote:
>
> > Hey Jun,
> >
> > Thanks so much. These comments are very useful. Please see my comments below.
> >
> > On Mon, Jan 8, 2018 at 5:52 PM, Jun Rao  wrote:
> >
> >> Hi, Dong,
> >>
> >> Thanks for the updated KIP. A few more comments.
> >>
> >> 60. Perhaps having a partition epoch is more flexible since in the
> future,
> >> we may support deleting a partition as well.
> >>
> >
> > Yeah I have considered this. I think we can probably still support
> > deleting a partition by using the topic_epoch -- when partition of a
> topic
> > is deleted or created, epoch of all partitions of this topic will be
> > incremented by 1. Therefore, if that partition is re-created later, the
> > epoch of that partition will still be larger than its epoch before the
> > deletion, which still allows the client to order the metadata for the
> > purpose of this KIP. Does this sound reasonable?
> >
> > The advantage of using topic_epoch instead of partition_epoch is that the
> > size of the /b

Re: [VOTE] KIP-219 - Improve Quota Communication

2018-01-10 Thread Jun Rao
Hi, Jiangjie,

Thanks for the updated KIP. +1

Jun

On Mon, Jan 8, 2018 at 7:45 PM, Becket Qin  wrote:

> Thanks for the comments, Jun.
>
> 1. Good point.
> 2. Also makes sense. Usually the connection.max.idle.ms is high enough so
> the throttling is impacted.
>
> I have updated the KIP to reflect the changes.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
> On Mon, Jan 8, 2018 at 6:30 PM, Jun Rao  wrote:
>
> > Hi, Jiangjie,
> >
> > Sorry for the late response. The proposal sounds good overall. A couple
> of
> > minor comments below.
> >
> > 1. For throttling a fetch request, we could potentially just send an
> empty
> > response. We can return a throttle time calculated from a full response,
> > but only mute the channel on the server based on a throttle time
> calculated
> > based on the empty response. This has the benefit that the server will
> mute
> > the channel much shorter, which will prevent the consumer from
> rebalancing
> > when throttled.
> >
> > 2. The wiki says "connections.max.idle.ms should be ignored during the
> > throttle time X." This has the potential issue that a server may not
> detect
> > that a client connection is already gone until after an arbitrary amount
> of
> > time. Perhaps we could still just close a connection if the server has
> > muted it for longer than connections.max.idle.ms. This will at least
> bound
> > the time for a server to detect closed client connections.
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Mon, Nov 20, 2017 at 5:30 PM, Becket Qin 
> wrote:
> >
> > > Hi,
> > >
> > > We would like to start the voting thread for KIP-219. The KIP proposes
> to
> > > improve the quota communication between the brokers and clients,
> > especially
> > > for cases of long throttling time.
> > >
> > > The KIP wiki is following:
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 219+-+Improve+quota+
> > > communication
> > >
> > > The discussion thread is here:
> > > http://markmail.org/search/?q=kafka+KIP-219#query:kafka%
> > > 20KIP-219+page:1+mid:ooxabguy7nz7l7zy+state:results
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> >
>


Build failed in Jenkins: kafka-trunk-jdk7 #3089

2018-01-10 Thread Apache Jenkins Server
See 


Changes:

[ibobrik] KAFKA-4711: fix docs on unclean.leader.election.enable default

--
[...truncated 410.18 KB...]

kafka.tools.ConsoleProducerTest > testInvalidConfigs STARTED

kafka.tools.ConsoleProducerTest > testInvalidConfigs PASSED

kafka.tools.ConsoleProducerTest > testValidConfigsNewProducer STARTED

kafka.tools.ConsoleProducerTest > testValidConfigsNewProducer PASSED

kafka.tools.MirrorMakerTest > 
testDefaultMirrorMakerMessageHandlerWithNoTimestampInSourceMessage STARTED

kafka.tools.MirrorMakerTest > 
testDefaultMirrorMakerMessageHandlerWithNoTimestampInSourceMessage PASSED

kafka.tools.MirrorMakerTest > testDefaultMirrorMakerMessageHandler STARTED

kafka.tools.MirrorMakerTest > testDefaultMirrorMakerMessageHandler PASSED

kafka.tools.MirrorMakerTest > testDefaultMirrorMakerMessageHandlerWithHeaders 
STARTED

kafka.tools.MirrorMakerTest > testDefaultMirrorMakerMessageHandlerWithHeaders 
PASSED

kafka.tools.ConsumerPerformanceTest > testDetailedHeaderMatchBody STARTED

kafka.tools.ConsumerPerformanceTest > testDetailedHeaderMatchBody PASSED

kafka.tools.ConsumerPerformanceTest > testNonDetailedHeaderMatchBody STARTED

kafka.tools.ConsumerPerformanceTest > testNonDetailedHeaderMatchBody PASSED

kafka.tools.ConsoleConsumerTest > 
shouldParseValidOldConsumerConfigWithAutoOffsetResetSmallest STARTED

kafka.tools.ConsoleConsumerTest > 
shouldParseValidOldConsumerConfigWithAutoOffsetResetSmallest PASSED

kafka.tools.ConsoleConsumerTest > 
shouldParseValidNewConsumerConfigWithNoOffsetReset STARTED

kafka.tools.ConsoleConsumerTest > 
shouldParseValidNewConsumerConfigWithNoOffsetReset PASSED

kafka.tools.ConsoleConsumerTest > shouldLimitReadsToMaxMessageLimit STARTED

kafka.tools.ConsoleConsumerTest > shouldLimitReadsToMaxMessageLimit PASSED

kafka.tools.ConsoleConsumerTest > shouldParseValidNewConsumerValidConfig STARTED

kafka.tools.ConsoleConsumerTest > shouldParseValidNewConsumerValidConfig PASSED

kafka.tools.ConsoleConsumerTest > 
shouldParseValidNewConsumerConfigWithAutoOffsetResetAndMatchingFromBeginning 
STARTED

kafka.tools.ConsoleConsumerTest > 
shouldParseValidNewConsumerConfigWithAutoOffsetResetAndMatchingFromBeginning 
PASSED

kafka.tools.ConsoleConsumerTest > shouldStopWhenOutputCheckErrorFails STARTED

kafka.tools.ConsoleConsumerTest > shouldStopWhenOutputCheckErrorFails PASSED

kafka.tools.ConsoleConsumerTest > 
shouldExitOnInvalidConfigWithAutoOffsetResetAndConflictingFromBeginningNewConsumer
 STARTED

kafka.tools.ConsoleConsumerTest > 
shouldExitOnInvalidConfigWithAutoOffsetResetAndConflictingFromBeginningNewConsumer
 PASSED

kafka.tools.ConsoleConsumerTest > 
shouldSetAutoResetToSmallestWhenFromBeginningConfigured STARTED

kafka.tools.ConsoleConsumerTest > 
shouldSetAutoResetToSmallestWhenFromBeginningConfigured PASSED

kafka.tools.ConsoleConsumerTest > 
shouldParseValidNewConsumerConfigWithAutoOffsetResetEarliest STARTED

kafka.tools.ConsoleConsumerTest > 
shouldParseValidNewConsumerConfigWithAutoOffsetResetEarliest PASSED

kafka.tools.ConsoleConsumerTest > 
shouldParseValidNewSimpleConsumerValidConfigWithStringOffset STARTED

kafka.tools.ConsoleConsumerTest > 
shouldParseValidNewSimpleConsumerValidConfigWithStringOffset PASSED

kafka.tools.ConsoleConsumerTest > 
shouldParseValidOldConsumerConfigWithAutoOffsetResetLargest STARTED

kafka.tools.ConsoleConsumerTest > 
shouldParseValidOldConsumerConfigWithAutoOffsetResetLargest PASSED

kafka.tools.ConsoleConsumerTest > shouldParseConfigsFromFile STARTED

kafka.tools.ConsoleConsumerTest > shouldParseConfigsFromFile PASSED

kafka.tools.ConsoleConsumerTest > groupIdsProvidedInDifferentPlacesMustMatch 
STARTED

kafka.tools.ConsoleConsumerTest > groupIdsProvidedInDifferentPlacesMustMatch 
PASSED

kafka.tools.ConsoleConsumerTest > 
shouldExitOnInvalidConfigWithAutoOffsetResetAndConflictingFromBeginningOldConsumer
 STARTED

kafka.tools.ConsoleConsumerTest > 
shouldExitOnInvalidConfigWithAutoOffsetResetAndConflictingFromBeginningOldConsumer
 PASSED

kafka.tools.ConsoleConsumerTest > 
shouldParseValidNewSimpleConsumerValidConfigWithNumericOffset STARTED

kafka.tools.ConsoleConsumerTest > 
shouldParseValidNewSimpleConsumerValidConfigWithNumericOffset PASSED

kafka.tools.ConsoleConsumerTest > testDefaultConsumer STARTED

kafka.tools.ConsoleConsumerTest > testDefaultConsumer PASSED

kafka.tools.ConsoleConsumerTest > 
shouldParseValidNewConsumerConfigWithAutoOffsetResetLatest STARTED

kafka.tools.ConsoleConsumerTest > 
shouldParseValidNewConsumerConfigWithAutoOffsetResetLatest PASSED

kafka.tools.ConsoleConsumerTest > shouldParseValidOldConsumerValidConfig STARTED

kafka.tools.ConsoleConsumerTest > shouldParseValidOldConsumerValidConfig PASSED

kafka.message.MessageCompressionTest > testCompressSize STARTED

kafka.message.MessageCompressionTest > testCompressSize PASSED

kafka.message.MessageCompressionTest > testSimpleComp

Build failed in Jenkins: kafka-trunk-jdk8 #2323

2018-01-10 Thread Apache Jenkins Server
See 


Changes:

[ibobrik] KAFKA-4711: fix docs on unclean.leader.election.enable default

--
[...truncated 1.42 MB...]
org.apache.kafka.common.config.ConfigDefTest > testValidateCannotParse STARTED

org.apache.kafka.common.config.ConfigDefTest > testValidateCannotParse PASSED

org.apache.kafka.common.config.ConfigDefTest > testValidate STARTED

org.apache.kafka.common.config.ConfigDefTest > testValidate PASSED

org.apache.kafka.common.config.ConfigDefTest > 
testInternalConfigDoesntShowUpInDocs STARTED

org.apache.kafka.common.config.ConfigDefTest > 
testInternalConfigDoesntShowUpInDocs PASSED

org.apache.kafka.common.config.ConfigDefTest > testInvalidDefaultString STARTED

org.apache.kafka.common.config.ConfigDefTest > testInvalidDefaultString PASSED

org.apache.kafka.common.config.ConfigDefTest > testSslPasswords STARTED

org.apache.kafka.common.config.ConfigDefTest > testSslPasswords PASSED

org.apache.kafka.common.config.ConfigDefTest > testBaseConfigDefDependents STARTED

org.apache.kafka.common.config.ConfigDefTest > testBaseConfigDefDependents PASSED

org.apache.kafka.common.config.ConfigDefTest > testConvertValueToStringClass STARTED

org.apache.kafka.common.config.ConfigDefTest > testConvertValueToStringClass PASSED

org.apache.kafka.common.config.ConfigDefTest > testConvertValueToStringShort STARTED

org.apache.kafka.common.config.ConfigDefTest > testConvertValueToStringShort PASSED

org.apache.kafka.common.config.ConfigDefTest > testInvalidDefault STARTED

org.apache.kafka.common.config.ConfigDefTest > testInvalidDefault PASSED

org.apache.kafka.common.config.ConfigDefTest > toRst STARTED

org.apache.kafka.common.config.ConfigDefTest > toRst PASSED

org.apache.kafka.common.config.ConfigDefTest > testCanAddInternalConfig STARTED

org.apache.kafka.common.config.ConfigDefTest > testCanAddInternalConfig PASSED

org.apache.kafka.common.config.ConfigDefTest > testMissingRequired STARTED

org.apache.kafka.common.config.ConfigDefTest > testMissingRequired PASSED

org.apache.kafka.common.config.ConfigDefTest > testParsingEmptyDefaultValueForStringFieldShouldSucceed STARTED

org.apache.kafka.common.config.ConfigDefTest > testParsingEmptyDefaultValueForStringFieldShouldSucceed PASSED

org.apache.kafka.common.config.ConfigDefTest > testConvertValueToStringPassword STARTED

org.apache.kafka.common.config.ConfigDefTest > testConvertValueToStringPassword PASSED

org.apache.kafka.common.config.ConfigDefTest > testConvertValueToStringList STARTED

org.apache.kafka.common.config.ConfigDefTest > testConvertValueToStringList PASSED

org.apache.kafka.common.config.ConfigDefTest > testConvertValueToStringLong STARTED

org.apache.kafka.common.config.ConfigDefTest > testConvertValueToStringLong PASSED

org.apache.kafka.common.config.ConfigDefTest > testConvertValueToStringBoolean STARTED

org.apache.kafka.common.config.ConfigDefTest > testConvertValueToStringBoolean PASSED

org.apache.kafka.common.config.ConfigDefTest > testNullDefaultWithValidator STARTED

org.apache.kafka.common.config.ConfigDefTest > testNullDefaultWithValidator PASSED

org.apache.kafka.common.config.ConfigDefTest > testMissingDependentConfigs STARTED

org.apache.kafka.common.config.ConfigDefTest > testMissingDependentConfigs PASSED

org.apache.kafka.common.config.ConfigDefTest > testConvertValueToStringDouble STARTED

org.apache.kafka.common.config.ConfigDefTest > testConvertValueToStringDouble PASSED

org.apache.kafka.common.config.ConfigDefTest > toEnrichedRst STARTED

org.apache.kafka.common.config.ConfigDefTest > toEnrichedRst PASSED

org.apache.kafka.common.config.ConfigDefTest > testDefinedTwice STARTED

org.apache.kafka.common.config.ConfigDefTest > testDefinedTwice PASSED

org.apache.kafka.common.config.ConfigDefTest > testConvertValueToStringString STARTED

org.apache.kafka.common.config.ConfigDefTest > testConvertValueToStringString PASSED

org.apache.kafka.common.config.ConfigDefTest > testNestedClass STARTED

org.apache.kafka.common.config.ConfigDefTest > testNestedClass PASSED

org.apache.kafka.common.config.ConfigDefTest > testBadInputs STARTED

org.apache.kafka.common.config.ConfigDefTest > testBadInputs PASSED

org.apache.kafka.common.config.ConfigDefTest > testValidateMissingConfigKey STARTED

org.apache.kafka.common.config.ConfigDefTest > testValidateMissingConfigKey PASSED

org.apache.kafka.common.config.AbstractConfigTest > testOriginalsWithPrefix STARTED

org.apache.kafka.common.config.AbstractConfigTest > testOriginalsWithPrefix PASSED

org.apache.kafka.common.config.AbstractConfigTest > testConfiguredInstances STARTED

org.apache.kafka.common.config.AbstractConfigTest > testConfiguredInstances PASSED

org.apache.kafka.common.config.AbstractConfigTest > testValuesWithPrefixOverride STARTED

org.apache.kafka.common.config.AbstractConfigTest > testValuesWithPrefixOverride PAS

[jira] [Resolved] (KAFKA-6383) StreamThread.shutdown doesn't clean up completely when called before StreamThread.start

2018-01-10 Thread Guozhang Wang (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-6383?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang resolved KAFKA-6383.
--
Resolution: Fixed

Issue resolved by pull request 4382
[https://github.com/apache/kafka/pull/4382]

> StreamThread.shutdown doesn't clean up completely when called before 
> StreamThread.start
> ---
>
> Key: KAFKA-6383
> URL: https://issues.apache.org/jira/browse/KAFKA-6383
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Rohan Desai
>Assignee: Rohan Desai
> Fix For: 1.1.0
>
>
> The following code leaks a producer network thread:
> {code}
> ks = new KafkaStreams(...);
> ks.close();
> {code}
> The underlying issue is that KafkaStreams creates a number of StreamThreads 
> via StreamThread.create, and each of those in turn creates several resources 
> (including a producer). These resources are cleaned up only when the thread 
> exits, so if the thread was never started, they are never cleaned up. 
> StreamThread.shutdown should clean up if it sees that the thread has never 
> been started.
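
For context, here is a minimal, self-contained sketch of the leak scenario 
described above, written against the Kafka Streams 1.0 API (the application id, 
bootstrap servers, and topic names are placeholders):

{code}
import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class NeverStartedStreamsExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder configuration values for illustration only.
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "leak-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // Trivial placeholder topology.
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic").to("output-topic");

        // The StreamThreads (and their producers) are created here, in the
        // constructor, before start() is ever called.
        KafkaStreams ks = new KafkaStreams(builder.build(), props);

        // Before the fix for KAFKA-6383, close() on a never-started instance did
        // not shut those threads' clients down, leaking a producer network thread.
        ks.close();
    }
}
{code}

The fix merged via pull request 4382 completes the shutdown path even for 
threads that were created but never started.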



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Jenkins build is back to normal : kafka-trunk-jdk7 #3090

2018-01-10 Thread Apache Jenkins Server
See 




Build failed in Jenkins: kafka-trunk-jdk8 #2324

2018-01-10 Thread Apache Jenkins Server
See 


Changes:

[wangguoz] KAFKA-6383: Complete shut down for streams threads that have not started

--
[...truncated 3.88 MB...]
org.apache.kafka.connect.runtime.TransformationConfigTest > testEmbeddedConfigRegexRouter PASSED

org.apache.kafka.connect.runtime.TransformationConfigTest > testEmbeddedConfigSetSchemaMetadata STARTED

org.apache.kafka.connect.runtime.TransformationConfigTest > testEmbeddedConfigSetSchemaMetadata PASSED

org.apache.kafka.connect.runtime.TransformationConfigTest > testEmbeddedConfigTimestampConverter STARTED

org.apache.kafka.connect.runtime.TransformationConfigTest > testEmbeddedConfigTimestampConverter PASSED

org.apache.kafka.connect.runtime.TransformationConfigTest > testEmbeddedConfigHoistField STARTED

org.apache.kafka.connect.runtime.TransformationConfigTest > testEmbeddedConfigHoistField PASSED

org.apache.kafka.connect.runtime.TransformationConfigTest > testEmbeddedConfigMaskField STARTED

org.apache.kafka.connect.runtime.TransformationConfigTest > testEmbeddedConfigMaskField PASSED

org.apache.kafka.connect.runtime.TransformationConfigTest > testEmbeddedConfigInsertField STARTED

org.apache.kafka.connect.runtime.TransformationConfigTest > testEmbeddedConfigInsertField PASSED

org.apache.kafka.connect.runtime.TransformationConfigTest > testEmbeddedConfigFlatten STARTED

org.apache.kafka.connect.runtime.TransformationConfigTest > testEmbeddedConfigFlatten PASSED

org.apache.kafka.connect.runtime.TransformationConfigTest > testEmbeddedConfigReplaceField STARTED

org.apache.kafka.connect.runtime.TransformationConfigTest > testEmbeddedConfigReplaceField PASSED

org.apache.kafka.connect.runtime.TransformationConfigTest > testEmbeddedConfigTimestampRouter STARTED

org.apache.kafka.connect.runtime.TransformationConfigTest > testEmbeddedConfigTimestampRouter PASSED

org.apache.kafka.connect.runtime.TransformationConfigTest > testEmbeddedConfigValueToKey STARTED

org.apache.kafka.connect.runtime.TransformationConfigTest > testEmbeddedConfigValueToKey PASSED

org.apache.kafka.connect.runtime.TransformationConfigTest > testEmbeddedConfigExtractField STARTED

org.apache.kafka.connect.runtime.TransformationConfigTest > testEmbeddedConfigExtractField PASSED

org.apache.kafka.connect.runtime.ConnectorConfigTest > unconfiguredTransform STARTED

org.apache.kafka.connect.runtime.ConnectorConfigTest > unconfiguredTransform PASSED

org.apache.kafka.connect.runtime.ConnectorConfigTest > multipleTransformsOneDangling STARTED

org.apache.kafka.connect.runtime.ConnectorConfigTest > multipleTransformsOneDangling PASSED

org.apache.kafka.connect.runtime.ConnectorConfigTest > misconfiguredTransform STARTED

org.apache.kafka.connect.runtime.ConnectorConfigTest > misconfiguredTransform PASSED

org.apache.kafka.connect.runtime.ConnectorConfigTest > noTransforms STARTED

org.apache.kafka.connect.runtime.ConnectorConfigTest > noTransforms PASSED

org.apache.kafka.connect.runtime.ConnectorConfigTest > danglingTransformAlias STARTED

org.apache.kafka.connect.runtime.ConnectorConfigTest > danglingTransformAlias PASSED

org.apache.kafka.connect.runtime.ConnectorConfigTest > multipleTransforms STARTED

org.apache.kafka.connect.runtime.ConnectorConfigTest > multipleTransforms PASSED

org.apache.kafka.connect.runtime.ConnectorConfigTest > singleTransform STARTED

org.apache.kafka.connect.runtime.ConnectorConfigTest > singleTransform PASSED

org.apache.kafka.connect.runtime.ConnectorConfigTest > wrongTransformationType STARTED

org.apache.kafka.connect.runtime.ConnectorConfigTest > wrongTransformationType PASSED

org.apache.kafka.connect.runtime.standalone.StandaloneHerderTest > testPutConnectorConfig STARTED

org.apache.kafka.connect.runtime.standalone.StandaloneHerderTest > testPutConnectorConfig PASSED

org.apache.kafka.connect.runtime.standalone.StandaloneHerderTest > testPutTaskConfigs STARTED

org.apache.kafka.connect.runtime.standalone.StandaloneHerderTest > testPutTaskConfigs PASSED

org.apache.kafka.connect.runtime.standalone.StandaloneHerderTest > testCreateConnectorFailedBasicValidation STARTED

org.apache.kafka.connect.runtime.standalone.StandaloneHerderTest > testCreateConnectorFailedBasicValidation PASSED

org.apache.kafka.connect.runtime.standalone.StandaloneHerderTest > testCreateConnectorFailedCustomValidation STARTED

org.apache.kafka.connect.runtime.standalone.StandaloneHerderTest > testCreateConnectorFailedCustomValidation PASSED

org.apache.kafka.connect.runtime.standalone.StandaloneHerderTest > testCreateConnectorAlreadyExists STARTED

org.apache.kafka.connect.runtime.standalone.StandaloneHerderTest > testCreateConnectorAlreadyExists PASSED

org.apache.kafka.connect.runtime.standalone.StandaloneHerderTest > testDestroyConnector STARTED

org.apache.kafka.connect.runtime.standalone.StandaloneHerderTest > testDestroyConnecto

[jira] [Created] (KAFKA-6441) FetchRequest populates buffer of size MinBytes, even if response is smaller

2018-01-10 Thread Ivan Babrou (JIRA)
Ivan Babrou created KAFKA-6441:
--

 Summary: FetchRequest populates buffer of size MinBytes, even if response is smaller
 Key: KAFKA-6441
 URL: https://issues.apache.org/jira/browse/KAFKA-6441
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.11.0.1
Reporter: Ivan Babrou


We're using the Sarama Go client as the consumer, but I don't think that's 
relevant. The producer is syslog-ng with Kafka output; I'm not quite sure which 
log format Kafka itself is using, but I assume 0.11.0.0, because that's what is 
set in the topic settings.

Our FetchRequest has minSize = 16MB, maxSize = 64, maxWait = 500ms. For some 
reason, Kafka replies with a buffer of at least minSize even when it contains 
just one 1KB log message. When Sarama was using the older consumer API, 
everything was okay. When we upgraded to the 0.11.0.0 consumer API, consumer 
traffic for a 125Mbit/s topic spiked to 55000Mbit/s on the wire and the 
consumer wasn't even able to keep up.
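
For reference, a rough sketch of the corresponding fetch settings using the 
Java consumer (the actual client here is Sarama, so this is only an 
approximation; the broker address, group id, and topic name are placeholders, 
and maxSize = 64 is assumed to mean 64MB):

{code}
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class FetchSizeExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder connection and group settings.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "fetch-size-demo");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        // These map to the FetchRequest fields described above.
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 16 * 1024 * 1024); // minSize = 16MB
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 64 * 1024 * 1024); // maxSize, assumed 64MB
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);            // maxWait = 500ms

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("syslog-topic"));  // placeholder topic
            consumer.poll(1000);
        }
    }
}
{code}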

A 1KB message in a 16MB buffer is roughly a 16,000x blow-up, i.e. about 
1,600,000% overhead.

I don't think there's any valid reason to do this.

It's also mildly annoying that there is no 0.11.0.1 tag in git; looking at the 
changes is harder than it should be.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)