[jira] [Created] (KAFKA-7336) Kafka Connect source task hangs when producing record with invalid topic name

2018-08-24 Thread Gunnar Morling (JIRA)
Gunnar Morling created KAFKA-7336:
-

 Summary: Kafka Connect source task hangs when producing record with 
invalid topic name
 Key: KAFKA-7336
 URL: https://issues.apache.org/jira/browse/KAFKA-7336
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 2.0.0
Reporter: Gunnar Morling


If a Kafka Connect source task returns a {{SourceRecord}} with an invalid topic 
name (e.g. "dbserver1.inventory.test@data"), that source task hangs (presumably 
indefinitely?) and doesn't continue its polling loop. The log is flooded with 
this message:

{code}
connect_1| 2018-08-24 08:47:29,014 WARN   ||  [Producer 
clientId=producer-4] Error while fetching metadata with correlation id 833 : 
{dbserver1.inventory.test@data=INVALID_TOPIC_EXCEPTION}   
[org.apache.kafka.clients.NetworkClient]
{code}

The producer thread is stuck in the loop here:

{code}
KafkaProducer.waitOnMetadata(String, Integer, long) line: 938  
KafkaProducer.doSend(ProducerRecord, Callback) line: 823  
KafkaProducer.send(ProducerRecord, Callback) line: 803
WorkerSourceTask.sendRecords() line: 318
WorkerSourceTask.execute() line: 228
WorkerSourceTask(WorkerTask).doRun() line: 175  
WorkerSourceTask(WorkerTask).run() line: 219
Executors$RunnableAdapter.call() line: 511   
FutureTask.run() line: 266   
ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) line: 1149  
ThreadPoolExecutor$Worker.run() line: 624   
Thread.run() line: 748  
{code}

This causes the task to remain in RUNNING state, but no further invocations of 
{{poll()}} are done.

Of course we'll work around this and make sure to not produce records with 
invalid topic names, but I think the source task should transition to FAILED 
state in this case.
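
For reference, a minimal sketch of the kind of workaround we have in mind (a hypothetical 
helper on our side, not part of the Connect API), which replaces characters outside Kafka's 
legal topic-name set before the {{SourceRecord}} is built:

{code}
public class TopicNames {

    // Kafka topic names may only contain ASCII letters, digits, '.', '_' and '-'.
    // Replace anything else with an underscore before using the name.
    public static String sanitize(String candidate) {
        return candidate.replaceAll("[^a-zA-Z0-9._-]", "_");
    }

    public static void main(String[] args) {
        // prints "dbserver1.inventory.test_data"
        System.out.println(sanitize("dbserver1.inventory.test@data"));
    }
}
{code}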



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [VOTE] KIP-336: Consolidate ExtendedSerializer/Serializer and ExtendedDeserializer/Deserializer

2018-08-24 Thread Viktor Somogyi-Vass
I think in the first draft I didn't provide an implementation for them as
it seemed very simple and straightforward. I looked up a couple of
implementations of the ExtendedSerializers on github and the general
behavior seems to be that they delegate to the 2 argument (headerless)
method:

https://github.com/khoitnm/practice-spring-kafka-grpc/blob/a6fc9b3395762c4889807baedd822f4653d5dcdd/kafka-common/src/main/java/org/tnmk/common/kafka/serialization/protobuf/ProtobufSerializer.java
https://github.com/hong-zhu/nxgen/blob/5cf1427d4e1a8f5c7fab47955af32a0d4f4873af/nxgen-kafka-client/src/main/java/nxgen/kafka/client/event/serdes/EventSerializer.java
https://github.com/jerry-jx/spring-kafka/blob/ac323ec5b8b9a0ca975db2f7322ff6878fce481a/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/JsonSerializer.java
https://github.com/enzobonggio/nonblocking-kafka/blob/bc1a379b2d9593b46cf9604063bc5b38e2785d19/src/main/java/com/example/kafka/producer/CustomJsonSerializer.java

Of course four examples are not representative, but they show that these users
usually delegate to the "headerless" (2 argument) method. I've tried to
look it up on other code search sites but haven't had much luck so far.
Given these examples and the way they implement them, I'd say it's more
common to delegate to the headerless method, which is why I think it's a good
approach for us too. Now, whether to have a default implementation for that is again
a good question. I think current use cases wouldn't change in either case
(unless we deprecate the headerless one).
For the new use cases it depends on what we want to propagate going
forward. Do we want only one method to exist or two? As Ismael highlighted,
it might be confusing if we have 2 methods, both with default
implementations, and in this case we want to push the 3 argument one for
users.
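
To make the delegation style concrete, here is a minimal sketch (a made-up example, not
taken from any of the repositories above) of an ExtendedSerializer whose 3 argument method
just ignores the headers and calls the headerless one:

import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.ExtendedSerializer;

public class DelegatingStringSerializer implements ExtendedSerializer<String> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) { }

    // The headerless (2 argument) method carries the actual logic.
    @Override
    public byte[] serialize(String topic, String data) {
        return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
    }

    // The 3 argument method delegates to the headerless one and drops the headers.
    @Override
    public byte[] serialize(String topic, Headers headers, String data) {
        return serialize(topic, data);
    }

    @Override
    public void close() { }
}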

So I see three possible ways:
1.) Don't provide a default implementation for the headerless method. This
supports the current implementations and encourages the delegation style in
future implementations. This might be the simplest option.
2.) Provide a default implementation for the headerless method. This would
be a bit confusing, so we'd likely push the use of the 3 parameter method
and deprecate the headerless one. This would, however, further litter the code
base with deprecation warnings, as we're using the headerless method in a
lot of places (think of the current serializers/deserializers). So in this
case we would want to clean up the code base a little where we can, and we may
remove the headerless method entirely in Kafka 3. But it would hang
around until that point. I think in this case the default implementation of the
headerless method is a detail question, as it is deprecated and we don't expect
new implementations to use it.
If we decide to move this way, we have explored two options so far:
returning null / empty array or throwing exceptions. (And I honestly
started to like the latter as calling that with no real implementation is
really a programming error.)
3.) We can do it in multiple steps. In the first step we do 1 and later 2.
I think it would also make sense as the Kafka code base heavily uses the
headerless method still (think of the existing serializers/deserializers)
and it would give us time to eliminate/change those use cases.

Cheers,
Viktor

On Thu, Aug 23, 2018 at 11:55 PM Jason Gustafson  wrote:

> To clarify, what I am suggesting is to only remove the default
> implementation for these methods. So users would be required to implement
> serialize(topic, data) and deserialize(topic, data).
>
> -Jason
>
> On Thu, Aug 23, 2018 at 1:48 PM, Jason Gustafson 
> wrote:
>
> > Hey Viktor,
> >
> > Thinking about it a little more, I wonder if we should just not provide a
> > default method for serialize(topic, data) and deserialize(topic, data).
> > Implementing these methods is a trivial burden for users and it feels
> like
> > there's no good solution which allows both methods to have default
> > implementations.
> >
> > Also, ack on KIP-331. Thanks for the pointer.
> >
> > -Jason
> >
> > On Thu, Aug 23, 2018 at 12:30 PM, Viktor Somogyi-Vass <
> > viktorsomo...@gmail.com> wrote:
> >
> >> Hi Ismael,
> >>
> >> Regarding the deprecation of the 2 parameter method: should we do this
> >> with
> >> the Serializer interface as well?
> >>
> >> I've updated the "Rejected Alternatives" with a few.
> >> I've added this circular reference one too but actually there's a way
> >> (pretty heavyweight) by adding a guard class that prevents recursive
> >> invocation of either methods. I've tried this out but it seems to me an
> >> overshoot. So just for the sake of completeness I'll copy it here. :)
> >>
> >> public interface Deserializer<T> extends Closeable {
> >>
> >> class Guard {
> >>
> >> private Set<Object> objects = Collections.synchronizedSet(new
> >> HashSet<>()); // might as well use concurrent hashmap
> >>
> >> private void methodCallInProgress(Object x) {
> >> objects.add(x);
> >>

Re: [VOTE] KIP-346 - Improve LogCleaner behavior on error

2018-08-24 Thread Stanislav Kozlovski
Hi Jun,

Yes, my intention was to have them per logDir but I failed to mention it in
the KIP. Updated.

Thanks,
Stan

On Fri, Aug 24, 2018 at 12:00 AM Jun Rao  wrote:

> Hi, Stan,
>
> Thanks for the KIP. Looks good to me overall. Just one comment below.
>
> uncleanable-partitions-count is per logDir, but uncleanable-bytes is not.
> Should we make them consistent?
>
> Jun
>
>
> On Wed, Aug 22, 2018 at 4:15 AM, Stanislav Kozlovski <
> stanis...@confluent.io
> > wrote:
>
> > Hi everybody,
> >
> > @Jason - I've updated the section. Thanks for the reminder
> >
> > I'm glad to say that the vote *has passed* with 3 binding votes (Jason,
> > Gwen, Harsha) and 6 non-binding votes (Dhruvil, Colin, Mickael,
> Manikumar,
> > Ray, Ted, Thomas).
> >
> > The PR is ready for review at https://github.com/apache/kafka/pull/5439
> >
> > On Tue, Aug 21, 2018 at 4:55 PM Jason Gustafson 
> > wrote:
> >
> > > +1 Thanks for the KIP! I'd suggest mentioning the configurations that
> > were
> > > previously proposed in the rejected alternatives section. We may
> > reconsider
> > > them in the future.
> > >
> > > On Mon, Aug 13, 2018 at 9:48 AM, Dhruvil Shah 
> > > wrote:
> > >
> > > > Thanks for the KIP, Stanislav! +1 (non-binding)
> > > >
> > > > - Dhruvil
> > > >
> > > > On Mon, Aug 13, 2018 at 9:39 AM Colin McCabe 
> > wrote:
> > > >
> > > > > +1 (non-binding)
> > > > >
> > > > > best,
> > > > > Colin
> > > > >
> > > > > On Tue, Aug 7, 2018, at 04:19, Stanislav Kozlovski wrote:
> > > > > > Hey everybody,
> > > > > > I'm starting a vote on KIP-346
> > > > > > <
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > 346+-+Improve+LogCleaner+behavior+on+error
> > > > > >
> > > > > >
> > > > > > --
> > > > > > Best,
> > > > > > Stanislav
> > > > >
> > > >
> > >
> >
> >
> > --
> > Best,
> > Stanislav
> >
>


-- 
Best,
Stanislav


Re: [DISCUSS] KIP-363: Allow performance tools to print final results to output file

2018-08-24 Thread Viktor Somogyi-Vass
Hi Attila,

Thanks for the KIP, I think overall it looks good. I have three comments:
1. Would you mind adding an example? (Later on we'd need one anyway for the
public doc.)
2. Do you want to add any 3rd party CSV reader/writer library, or will you
implement that yourself?
3. What is the separator, or is that configurable?

Cheers,
Viktor

On Fri, Aug 24, 2018 at 8:18 AM Kevin Lu  wrote:

> Hi Attila,
>
> Thanks for the KIP.
>
> I think this would be a useful feature. Every time I have to benchmark
> using these performance tools, I end up redirecting the output to a file
> anyways.
>
> Just a couple minor questions...
>
> 1. If the configured file already exists, what would be the outcome? My
> intuition is that the performance tool will spit out some type of error and
> quit as we do not want to accidentally overwrite files.
>
> 2. Will the performance tool still output directly to shell if this option
> is specified?
>
> Regards,
> Kevin
>
> On Wed, Aug 22, 2018 at 12:16 PM Attila Sasvári 
> wrote:
>
> > Hi all,
> >
> > I have created a minor KIP to allow consumer and producer performance
> tools
> > to print final results to output file in CSV format.
> >
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-363%3A+Allow+performance+tools+to+print+final+results+to+output+file
> >
> > Please take a look and share your thoughts!
> >
> > Thanks,
> > Attila
> >
>


Re: [DISCUSS] KIP-363: Allow performance tools to print final results to output file

2018-08-24 Thread Attila Sasvári
Thanks for your feedback, Kevin & Viktor! I will update the KIP next week.

@Kevin - I also had to write sed-based one-liners when I ran performance
tests to extract results (that were displayed in some charts later).
1. Initially, I wanted to overwrite any existing file and document this
behaviour in the help message. Throwing an exception might be a better
idea.
2. Yes, the tools shall continue to print out all the information to standard
output/error. Specifying the output-path option will only cause the final
results (the last line about the performance metrics) to also be printed to
the given file.

@Viktor
1. Thanks, I will add an example.
2. I would suggest the use of the Apache Commons CSV library
(https://commons.apache.org/proper/commons-csv/) to avoid reinventing the
wheel; a rough sketch follows below. It is an implementation detail, so I
would not add this to the KIP.
3. Delimiters/separators shall not be configurable (to keep things simple,
only CSV is supported). Fields within a record are separated by commas;
records are separated by line break(s). More info about the CSV format:
https://tools.ietf.org/html/rfc4180#section-1
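
A minimal sketch of what the Commons CSV usage could look like (not the actual
implementation; the column names and file name are made up for illustration):

import java.io.FileWriter;
import java.io.IOException;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVPrinter;

public class CsvResultWriterExample {
    public static void main(String[] args) throws IOException {
        try (CSVPrinter printer = new CSVPrinter(
                new FileWriter("producer-perf-results.csv"),
                CSVFormat.DEFAULT.withHeader("records.sent", "records.per.sec", "avg.latency.ms"))) {
            // Only the final summary is written to the file; everything else
            // still goes to standard output/error as before.
            printer.printRecord(1000000, 98765.4, 12.3);
        }
    }
}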

Best,
- Attila

Viktor Somogyi-Vass  wrote (on Fri, Aug 24, 2018, 14:27):

> Hi Attila,
>
> Thanks for the KIP, I think overall it looks good. I have three comments:
> 1. Would you mind adding an example? (Later on we'd need anyway for the
> public doc.)
> 2. Do you want to add any 3rd party CSV reader/writer library or will you
> implement that too?
> 3. What is the separator or is that configurable?
>
> Cheers,
> Viktor
>
> On Fri, Aug 24, 2018 at 8:18 AM Kevin Lu  wrote:
>
> > Hi Attila,
> >
> > Thanks for the KIP.
> >
> > I think this would be a useful feature. Every time I have to benchmark
> > using these performance tools, I end up redirecting the output to a file
> > anyways.
> >
> > Just a couple minor questions...
> >
> > 1. If the configured file already exists, what would be the outcome? My
> > intuition is that the performance tool will spit out some type of error
> and
> > quit as we do not want to accidentally overwrite files.
> >
> > 2. Will the performance tool still output directly to shell if this
> option
> > is specified?
> >
> > Regards,
> > Kevin
> >
> > On Wed, Aug 22, 2018 at 12:16 PM Attila Sasvári 
> > wrote:
> >
> > > Hi all,
> > >
> > > I have created a minor KIP to allow consumer and producer performance
> > tools
> > > to print final results to output file in CSV format.
> > >
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-363%3A+Allow+performance+tools+to+print+final+results+to+output+file
> > >
> > > Please take a look and share your thoughts!
> > >
> > > Thanks,
> > > Attila
> > >
> >
>


[VOTE] KIP-348 Deprecate null from SourceTask#poll()

2018-08-24 Thread Chia-Ping Tsai
hi kafka,

I'm starting a vote on KIP-348
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=89065853

--
chia-ping


[jira] [Created] (KAFKA-7337) Enhance Producer Performance tool to generate keys

2018-08-24 Thread Zenifer Cheruveettil (JIRA)
Zenifer Cheruveettil created KAFKA-7337:
---

 Summary: Enhance Producer Performance tool to generate keys
 Key: KAFKA-7337
 URL: https://issues.apache.org/jira/browse/KAFKA-7337
 Project: Kafka
  Issue Type: Improvement
  Components: tools
Affects Versions: 1.0.0
Reporter: Zenifer Cheruveettil


{{kafka-producer-perf-test.sh}} cannot generate messages with keys. It would be 
helpful to have the option to generate messages with keys, especially when 
using this tool together with applications, such as Kafka Streams applications, 
that expect messages to have a key.

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] KIP-358: Migrate Streams API to Duration instead of long ms times

2018-08-24 Thread John Roesler
Hi Nikolay,

First: I wanted to let you know that we have dropped the `grace(long)`
method from the Windows interface, but we do still need to transition the
same method on TimeWindows and JoinWindows (
https://github.com/apache/kafka/pull/5536)

I have also been thinking it would be nice to replace `Windows` with an
interface, but for different reasons. I think we can even do it without
breaking source compatibility (but it would break binary compatibility):
create a new interface `WindowSpec`, deprecate `Windows` and make it
implement `WindowSpec`, add a new method:
`KGroupedStream#windowedBy(WindowSpec)`, and deprecate the old one.

However, I don't think this would solve your problem, since the Windows
interface has two audiences: the DSL user and the implementer who wishes to
provide a new kind of windowing. I think we want to provide Duration to the
former, and long or Duration is fine for the latter. However, both of these
audiences are "external", so having an "internal" interface won't fit the
bill.

I think my last PR #5536 actually helps the situation quite a bit. Let's
forget about the deprecated members. Now, all the public members of Windows
are abstract methods, so Windows is effectively an interface now. Here's
how it looks:

public abstract class Windows<W extends Window> {
    public abstract Map<Long, W> windowsFor(final long timestamp);
    public abstract long size();
    public abstract long gracePeriodMs();
}

Notice that there is no part of this involved with the DSL. When you're
writing a topology, you don't call any of these methods. It's strictly an
interface that tells a Windows implementation what Streams expects from it.
A very simple implementation could have no builder methods at all and just
return fixed answers to these method calls (this is basically what
UnlimitedWindows does). It seems like, if we want to use long millis
internally, then we just need to leave Windows alone.

What we do want to change is the builder methods in TimeWindows,
JoinWindows, and UnlimitedWindows. For example, `TimeWindows#of(long)`
would become `TimeWindows#of(Duration)`, etc. These are the DSL methods.

Does that make sense?
-John



On Thu, Aug 23, 2018 at 8:59 AM Nikolay Izhikov  wrote:

> Hello, Mathias.
>
> Thanks for your feedback.
>
> > Thus, it might make sense to keep old and just add new ones?
>
> As far as I understand, we will keep old methods anyway to prevent public
> API backward compatibility.
> I agree with you, methods that used internally shouldn't be deprecated.
>
> > End users can use the "nicer" new ones, while we can still use the
> existing ones internally?
> > Not sure if it would be possible to keep the old ones without exposing
> them as public API?
>
> I think, when we decide to remove methods with `long` from public API, we
> can do the following:
>
> 1. Create an interface like `WindowsInternal`.
> 2. Change Windows to an interface.
> 3. Create package-private implementation `WindowsImpl`.
>
> ```
> package org.apache.kafka.streams.kstream.internals;
> public interface WindowsInternal {
> public long start();
> public long end();
> //etc...
> }
>
> package org.apache.kafka.streams.kstream;
> public interface Windows {
> public Instant start();
> public Instant end();
> //...
> }
>
> class WindowsImpl implements Windows,
> WindowsInternal {
>
> }
> ```
>
> So, in public API we will expose only `Windows` interface and internally
> we can use `WindowsInternal`
> But, of course, this will be huge changes in public API.
>
> > Let me know what you think about this.
>
> I think in this KIP we shouldn't deprecate methods, that are used
> internally.
> I changed it, now my proposal is just add new methods.
>
> Please, let me know if anything more need to be done.
>
> В Ср, 22/08/2018 в 17:29 -0700, Matthias J. Sax пишет:
> > Thanks a lot for the KIP.
> >
> > From my understanding, the idea of the KIP is to improve the public API
> > at DSL level. However, not all public methods listed are part of DSL
> > level API, but part of runtime API. Those methods are called during
> > processing and are on the hot code path. I am not sure, if we want to
> > update those methods. We should carefully think about this, and consider
> > to keep Long/long type to keep runtime overhead small. Note, that the
> > methods I mention are not required to specify a program using the DSL
> > and thus is questionable if the DSL API would be improved if we change
> > the methods.
> >
> > It's unfortunate, that some part of the public API stretch the DSL
> > builder part as well as the runtime part...
> >
> > This affects the following methods (please double check if I missed any):
> >
> >  - Windows#windowsFor()
> >  - Window#start()
> >  - Window#end()
> >  - JoinWindows#windowFor()
> >  - SessionWindows#inactivityGap()
> >  - TimeWindows#windowFor()
> >  - UnlimitedWindows#win

Re: [DISCUSS] KIP-358: Migrate Streams API to Duration instead of long ms times

2018-08-24 Thread John Roesler
Hey Matthias,

Thanks for pointing that out. I agree that we only really need to change
methods that are API-facing, and we probably want to avoid using
Duration/Instant for Streams-facing members.

Like I said in my last email, I think the whole Windows interface is
Streams-facing, and the builders we provide are otherwise API-facing.
Likewise, `Window` is Streams-facing, so start and end should not use
Duration. In SessionWindows, inactivityGap is Streams-facing.

I actually think that ProcessorContext#schedule() is API-facing, so it
should use Duration. The rationale is that streams processing doesn't call
this method, only implementers of Processor do. Does that seem right?

Also, it seems like ReadOnlyWindowStore#fetch() (2x) and #fetchAll() are
API-facing (for IQ). When we call fetch() during processing, it's actually
`WindowStore#fetch()`. Maybe we should move "WindowStoreIterator<V> fetch(K key,
long timeFrom, long timeTo)" to the WindowStore interface and make all the
ReadOnlyWindowStore methods take Durations. And likewise with the
SessionStore interfaces.

What do you think?

Thanks,
-John




On Fri, Aug 24, 2018 at 10:51 AM John Roesler  wrote:

> Hi Nikolay,
>
> First: I wanted to let you know that we have dropped the `grace(long)`
> method from the Windows interface, but we do still need to transition the
> same method on TimeWindows and JoinWindows (
> https://github.com/apache/kafka/pull/5536)
>
> I have also been thinking it would be nice to replace `Windows` with an
> interface, but for different reasons. I think we can even do it without
> breaking source compatibility (but it would break binary compatibility):
> create a new interface `WindowSpec`, deprecate `Windows` and make it
> implement `WindowSpec`, add a new method:
> `KGroupedStream#windowedBy(WindowSpec)`, and deprecate the old one.
>
> However, I don't think this would solve your problem, since the Windows
> interface has two audiences: the DSL user and the implementer who wishes to
> provide a new kind of windowing. I think we want to provide Duration to the
> former, and long or Duration is fine for the latter. However, both of these
> audiences are "external", so having an "internal" interface won't fit the
> bill.
>
> I think my last PR #5536 actually helps the situation quite a bit. Let's
> forget about the deprecated members. Now, all the public members of Windows
> are abstract methods, so Windows is effectively an interface now. Here's
> how it looks:
>
> public abstract class Windows {
> public abstract Map windowsFor(final long timestamp);
> public abstract long size();
> public abstract long gracePeriodMs();
> }
>
> Notice that there is no part of this involved with the DSL. When you're
> writing a topology, you don't call any of these methods. It's strictly an
> interface that tells a Windows implementation what Streams expects from it.
> A very simple implementation could have no builder methods at all and just
> return fixed answers to these method calls (this is basically what
> UnlimitedWindows does). It seems like, if we want to use long millis
> internally, then we just need to leave Windows alone.
>
> What we do want to change is the builder methods in TimeWindows,
> JoinWindows, and UnlimitedWindows. For example, `TimeWindows#of(long)`
> would become `TimeWindows#of(Duration)`, etc. These are the DSL methods.
>
> Does that make sense?
> -John
>
>
>
> On Thu, Aug 23, 2018 at 8:59 AM Nikolay Izhikov 
> wrote:
>
>> Hello, Mathias.
>>
>> Thanks for your feedback.
>>
>> > Thus, it might make sense to keep old and just add new ones?
>>
>> As far as I understand, we will keep old methods anyway to prevent public
>> API backward compatibility.
>> I agree with you, methods that used internally shouldn't be deprecated.
>>
>> > End users can use the "nicer" new ones, while we can still use the
>> existing ones internally?
>> > Not sure if it would be possible to keep the old ones without exposing
>> them as public API?
>>
>> I think, when we decide to remove methods with `long` from public API, we
>> can do the following:
>>
>> 1. Create an interface like `WindowsInternal`.
>> 2. Change Windows to an interface.
>> 3. Create package-private implementation `WindowsImpl`.
>>
>> ```
>> package org.apache.kafka.streams.kstream.internals;
>> public interface WindowsInternal {
>> public long start();
>> public long end();
>> //etc...
>> }
>>
>> package org.apache.kafka.streams.kstream;
>> public interface Windows {
>> public Instant start();
>> public Instant end();
>> //...
>> }
>>
>> class WindowsImpl implements Windows,
>> WindowsInternal {
>>
>> }
>> ```
>>
>> So, in public API we will expose only `Windows` interface and internally
>> we can use `WindowsInternal`
>> But, of course, this will be huge changes in public API.
>>
>> > Let me know what you think about thi

Re: [DISCUSS] KIP-362: Dynamic Session Window Support

2018-08-24 Thread Guozhang Wang
Hello Lei,

Thanks for the proposal. I've just made a quick pass over it and there is a
question I have:

The session windows are defined per key, i.e. does that mean that each
incoming record of the key can dynamically change the gap of the window?
For example, say you have the following record for the same key coming in
order, where the first time is the timestamp of the record, and the second
value is the extracted gap value:

(10, 10), (19, 5), ...


When we receive the first record at time 10, the gap is extracted as 10,
and hence the window will be expired at 20 if no other record is received.
When we receive the second record at time 19, the gap is modified to 5, and
hence the window will be expired at 24 if no other record is received.


If that's the case, I'm wondering how out-of-order data can be handled
then, consider this stream:

(10, 10), (19, 5), (15, 3) ...

I.e. you received a late record at timestamp 15, which shortens
the gap to 3. It means that the window SHOULD actually have expired at 18,
and hence the next record (19, 5) should be for a new session already.
Today Streams session window implementation does not do "window split", so
have you thought about how this can be extended?
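
Just to spell out the arithmetic I'm assuming above (illustrative only, not
Streams internals): each record's extracted gap defines how long its session
stays open after that record.

public class DynamicGapExample {

    static long expiry(final long recordTimestamp, final long extractedGapMs) {
        return recordTimestamp + extractedGapMs;
    }

    public static void main(String[] args) {
        System.out.println(expiry(10, 10)); // 20: session stays open until 20
        System.out.println(expiry(19, 5));  // 24: in-order record extends the session to 24
        System.out.println(expiry(15, 3));  // 18: the late record would have closed the session
                                            //     at 18, so (19, 5) should start a new session
    }
}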

Also, since in your proposal each session window's gap value would be
different, we would need to store this value along with each record. How
would we store it, and what would be the upgrade path if it is not a
compatible change to the on-disk storage, etc.?



Guozhang



On Wed, Aug 22, 2018 at 10:05 AM, Lei Chen  wrote:

> Hi All,
>
> I created a KIP to add dynamic gap session window support to Kafka Streams
> DSL.
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 362%3A+Support+dynamic+gap+session+window
>
> Please take a look,
>
> Thanks,
> Lei
>



-- 
-- Guozhang


Re: [DISCUSS] KIP-358: Migrate Streams API to Duration instead of long ms times

2018-08-24 Thread John Roesler
Quick afterthought: I guess that `Window` is exposed to the API via
`Windowed` keys. I think it would be fine to not deprecate the `long` start
and end, but add `Instant` variants for people preferring that interface.

On Fri, Aug 24, 2018 at 11:10 AM John Roesler  wrote:

> Hey Matthias,
>
> Thanks for pointing that out. I agree that we only really need to change
> methods that are API-facing, and we probably want to avoid using
> Duration/Instant for Streams-facing members.
>
> Like I said in my last email, I think the whole Windows interface is
> Streams-facing, and the builders we provide are otherwise API-facing.
> Likewise, `Window` is Streams-facing, so start and end should not use
> Duration. In SessionWindows, inactivityGap is Streams-facing.
>
> I actually think that ProcessorContext#schedule() is API-facing, so it
> should use Duration. The rationale is that streams processing doesn't call
> this method, only implementer of Processor do. Does that seem right?
>
> Also, it seems like  ReadOnlyWindowStore#fetch() (2x) and #fetchAll() are
> API-facing (for IQ). When we call fetch() during processing, it's actually
> `WindowStore#fetch()`. Maybe we should move "WindowStoreIterator fetch(K
> key, long timeFrom, long timeTo)" to the WindowStore interface and make
> all the ReadOnlyWindowStore methods take Durations. And likewise with the
> SessionStore interfaces.
>
> What do you think?
>
> Thanks,
> -John
>
>
>
>
> On Fri, Aug 24, 2018 at 10:51 AM John Roesler  wrote:
>
>> Hi Nikolay,
>>
>> First: I wanted to let you know that we have dropped the `grace(long)`
>> method from the Windows interface, but we do still need to transition the
>> same method on TimeWindows and JoinWindows (
>> https://github.com/apache/kafka/pull/5536)
>>
>> I have also been thinking it would be nice to replace `Windows` with an
>> interface, but for different reasons. I think we can even do it without
>> breaking source compatibility (but it would break binary compatibility):
>> create a new interface `WindowSpec`, deprecate `Windows` and make it
>> implement `WindowSpec`, add a new method:
>> `KGroupedStream#windowedBy(WindowSpec)`, and deprecate the old one.
>>
>> However, I don't think this would solve your problem, since the Windows
>> interface has two audiences: the DSL user and the implementer who wishes to
>> provide a new kind of windowing. I think we want to provide Duration to the
>> former, and long or Duration is fine for the latter. However, both of these
>> audiences are "external", so having an "internal" interface won't fit the
>> bill.
>>
>> I think my last PR #5536 actually helps the situation quite a bit. Let's
>> forget about the deprecated members. Now, all the public members of Windows
>> are abstract methods, so Windows is effectively an interface now. Here's
>> how it looks:
>>
>> public abstract class Windows {
>> public abstract Map windowsFor(final long timestamp);
>> public abstract long size();
>> public abstract long gracePeriodMs();
>> }
>>
>> Notice that there is no part of this involved with the DSL. When you're
>> writing a topology, you don't call any of these methods. It's strictly an
>> interface that tells a Windows implementation what Streams expects from it.
>> A very simple implementation could have no builder methods at all and just
>> return fixed answers to these method calls (this is basically what
>> UnlimitedWindows does). It seems like, if we want to use long millis
>> internally, then we just need to leave Windows alone.
>>
>> What we do want to change is the builder methods in TimeWindows,
>> JoinWindows, and UnlimitedWindows. For example, `TimeWindows#of(long)`
>> would become `TimeWindows#of(Duration)`, etc. These are the DSL methods.
>>
>> Does that make sense?
>> -John
>>
>>
>>
>> On Thu, Aug 23, 2018 at 8:59 AM Nikolay Izhikov 
>> wrote:
>>
>>> Hello, Mathias.
>>>
>>> Thanks for your feedback.
>>>
>>> > Thus, it might make sense to keep old and just add new ones?
>>>
>>> As far as I understand, we will keep old methods anyway to prevent
>>> public API backward compatibility.
>>> I agree with you, methods that used internally shouldn't be deprecated.
>>>
>>> > End users can use the "nicer" new ones, while we can still use the
>>> existing ones internally?
>>> > Not sure if it would be possible to keep the old ones without exposing
>>> them as public API?
>>>
>>> I think, when we decide to remove methods with `long` from public API,
>>> we can do the following:
>>>
>>> 1. Create an interface like `WindowsInternal`.
>>> 2. Change Windows to an interface.
>>> 3. Create package-private implementation `WindowsImpl`.
>>>
>>> ```
>>> package org.apache.kafka.streams.kstream.internals;
>>> public interface WindowsInternal {
>>> public long start();
>>> public long end();
>>> //etc...
>>> }
>>>
>>> package org.apache.kafka.streams.kstream;
>>> public interface Windows {
>>> pu

Re: Request for contributor permissions

2018-08-24 Thread Matthias J. Sax
Done.

On 8/23/18 8:19 PM, 王又田 wrote:
> JIRA ID: tony80720
> 
> Cwiki ID:  Yu Tien Wang
> 
> Thanks in advance!
> 
> Tony
> 
> 
> 
> 





Re: [DISCUSS] KIP-291: Have separate queues for control requests and data requests

2018-08-24 Thread Joel Koshy
I had some offline discussions with Lucas on this KIP. While it is much
more work than the original proposals, separating the control plane
entirely removes any interference with the data plane as summarized under
the rejected alternatives section.

Just a few minor comments:

   - Can you update the link to the discussion thread and vote thread?
   - The idle ratio metrics are fairly important for monitoring. I think we
   agreed that these would only apply to the data plane (otherwise there will
   always be some skew due to the controller plane). If so, can you clarify
   that somewhere in the doc?
   - Personally, I prefer the term CONTROL to CONTROLLER in the configs.
   CONTROLLER makes it sound like it is a special listener on the controller.
   CONTROL clarifies that this is a listener for receiving control plane
   requests from the controller.


Thanks,

Joel

On Wed, Aug 22, 2018 at 12:45 AM, Eno Thereska 
wrote:

> Ok thanks, if you guys are seeing this at LinkedIn then the motivation
> makes more sense.
>
> Eno
>
> On Tue, Aug 21, 2018 at 5:39 PM, Becket Qin  wrote:
>
> > Hi Eno,
> >
> > Thanks for the comments. This KIP is not really about improving the
> > performance in general. It is about ensuring the cluster state can still
> be
> > updated quickly even if the brokers are under heavy load.
> >
> > We have seen quite often that it took dozens of seconds for a broker to
> > process the requests sent by the controller when the cluster is under
> heavy
> > load. This leads to the issues Lucas mentioned in the motivation part.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > > On Aug 20, 2018, at 11:33 PM, Eno Thereska 
> > wrote:
> > >
> > > Hi folks,
> > >
> > > I looked at the previous numbers that Lucas provided (thanks!) but it's
> > > still not clear to me whether the performance benefits justify the
> added
> > > complexity. I'm looking for some intuition here (a graph would be great
> > but
> > > not required): for a small/medium/large cluster, what are the expected
> > > percentage of control requests today that will benefit from the change?
> > > It's a bit hard to go through this level of detail without knowing the
> > > expected end-to-end benefit. The best folks to answer this might be
> ones
> > > running such clusters, and ideally should pitch in with some data.
> > >
> > > Thanks
> > > Eno
> > >
> > > On Mon, Aug 20, 2018 at 7:29 AM, Becket Qin 
> > wrote:
> > >
> > >> Hi Lucas,
> > >>
> > >> In KIP-103, we introduced a convention to define and look up the
> > listeners.
> > >> So it would be good if the later KIPs can follow the same convention.
> > >>
> > >> From what I understand, the advertised.listeners is actually designed
> > for
> > >> our purpose, i.e. providing a list of listeners that can be used in
> > >> different cases. In KIP-103 it was used to separate internal traffic
> > from
> > >> the external traffic. It is not just for the user traffic or data
> > >> only. So adding
> > >> a controller listener is not repurposing the config. Also, ZK
> structure
> > is
> > >> only visible to brokers, the clients will still only see the listeners
> > they
> > >> are seeing today.
> > >>
> > >> For this KIP, we are essentially trying to separate the controller
> > traffic
> > >> from the inter-broker data traffic. So adding a new
> > >> listener.name.for.controller config seems reasonable. The behavior
> would
> > >> be:
> > >> 1. If the listener.name.for.controller is set, the broker-controller
> > >> communication will go through that listener.
> > >> 2. Otherwise, the controller traffic falls back to use
> > >> inter.broker.listener.name or inter.broker.security.protocol, which
> is
> > the
> > >> current behavior.
> > >>
> > >> Regarding updating the security protocol with one line change v.s
> > two-lines
> > >> change, I am a little confused, can you elaborate?
> > >>
> > >> Regarding the possibility of hurry and misreading. It is the system
> > admin's
> > >> responsibility to configure the right listener to ensure that
> different
> > >> kinds of traffic are using the correct endpoints. So I think it is
> > better
> > >> that we always follow the same of convention instead of doing it in
> > >> different ways.
> > >>
> > >> Thanks,
> > >>
> > >> Jiangjie (Becket) Qin
> > >>
> > >>
> > >>
> > >> On Fri, Aug 17, 2018 at 4:34 AM, Lucas Wang 
> > wrote:
> > >>
> > >>> Thanks for the review, Becket.
> > >>>
> > >>> (1) After comparing the two approaches, I still feel the current
> > writeup
> > >> is
> > >>> a little better.
> > >>> a. The current writeup asks for an explicit endpoint while reusing
> the
> > >>> existing "inter.broker.listener.name" with the exactly same
> semantic,
> > >>> and your proposed change asks for a new listener name for controller
> > >> while
> > >>> reusing the existing "advertised.listeners" config with a slight
> > semantic
> > >>> change since a new controller endpoint needs to be added to it.
> > >>> Hence conceptually the current w

unable to build schema registry

2018-08-24 Thread Simon Nunn

Getting the following error when trying to build the schema registry. I have 
tried various versions of Kafka, but I'm not sure what I need to do. Any help 
would be appreciated.


[ERROR] Failed to execute goal 
org.apache.maven.plugins:maven-compiler-plugin:3.6.1:compile (default-compile) 
on project kafka-schema-registry: Compilation failure: Compilation failure:
[ERROR] 
/C:/projects/schema-registry/core/src/main/java/io/confluent/kafka/schemaregistry/masterelector/kafka/SchemaRegistryCoordinator.java:[25,37]
 cannot find symbol
[ERROR]   symbol:   class Timer
[ERROR]   location: package org.apache.kafka.common.utils
[ERROR] 
/C:/projects/schema-registry/core/src/main/java/io/confluent/kafka/schemaregistry/masterelector/kafka/SchemaRegistryCoordinator.java:[207,57]
 cannot find symbol
[ERROR]   symbol:   class Timer
[ERROR]   location: class 
io.confluent.kafka.schemaregistry.masterelector.kafka.SchemaRegistryCoordinator
[ERROR] 
/C:/projects/schema-registry/core/src/main/java/io/confluent/kafka/schemaregistry/masterelector/kafka/SchemaRegistryCoordinator.java:[98,36]
 cannot find symbol
[ERROR]   symbol:   method timer(long)
[ERROR]   location: variable time of type org.apache.kafka.common.utils.Time
[ERROR] 
/C:/projects/schema-registry/core/src/main/java/io/confluent/kafka/schemaregistry/masterelector/kafka/SchemaRegistryCoordinator.java:[114,23]
 cannot find symbol
[ERROR]   symbol:   method timer(long)
[ERROR]   location: variable time of type org.apache.kafka.common.utils.Time
[ERROR] -> [Help 1]


Access to Jira

2018-08-24 Thread Jonathan Santilli
Hello,

I am working on KAFKA-7165 ,
and one of the steps is to assign the Jira issue to myself, but I am not
allowed to. I would like to have Jira access to do it, please.

Cheers!
-- 
Santilli Jonathan


Request for contributor permissions

2018-08-24 Thread 陳映彤
JIRA ID: Chloe chen
Cwiki ID: Chloe chen

Thanks in advance! 

Sent from my iPhone

Request for contributor permissions

2018-08-24 Thread LEE, Tung-Yu
JIRA ID: leetungyu
Cwiki ID: leetungyu

Thanks a lot!


Re: Access to Jira

2018-08-24 Thread Jun Rao
Hi, Jonathan,

Thanks for your interest. Added you to the contributor list.

Jun

On Fri, Aug 24, 2018 at 6:18 AM, Jonathan Santilli <
jonathansanti...@gmail.com> wrote:

> Hello,
>
> am working on KAFKA-7165  >,
> one of the steps is to assign the Jira issue to myself, but am not
> allowed, I would like to have Jira access to do it, please.
>
> Cheers!
> --
> Santilli Jonathan
>


Re: [DISCUSS] KIP-358: Migrate Streams API to Duration instead of long ms times

2018-08-24 Thread Matthias J. Sax
It's tricky... :)

Some APIs have "dual use" as I mentioned in my first reply. I agree that
it would be good to avoid abstract class and use interfaces if possible.
As long as the change is source code compatible, it should be fine IMHO
-- we need to document binary incompatibility of course.

I think it's best if the KIP gets updated with a proposal on how to
handle "dual use" parts. It's easier to discuss if it's written down, IMHO.

For `ProcessorContext#schedule()`, you are right John: it seems fine
to use `Duration`, as it won't be called often (usually only within
`Processor#init()`) -- I mixed it up with `Punctuator#punctuate(long)`.
However, thinking about this twice, we might even want to update both
methods. Punctuation callbacks don't happen every millisecond and thus
the overhead of using `Instant` should not be a problem.

@Nikolay: it seems the KIP does not mention `Punctuator#punctuate(long)`
-- should we add it?


-Matthias


On 8/24/18 10:11 AM, John Roesler wrote:
> Quick afterthought: I guess that `Window` is exposed to the API via
> `Windowed` keys. I think it would be fine to not deprecate the `long` start
> and end, but add `Instant` variants for people preferring that interface.
> 
> On Fri, Aug 24, 2018 at 11:10 AM John Roesler  wrote:
> 
>> Hey Matthias,
>>
>> Thanks for pointing that out. I agree that we only really need to change
>> methods that are API-facing, and we probably want to avoid using
>> Duration/Instant for Streams-facing members.
>>
>> Like I said in my last email, I think the whole Windows interface is
>> Streams-facing, and the builders we provide are otherwise API-facing.
>> Likewise, `Window` is Streams-facing, so start and end should not use
>> Duration. In SessionWindows, inactivityGap is Streams-facing.
>>
>> I actually think that ProcessorContext#schedule() is API-facing, so it
>> should use Duration. The rationale is that streams processing doesn't call
>> this method, only implementer of Processor do. Does that seem right?
>>
>> Also, it seems like  ReadOnlyWindowStore#fetch() (2x) and #fetchAll() are
>> API-facing (for IQ). When we call fetch() during processing, it's actually
>> `WindowStore#fetch()`. Maybe we should move "WindowStoreIterator fetch(K
>> key, long timeFrom, long timeTo)" to the WindowStore interface and make
>> all the ReadOnlyWindowStore methods take Durations. And likewise with the
>> SessionStore interfaces.
>>
>> What do you think?
>>
>> Thanks,
>> -John
>>
>>
>>
>>
>> On Fri, Aug 24, 2018 at 10:51 AM John Roesler  wrote:
>>
>>> Hi Nikolay,
>>>
>>> First: I wanted to let you know that we have dropped the `grace(long)`
>>> method from the Windows interface, but we do still need to transition the
>>> same method on TimeWindows and JoinWindows (
>>> https://github.com/apache/kafka/pull/5536)
>>>
>>> I have also been thinking it would be nice to replace `Windows` with an
>>> interface, but for different reasons. I think we can even do it without
>>> breaking source compatibility (but it would break binary compatibility):
>>> create a new interface `WindowSpec`, deprecate `Windows` and make it
>>> implement `WindowSpec`, add a new method:
>>> `KGroupedStream#windowedBy(WindowSpec)`, and deprecate the old one.
>>>
>>> However, I don't think this would solve your problem, since the Windows
>>> interface has two audiences: the DSL user and the implementer who wishes to
>>> provide a new kind of windowing. I think we want to provide Duration to the
>>> former, and long or Duration is fine for the latter. However, both of these
>>> audiences are "external", so having an "internal" interface won't fit the
>>> bill.
>>>
>>> I think my last PR #5536 actually helps the situation quite a bit. Let's
>>> forget about the deprecated members. Now, all the public members of Windows
>>> are abstract methods, so Windows is effectively an interface now. Here's
>>> how it looks:
>>>
>>> public abstract class Windows {
>>> public abstract Map windowsFor(final long timestamp);
>>> public abstract long size();
>>> public abstract long gracePeriodMs();
>>> }
>>>
>>> Notice that there is no part of this involved with the DSL. When you're
>>> writing a topology, you don't call any of these methods. It's strictly an
>>> interface that tells a Windows implementation what Streams expects from it.
>>> A very simple implementation could have no builder methods at all and just
>>> return fixed answers to these method calls (this is basically what
>>> UnlimitedWindows does). It seems like, if we want to use long millis
>>> internally, then we just need to leave Windows alone.
>>>
>>> What we do want to change is the builder methods in TimeWindows,
>>> JoinWindows, and UnlimitedWindows. For example, `TimeWindows#of(long)`
>>> would become `TimeWindows#of(Duration)`, etc. These are the DSL methods.
>>>
>>> Does that make sense?
>>> -John
>>>
>>>
>>>
>>> On Thu, Aug 23, 2018 at 8:59 AM Nikolay Izhikov 
>>> wrote:
>>>
 Hello, Mathias.

 Thanks for your feedback.

Re: Request for contributor permissions

2018-08-24 Thread Jun Rao
Hi, Yingtung,

Thanks for your interest. Gave you permission to both jira and wiki.

Jun

On Thu, Aug 23, 2018 at 8:58 PM, 陳映彤  wrote:

> JIRA ID: Chloe chen
> Cwiki ID: Chloe chen
>
> Thanks in advance!
>
> Sent from my iPhone


Re: Request for contributor permissions

2018-08-24 Thread Jun Rao
Hi, Tung-Yu,

Thanks for your interest. Gave you permission to both jira and wiki.

Jun

On Fri, Aug 24, 2018 at 12:41 AM, LEE, Tung-Yu  wrote:

> JIRA ID: leetungyu
> Cwiki ID: leetungyu
>
> Thanks a lot!
>


Re: [DISCUSS] KIP-351: Add --critical-partitions option to describe topics command

2018-08-24 Thread Kevin Lu
Hi All,

I am having some trouble re-formulating this KIP to output partitions that
are under the configured "min.insync.replicas" as I am not sure how to
reliably get the configured "min.insync.replicas" in all cases.

The challenge I am facing is when "min.insync.replicas" is configured to
non-default on the broker, and topics are created without
"min.insync.replicas" specified. Since the topic is created without
specifying "min.insync.replicas", then the value is not saved in Zookeeper
and it is directly used by the brokers.

The TopicCommand hits zookeeper so I am unable to get the configured value
without querying the brokers somehow...

Example:
- Start a broker with min.insync.replicas=2 in server.properties
- Use kafka-topics.sh to create topic without specifying min.insync.replicas

The ZK node /config/ for the created topic will only have direct overrides,
and will not have the broker's configured min.insync.replicas.

Any ideas on how to approach this?
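
For reference, this is roughly what querying the brokers would look like (a sketch
only; whether TopicCommand should take this route is exactly the open question).
AdminClient#describeConfigs returns the effective topic config, including values
that fall back to broker defaults:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

public class EffectiveMinIsrExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");
            Config config = admin.describeConfigs(Collections.singleton(topic))
                    .all().get().get(topic);
            // Effective value: the topic override if present, otherwise the broker default.
            System.out.println(config.get("min.insync.replicas").value());
        }
    }
}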

Regards,
Kevin

On Mon, Aug 6, 2018 at 8:21 AM Kevin Lu  wrote:

> Hi Jason,
>
> Thanks for the response!
>
> I completely agree with you and Mickael about adding a
> --under-minisr-partitions option to match the existing metric. I will
> create a separate KIP to discuss the --under-minisr-partitions option. I
> believe there is a technical challenge with retrieving the
> min.insync.replicas configuration from zookeeper currently as it may also
> be stored as a broker configuration, but let me do some digging to confirm.
>
> I am going to modify KIP-351 to represent the the gap that you have
> mentioned (exactly at min.isr) as this is an important state that we
> specifically monitor to alert on.
>
> Any other thoughts?
>
> Regards,
> Kevin
>
> On Thu, Aug 2, 2018 at 11:23 PM Jason Gustafson 
> wrote:
>
>> Hey Kevin,
>>
>> Thanks for the KIP. I like Mickael's suggestion to
>> add --under-minisr-partitions since it fits with the metric we already
>> expose. It's also a good question whether there should be a separate
>> category for partitions which are right at min.isr. I'm reluctant to add
>> new categories, but I agree there might be a gap at the moment. Say you
>> have a replication factor of 3 and the min isr is set to 1. Our notion of
>> URP does not capture the difference between having an ISR down to a size
>> of
>> 1 and one down to a size of 2. The reason this might be significant is
>> that
>> a shrink of the ISR down to 2 may just be caused by a rolling restart or a
>> transient network blip. A shrink to 1, on the other hand, might be
>> indicative of a more severe problem and could be cause for a call from
>> pagerduty.
>>
>> -Jason
>>
>> On Thu, Aug 2, 2018 at 9:28 AM, Kevin Lu  wrote:
>>
>> > Hi Mickael,
>> >
>> > Thanks for the suggestion!
>> >
>> > Correct me if I am mistaken, but if a producer attempts to send to a
>> > partition that is under min ISR (and ack=all or -1) then the send will
>> fail
>> > with a NotEnoughReplicas or NotEnoughReplicasAfterAppend exception? At
>> this
>> > point, client-side has already suffered failure but the server-side is
>> > still fine for now?
>> >
>> > If the above is true, then this would be a FATAL case for producers.
>> >
>> > Would it be valuable to include the CRITICAL case where a topic
>> partition
>> > has exactly min ISR so that Kafka operators can take action so it does
>> not
>> > become FATAL? This could be in the same option or a new one.
>> >
>> > Thanks!
>> >
>> > Regards,
>> > Kevin
>> >
>> > On Thu, Aug 2, 2018 at 2:27 AM Mickael Maison > >
>> > wrote:
>> >
>> > > What about also adding a --under-minisr-partitions option?
>> > >
>> > > That would match the
>> > > "kafka.server:type=ReplicaManager,name=UnderMinIsrPartitionCount"
>> > > broker metric and it's usually pretty relevant when investigating
>> > > issues
>> > >
>> > > On Thu, Aug 2, 2018 at 8:54 AM, Kevin Lu 
>> wrote:
>> > > > Hi friends!
>> > > >
>> > > > This thread is to discuss KIP-351
>> > > > <
>> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-351%3A
>> > +Add+--critical-partitions+option+to+describe+topics+command
>> > > >
>> > > > !
>> > > >
>> > > > I am proposing to add a --critical-partitions option to the describe
>> > > topics
>> > > > command that will only list out topic partitions that have 1 ISR
>> left
>> > > (RF >
>> > > > 1) as they would be in a critical state and need immediate
>> > > repartitioning.
>> > > >
>> > > > I wonder if the name "critical" is appropriate?
>> > > >
>> > > > Thoughts?
>> > > >
>> > > > Thanks!
>> > > >
>> > > > Regards,
>> > > > Kevin
>> > >
>> >
>>
>


Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by specifying member id

2018-08-24 Thread Jason Gustafson
Hey All,

Nice to see some solid progress on this. It sounds like one of the
complications is allowing static and dynamic registration to coexist. I'm
wondering if we can do something like the following:

1. Statically registered members (those joining the group with a non-null `
member.name`) maintain a session with the coordinator just like dynamic
members.
2. If a session is active for a static member when a rebalance begins, then
basically we'll keep the current behavior. The rebalance will await the
static member joining the group.
3. If a static member does not have an active session, then the coordinator
will not wait for it to join, but will still include it in the rebalance.
The coordinator will forward the cached subscription information to the
leader and will cache the assignment after the rebalance completes. (Note
that we still have the generationId to fence offset commits from a static
zombie if the assignment changes.)
4. When a static member leaves the group or has its session expire, no
rebalance is triggered. Instead, we can begin a timer to expire the static
registration. This would be a longish timeout (like 30 minutes say).

So basically static members participate in all rebalances regardless
whether they have an active session. In a given rebalance, some of the
members may be static and some dynamic. The group leader can differentiate
the two based on the presence of the `member.name` (we have to add this to
the JoinGroupResponse). Generally speaking, we would choose leaders
preferentially from the active members that support the latest JoinGroup
protocol and are using static membership. If we have to choose a leader
with an old version, however, it would see all members in the group (static
or dynamic) as dynamic members and perform the assignment as usual.
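
To make the client side of this concrete, here is a rough sketch of what registering a
static member could look like (illustrative only: `member.name` is the config proposed
in this thread and does not exist in released Kafka):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class StaticMemberExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        // Proposed config: a stable, user-provided identity that survives restarts,
        // letting the coordinator hand back the cached member.id and assignment.
        props.put("member.name", "instance-7");
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(
                props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
            consumer.subscribe(Collections.singleton("my-topic"));
            consumer.poll(Duration.ofMillis(100));
        }
    }
}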

Would that work?

-Jason


On Thu, Aug 23, 2018 at 5:26 PM, Guozhang Wang  wrote:

> Hello Boyang,
>
> Thanks for the updated proposal, a few questions:
>
> 1. Where will "change-group-timeout" be communicated to the broker? Will
> that be a new field in the JoinGroupRequest, or are we going to piggy-back
> on the existing session-timeout field (assuming that the original value
> will not be used anywhere in the static membership any more)?
>
> 2. "However, if the consumer takes longer than session timeout to return,
> we shall still trigger rebalance but it could still try to catch
> `change-group-timeout`.": what does this mean? I thought your proposal is
> that for static memberships, the broker will NOT trigger rebalance even
> after session-timeout has been detected, but only that after
> change-group-timeout
> which is supposed to be longer than session-timeout to be defined?
>
> 3. "A join group request with member.name set will be treated as
> `static-membership` strategy", in this case, how would the switch from
> dynamic to static happen, since whoever changed the member.name to
> not-null
> will be rejected, right?
>
> 4. "just erase the cached mapping, and wait for session timeout to trigger
> rebalance should be sufficient." this is also a bit unclear to me: who will
> erase the cached mapping? Since it is on the broker-side I assume that
> broker has to do it. Are you suggesting to use a new request for it?
>
> 5. "Halfway switch": following 3) above, if your proposal is basically to
> let "first join-request wins", and the strategy will stay as is until all
> members are gone, then this will also not happen since whoever used
> different strategy as the first guy who sends join-group request will be
> rejected right?
>
>
> Guozhang
>
>
> On Tue, Aug 21, 2018 at 9:28 AM, John Roesler  wrote:
>
> > This sounds good to me!
> >
> > Thanks for the time you've spent on it,
> > -John
> >
> > On Tue, Aug 21, 2018 at 12:13 AM Boyang Chen 
> wrote:
> >
> > > Thanks Matthias for the input. Sorry I was busy recently and haven't
> got
> > > time to update this thread. To summarize what we come up so far, here
> is
> > a
> > > draft updated plan:
> > >
> > >
> > > Introduce a new config called `member.name` which is supposed to be
> > > provided uniquely by the consumer client. The broker will maintain a
> > cache
> > > with [key:member.name, value:member.id]. A join group request with
> > > member.name set will be treated as `static-membership` strategy, and
> > will
> > > reject any join group request without member.name. So this
> coordination
> > > change will be differentiated from the `dynamic-membership` protocol we
> > > currently have.
> > >
> > >
> > > When handling static join group request:
> > >
> > >   1.   The broker will check the membership to see whether this is a
> new
> > > member. If new, broker allocate a unique member id, cache the mapping
> and
> > > move to rebalance stage.
> > >   2.   Following 1, if this is an existing member, broker will not
> change
> > > group state, and return its cached member.id and current assignment.
> > > (unless this is leader, we shall trigger rebalance)
> > >   3.   Althou

Re: [DISCUSS] KIP-363: Make FunctionConversions private

2018-08-24 Thread John Roesler
I'm also in favor of this. I don't think it's controversial either. Should
we just move to a vote?

On Thu, Aug 23, 2018 at 7:01 PM Guozhang Wang  wrote:

> +1.
>
> On Thu, Aug 23, 2018 at 12:47 PM, Ted Yu  wrote:
>
> > +1
> >
> > In the Motivation section, you can quote the comment from pull request so
> > that reader doesn't have to click through.
> >
> > Cheers
> >
> > On Thu, Aug 23, 2018 at 12:13 PM Joan Goyeau  wrote:
> >
> > > Hi,
> > >
> > > As pointed out in this comment #5539 (comment)
> > >  the
> > > object FunctionConversions is only of internal use and therefore should
> > be
> > > private to the lib only so that we can do changes without going through
> > KIP
> > > like this one.
> > >
> > > KIP:
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-363%3A+Make+
> > FunctionConversions+private
> > >
> > > Thanks
> > >
> >
>
>
>
> --
> -- Guozhang
>


Re: Build failed in Jenkins: kafka-trunk-jdk10 #429

2018-08-24 Thread Joan Goyeau
Yeah, it was all good in the end :)

Thanks Ted

On Thu, 23 Aug 2018 at 21:42 Ted Yu  wrote:

> I ran streams unit tests as of
> commit 4156ea0a9bcca67d209fd3b43d2268c9abd5a0b5 .
>
> All tests passed locally.
>
> FYI
>
> On Thu, Aug 23, 2018 at 12:23 PM Joan Goyeau  wrote:
>
> > I'm looking into this one.
> >
> > On Thu, 23 Aug 2018 at 20:19 Apache Jenkins Server <
> > jenk...@builds.apache.org> wrote:
> >
> > > See <
> > >
> >
> https://builds.apache.org/job/kafka-trunk-jdk10/429/display/redirect?page=changes
> > > >
> > >
> > > Changes:
> > >
> > > [wangguoz] KAFKA-7316: Fix Streams Scala filter recursive call #5538
> > >
> > > --
> > > [...truncated 1.98 MB...]
> > >
> >
> org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignorTest
> > > > shouldNotHaveSameAssignmentOnAnyTwoHosts PASSED
> > >
> > >
> >
> org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignorTest
> > > > shouldRebalanceTasksToClientsBasedOnCapacity STARTED
> > >
> > >
> >
> org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignorTest
> > > > shouldRebalanceTasksToClientsBasedOnCapacity PASSED
> > >
> > >
> >
> org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignorTest
> > > >
> > shouldNotHaveSameAssignmentOnAnyTwoHostsWhenThereArePreviousStandbyTasks
> > > STARTED
> > >
> > >
> >
> org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignorTest
> > > >
> > shouldNotHaveSameAssignmentOnAnyTwoHostsWhenThereArePreviousStandbyTasks
> > > PASSED
> > >
> > >
> >
> org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignorTest
> > > >
> > >
> >
> shouldNotAssignStandbyTaskReplicasWhenNoClientAvailableWithoutHavingTheTaskAssigned
> > > STARTED
> > >
> > >
> >
> org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignorTest
> > > >
> > >
> >
> shouldNotAssignStandbyTaskReplicasWhenNoClientAvailableWithoutHavingTheTaskAssigned
> > > PASSED
> > >
> > >
> >
> org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignorTest
> > > > shouldAssignEachActiveTaskToOneClientWhenMoreClientsThanTasks STARTED
> > >
> > >
> >
> org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignorTest
> > > > shouldAssignEachActiveTaskToOneClientWhenMoreClientsThanTasks PASSED
> > >
> > >
> >
> org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignorTest
> > > > shouldAssignTopicGroupIdEvenlyAcrossClientsWithStandByTasks STARTED
> > >
> > >
> >
> org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignorTest
> > > > shouldAssignTopicGroupIdEvenlyAcrossClientsWithStandByTasks PASSED
> > >
> > >
> >
> org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignorTest
> > > > shouldAssignTasksNotPreviouslyActiveToNewClient STARTED
> > >
> > >
> >
> org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignorTest
> > > > shouldAssignTasksNotPreviouslyActiveToNewClient PASSED
> > >
> > >
> >
> org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignorTest
> > > > shouldAssignOneActiveTaskToEachProcessWhenTaskCountSameAsProcessCount
> > > STARTED
> > >
> > >
> >
> org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignorTest
> > > > shouldAssignOneActiveTaskToEachProcessWhenTaskCountSameAsProcessCount
> > > PASSED
> > >
> > >
> >
> org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignorTest
> > > > shouldReBalanceTasksAcrossClientsWhenCapacityLessThanTaskCount
> STARTED
> > >
> > >
> >
> org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignorTest
> > > > shouldReBalanceTasksAcrossClientsWhenCapacityLessThanTaskCount PASSED
> > >
> > >
> >
> org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignorTest
> > > > shouldBalanceActiveAndStandbyTasksAcrossAvailableClients STARTED
> > >
> > >
> >
> org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignorTest
> > > > shouldBalanceActiveAndStandbyTasksAcrossAvailableClients PASSED
> > >
> > >
> >
> org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignorTest
> > > > shouldAssignActiveAndStandbyTasks STARTED
> > >
> > >
> >
> org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignorTest
> > > > shouldAssignActiveAndStandbyTasks PASSED
> > >
> > >
> >
> org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignorTest
> > > >
> shouldNotHaveSameAssignmentOnAnyTwoHostsWhenThereArePreviousActiveTasks
> > > STARTED
> > >
> > >
> >
> org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignorTest
> > > >
> shouldNotHaveSameAssignmentOnAnyTwoHostsWhenThereArePreviousActiveTasks
> > > PASSED
> > >
> > >
> >
> org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignorTest
> > > > shouldAssignTasksToClientWithPreviousStandbyTasks STARTED
> > >
> > >
> >
> org.apache.kafka.streams.processor.internals.assignment.Stic

[VOTE] KIP-363: Make FunctionConversions private

2018-08-24 Thread Joan Goyeau
Hi,

As pointed out in this comment #5539 (comment)
 ("This class was already defaulted to public visibility, and we can't
retract it now, without a KIP."), the object FunctionConversions is only
for internal use and should therefore be private to the library, so that
we can make changes without going through a KIP like this one.

Please cast your vote.

On Fri, 24 Aug 2018 at 19:14 John Roesler  wrote:

> I'm also in favor of this. I don't think it's controversial either. Should
> we just move to a vote?
>
> On Thu, Aug 23, 2018 at 7:01 PM Guozhang Wang  wrote:
>
> > +1.
> >
> > On Thu, Aug 23, 2018 at 12:47 PM, Ted Yu  wrote:
> >
> > > +1
> > >
> > > In the Motivation section, you can quote the comment from pull request
> so
> > > that reader doesn't have to click through.
> > >
> > > Cheers
> > >
> > > On Thu, Aug 23, 2018 at 12:13 PM Joan Goyeau  wrote:
> > >
> > > > Hi,
> > > >
> > > > As pointed out in this comment #5539 (comment)
> > > > 
> the
> > > > object FunctionConversions is only of internal use and therefore
> should
> > > be
> > > > private to the lib only so that we can do changes without going
> through
> > > KIP
> > > > like this one.
> > > >
> > > > KIP:
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-363%3A+Make+
> > > FunctionConversions+private
> > > >
> > > > Thanks
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>


Re: [VOTE] KIP-363: Make FunctionConversions private

2018-08-24 Thread Guozhang Wang
+1 from me (binding).

On Fri, Aug 24, 2018 at 11:24 AM, Joan Goyeau  wrote:

> Hi,
>
> As pointed out in this comment #5539 (comment)
>  "This
> class was already defaulted to public visibility, and we can't retract it
> now, without a KIP.", the object FunctionConversions is only of internal
> use and therefore should be private to the lib only so that we can do
> changes without going through KIP like this one.
>
> Please make your vote.
>
> On Fri, 24 Aug 2018 at 19:14 John Roesler  wrote:
>
> > I'm also in favor of this. I don't think it's controversial either.
> Should
> > we just move to a vote?
> >
> > On Thu, Aug 23, 2018 at 7:01 PM Guozhang Wang 
> wrote:
> >
> > > +1.
> > >
> > > On Thu, Aug 23, 2018 at 12:47 PM, Ted Yu  wrote:
> > >
> > > > +1
> > > >
> > > > In the Motivation section, you can quote the comment from pull
> request
> > so
> > > > that reader doesn't have to click through.
> > > >
> > > > Cheers
> > > >
> > > > On Thu, Aug 23, 2018 at 12:13 PM Joan Goyeau 
> wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > As pointed out in this comment #5539 (comment)
> > > > > 
> > the
> > > > > object FunctionConversions is only of internal use and therefore
> > should
> > > > be
> > > > > private to the lib only so that we can do changes without going
> > through
> > > > KIP
> > > > > like this one.
> > > > >
> > > > > KIP:
> > > > >
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-363%3A+Make+
> > > > FunctionConversions+private
> > > > >
> > > > > Thanks
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>



-- 
-- Guozhang


Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by specifying member id

2018-08-24 Thread Guozhang Wang
Hey Jason,

I like your idea to simplify the upgrade protocol to allow static and dynamic
members to coexist. Admittedly it may make the coordinator-side logic a bit
more complex, but I think it is worth doing.

Originally I was trying to kill more birds with one stone with KIP-345,
e.g. to fix the multi-rebalance issue on starting up / shutting down a
multi-instance client (mentioned as case 1)/2) in my early email), and
hence proposing to have a pure static-membership protocol. But thinking
twice about it I now feel it may be too ambitious and worth fixing in
another KIP. With that, I think what you've proposed here is a good way to
go for KIP-345 itself.

Note there are a few details in your proposal we'd still need to figure out:

1. How is this longish static member expiration timeout defined? Is it via a
broker-side (hence global) config, or via a client config which can be
communicated to the broker via the JoinGroupRequest?

2. Assuming that for static members a LEAVE_GROUP request will not trigger a
rebalance immediately either (similar to the session timeout), but only the
longer member expiration timeout will, can we remove the internal
"internal.leave.group.on.close" config, which is a quick work-around then?



Guozhang


On Fri, Aug 24, 2018 at 11:14 AM, Jason Gustafson 
wrote:

> Hey All,
>
> Nice to see some solid progress on this. It sounds like one of the
> complications is allowing static and dynamic registration to coexist. I'm
> wondering if we can do something like the following:
>
> 1. Statically registered members (those joining the group with a non-null `
> member.name`) maintain a session with the coordinator just like dynamic
> members.
> 2. If a session is active for a static member when a rebalance begins, then
> basically we'll keep the current behavior. The rebalance will await the
> static member joining the group.
> 3. If a static member does not have an active session, then the coordinator
> will not wait for it to join, but will still include it in the rebalance.
> The coordinator will forward the cached subscription information to the
> leader and will cache the assignment after the rebalance completes. (Note
> that we still have the generationId to fence offset commits from a static
> zombie if the assignment changes.)
> 4. When a static member leaves the group or has its session expire, no
> rebalance is triggered. Instead, we can begin a timer to expire the static
> registration. This would be a longish timeout (like 30 minutes say).
>
> So basically static members participate in all rebalances regardless
> whether they have an active session. In a given rebalance, some of the
> members may be static and some dynamic. The group leader can differentiate
> the two based on the presence of the `member.name` (we have to add this to
> the JoinGroupResponse). Generally speaking, we would choose leaders
> preferentially from the active members that support the latest JoinGroup
> protocol and are using static membership. If we have to choose a leader
> with an old version, however, it would see all members in the group (static
> or dynamic) as dynamic members and perform the assignment as usual.
>
> Would that work?
>
> -Jason
>
>
> On Thu, Aug 23, 2018 at 5:26 PM, Guozhang Wang  wrote:
>
> > Hello Boyang,
> >
> > Thanks for the updated proposal, a few questions:
> >
> > 1. Where will "change-group-timeout" be communicated to the broker? Will
> > that be a new field in the JoinGroupRequest, or are we going to
> piggy-back
> > on the existing session-timeout field (assuming that the original value
> > will not be used anywhere in the static membership any more)?
> >
> > 2. "However, if the consumer takes longer than session timeout to return,
> > we shall still trigger rebalance but it could still try to catch
> > `change-group-timeout`.": what does this mean? I thought your proposal is
> > that for static memberships, the broker will NOT trigger rebalance even
> > after session-timeout has been detected, but only that after
> > change-group-timeout
> > which is supposed to be longer than session-timeout to be defined?
> >
> > 3. "A join group request with member.name set will be treated as
> > `static-membership` strategy", in this case, how would the switch from
> > dynamic to static happen, since whoever changed the member.name to
> > not-null
> > will be rejected, right?
> >
> > 4. "just erase the cached mapping, and wait for session timeout to
> trigger
> > rebalance should be sufficient." this is also a bit unclear to me: who
> will
> > erase the cached mapping? Since it is on the broker-side I assume that
> > broker has to do it. Are you suggesting to use a new request for it?
> >
> > 5. "Halfway switch": following 3) above, if your proposal is basically to
> > let "first join-request wins", and the strategy will stay as is until all
> > members are gone, then this will also not happen since whoever used
> > different strategy as the first guy who sends join-group request will be
> > rejected ri

Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by specifying member id

2018-08-24 Thread Jason Gustafson
Hey Guozhang,

Responses below:

Originally I was trying to kill more birds with one stone with KIP-345,
> e.g. to fix the multi-rebalance issue on starting up / shutting down a
> multi-instance client (mentioned as case 1)/2) in my early email), and
> hence proposing to have a pure static-membership protocol. But thinking
> twice about it I now feel it may be too ambitious and worth fixing in
> another KIP.


I was considering an extension to support pre-initialization of the static
members of the group, but I agree we should probably leave this problem for
future work.

1. How this longish static member expiration timeout defined? Is it via a
> broker, hence global config, or via a client config which can be
> communicated to broker via JoinGroupRequest?


I am not too sure. I tend to lean toward server-side configs because they
are easier to evolve. If we have to add something to the protocol, then
we'll be stuck with it forever.

2. Assuming that for static members, LEAVE_GROUP request will not trigger a
> rebalance immediately either, similar to session timeout, but only the
> longer member expiration timeout, can we remove the internal "
> internal.leave.group.on.close" config, which is a quick walk-around then?


Yeah, I hope we can ultimately get rid of it, but we may need it for
compatibility with older brokers. A related question is what should be the
behavior of the consumer if `member.name` is provided but the broker does
not support it? We could either fail or silently downgrade to dynamic
membership.

-Jason
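
To make the coexistence idea in this thread a bit more concrete, here is a rough, purely illustrative sketch of the coordinator-side decision being described; every name in it is invented, and the real GroupCoordinator is broker-side Scala code that looks nothing like this:

{code}
// Illustrative pseudocode only, under the assumptions discussed above:
// static members take part in every rebalance, but only an active session
// (or the long expiration timer) controls whether the group waits for them
// or forgets them.
final class StaticMembershipSketch {

    // The "longish" registration timeout discussed in this thread (order of 10-30 min).
    static final long STATIC_MEMBER_EXPIRATION_MS = 30 * 60 * 1000L;

    enum Action { WAIT_FOR_REJOIN, USE_CACHED_SUBSCRIPTION, FORGET_MEMBER }

    static final class MemberState {
        final String memberName;     // operator-assigned, stable across restarts; null => dynamic member
        final boolean sessionActive; // false once the session expires or the member leaves
        final long lastSeenMs;

        MemberState(String memberName, boolean sessionActive, long lastSeenMs) {
            this.memberName = memberName;
            this.sessionActive = sessionActive;
            this.lastSeenMs = lastSeenMs;
        }
    }

    // How a rebalance would treat one registered member under the sketched rules.
    static Action onRebalance(MemberState m, long nowMs) {
        if (m.memberName == null) {
            // Dynamic member: nothing changes; it is only known to the group while
            // its session is alive, so the rebalance waits for it as today.
            return Action.WAIT_FOR_REJOIN;
        }
        if (m.sessionActive) {
            // Static member with a live session: keep the current behavior, i.e.
            // the rebalance waits for it to rejoin.
            return Action.WAIT_FOR_REJOIN;
        }
        if (nowMs - m.lastSeenMs < STATIC_MEMBER_EXPIRATION_MS) {
            // Static member without a live session: don't wait for it, but forward
            // its cached subscription to the leader and cache the new assignment.
            // The generationId still fences offset commits from a zombie instance.
            return Action.USE_CACHED_SUBSCRIPTION;
        }
        // The long expiration timer has elapsed: drop the static registration.
        return Action.FORGET_MEMBER;
    }
}
{code}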


On Fri, Aug 24, 2018 at 11:44 AM, Guozhang Wang  wrote:

> Hey Jason,
>
> I like your idea to simplify the upgrade protocol to allow co-exist of
> static and dynamic members. Admittedly it may make the coordinator-side
> logic a bit more complex, but I think it worth doing it.
>
> Originally I was trying to kill more birds with one stone with KIP-345,
> e.g. to fix the multi-rebalance issue on starting up / shutting down a
> multi-instance client (mentioned as case 1)/2) in my early email), and
> hence proposing to have a pure static-membership protocol. But thinking
> twice about it I now feel it may be too ambitious and worth fixing in
> another KIP. With that, I think what you've proposed here is a good way to
> go for KIP-345 itself.
>
> Note there are a few details in your proposal we'd still need to figure
> out:
>
> 1. How this longish static member expiration timeout defined? Is it via a
> broker, hence global config, or via a client config which can be
> communicated to broker via JoinGroupRequest?
>
> 2. Assuming that for static members, LEAVE_GROUP request will not trigger a
> rebalance immediately either, similar to session timeout, but only the
> longer member expiration timeout, can we remove the internal "
> internal.leave.group.on.close" config, which is a quick walk-around then?
>
>
>
> Guozhang
>
>
> On Fri, Aug 24, 2018 at 11:14 AM, Jason Gustafson 
> wrote:
>
> > Hey All,
> >
> > Nice to see some solid progress on this. It sounds like one of the
> > complications is allowing static and dynamic registration to coexist. I'm
> > wondering if we can do something like the following:
> >
> > 1. Statically registered members (those joining the group with a
> non-null `
> > member.name`) maintain a session with the coordinator just like dynamic
> > members.
> > 2. If a session is active for a static member when a rebalance begins,
> then
> > basically we'll keep the current behavior. The rebalance will await the
> > static member joining the group.
> > 3. If a static member does not have an active session, then the
> coordinator
> > will not wait for it to join, but will still include it in the rebalance.
> > The coordinator will forward the cached subscription information to the
> > leader and will cache the assignment after the rebalance completes. (Note
> > that we still have the generationId to fence offset commits from a static
> > zombie if the assignment changes.)
> > 4. When a static member leaves the group or has its session expire, no
> > rebalance is triggered. Instead, we can begin a timer to expire the
> static
> > registration. This would be a longish timeout (like 30 minutes say).
> >
> > So basically static members participate in all rebalances regardless
> > whether they have an active session. In a given rebalance, some of the
> > members may be static and some dynamic. The group leader can
> differentiate
> > the two based on the presence of the `member.name` (we have to add this
> to
> > the JoinGroupResponse). Generally speaking, we would choose leaders
> > preferentially from the active members that support the latest JoinGroup
> > protocol and are using static membership. If we have to choose a leader
> > with an old version, however, it would see all members in the group
> (static
> > or dynamic) as dynamic members and perform the assignment as usual.
> >
> > Would that work?
> >
> > -Jason
> >
> >
> > On Thu, Aug 23, 2018 at 5:26 PM, Guozhang Wang 
>

Build failed in Jenkins: kafka-trunk-jdk8 #2923

2018-08-24 Thread Apache Jenkins Server
See 


Changes:

[jason] KAFKA-5975; No response when deleting topics and

[github] MINOR: replace deprecated remove with delete (#5565)

--
[...truncated 2.47 MB...]

org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest > 
testRegisterPersistentStore STARTED

org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest > 
testRegisterPersistentStore PASSED

org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest > 
shouldThrowIllegalArgumentExceptionOnRegisterWhenStoreHasAlreadyBeenRegistered 
STARTED

org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest > 
shouldThrowIllegalArgumentExceptionOnRegisterWhenStoreHasAlreadyBeenRegistered 
PASSED

org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest > 
shouldRestoreStoreWithSinglePutRestoreSpecification STARTED

org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest > 
shouldRestoreStoreWithSinglePutRestoreSpecification PASSED

org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest > 
shouldNotChangeOffsetsIfAckedOffsetsIsNull STARTED

org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest > 
shouldNotChangeOffsetsIfAckedOffsetsIsNull PASSED

org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest > 
shouldThrowIllegalArgumentExceptionIfStoreNameIsSameAsCheckpointFileName STARTED

org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest > 
shouldThrowIllegalArgumentExceptionIfStoreNameIsSameAsCheckpointFileName PASSED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldCloseStateManagerWithOffsets STARTED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldCloseStateManagerWithOffsets PASSED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldProcessRecordsForTopic STARTED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldProcessRecordsForTopic PASSED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldInitializeProcessorTopology STARTED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldInitializeProcessorTopology PASSED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldInitializeContext STARTED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldInitializeContext PASSED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldCheckpointOffsetsWhenStateIsFlushed STARTED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldCheckpointOffsetsWhenStateIsFlushed PASSED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldThrowStreamsExceptionWhenKeyDeserializationFails STARTED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldThrowStreamsExceptionWhenKeyDeserializationFails PASSED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldInitializeStateManager STARTED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldInitializeStateManager PASSED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldProcessRecordsForOtherTopic STARTED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldProcessRecordsForOtherTopic PASSED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldNotThrowStreamsExceptionWhenValueDeserializationFails STARTED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldNotThrowStreamsExceptionWhenValueDeserializationFails PASSED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldThrowStreamsExceptionWhenValueDeserializationFails STARTED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldThrowStreamsExceptionWhenValueDeserializationFails PASSED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldNotThrowStreamsExceptionWhenKeyDeserializationFailsWithSkipHandler STARTED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldNotThrowStreamsExceptionWhenKeyDeserializationFailsWithSkipHandler PASSED

org.apache.kafka.streams.processor.internals.StreamsMetricsImplTest > 
testThroughputMetrics STARTED

org.apache.kafka.streams.processor.internals.StreamsMetricsImplTest > 
testThroughputMetrics PASSED

org.apache.kafka.streams.processor.internals.StreamsMetricsImplTest > 
testLatencyMetrics STARTED

org.apache.kafka.streams.processor.internals.StreamsMetricsImplTest > 
testLatencyMetrics PASSED

org.apache.kafka.streams.processor.internals.StreamsMetricsImplTest > 
testRemoveSensor STARTED

org.apache.kafka.streams.processor.internals.StreamsMetricsImplTest > 
testRemoveSensor PASSED

org.apache.kafka.streams.processor.internals.StreamsMetricsImplTest > 
testRemoveNullSensor STARTED

org.apache

Build failed in Jenkins: kafka-trunk-jdk10 #434

2018-08-24 Thread Apache Jenkins Server
See 


Changes:

[wangguoz] KAFKA-7240: -total metrics in Streams are incorrect (#5467)

--
[...truncated 1.53 MB...]

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldAppendNewMetadataToLogOnAddPartitionsWhenPartitionsAdded PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldRespondWithInvalidRequestOnEndTxnWhenTransactionalIdIsNull STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldRespondWithInvalidRequestOnEndTxnWhenTransactionalIdIsNull PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldRespondWithCoordinatorLoadInProgressOnEndTxnWhenCoordinatorIsLoading 
STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldRespondWithCoordinatorLoadInProgressOnEndTxnWhenCoordinatorIsLoading 
PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldRespondWithSuccessOnAddPartitionsWhenStateIsCompleteAbort STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldRespondWithSuccessOnAddPartitionsWhenStateIsCompleteAbort PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldWaitForCommitToCompleteOnHandleInitPidAndExistingTransactionInPrepareAbortState
 STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldWaitForCommitToCompleteOnHandleInitPidAndExistingTransactionInPrepareAbortState
 PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldRespondWithErrorsNoneOnAddPartitionWhenNoErrorsAndPartitionsTheSame 
STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldRespondWithErrorsNoneOnAddPartitionWhenNoErrorsAndPartitionsTheSame PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldRespondWithInvalidRequestOnEndTxnWhenTransactionalIdIsEmpty STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldRespondWithInvalidRequestOnEndTxnWhenTransactionalIdIsEmpty PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldRespondWithInvalidRequestAddPartitionsToTransactionWhenTransactionalIdIsEmpty
 STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldRespondWithInvalidRequestAddPartitionsToTransactionWhenTransactionalIdIsEmpty
 PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldAppendPrepareAbortToLogOnEndTxnWhenStatusIsOngoingAndResultIsAbort STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldAppendPrepareAbortToLogOnEndTxnWhenStatusIsOngoingAndResultIsAbort PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldAcceptInitPidAndReturnNextPidWhenTransactionalIdIsNull STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldAcceptInitPidAndReturnNextPidWhenTransactionalIdIsNull PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldRemoveTransactionsForPartitionOnEmigration STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldRemoveTransactionsForPartitionOnEmigration PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldWaitForCommitToCompleteOnHandleInitPidAndExistingTransactionInPrepareCommitState
 STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldWaitForCommitToCompleteOnHandleInitPidAndExistingTransactionInPrepareCommitState
 PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldAbortExpiredTransactionsInOngoingStateAndBumpEpoch STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldAbortExpiredTransactionsInOngoingStateAndBumpEpoch PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldReturnInvalidTxnRequestOnEndTxnRequestWhenStatusIsCompleteCommitAndResultIsNotCommit
 STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldReturnInvalidTxnRequestOnEndTxnRequestWhenStatusIsCompleteCommitAndResultIsNotCommit
 PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldReturnOkOnEndTxnWhenStatusIsCompleteCommitAndResultIsCommit STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldReturnOkOnEndTxnWhenStatusIsCompleteCommitAndResultIsCommit PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldRespondWithConcurrentTransactionsOnAddPartitionsWhenStateIsPrepareCommit 
STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldRespondWithConcurrentTransactionsOnAddPartitionsWhenStateIsPrepareCommit 
PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldIncrementEpochAndUpdateMetadataOnHandleInitPidWhenExistingCompleteTransaction
 STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldIncrementEpochAndUpdateMetadataOnHandleInitPidWhenExistingCompleteTransaction
 PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldGenerateNewProducerIdIfEpochsEx

Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by specifying member id

2018-08-24 Thread Mike Freyberger
Jason,

Regarding step 4 in your proposal which suggests beginning a long timer (30 
minutes) when a static member leaves the group, would there also be the ability 
for an admin to force a static membership expiration?

I’m thinking that during particular types of outages or upgrades, users would 
want to forcefully remove a static member from the group. 

So the user would shut the consumer down normally, which wouldn’t trigger a 
rebalance. Then the user could use an admin CLI tool to force remove that 
consumer from the group, so the TopicPartitions that were previously owned by 
that consumer can be released.

At a high level, we need consumer groups to gracefully handle intermittent 
failures and permanent failures. Currently, the consumer group protocol handles 
permanent failures well, but does not handle intermittent failures well (it 
creates unnecessary rebalances). I want to make sure the overall solution here 
handles both intermittent failures and permanent failures, rather than 
sacrificing support for permanent failures in order to provide support for 
intermittent failures. 

Mike

Sent from my iPhone

> On Aug 24, 2018, at 3:03 PM, Jason Gustafson  wrote:
> 
> Hey Guozhang,
> 
> Responses below:
> 
> Originally I was trying to kill more birds with one stone with KIP-345,
>> e.g. to fix the multi-rebalance issue on starting up / shutting down a
>> multi-instance client (mentioned as case 1)/2) in my early email), and
>> hence proposing to have a pure static-membership protocol. But thinking
>> twice about it I now feel it may be too ambitious and worth fixing in
>> another KIP.
> 
> 
> I was considering an extension to support pre-initialization of the static
> members of the group, but I agree we should probably leave this problem for
> future work.
> 
> 1. How this longish static member expiration timeout defined? Is it via a
>> broker, hence global config, or via a client config which can be
>> communicated to broker via JoinGroupRequest?
> 
> 
> I am not too sure. I tend to lean toward server-side configs because they
> are easier to evolve. If we have to add something to the protocol, then
> we'll be stuck with it forever.
> 
> 2. Assuming that for static members, LEAVE_GROUP request will not trigger a
>> rebalance immediately either, similar to session timeout, but only the
>> longer member expiration timeout, can we remove the internal "
>> internal.leave.group.on.close" config, which is a quick walk-around then?
> 
> 
> Yeah, I hope we can ultimately get rid of it, but we may need it for
> compatibility with older brokers. A related question is what should be the
> behavior of the consumer if `member.name` is provided but the broker does
> not support it? We could either fail or silently downgrade to dynamic
> membership.
> 
> -Jason
> 
> 
>> On Fri, Aug 24, 2018 at 11:44 AM, Guozhang Wang  wrote:
>> 
>> Hey Jason,
>> 
>> I like your idea to simplify the upgrade protocol to allow co-exist of
>> static and dynamic members. Admittedly it may make the coordinator-side
>> logic a bit more complex, but I think it worth doing it.
>> 
>> Originally I was trying to kill more birds with one stone with KIP-345,
>> e.g. to fix the multi-rebalance issue on starting up / shutting down a
>> multi-instance client (mentioned as case 1)/2) in my early email), and
>> hence proposing to have a pure static-membership protocol. But thinking
>> twice about it I now feel it may be too ambitious and worth fixing in
>> another KIP. With that, I think what you've proposed here is a good way to
>> go for KIP-345 itself.
>> 
>> Note there are a few details in your proposal we'd still need to figure
>> out:
>> 
>> 1. How this longish static member expiration timeout defined? Is it via a
>> broker, hence global config, or via a client config which can be
>> communicated to broker via JoinGroupRequest?
>> 
>> 2. Assuming that for static members, LEAVE_GROUP request will not trigger a
>> rebalance immediately either, similar to session timeout, but only the
>> longer member expiration timeout, can we remove the internal "
>> internal.leave.group.on.close" config, which is a quick walk-around then?
>> 
>> 
>> 
>> Guozhang
>> 
>> 
>> On Fri, Aug 24, 2018 at 11:14 AM, Jason Gustafson 
>> wrote:
>> 
>>> Hey All,
>>> 
>>> Nice to see some solid progress on this. It sounds like one of the
>>> complications is allowing static and dynamic registration to coexist. I'm
>>> wondering if we can do something like the following:
>>> 
>>> 1. Statically registered members (those joining the group with a
>> non-null `
>>> member.name`) maintain a session with the coordinator just like dynamic
>>> members.
>>> 2. If a session is active for a static member when a rebalance begins,
>> then
>>> basically we'll keep the current behavior. The rebalance will await the
>>> static member joining the group.
>>> 3. If a static member does not have an active session, then the
>> coordinator
>>> will not wait for it to join, but will still includ

Re: [VOTE] KIP-336: Consolidate ExtendedSerializer/Serializer and ExtendedDeserializer/Deserializer

2018-08-24 Thread Jason Gustafson
Hey Viktor,

Good summary. I agree that option 1) seems like the simplest choice and, as
you note, we can always add the default implementation later. I'll leave
Ismael to make a case for the circular forwarding approach ;)

-Jason
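
For readers skimming the thread, the delegation pattern Viktor describes below, as it looks against the existing ExtendedSerializer interface, boils down to something like the following minimal sketch (the payload logic is invented purely for illustration):

{code}
import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.ExtendedSerializer;

// Illustrative only: the header-aware overload simply delegates to the
// "headerless" two-argument method, which carries the real logic.
public class SimpleStringSerializer implements ExtendedSerializer<String> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // no configuration needed for this example
    }

    @Override
    public byte[] serialize(String topic, String data) {
        return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public byte[] serialize(String topic, Headers headers, String data) {
        // headers are ignored; delegate to the headerless method
        return serialize(topic, data);
    }

    @Override
    public void close() {
        // nothing to close
    }
}
{code}

Under option 1), an implementation along these lines would presumably keep working against the consolidated interface, since serialize(topic, data) remains the method implementors are required to provide.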

On Fri, Aug 24, 2018 at 3:02 AM, Viktor Somogyi-Vass <
viktorsomo...@gmail.com> wrote:

> I think in the first draft I didn't provide an implementation for them as
> it seemed very simple and straightforward. I looked up a couple of
> implementations of the ExtendedSerializers on github and the general
> behavior seems to be that they delegate to the 2 argument (headerless)
> method:
>
> https://github.com/khoitnm/practice-spring-kafka-grpc/blob/
> a6fc9b3395762c4889807baedd822f4653d5dcdd/kafka-common/src/
> main/java/org/tnmk/common/kafka/serialization/protobuf/
> ProtobufSerializer.java
> https://github.com/hong-zhu/nxgen/blob/5cf1427d4e1a8f5c7fab47955af32a
> 0d4f4873af/nxgen-kafka-client/src/main/java/nxgen/kafka/
> client/event/serdes/EventSerializer.java
> https://github.com/jerry-jx/spring-kafka/blob/
> ac323ec5b8b9a0ca975db2f7322ff6878fce481a/spring-kafka/src/
> main/java/org/springframework/kafka/support/serializer/JsonSerializer.java
> https://github.com/enzobonggio/nonblocking-kafka/blob/
> bc1a379b2d9593b46cf9604063bc5b38e2785d19/src/main/java/com/
> example/kafka/producer/CustomJsonSerializer.java
>
> Of course 4 example is not representative but it shows that these users
> usually delegate to the "headerless" (2 argument) method. I've tried to
> look it up on other code search sites but haven't had much luck so far.
> Given these examples and the way they implement them I'd say it's more
> common to delegate to the headerless method, that's why I think it's a good
> approach for us too. Now having a default implementation for that is again
> a good question. I think current use cases wouldn't change in either case
> (unless we deprecate the headerless one).
> For the new use cases it depends what do we want to propagate going
> forward. Do we want only one method to exist or two? As Ismael highlighted
> it might be confusing if we have 2 methods, both with default
> implementation and in this case we want to push the 3 argument one for
> users.
>
> So I see three possible ways:
> 1.) Don't provide a default implementation for the headerless method. This
> supports the current implementations and encourages the delegation style in
> future implementations. This might be the simplest option.
> 2.) Provide a default implementation for the headerless method. This would
> be a bit confusing, so we'd likely push the use of the 3 parameter method
> and deprecate the headerless. This would however further litter the code
> base with deprecation warnings as we're using the headerless method in a
> lot of places (think of the current serializers/deserializers). So in this
> case we would want to clean up the code base a little where we can and may
> remove the headerless method entirely in Kafka 3. But they would hang
> around until that point. I think in this case the implementation for the
> headerless is a detail question as that is deprecated so we don't expect
> new implementations to use that method.
> If we decide to move this way, we have explored two options so far:
> returning null / empty array or throwing exceptions. (And I honestly
> started to like the latter as calling that with no real implementation is
> really a programming error.)
> 3.) We can do it in multiple steps. In the first step we do 1 and later 2.
> I think it would also make sense as the Kafka code base heavily uses the
> headerless method still (think of the existing serializers/deserializers)
> and it would give us time to eliminate/change those use cases.
>
> Cheers,
> Viktor
>
> On Thu, Aug 23, 2018 at 11:55 PM Jason Gustafson 
> wrote:
>
> > To clarify, what I am suggesting is to only remove the default
> > implementation for these methods. So users would be required to implement
> > serialize(topic, data) and deserialize(topic, data).
> >
> > -Jason
> >
> > On Thu, Aug 23, 2018 at 1:48 PM, Jason Gustafson 
> > wrote:
> >
> > > Hey Viktor,
> > >
> > > Thinking about it a little more, I wonder if we should just not
> provide a
> > > default method for serialize(topic, data) and deserialize(topic, data).
> > > Implementing these methods is a trivial burden for users and it feels
> > like
> > > there's no good solution which allows both methods to have default
> > > implementations.
> > >
> > > Also, ack on KIP-331. Thanks for the pointer.
> > >
> > > -Jason
> > >
> > > On Thu, Aug 23, 2018 at 12:30 PM, Viktor Somogyi-Vass <
> > > viktorsomo...@gmail.com> wrote:
> > >
> > >> Hi Ismael,
> > >>
> > >> Regarding the deprecation of the 2 parameter method: should we do this
> > >> with
> > >> the Serializer interface as well?
> > >>
> > >> I've updated the "Rejected Alternatives" with a few.
> > >> I've added this circular reference one too but actually there's a way
> > >> (pretty heavyweight) by a

Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by specifying member id

2018-08-24 Thread Jason Gustafson
Hey Mike,

Yeah, that's a good point. A long "registration timeout" may not be a great
idea. Perhaps in practice you'd set it long enough to be able to detect a
failure and provision a new instance. Maybe on the order of 10 minutes is
more reasonable.

In any case, it's probably a good idea to have an administrative way to
force deregistration. One option is to extend the DeleteGroups API with a
list of member names.

-Jason
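
As a very rough sketch of what such an administrative override could look like from an operator's point of view (the commented-out call below is entirely hypothetical; no such AdminClient method or RPC extension exists):

{code}
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class ForceRemoveStaticMemberSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        AdminClient admin = AdminClient.create(props);
        try {
            // Hypothetical call, shown only to illustrate the idea in this thread:
            // extend the DeleteGroups API so an operator can drop the static
            // registration of individual member.name entries (freeing their
            // partitions) without deleting the whole group or waiting out the
            // long expiration timeout.
            // admin.removeStaticMembers("my-group", Arrays.asList("my-group-instance-1"));
        } finally {
            admin.close();
        }
    }
}
{code}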



On Fri, Aug 24, 2018 at 2:21 PM, Mike Freyberger 
wrote:

> Jason,
>
> Regarding step 4 in your proposal which suggests beginning a long timer
> (30 minutes) when a static member leaves the group, would there also be the
> ability for an admin to force a static membership expiration?
>
> I’m thinking that during particular types of outages or upgrades users
> would want forcefully remove a static member from the group.
>
> So the user would shut the consumer down normally, which wouldn’t trigger
> a rebalance. Then the user could use an admin CLI tool to force remove that
> consumer from the group, so the TopicPartitions that were previously owned
> by that consumer can be released.
>
> At a high level, we need consumer groups to gracefully handle intermittent
> failures and permanent failures. Currently, the consumer group protocol
> handles permanent failures well, but does not handle intermittent failures
> well (it creates unnecessary rebalances). I want to make sure the overall
> solution here handles both intermittent failures and permanent failures,
> rather than sacrificing support for permanent failures in order to provide
> support for intermittent failures.
>
> Mike
>
> Sent from my iPhone
>
> > On Aug 24, 2018, at 3:03 PM, Jason Gustafson  wrote:
> >
> > Hey Guozhang,
> >
> > Responses below:
> >
> > Originally I was trying to kill more birds with one stone with KIP-345,
> >> e.g. to fix the multi-rebalance issue on starting up / shutting down a
> >> multi-instance client (mentioned as case 1)/2) in my early email), and
> >> hence proposing to have a pure static-membership protocol. But thinking
> >> twice about it I now feel it may be too ambitious and worth fixing in
> >> another KIP.
> >
> >
> > I was considering an extension to support pre-initialization of the
> static
> > members of the group, but I agree we should probably leave this problem
> for
> > future work.
> >
> > 1. How this longish static member expiration timeout defined? Is it via a
> >> broker, hence global config, or via a client config which can be
> >> communicated to broker via JoinGroupRequest?
> >
> >
> > I am not too sure. I tend to lean toward server-side configs because they
> > are easier to evolve. If we have to add something to the protocol, then
> > we'll be stuck with it forever.
> >
> > 2. Assuming that for static members, LEAVE_GROUP request will not
> trigger a
> >> rebalance immediately either, similar to session timeout, but only the
> >> longer member expiration timeout, can we remove the internal "
> >> internal.leave.group.on.close" config, which is a quick walk-around
> then?
> >
> >
> > Yeah, I hope we can ultimately get rid of it, but we may need it for
> > compatibility with older brokers. A related question is what should be
> the
> > behavior of the consumer if `member.name` is provided but the broker
> does
> > not support it? We could either fail or silently downgrade to dynamic
> > membership.
> >
> > -Jason
> >
> >
> >> On Fri, Aug 24, 2018 at 11:44 AM, Guozhang Wang 
> wrote:
> >>
> >> Hey Jason,
> >>
> >> I like your idea to simplify the upgrade protocol to allow co-exist of
> >> static and dynamic members. Admittedly it may make the coordinator-side
> >> logic a bit more complex, but I think it worth doing it.
> >>
> >> Originally I was trying to kill more birds with one stone with KIP-345,
> >> e.g. to fix the multi-rebalance issue on starting up / shutting down a
> >> multi-instance client (mentioned as case 1)/2) in my early email), and
> >> hence proposing to have a pure static-membership protocol. But thinking
> >> twice about it I now feel it may be too ambitious and worth fixing in
> >> another KIP. With that, I think what you've proposed here is a good way
> to
> >> go for KIP-345 itself.
> >>
> >> Note there are a few details in your proposal we'd still need to figure
> >> out:
> >>
> >> 1. How this longish static member expiration timeout defined? Is it via
> a
> >> broker, hence global config, or via a client config which can be
> >> communicated to broker via JoinGroupRequest?
> >>
> >> 2. Assuming that for static members, LEAVE_GROUP request will not
> trigger a
> >> rebalance immediately either, similar to session timeout, but only the
> >> longer member expiration timeout, can we remove the internal "
> >> internal.leave.group.on.close" config, which is a quick walk-around
> then?
> >>
> >>
> >>
> >> Guozhang
> >>
> >>
> >> On Fri, Aug 24, 2018 at 11:14 AM, Jason Gustafson 
> >> wrote:
> >>
> >>> Hey All,
> >>>
> >>> Nice to see some solid progress on

Re: [VOTE] KIP-336: Consolidate ExtendedSerializer/Serializer and ExtendedDeserializer/Deserializer

2018-08-24 Thread Ismael Juma
I'm OK with 1 too. It makes me a bit sad that we don't have a path for
removing the method without headers, but it seems like the simplest and
least confusing option (I am assuming that headers are not needed in the
serializers in the common case).

Ismael

On Fri, Aug 24, 2018 at 2:42 PM Jason Gustafson  wrote:

> Hey Viktor,
>
> Good summary. I agree that option 1) seems like the simplest choice and, as
> you note, we can always add the default implementation later. I'll leave
> Ismael to make a case for the circular forwarding approach ;)
>
> -Jason
>
> On Fri, Aug 24, 2018 at 3:02 AM, Viktor Somogyi-Vass <
> viktorsomo...@gmail.com> wrote:
>
> > I think in the first draft I didn't provide an implementation for them as
> > it seemed very simple and straightforward. I looked up a couple of
> > implementations of the ExtendedSerializers on github and the general
> > behavior seems to be that they delegate to the 2 argument (headerless)
> > method:
> >
> > https://github.com/khoitnm/practice-spring-kafka-grpc/blob/
> > a6fc9b3395762c4889807baedd822f4653d5dcdd/kafka-common/src/
> > main/java/org/tnmk/common/kafka/serialization/protobuf/
> > ProtobufSerializer.java
> > https://github.com/hong-zhu/nxgen/blob/5cf1427d4e1a8f5c7fab47955af32a
> > 0d4f4873af/nxgen-kafka-client/src/main/java/nxgen/kafka/
> > client/event/serdes/EventSerializer.java
> > https://github.com/jerry-jx/spring-kafka/blob/
> > ac323ec5b8b9a0ca975db2f7322ff6878fce481a/spring-kafka/src/
> >
> main/java/org/springframework/kafka/support/serializer/JsonSerializer.java
> > https://github.com/enzobonggio/nonblocking-kafka/blob/
> > bc1a379b2d9593b46cf9604063bc5b38e2785d19/src/main/java/com/
> > example/kafka/producer/CustomJsonSerializer.java
> >
> > Of course 4 example is not representative but it shows that these users
> > usually delegate to the "headerless" (2 argument) method. I've tried to
> > look it up on other code search sites but haven't had much luck so far.
> > Given these examples and the way they implement them I'd say it's more
> > common to delegate to the headerless method, that's why I think it's a
> good
> > approach for us too. Now having a default implementation for that is
> again
> > a good question. I think current use cases wouldn't change in either case
> > (unless we deprecate the headerless one).
> > For the new use cases it depends what do we want to propagate going
> > forward. Do we want only one method to exist or two? As Ismael
> highlighted
> > it might be confusing if we have 2 methods, both with default
> > implementation and in this case we want to push the 3 argument one for
> > users.
> >
> > So I see three possible ways:
> > 1.) Don't provide a default implementation for the headerless method.
> This
> > supports the current implementations and encourages the delegation style
> in
> > future implementations. This might be the simplest option.
> > 2.) Provide a default implementation for the headerless method. This
> would
> > be a bit confusing, so we'd likely push the use of the 3 parameter method
> > and deprecate the headerless. This would however further litter the code
> > base with deprecation warnings as we're using the headerless method in a
> > lot of places (think of the current serializers/deserializers). So in
> this
> > case we would want to clean up the code base a little where we can and
> may
> > remove the headerless method entirely in Kafka 3. But they would hang
> > around until that point. I think in this case the implementation for the
> > headerless is a detail question as that is deprecated so we don't expect
> > new implementations to use that method.
> > If we decide to move this way, we have explored two options so far:
> > returning null / empty array or throwing exceptions. (And I honestly
> > started to like the latter as calling that with no real implementation is
> > really a programming error.)
> > 3.) We can do it in multiple steps. In the first step we do 1 and later
> 2.
> > I think it would also make sense as the Kafka code base heavily uses the
> > headerless method still (think of the existing serializers/deserializers)
> > and it would give us time to eliminate/change those use cases.
> >
> > Cheers,
> > Viktor
> >
> > On Thu, Aug 23, 2018 at 11:55 PM Jason Gustafson 
> > wrote:
> >
> > > To clarify, what I am suggesting is to only remove the default
> > > implementation for these methods. So users would be required to
> implement
> > > serialize(topic, data) and deserialize(topic, data).
> > >
> > > -Jason
> > >
> > > On Thu, Aug 23, 2018 at 1:48 PM, Jason Gustafson 
> > > wrote:
> > >
> > > > Hey Viktor,
> > > >
> > > > Thinking about it a little more, I wonder if we should just not
> > provide a
> > > > default method for serialize(topic, data) and deserialize(topic,
> data).
> > > > Implementing these methods is a trivial burden for users and it feels
> > > like
> > > > there's no good solution which allows both methods to have default
> > > > implementations.
> 

Jenkins build is back to normal : kafka-trunk-jdk10 #435

2018-08-24 Thread Apache Jenkins Server
See 




Build failed in Jenkins: kafka-trunk-jdk8 #2924

2018-08-24 Thread Apache Jenkins Server
See 


Changes:

[wangguoz] KAFKA-7240: -total metrics in Streams are incorrect (#5467)

--
[...truncated 2.47 MB...]

org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest > 
shouldThrowIllegalArgumentExceptionOnRegisterWhenStoreHasAlreadyBeenRegistered 
PASSED

org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest > 
shouldRestoreStoreWithSinglePutRestoreSpecification STARTED

org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest > 
shouldRestoreStoreWithSinglePutRestoreSpecification PASSED

org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest > 
shouldNotChangeOffsetsIfAckedOffsetsIsNull STARTED

org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest > 
shouldNotChangeOffsetsIfAckedOffsetsIsNull PASSED

org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest > 
shouldThrowIllegalArgumentExceptionIfStoreNameIsSameAsCheckpointFileName STARTED

org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest > 
shouldThrowIllegalArgumentExceptionIfStoreNameIsSameAsCheckpointFileName PASSED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldCloseStateManagerWithOffsets STARTED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldCloseStateManagerWithOffsets PASSED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldProcessRecordsForTopic STARTED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldProcessRecordsForTopic PASSED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldInitializeProcessorTopology STARTED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldInitializeProcessorTopology PASSED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldInitializeContext STARTED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldInitializeContext PASSED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldCheckpointOffsetsWhenStateIsFlushed STARTED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldCheckpointOffsetsWhenStateIsFlushed PASSED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldThrowStreamsExceptionWhenKeyDeserializationFails STARTED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldThrowStreamsExceptionWhenKeyDeserializationFails PASSED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldInitializeStateManager STARTED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldInitializeStateManager PASSED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldProcessRecordsForOtherTopic STARTED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldProcessRecordsForOtherTopic PASSED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldNotThrowStreamsExceptionWhenValueDeserializationFails STARTED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldNotThrowStreamsExceptionWhenValueDeserializationFails PASSED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldThrowStreamsExceptionWhenValueDeserializationFails STARTED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldThrowStreamsExceptionWhenValueDeserializationFails PASSED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldNotThrowStreamsExceptionWhenKeyDeserializationFailsWithSkipHandler STARTED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldNotThrowStreamsExceptionWhenKeyDeserializationFailsWithSkipHandler PASSED

org.apache.kafka.streams.processor.internals.StreamsMetricsImplTest > 
testThroughputMetrics STARTED

org.apache.kafka.streams.processor.internals.StreamsMetricsImplTest > 
testThroughputMetrics PASSED

org.apache.kafka.streams.processor.internals.StreamsMetricsImplTest > 
testTotalMetricDoesntDecrease STARTED

org.apache.kafka.streams.processor.internals.StreamsMetricsImplTest > 
testTotalMetricDoesntDecrease PASSED

org.apache.kafka.streams.processor.internals.StreamsMetricsImplTest > 
testLatencyMetrics STARTED

org.apache.kafka.streams.processor.internals.StreamsMetricsImplTest > 
testLatencyMetrics PASSED

org.apache.kafka.streams.processor.internals.StreamsMetricsImplTest > 
testRemoveSensor STARTED

org.apache.kafka.streams.processor.internals.StreamsMetricsImplTest > 
testRemoveSensor PASSED

org.apache.kafka.streams.processor.internals.StreamsMetricsImplTest > 
testRemoveNullSensor STARTED

org.apache.kafka.streams.processor.internals.StreamsMetricsImplTest > 
testRemoveNullSensor PASSED

org.apache.kafka.streams.processor.internals.StreamsMetricsImplTest > 
testNullMetrics STARTED

org.apache.kafka.streams.pro

Re: [VOTE] KIP-363: Make FunctionConversions private

2018-08-24 Thread Attila Sasvári
Hi there,

There is a conflicting KIP with the same number, see
https://cwiki.apache.org/confluence/display/KAFKA/KIP-363%3A+Allow+performance+tools+to+print+final+results+to+output+file

Its discussion was started earlier, on August 23
(https://www.mail-archive.com/dev@kafka.apache.org/msg91132.html), and the KIP
page already includes it:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals

Please update the KIP number to resolve the conflict.

Apart from this, +1 (non-binding) and thanks for the KIP!

Regards,
- Attila


Guozhang Wang  wrote (on Fri, Aug 24, 2018 at 20:26):

> +1 from me (binding).
>
> On Fri, Aug 24, 2018 at 11:24 AM, Joan Goyeau  wrote:
>
> > Hi,
> >
> > As pointed out in this comment #5539 (comment)
> >  "This
> > class was already defaulted to public visibility, and we can't retract it
> > now, without a KIP.", the object FunctionConversions is only of internal
> > use and therefore should be private to the lib only so that we can do
> > changes without going through KIP like this one.
> >
> > Please make your vote.
> >
> > On Fri, 24 Aug 2018 at 19:14 John Roesler  wrote:
> >
> > > I'm also in favor of this. I don't think it's controversial either.
> > Should
> > > we just move to a vote?
> > >
> > > On Thu, Aug 23, 2018 at 7:01 PM Guozhang Wang 
> > wrote:
> > >
> > > > +1.
> > > >
> > > > On Thu, Aug 23, 2018 at 12:47 PM, Ted Yu 
> wrote:
> > > >
> > > > > +1
> > > > >
> > > > > In the Motivation section, you can quote the comment from pull
> > request
> > > so
> > > > > that reader doesn't have to click through.
> > > > >
> > > > > Cheers
> > > > >
> > > > > On Thu, Aug 23, 2018 at 12:13 PM Joan Goyeau 
> > wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > As pointed out in this comment #5539 (comment)
> > > > > >  >
> > > the
> > > > > > object FunctionConversions is only of internal use and therefore
> > > should
> > > > > be
> > > > > > private to the lib only so that we can do changes without going
> > > through
> > > > > KIP
> > > > > > like this one.
> > > > > >
> > > > > > KIP:
> > > > > >
> > > > > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-363%3A+Make+
> > > > > FunctionConversions+private
> > > > > >
> > > > > > Thanks
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
>
>
>
> --
> -- Guozhang
>


Re: [VOTE] KIP-363: Make FunctionConversions private

2018-08-24 Thread Ted Yu
+1

On Fri, Aug 24, 2018 at 5:17 PM Attila Sasvári  wrote:

> Hi there,
>
> There is a conflicting KIP with the same number, see
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-363%3A+Allow+performance+tools+to+print+final+results+to+output+file
>
> Its discussion was started earlier, on August 23
> https://www.mail-archive.com/dev@kafka.apache.org/msg91132.html and KIP
> page already includes it:
>
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
>
> Please update KIP number to resolve the conflict.
>
> Apart from this, +1 (non-binding) and thanks for the KIP!
>
> Regards,
> - Attila
>
>
> Guozhang Wang  wrote (on Fri, Aug 24, 2018 at 20:26):
>
> > +1 from me (binding).
> >
> > On Fri, Aug 24, 2018 at 11:24 AM, Joan Goyeau  wrote:
> >
> > > Hi,
> > >
> > > As pointed out in this comment #5539 (comment)
> > > 
> "This
> > > class was already defaulted to public visibility, and we can't retract
> it
> > > now, without a KIP.", the object FunctionConversions is only of
> internal
> > > use and therefore should be private to the lib only so that we can do
> > > changes without going through KIP like this one.
> > >
> > > Please make your vote.
> > >
> > > On Fri, 24 Aug 2018 at 19:14 John Roesler  wrote:
> > >
> > > > I'm also in favor of this. I don't think it's controversial either.
> > > Should
> > > > we just move to a vote?
> > > >
> > > > On Thu, Aug 23, 2018 at 7:01 PM Guozhang Wang 
> > > wrote:
> > > >
> > > > > +1.
> > > > >
> > > > > On Thu, Aug 23, 2018 at 12:47 PM, Ted Yu 
> > wrote:
> > > > >
> > > > > > +1
> > > > > >
> > > > > > In the Motivation section, you can quote the comment from pull
> > > request
> > > > so
> > > > > > that reader doesn't have to click through.
> > > > > >
> > > > > > Cheers
> > > > > >
> > > > > > On Thu, Aug 23, 2018 at 12:13 PM Joan Goyeau 
> > > wrote:
> > > > > >
> > > > > > > Hi,
> > > > > > >
> > > > > > > As pointed out in this comment #5539 (comment)
> > > > > > > <
> https://github.com/apache/kafka/pull/5539#discussion_r212380648
> > >
> > > > the
> > > > > > > object FunctionConversions is only of internal use and
> therefore
> > > > should
> > > > > > be
> > > > > > > private to the lib only so that we can do changes without going
> > > > through
> > > > > > KIP
> > > > > > > like this one.
> > > > > > >
> > > > > > > KIP:
> > > > > > >
> > > > > > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-363%3A+Make+
> > > > > > FunctionConversions+private
> > > > > > >
> > > > > > > Thanks
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>


Re: Request for contributor permissions

2018-08-24 Thread LEE, Tung-Yu
Thanks!

Jun Rao  wrote on Sat, Aug 25, 2018 at 01:40:

> Hi, Tung-Yu,
>
> Thanks for your interest. Gave you permission to both jira and wiki.
>
> Jun
>
> On Fri, Aug 24, 2018 at 12:41 AM, LEE, Tung-Yu  wrote:
>
>> JIRA ID: leetungyu
>> Cwiki ID: leetungyu
>>
>> Thanks a lot!
>>
>
>


Re: unable to build schema registry

2018-08-24 Thread Ted Yu
The first error was complaining about this class from Kafka clients:

clients/src/main/java/org/apache/kafka/common/utils/Timer.java

It seems you haven't installed the Kafka clients jar into your local Maven repo.
You can run the following command from the Kafka working directory:

./gradlew installAll

I just built schema-registry successfully (by first installing common and
rest-utils).

FYI

On Fri, Aug 24, 2018 at 10:30 AM Simon Nunn  wrote:

>
> Getting the following error when trying to build the schema registry.  I
> have tried various versions of kafka, but not sure what I need to do.  Any
> help would be appreciated.
>
>
> [ERROR] Failed to execute goal
> org.apache.maven.plugins:maven-compiler-plugin:3.6.1:compile
> (default-compile) on project kafka-schema-registry: Compilation failure:
> Compilation failure:
> [ERROR]
> /C:/projects/schema-registry/core/src/main/java/io/confluent/kafka/schemaregistry/masterelector/kafka/SchemaRegistryCoordinator.java:[25,37]
> cannot find symbol
> [ERROR]   symbol:   class Timer
> [ERROR]   location: package org.apache.kafka.common.utils
> [ERROR]
> /C:/projects/schema-registry/core/src/main/java/io/confluent/kafka/schemaregistry/masterelector/kafka/SchemaRegistryCoordinator.java:[207,57]
> cannot find symbol
> [ERROR]   symbol:   class Timer
> [ERROR]   location: class
> io.confluent.kafka.schemaregistry.masterelector.kafka.SchemaRegistryCoordinator
> [ERROR]
> /C:/projects/schema-registry/core/src/main/java/io/confluent/kafka/schemaregistry/masterelector/kafka/SchemaRegistryCoordinator.java:[98,36]
> cannot find symbol
> [ERROR]   symbol:   method timer(long)
> [ERROR]   location: variable time of type
> org.apache.kafka.common.utils.Time
> [ERROR]
> /C:/projects/schema-registry/core/src/main/java/io/confluent/kafka/schemaregistry/masterelector/kafka/SchemaRegistryCoordinator.java:[114,23]
> cannot find symbol
> [ERROR]   symbol:   method timer(long)
> [ERROR]   location: variable time of type
> org.apache.kafka.common.utils.Time
> [ERROR] -> [Help 1]
>


Jenkins build is back to normal : kafka-trunk-jdk8 #2925

2018-08-24 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-357: Add support to list ACLs per principal

2018-08-24 Thread Manikumar
Hi Satish,

Thanks for the review.

1. Currently we are just printing the toString() of the Acl class. I prefer to have
similar output with or without the "--principal" option.
2. Yes, we can accept multiple principals. Updated the KIP.
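
For illustration only, a rough Scala sketch of how multiple principals could be
handled on top of the existing Authorizer.getAcls(principal) method quoted
further down in this message; the object and method names here are assumptions,
not the final AclCommand implementation:

    import kafka.security.auth.{Acl, Authorizer, Resource}
    import org.apache.kafka.common.security.auth.KafkaPrincipal

    // Rough sketch only: merge the per-principal ACLs so that several
    // --principal options can be combined, then print them grouped by
    // resource in the same shape as the existing --list output.
    object ListAclsPerPrincipalSketch {

      def aclsForPrincipals(authorizer: Authorizer,
                            principals: Set[KafkaPrincipal]): Map[Resource, Set[Acl]] =
        principals
          .map(p => authorizer.getAcls(p))   // existing Authorizer method
          .foldLeft(Map.empty[Resource, Set[Acl]]) { (merged, perPrincipal) =>
            perPrincipal.foldLeft(merged) { case (acc, (resource, acls)) =>
              acc.updated(resource, acc.getOrElse(resource, Set.empty[Acl]) ++ acls)
            }
          }

      def printAcls(aclsByResource: Map[Resource, Set[Acl]]): Unit =
        aclsByResource.foreach { case (resource, acls) =>
          println(s"Current ACLs for resource `$resource`:")
          acls.foreach(acl => println(s"  $acl"))
          println()
        }
    }

When AclCommand goes through the AdminClient path (KIP-332), the same output
could be produced by filtering the describeAcls results on the principal field
instead.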

@all

If there are no more comments, I will start a vote on this KIP early next week.


Thanks,

On Fri, Aug 24, 2018 at 11:29 AM Satish Duggana 
wrote:

> Hi Mani,
> Just a minor comment on the output of the command as given in KIP-357: you
> may want to remove "User:User1 has " as it is redundant for each ACL.
> It may also be good to accept multiple principals in a single invocation, to
> avoid running this script once per principal to achieve the same result.
>
>
> >> sh kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --principal User:User1
> ACLs for principal `User:User1`
> Current ACLs for resource `Group:PREFIXED:TEST_GROUP`:
> User:User1 has Allow permission for operations: Read from hosts: *
>
> Current ACLs for resource `Topic:PREFIXED:TEST_TOPIC`:
> User:User1 has Allow permission for operations: Read from hosts: *
> User:User1 has Allow permission for operations: Create from hosts: *
> User:User1 has Allow permission for operations: Write from hosts: *
> User:User1 has Allow permission for operations: Describe from hosts: *
>
> Thanks,
> Satish.
>
>
> On Fri, Aug 24, 2018 at 1:13 AM, Harsha  wrote:
>
> > +1 (binding)
> >
> > Thanks,
> > Harsha
> >
> > On Wed, Aug 22, 2018, at 9:15 AM, Manikumar wrote:
> > > Hi Viktor,
> > > We already have a method in Authorizer interface to get acls for a
> given
> > > principal.
> > > We will use this method to fetch acls and filter the results for
> > requested
> > > Resources.
> > > Authorizer {
> > >def getAcls(principal: KafkaPrincipal): Map[Resource, Set[Acl]]
> > > }
> > > Currently the AdminClient API doesn't have an API to fetch ACLs for a given
> > > principal.
> > > So while using AclCommand with AdminClient API (KIP-332), we just
> filter
> > > the results returned
> > > from describeAcls API. We can add new AdminClient API/new
> > > DescribeAclsRequest if required in future.
> > >
> > > Updated the KIP. Thanks for the review.
> > >
> > > Thanks,
> > >
> > > On Wed, Aug 22, 2018 at 5:53 PM Viktor Somogyi-Vass <
> > viktorsomo...@gmail.com>
> > > wrote:
> > >
> > > > Hi Manikumar,
> > > >
> > > > Implementation-wise is it just a filter over the returned ACL listing
> > or do
> > > > you plan to add new methods to the Authorizer as well?
> > > >
> > > > Thanks,
> > > > Viktor
> > > >
> > > > On Fri, Aug 17, 2018 at 9:18 PM Priyank Shah 
> > > > wrote:
> > > >
> > > > > +1(non-binding)
> > > > >
> > > > > Thanks.
> > > > > Priyank
> > > > >
> > > > > On 8/16/18, 6:01 AM, "Manikumar" 
> wrote:
> > > > >
> > > > > Hi all,
> > > > >
> > > > > I have created a minor KIP to add support to list ACLs per
> > principal
> > > > > using
> > > > > AclCommand (kafka-acls.sh)
> > > > >
> > > > >
> > > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 357%3A++Add+support+to+list+ACLs+per+principal
> > > > >
> > > > > Please take a look.
> > > > >
> > > > > Thanks,
> > > > >
> > > > >
> > > > >
> > > >
> >
>