Re: [VOTE] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients

2020-09-29 Thread Bruno Cadonna

Hi all,

I did two minor modifications to the KIP.

- I removed the rather strict guarantee "Dead stream threads are removed 
from a Kafka Streams client at latest after the next call to 
KafkaStreams#addStreamThread() or KafkaStreams#removeStreamThread() 
following the transition to state DEAD."
Dead stream threads will still be removed, but the behavior will be less 
strict.


- Added a sentence that states that the Kafka Streams client will 
transition to ERROR if the last alive stream thread dies exceptionally. 
This corresponds to the current behavior.


I will not restart voting and keep the votes so far.

Best,
Bruno

On 22.09.20 01:19, John Roesler wrote:

I’m +1 also. Thanks, Bruno!
-John

On Mon, Sep 21, 2020, at 17:08, Guozhang Wang wrote:

Thanks Bruno. I'm +1 on the KIP.

On Mon, Sep 21, 2020 at 2:49 AM Bruno Cadonna  wrote:


Hi,

I would like to restart from zero the voting on KIP-663 that proposes to
add methods to the Kafka Streams client to add and remove stream threads
during execution.


https://cwiki.apache.org/confluence/display/KAFKA/KIP-663%3A+API+to+Start+and+Shut+Down+Stream+Threads

Matthias, if you are still +1, please vote again.

Best,
Bruno

On 04.09.20 23:12, John Roesler wrote:

Hi Sophie,

Uh, oh, it's never a good sign when the discussion moves
into the vote thread :)

I agree with you, it seems like a good touch for
removeStreamThread() to return the name of the thread that
got removed, rather than a boolean flag. Maybe the return
value would be `null` if there is no thread to remove.

If we go that way, I'd suggest that addStreamThread() also
return the name of the newly created thread, or null if no
thread can be created right now.

I'm not completely sure if I think that callers of this
method would know exactly how many threads there are. Sure,
if a human being is sitting there looking at the metrics or
logs and decides to call the method, it would work out, but
I'd expect this kind of method to find its way into
automated tooling that reacts to things like current system
load or resource saturation. Those kinds of toolchains often
are part of a distributed system, and it's probably not that
easy to guarantee that the thread count they observe is
fully consistent with the number of threads that are
actually running. Therefore, an in-situ `int
numStreamThreads()` method might not be a bad idea. Then
again, it seems sort of optional. A caller can catch an
exception or react to a `null` return value just the same
either way. Having both add/remove methods behave similarly
is probably more valuable.
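
To make the shape under discussion concrete, here is a minimal sketch of the
signatures being discussed (the grouping interface name is hypothetical, and
the return values follow the proposals in this thread, not the final KIP
surface):

// Sketch only -- names and nullability as discussed above.
public interface StreamThreadManagement {
    String addStreamThread();    // name of the new thread, or null if none could be created
    String removeStreamThread(); // name of the removed thread, or null if no thread to remove
    int numStreamThreads();      // number of currently alive stream threads
}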

Thanks,
-John


On Thu, 2020-09-03 at 12:15 -0700, Sophie Blee-Goldman
wrote:

Hey, sorry for the late reply, I just have one minor suggestion. Since we
don't make any guarantees about which thread gets removed or allow the user to
specify, I think we should return either the index or full name of the thread
that does get removed by removeThread().

I know you just updated the KIP to return true/false if there are/aren't any
threads to be removed, but I think this would be more appropriate as an
exception than as a return type. I think it's reasonable to expect users to
have some sense of how many threads are remaining, and not try to remove
a thread when there is none left. To me, that indicates something wrong
with the user application code and should be treated as an exceptional case.

I don't think the same code clarity argument applies here as to the
addStreamThread() case, as there's no reason for an application to be
looping and retrying removeStreamThread(), since if that fails, it's because
there are no threads left and thus it will continue to always fail. And if
the user actually wants to shut down all threads, they should just close the
whole application rather than call removeStreamThread() in a loop.

While I generally think it should be straightforward for users to track how
many stream threads they have running, maybe it would be nice to add
a small utility method that does this for them. Something like

// Returns the number of currently alive threads
int runningStreamThreads();

On Thu, Sep 3, 2020 at 7:41 AM Matthias J. Sax  wrote:



+1 (binding)

On 9/3/20 6:16 AM, Bruno Cadonna wrote:

Hi,

I would like to start the voting on KIP-663 that proposes to add methods
to the Kafka Streams client to add and remove stream threads during
execution.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-663%3A+API+to+Start+and+Shut+Down+Stream+Threads


Best,
Bruno







--
-- Guozhang



Re: [DISCUSS] KIP-671: Shutdown Streams Application when appropriate exception is thrown

2020-09-29 Thread Bruno Cadonna

Hi John,

I totally agree with you and Walker. I also think that we should leave 
this as a problem for the future and that we should document this 
limitation.


Best,
Bruno

On 24.09.20 16:51, John Roesler wrote:

Hello all,

Thanks for bringing this up, Bruno. It’s a really good point that a 
disconnected node would miss the signal and then resurrect a single-node 
“zombie cluster” when it reconnects.

Offhand, I can’t think of a simple and reliable way to distinguish this case 
from one in which an operator starts a node manually after a prior shutdown 
signal. Can you? Right now, I’m inclined to agree with Walker that we should 
leave this as a problem for the future.

It should certainly be mentioned in the kip, and it also deserves special 
mention in our javadoc and html docs for this feature.

Thanks!
John

On Wed, Sep 23, 2020, at 17:49, Walker Carlson wrote:

Bruno,

I think that we can't guarantee that the message will get
propagated perfectly in every case of, say, network partitioning, though it
will work for many cases. So I would say it's best effort and I will
mention it in the kip.

As for when to use it I think we can discuss if this will be
sufficient when we come to it, as long as we document its capabilities.

I hope this answers your question,

Walker

On Tue, Sep 22, 2020 at 12:33 AM Bruno Cadonna  wrote:


Walker,

I am sorry, but I still have a comment on the KIP although you have
already started voting.

What happens when a consumer of the group skips the rebalancing that
propagates the shutdown request? Do you give a guarantee that all Kafka
Streams clients are shut down or is it best effort? If it is best effort,
I guess the proposed method might not be used in critical cases where
stopping record consumption may prevent or limit damage. I am not saying
that it must be a guarantee, but this question should be answered in the
KIP, IMO.

Best,
Bruno

On 22.09.20 01:14, Walker Carlson wrote:

The error code right now is the assignor error, 2 is coded for shutdown
but it could be expanded to encode the causes or for other errors that need
to be communicated. For example we can add error code 3 to close the
thread but leave the client in an error state if we choose to do so in the
future.
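
For illustration, the kind of encoding described here might look like the
following (the constant names, and all values other than 2, are assumptions
for this sketch, not the KIP's final scheme):

// Hypothetical error codes carried in the SubscriptionInfoData short field.
public final class AssignorError {
    public static final short NONE = 0;
    public static final short SHUTDOWN_APPLICATION = 2; // ask all clients to shut down
    // a future code, e.g. 3, could close the thread but leave the client in ERROR
}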


On Mon, Sep 21, 2020 at 3:43 PM Boyang Chen 
wrote:


Thanks for the KIP Walker.

In the KIP we mentioned "In order to communicate the shutdown request from
one client to the others we propose to update the SubscriptionInfoData to
include a short field which will encode an error code.", is there a
dedicated error code that we should define here, or is it case-by-case?

On Mon, Sep 21, 2020 at 1:38 PM Walker Carlson 
wrote:


I am changing the name to "Add method to Shutdown entire Streams
Application" since we are no longer using an Exception, it seems more
appropriate.

Also it looks like the discussion is pretty much finished so I will be
calling it to a vote.

thanks,
Walker

On Sat, Sep 19, 2020 at 7:49 PM Guozhang Wang 

wrote:



Sounds good to me. I also feel that this call should be non-blocking but I
guess I was confused from the discussion thread that the API is designed in
a blocking fashion which contradicts with my perspective and hence I asked
for clarification :)

Guozhang


On Wed, Sep 16, 2020 at 12:32 PM Walker Carlson <

wcarl...@confluent.io



wrote:


Hello Guozhang,

As for the logging I plan on having three logs. First, the client logs
that it is requesting an application shutdown; second, the leader logs the
processId of the invoker; third, the StreamRebalanceListener logs that it
is closing because of a `stream.appShutdown`. Hopefully this will be enough
to make the cause of the close clear.

I see what you mean about the name being dependent on the behavior of the
method so I will try to clarify. This is how I currently envision the call
working.

It is not an option to directly initiate a shutdown through a StreamThread
object from a KafkaStreams object because "KafkaConsumer is not safe for
multi-threaded access". So how it works is that the method in KafkaStreams
finds the first alive thread and sets a flag in the StreamThread. The
StreamThread will receive the flag in its runloop, then set the error code
and trigger a rebalance; afterwards it will stop processing. After the
KafkaStreams has set the flag it will return true and continue running. If
there are no alive threads the shutdown will fail and return false.

What do you think the blocking behavior should be? I think that the
StreamThread should definitely stop to prevent any of the corruption we are
trying to avoid by shutting down, but I don't see any advantage of the
KafkaStreams call blocking.

You are correct to be concerned about the uncaught exception handler. If
there are no live StreamThreads the rebalance will not be started at all
and this would be a problem. However the user should be aware of this
because of the return of false and react appropri
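
A minimal sketch of the flag hand-off described above (the class structure
and names are assumptions for illustration, not the actual implementation):

// KafkaStreams sets a volatile flag; the StreamThread's runloop picks it up.
class StreamThread extends Thread {
    private volatile boolean shutdownRequested = false;

    void requestApplicationShutdown() {
        shutdownRequested = true; // called from the KafkaStreams thread
    }

    void runLoop() {
        while (!shutdownRequested) {
            // poll the consumer and process records ...
        }
        // set the error code, trigger a rebalance, then stop processing
    }
}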

Re: [DISCUSS] KIP-671: Shutdown Streams Application when appropriate exception is thrown

2020-09-29 Thread Bruno Cadonna

Hi Walker,

Thanks for updating the KIP!

1. I would add response REPLACE_STREAM_THREAD to the 
StreamsUncaughtExceptionHandlerResponse enum to start a new stream 
thread that replaces the failed one. I suspect you did not add it 
because it depends on KIP-663. A dependency on another unfinished KIP 
should not stop you from adding this response.


2. Why does the Kafka Streams client transition to NOT_RUNNING when it is 
shut down due to SHUTDOWN_KAFKA_STREAMS_CLIENT and 
SHUTDOWN_KAFKA_STREAMS_APPLICATION? I would rather expect that it 
transitions to ERROR, since we are exclusively talking about error cases 
now. I would also not emulate the current behavior of close(), since 
close() is not intended to be used in the error case due to deadlocks 
you could run into.


3. Since the motivation of the KIP changed quite a lot, I think you 
should remove KAFKA-4748 from the motivation or make it clear that this 
KIP only covers the shutdown of the Kafka Streams application in the 
error case.


4. I would just overload method setUncaughtExceptionHandler() and not 
introduce a method with a new name.


5. I agree with Guozhang that we should deprecate the overload with the 
Java-specific handler. I am sure you wanted to deprecate the method and 
just forgot about it.


6. I agree with Guozhang that the RocksDB metrics recording thread 
should also be shut down. To be fair, when Walker asked me about it, I 
thought it was not strictly necessary to shut it down, but thinking about 
it again, it also does not make a lot of sense to keep it running, 
because the RocksDB metrics would all have been removed at that point.


7. I think we should provide a default implementation of the handler. 
However, the default implementation should just return 
SHUTDOWN_STREAM_THREAD which corresponds to the current behavior. If we 
want to provide a more elaborate default handler, I would propose to 
discuss that on a separate KIP to not block this KIP on that discussion.
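
For reference, a minimal sketch of the handler shape discussed in points 1
and 7 (assuming the enum and method names used in this thread; the final API
is still under discussion):

public interface StreamsUncaughtExceptionHandler {
    enum StreamsUncaughtExceptionHandlerResponse {
        SHUTDOWN_STREAM_THREAD,            // current behavior, proposed default (point 7)
        REPLACE_STREAM_THREAD,             // proposed in point 1, depends on KIP-663
        SHUTDOWN_KAFKA_STREAMS_CLIENT,
        SHUTDOWN_KAFKA_STREAMS_APPLICATION
    }

    StreamsUncaughtExceptionHandlerResponse handle(Throwable exception);

    // A default mirroring today's behavior (point 7) could then simply be:
    StreamsUncaughtExceptionHandler DEFAULT =
        exception -> StreamsUncaughtExceptionHandlerResponse.SHUTDOWN_STREAM_THREAD;
}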


Best,
Bruno

On 29.09.20 05:35, Guozhang Wang wrote:

Hello Walker,

Thanks for the updated KIP proposal. A few more comments below:

1. "The RocksDB metrics recording thread is not shutdown." Why it should
not be shut down in either client or application shutdown cases?

2. Should we deprecate the existing overloaded function with the java
UncaughtExceptionHandler?

3. Should we consider providing a default implementation of this handler
interface which is automatically set if not overridden by users, e.g. one
that would choose to SHUTDOWN_KAFKA_STREAMS_APPLICATION upon
MissingSourceTopicException in KIP-662.


Guozhang


On Mon, Sep 28, 2020 at 3:57 PM Walker Carlson 
wrote:


I think that Guozhang and Matthias make good points.


https://cwiki.apache.org/confluence/display/KAFKA/KIP-671%3A+Introduce+Kafka+Streams+Specific+Uncaught+Exception+Handler

I have updated the kip to include a StreamsUncaughtExceptionHandler



On Sun, Sep 27, 2020 at 7:28 PM Guozhang Wang  wrote:


I think single-threaded clients may be common in practice, and what
Matthias raised is a valid concern.

We had a related discussion in KIP-663, that maybe we can tweak the
`UncaughtExceptionHandler` a bit such that instead of just registering
users' functions into the individual threads, we trigger them BEFORE the
thread dies in the "catch (Exception)" block. It was proposed originally
to make sure that if a user calls localThreadMetadata() in the function,
the dying / throwing thread would still be included, but maybe we could
consider this reason as well.


Guozhang


On Fri, Sep 25, 2020 at 3:20 PM Matthias J. Sax 

wrote:



I am wondering about the usage pattern of this new method.

As already discussed, the method only works if there is at least one
running thread... Do we have any sense how many apps actually run
multi-threaded vs single-threaded? It seems that the feature might be
quite limited without having a handler that is called _before_ the
thread dies? However, for this case, I am wondering if it might be
easier to just return an enum type from such a handler instead of calling
`KafkaStreams#initiateClosingAllClients()`?

In general, it seems that there is some gap between the case of stopping
all instances from "outside" (as proposed in the KIP), vs from "inside"
(what I thought was the original line of thinking for this KIP?).

For the network partitioning case, should we at least shutdown all local
threads? It might be sufficient that only one thread sends the "shutdown
signal" while all others just shut down directly? Why should the other
thread wait for the shutdown signal for a rebalance? Or should we recommend
to call `initiateClosingAllClients()` followed by `close()` to make sure
that at least the local threads stop (what might be a little bit odd)?

-Matthias

On 9/24/20 7:51 AM, John Roesler wrote:

Hello all,

Thanks for bringing this up, Bruno. It’s a really good point that a
disconnected node would miss the signal and then resurrect a single-nod

Re: [VOTE] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients

2020-09-29 Thread John Roesler
Thanks, Bruno, this sounds good to me. 
-John

On Tue, Sep 29, 2020, at 03:13, Bruno Cadonna wrote:
> Hi all,
> 
> I did two minor modifications to the KIP.
> 
> - I removed the rather strict guarantee "Dead stream threads are removed 
> from a Kafka Streams client at latest after the next call to 
> KafkaStreams#addStreamThread() or KafkaStreams#removeStreamThread() 
> following the transition to state DEAD."
> Dead stream threads will still be removed, but the behavior will be less 
> strict.
> 
> - Added a sentence that states that the Kafka Streams client will 
> transition to ERROR if the last alive stream thread dies exceptionally. 
> This corresponds to the current behavior.
> 
> I will not restart voting and keep the votes so far.
> 
> Best,
> Bruno
> 
> On 22.09.20 01:19, John Roesler wrote:
> > I’m +1 also. Thanks, Bruno!
> > -John
> > 
> > On Mon, Sep 21, 2020, at 17:08, Guozhang Wang wrote:
> >> Thanks Bruno. I'm +1 on the KIP.
> >>
> >> On Mon, Sep 21, 2020 at 2:49 AM Bruno Cadonna  wrote:
> >>
> >>> Hi,
> >>>
> >>> I would like to restart from zero the voting on KIP-663 that proposes to
> >>> add methods to the Kafka Streams client to add and remove stream threads
> >>> during execution.
> >>>
> >>>
> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-663%3A+API+to+Start+and+Shut+Down+Stream+Threads
> >>>
> >>> Matthias, if you are still +1, please vote again.
> >>>
> >>> Best,
> >>> Bruno
> >>>
> >>> On 04.09.20 23:12, John Roesler wrote:
>  Hi Sophie,
> 
>  Uh, oh, it's never a good sign when the discussion moves
>  into the vote thread :)
> 
>  I agree with you, it seems like a good touch for
>  removeStreamThread() to return the name of the thread that
>  got removed, rather than a boolean flag. Maybe the return
>  value would be `null` if there is no thread to remove.
> 
>  If we go that way, I'd suggest that addStreamThread() also
>  return the name of the newly created thread, or null if no
>  thread can be created right now.
> 
>  I'm not completely sure if I think that callers of this
>  method would know exactly how many threads there are. Sure,
>  if a human being is sitting there looking at the metrics or
>  logs and decides to call the method, it would work out, but
>  I'd expect this kind of method to find its way into
>  automated tooling that reacts to things like current system
>  load or resource saturation. Those kinds of toolchains often
>  are part of a distributed system, and it's probably not that
>  easy to guarantee that the thread count they observe is
>  fully consistent with the number of threads that are
>  actually running. Therefore, an in-situ `int
>  numStreamThreads()` method might not be a bad idea. Then
>  again, it seems sort of optional. A caller can catch an
>  exception or react to a `null` return value just the same
>  either way. Having both add/remove methods behave similarly
>  is probably more valuable.
> 
>  Thanks,
>  -John
> 
> 
>  On Thu, 2020-09-03 at 12:15 -0700, Sophie Blee-Goldman
>  wrote:
> > Hey, sorry for the late reply, I just have one minor suggestion. Since
> >>> we
> > don't
> > make any guarantees about which thread gets removed or allow the user to
> > specify, I think we should return either the index or full name of the
> > thread
> > that does get removed by removeThread().
> >
> > I know you just updated the KIP to return true/false if there
> >>> are/aren't any
> > threads to be removed, but I think this would be more appropriate as an
> > exception than as a return type. I think it's reasonable to expect
> >>> users to
> > have some sense of how many threads are remaining, and not try to remove
> > a thread when there is none left. To me, that indicates something wrong
> > with the user application code and should be treated as an exceptional
> >>> case.
> > I don't think the same code clarity argument applies here as to the
> > addStreamThread() case, as there's no reason for an application to be
> > looping and retrying removeStreamThread()  since if that fails, it's
> >>> because
> > there are no threads left and thus it will continue to always fail. And
> >>> if
> > the
> > user actually wants to shut down all threads, they should just close the
> > whole application rather than call removeStreamThread() in a loop.
> >
> > While I generally think it should be straightforward for users to track
> >>> how
> > many stream threads they have running, maybe it would be nice to add
> > a small utility method that does this for them. Something like
> >
> > // Returns the number of currently alive threads
> > int runningStreamThreads();
> >
> > On Thu, Sep 3, 2020 at 7:41 AM Matthias J. Sax 
> >>> wrote:
> >
> >> +1 (binding)
> >>
> >> On 9/3/20 6

[jira] [Resolved] (KAFKA-9514) The protocol generator generated useless condition when a field is made nullable and flexible version is used

2020-09-29 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-9514.

Fix Version/s: 2.7.0
   Resolution: Fixed

> The protocol generator generated useless condition when a field is made 
> nullable and flexible version is used
> -
>
> Key: KAFKA-9514
> URL: https://issues.apache.org/jira/browse/KAFKA-9514
> Project: Kafka
>  Issue Type: Bug
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
> Fix For: 2.7.0
>
>
> The protocol generator generates useless conditions when a field of type 
> string is made nullable after the request has been converted to using 
> optional fields.
> As an example, we have made the field `ProtocolName` nullable in the 
> `JoinGroupResponse`. The `JoinGroupResponse` supports optional fields since 
> version 6 and the field is nullable since version 7. Under these conditions, 
> the generator generates the following code:
> {code:java}
> if (protocolName == null) {
>     if (_version >= 7) {
>         if (_version >= 6) {
>             _writable.writeUnsignedVarint(0);
>         } else {
>             _writable.writeShort((short) -1);
>         }
>     } else {
>         throw new NullPointerException();
>     }
> }
> {code}
> spotbugs raises an `UC_USELESS_CONDITION` because `_version >= 6` is always 
> true.  
> We could optimise the generator to handle this.
>  
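
For illustration, an optimised generator could collapse the always-true check
into something like this (a sketch, not the actual generated output):
{code:java}
if (protocolName == null) {
    if (_version >= 7) {
        _writable.writeUnsignedVarint(0);
    } else {
        throw new NullPointerException();
    }
}
{code}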



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Jenkins build is back to normal : Kafka » kafka-trunk-jdk11 #98

2020-09-29 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-671: Shutdown Streams Application when appropriate exception is thrown

2020-09-29 Thread Walker Carlson
Thank you for the feedback Guozhang and Bruno. See the responses below.

I have updated the kip accordingly

Thanks,
Walker

On Tue, Sep 29, 2020 at 1:59 AM Bruno Cadonna  wrote:

> Hi Walker,
>
> Thanks for updating the KIP!
>
> 1. I would add response REPLACE_STREAM_THREAD to the
> StreamsUncaughtExceptionHandlerResponse enum to start a new stream
> thread that replaces the failed one. I suspect you did not add it
> because it depends on KIP-663. A dependency on another unfinished KIP
> should not stop you from adding this response.
>

Yes I was unsure about adding this as your kip is still under discussion.
We can add it to 663 if you would prefer then there will not be any
dependency on it. We can always add it separately or we can do a partial
implementation. Whichever you think is best


>
> 2. Why does the Kafka Streams client transition to NOT_RUNNING when it is
> shut down due to SHUTDOWN_KAFKA_STREAMS_CLIENT and
> SHUTDOWN_KAFKA_STREAMS_APPLICATION? I would rather expect that it
> transitions to ERROR, since we are exclusively talking about error cases
> now. I would also not emulate the current behavior of close(), since
> close() is not intended to be used in the error case due to deadlocks
> you could run into.
>

We can change it to state ERROR.


>
> 3. Since the motivation of the KIP changed quite a lot, I think you
> should remove KAFKA-4748 from the motivation or make it clear that this
> KIP only covers the shutdown of the Kafka Streams application in the
> error case.
>
This is a fair point. I will remove it.


> 4. I would just overload method setUncaughtExceptionHandler() and not
> introduce a method with a new name.
>

Alright. It is the same reason I hadn't deprecated it but I think we can
just go ahead and do it.


>
> 5. I agree with Guozhang that we should deprecate the overload with the
> Java-specific handler. I am sure you wanted to deprecate the method and
> just forgot about it.
>

I actually had but removed it because I felt that we are not replacing it
completely, but we might as well deprecate it.


>
> 6. I agree with Guozhang that the RocksDB metrics recording thread
> should also be shut down. To be fair, when Walker asked me about it, I
> thought it was not strictly necessary to shut it down, but thinking about
> it again, it also does not make a lot of sense to keep it running,
> because the RocksDB metrics would all have been removed at that point.
>

We can change that


>
> 7. I think we should provide a default implementation of the handler.
> However, the default implementation should just return
> SHUTDOWN_STREAM_THREAD which corresponds to the current behavior. If we
> want to provide a more elaborate default handler, I would propose to
> discuss that on a separate KIP to not block this KIP on that discussion.
>

This is what I am currently doing. Before it is set, it defaults to a lambda
that just returns SHUTDOWN_STREAM_THREAD, and if they reset it by passing null
it will return to the default.

I agree that a default handler that changes the current default behavior
might want to wait.
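
A sketch of that null-reset behavior (the field and parameter names here are
assumptions for illustration):

public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler userHandler) {
    this.streamsUncaughtExceptionHandler = userHandler != null
        ? userHandler
        : exception -> StreamsUncaughtExceptionHandlerResponse.SHUTDOWN_STREAM_THREAD;
}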


>
> Best,
> Bruno
>
> On 29.09.20 05:35, Guozhang Wang wrote:
> > Hello Walker,
> >
> > Thanks for the updated KIP proposal. A few more comments below:
> >
> > 1. "The RocksDB metrics recording thread is not shutdown." Why it should
> > not be shut down in either client or application shutdown cases?
>

see above, it's been fixed


> >
> > 2. Should we deprecate the existing overloaded function with the java
> > UncaughtExceptionHandler?
>

If we are going to deprecate it then yes. I have updated it


> >
> > 3. Should we consider providing a default implementation of this handler
> > interface which is automatically set if not overridden by users, e.g. one
> > that would choose to SHUTDOWN_KAFKA_STREAMS_APPLICATION upon
> > MissingSourceTopicException in KIP-662.
>

We could, but I don't think it's strictly necessary.


> >
> >
> > Guozhang
> >
>
>


[jira] [Resolved] (KAFKA-9546) Make FileStreamSourceTask extendable with generic streams

2020-09-29 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch resolved KAFKA-9546.
--
Resolution: Won't Fix

I'm going to close this as WONTFIX, per my previous comment.

> Make FileStreamSourceTask extendable with generic streams
> -
>
> Key: KAFKA-9546
> URL: https://issues.apache.org/jira/browse/KAFKA-9546
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Csaba Galyo
>Assignee: Csaba Galyo
>Priority: Major
>  Labels: connect-api, needs-kip
>   Original Estimate: 4h
>  Remaining Estimate: 4h
>
> Use case: I want to read a ZIP compressed text file with a file connector and 
> send it to Kafka.
> Currently, we have FileStreamSourceConnector which reads a \n delimited text 
> file. This connector always returns a task of type FileStreamSourceTask.
> The FileStreamSourceTask reads from stdio or opens a file InputStream. The 
> issue with this approach is that the input needs to be a text file, otherwise 
> it won't work. 
> The code should be modified so that users could change the default 
> InputStream to e.g. ZipInputStream, or any other format. The code is currently 
> written in such a way that it's not possible to extend it, we cannot use a 
> different input stream. 
> See example here where the code got copy-pasted just so it could read from a 
> ZstdInputStream (which reads ZSTD compressed files): 
> [https://github.com/gcsaba2/kafka-zstd/tree/master/src/main/java/org/apache/kafka/connect/file]
>  
> I suggest 2 changes:
>  # FileStreamSourceConnector should be extendable to return tasks of 
> different types. These types would be input by the user through the 
> configuration map
>  # FileStreamSourceTask should be modified so it could be extended and child 
> classes could define different input streams.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Build failed in Jenkins: Kafka » kafka-trunk-jdk15 #130

2020-09-29 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-8360: Docs do not mention RequestQueueSize JMX metric (#9325)


--
[...truncated 6.70 MB...]

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordsFromKeyValuePairs STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordsFromKeyValuePairs PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithNullKeyAndDefaultTimestamp 
STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithNullKeyAndDefaultTimestamp 
PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithDefaultTimestamp 
STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithDefaultTimestamp 
PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithNullKey STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithNullKey PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateNullKeyConsumerRecordWithOtherTopicNameAndTimestampWithTimetamp 
STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateNullKeyConsumerRecordWithOtherTopicNameAndTimestampWithTimetamp 
PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordWithTimestamp STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordWithTimestamp PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullHeaders STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullHeaders PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithDefaultTimestamp STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithDefaultTimestamp PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicName STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicName PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithNullKey STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithNullKey PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecord STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecord PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateNullKeyConsumerRecord STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateNullKeyConsumerRecord PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordWithOtherTopicName STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordWithOtherTopicName PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > shouldAdvanceTime 
STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > shouldAdvanceTime 
PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithKeyValuePairs STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithKeyValuePairs PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithKeyValuePairsAndCustomTimestamps
 STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithKeyValuePairsAndCustomTimestamps
 PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithKeyValuePairs 
STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithKeyValuePairs PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordWithOtherTopi

Re: [DISCUSS] KIP-671: Shutdown Streams Application when appropriate exception is thrown

2020-09-29 Thread Guozhang Wang
Thanks for the updates Walker. They all lgtm.

On Tue, Sep 29, 2020 at 8:33 AM Walker Carlson 
wrote:

> Thank you for the feedback Guozhang and Bruno. See the responses below.
>
> I have updated the kip accordingly
>
> Thanks,
> Walker
>
> On Tue, Sep 29, 2020 at 1:59 AM Bruno Cadonna  wrote:
>
> > Hi Walker,
> >
> > Thanks for updating the KIP!
> >
> > 1. I would add response REPLACE_STREAM_THREAD to the
> > StreamsUncaughtExceptionHandlerResponse enum to start a new stream
> > thread that replaces the failed one. I suspect you did not add it
> > because it depends on KIP-663. A dependency on another unfinished KIP
> > should not stop you from adding this response.
> >
>
> Yes I was unsure about adding this as your kip is still under discussion.
> We can add it to 663 if you would prefer then there will not be any
> dependency on it. We can always add it separately or we can do a partial
> implementation. Whichever you think is best
>
>
> >
> > 2. Why does the Kafka Streams client transition to NOT_RUNNING when it is
> > shut down due to SHUTDOWN_KAFKA_STREAMS_CLIENT and
> > SHUTDOWN_KAFKA_STREAMS_APPLICATION? I would rather expect that it
> > transitions to ERROR, since we are exclusively talking about error cases
> > now. I would also not emulate the current behavior of close(), since
> > close() is not intended to be used in the error case due to deadlocks
> > you could run into.
> >
>
> We can change it to state ERROR.
>
>
> >
> > 3. Since the motivation of the KIP changed quite a lot, I think you
> > should remove KAFKA-4748 from the motivation or make it clear that this
> > KIP only covers the shutdown of the Kafka Streams application in the
> > error case.
> >
> This is a fair point. I will remove it.
>
>
> > 4. I would just overload method setUncaughtExceptionHandler() and not
> > introduce a method with a new name.
> >
>
> Alright. It is the same reason I hadn't deprecated it but I think we can
> just go ahead and do it.
>
>
> >
> > 5. I agree with Guozhang that we should deprecate the overload with the
> > Java-specific handler. I am sure you wanted to deprecate the method and
> > just forgot about it.
> >
>
> I actually had but removed it because I felt that we are not replacing it
> completely, but we might as well deprecate it.
>
>
> >
> > 6. I agree with Guozhang that the RocksDB metrics recording thread
> > should also be shut down. To be fair, when Walker asked me about it, I
> > thought it was not strictly necessary to shut it down, but thinking about
> > it again, it also does not make a lot of sense to keep it running,
> > because the RocksDB metrics would all have been removed at that point.
> >
>
> We can change that
>
>
> >
> > 7. I think we should provide a default implementation of the handler.
> > However, the default implementation should just return
> > SHUTDOWN_STREAM_THREAD which corresponds to the current behavior. If we
> > want to provide a more elaborate default handler, I would propose to
> > discuss that on a separate KIP to not block this KIP on that discussion.
> >
>
> This is what I am currently doing. Before it is set, it defaults to a lambda
> that just returns SHUTDOWN_STREAM_THREAD, and if they reset it by passing null
> it will return to the default.
>
> I agree that a default handler that changes the current default behavior
> might want to wait.
>
>
> >
> > Best,
> > Bruno
> >
> > On 29.09.20 05:35, Guozhang Wang wrote:
> > > Hello Walker,
> > >
> > > Thanks for the updated KIP proposal. A few more comments below:
> > >
> > > 1. "The RocksDB metrics recording thread is not shutdown." Why it
> should
> > > not be shut down in either client or application shutdown cases?
> >
>
> see above, it's been fixed
>
>
> > >
> > > 2. Should we deprecate the existing overloaded function with the java
> > > UncaughtExceptionHandler?
> >
>
> If we are going to deprecate it then yes. I have updated it
>
>
> > >
> > > 3. Should we consider providing a default implementation of this handler
> > > interface which is automatically set if not overridden by users, e.g. one
> > > that would choose to SHUTDOWN_KAFKA_STREAMS_APPLICATION upon
> > > MissingSourceTopicException in KIP-662.
> >
>
> We could, but I don't think it's strictly necessary.
>
>
> > >
> > >
> > > Guozhang
> > >
> >
> >
>


-- 
-- Guozhang


[jira] [Created] (KAFKA-10533) Add log flush semantics to simulation test

2020-09-29 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-10533:
---

 Summary: Add log flush semantics to simulation test
 Key: KAFKA-10533
 URL: https://issues.apache.org/jira/browse/KAFKA-10533
 Project: Kafka
  Issue Type: Sub-task
Reporter: Jason Gustafson
Assignee: Jason Gustafson






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10534) Modify the originals parameter type of the AbstractConfig class to avoid redundant judgments in the code

2020-09-29 Thread linenwei (Jira)
linenwei created KAFKA-10534:


 Summary: Modify the originals parameter type of the AbstractConfig 
class to avoid redundant judgments in the code
 Key: KAFKA-10534
 URL: https://issues.apache.org/jira/browse/KAFKA-10534
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Affects Versions: 2.6.0
Reporter: linenwei


{code:java}
@SuppressWarnings("unchecked")
public AbstractConfig(ConfigDef definition, Map<?, ?> originals, Map<String, ?> configProviderProps, boolean doLog) {
    /* check that all the keys are really strings */
    for (Map.Entry<?, ?> entry : originals.entrySet())
        if (!(entry.getKey() instanceof String))
            throw new ConfigException(entry.getKey().toString(), entry.getValue(), "Key must be a string.");
{code}
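
For comparison, the proposed change would type the keys up front so the
runtime check becomes unnecessary (a sketch of the idea, not a final
signature):
{code:java}
// Sketch: declaring the keys as String removes the need for the instanceof check above.
public AbstractConfig(ConfigDef definition, Map<String, ?> originals,
                      Map<String, ?> configProviderProps, boolean doLog) {
    // ... rest of the constructor unchanged ...
}
{code}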



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10535) KIP-478: Implement StateStoreContext and Record

2020-09-29 Thread John Roesler (Jira)
John Roesler created KAFKA-10535:


 Summary: KIP-478: Implement StateStoreContext and Record
 Key: KAFKA-10535
 URL: https://issues.apache.org/jira/browse/KAFKA-10535
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler
Assignee: John Roesler






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10536) KIP-478: Implement KStream changes

2020-09-29 Thread John Roesler (Jira)
John Roesler created KAFKA-10536:


 Summary: KIP-478: Implement KStream changes
 Key: KAFKA-10536
 URL: https://issues.apache.org/jira/browse/KAFKA-10536
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler
Assignee: John Roesler






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10537) Convert KStreamImpl filters to new PAPI

2020-09-29 Thread John Roesler (Jira)
John Roesler created KAFKA-10537:


 Summary: Convert KStreamImpl filters to new PAPI
 Key: KAFKA-10537
 URL: https://issues.apache.org/jira/browse/KAFKA-10537
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler
Assignee: John Roesler






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10538) Convert KStreamImpl maps to new PAPI

2020-09-29 Thread John Roesler (Jira)
John Roesler created KAFKA-10538:


 Summary: Convert KStreamImpl maps to new PAPI
 Key: KAFKA-10538
 URL: https://issues.apache.org/jira/browse/KAFKA-10538
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler
Assignee: John Roesler






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10539) Convert KStreamImpl joins to new PAPI

2020-09-29 Thread John Roesler (Jira)
John Roesler created KAFKA-10539:


 Summary: Convert KStreamImpl joins to new PAPI
 Key: KAFKA-10539
 URL: https://issues.apache.org/jira/browse/KAFKA-10539
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10540) Convert KStream aggregations to new PAPI

2020-09-29 Thread John Roesler (Jira)
John Roesler created KAFKA-10540:


 Summary: Convert KStream aggregations to new PAPI
 Key: KAFKA-10540
 URL: https://issues.apache.org/jira/browse/KAFKA-10540
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10543) Convert KTable joins to new PAPI

2020-09-29 Thread John Roesler (Jira)
John Roesler created KAFKA-10543:


 Summary: Convert KTable joins to new PAPI
 Key: KAFKA-10543
 URL: https://issues.apache.org/jira/browse/KAFKA-10543
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10542) Convert KTable maps to new PAPI

2020-09-29 Thread John Roesler (Jira)
John Roesler created KAFKA-10542:


 Summary: Convert KTable maps to new PAPI
 Key: KAFKA-10542
 URL: https://issues.apache.org/jira/browse/KAFKA-10542
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10544) Convert KTable aggregations to new PAPI

2020-09-29 Thread John Roesler (Jira)
John Roesler created KAFKA-10544:


 Summary: Convert KTable aggregations to new PAPI
 Key: KAFKA-10544
 URL: https://issues.apache.org/jira/browse/KAFKA-10544
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10541) Convert KTable filters to new PAPI

2020-09-29 Thread John Roesler (Jira)
John Roesler created KAFKA-10541:


 Summary: Convert KTable filters to new PAPI
 Key: KAFKA-10541
 URL: https://issues.apache.org/jira/browse/KAFKA-10541
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler
Assignee: John Roesler






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] KIP-584: Versioning scheme for features

2020-09-29 Thread Jun Rao
Hi, Kowshik,

Thanks for the update. Regarding enabling a single rolling restart in the
future, could we sketch out a bit how this will work by treating IBP as a
feature? For example, IBP currently uses the release version and this KIP
uses an integer for versions. How do we bridge the gap between the two?
Does min.version still make sense for IBP as a feature?

Thanks,

Jun

On Fri, Sep 25, 2020 at 5:57 PM Kowshik Prakasam 
wrote:

> Hi Colin,
>
> Thanks for the feedback. Those are very good points. I have made the
> following changes to the KIP as you had suggested:
> 1. Included the `timeoutMs` field in the `UpdateFeaturesRequest` schema.
> The initial implementation won't be making use of the field, but we can
> always use it in the future as the need arises.
> 2. Modified the `FinalizedFeaturesEpoch` field in `ApiVersionsResponse` to
> use int64. This is to avoid overflow problems in the future once ZK is
> gone.
>
> I have also incorporated these changes into the versioning write path PR
> that is currently under review: https://github.com/apache/kafka/pull/9001.
>
>
> Cheers,
> Kowshik
>
>
>
> On Fri, Sep 25, 2020 at 4:57 PM Kowshik Prakasam 
> wrote:
>
> > Hi Jun,
> >
> > Thanks for the feedback. It's a very good point. I have now modified the
> > KIP-584 write-up "goals" section a bit. It now mentions one of the goals as
> > enabling rolling upgrades using a single restart (instead of 2). Also I
> > have removed the text explicitly aiming for deprecation of IBP. Note that
> > previously under "Potential features in Kafka" the IBP was mentioned under
> > point (4) as a possible coarse-grained feature. Hopefully, now the 2
> > sections of the KIP align with each other well.
> >
> >
> > Cheers,
> > Kowshik
> >
> >
> > On Fri, Sep 25, 2020 at 2:03 PM Colin McCabe  wrote:
> >
> >> On Tue, Sep 22, 2020, at 00:43, Kowshik Prakasam wrote:
> >> > Hi all,
> >> >
> >> > I wanted to let you know that I have made the following changes to the
> >> > KIP-584 write up. The purpose is to ensure the design is correct for a
> >> few
> >> > things which came up during implementation:
> >> >
> >>
> >> Hi Kowshik,
> >>
> >> Thanks for the updates.
> >>
> >> >
> >> > 1. Per FeatureUpdate error code: The UPDATE_FEATURES controller API is no
> >> > longer transactional. Going forward, we allow for individual FeatureUpdate
> >> > to succeed/fail in the request. As a result, the response schema now
> >> > contains an error code per FeatureUpdate as well as a top-level error code.
> >> > Overall this is a better design because it better represents the nature of
> >> > the API: each FeatureUpdate in the request is independent of the other
> >> > updates, and the controller can process/apply these independently to ZK.
> >> > When an UPDATE_FEATURES request fails, this new design provides better
> >> > clarity to the caller on which FeatureUpdate could not be applied (via the
> >> > individual error codes). In the previous design, we were unable to achieve
> >> > such an increased level of clarity in communicating the error codes.
> >> >
> >>
> >> OK
> >>
> >> >
> >> > 2. Due to #1, there were some minor changes required to the proposed Admin
> >> > APIs (describeFeatures and updateFeatures). A few unnecessary public APIs
> >> > have been removed, and a couple of essential ones have been added. The latest
> >> > changes now represent the latest design.
> >> >
> >> > 3. The timeoutMs field has been removed from the UPDATE_FEATURES API
> >> > request, since it was not found to be required during implementation.
> >> >
> >>
> >> Please don't get rid of timeoutMs.  timeoutMs is required if you want to
> >> implement the ability to timeout the call if the controller can't get to it
> >> in time.  This is important for avoiding congestion collapse where the
> >> controller collapses under the weight of lots of retries of the same set of
> >> calls.
> >>
> >> We may not be able to do it in the initial implementation, but we will
> >> eventually implement this for all the controller-bound RPCs.
> >>
> >> > >
> >> > > 2. Finalized feature version epoch data type has been made to be int32
> >> > > (instead of int64). The reason is that the epoch value is the value of ZK
> >> > > node version, whose data type is int32.
> >> > >
> >>
> >> Sorry, I missed this earlier.  Using 16 bit feature levels seems fine.
> >> However, please don't use a 32-bit epoch here.  We deliberately made the
> >> epoch 64 bits to avoid overflow problems in the future once ZK is gone.
> >>
> >> best,
> >> Colin
> >>
> >> > > 3. Introduced a new 'status' field in the '/features' ZK node schema. The
> >> > > purpose is to implement Colin's earlier point for the strategy for
> >> > > transitioning from not having a /features znode to having one. An
> >> > > explanation has been provided in the following section of the KIP detailing
> >> > > the different cases:
> >> > >
> >>
> https://cwiki.apa

[jira] [Created] (KAFKA-10546) KIP-478: Deprecate old PAPI

2020-09-29 Thread John Roesler (Jira)
John Roesler created KAFKA-10546:


 Summary: KIP-478: Deprecate old PAPI
 Key: KAFKA-10546
 URL: https://issues.apache.org/jira/browse/KAFKA-10546
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler
Assignee: John Roesler


Can't be done until after the DSL internals are migrated.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10545) Create Topic IDs and Propagate to Brokers

2020-09-29 Thread Justine Olshan (Jira)
Justine Olshan created KAFKA-10545:
--

 Summary: Create Topic IDs and Propagate to Brokers
 Key: KAFKA-10545
 URL: https://issues.apache.org/jira/browse/KAFKA-10545
 Project: Kafka
  Issue Type: Improvement
Reporter: Justine Olshan
Assignee: Justine Olshan


First step for KIP-516

The goals are:
 * Create and store topic IDs in a ZK Node and controller memory.
 * Propagate topic ID to brokers with updated LeaderAndIsrRequest
 * Store topic ID in memory on broker, persistent file in log



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10547) Add topic IDs to MetadataResponse, UpdateMetadata, and Fetch

2020-09-29 Thread Justine Olshan (Jira)
Justine Olshan created KAFKA-10547:
--

 Summary: Add topic IDs to MetadataResponse, UpdateMetadata, and 
Fetch
 Key: KAFKA-10547
 URL: https://issues.apache.org/jira/browse/KAFKA-10547
 Project: Kafka
  Issue Type: Sub-task
Reporter: Justine Olshan


Prevent reads from deleted topics

Will be able to use TopicDescription to identify the topic ID



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10548) Implement Type field and logic for LeaderAndIsrRequests

2020-09-29 Thread Justine Olshan (Jira)
Justine Olshan created KAFKA-10548:
--

 Summary: Implement Type field and logic for LeaderAndIsrRequests
 Key: KAFKA-10548
 URL: https://issues.apache.org/jira/browse/KAFKA-10548
 Project: Kafka
  Issue Type: Sub-task
Reporter: Justine Olshan


This will allow for specialized deletion logic when receiving 
LeaderAndIsrRequests

Will also create and utilize delete.stale.topic.delay.ms configuration option



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10549) Add topic ID support to DeleteTopics,ListOffsets, OffsetForLeaders, StopReplica

2020-09-29 Thread Justine Olshan (Jira)
Justine Olshan created KAFKA-10549:
--

 Summary: Add topic ID support to DeleteTopics,ListOffsets, 
OffsetForLeaders, StopReplica
 Key: KAFKA-10549
 URL: https://issues.apache.org/jira/browse/KAFKA-10549
 Project: Kafka
  Issue Type: Sub-task
Reporter: Justine Olshan


ListOffsets, OffsetForLeaders, and StopReplica protocols will replace topic 
name with topic ID and will be used to prevent reads from deleted topics

Delete topics will be changed to support topic ids and delete sooner.

This may be split into two or more issues if necessary.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] KIP-673: Emit JSONs with new auto-generated schema

2020-09-29 Thread Anastasia Vela
Hi Tom,

I'm glad it makes more sense. I modified the KIP just to make it a little
more clear as well.

Is the concern that serialization would give no guarantee as to the order in
which properties appear? Since ObjectNodes hold the tree structure in a
LinkedHashMap, the iteration order when serializing is the order in which
entries were inserted into the map.
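
A tiny demonstration of that insertion-order behavior (assuming
jackson-databind on the classpath):

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

ObjectNode node = new ObjectMapper().createObjectNode();
node.put("B", 2);
node.put("A", 1);
System.out.println(node); // {"B":2,"A":1} -- fields appear in insertion order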

I understand the performance concern. However, this would only be logged
when the trace level logging is turned on, so this logging isn't done
otherwise. Plus, the existing text-based logging is already very expensive,
so it might not be that significant of a change.
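
For reference, the streaming alternative Tom sketched below would look roughly
like this, writing directly to a generator instead of building JsonNodes (the
field names are illustrative only):

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import java.io.StringWriter;

StringWriter out = new StringWriter();
try (JsonGenerator gen = new JsonFactory().createGenerator(out)) {
    gen.writeStartObject();
    gen.writeStringField("requestType", "Fetch"); // illustrative field
    gen.writeNumberField("correlationId", 42);
    gen.writeEndObject();
}
System.out.println(out); // {"requestType":"Fetch","correlationId":42}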

Thanks,
Anastasia

On Mon, Sep 28, 2020 at 3:06 AM Tom Bentley  wrote:

> Hi Anastasia,
>
> Thanks for those changes. I eventually figured out that the
> XYZJsonDataConverter classes are the new ones from the factoring-out of the
> Jackson dependency (KAFKA-10384), after which the proposal made much more
> sense to me. Apologies for being slow on the uptake there.
>
> I can see now that property order in the JSON in the log would be
> determined by the declared field in the RPC JSON definition. That might not
> always be ideal since it can be hard to visually parse large unindented
> JSON, but maybe that could be dealt with once we knew there was a real
> problem.
>
> It's an implementation detail, but I wonder whether constructing a whole
> tree of JsonNodes might cause performance issues. It would be more work,
> but the XYZJsonDataConverter could be generated to have a method which took
> a JsonGenerator, thus avoiding the need to instantiate the nodes just for
> the purposes of logging.
>
> Kind regards,
>
> Tom
>
> On Fri, Sep 25, 2020 at 7:05 PM Anastasia Vela  wrote:
>
> > Hi Tom,
> >
> > Thanks for your input!
> >
> > 1. I'll add more details for the RequestConvertToJson and
> > XYZJsonDataConverter classes. Hopefully it will be more clear, but just to
> > answer your question, RequestConvertToJson does not return a
> > XYZJsonDataConverter, but rather it returns a JsonNode which will be
> > serialized. The JsonDataConverter is the new auto-generated schema for each
> > request/response type that contains the method to return the JsonNode to be
> > serialized.
> >
> > 2. There is no defined order of the properties, rather it's in the order
> > that it is set in. So if you first set key B, then key A, the properties
> > would appear with key B first. JsonNodes, when serialized, do not sort the
> > keys.
> >
> > 3. Yes, serialization is done via Jackson databind.
> >
> > Thanks again,
> > Anastasia
> >
> > On Fri, Sep 25, 2020 at 1:15 AM Tom Bentley  wrote:
> >
> > > Hi Anastasia,
> > >
> > > Thanks for the KIP, I can certainly see the benefit of this. I have a
> few
> > > questions:
> > >
> > > 1. I think it would be helpful to readers to explicitly illustrate the
> > > RequestConvertToJson and XYZJsonDataConverter classes (e.g. with method
> > > signatures for one or two methods), because currently it's not clear (to me
> > > at least) exactly what's being proposed. Does the RequestConvertToJson
> > > return a XYZJsonDataConverter?
> > >
> > > 2. Does the serialization have a defined order of properties (alphabetic
> > > perhaps)? My concern here is that properties appearing in order according
> > > to how they are iterated in a hash map might harm human readability of the
> > > logs.
> > >
> > > 3. Would the serialization be done via the Jackson databinding?
> > >
> > > Many thanks,
> > >
> > > Tom
> > >
> > > On Thu, Sep 24, 2020 at 11:49 PM Anastasia Vela 
> > > wrote:
> > >
> > > > Hi all,
> > > >
> > > > I'd like to discuss KIP-673:
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-673%3A+Emit+JSONs+with+new+auto-generated+schema
> > > >
> > > > This is a proposal to change the format of request and response
> traces
> > to
> > > > JSON, which would be easier to load and parse, because the current
> > format
> > > > is only JSON-like and not easily parsable.
> > > >
> > > > Let me know what you think,
> > > > Anastasia
> > > >
> > >
> >
>


[jira] [Created] (KAFKA-10550) Update kafka-topics.sh to support Topic IDs

2020-09-29 Thread Justine Olshan (Jira)
Justine Olshan created KAFKA-10550:
--

 Summary: Update kafka-topics.sh to support Topic IDs
 Key: KAFKA-10550
 URL: https://issues.apache.org/jira/browse/KAFKA-10550
 Project: Kafka
  Issue Type: Sub-task
Reporter: Justine Olshan


 Make changes to kafka-topics.sh --describe so a user can specify a topic name 
to describe with the --topic parameter, or alternatively the user can supply a 
topic ID with the --topic_id parameter



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] KIP-671: Add method to Shutdown entire Streams Application

2020-09-29 Thread Walker Carlson
Hello all,

I made some changes to the KIP the descriptions are on the discussion
thread. If you have already voted I would ask you to confirm your vote.

Otherwise please vote so we can get this feature in.

Thanks,
Walker

On Thu, Sep 24, 2020 at 4:36 PM John Roesler  wrote:

> Thanks for the KIP, Walker!
>
> I’m +1 (binding)
>
> -John
>
> On Mon, Sep 21, 2020, at 17:04, Guozhang Wang wrote:
> > Thanks for finalizing the KIP. +1 (binding)
> >
> >
> > Guozhang
> >
> > On Mon, Sep 21, 2020 at 1:38 PM Walker Carlson 
> > wrote:
> >
> > > Hello all,
> > >
> > > I would like to start a thread to vote for KIP-671 to add a method to close
> > > all clients in a kafka streams application.
> > >
> > > KIP:
> > >
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-671%3A+Shutdown+Streams+Application+when+appropriate+exception+is+thrown
> > >
> > > Discussion thread: *here
> > > <https://mail-archives.apache.org/mod_mbox/kafka-dev/202009.mbox/%3CCAC55fuh3HAGCxz-PzxTJraczy6T-os2oiCV328PBeuJQSVYASg%40mail.gmail.com%3E>*
> > >
> > > Thanks,
> > > -Walker
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


[jira] [Resolved] (KAFKA-10479) Throw exception if users try to update configs of existent listeners

2020-09-29 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10479?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-10479.
-
Fix Version/s: 2.7.0
   Resolution: Fixed

> Throw exception if users try to update configs of existent listeners
> 
>
> Key: KAFKA-10479
> URL: https://issues.apache.org/jira/browse/KAFKA-10479
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
> Fix For: 2.7.0
>
>
> {code}
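> // Note: as quoted, the parameter is `kafkaConfig` while the body references
> // `newConfig`; the description below explains the actual problem (new
> // configs are never compared against the original ones).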
> def immutableListenerConfigs(kafkaConfig: KafkaConfig, prefix: String): 
> Map[String, AnyRef] = {
>   newConfig.originals.asScala.filter { case (key, _) =>
> key.startsWith(prefix) && !DynamicSecurityConfigs.contains(key)
>   }
> }
> {code}
> We don't actually compare the new configs to the original configs, so the 
> appropriate exception is not thrown.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10551) Support topic IDs in Produce request

2020-09-29 Thread Justine Olshan (Jira)
Justine Olshan created KAFKA-10551:
--

 Summary: Support topic IDs in Produce request
 Key: KAFKA-10551
 URL: https://issues.apache.org/jira/browse/KAFKA-10551
 Project: Kafka
  Issue Type: Sub-task
Reporter: Justine Olshan


Replace the topic name with the topic ID so the request is smaller.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10552) Update directory structure to use topic IDs

2020-09-29 Thread Justine Olshan (Jira)
Justine Olshan created KAFKA-10552:
--

 Summary: Update directory structure to use topic IDs
 Key: KAFKA-10552
 URL: https://issues.apache.org/jira/browse/KAFKA-10552
 Project: Kafka
  Issue Type: Sub-task
Reporter: Justine Olshan


This change will probably coincide with a major release.

Topic names will be removed from the directory structure and replaced with 
topic IDs.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Build failed in Jenkins: Kafka » kafka-trunk-jdk11 #99

2020-09-29 Thread Apache Jenkins Server
See 


Changes:

[github] MINOR; Preserve ThrottlingQuotaExceededException when request timeouts 
after being retried due to a quota violation (KIP-599) (#9344)


--
[...truncated 6.71 MB...]

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateIfWallClockTimeAdvances[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateIfWallClockTimeAdvances[Eos enabled = false] PASSED

org.apache.kafka.streams.test.TestRecordTest > testConsumerRecord STARTED

org.apache.kafka.streams.test.TestRecordTest > testConsumerRecord PASSED

org.apache.kafka.streams.test.TestRecordTest > testToString STARTED

org.apache.kafka.streams.test.TestRecordTest > testToString PASSED

org.apache.kafka.streams.test.TestRecordTest > testInvalidRecords STARTED

org.apache.kafka.streams.test.TestRecordTest > testInvalidRecords PASSED

org.apache.kafka.streams.test.TestRecordTest > testPartialConstructorEquals 
STARTED

org.apache.kafka.streams.test.TestRecordTest > testPartialConstructorEquals 
PASSED

org.apache.kafka.streams.test.TestRecordTest > testMultiFieldMatcher STARTED

org.apache.kafka.streams.test.TestRecordTest > testMultiFieldMatcher PASSED

org.apache.kafka.streams.test.TestRecordTest > testFields STARTED

org.apache.kafka.streams.test.TestRecordTest > testFields PASSED

org.apache.kafka.streams.test.TestRecordTest > testProducerRecord STARTED

org.apache.kafka.streams.test.TestRecordTest > testProducerRecord PASSED

org.apache.kafka.streams.test.TestRecordTest > testEqualsAndHashCode STARTED

org.apache.kafka.streams.test.TestRecordTest > testEqualsAndHashCode PASSED

org.apache.kafka.streams.test.wordcount.WindowedWordCountProcessorTest > 
shouldWorkWithInMemoryStore STARTED

org.apache.kafka.streams.test.wordcount.WindowedWordCountProcessorTest > 
shouldWorkWithInMemoryStore PASSED

org.apache.kafka.streams.test.wordcount.WindowedWordCountProcessorTest > 
shouldFailWithLogging STARTED

org.apache.kafka.streams.test.wordcount.WindowedWordCountProcessorTest > 
shouldFailWithLogging PASSED

org.apache.kafka.streams.test.wordcount.WindowedWordCountProcessorTest > 
shouldFailWithCaching STARTED

org.apache.kafka.streams.test.wordcount.WindowedWordCountProcessorTest > 
shouldFailWithCaching PASSED

org.apache.kafka.streams.test.wordcount.WindowedWordCountProcessorTest > 
shouldWorkWithPersistentStore STARTED

org.apache.kafka.streams.test.wordcount.WindowedWordCountProcessorTest > 
shouldWorkWithPersistentStore PASSED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > shouldReturnIsOpen 
STARTED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > shouldReturnIsOpen 
PASSED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > 
shouldDeleteAndReturnPlainValue STARTED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > 
shouldDeleteAndReturnPlainValue PASSED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > shouldReturnName 
STARTED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > shouldReturnName 
PASSED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > 
shouldPutWithUnknownTimestamp STARTED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > 
shouldPutWithUnknownTimestamp PASSED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > 
shouldPutAllWithUnknownTimestamp STARTED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > 
shouldPutAllWithUnknownTimestamp PASSED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > 
shouldReturnIsPersistent STARTED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > 
shouldReturnIsPersistent PASSED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > 
shouldPutIfAbsentWithUnknownTimestamp STARTED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > 
shouldPutIfAbsentWithUnknownTimestamp PASSED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > shouldForwardClose 
STARTED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > shouldForwardClose 
PASSED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > shouldForwardFlush 
STARTED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > shouldForwardFlush 
PASSED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > shouldForwardInit 
STARTED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > shouldForwardInit 
PASSED

> Task :streams:upgrade-system-tests-0100:spotbugsMain NO-SOURCE
> Task :streams:upgrade-system-tests-0100:test
> Task :streams:upgrade-system-tests-0101:compileJava NO-SOURCE
> Task :streams:upgrade-system-tests-0101:processResources NO-SOURCE
> Task :streams:upgrade-system-tests-0101:classes UP-TO-DATE
> Task :streams:upgrade-system-tests-0101:checkstyleMain NO-SOURCE
> Task :streams:upgrade-system-tests-0101:compileTestJava
> Task :streams:upgrade-syste

Build failed in Jenkins: Kafka » kafka-trunk-jdk15 #131

2020-09-29 Thread Apache Jenkins Server
See 


Changes:

[github] MINOR; Preserve ThrottlingQuotaExceededException when request timeouts 
after being retried due to a quota violation (KIP-599) (#9344)


--
[...truncated 6.70 MB...]

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordsFromKeyValuePairs STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordsFromKeyValuePairs PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithNullKeyAndDefaultTimestamp 
STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithNullKeyAndDefaultTimestamp 
PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithDefaultTimestamp 
STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithDefaultTimestamp 
PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithNullKey STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithNullKey PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateNullKeyConsumerRecordWithOtherTopicNameAndTimestampWithTimetamp 
STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateNullKeyConsumerRecordWithOtherTopicNameAndTimestampWithTimetamp 
PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordWithTimestamp STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordWithTimestamp PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullHeaders STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullHeaders PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithDefaultTimestamp STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithDefaultTimestamp PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicName STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicName PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithNullKey STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithNullKey PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecord STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecord PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateNullKeyConsumerRecord STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateNullKeyConsumerRecord PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordWithOtherTopicName STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordWithOtherTopicName PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > shouldAdvanceTime 
STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > shouldAdvanceTime 
PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithKeyValuePairs STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithKeyValuePairs PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithKeyValuePairsAndCustomTimestamps
 STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithKeyValuePairsAndCustomTimestamps
 PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithKeyValuePairs 
STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithKeyValuePairs PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordWithOtherTopicNameAndTimestamp STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordWithOtherTopicNameAndTimestamp PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
s

Re: [VOTE] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients

2020-09-29 Thread Matthias J. Sax
+1 (binding)

I am not super happy with the impact on the client state. For example, I
don't understand why it's ok to scale out if we lose one thread out of
four, but why it's not ok to scale out if we lose one thread out of one
(for this case, we would enter ERROR state and cannot add new threads
afterwards).

However, this might be an issue for a follow-up KIP.


-Matthias

On 9/29/20 7:20 AM, John Roesler wrote:
> Thanks, Bruno, this sounds good to me. 
> -John
> 
> On Tue, Sep 29, 2020, at 03:13, Bruno Cadonna wrote:
>> Hi all,
>>
>> I did two minor modifications to the KIP.
>>
>> - I removed the rather strict guarantee "Dead stream threads are removed 
>> from a Kafka Streams client at latest after the next call to 
>> KafkaStreams#addStreamThread() or KafkaStreams#removeStreamThread() 
>> following the transition to state DEAD."
>> Dead stream threads will be still removed, but the behavior will be less 
>> strict.
>>
>> - Added a sentence that states that the Kafka Streams client will 
>> transit to ERROR if the last alive stream thread dies exceptionally. 
>> This corresponds to the current behavior.
>>
>> I will not restart voting and keep the votes so far.
>>
>> Best,
>> Bruno
>>
>> On 22.09.20 01:19, John Roesler wrote:
>>> I’m +1 also. Thanks, Bruno!
>>> -John
>>>
>>> On Mon, Sep 21, 2020, at 17:08, Guozhang Wang wrote:
 Thanks Bruno. I'm +1 on the KIP.

 On Mon, Sep 21, 2020 at 2:49 AM Bruno Cadonna  wrote:

> Hi,
>
> I would like to restart from zero the voting on KIP-663 that proposes to
> add methods to the Kafka Streams client to add and remove stream threads
> during execution.
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-663%3A+API+to+Start+and+Shut+Down+Stream+Threads
>
> Matthias, if you are still +1, please vote again.
>
> Best,
> Bruno
>
> On 04.09.20 23:12, John Roesler wrote:
>> Hi Sophie,
>>
>> Uh, oh, it's never a good sign when the discussion moves
>> into the vote thread :)
>>
>> I agree with you, it seems like a good touch for
>> removeStreamThread() to return the name of the thread that
>> got removed, rather than a boolean flag. Maybe the return
>> value would be `null` if there is no thread to remove.
>>
>> If we go that way, I'd suggest that addStreamThread() also
>> return the name of the newly created thread, or null if no
>> thread can be created right now.
>>
>> I'm not completely sure if I think that callers of this
>> method would know exactly how many threads there are. Sure,
>> if a human being is sitting there looking at the metrics or
>> logs and decides to call the method, it would work out, but
>> I'd expect this kind of method to find its way into
>> automated tooling that reacts to things like current system
>> load or resource saturation. Those kinds of toolchains often
>> are part of a distributed system, and it's probably not that
>> easy to guarantee that the thread count they observe is
>> fully consistent with the number of threads that are
>> actually running. Therefore, an in-situ `int
>> numStreamThreads()` method might not be a bad idea. Then
>> again, it seems sort of optional. A caller can catch an
>> exception or react to a `null` return value just the same
>> either way. Having both add/remove methods behave similarly
>> is probably more valuable.
>>
>> Thanks,
>> -John
>>
>>
>> On Thu, 2020-09-03 at 12:15 -0700, Sophie Blee-Goldman
>> wrote:
>>> Hey, sorry for the late reply, I just have one minor suggestion. Since
> we
>>> don't
>>> make any guarantees about which thread gets removed or allow the user to
>>> specify, I think we should return either the index or full name of the
>>> thread
>>> that does get removed by removeThread().
>>>
>>> I know you just updated the KIP to return true/false if there
> are/aren't any
>>> threads to be removed, but I think this would be more appropriate as an
>>> exception than as a return type. I think it's reasonable to expect
> users to
>>> have some sense to how many threads are remaining, and not try to remove
>>> a thread when there is none left. To me, that indicates something wrong
>>> with the user application code and should be treated as an exceptional
> case.
>>> I don't think the same code clarify argument applies here as to the
>>> addStreamThread() case, as there's no reason for an application to be
>>> looping and retrying removeStreamThread()  since if that fails, it's
> because
>>> there are no threads left and thus it will continue to always fail. And
> if
>>> the
>>> user actually wants to shut down all threads, they should just close the
>>> whole application rather than call removeStreamThread() in a loop.
>>>
>>> While I generally think it sh
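
To illustrate the tooling pattern John describes, a sketch under this thread's
working assumption that both methods return the affected thread's name, or
null when nothing can be done (the KIP defines the final signatures):

import org.apache.kafka.streams.KafkaStreams;

public class ElasticityHook {

    // Hypothetical autoscaling callback; return types are assumed per the
    // discussion above, not taken from a released API.
    static void onLoadChange(KafkaStreams streams, boolean overloaded) {
        if (overloaded) {
            String added = streams.addStreamThread();      // assumed: name or null
            if (added == null) {
                System.err.println("No thread added; client may not be RUNNING");
            }
        } else {
            String removed = streams.removeStreamThread(); // assumed: name or null
            if (removed == null) {
                System.err.println("No alive stream thread left to remove");
            }
        }
    }
}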

[jira] [Created] (KAFKA-10553) Track handling of topic deletion during reassignment

2020-09-29 Thread Justine Olshan (Jira)
Justine Olshan created KAFKA-10553:
--

 Summary: Track handling of topic deletion during reassignment
 Key: KAFKA-10553
 URL: https://issues.apache.org/jira/browse/KAFKA-10553
 Project: Kafka
  Issue Type: Sub-task
Reporter: Justine Olshan


Currently, deletion is blocked during partition reassignment, but KIP-516 can 
allow deletions to proceed without blocking. 

In addition, we should be able to avoid blocking deletion when a replica is down.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] KIP-671: Add method to Shutdown entire Streams Application

2020-09-29 Thread Leah Thomas
Hey Walker,

Thanks for the KIP! I'm +1, non-binding.

Cheers,
Leah

On Tue, Sep 29, 2020 at 1:56 PM Walker Carlson 
wrote:

> Hello all,
>
> I made some changes to the KIP; the descriptions are on the discussion
> thread. If you have already voted, I would ask you to confirm your vote.
>
> Otherwise please vote so we can get this feature in.
>
> Thanks,
> Walker
>
> On Thu, Sep 24, 2020 at 4:36 PM John Roesler  wrote:
>
> > Thanks for the KIP, Walker!
> >
> > I’m +1 (binding)
> >
> > -John
> >
> > On Mon, Sep 21, 2020, at 17:04, Guozhang Wang wrote:
> > > Thanks for finalizing the KIP. +1 (binding)
> > >
> > >
> > > Guozhang
> > >
> > > On Mon, Sep 21, 2020 at 1:38 PM Walker Carlson 
> > > wrote:
> > >
> > > > Hello all,
> > > >
> > > > I would like to start a thread to vote for KIP-671 to add a method to
> > close
> > > > all clients in a kafka streams application.
> > > >
> > > > KIP:
> > > >
> > > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-671%3A+Shutdown+Streams+Application+when+appropriate+exception+is+thrown
> > > >
> > > > Discussion thread: *here
> > > > <
> > > >
> >
> https://mail-archives.apache.org/mod_mbox/kafka-dev/202009.mbox/%3CCAC55fuh3HAGCxz-PzxTJraczy6T-os2oiCV328PBeuJQSVYASg%40mail.gmail.com%3E
> > > > >*
> > > >
> > > > Thanks,
> > > > -Walker
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>


Re: [VOTE] KIP-671: Add method to Shutdown entire Streams Application

2020-09-29 Thread Guozhang Wang
+1 again on the KIP.

On Tue, Sep 29, 2020 at 1:51 PM Leah Thomas  wrote:

> Hey Walker,
>
> Thanks for the KIP! I'm +1, non-binding.
>
> Cheers,
> Leah
>
> On Tue, Sep 29, 2020 at 1:56 PM Walker Carlson 
> wrote:
>
> > Hello all,
> >
> > I made some changes to the KIP; the descriptions are on the discussion
> > thread. If you have already voted, I would ask you to confirm your vote.
> >
> > Otherwise please vote so we can get this feature in.
> >
> > Thanks,
> > Walker
> >
> > On Thu, Sep 24, 2020 at 4:36 PM John Roesler 
> wrote:
> >
> > > Thanks for the KIP, Walker!
> > >
> > > I’m +1 (binding)
> > >
> > > -John
> > >
> > > On Mon, Sep 21, 2020, at 17:04, Guozhang Wang wrote:
> > > > Thanks for finalizing the KIP. +1 (binding)
> > > >
> > > >
> > > > Guozhang
> > > >
> > > > On Mon, Sep 21, 2020 at 1:38 PM Walker Carlson <
> wcarl...@confluent.io>
> > > > wrote:
> > > >
> > > > > Hello all,
> > > > >
> > > > > I would like to start a thread to vote for KIP-671 to add a method
> to
> > > close
> > > > > all clients in a kafka streams application.
> > > > >
> > > > > KIP:
> > > > >
> > > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-671%3A+Shutdown+Streams+Application+when+appropriate+exception+is+thrown
> > > > >
> > > > > Discussion thread: *here
> > > > > <
> > > > >
> > >
> >
> https://mail-archives.apache.org/mod_mbox/kafka-dev/202009.mbox/%3CCAC55fuh3HAGCxz-PzxTJraczy6T-os2oiCV328PBeuJQSVYASg%40mail.gmail.com%3E
> > > > > >*
> > > > >
> > > > > Thanks,
> > > > > -Walker
> > > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
>


-- 
-- Guozhang


Re: [VOTE] KIP-478 Strongly Typed Streams Processor API

2020-09-29 Thread John Roesler
Hello again, all,

Thanks for the latest round of discussion. I've taken the
recent feedback and come up with an updated KIP that seems
actually quite a bit nicer than the prior proposal.

The specific diff on the KIP is here:
https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=118172121&selectedPageVersions=15&selectedPageVersions=14

These changes are implemented in this POC PR:
https://github.com/apache/kafka/pull/9346

The basic idea is that, building on the recent conversation,
we would transition away from the current API where we get
only key/value in the process() method and other "data"
comes in the ProcessorContext along with the "metadata".

Instead, we formalize what is "data" and what is "metadata",
and pass it all in to the process method:
Processor#process(Record<KIn, VIn>, Optional<RecordMetadata>)

Also, you forward the whole data class instead of mutating
the ProcessorContext fields and also calling forward:
ProcessorContext#forward(Record)

The Record class itself ships with methods like
record#withValue(NewV newValue)
that make a shallow copy of the input Record, enabling
Processors to safely handle the record without polluting the
context of their parents and siblings.

This proposal has a number of key benefits:
* As we've discovered in KAFKA-9584, it's unsafe to mutate
the Headers via the ProcessorContext. This proposal offers a
way to safely forward changes only to downstream processors.
* The new API has symmetry (each processor's input is the
output of its parent processor)
* The API makes clear that the record metadata isn't always
defined (for example, in a punctuation, there is no current
topic/partition/offset)
* The API enables punctuators to forward well defined
headers downstream, which is currently not possible.

Unless there are objections, I'll go ahead and re-finalize
this KIP and update that PR to a mergeable state.

Thanks, all,
-John
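
To make the proposed shape concrete, a sketch of a trivial processor written
against the API as described in this message; Processor, ProcessorContext,
Record, and RecordMetadata refer to the proposed classes from the POC PR, and
the final KIP may still differ:

import java.util.Optional;

public class UppercaseProcessor implements Processor<String, String, String, String> {

    private ProcessorContext<String, String> context;

    @Override
    public void init(final ProcessorContext<String, String> context) {
        this.context = context;
    }

    @Override
    public void process(final Record<String, String> record,
                        final Optional<RecordMetadata> metadata) {
        // withValue() makes a shallow copy, so forwarding the modified record
        // cannot pollute the context of parent or sibling processors.
        context.forward(record.withValue(record.value().toUpperCase()));
        // The metadata is only defined while a real input record is being
        // processed, not when an upstream punctuator triggered this call.
        metadata.ifPresent(m ->
                System.out.println(m.topic() + "-" + m.partition() + "@" + m.offset()));
    }
}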


On Thu, 2020-09-24 at 09:41 -0700, Matthias J. Sax wrote:
> Interesting proposal. However, I am not totally convinced, because I see
> a fundamental difference between "data" and "metadata".
> 
> Topic/partition/offset are "metadata" in the strong sense and they are
> immutable.
> 
> On the other hand there is "primary" data like key and value, as well as
> "secondary" data like timestamp and headers. The issue seems that we
> treat "secondary data" more like metadata atm?
> 
> Thus, promoting timestamp and headers into a first-class citizen role
> makes sense to me (my original proposal about `RecordContext` would still
> fall short with this regard). However, putting both (data and metadata)
> into a `Record` abstraction might go too far?
> 
> I am also a little bit concerned about `Record.copy()` because it might
> be a trap: Users might assume it does a full deep copy of the record,
> however, it would not. It would only create a new `Record` object as
> wrapper that points to the same key/value/header objects as the input
> record.
> 
> With the current `context.forward(key, value)` we don't have this "deep
> copy" issue -- it's pretty clear what is happening.
> 
> Instead of `To.all().withTimestamp()` we could also add
> `context.forward(key, value, timestamp)` etc (just wondering about the
> explosion in overloads)?
> 
> Also, `Record.withValue` etc sounds odd? Should a record not be
> immutable? So, we could have something like
> 
> `RecordFactory.withKeyValue(...).withTimestamp(...).withHeaders(...).build()`.
> But it looks rather verbose?
> 
> The other question is, of course, to what extent do we want to keep the
> distinction between "primary" and "secondary" data? To me, it's a
> question of ease of use?
> 
> Just putting all this out to move the discussion forward. Don't have a
> concrete proposal atm.
> 
> 
> -Matthias
> 
> 
> On 9/14/20 9:24 AM, John Roesler wrote:
> > Thanks for this thought, Matthias!
> > 
> > To be honest, it's bugged me quite a bit that _all_ the
> > record information hasn't been an argument to `process`. I
> > suppose I was trying to be conservative in this proposal,
> > but then again, if we're adding new Processor and
> > ProcessorContext interfaces, then this is the time to make
> > such a change.
> > 
> > To be unambiguous, I think this is what we're talking about:
> > ProcessorContext:
> > * applicationId
> > * taskId
> > * appConfigs
> > * appConfigsWithPrefix
> > * keySerde
> > * valueSerde
> > * stateDir
> > * metrics
> > * schedule
> > * commit
> > * forward
> > 
> > StateStoreContext:
> > * applicationId
> > * taskId
> > * appConfigs
> > * appConfigsWithPrefix
> > * keySerde
> > * valueSerde
> > * stateDir
> > * metrics
> > * register
> > 
> > 
> > RecordContext
> > * topic
> > * partition
> > * offset
> > * timestamp
> > * headers
> > 
> > 
> > Your proposal sounds good to me as-is. Just to cover the
> > bases, though, I'm wondering if we should push the idea just
> > a little farther. Instead of decomposing key, value, context,
> > we could just keep them all in one object, like this:
> > 
> >

[jira] [Resolved] (KAFKA-9061) StreamStreamJoinIntegrationTest flaky test failures

2020-09-29 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-9061.
--
Resolution: Cannot Reproduce

> StreamStreamJoinIntegrationTest flaky test failures
> ---
>
> Key: KAFKA-9061
> URL: https://issues.apache.org/jira/browse/KAFKA-9061
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Chris Pettitt
>Priority: Major
>  Labels: flaky-test
> Attachments: stacktraces-3269.txt
>
>
> It looks like one test timed out during cleanup and all other tests failed 
> due to lack of cleanup.
> See attachment for detailed stack traces.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Build failed in Jenkins: Kafka » kafka-trunk-jdk11 #100

2020-09-29 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-10479; Throw exception if users try to update non-reconfigurable 
configs of existing listeners (#9284)


--
[...truncated 3.35 MB...]
org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectPersistentStoreTypeOnly[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldRespectTaskIdling[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldRespectTaskIdling[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSourceSpecificDeserializers[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSourceSpecificDeserializers[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldReturnAllStores[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldReturnAllStores[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotCreateStateDirectoryForStatelessTopology[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotCreateStateDirectoryForStatelessTopology[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldApplyGlobalUpdatesCorrectlyInRecursiveTopologies[Eos enabled = false] 
STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldApplyGlobalUpdatesCorrectlyInRecursiveTopologies[Eos enabled = false] 
PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnAllStoresNames[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnAllStoresNames[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPassRecordHeadersIntoSerializersAndDeserializers[Eos enabled = false] 
STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPassRecordHeadersIntoSerializersAndDeserializers[Eos enabled = false] 
PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessConsumerRecordList[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessConsumerRecordList[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSinkSpecificSerializers[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSinkSpecificSerializers[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldFlushStoreForFirstInput[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldFlushStoreForFirstInput[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourceThatMatchPattern[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourceThatMatchPattern[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCaptureSinkTopicNamesIfWrittenInto[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCaptureSinkTopicNamesIfWrittenInto[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUpdateStoreForNewKey[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUpdateStoreForNewKey[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldSendRecordViaCorrectSourceTopicDeprecated[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldSendRecordViaCorrectSourceTopicDeprecated[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnWallClockTime[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnWallClockTime[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldSetRecordMetadata[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldSetRecordMetadata[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotUpdateStoreForLargerValue[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotUpdateStoreForLargerValue[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectInMemoryStoreTypeOnly[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectInMemoryStoreTypeOnly[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldThrowForMissingTime[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldThrowForMissingTime[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCaptureInternalTopicNamesIfWrittenInto[Eos enabled = false] STARTED

org.apache.kafka.streams.Top

Re: [VOTE] KIP-671: Add method to Shutdown entire Streams Application

2020-09-29 Thread Bill Bejeck
Thanks for the KIP Walker.

+1 (binding)

-Bill

On Tue, Sep 29, 2020 at 4:59 PM Guozhang Wang  wrote:

> +1 again on the KIP.
>
> On Tue, Sep 29, 2020 at 1:51 PM Leah Thomas  wrote:
>
> > Hey Walker,
> >
> > Thanks for the KIP! I'm +1, non-binding.
> >
> > Cheers,
> > Leah
> >
> > On Tue, Sep 29, 2020 at 1:56 PM Walker Carlson 
> > wrote:
> >
> > > Hello all,
> > >
> > > I made some changes to the KIP; the descriptions are on the discussion
> > > thread. If you have already voted, I would ask you to confirm your vote.
> > >
> > > Otherwise please vote so we can get this feature in.
> > >
> > > Thanks,
> > > Walker
> > >
> > > On Thu, Sep 24, 2020 at 4:36 PM John Roesler 
> > wrote:
> > >
> > > > Thanks for the KIP, Walker!
> > > >
> > > > I’m +1 (binding)
> > > >
> > > > -John
> > > >
> > > > On Mon, Sep 21, 2020, at 17:04, Guozhang Wang wrote:
> > > > > Thanks for finalizing the KIP. +1 (binding)
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Mon, Sep 21, 2020 at 1:38 PM Walker Carlson <
> > wcarl...@confluent.io>
> > > > > wrote:
> > > > >
> > > > > > Hello all,
> > > > > >
> > > > > > I would like to start a thread to vote for KIP-671 to add a
> method
> > to
> > > > close
> > > > > > all clients in a kafka streams application.
> > > > > >
> > > > > > KIP:
> > > > > >
> > > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-671%3A+Shutdown+Streams+Application+when+appropriate+exception+is+thrown
> > > > > >
> > > > > > Discussion thread: *here
> > > > > > <
> > > > > >
> > > >
> > >
> >
> https://mail-archives.apache.org/mod_mbox/kafka-dev/202009.mbox/%3CCAC55fuh3HAGCxz-PzxTJraczy6T-os2oiCV328PBeuJQSVYASg%40mail.gmail.com%3E
> > > > > > >*
> > > > > >
> > > > > > Thanks,
> > > > > > -Walker
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> >
>
>
> --
> -- Guozhang
>


Re: [DISCUSS] KIP-653: Upgrade log4j to log4j2

2020-09-29 Thread Dongjin Lee
Hi All,

As you can see in the PR, I eliminated all compatibility breaks caused by
the root logger name change between log4j and log4j2. (i.e., "root" → "")
Plus, I rebased the PR onto the latest trunk, migrating the raft module
to log4j2.

Please have a look. And please note that we now have only limited time.

Thanks,
Dongjin
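
The translation described above is small enough to sketch; a minimal,
illustrative version (not the actual PR code):

public final class RootLoggerNames {

    // log4j 1.x calls the root logger "root"; log4j2 uses the empty string
    // (org.apache.logging.log4j.LogManager.ROOT_LOGGER_NAME).
    private static final String LOG4J1_ROOT = "root";
    private static final String LOG4J2_ROOT = "";

    // Applied to incoming requests: translate to the log4j2 equivalent.
    public static String toLog4j2(final String name) {
        return LOG4J1_ROOT.equalsIgnoreCase(name) ? LOG4J2_ROOT : name;
    }

    // Applied to outgoing responses: translate back to the log4j equivalent.
    public static String toLog4j1(final String name) {
        return LOG4J2_ROOT.equals(name) ? LOG4J1_ROOT : name;
    }
}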

On Tue, Sep 29, 2020 at 1:07 AM Dongjin Lee  wrote:

> > 3. ... For the same reason I call that a bug, I think the description in
> the KIP is
> incorrect.
>
> Agree, the description in the KIP was written before you opened the PR (
> https://github.com/apache/kafka/pull/9266) - As you remember, I am
> participating in the review. I think it is a bug and should be fixed. (And it
> seems like it will be.)
>
> > In any case I think some careful testing to ensure compatibility would
> be very beneficial.
>
> Yes, I am now adding some additional verifications to make sure. It is
> almost done, and I will update the KIP as soon as I complete them.
>
> Don't hesitate to give me additional comments if it is necessary.
>
> Best,
> Dongjin
>
> On Mon, Sep 28, 2020 at 8:03 PM Tom Bentley  wrote:
>
>> Hi Dongjin,
>>
>> Sorry for the late reply.
>>
>> 1. I think translating the name "root" to "" would work fine.
>>
>> 2. The second bullet in the Connect section of the KIP seems to need some
>> translation between null and OFF, similar to the name translation.
>>
>> 3. The third bullet seems to be about logger inheritance. As you know, I
>> have an open PR (https://github.com/apache/kafka/pull/9266) to fix a bug
>> where the broker/connect reports the root logger's level rather than
>> respecting the actual (inherited) level of a logger in the hierarchy. For
>> the same reason I call that a bug, I think the description in the KIP is
>> incorrect. The behaviour change described would seem to be incompatible
>> whether the PR was merged or not.
>>
>> I'm not an expert in log4j, but my understanding is as follows:
>>
>> * In original log4j, a logger and its configuration were both represented
>> by a Logger instance. A Logger could be instantiated and configured
>> according to the config file (or programmatically). If it was created by a
>> call to the LogManager (e.g. in order to log something) its configuration
>> would be inherited. This meant there was only one place to look for a
>> logger's level: the Logger itself. This meant that getting or setting a
>> logger's level was easy.
>>
>> * In log4j2 a LoggerConfig (the thing created by the config file) is a
>> separate thing from the Logger (the thing on which you call warn(),
>> debug()
>> etc) itself and I think this makes it harder to provide compatibility with
>> the log4j v1 behaviour for things like getting a logger's level, because
>> AFAIK log4j2 doesn't provide a convenient API for doing so. Instead when
>> finding a logger's level you have to look for both a LoggerConfig and a
>> Logger, because the level could be set in either. This is all based on
>> what
>> I learned when I was looking at the log4j2 switch (before I knew you were
>> already looking at it, if you recall). I have some code from then [1]
>> which
>> may be of use though it's in a bit of a rough state. In any case I think
>> some careful testing to ensure compatibility would be very beneficial.
>>
>> Kind regards,
>>
>> Tom
>>
>> [1]:
>>
>> https://github.com/tombentley/kafka/blob/KAFKA-1368-log4j2/core/src/main/scala/kafka/utils/Log4j2Controller.scala
>>
>>
>>
>>
>>
>> On Wed, Sep 23, 2020 at 1:50 PM Dongjin Lee  wrote:
>>
>> > Hi Tom,
>> >
>> > Thanks for the detailed analysis. Recently, I was also thinking about
>> API
>> > compatibility. I initially thought that the difference between the root
>> > logger name would break compatibility (as the KIP states), but it seems
>> > like I have found a workaround:
>> >
>> > 1. When the user requests arrive, regard the logger name 'root' as an
>> empty
>> > string. (i.e., translate the request into the log4j2 equivalent.)
>> > 2. When generating the response, change the logger name '' into 'root'.
>> > (i.e., translate the response into the log4j equivalent.)
>> > 3. Remove (or make reverse) the workaround above when we make log4j2
>> > default.
>> >
>> > In short, it seems like we can handle the API incompatibility
>> introduced by
>> > the root logger name change by adding a facade.
>> >
>> > What do you think?
>> >
>> > Thanks,
>> > Dongjin
>> >
>> > On Wed, Sep 23, 2020 at 7:36 PM Tom Bentley 
>> wrote:
>> >
>> > > Hi Dongjin,
>> > >
>> > > I'd like to see this feature, but if I understand correctly, the KIP
>> in
>> > its
>> > > current form breaks a couple of Kafka APIs. For Kafka Connect it says
>> > "From
>> > > log4j2, the name of the root logger becomes empty string from 'root'.
>> It
>> > > impacts Kafka connect's dynamic logging control feature. (And should
>> be
>> > > documented.)". This seems to imply that any tooling that a user might
>> > have
>> > > written about logging in Kafka Connect will break because the client
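
For reference, a sketch of the two-step lookup Tom describes for log4j2,
using log4j-core APIs (error handling omitted):

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.core.config.LoggerConfig;

public class EffectiveLevelSketch {

    static Level effectiveLevelOf(final String loggerName) {
        final LoggerContext ctx = (LoggerContext) LogManager.getContext(false);
        // getLoggerConfig() walks up the logger hierarchy, so the returned
        // LoggerConfig already reflects the inherited configuration.
        final LoggerConfig config = ctx.getConfiguration().getLoggerConfig(loggerName);
        return config.getLevel();
    }
}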

Build failed in Jenkins: Kafka » kafka-trunk-jdk8 #98

2020-09-29 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-10479; Throw exception if users try to update non-reconfigurable 
configs of existing listeners (#9284)


--
[...truncated 6.65 MB...]

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowPatternNotValidForTopicNameException[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldEnqueueLaterOutputsAfterEarlierOnes[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldEnqueueLaterOutputsAfterEarlierOnes[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSourceSpecificDeserializersDeprecated[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSourceSpecificDeserializersDeprecated[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateIfEvenTimeAdvances[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateIfEvenTimeAdvances[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowNoSuchElementExceptionForUnusedOutputTopicWithDynamicRouting[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowNoSuchElementExceptionForUnusedOutputTopicWithDynamicRouting[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldInitProcessor[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldInitProcessor[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowForUnknownTopic[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowForUnknownTopic[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnStreamsTime[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnStreamsTime[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCaptureGlobalTopicNameIfWrittenInto[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCaptureGlobalTopicNameIfWrittenInto[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowIfInMemoryBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled = 
false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowIfInMemoryBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled = 
false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourcesThatMatchMultiplePattern[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourcesThatMatchMultiplePattern[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldPopulateGlobalStore[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldPopulateGlobalStore[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowIfPersistentBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled = 
false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowIfPersistentBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled = 
false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldAllowPrePopulatingStatesStoresWithCachingEnabled[Eos enabled = false] 
STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldAllowPrePopulatingStatesStoresWithCachingEnabled[Eos enabled = false] 
PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectPersistentStoreTypeOnly[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectPersistentStoreTypeOnly[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldRespectTaskIdling[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldRespectTaskIdling[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSourceSpecificDeserializers[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSourceSpecificDeserializers[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldReturnAllStores[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldReturnAllStores[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotCreateStateDirectoryForStatelessTopology[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotCreateStateDirectoryForStatelessTopology[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldApplyGlobalUpdatesCorrectlyInRecursiveTopologies[Eos enabled = false] 
STARTED

org.apache.kafka.streams.TopologyTestDri

Jenkins build is back to normal : Kafka » kafka-trunk-jdk15 #132

2020-09-29 Thread Apache Jenkins Server
See 




Re: [VOTE] KIP-478 Strongly Typed Streams Processor API

2020-09-29 Thread Paul Whalen
Looks pretty good to me, though the Processor#process(Record<KIn, VIn>,
Optional<RecordMetadata>) signature caught my eye.  There's some debate (
https://stackoverflow.com/questions/31922866/why-should-java-8s-optional-not-be-used-in-arguments)
about whether to use Optionals in arguments, and while that's a bit of a
religious debate in the abstract, it did make me wonder whether it makes
sense in this specific case.  When is it actually not present?  I was under
the impression that we should always have access to it in process(), and
that the concern about metadata being undefined was about having access to
record metadata in the ProcessorContext held for use inside a Punctuator.

If that's not the case and it is truly optional in process(), is there an
opportunity for an alternate interface for the cases when we don't get it,
rather than force the branching on implementers of the interface?

Apologies if I've missed something, I took a look at the PR and I didn't
see any spots where I thought it would be empty.  Perhaps an example of a
Punctuator using (and not using) the new API would clear things up.

Best,
Paul

On Tue, Sep 29, 2020 at 4:10 PM John Roesler  wrote:

> Hello again, all,
>
> Thanks for the latest round of discussion. I've taken the
> recent feedback and come up with an updated KIP that seems
> actually quite a bit nicer than the prior proposal.
>
> The specific diff on the KIP is here:
>
> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=118172121&selectedPageVersions=15&selectedPageVersions=14
>
> These changes are implemented in this POC PR:
> https://github.com/apache/kafka/pull/9346
>
> The basic idea is that, building on the recent conversation,
> we would transition away from the current API where we get
> only key/value in the process() method and other "data"
> comes in the ProcessorContext along with the "metadata".
>
> Instead, we formalize what is "data" and what is "metadata",
> and pass it all in to the process method:
> Processor#process(Record<KIn, VIn>, Optional<RecordMetadata>)
>
> Also, you forward the whole data class instead of mutating
> the ProcessorContext fields and also calling forward:
> ProcessorContext#forward(Record)
>
> The Record class itself ships with methods like
> record#withValue(NewV newValue)
> that make a shallow copy of the input Record, enabling
> Processors to safely handle the record without polluting the
> context of their parents and siblings.
>
> This proposal has a number of key benefits:
> * As we've discovered in KAFKA-9584, it's unsafe to mutate
> the Headers via the ProcessorContext. This proposal offers a
> way to safely forward changes only to downstream processors.
> * The new API has symmetry (each processor's input is the
> output of its parent processor)
> * The API makes clear that the record metadata isn't always
> defined (for example, in a punctuation, there is no current
> topic/partition/offset)
> * The API enables punctuators to forward well defined
> headers downstream, which is currently not possible.
>
> Unless there are objections, I'll go ahead and re-finalize
> this KIP and update that PR to a mergeable state.
>
> Thanks, all,
> -John
>
>
> On Thu, 2020-09-24 at 09:41 -0700, Matthias J. Sax wrote:
> > Interesting proposal. However, I am not totally convinced, because I see
> > a fundamental difference between "data" and "metadata".
> >
> > Topic/partition/offset are "metadata" in the strong sense and they are
> > immutable.
> >
> > On the other hand there is "primary" data like key and value, as well as
> > "secondary" data like timestamp and headers. The issue seems that we
> > treat "secondary data" more like metadata atm?
> >
> > Thus, promoting timestamp and headers into a first-class citizen role
> > makes sense to me (my original proposal about `RecordContext` would still
> > fall short with this regard). However, putting both (data and metadata)
> > into a `Record` abstraction might go too far?
> >
> > I am also a little bit concerned about `Record.copy()` because it might
> > be a trap: Users might assume it does a full deep copy of the record,
> > however, it would not. It would only create a new `Record` object as
> > wrapper that points to the same key/value/header objects as the input
> > record.
> >
> > With the current `context.forward(key, value)` we don't have this "deep
> > copy" issue -- it's pretty clear what is happening.
> >
> > Instead of `To.all().withTimestamp()` we could also add
> > `context.forward(key, value, timestamp)` etc (just wondering about the
> > explosion in overloads)?
> >
> > Also, `Record.withValue` etc sounds odd? Should a record not be
> > immutable? So, we could have something like
> >
> >
> `RecordFactory.withKeyValue(...).withTimestamp(...).withHeaders(...).build()`.
> > But it looks rather verbose?
> >
> > The other question is, of course, to what extent do we want to keep the
> > distinction between "primary" and "secondary" data? To me, it's a
> > question of ease of use?
> >
> > Just putting all this

Re: [DISCUSS] KIP-631: The Quorum-based Kafka Controller

2020-09-29 Thread Jose Garcia Sancio
Hi Jun and Colin,

Some comments below.

> 62.3 We added some configs in KIP-595 prefixed with "quorum" and we plan to
> add some controller specific configs prefixed with "controller". KIP-630
> plans to add some other controller specific configs with no prefix. Should
> we standardize all controller specific configs with the same prefix?

I agree that consistency in all of the new properties is really
important to improve the user's experience with Kafka. After some
discussion in the review of KIP-630, the configuration names start
with "metadata".

metadata.snapshot.min.cleanable.ratio
metadata.lbo.lag.time.max.ms

The reason for the change is that this configuration affects both the
controller component and the metadata cache component as users of the
"cluster metadata" topic partition. I think the new names matches
Kafka's existing configuration pattern for the transaction and
consumer offset topic partitions.

Thanks!
-- 
-Jose


Re: [DISCUSS] KIP-631: The Quorum-based Kafka Controller

2020-09-29 Thread Jason Gustafson
Hey Colin,

Thanks for the hard work on this proposal.

I'm gradually coming over to the idea of the controllers having separate
IDs. One of the benefits is that it allows us to separate the notion of
controller liveness from broker liveness, which has always been a tricky
detail. I think it's fair to say that the liveness of the controller is
tracked through the Raft protocol while the liveness of the broker is
tracked by the heartbeating mechanism in this KIP. This saves us from having
to deal with tricky cases like electing a controller which is not actually
live from the perspective of heartbeating. I suspect you are right that it
will be simpler internally if we treat them distinctly, though it will take
some getting used to for users. It also seemingly makes it easier to
migrate to a dedicated controller setup once a cluster gets large enough
for it.

With that said, one detail which is not very clear to me is how these two
IDs interact with the metadata quorum. Let's say we have a broker which is
running as both a broker and a controller. I think it must send fetches
using `controller.id` so that the leader can track its progress and it can
be eligible for election. Will it also fetch using `broker.id`? If we could
avoid replicating the metadata twice, that would be preferable, but it
seems like that will break the isolation between the controller and broker
at some level.

There is another option I was thinking about for the sake of discussion.
Suppose we say that controllers always persist the metadata log and brokers
never do. A broker would always start from the latest snapshot, but we can
allow it to fetch from the "nearest" controller (which might be local if
`process.roles` is set to both controller and broker). If users want to
have the metadata log replicated to all nodes, then they can make each node
a controller. It's fine to have controllers that are not voters since they
can be observers. They replicate and persist the metadata log, but they
do not take part in elections, though they would be available for observer
promotion when we get around to completing the raft reassignment work.

On a related note, I'm still slightly in favor of unifying the controller
listener into the existing `listeners` configuration. I think we would
agree that separating the controller listener should at least be considered
a best practice in a secure configuration, but I am not sure about the case
for mandating it. I'm sympathetic to the idea that we should be opinionated
about this, but it would be helpful if the KIP documented what exactly we
are getting out of it. My concern is basically that it hurts usability.

By the way, in KIP-595 we tended to favor `quorum` as the prefix for
configurations related to the management of the metadata quorum. None of
these configs have been exposed yet, so we can still change them. I think
`controller` is probably more descriptive, so if you're in agreement, we
can amend KIP-595 so that it uses the `controller` prefix. However, I think
it's important to redefine `controller.connect` so that it is an explicit
definition of the set of metadata voters. Right now it reads more like a
bootstrap url. Since we decided to take dynamic quorum configuration out of
the initial version, we need the voter set to be defined explicitly in
static configuration. We used `quorum.voters` in KIP-595.

Best,
Jason

On Tue, Sep 29, 2020 at 5:21 PM Jose Garcia Sancio 
wrote:

> Hi Jun and Colin,
>
> Some comments below.
>
> > 62.3 We added some configs in KIP-595 prefixed with "quorum" and we plan
> to
> > add some controller specific configs prefixed with "controller". KIP-630
> > plans to add some other controller specific configs with no prefix.
> Should
> > we standardize all controller specific configs with the same prefix?
>
> I agree that consistency in all of the new properties is really
> important to improve the user's experience with Kafka. After some
> discussion in the review of KIP-630, the configuration names start
> with "metadata".
>
> metadata.snapshot.min.cleanable.ratio
> metadata.lbo.lag.time.max.ms
>
> The reason for the change is that this configuration affects both the
> controller component and the metadata cache component as users of the
> "cluster metadata" topic partition. I think the new names matches
> Kafka's existing configuration pattern for the transaction and
> consumer offset topic partitions.
>
> Thanks!
> --
> -Jose
>


Re: [VOTE] KIP-478 Strongly Typed Streams Processor API

2020-09-29 Thread John Roesler
Thanks for the review, Paul!

I had read some of that debate before. There seems to be some subtext there, 
because they advise against using Optional in cases like this, but there 
doesn’t seem to be a specific reason why it’s inappropriate. I got the 
impression they were just afraid that people would go overboard and make 
everything Optional. 

I could also make two methods, but it seemed like it might be an unfortunate 
way to handle the issue, since Processor is just about a Function as-is, and 
the two-method approach would require people to implement both methods.

To your question, this is something that’s only recently become clear to me. 
Imagine you have a parent processor that calls forward both from process and a 
punctuator. The child will have process() invoked in both cases, and won’t be 
able to distinguish them. However, the record metadata is only defined when the 
parent forwards while processing a real record, not when it calls forward from 
the punctuator.

This is why I wanted to make the metadata Optional, to advertise that the 
metadata might be undefined if any ancestor processor ever calls forward from a 
punctuator. We could remove the Optional and instead just document that the 
argument might be null.

With that context in place, what’s your take?

Thanks,
John
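
A sketch of that scenario against the proposed API shape (class names per the
proposal; the final KIP may differ): the child of this processor sees defined
metadata for records forwarded from process(), and an empty Optional for
records forwarded from the punctuator.

import java.time.Duration;
import java.util.Optional;
import org.apache.kafka.streams.processor.PunctuationType;

public class HeartbeatProcessor implements Processor<String, String, String, String> {

    private ProcessorContext<String, String> context;

    @Override
    public void init(final ProcessorContext<String, String> context) {
        this.context = context;
        // Forwarded from a punctuation: there is no current topic/partition/
        // offset, so the child's process() receives Optional.empty().
        context.schedule(Duration.ofSeconds(10), PunctuationType.WALL_CLOCK_TIME,
                timestamp -> context.forward(new Record<>("heartbeat", "tick", timestamp)));
    }

    @Override
    public void process(final Record<String, String> record,
                        final Optional<RecordMetadata> metadata) {
        // Forwarded from process(): the child's metadata stays defined.
        context.forward(record);
    }
}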

On Tue, Sep 29, 2020, at 19:09, Paul Whalen wrote:
> Looks pretty good to me, though the Processor#process(Record<KIn, VIn>,
> Optional<RecordMetadata>) signature caught my eye.  There's some debate 
> (
> https://stackoverflow.com/questions/31922866/why-should-java-8s-optional-not-be-used-in-arguments)
> about whether to use Optionals in arguments, and while that's a bit of a
> religious debate in the abstract, it did make me wonder whether it makes
> sense in this specific case.  When is it actually not present?  I was 
> under
> the impression that we should always have access to it in process(), and
> that the concern about metadata being undefined was about having access 
> to
> record metadata in the ProcessorContext held for use inside a 
> Punctuator.
> 
> If that's not the case and it is truly optional in process(), is there an
> opportunity for an alternate interface for the cases when we don't get it,
> rather than force the branching on implementers of the interface?
> 
> Apologies if I've missed something, I took a look at the PR and I didn't
> see any spots where I thought it would be empty.  Perhaps an example of a
> Punctuator using (and not using) the new API would clear things up.
> 
> Best,
> Paul
> 
> On Tue, Sep 29, 2020 at 4:10 PM John Roesler  wrote:
> 
> > Hello again, all,
> >
> > Thanks for the latest round of discussion. I've taken the
> > recent feedback and come up with an updated KIP that seems
> > actually quite a bit nicer than the prior proposal.
> >
> > The specific diff on the KIP is here:
> >
> > https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=118172121&selectedPageVersions=15&selectedPageVersions=14
> >
> > These changes are implemented in this POC PR:
> > https://github.com/apache/kafka/pull/9346
> >
> > The basic idea is that, building on the recent conversation,
> > we would transition away from the current API where we get
> > only key/value in the process() method and other "data"
> > comes in the ProcessorContext along with the "metadata".
> >
> > Instead, we formalize what is "data" and what is "metadata",
> > and pass it all in to the process method:
> > Processor#process(Record, Optional<RecordMetadata>)
> >
> > Also, you forward the whole data class instead of mutating
> > the ProcessorContext fields and also calling forward:
> > ProcessorContext#forward(Record)
> >
> > The Record class itself ships with methods like
> > record#withValue(NewV newValue)
> > that make a shallow copy of the input Record, enabling
> > Processors to safely handle the record without polluting the
> > context of their parents and siblings.
> >
> > This proposal has a number of key benefits:
> > * As we've discovered in KAFKA-9584, it's unsafe to mutate
> > the Headers via the ProcessorContext. This proposal offers a
> > way to safely forward changes only to downstream processors.
> > * The new API has symmetry (each processor's input is the
> > output of its parent processor)
> > * The API makes clear that the record metadata isn't always
> > defined (for example, in a punctuation, there is no current
> > topic/partition/offset)
> > * The API enables punctuators to forward well defined
> > headers downstream, which is currently not possible.
> >
> > Unless there are objections, I'll go ahead and re-finalize
> > this KIP and update that PR to a mergeable state.
> >
> > Thanks, all,
> > -John
> >
> >
> > On Thu, 2020-09-24 at 09:41 -0700, Matthias J. Sax wrote:
> > > Interesting proposal. However, I am not totally convinced, because I see
> > > a fundamental difference between "data" and "metadata".
> > >
> > > Topic/partition/offset are "metadata" in the strong sense and they are
> > > immutable.
> > >
> > > On 

Re: [VOTE] KIP-478 Strongly Typed Streams Processor API

2020-09-29 Thread John Roesler
Oh, I guess one other thing I should have mentioned is that I’ve recently 
discovered that in cases where the context is undefined, we currently just fill 
in dummy values for the context. So there’s a good chance that real 
applications in use are depending on undefined context without even realizing 
it. What I’m hoping to do is just make the situation explicit and get rid of 
the dummy values. 

Thanks,
John

On Tue, Sep 29, 2020, at 20:01, John Roesler wrote:
> Thanks for the review, Paul!
> 
> I had read some of that debate before. There seems to be some subtext 
> there, because they advise against using Optional in cases like this, 
> but there doesn’t seem to be a specific reason why it’s inappropriate. 
> I got the impression they were just afraid that people would go 
> overboard and make everything Optional. 
> 
> I could also make two methods, but it seemed like it might be an 
> unfortunate way to handle the issue: Processor is just about a 
> Function as-is, and the two-method approach would require people to 
> implement both methods.
> 
> To your question, this is something that’s only recently become clear 
> to me. Imagine you have a parent processor that calls forward both from 
> process and a punctuator. The child will have process() invoked in both 
> cases, and won’t be able to distinguish them. However, the record 
> metadata is only defined when the parent forwards while processing a 
> real record, not when it calls forward from the punctuator.
> 
> This is why I wanted to make the metadata Optional, to advertise that 
> the metadata might be undefined if any ancestor processor ever calls 
> forward from a punctuator. We could remove the Optional and instead 
> just document that the argument might be null.
> 
> With that context in place, what’s your take?
> 
> Thanks,
> John
> 
> On Tue, Sep 29, 2020, at 19:09, Paul Whalen wrote:
> > Looks pretty good to me, though the Processor#process(Record,
> > Optional<RecordMetadata>) signature caught my eye.  There's some debate 
> > (
> > https://stackoverflow.com/questions/31922866/why-should-java-8s-optional-not-be-used-in-arguments)
> > about whether to use Optionals in arguments, and while that's a bit of a
> > religious debate in the abstract, it did make me wonder whether it makes
> > sense in this specific case.  When is it actually not present?  I was 
> > under
> > the impression that we should always have access to it in process(), and
> > that the concern about metadata being undefined was about having access 
> > to
> > record metadata in the ProcessorContext held for use inside a 
> > Punctuator.
> > 
> > If that's not the case and it is truly optional in process(), is there an
> > opportunity for an alternate interface for the cases when we don't get it,
> > rather than force the branching on implementers of the interface?
> > 
> > Apologies if I've missed something, I took a look at the PR and I didn't
> > see any spots where I thought it would be empty.  Perhaps an example of a
> > Punctuator using (and not using) the new API would clear things up.
> > 
> > Best,
> > Paul
> > 
> > On Tue, Sep 29, 2020 at 4:10 PM John Roesler  wrote:
> > 
> > > Hello again, all,
> > >
> > > Thanks for the latest round of discussion. I've taken the
> > > recent feedback and come up with an updated KIP that seems
> > > actually quite a bit nicer than the prior proposal.
> > >
> > > The specific diff on the KIP is here:
> > >
> > > https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=118172121&selectedPageVersions=15&selectedPageVersions=14
> > >
> > > These changes are implemented in this POC PR:
> > > https://github.com/apache/kafka/pull/9346
> > >
> > > The basic idea is that, building on the recent conversation,
> > > we would transition away from the current API where we get
> > > only key/value in the process() method and other "data"
> > > comes in the ProcessorContext along with the "metadata".
> > >
> > > Instead, we formalize what is "data" and what is "metadata",
> > > and pass it all in to the process method:
> > > Processor#process(Record, Optional<RecordMetadata>)
> > >
> > > Also, you forward the whole data class instead of mutating
> > > the ProcessorContext fields and also calling forward:
> > > ProcessorContext#forward(Record)
> > >
> > > The Record class itself ships with methods like
> > > record#withValue(NewV newValue)
> > > that make a shallow copy of the input Record, enabling
> > > Processors to safely handle the record without polluting the
> > > context of their parents and siblings.
> > >
> > > This proposal has a number of key benefits:
> > > * As we've discovered in KAFKA-9584, it's unsafe to mutate
> > > the Headers via the ProcessorContext. This proposal offers a
> > > way to safely forward changes only to downstream processors.
> > > * The new API has symmetry (each processor's input is the
> > > output of its parent processor)
> > > * The API makes clear that the record metadata isn't always
> > > defined (for exa

Re: [VOTE] KIP-671: Add method to Shutdown entire Streams Application

2020-09-29 Thread Matthias J. Sax
Thanks Walker. The proposed API changes LGTM.

+1 (binding)

One minor nit: you should also mention the global thread, which also needs
to be shut down if requested by the user.

Minor side question: should we actually terminate a thread and create a
new one, or instead revive the existing thread (reusing its existing ID)?


-Matthias

On 9/29/20 2:39 PM, Bill Bejeck wrote:
> Thanks for the KIP Walker.
> 
> +1 (binding)
> 
> -Bill
> 
> On Tue, Sep 29, 2020 at 4:59 PM Guozhang Wang  wrote:
> 
>> +1 again on the KIP.
>>
>> On Tue, Sep 29, 2020 at 1:51 PM Leah Thomas  wrote:
>>
>>> Hey Walker,
>>>
>>> Thanks for the KIP! I'm +1, non-binding.
>>>
>>> Cheers,
>>> Leah
>>>
>>> On Tue, Sep 29, 2020 at 1:56 PM Walker Carlson 
>>> wrote:
>>>
 Hello all,

 I made some changes to the KIP the descriptions are on the discussion
 thread. If you have already voted I would ask you to confirm your vote.

 Otherwise please vote so we can get this feature in.

 Thanks,
 Walker

 On Thu, Sep 24, 2020 at 4:36 PM John Roesler 
>>> wrote:

> Thanks for the KIP, Walker!
>
> I’m +1 (binding)
>
> -John
>
> On Mon, Sep 21, 2020, at 17:04, Guozhang Wang wrote:
>> Thanks for finalizing the KIP. +1 (binding)
>>
>>
>> Guozhang
>>
>> On Mon, Sep 21, 2020 at 1:38 PM Walker Carlson <
>>> wcarl...@confluent.io>
>> wrote:
>>
>>> Hello all,
>>>
>>> I would like to start a thread to vote for KIP-671 to add a
>> method
>>> to
> close
>>> all clients in a kafka streams application.
>>>
>>> KIP:
>>>
>>>
>

>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-671%3A+Shutdown+Streams+Application+when+appropriate+exception+is+thrown
>>>
>>> Discussion thread: *here
>>> <
>>>
>

>>>
>> https://mail-archives.apache.org/mod_mbox/kafka-dev/202009.mbox/%3CCAC55fuh3HAGCxz-PzxTJraczy6T-os2oiCV328PBeuJQSVYASg%40mail.gmail.com%3E
 *
>>>
>>> Thanks,
>>> -Walker
>>>
>>
>>
>> --
>> -- Guozhang
>>
>

>>>
>>
>>
>> --
>> -- Guozhang
>>
> 


Re: [VOTE] KIP-478 Strongly Typed Streams Processor API

2020-09-29 Thread Sophie Blee-Goldman
> However, the record metadata is only defined when the parent forwards
> while processing a real record, not when it calls forward from the
> punctuator.


Can we take a step back for a second...why wouldn't you be required to set
the RecordContext
yourself when calling forward from a Punctuator? I think I agree with Paul
here, it seems kind of
absurd not to enforce that the RecordContext be present inside the
process() method.

The original problem with Punctuators, as I understood it, was that all of
the RecordContext
fields were exposed automatically to both the Processor and any Punctuator,
due to being
direct methods on the ProcessorContext. We can't control which
ProcessorContext methods
someone will call from within a Punctuator vs from a Processor. The best we
could do was
set these "nonsense" fields to null when inside a Punctuator, or set them
to some dummy
values as you pointed out.

But then you proposed the solution of a separate RecordContext which is not
attached to the
ProcessorContext at all. This seemed to solve the above problem very
neatly: we only pass
in the RecordContext to the process() method, so we don't have to worry
about people trying
to access these fields from within a Punctuator. The fields aren't
accessible unless they're
defined.

So what happens when someone wants to forward something from within a
Punctuator? I
don't think it's reasonable to let the timestamp field be undefined, ever.
What if the Punctuator
forwards directly to a sink, or directly to some windowing logic. Are we
supposed to add
handling for the RecordContext == null case to every processor? Or are we
just going to
assume the implicit restriction that users will only forward records from a
Punctuator to
downstream processors that know how to handle and/or set the RecordContext
if it's
undefined. That seems to throw away a lot of the awesome safety added in
this KIP

Apologies for the rant. But I feel pretty strongly that allowing users to forward
records from a
Punctuator without a defined RecordContext would be asking for trouble.
Imo, if you
want to forward from a Punctuator, you need to store the info you need in
order to
set the timestamp, or make one up yourself

(the one alternative I can think of here is that maybe we could pass in the
current
partition time, so users can at least put in a reasonable estimate for the
timestamp
that won't cause it to get dropped and won't potentially lurch the
streamtime far into
the future. This would be similar to what we do in the TimestampExtractor)

On Tue, Sep 29, 2020 at 6:06 PM John Roesler  wrote:

> Oh, I guess one other thing I should have mentioned is that I’ve recently
> discovered that in cases where the context is undefined, we currently just
> fill in dummy values for the context. So there’s a good chance that real
> applications in use are depending on undefined context without even
> realizing it. What I’m hoping to do is just make the situation explicit and
> get rid of the dummy values.
>
> Thanks,
> John
>
> On Tue, Sep 29, 2020, at 20:01, John Roesler wrote:
> > Thanks for the review, Paul!
> >
> > I had read some of that debate before. There seems to be some subtext
> > there, because they advise against using Optional in cases like this,
> > but there doesn’t seem to be a specific reason why it’s inappropriate.
> > I got the impression they were just afraid that people would go
> > overboard and make everything Optional.
> >
> > I could also make two methods, but it seemed like it might be an
> > unfortunate way to handle the issue: Processor is just about a
> > Function as-is, and the two-method approach would require people to
> > implement both methods.
> >
> > To your question, this is something that’s only recently become clear
> > to me. Imagine you have a parent processor that calls forward both from
> > process and a punctuator. The child will have process() invoked in both
> > cases, and won’t be able to distinguish them. However, the record
> > metadata is only defined when the parent forwards while processing a
> > real record, not when it calls forward from the punctuator.
> >
> > This is why I wanted to make the metadata Optional, to advertise that
> > the metadata might be undefined if any ancestor processor ever calls
> > forward from a punctuator. We could remove the Optional and instead
> > just document that the argument might be null.
> >
> > With that context in place, what’s your take?
> >
> > Thanks,
> > John
> >
> > On Tue, Sep 29, 2020, at 19:09, Paul Whalen wrote:
> > > Looks pretty good to me, though the Processor#process(Record,
> > > Optional<RecordMetadata>) signature caught my eye.  There's some
> debate
> > > (
> > >
> https://stackoverflow.com/questions/31922866/why-should-java-8s-optional-not-be-used-in-arguments
> )
> > > about whether to use Optionals in arguments, and while that's a bit of
> a
> > > religious debate in the abstract, it did make me wonder whether it
> makes
> > > sense in this specific case.  When is it actu

Re: [VOTE] KIP-478 Strongly Typed Streams Processor API

2020-09-29 Thread John Roesler
Thanks for the reply, Sophie,

I think I may have summarized too much in my prior reply.

In the currently proposed KIP, any caller of forward() must
supply a Record, which consists of:
* key
* value
* timestamp
* headers (with a convenience constructor that sets empty
headers)

These aren't what I was referring to as potentially being
undefined downstream, since thanks to the introduction of
Record, they are, as you're advocating, required to be
defined everywhere, even when forwarding from a punctuator.

So to be clear, the intent of this change is actually to
_enforce_ that timestamp would never be undefined (which it
currently can be). Also, since punctuators _are_ going to
have to "make up" a timestamp going forward, we should note
that the "punctuate" method currently passes in a good
timestamp that they can use: for system-time punctuations,
they receive the current system time, and for stream-time
punctuations, they get the current stream time.
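
For instance, a punctuator under this proposal might look like the following 
sketch, set up inside a Processor's init(). The interval, key, and value are 
made up; schedule() and PunctuationType are the existing API, and the 
three-argument Record constructor is the convenience constructor described 
above:

@Override
public void init(final ProcessorContext<String, Long> context) {
    context.schedule(Duration.ofMinutes(1), PunctuationType.STREAM_TIME, timestamp -> {
        // The callback's timestamp is the current stream time here (or the
        // current system time for WALL_CLOCK_TIME), so the forwarded Record
        // always carries a defined timestamp.
        context.forward(new Record<>("summary", 0L, timestamp));
    });
}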

The potentially undefined RecordMetadata only contains these
fields:
* topic
* partition
* offset

These fields aren't required (or even used) in a Sink, and
it doesn't seem like they would be important to many
applications. Furthermore, it doesn't _seem_ like you'd even
want to set these fields. They seem purely informational and
only useful in the context when you are actually processing
a real input record. It doesn't sound like you were asking
for it, but just to put it on the record, I think if we were
to require values for the metadata from punctuators, people
would mostly just make up their own dummy values, to no
one's benefit.

I should also note that with the current
Record/RecordMetadata split, we will have the freedom to
move fields into the Record class (or even add new fields)
if we want them to become "data" as opposed to "metadata" in
the future.

Thanks for your reply; I was similarly floored when I
realized the true nature of the current situation. Does my
reply address your concerns?

Thanks,
-John

On Tue, 2020-09-29 at 18:34 -0700, Sophie Blee-Goldman
wrote:
> > However, the record metadata is only defined when the parent forwards
> > while processing a real record, not when it calls forward from the
> > punctuator.
> 
> 
> Can we take a step back for a second...why wouldn't you be required to set
> the RecordContext
> yourself when calling forward from a Punctuator? I think I agree with Paul
> here, it seems kind of
> absurd not to enforce that the RecordContext be present inside the
> process() method.
> 
> The original problem with Punctuators, as I understood it, was that all of
> the RecordContext
> fields were exposed automatically to both the Processor and any Punctuator,
> due to being
> direct methods on the ProcessorContext. We can't control which
> ProcessorContext methods
> someone will call from within a Punctuator vs from a Processor. The best we
> could do was
> set these "nonsense" fields to null when inside a Punctuator, or set them
> to some dummy
> values as you pointed out.
> 
> But then you proposed the solution of a separate RecordContext which is not
> attached to the
> ProcessorContext at all. This seemed to solve the above problem very
> neatly: we only pass
> in the RecordContext to the process() method, so we don't have to worry
> about people trying
> to access these fields from within a Punctuator. The fields aren't
> accessible unless they're
> defined.
> 
> So what happens when someone wants to forward something from within a
> Punctuator? I
> don't think it's reasonable to let the timestamp field be undefined, ever.
> What if the Punctuator
> forwards directly to a sink, or directly to some windowing logic. Are we
> supposed to add
> handling for the RecordContext == null case to every processor? Or are we
> just going to
> assume the implicit restriction that users will only forward records from a
> Punctuator to
> downstream processors that know how to handle and/or set the RecordContext
> if it's
> undefined. That seems to throw away a lot of the awesome safety added in
> this KIP
> 
> Apologies for the rant. But I feel pretty strongly that allowing users to
> records from a
> Punctuator without a defined RecordContext would be asking for trouble.
> Imo, if you
> want to forward from a Punctuator, you need to store the info you need in
> order to
> set the timestamp, or make one up yourself
> 
> (the one alternative I can think of here is that maybe we could pass in the
> current
> partition time, so users can at least put in a reasonable estimate for the
> timestamp
> that won't cause it to get dropped and won't potentially lurch the
> streamtime far into
> the future. This would be similar to what we do in the TimestampExtractor)
> 
> On Tue, Sep 29, 2020 at 6:06 PM John Roesler  wrote:
> 
> > Oh, I guess one other thing I should have mentioned is that I’ve recently
> > discovered that in cases where the context is undefined, we currently just
> > fill in dummy values for the context. So there’s a go

[DISCUSS] Release Deadlines

2020-09-29 Thread Matthias J. Sax
Hi,

when we introduced time based releases, we added certain deadlines to
streamline the release process and to make sure we can ship the release
on time. Based on early experience, we adjusted those deadlines and
introduced new deadlines which improved the situation.

However, we still have the issue that it often takes very long to
stabilize a release branch and the release was delayed by several weeks.

Thus, I am wondering if we should adjust those deadlines again.
Currently, we have

 - KIP freeze
 - Feature freeze (+1 week)
 - Code freeze (+2 weeks)
 - Target release date (+2 weeks)

I would like to propose to extend the deadlines as follows:

 - KIP freeze
 - Feature freeze (+2 weeks)
 - Code freeze (+2 weeks)
 - Target release date (+3 weeks)

This would give us 2 more weeks. Note, that we would not put the target
release date 2 week later, but put KIP freeze 2 weeks earlier.

Of course, it does not come for free. In particular, having 2 weeks
(instead of 1 week) between feature freeze and code freeze implies a
longer period during which PRs need to be double-committed. However, from my
personal experience, I don't think that this burden on committers is too
high.

Looking forward to your feedback.


-Matthias


Re: [VOTE] KIP-478 Strongly Typed Streams Processor API

2020-09-29 Thread Sophie Blee-Goldman
>
> Does my reply address your concerns?


Yes; also, I definitely misread part of the proposal earlier and thought
you had put
the timestamp field in RecordMetadata. Sorry for not giving things a closer
look
before responding! I'm not sure my original message made much sense given
the misunderstanding, but thanks for responding anyway :P

Having given the proposal a second pass, I agree, it's very elegant. +1

On Tue, Sep 29, 2020 at 6:50 PM John Roesler  wrote:

> Thanks for the reply, Sophie,
>
> I think I may have summarized too much in my prior reply.
>
> In the currently proposed KIP, any caller of forward() must
> supply a Record, which consists of:
> * key
> * value
> * timestamp
> * headers (with a convenience constructor that sets empty
> headers)
>
> These aren't what I was referring to as potentially being
> undefined downstream, since thanks to the introduction of
> Record, they are, as you're advocating, required to be
> defined everywhere, even when forwarding from a punctuator.
>
> So to be clear, the intent of this change is actually to
> _enforce_ that timestamp would never be undefined (which it
> currently can be). Also, since punctuators _are_ going to
> have to "make up" a timestamp going forward, we should note
> that the "punctuate" method currently passes in a good
> timestamp that they can use: for system-time punctuations,
> they receive the current system time, and for stream-time
> punctuations, they get the current stream time.
>
> The potentially undefined RecordMetadata only contains these
> fields:
> * topic
> * partition
> * offset
>
> These fields aren't required (or even used) in a Sink, and
> it doesn't seem like they would be important to many
> applications. Furthermore, it doesn't _seem_ like you'd even
> want to set these fields. They seem purely informational and
> only useful in the context when you are actually processing
> a real input record. It doesn't sound like you were asking
> for it, but just to put it on the record, I think if we were
> to require values for the metadata from punctuators, people
> would mostly just make up their own dummy values, to no
> one's benefit.
>
> I should also note that with the current
> Record/RecordMetadata split, we will have the freedom to
> move fields into the Record class (or even add new fields)
> if we want them to become "data" as opposed to "metadata" in
> the future.
>
> Thanks for your reply; I was similarly floored when I
> realized the true nature of the current situation. Does my
> reply address your concerns?
>
> Thanks,
> -John
>
> On Tue, 2020-09-29 at 18:34 -0700, Sophie Blee-Goldman
> wrote:
> > > However, the record metadata is only defined when the parent forwards
> > > while processing a real record, not when it calls forward from the
> > > punctuator.
> >
> >
> > Can we take a step back for a second...why wouldn't you be required to
> set
> > the RecordContext
> > yourself when calling forward from a Punctuator? I think I agree with
> Paul
> > here, it seems kind of
> > absurd not to enforce that the RecordContext be present inside the
> > process() method.
> >
> > The original problem with Punctuators, as I understood it, was that all
> of
> > the RecordContext
> > fields were exposed automatically to both the Processor and any
> Punctuator,
> > due to being
> > direct methods on the ProcessorContext. We can't control which
> > ProcessorContext methods
> > someone will call from within a Punctuator vs from a Processor. The best we
> > could do was
> > set these "nonsense" fields to null when inside a Punctuator, or set them
> > to some dummy
> > values as you pointed out.
> >
> > But then you proposed the solution of a separate RecordContext which is
> not
> > attached to the
> > ProcessorContext at all. This seemed to solve the above problem very
> > neatly: we only pass
> > in the RecordContext to the process() method, so we don't have to worry
> > about people trying
> > to access these fields from within a Punctuator. The fields aren't
> > accessible unless they're
> > defined.
> >
> > So what happens when someone wants to forward something from within a
> > Punctuator? I
> > don't think it's reasonable to let the timestamp field be undefined,
> ever.
> > What if the Punctuator
> > forwards directly to a sink, or directly to some windowing logic. Are we
> > supposed to add
> > handling for the RecordContext == null case to every processor? Or are we
> > just going to
> > assume the implicit restriction that users will only forward records
> from a
> > Punctuator to
> > downstream processors that know how to handle and/or set the
> RecordContext
> > if it's
> > undefined. That seems to throw away a lot of the awesome safety added in
> > this KIP
> >
> > Apologies for the rant. But I feel pretty strongly that allowing users to
> forward
> > records from a
> > Punctuator without a defined RecordContext would be asking for trouble.
> > Imo, if you
> > want to forward from a Punctuator, you need to store the 

Re: [VOTE] KIP-478 Strongly Typed Streams Processor API

2020-09-29 Thread Sophie Blee-Goldman
FWIW, while I'm really not a fan of Optional in general, I agree that its
usage
here seems appropriate. Even for those rare software developers who
carefully
read all the docs several times over, I think it wouldn't be too hard to
miss a
note about the RecordMetadata possibly being null.

Especially because it's not that obvious why at first glance, and takes a
bit of
thinking to realize that records originating from a Punctuator wouldn't
have a
"current record". This  is something that has definitely confused users
today.

It's on us to improve the education here -- and an Optional<RecordMetadata>
would naturally raise awareness of this subtlety

On Tue, Sep 29, 2020 at 7:40 PM Sophie Blee-Goldman 
wrote:

> Does my reply address your concerns?
>
>
> Yes; also, I definitely misread part of the proposal earlier and thought
> you had put
> the timestamp field in RecordMetadata. Sorry for not giving things a
> closer look
> before responding! I'm not sure my original message made much sense given
> the misunderstanding, but thanks for responding anyway :P
>
> Having given the proposal a second pass, I agree, it's very elegant. +1
>
> On Tue, Sep 29, 2020 at 6:50 PM John Roesler  wrote:
>
>> Thanks for the reply, Sophie,
>>
>> I think I may have summarized too much in my prior reply.
>>
>> In the currently proposed KIP, any caller of forward() must
>> supply a Record, which consists of:
>> * key
>> * value
>> * timestamp
>> * headers (with a convenience constructor that sets empty
>> headers)
>>
>> These aren't what I was referring to as potentially being
>> undefined downstream, since thanks to the introduction of
>> Record, they are, as you're advocating, required to be
>> defined everywhere, even when forwarding from a punctuator.
>>
>> So to be clear, the intent of this change is actually to
>> _enforce_ that timestamp would never be undefined (which it
>> currently can be). Also, since punctuators _are_ going to
>> have to "make up" a timestamp going forward, we should note
>> that the "punctuate" method currently passes in a good
>> timestamp that they can use: for system-time punctuations,
>> they receive the current system time, and for stream-time
>> punctuations, they get the current stream time.
>>
>> The potentially undefined RecordMetadata only contains these
>> fields:
>> * topic
>> * partition
>> * offset
>>
>> These fields aren't required (or even used) in a Sink, and
>> it doesn't seem like they would be important to many
>> applications. Furthermore, it doesn't _seem_ like you'd even
>> want to set these fields. They seem purely informational and
>> only useful in the context when you are actually processing
>> a real input record. It doesn't sound like you were asking
>> for it, but just to put it on the record, I think if we were
>> to require values for the metadata from punctuators, people
>> would mostly just make up their own dummy values, to no
>> one's benefit.
>>
>> I should also note that with the current
>> Record/RecordMetadata split, we will have the freedom to
>> move fields into the Record class (or even add new fields)
>> if we want them to become "data" as opposed to "metadata" in
>> the future.
>>
>> Thanks for your reply; I was similarly floored when I
>> realized the true nature of the current situation. Does my
>> reply address your concerns?
>>
>> Thanks,
>> -John
>>
>> On Tue, 2020-09-29 at 18:34 -0700, Sophie Blee-Goldman
>> wrote:
>> > > However, the record metadata is only defined when the parent forwards
>> > > while processing a real record, not when it calls forward from the
>> > > punctuator.
>> >
>> >
>> > Can we take a step back for a second...why wouldn't you be required to
>> set
>> > the RecordContext
>> > yourself when calling forward from a Punctuator? I think I agree with
>> Paul
>> > here, it seems kind of
>> > absurd not to enforce that the RecordContext be present inside the
>> > process() method.
>> >
>> > The original problem with Punctuators, as I understood it, was that all
>> of
>> > the RecordContext
>> > fields were exposed automatically to both the Processor and any
>> Punctuator,
>> > due to being
>> > direct methods on the ProcessorContext. We can't control which
>> > ProcessorContext methods
>> > someone will call from within a Punctuator vs from a Processor. The best
>> we
>> > could do was
>> > set these "nonsense" fields to null when inside a Punctuator, or set
>> them
>> > to some dummy
>> > values as you pointed out.
>> >
>> > But then you proposed the solution of a separate RecordContext which is
>> not
>> > attached to the
>> > ProcessorContext at all. This seemed to solve the above problem very
>> > neatly: we only pass
>> > in the RecordContext to the process() method, so we don't have to worry
>> > about people trying
>> > to access these fields from within a Punctuator. The fields aren't
>> > accessible unless they're
>> > defined.
>> >
>> > So what happens when someone wants to forward something from within a
>> > Punctuator? I
>> > don't think

Re: [DISCUSS] KIP-598: Augment TopologyDescription with store and source / sink serde information

2020-09-29 Thread Sophie Blee-Goldman
Hey Guozhang, what's the status of this KIP?

I was recently digging through a particularly opaque Streams application and
it occurred to me that it might also be useful to print the kind of store
attached
to each node (eg RocksDBWindowStore, InMemoryKeyValueStore, custom,
etc). That made me think of this KIP so I was just wondering where it ended
up. And if you want to pick it up again, WDYT about including some minor
store information in the augmented description?
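
For context, this is all the description surfaces today -- store names but 
neither serdes nor store types. A minimal sketch (the topic and store names 
are made up):

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Materialized;

public class DescribeExample {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("clicks").groupByKey().count(Materialized.as("click-counts"));

        // Prints sub-topologies, processor names, and store names, but says
        // nothing about serdes or whether "click-counts" is RocksDB-backed,
        // in-memory, or custom.
        System.out.println(builder.build().describe());
    }
}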

On Tue, May 19, 2020 at 1:22 PM Guozhang Wang  wrote:

> We already have a Serdes class, actually, which is a factory class. What we
> really need is to add new functions to the `Serde`, `Serializer` and
> `Deserializer` interfaces; since we already dropped Java 7 backward
> compatibility, that may not be a big issue anyway. Let me think about it a
> bit more.
>
> On Tue, May 19, 2020 at 12:01 PM Matthias J. Sax  wrote:
>
> > Thanks Guozhang.
> >
> > This makes sense. I am still wondering about wrapped serdes:
> >
> > > and if it is a wrapper serde, also print its inner
> > >>> serde name
> >
> > How can our default implementation of `TopologyDescriber` know if it's a
> > wrapped serde or not? Furthermore, how do wrapped serdes expose their
> > inner serdes?
> >
> > I am also not sure what the purpose of TopologyDescriber is. Would it
> > maybe be better to add a new interface `Serdes` can implement instead?
> >
> >
> > -Matthias
> >
> >
> >
> > On 5/18/20 9:24 PM, Guozhang Wang wrote:
> > > Bruno, Matthias:
> > >
> > > Thanks for your inputs. After some thoughts I've decide to update my
> > > proposal in the following way:
> > >
> > > 1. Store#serdes() would return a "Map"
> > >
> > > 2. Topology's description would be independent of whether it is
> generated
> > > from `StreamsBuilder#build(props)` or `StreamsBuilder#build()`, and if
> > the
> > > serde is not known we would use "<unknown>" as the default value.
> > >
> > > 3. Add `List<String> TopologyDescription#sourceTopics() / sinkTopics()
> /
> > > repartitionTopics() / changelogTopics()` and for pattern /
> > topic-extractor
> > > we would use fixed format of "<pattern>" and
> > > "<topic-extractor>".
> > >
> > >
> > > I will try to implement this in my existing PR and after I've confirmed
> > it
> > > works, I will update the final wiki for voting.
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Mon, May 18, 2020 at 9:13 PM Guozhang Wang 
> > wrote:
> > >
> > >> Hello Andy,
> > >>
> > >> Thanks a lot for your comments! I do not mind at all :)
> > >>
> > >> I think that's a valid point, what I have in mind is to expose an
> > >> interface which can be optionally overridden in the overridden
> > describe()
> > >> call:
> > >>
> > >> Topology#describe(final TopologyDescriber)
> > >>
> > >> Interface TopologyDescriber {
> > >>
> > >> default describeSerde(final Serde);
> > >>
> > >> default describeSerializer(final Serializer);
> > >>
> > >> default describeDeserializer(final Deserializer);
> > >> }
> > >>
> > >> And we would expose a DefaultTopologyDescriber class that just print
> the
> > >> serde class names -- and if it is a wrapper serde, also print its
> inner
> > >> serde name.
> > >>
> > >> Guozhang
> > >>
> > >>
> > >> On Mon, May 11, 2020 at 12:13 PM Andy Coates 
> wrote:
> > >>
> > >>> Hi Guozhang,
> > >>>
> > >>> Thanks for writing this up. I’m very interested to see this, so I
> hope
> > >>> you don’t mind me commenting.
> > >>>
> > >>> I’ve only really one comment to make, and that’s on the text printed
> > for
> > >>> the serde classes:
> > >>>
> > >>> As I understand it, the name will either come from the passed in
> > config,
> > >>> or may default to “unknown”, or may be obtained from the instances
> > passed
> > >>> while building the topology. It’s this latter case that interests me.
> > >>> Where you have an actual serde instance could we not output more
> > >>> information?
> > >>>
> > >>> The examples use simple (de)serialization classes such as
> > >>> `LongDeseriailizer` where the name alone imparts all the information
> > the
> > >>> user is likely to need. However, users may provide there own custom
> > >>> serialisers and such serialisers may contain state that is important,
> > e.g.
> > >>> the serialiser may know the schema of the data being serialized.  May
> > there
> > >>> be benefit from taking the `toString()` representation of the
> > serialiser?
> > >>>
> > >>> Of course, this would require adding suitable `toString` impls to our
> > own
> > >>> stock serialisers, but may ultimately prove more versatile in the
> > future.
> > >>> The downside is potential to corrupt the topology description, e.g. a
> > >>> toString that includes new lines etc.
> > >>>
> > >>> Thanks,
> > >>>
> > >>> Andy
> > >>>
> > >>>
> > >>>
> >  On 4 May 2020, at 19:27, Bruno Cadonna  wrote:
> > 
> >  Hi Guozhang,
> > 
> >  Thank you for the KIP!
> > 
> >  Exposing also the inner types of the wrapper serdes would be
> >  important. For debugging as Matthias has already mentioned and to
> see
> > 

Re: [VOTE] KIP-478 Strongly Typed Streams Processor API

2020-09-29 Thread Paul Whalen
John, I totally agree that adding a method to Processor is cumbersome and
not a good path.  I was imagining maybe a separate interface that could be
used in the appropriate context, but I don't think that makes too much
sense - it's just too far away from what Kafka Streams is.  I was
originally more interested in the "why" Optional than the "how" (I think my
original reply overplayed the "optional as an argument" concern).  But
you've convinced me that there is a perfectly legitimate "why".  We should
make sure that it's clear why it's Optional, but I suppose that goes
without saying.  It's a nice opportunity to make the API reflect more what
is actually going on under the hood.

Thanks!
Paul

On Tue, Sep 29, 2020 at 10:05 PM Sophie Blee-Goldman 
wrote:

> FWIW, while I'm really not a fan of Optional in general, I agree that its
> usage
> here seems appropriate. Even for those rare software developers who
> carefully
> read all the docs several times over, I think it wouldn't be too hard to
> miss a
> note about the RecordMetadata possibly being null.
>
> Especially because it's not that obvious why at first glance, and takes a
> bit of
> thinking to realize that records originating from a Punctuator wouldn't
> have a
> "current record". This  is something that has definitely confused users
> today.
>
> It's on us to improve the education here -- and an Optional<RecordMetadata>
> would naturally raise awareness of this subtlety
>
> On Tue, Sep 29, 2020 at 7:40 PM Sophie Blee-Goldman 
> wrote:
>
> > Does my reply address your concerns?
> >
> >
> > Yes; also, I definitely misread part of the proposal earlier and thought
> > you had put
> > the timestamp field in RecordMetadata. Sorry for not giving things a
> > closer look
> > before responding! I'm not sure my original message made much sense given
> > the misunderstanding, but thanks for responding anyway :P
> >
> > Having given the proposal a second pass, I agree, it's very elegant. +1
> >
> > On Tue, Sep 29, 2020 at 6:50 PM John Roesler 
> wrote:
> >
> >> Thanks for the reply, Sophie,
> >>
> >> I think I may have summarized too much in my prior reply.
> >>
> >> In the currently proposed KIP, any caller of forward() must
> >> supply a Record, which consists of:
> >> * key
> >> * value
> >> * timestamp
> >> * headers (with a convenience constructor that sets empty
> >> headers)
> >>
> >> These aren't what I was referring to as potentially being
> >> undefined downstream, since thanks to the introduction of
> >> Record, they are, as you're advocating, required to be
> >> defined everywhere, even when forwarding from a punctuator.
> >>
> >> So to be clear, the intent of this change is actually to
> >> _enforce_ that timestamp would never be undefined (which it
> >> currently can be). Also, since punctuators _are_ going to
> >> have to "make up" a timestamp going forward, we should note
> >> that the "punctuate" method currently passes in a good
> >> timestamp that they can use: for system-time punctuations,
> >> they receive the current system time, and for stream-time
> >> punctuations, they get the current stream time.
> >>
> >> The potentially undefined RecordMetadata only contains these
> >> fields:
> >> * topic
> >> * partition
> >> * offset
> >>
> >> These fields aren't required (or even used) in a Sink, and
> >> it doesn't seem like they would be important to many
> >> applications. Furthermore, it doesn't _seem_ like you'd even
> >> want to set these fields. They seem purely informational and
> >> only useful in the context when you are actually processing
> >> a real input record. It doesn't sound like you were asking
> >> for it, but just to put it on the record, I think if we were
> >> to require values for the metadata from punctuators, people
> >> would mostly just make up their own dummy values, to no
> >> one's benefit.
> >>
> >> I should also note that with the current
> >> Record/RecordMetadata split, we will have the freedom to
> >> move fields into the Record class (or even add new fields)
> >> if we want them to become "data" as opposed to "metadata" in
> >> the future.
> >>
> >> Thanks for your reply; I was similarly floored when I
> >> realized the true nature of the current situation. Does my
> >> reply address your concerns?
> >>
> >> Thanks,
> >> -John
> >>
> >> On Tue, 2020-09-29 at 18:34 -0700, Sophie Blee-Goldman
> >> wrote:
> >> > > However, the record metadata is only defined when the parent forwards
> >> > > while processing a real record, not when it calls forward from the
> >> > > punctuator.
> >> >
> >> >
> >> > Can we take a step back for a second...why wouldn't you be required to
> >> set
> >> > the RecordContext
> >> > yourself when calling forward from a Punctuator? I think I agree with
> >> Paul
> >> > here, it seems kind of
> >> > absurd not to enforce that the RecordContext be present inside the
> >> > process() method.
> >> >
> >> > The original problem with Punctuators, as I understood it, was that
> all
> >> of
> >> > the R

Re: [VOTE] KIP-659: Improve TimeWindowedDeserializer and TimeWindowedSerde to handle window size

2020-09-29 Thread Sophie Blee-Goldman
There are two cases where you need to specify the window size -- directly
using a
Consumer (eg the console consumer) or reading as an input topic within
Streams.
We need a config for the first case, since you can't pass a Deserializer
object to the
console consumer. In the Streams case, the reverse is true, and you have to
pass in
an actual Serde object.
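
Concretely, the Streams-side case looks something like this sketch, assuming
the existing WindowedSerdes.timeWindowedSerdeFrom(Class, long) factory; the
topic name and 500ms window size are made up:

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.WindowedSerdes;

public class WindowedInputExample {
    public static void main(final String[] args) {
        // In Streams you hand over a fully constructed Serde, window size
        // and all; no config lookup is involved.
        final Serde<Windowed<String>> keySerde =
                WindowedSerdes.timeWindowedSerdeFrom(String.class, 500L);

        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("windowed-counts", Consumed.with(keySerde, Serdes.Long()));
    }
}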

Imo we should keep these two cases separate and not use the config for the
Streams
case at all. But that's hard to enforce (we'd have to strip the config out
of the user's
StreamsConfig if they tried to use it within Streams, for example) and it
also puts us
in an awkward position due to the default.windowed.serde.inner.class
configs. If
they can specify the inner serde class through their Streams app config,
they
should be able to specify the window size through config as well. Otherwise
we
either force a mix-and-match as Matthias described, or you just always have
to
specify both the inner class and the window size in the constructor, at
which
point, why even have the default.windowed.serde.inner.class config at all?

...
that's not a rhetorical question, actually. Personally I do think we should
deprecate the default.windowed.serde.inner.class and replace it with
separate
windowed.serializer.inner.class/windowed.deserializer.inner.class configs.
This
way it's immediately obvious that the configs are only for the
Consumer/Producer,
and you should construct your own TimeWindowedSerde with all the necessary
parameters for use in your Streams app.

That might be too radical, and maybe the problem isn't worth the burden of
forcing users to change their code and replace the config with actual Serde
objects. But it should be an easy change to make, and if it isn't, that's
probably a good sign that you're using the serde incorrectly somewhere.

If we don't deprecate the default.windowed.serde.inner.class, then it's
less clear
to me how to proceed. The only really consistent thing to do seems to be to
name and position the new window size config as a default config and allow
it to be used similarly to the default inner class configs. Which, as
established
throughout this discussion, seems very very wrong

So yes, I think we should just stick with the original name window.size.ms.
Or
better yet, call it windowed.deserializer.window.size.ms and name the
default.windowed.serde.inner.class replacements
windowed.deserializer.inner.class and windowed.serializer.inner.class
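
(To picture the strict either-or that Matthias suggests below, here is a
sketch of a configure() check inside the windowed deserializer. This is
purely illustrative: the window.size.ms name follows this thread, and the
field handling is an assumption, not the actual implementation.)

private Long windowSize; // null unless supplied via the constructor

@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {
    final Object configured = configs.get("window.size.ms");
    if (windowSize != null && configured != null) {
        // Strict either-or: fail fast instead of silently preferring one.
        throw new IllegalArgumentException(
            "Window size was set both via the constructor and via "
                + "window.size.ms; set exactly one of them");
    }
    if (windowSize == null && configured != null) {
        windowSize = Long.parseLong(configured.toString());
    }
}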

On Tue, Sep 8, 2020 at 2:07 PM Matthias J. Sax  wrote:

> From my understanding, the KIP aims for the case when a user does not
> control the code, eg, when using the command line consumer (or similar
> tools).
>
> If the user writes code, we should always encourage her to instantiate
> the deserializer explicitly and not relying on reflection+config.
>
> I also don't think that the `default` prefix does make sense, as it
> indicates that there might be a non-default. However, IMHO, we should
> not allow "overwrite semantics" but rather throw an exception if the
> config is set and a window size is provided via the constructor. We
> should not allow to mix-and-match both and should stick to a strict
> either-or pattern.
>
>
> -Matthias
>
> On 9/8/20 11:52 AM, Guozhang Wang wrote:
> > Hi Sophie,
> >
> > Seems I do have some mis-understanding of the KIP's motivation here :)
> Just
> > for clarification my reasoning is that:
> >
> > 1) today Streams never uses a windowed deserializer itself, since
> its
> > built-in operators only need the serializer and users do not need to
> > override it, plus standby / restore active tasks would just copy the
> bytes
> > directly. So this KIP's motivation is not for Stream's own code anyways.
> >
> > 2) It is only when the user-specified serde is missing the window size
> > that we run into issues, which happens either a) when one is trying to
> > read a source topic as windowed records in Streams (a big blocker for
> > KIP-300), or b) when one is trying to read a topic as windowed records in
> > a plain Consumer and users fail to use the appropriate serde constructs.
> >
> > I thought the main motivation of this KIP is 2.a), in which we would
> > just encourage users to use the right constructor with the window size
> > by deprecating the other constructors. But I'm not sure how this would
> > help with 2.b), since the proposal is about adding to StreamsConfig. If
> > that is the case, then I agree that we can probably skip the extra config
> > and just deprecate the constructors.
> >
> >
> > Guozhang
> >
> >
> >
> >
> >
> > On Tue, Sep 8, 2020 at 10:50 AM Sophie Blee-Goldman  >
> > wrote:
> >
> >> Hey Guozhang & Leah,
> >>
> >> I want to push back a bit on the assumption that we would fall back on
> this
> >> config
> >> in the case of an unspecified window size in a Streams serde. I don't
> think
> >> it should
> >> be a default at all, either in name or in effect. To borrow the
> rhetorical
> >> question that
> >> John raised earlier: what is the defa