[jira] [Created] (KAFKA-7729) Mysterious disk increase when switching to RedHat OpenJDK

2018-12-13 Thread Dimitris Mandalidis (JIRA)
Dimitris Mandalidis created KAFKA-7729:
--

 Summary: Mysterious disk increase when switching to RedHat OpenJDK
 Key: KAFKA-7729
 URL: https://issues.apache.org/jira/browse/KAFKA-7729
 Project: Kafka
  Issue Type: Bug
Affects Versions: 1.1.0
Reporter: Dimitris Mandalidis


(maybe it's not a bug actually, sort of a question)

We run a Kafka cluster 1.1.0 with 3 brokers running in RHEL 7.5 having their 
logs over an LVM ext4 partition (fsck'd an hour ago). The cluster is using 
Oracle JDK 8u144 and it uses 45G of disk space under normal circumstances.

We 've started switching to RedHat OpenJDK 1.8.0_191, one broker at a time. 
Since a broker had to be restarted, we thought that disk space increase was 
expected during rejoining the cluster, because it will first try to become ISR 
and it will roll segments (and delete if needed) afterwards (correct me if I 'm 
wrong). So the first OpenJDK broker ended up using 65G of disk space. For the 
second broker, because we will be running short of disk space, we decided to 
take him down and delete all the logs. So it also ended up using 65G of space.

We let the two brokers to operate as such for 8 hours expecting that they will 
eventually stabilize in the previous 45G but they didn't. The current situation 
is that we have 2 OpenJDK brokers holding 65G of disk usage, and one Oracle JDK 
broker which still holds 45G and waits to be migrated.

The actual question is that we cannot explain why we saw 20G disk space 
increase just switching to OpenJDK. We had performed rolling upgrades in the 
past (we 've started with 0.10) but we have never seen such a difference. Do 
you have any idea?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7730) Limit total number of active connections in the broker

2018-12-13 Thread Rajini Sivaram (JIRA)
Rajini Sivaram created KAFKA-7730:
-

 Summary: Limit total number of active connections in the broker
 Key: KAFKA-7730
 URL: https://issues.apache.org/jira/browse/KAFKA-7730
 Project: Kafka
  Issue Type: New Feature
  Components: network
Affects Versions: 2.1.0
Reporter: Rajini Sivaram
Assignee: Rajini Sivaram
 Fix For: 2.2.0


Add a new listener config `max.connections` to limit the maximum number of 
active connections on each listener. See 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-402%3A+Improve+fairness+in+SocketServer+processors
 for details.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: Vote for KIP-393 (Fix time windowed serde to deserialize changelog topic)

2018-12-13 Thread Damian Guy
+1 (binding)

On Wed, 12 Dec 2018 at 13:27, Adam Bellemare 
wrote:

> +1 (non-binding) from me. Looks like a pretty clear-cut case.
>
>
>
> On Tue, Dec 11, 2018 at 1:11 AM Shawn Nguyen 
> wrote:
>
> > Thanks for the feedback Guozhang! I updated the KIP.
> >
> > In the meantime, could I ask for additional binding votes/approval on
> this
> > KIP proposal?
> >
> > Shawn
> >
> > On Thu, Dec 6, 2018 at 1:22 PM Liquan Pei  wrote:
> >
> > > +1 (non-binding)
> > >
> > > On Wed, Dec 5, 2018 at 4:51 PM Guozhang Wang 
> wrote:
> > >
> > >> Hello Shawn,
> > >>
> > >> Thanks for the writeup. I've made a pass over it and here are some
> minor
> > >> comments:
> > >>
> > >> 1) As we discussed in the PR:
> https://github.com/apache/kafka/pull/5307
> > ,
> > >> the public APIs that we will add is
> > >>
> > >> In WindowedSerdes:
> > >> ```
> > >> static public  Serde>
> > timeWindowedChangelogSerdeFrom(final
> > >> Class type, final long windowSize)
> > >> ```
> > >>
> > >> In TimeWindowedSerde
> > >> ```
> > >> TimeWindowedSerde forChangelog(final boolean);
> > >> ```
> > >>
> > >> Other classes such as WindowedKeySchema are internal classes for
> > >> implementation details and hence do not need to be listed in the wiki
> as
> > >> public APIs.
> > >>
> > >>
> > >> 2) The wiki doc may reads a bit confusing for audience who are not
> > >> familiar
> > >> with the PR, since we mentioned the "forChangelog()" function and the
> > >> "isChangelog" parameter without clear definitions, but only explained
> > what
> > >> it is later in the docs as java code examples. I think rephrasing the
> > >> early
> > >> paragraphs to explain a bit more why we will add a new internal field
> > >> along
> > >> with a setter, its semantics (its default value and how
> deserialization
> > >> will be different depending on that) would be better.
> > >>
> > >> Otherwise, I'm +1 on the KIP, thanks!
> > >>
> > >>
> > >> Guozhang
> > >>
> > >>
> > >> On Wed, Dec 5, 2018 at 8:18 AM Shawn Nguyen 
> > >> wrote:
> > >>
> > >> > Hey all,
> > >> >
> > >> > I wanted to start a vote on approval of KIP-393
> > >> > <
> > >> >
> > >>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-393%3A+Time+windowed+serde+to+properly+deserialize+changelog+input+topic
> > >> > >
> > >> > to
> > >> > fix the current time windowed serde for properly deserializing
> > changelog
> > >> > input topics. Let me know what you guys think.
> > >> >
> > >> > Thanks,
> > >> > Shawn
> > >> >
> > >>
> > >>
> > >> --
> > >> -- Guozhang
> > >>
> > >
> > >
> > > --
> > > Liquan Pei
> > > Software Engineer, Confluent Inc
> > >
> >
>


Re: [DISCUSS] KIP-399: Extend ProductionExceptionHandler to cover serialization exceptions

2018-12-13 Thread Matthias J. Sax
For store updates, records are first serialized and afterwards put into
the store and written into the changelog topic.

In the current implementation, if the send() into the changelog topic
produces an error and the handler skips over it, the local store content
and the changelog topic diverge. This seems to be a correctness issue.

For serialization error, it would not happen that store and changelog
diverge, because serialization happens before and put/send. Thus, with
this KIP we could skip both put() and send(). However, I am still
wondering, if it would be ok to skip a store update for this case? (Btw:
the current PR does not address this atm, and a serialization error for
a store write would not be covered but kill the instance).

IIRC, the original idea of the KIP was to allow skipping over record for
output topics only. That's why I am wondering if it's ok to allow
skipper over record in repartitions topics, too.

In the end, it's some data loss for all 3 cases, so maybe it's ok to
allow skipping for all 3 cases. However, we should not allow that local
store and changelog topic diverge IMHO (what might been an orthogonal
bug thought).

I also don't have an answer or preference. Just think, it's important to
touch on those cases and get input how people think about it.


-Matthias



On 12/11/18 11:43 AM, Kamal Chandraprakash wrote:
> Matthias,
> 
> For changelog topics, I think it does not make sense to allow skipping
> records if serialization fails? For internal repartitions topics, I am
> not sure if we should allow it or not. Would you agree with this? We
> should discuss the implication to derive a sound design.
> 
> Can you explain the issue that happens when records are skipped to
> changelog / internal-repartition topics ? So, that I can look into it.
> 
> On Fri, Dec 7, 2018 at 12:07 AM Matthias J. Sax 
> wrote:
> 
 To accept different types of records from multiple topologies, I have to
 define the ProducerRecord without generics.
>>
>> Yes. It does make sense. My point was, that the KIP should
>> mention/explain this explicitly to allow other not familiar with the
>> code base to understand it more easily :)
>>
>>
>>
>> About `ClassCastException`: seems to be an implementation detail. No
>> need to make it part of the KIP discussion.
>>
>>
>>
>> One more thing that came to my mind. We use the `RecordCollector` to
>> write into all topics, ie, user output topics and internal repartition
>> and changelog topics.
>>
>> For changelog topics, I think it does not make sense to allow skipping
>> records if serialization fails? For internal repartitions topics, I am
>> not sure if we should allow it or not. Would you agree with this? We
>> should discuss the implication to derive a sound design.
>>
>> I was also just double checking the code, and it seems that the current
>> `ProductionExceptionHandler` is applied for all topics. This seems to be
>> incorrect to me. Seems we missed this case when doing KIP-210? (Or did
>> we discuss this and I cannot remember? Might be worth to double check.)
>>
>> Last thought: of course, the handler will know which topic is affected
>> and can provide a corresponding implementation. Was just wondering if we
>> should be more strict?
>>
>>
>> -Matthias
>>
>> On 12/6/18 10:01 AM, Kamal Chandraprakash wrote:
>>> Matt,
>>> I agree with Matthias on not to altering the serializer as it's used
>> by
>>> multiple components.
>>>
>>> Matthias,
>>>
>>>  - the proposed method accepts a `ProducerRecord` -- it might be good to
>>> explain why this cannot be done in a type safe way (ie, missing generics)
>>>
>>> To accept different types of records from multiple topologies, I have to
>>> define the ProducerRecord without generics.
>>>
>>> - `AlwaysProductionExceptionHandler` ->
>>> `AlwaysContinueProductionExceptionHandler`
>>>
>>> Updated the typo error in KIP.
>>>
>>>  - `DefaultProductionExceptionHandler` is not mentioned
>>>
>>> The `handleSerializationException` method in the
>>> `ProductionExceptionHandler` interface will have default implementation
>>> that is set to FAIL by default.
>>> This is done to avoid any changes in the user implementation. So, I
>> didn't
>>> mentioned the `DefaultProductionExceptionHandler` class. Updated the KIP.
>>>
>>> - Why do you distinguish between `ClassCastException` and "any other
>>> unchecked exception? Both second case seems to include the first one?
>>>
>>> In SinkNode.java#93
>>> <
>> https://github.com/apache/kafka/blob/87cc31c4e7ea36e7e832a1d02d71480a91a75293/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java#L93
>>>
>>> on
>>> hitting `ClassCastException`, we are halting the streams as it's a fatal
>>> error.
>>> To keep the original behavior, I've to distinguish the exceptions.
>>>
>>>
>>> On Thu, Dec 6, 2018 at 10:44 PM Matthias J. Sax 
>>> wrote:
>>>
 Well, that's exactly the point. The serializer should not be altered
 IMHO because this would have impact on other compon

[jira] [Created] (KAFKA-7731) JMX metrics for client connections: how many, what version, what language, source ip etc...

2018-12-13 Thread Antony Stubbs (JIRA)
Antony Stubbs created KAFKA-7731:


 Summary: JMX metrics for client connections: how many, what 
version, what language, source ip etc...
 Key: KAFKA-7731
 URL: https://issues.apache.org/jira/browse/KAFKA-7731
 Project: Kafka
  Issue Type: New Feature
  Components: core
Affects Versions: 2.1.0
Reporter: Antony Stubbs


Extremely useful for diagnosing large installations with many clients, auditing 
client usage, behaviour etc..



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] KIP-307: Allow to define custom processor names with KStreams DSL

2018-12-13 Thread Matthias J. Sax
Just catching up on this discussion.

My overall personal take is, that I am not a big fan of the interface
`Named` that is used as a factory. I would rather prefer to add a
control object parameter to all methods that don't have one yet. This
KIP was started a while ago, and we added new naming capabilities in the
meantime. Guozhang's example in the PR comment about naming in
stream-stream join shows, that we might end up in a confusion situation
for users if we use `Named`. Also, in 2.1, user can already name as
repartition-/changelog-topics and stores. Thus, KIP-307 boils down to
provide non-functional naming?

Hence, for all methods that allow to specify names already, I don't see
any reason to change them, but use the existing API to also name the
processor(s) instead of allowing uses to specify a new name.

About the inconsistency in method naming. I agree, that `as` is very
generic and maybe not the best choice.

I think it might be helpful, to have a table overview in the KIP, that
list all existing static/non-static methods that allow to specify a
name, plus a columns with the new suggested naming for those methods?

Thoughts?


-Matthias


On 12/12/18 12:45 AM, Florian Hussonnois wrote:
> Thank you very much for your feedbacks.
> 
> Currently, there is still lot of discussions regarding the Named interface.
> On the one hand we should provided consistency over the stream API and on
> the other hand we should not break the semantic as John point it up.
> 
> Guozhang, I'm sorry, but I'm little bit confused, maybe I missed something.
> In your comment you have suggested that :
> * Produced/Consumed/Suppressed should extends Named
> * Named should have a private-package method to get the specified processor
> name internally (processorName())
> * Finally we should end up with something like :  Named -> XXX ->
> XXXInternal or Named -> Produced -> ProducedInternal
> 
> The objective behind that is to :
> * consolidate the internal method processorName()
> * consolidate the method withName that exists now existing into Produced,
> Consumed and Suppressed.
> 
> But, Named is an interface so we can't define a private-package method on
> it. Also, for example Produced and ProducedInternal are not in the same
> package so having a private-package method doesn't really help.
> In addition, if we add the withName method into Named interface this can
> become confusing for developers because action interfaces (ValueMapper,
> Reducer, etc) extend it.
> The interface would look like :
> 
> public interface Named> {
> default String name() {
> return null;
> }
> default Named withName(final String name) {
> return null;
> }
> ...
> }
> 
> So maybe instead of adding another method to Named we could create a new
> package-private class that could be extended by
> Produced/Consumed/Joined/Suppressed. For exemple,
> class SettableName> implements Named {
> 
> protected String processorName;
> 
> SettableName(final SettableName settable) {
> this(Objects.requireNonNull(settable, "settable can't be
> null").name());
> }
> 
> SettableName(final String processorName) {
> this.processorName = processorName;
> }
> 
> @Override
> public String name() {
> return processorName;
> }
> public T withName(final String processorName) {
> this.processorName = processorName;
> return (T)this;
> }
> }
> 
> In that way, we will get : public class Produced implements
> SettableName { ...
> 
> WDYT?
> 
> 
> Le mar. 11 déc. 2018 à 02:46, Guozhang Wang  a écrit :
> 
>> I had one meta comment on the PR:
>> https://github.com/apache/kafka/pull/5909#discussion_r240447153
>>
>> On Mon, Dec 10, 2018 at 5:22 PM John Roesler  wrote:
>>
>>> Hi Florian,
>>>
>>> I hope it's ok if I ask a few questions at this late stage...
>>>
>>> Comment 1 ==
>>>
>>> It seems like the proposal is to add a new "Named" interface that is
>>> intended to be mixed in with the existing API objects at various points.
>>>
>>> Just to preface some of my comments, it looks like your KIP was created
>>> quite a while ago, so the API may have changed somewhat since you
>> started.
>>>
>>> As I see the API, there are a few different kinds of DSL method
>> arguments:
>>> * functions: things like Initializer, Aggregator, ValueJoiner,
>>> ForEachAction... All of these are essentially Streams-flavored Function
>>> interfaces with different arities, type bounds, and semantics.
>>> * config objects: things like Produced, Consumed, Joined, Grouped...
>> These
>>> are containers for configurations, where the target of the configuration
>> is
>>> the operation itself
>>> * raw configurations: things like a raw topic-name string and
>> Materialized:
>>> These are configurations for operations that have no config object, and
>> for
>>> various reasons, we didn't make one. The distinguishing feature is that
>> the
>>> target of the configuration is not the operation itself,

Re: [DISCUSS] KIP-401 TransformerSupplier/ProcessorSupplier enhancements

2018-12-13 Thread Matthias J. Sax
Just a meta comment: do we really need to deprecate existing
`transform()` etc methods?

The last argument is a vararg, and thus, just keeping the existing API
for this part seems to work too, allowing to implement both patterns?

Also, instead of adding a default method, we could also add a new
interface `StoreBuilderSupplier` with method `List
stateStores()` -- users could implement `TransformerSupplier` and
`StoreBuilderSupplier` at once; and for this case, we require that users
don't provide store name in `transform()`.

Similar, we could add an interface `StoreNameSupplier` with method
`List stateStores()`. This allows to "auto-wire" a transformer
to existing stores (to avoid the issue to add the same store multiple
times).

Hence, for shared stores, there would be one "main" transformer that
implements `StoreBuilderSupplier` and that must be added first to the
topology. The other transformers would implement `StoreNameSupplier` and
just connect to those stores.

Another possibility to avoid the issue of adding the same stores
multiple times would be, that the DSL always calls `addStateStore()` but
catches a potential "store exists already" exception and falls back to
`connectProcessorAndStateStore()` for this case. Thus, we would not need
the `StoreNameSupplier` interface and the order in which transformers
are added would not matter either. The only disadvantage I see, might be
potential bugs about sharing state if two different stores are named the
same by mistake (this would not be detected).



Just some ideas I wanted to share. What do you think?



-Matthias

On 12/11/18 3:46 AM, Paul Whalen wrote:
> Ah yes of course, this was an oversight, I completely ignored the multiple
> processors sharing the same state store when writing up the KIP.  Which is
> funny, because I've actually done this (different processors sharing state
> stores) a fair amount myself, and I've settled on a pattern where I group
> the Processors in an enclosing class, and that enclosing class handles as
> much as possible.  Here's a gist showing the rough structure, just for
> context: https://gist.github.com/pgwhalen/57a00dcc2269b7610e1aaeb1549b3b65
> . Note how it adds the stores to the topology, as well as providing a
> public method with the store names.
> 
> I don't think my proposal completely conflicts with the multiple processors
> sharing state stores use case, since you can create a supplier that
> provides the store name you want, somewhat independently of your actual
> Processor logic.  The issue I do see though, is that
> topology.addStateStore() can only be called once for a given store.  So for
> your example, if the there was a single TransformerSupplier that was passed
> into both transform() calls, "store1" would be added (under the hood) to
> the topology twice, which is no good.
> 
> Perhaps this suggests that one of my alternatives on the KIP might be
> desirable: either not having the suppliers return StoreBuilders (just store
> names), or not deprecating the old methods that take "String...
> stateStoreNames". I'll have to think about it a bit.
> 
> Paul
> 
> On Sun, Dec 9, 2018 at 11:57 PM Guozhang Wang  wrote:
> 
>> Hello Paul,
>>
>> Thanks for the great writeup (very detailed and crystal motivation
>> sections!).
>>
>> This is quite an interesting idea and I do like the API cleanness you
>> proposed. The original motivation of letting StreamsTopology to add state
>> stores though, is to allow different processors to share the state store.
>> For example:
>>
>> builder.addStore("store1");
>>
>> // a path of stream transformations that leads to KStream stream1.
>> stream1.transform(..., "store1");
>>
>> // another path that generates a KStream stream2.
>> stream2.transform(..., "store1");
>>
>> Behind the scene, Streams will make sure stream1 / stream2 transformations
>> will always be grouped together as a single group of tasks, each of which
>> will be executed by a single thread and hence there's no concurrency issues
>> on accessing the store from different operators within the same task. I'm
>> not sure how common this use case is, but I'd like to hear if you have any
>> thoughts maintaining this since the current proposal seems exclude this
>> possibility.
>>
>>
>> Guozhang
>>
>>
>> On Sun, Dec 9, 2018 at 4:18 PM Paul Whalen  wrote:
>>
>>> Here's KIP-401 for discussion, a minor Kafka Streams API change that I
>>> think could greatly increase the usability of the low-level processor
>> API.
>>> I have some code written but will wait to see if there is buy in before
>>> going all out and creating a pull request.  It seems like most of the
>> work
>>> would be in updating documentation and tests.
>>>
>>>
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97553756
>>>
>>> Thanks!
>>> Paul
>>>
>>
>>
>> --
>> -- Guozhang
>>
> 



signature.asc
Description: OpenPGP digital signature


[jira] [Created] (KAFKA-7732) Kafka broker lag metrics is more than 0, but no partitions are under replicated

2018-12-13 Thread Mathias Kub (JIRA)
Mathias Kub created KAFKA-7732:
--

 Summary: Kafka broker lag metrics is more than 0, but no 
partitions are under replicated
 Key: KAFKA-7732
 URL: https://issues.apache.org/jira/browse/KAFKA-7732
 Project: Kafka
  Issue Type: Bug
  Components: metrics
Affects Versions: 2.1.0
Reporter: Mathias Kub
 Attachments: 2018-12-13 17-13-34.flv

Hi,

I just upgraded Kafka from 1.1.1 to 2.1.0 and
{code}
kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica
{code}
shows billions of lag, but no partition is under replicated.

{code}
kafka.server:type=FetcherLagMetrics,name=ConsumerLag,clientId=ReplicaFetcherThread-0-2,topic=topicname,partition=1
{code}

shows 0 or - I guess - the offset as lag, depending on when I display the value.

I attached a video demonstrating the problem.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] KIP-402: Improve fairness in SocketServer processors

2018-12-13 Thread Harsha
Thanks for the details Rajini.  It would be great if you can add a few details 
to the KIP, on how many connections you are able to handle in your cluster with 
number 20 to give some context.

Thanks,
Harsha

On Tue, Dec 11, 2018, at 10:22 AM, Rajini Sivaram wrote:
> Hi Harsha,
> 
> Thanks for reviewing the KIP.
> 
> 1) Yes, agree that we also need a max.connections configuration per-broker.
> I was thinking of doing that in a separate KIP, but I could add that here
> as well.
> 2) The number of connections processed in each iteration doesn't feel like
> an externalizable config.It is not a limit on connection rate, it is simply
> ensuring that existing connections are processed by each Processor after
> atmost every 20 new connections. It will be hard to describe this
> configuration for users to enable configuring this in a way that is
> suitable for a connection flood since it would depend on the number of
> factors like existing connection count etc. It feels like we should come up
> with a number that works well. We have been running with this code for a
> while and so far haven't run into any noticeable degradations with 20.
> 
> 
> 
> On Tue, Dec 11, 2018 at 6:03 PM Harsha  wrote:
> 
> > Hi Rajini,
> >Overall KIP looks good to me.  Is it possible to use
> > max.connections config that we already have, althought its per IP.
> > But broker level max.connections would also be good have to guard against
> > DOS'ing  a broker.
> > Eitherway having constant like 20 without a configurable option doesn't
> > sound right and as the KIP states that one can use num.network.threads to
> > increase this capacity, it still not a viable option. Most of the time
> > users tend to keep network threads minimal and given this  configuration
> > will only need when a burst of requests comes through , allowing users to
> > choose that ceiling would be beneficial.  Can you add any details on why 20
> > is sufficient , with default num.network.threads with 3 if one broker is
> > getting more than 60 simultaneous connections  this would result in
> > perceived slower responses from client side right?
> >
> > Thanks,
> > Harsha
> >
> >
> > On Tue, Dec 11, 2018, at 2:48 AM, Rajini Sivaram wrote:
> > > Hi all,
> > >
> > > I have submitted a KIP to improve fairness in channel processing in
> > > SocketServer to protect brokers from connection storms:
> > >
> > >-
> > >
> > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-402%3A+Improve+fairness+in+SocketServer+processors
> > >
> > > Feedback and suggestions welcome.
> > >
> > > Thank you,
> > >
> > > Rajini
> >


[jira] [Resolved] (KAFKA-7655) Metadata spamming requests from Kafka Streams under some circumstances, potential DOS

2018-12-13 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7655?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-7655.

   Resolution: Fixed
Fix Version/s: 2.0.2
   2.1.1
   2.2.0

> Metadata spamming requests from Kafka Streams under some circumstances, 
> potential DOS
> -
>
> Key: KAFKA-7655
> URL: https://issues.apache.org/jira/browse/KAFKA-7655
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.1
>Reporter: Pasquale Vazzana
>Assignee: Pasquale Vazzana
>Priority: Major
>  Labels: performance, pull-request-available, security
> Fix For: 2.2.0, 2.1.1, 2.0.2
>
>
> There is a bug in the InternalTopicManager that makes the client believe that 
> a topic exists even though it doesn't, it occurs mostly in those few seconds 
> between when a topic is marked for deletion and when it is actually deleted. 
> In that timespan, the Broker gives inconsistent information, first it hides 
> the topic but then it refuses to create a new one therefore the client 
> believes the topic was existing already and it starts polling for metadata.
> The consequence is that the client goes into a loop where it polls for topic 
> metadata and if this is done by many threads it can take down a small cluster 
> or degrade greatly its performances.
> The real life scenario is probably a reset gone wrong. Reproducing the issue 
> is fairly simple, these are the steps:
>  * Stop a Kafka streams application
>  * Delete one of its changelog and the local store
>  * Restart the application immediately after the topic delete
>  * You will see the Kafka streams application hanging after the bootstrap 
> saying something like: INFO  Metadata - Cluster ID: 
>  
> I am attaching a patch that fixes the issue client side but my personal 
> opinion is that this should be tackled on the broker as well, metadata 
> requests seem expensive and it would be easy to craft a DDOS that can 
> potentially take down an entire cluster in seconds just by flooding the 
> brokers with metadata requests.
> The patch kicks in only when a topic that wasn't existing in the first call 
> to getNumPartitions triggers a TopicExistsException. When this happens it 
> forces the re-validation of the topic and if it still looks like doesn't 
> exists plan a retry with some delay, to give the broker the necessary time to 
> sort it out.
> I think this patch makes sense beside the above mentioned use case where a 
> topic it's not existing, because, even if the topic was actually created, the 
> client should not blindly trust it and should still re-validate it by 
> checking the number of partitions. IE: a topic can be created automatically 
> by the first request and then it would have the default partitions rather 
> than the expected ones.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7733) MockConsumer doesn't respect reset strategy

2018-12-13 Thread JIRA
Stig Rohde Døssing created KAFKA-7733:
-

 Summary: MockConsumer doesn't respect reset strategy
 Key: KAFKA-7733
 URL: https://issues.apache.org/jira/browse/KAFKA-7733
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Reporter: Stig Rohde Døssing
Assignee: Stig Rohde Døssing


The MockConsumer throws OffsetOutOfRangeException if a record is behind the 
beginning offset. This is unlike the real consumer, which will use 
auto.offset.reset to decide whether to seek to the beginning, end or throw an 
exception.

It is convenient if the poll method does the offset reset properly, since it 
allows for testing cases like a consumer requesting offsets from a truncated 
log.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Need help on how to commit kafka consumer with other threads

2018-12-13 Thread SriPrakash Sai
I implemented one application using @kafkaListener. All listened events
will be kept in Redis for some time. WIth the help of Spring scheduler,
above events will be spanned across multiple threads and processed some
business logic.

Can you suggest me how to commit the events to kafka after processing the
business logic. Please let me know if you need any more details.

Regards
Prakash


Re: [DISCUSS] KIP-307: Allow to define custom processor names with KStreams DSL

2018-12-13 Thread John Roesler
Hi again, all,

Matthias, I agree with you.

Florian, thanks for your response.

I think your proposal is the best way to address the ask for hiding the
name() getter. But I'd like to question that ask and instead propose that
we just make the name() getter part of the public API.

The desire to "hide" the getters causes a lot of complexity in our code
base, and it will become completely impractical with the mixin strategy of
Named.

If we were to switch strategies back to mixing Named in to the control
objects rather than the functions, then the path forward becomes quite
clear.

On the other hand, it seems harmless for anyone who wants to be able to
query the name from a control object after setting it, so my vote would be
simply to keep the Named interface as:

public interface Named> {
  String name();
  T withName(String name);
}

Under this proposal, we only mix Named in to the control objects, which
means we have no need of default implementations anymore (because we can
update all the control objects concurrently with adding this interface to
them).

This does hinge on switching over to a control-object-only strategy, which
introduces the need to add about 50 new control object classes, which would
only serve to implement Named. As a middle ground, maybe we could just add
one generic control object class, like:

public class NamedOperation implements Named {
  private final String name;
  private NamedOperation(final String name) { this.name = name; }
  public static NamedOperation name(final String name) {
return new NamedOperation(name);
  }
  public String name() { return name; }
  public NamedOperation withName(final String name) {
return new NamedOperation(name);
  }
}

And then, we'd add overloads for all the methods that don't have control
objects already (for example, filter() ):

// existing
KStream filter(Predicate predicate);

// new
KStream filter(Predicate predicate,
NamedOperation named);

Additionally, in regard to Matthias's point about existing control objects
with naming semantics, they would extend Named (but not NamedOperation) for
uniformity.

You provided a good approach to hide the getter with your SettableName
class; I think what you proposed is the only way we could hide the name.
In the end, though, it's a lot of complexity added (control object class
hierarchy, inheritance, mutable state, internal casting) for something of
dubious value: to be able to hide the name from someone *after they
themselves have set it*.

Although it'll be a pain, perhaps Matthias's suggestion to enumerate all
the API methods is the best way to be sure we all agree on what's going to
happen.

Thanks again for wrangling with this issue,
-John

On Thu, Dec 13, 2018 at 9:03 AM Matthias J. Sax 
wrote:

> Just catching up on this discussion.
>
> My overall personal take is, that I am not a big fan of the interface
> `Named` that is used as a factory. I would rather prefer to add a
> control object parameter to all methods that don't have one yet. This
> KIP was started a while ago, and we added new naming capabilities in the
> meantime. Guozhang's example in the PR comment about naming in
> stream-stream join shows, that we might end up in a confusion situation
> for users if we use `Named`. Also, in 2.1, user can already name as
> repartition-/changelog-topics and stores. Thus, KIP-307 boils down to
> provide non-functional naming?
>
> Hence, for all methods that allow to specify names already, I don't see
> any reason to change them, but use the existing API to also name the
> processor(s) instead of allowing uses to specify a new name.
>
> About the inconsistency in method naming. I agree, that `as` is very
> generic and maybe not the best choice.
>
> I think it might be helpful, to have a table overview in the KIP, that
> list all existing static/non-static methods that allow to specify a
> name, plus a columns with the new suggested naming for those methods?
>
> Thoughts?
>
>
> -Matthias
>
>
> On 12/12/18 12:45 AM, Florian Hussonnois wrote:
> > Thank you very much for your feedbacks.
> >
> > Currently, there is still lot of discussions regarding the Named
> interface.
> > On the one hand we should provided consistency over the stream API and on
> > the other hand we should not break the semantic as John point it up.
> >
> > Guozhang, I'm sorry, but I'm little bit confused, maybe I missed
> something.
> > In your comment you have suggested that :
> > * Produced/Consumed/Suppressed should extends Named
> > * Named should have a private-package method to get the specified
> processor
> > name internally (processorName())
> > * Finally we should end up with something like :  Named -> XXX ->
> > XXXInternal or Named -> Produced -> ProducedInternal
> >
> > The objective behind that is to :
> > * consolidate the internal method processorName()
> > * consolidate the method withName that exists now existing into Produced,
> > Consumed and Suppressed.
> >
> > But, Named is an interface so we can't

Re: [DISCUSS] KIP-391: Allow Producing with Offsets for Cluster Replication

2018-12-13 Thread Edoardo Comar
Hi,
as we haven't got any more feedback, we'd like to start a vote on KIP-391 
on Monday

https://cwiki.apache.org/confluence/display/KAFKA/KIP-391%3A+Allow+Producing+with+Offsets+for+Cluster+Replication

--

Edoardo Comar

IBM Event Streams
IBM UK Ltd, Hursley Park, SO21 2JN


Edoardo Comar/UK/IBM wrote on 10/12/2018 10:20:06:

> From: Edoardo Comar/UK/IBM
> To: dev@kafka.apache.org
> Date: 10/12/2018 10:20
> Subject: Re: [DISCUSS] KIP-391: Allow Producing with Offsets for 
> Cluster Replication
> 
> (shameless bump) any additional feedback is welcome ... thanks!
> 
> Edoardo Comar  wrote on 27/11/2018 15:35:09:
> 
> > From: Edoardo Comar 
> > To: dev@kafka.apache.org
> > Date: 27/11/2018 15:35
> > Subject: Re: [DISCUSS] KIP-391: Allow Producing with Offsets for 
> > Cluster Replication
> > 
> > Hi Jason
> > 
> > we envisioned the replicator to replicate the __consumer_offsets topic 
too 
> > (although without producing-with-offsets to it!).
> > 
> > As there is no client-side implementation yet using the leader epoch, 
> > we could not yet see the impact of writing to the destination cluster 
> > __consumer_offsets records with an invalid leader epoch.
> > 
> > Also, applications might still use external storage mechanism for 
consumer 
> > offsets where the leader_epoch is missing.
> > 
> > Perhaps the replicator could - for the __consumer_offsets topic - just 

> > omit the leader_epoch field in the data sent to destination.
> > 
> > What do you think ?
> > 
> > 
> > Jason Gustafson  wrote on 27/11/2018 00:09:56:
> > 
> > > Another wrinkle to consider is KIP-320. If you are planning to 
replicate
> > > __consumer_offsets directly, then you will have to account for 
leader 
> > epoch
> > > information which is stored with the committed offsets. But I cannot 

> > think
> > > how it would be possible to replicate the leader epoch information 
in
> > > messages even if you can preserve offsets.
> > > 
> > > -Jason
> > > 
> > > On Mon, Nov 26, 2018 at 1:16 PM Mayuresh Gharat 
> > 
> > > wrote:
> > > 
> > > > Hi Edoardo,
> > > >
> > > > Thanks a lot for the KIP.
> > > >  I have a few questions/suggestions in addition to what Radai has 
> > mentioned
> > > > above :
> > > >
> > > >1. Is this meant only for 1:1 replication, for example one 
Kafka 
> > cluster
> > > >replicating to other, instead of having multiple Kafka clusters
> > > > mirroring
> > > >into one Kafka cluster?
> > > >2. Are we relying on exactly once produce in the replicator? If 

> > not, how
> > > >are retries handled in the replicator ?
> > > >3. What is the recommended value for inflight requests, here. 
Is it
> > > >suppose to be strictly 1, if yes, it would be great to mention 
that 
> > in
> > > > the
> > > >KIP.
> > > >4. How is unclean Leader election between source cluster and 
> > destination
> > > >cluster handled?
> > > >5. How are offsets resets in case of the replicator's consumer 
> > handled?
> > > >6. It would be good to explain the workflow in the KIP, with an
> > > >example,  regarding how this KIP will change the replication 
> > scenario
> > > > and
> > > >how it will benefit the consumer apps.
> > > >
> > > > Thanks,
> > > >
> > > > Mayuresh
> > > >
> > > > On Mon, Nov 26, 2018 at 8:08 AM radai  

> > wrote:
> > > >
> > > > > a few questions:
> > > > >
> > > > > 1. how do you handle possible duplications caused by the 
"special"
> > > > > producer timing-out/retrying? are you explicitely relying on the
> > > > > "exactly once" sequencing?
> > > > > 2. what about the combination of log compacted topics + 
replicator
> > > > > downtime? by the time the replicator comes back up there might 
be
> > > > > "holes" in the source offsets (some msgs might have been 
compacted
> > > > > out)? how is that recoverable?
> > > > > 3. similarly, what if you try and fire up replication on a 
non-empty
> > > > > source topic? does the kip allow for offsets starting at some
> > > > > arbitrary X > 0 ? or would this have to be designed from the 
start.
> > > > >
> > > > > and lastly, since this KIP seems to be designed fro 
active-passive
> > > > > failover (there can be no produce traffic except the replicator)
> > > > > wouldnt a solution based on seeking to a time offset be more 
> > generic?
> > > > > your producers could checkpoint the last (say log append) 
timestamp 
> > of
> > > > > records theyve seen, and when restoring in the remote site seek 
to
> > > > > those timestamps (which will be metadata in their committed 
offsets) 
> > -
> > > > > assumming replication takes > 0 time you'd need to handle some 
dups,
> > > > > but every kafka consumer setup needs to know how to handle those
> > > > > anyway.
> > > > > On Fri, Nov 23, 2018 at 2:27 AM Edoardo Comar 
 
> > wrote:
> > > > > >
> > > > > > Hi Stanislav
> > > > > >
> > > > > > > > The flag is needed to distinguish a batch with a desired 
base
> > > > offset
> > > > > > of

[jira] [Resolved] (KAFKA-2124) gradlew is not working on a fresh checkout

2018-12-13 Thread Grant Henke (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-2124?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Grant Henke resolved KAFKA-2124.

Resolution: Duplicate
  Assignee: Grant Henke

> gradlew is not working on a fresh checkout
> --
>
> Key: KAFKA-2124
> URL: https://issues.apache.org/jira/browse/KAFKA-2124
> Project: Kafka
>  Issue Type: Bug
>  Components: build
>Reporter: Jakob Homan
>Assignee: Grant Henke
>Priority: Major
>
> For a fresh checkout, the gradlew script is not working:
> {noformat}heimdallr 15:54 $ asfclone kafka
> Cloning into 'kafka'...
> remote: Counting objects: 25676, done.
> remote: Compressing objects: 100% (36/36), done.
> remote: Total 25676 (delta 5), reused 0 (delta 0), pack-reused 25627
> Receiving objects: 100% (25676/25676), 19.58 MiB | 4.29 MiB/s, done.
> Resolving deltas: 100% (13852/13852), done.
> Checking connectivity... done.
> /tmp/kafka /tmp
> /tmp
> ✔ /tmp
> heimdallr 15:54 $ cd kafka
> ✔ /tmp/kafka [trunk|✔]
> heimdallr 15:54 $ ./gradlew tasks
> Error: Could not find or load main class 
> org.gradle.wrapper.GradleWrapperMain{noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] KIP-201: Rationalising Policy interfaces

2018-12-13 Thread Tom Bentley
Hi Anna,

Firstly, let me apologise again about having missed your previous emails
about this.

Thank you for the feedback. You raise some valid points about ambiguity.
The problem with pulling the metadata into CreateTopicRequest and
AlterTopicRequest is that you lose the benefit of being able to eaily write
a common policy across creation and alter cases. For example, with the
proposed design the policy maker could write code like this (forgive my
pseudo-Java)

public void validateCreateTopic(requestMetadata, ...) {
commonPolicy(requestMetadata.requestedState());
  }

  public void validateAlterTopic(requestMetadata, ...) {
commonPolicy(requestMetadata.requestedState());
  }

  private void commonPolicy(RequestedTopicState requestedState) {
// ...
  }

I think that's an important feature of the API because (I think) very often
the policy maker is interested in defining the universe of prohibited
configurations without really caring about whether the request is a create
or an alter. Having a single RequestedTopicState for both create and alter
means they can do that trivially in one place. Having different methods in
the two Request classes prevents this and forces the policy maker to pick
apart the different requestState objects before calling any common
method(s).

I think my intention at the time (and it's many months ago now, so I might
not have remembered fully) was that RequestedTopicState would basically
represent what the topic would look like after the requested changes were
applied (I accept this isn't how it's Javadoc'd in the KIP), rather than
representing the request itself. Thus if the request changed the assignment
of some of the partitions and the policy maker was interested in precisely
which partitions would be changed, and how, they would indeed have to
compute that for themselves by looking up the current topic state from the
cluster state and seeing how they differed. Indeed they'd have to do this
diff even to figure out that the user was requesting a change to the topic
assigned (or similarly for topic config, etc). To me this is acceptable
because I think most people writing such policies are just interested in
defining what is not allowed, so giving them a representation of the
proposed topic state which they can readily check against is the most
direct API. In this interpretation generatedReplicaAssignment() would just
be some extra metadata annotating whether any difference between the
current and proposed states was directly from the user, or generated on the
broker. You're right that it's ambiguous when the request didn't actually
change the assignment but I didn't envisage policy makers using it except
when the assignments differed anyway. To me it would be acceptable to
Javadoc this.

Given this interpretation of RequestedTopicState as "what the topic would
look like after the requested changes were applied" can you see any other
problems with the proposal? Or do you have use cases where the policy maker
is more interested in what the request is changing?

Kind regards,

Tom

On Fri, 7 Dec 2018 at 08:41, Tom Bentley  wrote:

> Hi Anna and Mickael,
>
> Sorry for remaining silent on this for so long. I should have time to look
> at this again next week.
>
> Kind regards,
>
> Tom
>
> On Mon, 3 Dec 2018 at 10:11, Mickael Maison 
> wrote:
>
>> Hi Tom,
>>
>> This is a very interesting KIP. If you are not going to continue
>> working on it, would it be ok for us to grab it and complete it?
>> Thanks
>> On Thu, Jun 14, 2018 at 7:06 PM Anna Povzner  wrote:
>> >
>> > Hi Tom,
>> >
>> > Just wanted to check what you think about the comments I made in my last
>> > message. I think this KIP is a big improvement to our current policy
>> > interfaces, and really hope we can get this KIP in.
>> >
>> > Thanks,
>> > Anna
>> >
>> > On Thu, May 31, 2018 at 3:29 PM, Anna Povzner 
>> wrote:
>> >
>> > > Hi Tom,
>> > >
>> > >
>> > > Thanks for the KIP. I am aware that the voting thread was started, but
>> > > wanted to discuss couple of concerns here first.
>> > >
>> > >
>> > > I think the coupling of
>> RequestedTopicState#generatedReplicaAssignment()
>> > > and TopicState#replicasAssignments() does not work well in case where
>> the
>> > > request deals only with a subset of partitions (e.g., add partitions)
>> or no
>> > > assignment at all (alter topic config). In particular:
>> > >
>> > > 1) Alter topic config use case: There is no replica assignment in the
>> > > request, and generatedReplicaAssignment()  returning either true or
>> false
>> > > is both misleading. The user can interpret this as assignment being
>> > > generated or provided by the user originally (e.g., on topic create),
>> while
>> > > I don’t think we track such thing.
>> > >
>> > > 2) On add partitions, we may have manual assignment for new
>> partitions.
>> > > What I understood from the KIP,  generatedReplicaAssignment() will
>> return
>> > > true or false based on whether new partitions were manually assig

Build failed in Jenkins: kafka-2.0-jdk8 #202

2018-12-13 Thread Apache Jenkins Server
See 


Changes:

[matthias] KAFKA-7655 Metadata spamming requests from Kafka Streams under some

--
[...truncated 436.12 KB...]

kafka.zk.ReassignPartitionsZNodeTest > testDecodeValidJson STARTED

kafka.zk.ReassignPartitionsZNodeTest > testDecodeValidJson PASSED

kafka.zk.KafkaZkClientTest > testZNodeChangeHandlerForDataChange STARTED

kafka.zk.KafkaZkClientTest > testZNodeChangeHandlerForDataChange PASSED

kafka.zk.KafkaZkClientTest > testCreateAndGetTopicPartitionStatesRaw STARTED

kafka.zk.KafkaZkClientTest > testCreateAndGetTopicPartitionStatesRaw PASSED

kafka.zk.KafkaZkClientTest > testLogDirGetters STARTED

kafka.zk.KafkaZkClientTest > testLogDirGetters PASSED

kafka.zk.KafkaZkClientTest > testSetGetAndDeletePartitionReassignment STARTED

kafka.zk.KafkaZkClientTest > testSetGetAndDeletePartitionReassignment PASSED

kafka.zk.KafkaZkClientTest > testIsrChangeNotificationsDeletion STARTED

kafka.zk.KafkaZkClientTest > testIsrChangeNotificationsDeletion PASSED

kafka.zk.KafkaZkClientTest > testGetDataAndVersion STARTED

kafka.zk.KafkaZkClientTest > testGetDataAndVersion PASSED

kafka.zk.KafkaZkClientTest > testGetChildren STARTED

kafka.zk.KafkaZkClientTest > testGetChildren PASSED

kafka.zk.KafkaZkClientTest > testSetAndGetConsumerOffset STARTED

kafka.zk.KafkaZkClientTest > testSetAndGetConsumerOffset PASSED

kafka.zk.KafkaZkClientTest > testClusterIdMethods STARTED

kafka.zk.KafkaZkClientTest > testClusterIdMethods PASSED

kafka.zk.KafkaZkClientTest > testEntityConfigManagementMethods STARTED

kafka.zk.KafkaZkClientTest > testEntityConfigManagementMethods PASSED

kafka.zk.KafkaZkClientTest > testUpdateLeaderAndIsr STARTED

kafka.zk.KafkaZkClientTest > testUpdateLeaderAndIsr PASSED

kafka.zk.KafkaZkClientTest > testUpdateBrokerInfo STARTED

kafka.zk.KafkaZkClientTest > testUpdateBrokerInfo PASSED

kafka.zk.KafkaZkClientTest > testCreateRecursive STARTED

kafka.zk.KafkaZkClientTest > testCreateRecursive PASSED

kafka.zk.KafkaZkClientTest > testGetConsumerOffsetNoData STARTED

kafka.zk.KafkaZkClientTest > testGetConsumerOffsetNoData PASSED

kafka.zk.KafkaZkClientTest > testDeleteTopicPathMethods STARTED

kafka.zk.KafkaZkClientTest > testDeleteTopicPathMethods PASSED

kafka.zk.KafkaZkClientTest > testSetTopicPartitionStatesRaw STARTED

kafka.zk.KafkaZkClientTest > testSetTopicPartitionStatesRaw PASSED

kafka.zk.KafkaZkClientTest > testAclManagementMethods STARTED

kafka.zk.KafkaZkClientTest > testAclManagementMethods PASSED

kafka.zk.KafkaZkClientTest > testPreferredReplicaElectionMethods STARTED

kafka.zk.KafkaZkClientTest > testPreferredReplicaElectionMethods PASSED

kafka.zk.KafkaZkClientTest > testPropagateLogDir STARTED

kafka.zk.KafkaZkClientTest > testPropagateLogDir PASSED

kafka.zk.KafkaZkClientTest > testGetDataAndStat STARTED

kafka.zk.KafkaZkClientTest > testGetDataAndStat PASSED

kafka.zk.KafkaZkClientTest > testReassignPartitionsInProgress STARTED

kafka.zk.KafkaZkClientTest > testReassignPartitionsInProgress PASSED

kafka.zk.KafkaZkClientTest > testCreateTopLevelPaths STARTED

kafka.zk.KafkaZkClientTest > testCreateTopLevelPaths PASSED

kafka.zk.KafkaZkClientTest > testIsrChangeNotificationGetters STARTED

kafka.zk.KafkaZkClientTest > testIsrChangeNotificationGetters PASSED

kafka.zk.KafkaZkClientTest > testLogDirEventNotificationsDeletion STARTED

kafka.zk.KafkaZkClientTest > testLogDirEventNotificationsDeletion PASSED

kafka.zk.KafkaZkClientTest > testGetLogConfigs STARTED

kafka.zk.KafkaZkClientTest > testGetLogConfigs PASSED

kafka.zk.KafkaZkClientTest > testBrokerSequenceIdMethods STARTED

kafka.zk.KafkaZkClientTest > testBrokerSequenceIdMethods PASSED

kafka.zk.KafkaZkClientTest > testCreateSequentialPersistentPath STARTED

kafka.zk.KafkaZkClientTest > testCreateSequentialPersistentPath PASSED

kafka.zk.KafkaZkClientTest > testConditionalUpdatePath STARTED

kafka.zk.KafkaZkClientTest > testConditionalUpdatePath PASSED

kafka.zk.KafkaZkClientTest > testDeleteTopicZNode STARTED

kafka.zk.KafkaZkClientTest > testDeleteTopicZNode PASSED

kafka.zk.KafkaZkClientTest > testDeletePath STARTED

kafka.zk.KafkaZkClientTest > testDeletePath PASSED

kafka.zk.KafkaZkClientTest > testGetBrokerMethods STARTED

kafka.zk.KafkaZkClientTest > testGetBrokerMethods PASSED

kafka.zk.KafkaZkClientTest > testCreateTokenChangeNotification STARTED

kafka.zk.KafkaZkClientTest > testCreateTokenChangeNotification PASSED

kafka.zk.KafkaZkClientTest > testGetTopicsAndPartitions STARTED

kafka.zk.KafkaZkClientTest > testGetTopicsAndPartitions PASSED

kafka.zk.KafkaZkClientTest > testRegisterBrokerInfo STARTED

kafka.zk.KafkaZkClientTest > testRegisterBrokerInfo PASSED

kafka.zk.KafkaZkClientTest > testConsumerOffsetPath STARTED

kafka.zk.KafkaZkClientTest > testConsumerOffsetPath PASSED

kafka.zk.KafkaZkClientTest > testControllerManagementMethods STARTED

kafka.zk.KafkaZkClientTest > testControlle

Build failed in Jenkins: kafka-trunk-jdk8 #3259

2018-12-13 Thread Apache Jenkins Server
See 


Changes:

[mjsax] KAFKA-7655 Metadata spamming requests from Kafka Streams under some

--
[...truncated 2.25 MB...]
kafka.coordinator.group.GroupCoordinatorTest > testValidJoinGroup PASSED

kafka.coordinator.group.GroupCoordinatorTest > 
shouldDelayRebalanceUptoRebalanceTimeout STARTED

kafka.coordinator.group.GroupCoordinatorTest > 
shouldDelayRebalanceUptoRebalanceTimeout PASSED

kafka.coordinator.group.GroupCoordinatorTest > testFetchOffsets STARTED

kafka.coordinator.group.GroupCoordinatorTest > testFetchOffsets PASSED

kafka.coordinator.group.GroupCoordinatorTest > 
testSessionTimeoutDuringRebalance STARTED

kafka.coordinator.group.GroupCoordinatorTest > 
testSessionTimeoutDuringRebalance PASSED

kafka.coordinator.group.GroupCoordinatorTest > testNewMemberJoinExpiration 
STARTED

kafka.coordinator.group.GroupCoordinatorTest > testNewMemberJoinExpiration 
PASSED

kafka.coordinator.group.GroupCoordinatorTest > testFetchTxnOffsetsWithAbort 
STARTED

kafka.coordinator.group.GroupCoordinatorTest > testFetchTxnOffsetsWithAbort 
PASSED

kafka.coordinator.group.GroupCoordinatorTest > testSyncGroupLeaderAfterFollower 
STARTED

kafka.coordinator.group.GroupCoordinatorTest > testSyncGroupLeaderAfterFollower 
PASSED

kafka.coordinator.group.GroupCoordinatorTest > testSyncGroupFromUnknownMember 
STARTED

kafka.coordinator.group.GroupCoordinatorTest > testSyncGroupFromUnknownMember 
PASSED

kafka.coordinator.group.GroupCoordinatorTest > testValidLeaveGroup STARTED

kafka.coordinator.group.GroupCoordinatorTest > testValidLeaveGroup PASSED

kafka.coordinator.group.GroupCoordinatorTest > testDescribeGroupInactiveGroup 
STARTED

kafka.coordinator.group.GroupCoordinatorTest > testDescribeGroupInactiveGroup 
PASSED

kafka.coordinator.group.GroupCoordinatorTest > 
testFetchTxnOffsetsIgnoreSpuriousCommit STARTED

kafka.coordinator.group.GroupCoordinatorTest > 
testFetchTxnOffsetsIgnoreSpuriousCommit PASSED

kafka.coordinator.group.GroupCoordinatorTest > testSyncGroupNotCoordinator 
STARTED

kafka.coordinator.group.GroupCoordinatorTest > testSyncGroupNotCoordinator 
PASSED

kafka.coordinator.group.GroupCoordinatorTest > testBasicFetchTxnOffsets STARTED

kafka.coordinator.group.GroupCoordinatorTest > testBasicFetchTxnOffsets PASSED

kafka.coordinator.group.GroupCoordinatorTest > 
shouldResetRebalanceDelayWhenNewMemberJoinsGroupInInitialRebalance STARTED

kafka.coordinator.group.GroupCoordinatorTest > 
shouldResetRebalanceDelayWhenNewMemberJoinsGroupInInitialRebalance PASSED

kafka.coordinator.group.GroupCoordinatorTest > 
testHeartbeatUnknownConsumerExistingGroup STARTED

kafka.coordinator.group.GroupCoordinatorTest > 
testHeartbeatUnknownConsumerExistingGroup PASSED

kafka.coordinator.group.GroupCoordinatorTest > testValidHeartbeat STARTED

kafka.coordinator.group.GroupCoordinatorTest > testValidHeartbeat PASSED

kafka.coordinator.group.GroupCoordinatorTest > 
testRequestHandlingWhileLoadingInProgress STARTED

kafka.coordinator.group.GroupCoordinatorTest > 
testRequestHandlingWhileLoadingInProgress PASSED

kafka.network.SocketServerTest > testGracefulClose STARTED

kafka.network.SocketServerTest > testGracefulClose PASSED

kafka.network.SocketServerTest > 
testSendActionResponseWithThrottledChannelWhereThrottlingAlreadyDone STARTED

kafka.network.SocketServerTest > 
testSendActionResponseWithThrottledChannelWhereThrottlingAlreadyDone PASSED

kafka.network.SocketServerTest > controlThrowable STARTED

kafka.network.SocketServerTest > controlThrowable PASSED

kafka.network.SocketServerTest > testRequestMetricsAfterStop STARTED

kafka.network.SocketServerTest > testRequestMetricsAfterStop PASSED

kafka.network.SocketServerTest > testConnectionIdReuse STARTED

kafka.network.SocketServerTest > testConnectionIdReuse PASSED

kafka.network.SocketServerTest > testClientDisconnectionUpdatesRequestMetrics 
STARTED

kafka.network.SocketServerTest > testClientDisconnectionUpdatesRequestMetrics 
PASSED

kafka.network.SocketServerTest > testProcessorMetricsTags STARTED

kafka.network.SocketServerTest > testProcessorMetricsTags PASSED

kafka.network.SocketServerTest > testMaxConnectionsPerIp STARTED

kafka.network.SocketServerTest > testMaxConnectionsPerIp PASSED

kafka.network.SocketServerTest > testConnectionId STARTED

kafka.network.SocketServerTest > testConnectionId PASSED

kafka.network.SocketServerTest > 
testBrokerSendAfterChannelClosedUpdatesRequestMetrics STARTED

kafka.network.SocketServerTest > 
testBrokerSendAfterChannelClosedUpdatesRequestMetrics PASSED

kafka.network.SocketServerTest > testNoOpAction STARTED

kafka.network.SocketServerTest > testNoOpAction PASSED

kafka.network.SocketServerTest > simpleRequest STARTED

kafka.network.SocketServerTest > simpleRequest PASSED

kafka.network.SocketServerTest > closingChannelException STARTED

kafka.network.SocketServerTest > closingChannelException PASS

[jira] [Resolved] (KAFKA-5335) Controller should batch updatePartitionReassignmentData() operation

2018-12-13 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin resolved KAFKA-5335.
-
Resolution: Won't Do

> Controller should batch updatePartitionReassignmentData() operation
> ---
>
> Key: KAFKA-5335
> URL: https://issues.apache.org/jira/browse/KAFKA-5335
> Project: Kafka
>  Issue Type: Bug
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
>
> Currently controller will update partition reassignment data every time a 
> partition in the reassignment is completed. It means that if user specifies a 
> huge reassignment znode of size 1 MB to move 10K partitions, controller will 
> need to write roughly 0.5 MB * 1 = 5 GB data to zookeeper in order to 
> complete this reassignment. This is because controller needs to write the 
> remaining partitions to the znode every time a partition is completely moved.
> This is problematic because such a huge reassignment may greatly slow down 
> Kafka controller. Note that partition reassignment doesn't necessarily cause 
> data movement between brokers because we may use it only to recorder the 
> replica list of partitions to evenly distribute preferred leader.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Jenkins build is back to normal : kafka-2.1-jdk8 #83

2018-12-13 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-307: Allow to define custom processor names with KStreams DSL

2018-12-13 Thread Florian Hussonnois
Hi all,

Thanks again. I agree with your propositions.
Also IMHO, overloading all methods (filter, map) to accept a new control
object seems to provide a more natural development experience for users.

Actually, this was the first proposition for this KIP, but we have rejected
it because this solution led to adding a lot of new methods.
As you mentioned it, the API has evolve since the creation of this KIP -
some existing control objects already allow to customize internal names. We
should so keep on that strategy.

If everyone is OK with that, I will update the KIP and the PR accordingly;

Thanks.

Le jeu. 13 déc. 2018 à 18:08, John Roesler  a écrit :

> Hi again, all,
>
> Matthias, I agree with you.
>
> Florian, thanks for your response.
>
> I think your proposal is the best way to address the ask for hiding the
> name() getter. But I'd like to question that ask and instead propose that
> we just make the name() getter part of the public API.
>
> The desire to "hide" the getters causes a lot of complexity in our code
> base, and it will become completely impractical with the mixin strategy of
> Named.
>
> If we were to switch strategies back to mixing Named in to the control
> objects rather than the functions, then the path forward becomes quite
> clear.
>
> On the other hand, it seems harmless for anyone who wants to be able to
> query the name from a control object after setting it, so my vote would be
> simply to keep the Named interface as:
>
> public interface Named> {
>   String name();
>   T withName(String name);
> }
>
> Under this proposal, we only mix Named in to the control objects, which
> means we have no need of default implementations anymore (because we can
> update all the control objects concurrently with adding this interface to
> them).
>
> This does hinge on switching over to a control-object-only strategy, which
> introduces the need to add about 50 new control object classes, which would
> only serve to implement Named. As a middle ground, maybe we could just add
> one generic control object class, like:
>
> public class NamedOperation implements Named {
>   private final String name;
>   private NamedOperation(final String name) { this.name = name; }
>   public static NamedOperation name(final String name) {
> return new NamedOperation(name);
>   }
>   public String name() { return name; }
>   public NamedOperation withName(final String name) {
> return new NamedOperation(name);
>   }
> }
>
> And then, we'd add overloads for all the methods that don't have control
> objects already (for example, filter() ):
>
> // existing
> KStream filter(Predicate predicate);
>
> // new
> KStream filter(Predicate predicate,
> NamedOperation named);
>
> Additionally, in regard to Matthias's point about existing control objects
> with naming semantics, they would extend Named (but not NamedOperation) for
> uniformity.
>
> You provided a good approach to hide the getter with your SettableName
> class; I think what you proposed is the only way we could hide the name.
> In the end, though, it's a lot of complexity added (control object class
> hierarchy, inheritance, mutable state, internal casting) for something of
> dubious value: to be able to hide the name from someone *after they
> themselves have set it*.
>
> Although it'll be a pain, perhaps Matthias's suggestion to enumerate all
> the API methods is the best way to be sure we all agree on what's going to
> happen.
>
> Thanks again for wrangling with this issue,
> -John
>
> On Thu, Dec 13, 2018 at 9:03 AM Matthias J. Sax 
> wrote:
>
> > Just catching up on this discussion.
> >
> > My overall personal take is, that I am not a big fan of the interface
> > `Named` that is used as a factory. I would rather prefer to add a
> > control object parameter to all methods that don't have one yet. This
> > KIP was started a while ago, and we added new naming capabilities in the
> > meantime. Guozhang's example in the PR comment about naming in
> > stream-stream join shows, that we might end up in a confusion situation
> > for users if we use `Named`. Also, in 2.1, user can already name as
> > repartition-/changelog-topics and stores. Thus, KIP-307 boils down to
> > provide non-functional naming?
> >
> > Hence, for all methods that allow to specify names already, I don't see
> > any reason to change them, but use the existing API to also name the
> > processor(s) instead of allowing uses to specify a new name.
> >
> > About the inconsistency in method naming. I agree, that `as` is very
> > generic and maybe not the best choice.
> >
> > I think it might be helpful, to have a table overview in the KIP, that
> > list all existing static/non-static methods that allow to specify a
> > name, plus a columns with the new suggested naming for those methods?
> >
> > Thoughts?
> >
> >
> > -Matthias
> >
> >
> > On 12/12/18 12:45 AM, Florian Hussonnois wrote:
> > > Thank you very much for your feedbacks.
> > >
> > > Currently, there is still lot of discussions

Re: [DISCUSS] KIP-404: Add Kafka Connect configuration parameter for disabling WADL output on OPTIONS request

2018-12-13 Thread Randall Hauch
Thanks, Alex. The KIP looks good to me.

Randall

On Wed, Dec 12, 2018 at 10:08 PM Guozhang Wang  wrote:

> Alex,
>
> Thanks for putting up this KIP. The proposal lgtm.
>
> Guozhang
>
> On Wed, Dec 12, 2018 at 7:41 PM Oleksandr Diachenko  >
> wrote:
>
> > Hi all,
> >
> > I would like to start a discussing for the following KIP:
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-404%3A+Add+Kafka+Connect+configuration+parameter+for+disabling+WADL+output+on+OPTIONS+request
> > .
> >
> > The KIP proposes to add a configuration parameter for Connect Worker,
> which
> > would allow to not expose WADL information in Connect REST api responces.
> >
> > Feedback is appreciated, thanks in advance.
> >
> > Regards, Alex.
> >
>
>
> --
> -- Guozhang
>


[jira] [Created] (KAFKA-7734) Metrics tags should use LinkedHashMap to guarantee ordering

2018-12-13 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-7734:


 Summary: Metrics tags should use LinkedHashMap to guarantee 
ordering
 Key: KAFKA-7734
 URL: https://issues.apache.org/jira/browse/KAFKA-7734
 Project: Kafka
  Issue Type: Improvement
  Components: metrics
Reporter: Guozhang Wang


Today we store metrics tags inside MetricName from various places, and many of 
them are using `HashMap`. However, for metrics reporters like JMXReporter, the 
mBeanName is constructed by looping over `metricName.tags().entrySet()` which 
does not guarantee ordering. This resulted a few places where the mBeanName 
string not as expected, e.g. we document the Streams cache metrics as 

{code}
kafka.streams:type=stream-record-cache-metrics,client-id=([-.\w]+),task-id=([-.\w]+),record-cache-id=([-.\w]+)
{code}

However, what I've seen from JMXReporter is, for example:

{code}
kafka.streams:type=stream-record-cache-metrics,record-cache-id=all,client-id=streams-saak-test-client-StreamThread-1,task-id=1_3
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] KIP-382: MirrorMaker 2.0

2018-12-13 Thread Jun Rao
Hi, Ryanne,

Regarding the single connect cluster model, yes, the co-existence of a MM2
REST API and the nearly identical Connect API is one of my concerns.
Implementation wise, my understanding is that the producer URL in a
SourceTask is always obtained from the connect worker's configuration. So,
not sure how you would customize the producer URL for individual SourceTask
w/o additional support from the Connect framework.

Thanks,

Jun


On Mon, Dec 10, 2018 at 1:17 PM Ryanne Dolan  wrote:

> Jun, thanks for your time reviewing the KIP.
>
> > In a MirrorSourceConnector, it seems that the offsets of the source will
> be stored in a different cluster from the target cluster?
>
> Jan Filipiak raised this issue as well, and suggested that no state be
> tracked in the source cluster. I've since implemented MirrorSourceConnector
> accordingly. And actually, this issue coincides with another major weakness
> of legacy MirrorMaker: "rebalance storm". In both cases, the problem is due
> to MirrorMaker using high-level consumer groups for replication.
>
> MM2 does not use consumer groups at all, but instead manages its own
> partition assignments and offsets. MirrorSourceConnector monitors
> topic-partitions and assigns them to MirrorSourceTasks directly -- there
> are no high-level subscriptions and therefore no rebalances. Likewise,
> MirrorSourceConnector stores its own offsets in the target cluster, so no
> state information is lost if the source cluster disappears. Both of these
> features are facilitated by the Connect framework and were inspired by
> Uber's uReplicator.
>
> > If the single connect cluster model is indeed useful, it seems that we
> should support it in the general connect framework since it can be useful
> for managing other types connectors.
>
> Sönke Liebau suggested this as well. I've spent some time looking into
> this, and I do believe it would be possible to bring these features to
> Connect in general without breaking the existing APIs. For example, maybe a
> connector config could specify which worker to use as a property like
> worker.name=foo, and otherwise a default worker would be used. In this
> case, a "MirrorMaker cluster" would just be a Connect cluster with a
> pre-configured set of workers.
>
> My plan is to contribute MM2 and then help pull features from MM2 into
> Connect. I don't think it would make sense to prime Connect first, nor do I
> want to propose a bunch of changes to Connect in this one KIP. If the
> concern is primarily around the co-existence of a MM2 REST API and the
> nearly identical Connect API, perhaps it would make sense to split off the
> "MirrorMaker clusters" section of this KIP into a separate KIP aimed at
> Connect in general? Would love to hear your thoughts on this.
>
> > Could you provide a bit more details on the content of the heartbeat
> topic?
>
> At present the heartbeat is just a timestamp and the alias of the cluster
> of origin. This is more powerful than existing Connector-level metrics, as
> these heartbeats are themselves replicated and can be traced across
> multiple hops in the replication topology. I'll add this to the KIP.
>
> > Also, if this is useful, should we just add it add in the connect
> framework, instead of just mirror maker?
>
> Same deal, I'd love to see this, but I don't think we should try to prime
> Connect before adopting MM2.
>
> > RemoteClusterUtils. Since this is part of the public interface, could you
> document the public APIs?
>
> Will do, thanks.
>
> > source.cluster.bootstrap.servers/target.cluster.bootstrap.servers: Does a
> Source/Sink connect need both?
>
> Sort of. I'm using this to construct an AdminClient for topic ACL and
> configuration sync, since the Connect framework doesn't expose it. I intend
> to follow-up KIP-382 with a proposal to expose this info to Connectors.
> There's also KIP-158, but it deals with topic creation only.
>
> Thanks again for the feedback!
>
> Ryanne
>
>
>
> On Fri, Dec 7, 2018 at 6:22 PM Jun Rao  wrote:
>
> > Hi, Ryanne,
> >
> > Thanks for the KIP. At the high level, this looks like a reasonable
> > proposal. A few comments below.
> >
> > 1. About using a single connector cluster to manage connectors accessing
> > multiple Kafka clusters. It's good that you brought this up.  The
> following
> > are the tradeoffs that I see. The benefit of using a single connect
> cluster
> > is that it simplifies the management. There are a couple of potential
> > downsides.
> > (a) In a MirrorSourceConnector, it seems that the offsets of the source
> > will be stored in a different cluster from the target cluster? If the
> data
> > in the target Kafka cluster is lost (say the whole cluster is wiped out),
> > one has to manually reset the offset to re-mirror the missing data. (2)
> If
> > the offsets are stored in a separate cluster from the produced data, it
> > prevents the connector from running features such as EOS since currently
> > EOS doesn't span Kafka clusters. If the single conn

Build failed in Jenkins: kafka-trunk-jdk11 #157

2018-12-13 Thread Apache Jenkins Server
See 


Changes:

[wangguoz] Fix the missing ApiUtils tests in streams module. (#6003)

--
[...truncated 2.24 MB...]

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareValueTimestampWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareValueTimestampWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualForCompareValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualForCompareValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareKeyValueTimestamp 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareKeyValueTimestamp 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullForCompareKeyValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullForCompareKeyValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualWithNullForCompareValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualWithNullForCompareValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualWithNullForCompareValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualWithNullForCompareValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareValueTimestampWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareValueTimestampWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualForCompareKeyValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualForCompareKeyValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueW

Dose kafka connector rest aoi support multiple thread access?

2018-12-13 Thread tao tony
Dear teams,


We use Kafka-1.1.0 connector to load data,And start a connector using 
rest api by application.

Before we start a connector,we'll check it was not  existed and then 
create it. This was  encapsulated in a Quartz job.And each connector  
had a job.

We use spring resttemplate as below:

   ResponseEntity response = 
restTemplate.getForEntity(requestUrl, String.class);


But when the jobs running  at the same time, it often  throw two kinds  
exception: "404 Not Found " and " 409 Conflict ". "404 Not Found " is a 
normal exception,it meams the connector not existed.

I could not handle the " 409 Conflict " exception,it was happed when the 
connector service is running,so I could not judge it was a connector 
server exception or it was just the connector reject the request.

" 409 Conflict " also returned when I use curl to get connector list:

  curl -X GET http://172.17.5.203:8083/connectors
{"error_code":409,"message":"Cannot complete request momentarily due to 
stale configuration (typically caused by a concurrent config change)"}


Dear teams,how could I solve the problem?I'm not sure whether it was 
caused  by the connector rest api not support multiple thread access.


Thanks.