Re: [VOTE] KIP-399: Extend ProductionExceptionHandler to cover serialization exceptions

2020-01-19 Thread Anna McDonald
Hi All,
With binding +1 votes from Guozhang, Matthias and Bill, and non-binding
+1 votes from Mitch and M. Manna, the vote passes.

Thanks all,
anna

On Wed, Jan 15, 2020 at 9:54 PM Anna McDonald  wrote:
>
> Done. Thanks for pointing that out.
>
> anna
>
> On Wed, Jan 15, 2020 at 8:52 PM Guozhang Wang  wrote:
> >
> > Hi Anna,
> >
> > Just a minor comment on the wiki page itself:
> >
> > ```
> >
> > The new method, handleSerializationException, in ProductionExceptionHandler
> > will be invoked when
> >
> >    1. ClassCastException is thrown while serializing record key / value. We
> >    will continue to throw this exception and not invoke the new method.
> >
> > ```
> >
> > I think you meant to say that when ClassCastException is thrown, we would
> > NOT trigger the handler method. But at the beginning it mentioned "will be
> > invoked when.." which sounds a bit conflicting with itself. Could you
> > update the wiki page?
> >
> > Otherwise, I'm +1 on this.
> >
> >
> > Guozhang.
> >
> >
> > On Wed, Jan 15, 2020 at 1:42 PM Matthias J. Sax 
> > wrote:
> >
> > > Thanks for pushing this KIP over the finish line!
> > >
> > > +1 (binding)
> > >
> > >
> > > -Matthias
> > >
> > > On 1/15/20 12:57 PM, Bill Bejeck wrote:
> > > > Thanks for the KIP.
> > > >
> > > > +1 (binding)
> > > >
> > > > -Bill
> > > >
> > > > On Wed, Jan 15, 2020 at 3:45 PM M. Manna  wrote:
> > > >
> > > >> +1 (non-binding)
> > > >>
> > > >> Thanks for this KIP
> > > >>
> > > >> Regards,
> > > >>
> > > >> On Wed, 15 Jan 2020 at 20:35, Mitchell  wrote:
> > > >>
> > > >>> +1(non-binding)
> > > >>>
> > > >>> Very useful
> > > >>> -mitch
> > > >>>
> > > >>> On Wed, Jan 15, 2020, 3:29 PM Anna McDonald 
> > > >>> wrote:
> > > >>>
> > > >>>> Greetings,
> > > >>>> I would like to propose a vote on KIP-399, extending the
> > > >>>> ProductionExceptionHandler to cover serialization exceptions. This KIP
> > > >>>> is aimed at improving the error-handling semantics in Kafka Streams
> > > >>>> when Kafka Streams fails to serialize a message to the downstream
> > > >>>> sink.
> > > >>>>
> > > >>>> KIP details located here:
> > > >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-399%3A+Extend+ProductionExceptionHandler+to+cover+serialization+exceptions
> > > >>>>
> > > >>>> Discussion Thread:
> > > >>>> https://lists.apache.org/thread.html/rbbc887ca31d46f6e73ffc6e08df7e4bda69c89ff820986c30274e272%40%3Cdev.kafka.apache.org%3E
> > > >>>>
> > > >>>> Thanks,
> > > >>>> anna
> > > >>>>
> > > >>>
> > > >>
> > > >
> > >
> > >
> >
> > --
> > -- Guozhang
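
The behaviour clarified in the thread above (a ClassCastException keeps propagating, while other serialization failures are handed to the new handler method) can be sketched roughly as follows. This is only an illustration under stated assumptions: `Handler`, `Response`, `Serializer` and `serializeValue` are hypothetical stand-ins defined here so the example runs without the Kafka jars, and the real `ProductionExceptionHandler` signature in the KIP may differ.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-ins; none of these types are the actual Kafka Streams classes.
final class SerializationHandlerSketch {

    enum Response { CONTINUE, FAIL }

    /** Simplified, assumed shape of the handler callback discussed in the KIP. */
    interface Handler {
        Response handleSerializationException(Object key, Object value, Exception exception);
    }

    /** Minimal serializer interface used only by this sketch. */
    interface Serializer<T> {
        byte[] serialize(T data);
    }

    /**
     * Returns the serialized bytes, or null if the handler chose to skip the record.
     * A ClassCastException is rethrown and never reaches the handler.
     */
    @SuppressWarnings("unchecked")
    static byte[] serializeValue(Object key, Object value, Serializer<?> serializer, Handler handler) {
        try {
            return ((Serializer<Object>) serializer).serialize(value);
        } catch (ClassCastException e) {
            // Wrong serde configured: keep throwing, do not invoke the handler.
            throw e;
        } catch (RuntimeException e) {
            // Any other serialization failure is delegated to the handler.
            if (handler.handleSerializationException(key, value, e) == Response.FAIL) {
                throw new IllegalStateException("Handler asked to fail on serialization error", e);
            }
            return null; // CONTINUE: drop this record and carry on
        }
    }

    public static void main(String[] args) {
        Serializer<String> strict = s -> {
            if (s.isEmpty()) {
                throw new IllegalArgumentException("empty value");
            }
            return s.getBytes(StandardCharsets.UTF_8);
        };
        Handler skip = (k, v, e) -> Response.CONTINUE;

        System.out.println(serializeValue("k", "hello", strict, skip).length);
        System.out.println(serializeValue("k", "", strict, skip) == null);
        try {
            serializeValue("k", 42, strict, skip); // Integer passed to a String serializer
        } catch (ClassCastException e) {
            System.out.println("ClassCastException rethrown");
        }
    }
}
```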


Jenkins build is back to normal : kafka-trunk-jdk8 #4172

2020-01-19 Thread Apache Jenkins Server
See 




Jenkins build is back to normal : kafka-trunk-jdk11 #1096

2020-01-19 Thread Apache Jenkins Server
See 




[jira] [Created] (KAFKA-9455) Consider using TreeMap for In-memory stores of Streams

2020-01-19 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-9455:


 Summary: Consider using TreeMap for In-memory stores of Streams
 Key: KAFKA-9455
 URL: https://issues.apache.org/jira/browse/KAFKA-9455
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Guozhang Wang


> From [~ableegoldman]: It's worth noting that it might be a good idea to switch 
> to TreeMap for different reasons. Right now the ConcurrentSkipListMap allows 
> us to safely perform range queries without copying over the entire keyset, but 
> the performance on point queries seems to scale noticeably worse with the 
> number of unique keys. Point queries are used by aggregations while range 
> queries are used by windowed joins, but of course both are available within 
> the PAPI and for interactive queries so it's hard to say which we should 
> prefer. Maybe rather than make that tradeoff we should have one version for 
> efficient range queries (a "JoinWindowStore") and one for efficient point 
> queries ("AggWindowStore") - or something. I know we've had similar thoughts 
> for a different RocksDB store layout for Joins (although I can't find that 
> ticket anywhere..), it seems like the in-memory stores could benefit from a 
> special "Join" version as well. cc/ Guozhang Wang

Here are some random thoughts:

1. For Kafka Streams processing logic (i.e. without IQ), it's better to make 
all processing logic rely on point queries rather than range queries. Right 
now the only processors that use range queries are, as mentioned above, the 
windowed stream-stream joins. I think we should consider using a different 
window implementation for these (and as a result also get rid of the 
retainDuplicate flags) to refactor the windowed stream-stream join operation.

2. With 1), range queries would only be exposed through IQ. Depending on how 
frequently they are used, I think it makes a lot of sense to optimize for 
single-point queries.

Of course, even without step 1) we should still consider using TreeMap for 
windowed in-memory stores so that they scale better.
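
To make the two access patterns concrete, here is a minimal sketch that runs the same point query and range query against both map types through the shared NavigableMap interface. The class and helper names are made up for illustration and are not the Streams store code. One caveat worth keeping in mind: TreeMap is not thread-safe, so any concurrent access (for example from IQ) would need external synchronization, which this sketch does not attempt.

```java
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical illustration only; not the actual in-memory store implementation.
final class InMemoryStoreMapSketch {

    /** Point query: the access pattern used by aggregations. */
    static String pointQuery(NavigableMap<Long, String> store, long key) {
        return store.get(key);
    }

    /** Range query: the access pattern used by windowed joins. subMap returns a view, not a copy. */
    static NavigableMap<Long, String> rangeQuery(NavigableMap<Long, String> store, long from, long to) {
        return store.subMap(from, true, to, true);
    }

    /** Fills a store with ten entries keyed 0..9. */
    static NavigableMap<Long, String> fill(NavigableMap<Long, String> store) {
        for (long ts = 0; ts < 10; ts++) {
            store.put(ts, "v" + ts);
        }
        return store;
    }

    public static void main(String[] args) {
        NavigableMap<Long, String> skipList = fill(new ConcurrentSkipListMap<>());
        NavigableMap<Long, String> tree = fill(new TreeMap<>());

        // Both maps answer the same queries; only the scaling and concurrency trade-offs differ.
        System.out.println(pointQuery(skipList, 3L) + " " + pointQuery(tree, 3L));
        System.out.println(rangeQuery(skipList, 2L, 4L).equals(rangeQuery(tree, 2L, 4L)));
    }
}
```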



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[DISCUSS] KIP-563: Add 'tail -n' feature for ConsoleConsumer

2020-01-19 Thread Hu Xi
Hi All,

I'd like to start a discussion on KIP-563, which adds "tail -n" capability 
to the ConsoleConsumer tool.

Please check 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-563%3A+Add+%27tail+-n%27+feature+for+ConsoleConsumer
for more details. Any feedback or comments are welcome.



Re: [DISCUSS] : KIP-562: Allow fetching a key from a single partition rather than iterating over all the stores on an instance

2020-01-19 Thread Navinder Brar
I have made some edits in the KIP, please take another look. It would be great 
if we can push it in 2.5.0.
~Navinder


On Sunday, January 19, 2020, 12:59 AM, Navinder Brar 
 wrote:

Sure John, I will update the StoreQueryParams with static factory methods.
@Ted, we would need to create a taskId only when a user provides a single 
partition. When a user wants to query all partitions of an instance, the 
current code is good enough: we iterate over all stream threads and go 
over all taskIds to match the store. But when a user requests a single 
partition-based store, we need to create a taskId out of that partition and 
store name (using the internalTopologyBuilder class) and match it with the 
taskIds belonging to that instance. I will add the code in the KIP.

On Sunday, 19 January, 2020, 12:47:08 am IST, Ted Yu wrote:

Looking at the current KIP-562:

bq. Create a taskId from the combination of store name and partition
provided by the user

I wonder if a single taskId would be used for the “all partitions” case.
If so, we need to choose a numerical value for the partition portion of the
taskId.

On Sat, Jan 18, 2020 at 10:27 AM John Roesler  wrote:

> Thanks, Ted!
>
> This makes sense, but it seems like we should lean towards explicit
> semantics in the public API. ‘-1’ meaning “all partitions” is reasonable,
> but not explicit. That’s why I suggested the Boolean for “all partitions”.
> I guess this also means getPartition() should either throw an exception or
> return null if the partition is unspecified.
>
> Thanks,
> John
>
> On Sat, Jan 18, 2020, at 08:43, Ted Yu wrote:
> > I wonder if the following two methods can be combined:
> >
> > Integer getPartition() // would be null if unset or if "all partitions"
> > boolean getAllLocalPartitions() // true/false if "all partitions"
> requested
> >
> > into:
> >
> > Integer getPartition() // would be null if unset or -1 if "all
> partitions"
> >
> > Cheers
> >
> > On Fri, Jan 17, 2020 at 9:56 PM John Roesler 
> wrote:
> >
> > > Thanks, Navinder!
> > >
> > > I took a look at the KIP.
> > >
> > > We tend to use static factory methods instead of public constructors,
> and
> > > also builders for optional parameters.
> > >
> > > Given that, I think it would be more typical to have a factory method:
> > > storeQueryParams()
> > >
> > > and also builders for setting the optional parameters, like:
> > > withPartitions(List partitions)
> > > withStaleStoresEnabled()
> > > withStaleStoresDisabled()
> > >
> > >
> > > I was also thinking this over today, and it really seems like there are
> > > two main cases for specifying partitions,
> > > 1. you know exactly what partition you want. In this case, you'll only
> > > pass in a single number.
> > > 2. you want to get a handle on all the stores for this instance (the
> > > current behavior). In this case, it's not clear how to use
> withPartitions
> > > to achieve the goal, unless you want to apply a-priori knowledge of the
> > > number of partitions in the store. We could consider an empty list, or
> a
> > > null, to indicate "all", but that seems a little complicated.
> > >
> > > Thus, maybe it would actually be better to eschew withPartitions for
> now
> > > and instead just offer:
> > > withPartition(int partition)
> > > withAllLocalPartitions()
> > >
> > > and the getters:
> > > Integer getPartition() // would be null if unset or if "all partitions"
> > > boolean getAllLocalPartitions() // true/false if "all partitions"
> requested
> > >
> > > Sorry, I know I'm stirring the pot, but what do you think about this?
> > >
> > > Oh, also, the KIP is missing the method signature for the new
> > > KafkaStreams#store overload.
> > >
> > > Thanks!
> > > -John
> > >
> > > On Fri, Jan 17, 2020, at 08:07, Navinder Brar wrote:
> > > > Hi all,
> > > > I have created a new
> > > > KIP:
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-562%3A+Allow+fetching+a+key+from+a+single+partition+rather+than+iterating+over+all+the+stores+on+an+instance
> > > > Please take a look if you get a chance.
> > > > ~Navinder
> > >
> >
>  
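
For readers following the API discussion quoted above, the shape John proposed (a static factory plus withPartition / withAllLocalPartitions and the two getters) might look something like the sketch below. This is only a hypothetical rendering: the class name `StoreQueryParamsSketch`, the `storeName` argument to the factory, and the `staleStoresEnabled()` accessor are assumptions for illustration, and the class in the KIP may well look different.

```java
import java.util.Objects;

// Hypothetical sketch of the proposal in this thread; not the class from the KIP.
final class StoreQueryParamsSketch {
    private final String storeName;
    private final Integer partition;   // null means no single partition was requested
    private final boolean staleStores;

    private StoreQueryParamsSketch(String storeName, Integer partition, boolean staleStores) {
        this.storeName = storeName;
        this.partition = partition;
        this.staleStores = staleStores;
    }

    /** Static factory instead of a public constructor; the storeName argument is an assumption. */
    static StoreQueryParamsSketch storeQueryParams(String storeName) {
        return new StoreQueryParamsSketch(Objects.requireNonNull(storeName), null, false);
    }

    /** Case 1: the caller knows exactly which partition it wants. */
    StoreQueryParamsSketch withPartition(int partition) {
        if (partition < 0) {
            throw new IllegalArgumentException("partition must be >= 0");
        }
        return new StoreQueryParamsSketch(storeName, partition, staleStores);
    }

    /** Case 2: explicit "all local partitions", rather than a magic value such as -1. */
    StoreQueryParamsSketch withAllLocalPartitions() {
        return new StoreQueryParamsSketch(storeName, null, staleStores);
    }

    StoreQueryParamsSketch withStaleStoresEnabled() {
        return new StoreQueryParamsSketch(storeName, partition, true);
    }

    StoreQueryParamsSketch withStaleStoresDisabled() {
        return new StoreQueryParamsSketch(storeName, partition, false);
    }

    /** Null if unset or if "all partitions" was requested. */
    Integer getPartition() {
        return partition;
    }

    /** True if "all partitions" is in effect. */
    boolean getAllLocalPartitions() {
        return partition == null;
    }

    boolean staleStoresEnabled() {
        return staleStores;
    }

    public static void main(String[] args) {
        StoreQueryParamsSketch single = storeQueryParams("counts").withPartition(3).withStaleStoresEnabled();
        StoreQueryParamsSketch all = storeQueryParams("counts").withAllLocalPartitions();
        System.out.println(single.getPartition() + " " + single.getAllLocalPartitions());
        System.out.println(all.getPartition() + " " + all.getAllLocalPartitions());
    }
}
```

Making the object immutable and returning a new instance from each `with...` call keeps the two cases explicit, which is the concern raised about overloading -1 to mean "all partitions".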




Re: [DISCUSS] : KIP-562: Allow fetching a key from a single partition rather than iterating over all the stores on an instance

2020-01-19 Thread John Roesler
Thanks, Navinder,

The Param object looks a bit different than I would have done, but it certainly 
is explicit. We might have to deprecate those particular factory methods and 
move to a builder pattern if we need to add any more options in the future, but 
I’m fine with that possibility. 

The KIP also discusses some implementation details that aren’t necessary here. 
We really only need to see the public interfaces. We can discuss the 
implementation in the PR.

That said, the public API part of the current proposal looks good to me! I 
would be a +1 if you called for a vote. 

Thanks,
John

On Sun, Jan 19, 2020, at 20:50, Navinder Brar wrote:
> I have made some edits in the KIP, please take another look. It would 
> be great if we can push it in 2.5.0.
> ~Navinder

Re: [DISCUSS] KIP-563: Add 'tail -n' feature for ConsoleConsumer

2020-01-19 Thread Dirk Wilden
Hi,

I'm new here, but I just read this and have implemented similar functionality 
for our CLI tool (https://github.com/deviceinsight/kafkactl).

To get a tail at the topic level, we fetch the last n messages from each 
partition, keep only the latest n of them (by message timestamp) in a buffer, 
and finally print those messages.

Perhaps that's also an option?
Regards,
Dirk


On 20 January 2020 03:27:54 CET, Hu Xi wrote:
>Hi All,
>
>I'd like to start a discussion on KIP-563 which is to add "tail -n"
>capability for ConsumerConsole tool.
>
>Please check
>https://cwiki.apache.org/confluence/display/KAFKA/KIP-563%3A+Add+%27tail+-n%27+feature+for+ConsoleConsumer
>for more details. Any feedbacks or comments are welcomed.
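
The approach Dirk describes (take the last n messages from every partition, then keep only the n newest by timestamp) can be sketched with a bounded min-heap. This is an illustration only, not code from kafkactl or from the KIP: the `Msg` type, the `tail` helper and the sample data are hypothetical, and fetching the per-partition messages from Kafka is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch; the per-partition fetch from Kafka is deliberately omitted.
final class TailAcrossPartitionsSketch {

    /** Minimal stand-in for a consumed record. */
    static final class Msg {
        final int partition;
        final long offset;
        final long timestamp;
        final String value;

        Msg(int partition, long offset, long timestamp, String value) {
            this.partition = partition;
            this.offset = offset;
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    private static final Comparator<Msg> BY_TIMESTAMP = Comparator.comparingLong((Msg m) -> m.timestamp);

    /** Keeps the n newest messages by timestamp and returns them oldest first. */
    static List<Msg> tail(List<List<Msg>> lastNPerPartition, int n) {
        // Min-heap on timestamp: the head is always the oldest message still in the buffer.
        PriorityQueue<Msg> buffer = new PriorityQueue<>(BY_TIMESTAMP);
        for (List<Msg> partition : lastNPerPartition) {
            for (Msg m : partition) {
                buffer.add(m);
                if (buffer.size() > n) {
                    buffer.poll(); // evict the oldest so at most n messages are kept
                }
            }
        }
        List<Msg> result = new ArrayList<>(buffer);
        result.sort(BY_TIMESTAMP);
        return result;
    }

    public static void main(String[] args) {
        List<Msg> p0 = Arrays.asList(new Msg(0, 0, 10L, "a"), new Msg(0, 1, 40L, "d"));
        List<Msg> p1 = Arrays.asList(new Msg(1, 0, 20L, "b"), new Msg(1, 1, 30L, "c"));
        for (Msg m : tail(Arrays.asList(p0, p1), 2)) {
            System.out.println(m.timestamp + " " + m.value);
        }
    }
}
```

With the sample data above and n = 2, the buffer ends up holding the messages with timestamps 30 and 40. Ordering across partitions here depends entirely on message timestamps, so ties or skewed producer clocks could change which messages are kept.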


[jira] [Resolved] (KAFKA-9351) Higher count in destination cluster using Kafka MM2

2020-01-19 Thread Ryanne Dolan (Jira)


 [ https://issues.apache.org/jira/browse/KAFKA-9351?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ryanne Dolan resolved KAFKA-9351.
-
Resolution: Information Provided

> Higher count in destination cluster using Kafka MM2
> ---
>
> Key: KAFKA-9351
> URL: https://issues.apache.org/jira/browse/KAFKA-9351
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.4.0
>Reporter: Nitish Goyal
>Priority: Minor
>
> I have set up replication between clusters across different data centres. 
> After setting up replication, at times, I am observing a higher event count 
> in the destination cluster.
> Below are the counts in the source and destination clusters:
>  
> *Source Cluster*
> ```
>  
> events_4:0:51048
> events_4:1:52250
> events_4:2:51526
> ```
>  
> *Destination Cluster*
> ```
> nm5.events_4:0:53289
> nm5.events_4:1:54569
> nm5.events_4:2:53733
> ```
>  
> This is a blocker for us to start using the MM2 replicator.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)