Checking Connection with Kafka Broker from Client-side

2018-08-15 Thread Jorge Esteban Quilcate Otoya
Hi everyone,

I'm evaluating how to validate the connection to Kafka brokers in an
application that uses the Consumer API, by adding a health check based on
the AdminClient.
Is there anything around Authorization that I should take into account, or
any best practice? (I'm considering calling `describeCluster` as the health
check.)

Related issue: https://github.com/openzipkin/zipkin/issues/2098

Thanks in advance for your feedback.

Jorge.
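[Editor's note] A minimal sketch of the kind of health check described above, assuming a plain Java AdminClient; the timeout value and error handling are illustrative only, and which ACLs describeCluster needs (if any) is precisely the open question in this thread:

```java
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class KafkaHealthCheck {
    public static boolean isHealthy(String bootstrapServers) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "5000");
        try (AdminClient admin = AdminClient.create(props)) {
            // describeCluster() returns broker metadata; reaching at least one node
            // is treated here as "connected". Whether extra authorization is required
            // for this call is the question raised in the thread above.
            return !admin.describeCluster().nodes().get(5, TimeUnit.SECONDS).isEmpty();
        } catch (Exception e) {
            // Any authentication, authorization, or timeout failure counts as unhealthy.
            return false;
        }
    }
}
```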


Re: [VOTE] KIP-353: Allow Users to Configure Multi-Streams Timestamp Synchronization Behavior

2018-08-15 Thread Damian Guy
+1

On Tue, 14 Aug 2018 at 19:35 Ted Yu  wrote:

> +1
>  Original message From: Bill Bejeck 
> Date: 8/14/18  11:09 AM  (GMT-08:00) To: dev@kafka.apache.org Subject:
> Re: [VOTE] KIP-353: Allow Users to Configure Multi-Streams Timestamp
> Synchronization Behavior
> +1
>
> Thanks,
> Bill
>
> On Thu, Aug 9, 2018 at 4:20 PM John Roesler  wrote:
>
> > +1 non-binding
> >
> > On Thu, Aug 9, 2018 at 3:14 PM Matthias J. Sax 
> > wrote:
> >
> > > +1 (binding)
> > >
> > > On 8/9/18 11:57 AM, Guozhang Wang wrote:
> > > > Hello all,
> > > >
> > > > I would like to start the voting process on the following KIP, to
> > > > allow users to control when a task can be processed based on its
> > > > buffered records, and how the stream time of a task is advanced.
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > 353%3A+Improve+Kafka+Streams+Timestamp+Synchronization
> > > >
> > > >
> > > >
> > > > Thanks,
> > > > -- Guozhang
> > > >
> > >
> > >
> >
>


Re: [VOTE] KIP-356: Add withCachingDisabled() to StoreBuilder

2018-08-15 Thread Damian Guy
+1

On Tue, 14 Aug 2018 at 22:58 Matthias J. Sax  wrote:

> +1 (binding)
>
> On 8/14/18 11:16 AM, Eno Thereska wrote:
> > +1 (non binding)
> >
> > Thanks
> > Eno
> >
> > On Tue, Aug 14, 2018 at 10:53 AM, Bill Bejeck  wrote:
> >
> >> Thanks for the KIP.
> >>
> >> +1
> >>
> >> -Bill
> >>
> >> On Tue, Aug 14, 2018 at 1:42 PM Guozhang Wang 
> wrote:
> >>
> >>> Hello folks,
> >>>
> >>> I'd like to start a voting thread on the following KIP:
> >>>
> >>>
> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-356%3A+Add+
> >> withCachingDisabled%28%29+to+StoreBuilder
> >>>
> >>> It is a pretty straightforward one, adding a missing API to StoreBuilder
> >>> which should have been added from the very beginning but was somehow
> >>> left out. Hence I skipped the DISCUSS process for it. But if you have
> >>> any feedback, please feel free to share it as well.
> >>>
> >>>
> >>>
> >>> -- Guozhang
> >>>
> >>
> >
>
>
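[Editor's note] For context, a minimal sketch of how the withCachingDisabled() method proposed in KIP-356 would be used on a StoreBuilder; the store name and serdes below are illustrative only:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class CachingDisabledStore {
    public static StoreBuilder<KeyValueStore<String, Long>> countsStore() {
        KeyValueBytesStoreSupplier supplier = Stores.persistentKeyValueStore("counts");
        // withCachingEnabled() already exists on StoreBuilder; KIP-356 adds the
        // symmetric withCachingDisabled() so callers can opt out explicitly.
        return Stores.keyValueStoreBuilder(supplier, Serdes.String(), Serdes.Long())
                     .withCachingDisabled();
    }
}
```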


[jira] [Reopened] (KAFKA-7119) Intermittent test failure with GSSAPI authentication failure

2018-08-15 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram reopened KAFKA-7119:
---

There was a test failure with the 1.0 build. It looks like we need to handle 
retriable Kerberos exceptions on the server side as well. I could recreate this 
failure only with Java 7, but to be safe, I will fix trunk as well.

Exception from https://builds.apache.org/job/kafka–jdk7/232/:
{quote}
java.util.concurrent.ExecutionException: java.lang.AssertionError: 
expected: but was: at 
java.util.concurrent.FutureTask.report(FutureTask.java:122) at 
java.util.concurrent.FutureTask.get(FutureTask.java:202) at 
kafka.server.GssapiAuthenticationTest$$anonfun$testRequestIsAReplay$1.apply(GssapiAuthenticationTest.scala:80)
 at 
kafka.server.GssapiAuthenticationTest$$anonfun$testRequestIsAReplay$1.apply(GssapiAuthenticationTest.scala:80)
{quote}


> Intermittent test failure with GSSAPI authentication failure
> 
>
> Key: KAFKA-7119
> URL: https://issues.apache.org/jira/browse/KAFKA-7119
> Project: Kafka
>  Issue Type: Bug
>  Components: security
>Affects Versions: 2.0.0
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 1.0.3, 1.1.2, 2.0.1, 2.1.0
>
>
> I have seen this failure a couple of times in builds (e.g. 
> [https://builds.apache.org/job/kafka-pr-jdk10-scala2.12/2412/testReport/junit/kafka.api/SaslSslAdminClientIntegrationTest/testLogStartOffsetCheckpoint/)]
> {quote}
> org.apache.kafka.common.errors.SaslAuthenticationException: An error: 
> (java.security.PrivilegedActionException: javax.security.sasl.SaslException: 
> GSS initiate failed [Caused by GSSException: No valid credentials provided 
> (Mechanism level: Request is a replay (34) - Request is a replay)]) occurred 
> when evaluating SASL token received from the Kafka Broker. Kafka Client will 
> go to AUTHENTICATION_FAILED state. Caused by: 
> javax.security.sasl.SaslException: GSS initiate failed [Caused by 
> GSSException: No valid credentials provided (Mechanism level: Request is a 
> replay (34) - Request is a replay)] at 
> jdk.security.jgss/com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:211)
>  at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator$2.run(SaslClientAuthenticator.java:358)
>  at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator$2.run(SaslClientAuthenticator.java:356)
>  at java.base/java.security.AccessController.doPrivileged(Native Method) at 
> java.base/javax.security.auth.Subject.doAs(Subject.java:423) at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslToken(SaslClientAuthenticator.java:356)
>  at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendSaslClientToken(SaslClientAuthenticator.java:268)
>  at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:205)
>  at 
> org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:127) 
> at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:487) 
> at org.apache.kafka.common.network.Selector.poll(Selector.java:425) at 
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510) at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:271)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:218)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:231)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:314)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181) 
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115) 
> at 
> kafka.api.AdminClientIntegrationTest.$anonfun$subscribeAndWaitForAssignment$2(AdminClientIntegrationTest.scala:980)
>  at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:781) at 
> kafka.api.AdminClientIntegrationTest.subscribeAndWaitForAssignment(AdminClientIntegrationTest.scala:979)
>  at 
> kafka.api.AdminClientIntegrationTest.testLogStartOffsetCheckpoint(AdminClientIntegrationTest.scala:755)
>  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method) at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> java.base/jdk.inter

[jira] [Resolved] (KAFKA-7288) Transient failure in SslSelectorTest.testCloseConnectionInClosingState

2018-08-15 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7288?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-7288.
---
   Resolution: Fixed
 Reviewer: Jun Rao
Fix Version/s: 2.1.0

> Transient failure in SslSelectorTest.testCloseConnectionInClosingState
> --
>
> Key: KAFKA-7288
> URL: https://issues.apache.org/jira/browse/KAFKA-7288
> Project: Kafka
>  Issue Type: Bug
>  Components: unit tests
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 2.1.0
>
>
> Noticed this failure in SslSelectorTest.testCloseConnectionInClosingState a 
> few times in unit tests in Jenkins:
> {quote}
> java.lang.AssertionError: Channel not expired expected null, but 
> was: at 
> org.junit.Assert.fail(Assert.java:88) at 
> org.junit.Assert.failNotNull(Assert.java:755) at 
> org.junit.Assert.assertNull(Assert.java:737) at 
> org.apache.kafka.common.network.SelectorTest.testCloseConnectionInClosingState(SelectorTest.java:341)
> {quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Jenkins build is back to normal : kafka-trunk-jdk10 #406

2018-08-15 Thread Apache Jenkins Server
See 




Re: [DISCUSS]: KIP-339: Create a new ModifyConfigs API

2018-08-15 Thread Viktor Somogyi
Hi,

To weigh in, I agree with Colin on the API naming: overloads shouldn't
change behavior. I think all of the Java APIs I've used so far have followed
this principle, and I don't think we should diverge from it.

Also I think I have an entry about this incremental thing in KIP-248. It
died off a bit at voting (I guess 2.0 came quick) but I was about to revive
and restructure it a bit. If you remember it would have done something
similar. Back then we discussed an "incremental_update" flag would have
been sufficient to keep backward compatibility with the protocol. Since
here you designed a new protocol I think I'll remove this bit from my KIP
and also align the other parts/namings to yours so we'll have a more
unified interface on this front.

Lastly, one minor comment: is throttling part of your protocol, similarly
to alterConfigs?

Viktor


On Fri, Jul 20, 2018 at 8:05 PM Colin McCabe  wrote:

> I updated the KIP.
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-339%3A+Create+a+new+IncrementalAlterConfigs+API
>
> Updates:
> * Use "incrementalAlterConfigs" rather than "modifyConfigs," for
> consistency with the other "alter" APIs.
> * Implement Magnus' idea of supporting "append" and "subtract" on
> configuration keys that contain lists.
>
> best,
> Colin
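[Editor's note] As an illustration of the append/subtract idea above, an incremental alter-configs call might end up looking roughly like the sketch below. The AlterConfigOp/OpType names follow the KIP's direction but should be read as assumptions, since the API was still under discussion at this point:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class IncrementalAlterExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");
            // APPEND adds a value to a list-typed config instead of replacing the whole
            // list; SUBTRACT removes one. Configs not listed in the request are untouched,
            // unlike the legacy alterConfigs behavior criticized in this thread.
            AlterConfigOp append = new AlterConfigOp(
                    new ConfigEntry("cleanup.policy", "compact"), AlterConfigOp.OpType.APPEND);
            admin.incrementalAlterConfigs(
                    Collections.singletonMap(topic, Arrays.asList(append)))
                 .all().get();
        }
    }
}
```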
>
>
> On Mon, Jul 16, 2018, at 14:12, Colin McCabe wrote:
> > Hi Magnus,
> >
> > Thanks for taking a look.
> >
> > On Mon, Jul 16, 2018, at 11:43, Magnus Edenhill wrote:
> > > Thanks for driving this KIP, Colin.
> > >
> > > I agree with Dong that a new similar modifyConfigs API (and protocol
> API)
> > > is confusing and that
> > > we should try to extend the current alterConfigs interface to support
> the
> > > incremental mode instead,
> > > deprecating the non-incremental mode in the process.
> >
> > In the longer term, I think that the non-incremental mode should
> > definitely go away, and not be an option at all.  That's why I don't
> > think of this KIP as "adding more options  to AlterConfigs" but as
> > getting rid of a broken API.  I've described a lot of reasons why non-
> > incremental mode is broken.  I've also described why the brokenness is
> > subtle and an easy trap for newbies to fall into.  Hopefully everyone
> > agrees that getting rid of non-incremental mode completely should be the
> > eventual goal.
> >
> > I do not think that having a different name for modifyConfigs is
> > confusing.  "Deleting all the configs and then setting some designated
> > ones" is a very different operation from "modifying some
> > configurations".  Giving the two operations different names expresses
> > the fact  that they really are very different.  Would it be less
> > confusing if the new function were called alterConfigsIncremental rather
> > than modifyConfigs?
> >
> > I think it's important to have a new name for the new function.  If the
> > names are the same, how can we even explain to users which API they
> > should or should not use?  "Use the three argument overload, or the two
> > argument overload where the Options class is not the final parameter"
> > That is not user-friendly.
> >
> > You could say that some of the overloads would be deprecated.  However,
> > my experience as a Hadoop developer is that most users simply don't care
> > about deprecation warnings.  They will use autocomplete in their IDEs
> > and use whatever function seems to have the parameters they need.
> > Hadoop and Kafka themselves use plenty of deprecated APIs.  But somehow
> > we expect that our users have much more respect for @deprecated than we
> > ourselves do.
> >
> > I would further argue that function overloads in Java are intended to
> > provide additional parameters, not to fundamentally change the semantics
> > of a function.  If you have two functions int addTwoNumbers(int a, int
> > b) and int addTwoNumbers(int a, int b, boolean verbose), they should
> > both add together two numbers.  And a user should be able to expect that
> > the plain old addTwoNumbers is equivalent to either
> > addTwoNumbers(verbose=true) or addTwoNumbers(verbose=false), not a
> > totally different operation.
> >
> > Every time programmers violate this contract, it inevitably leads to
> > misunderstanding.  One example is how in HDFS there are multiple
> > function overloads for renaming a file.  Depending on which one you
> > call, you will get either RENAME or RENAME2, which have different
> > semantics.  I think RENAME2 has a different set of return codes
> > surrounding "destination exists" conditions, among other things.  Of
> > course users have no idea of whether they're getting RENAME or RENAME2
> > unless they're developers.  It's not obvious from the function call,
> > which is named "rename" in both cases, just with different function
> > parameters.  So the whole thing is just a huge source of confusion and
> > user pain.
> >
> > Another thing to consider is that since  function overloads are also not
> > an option in C or Go, we need a different solution for those language

Jenkins build is back to normal : kafka-trunk-jdk8 #2898

2018-08-15 Thread Apache Jenkins Server
See 




Anyone interested in helping out a little brother Apache project with their Kafka integration?

2018-08-15 Thread Christofer Dutz
Hi all,

I am one of the Apache PLC4X (incubating) committers and am looking for people 
willing to help out with a little thing.

PLC4X is aimed at industrial programmable logic controllers (PLCs). So what we are 
doing is similar to what JDBC did in the late 90s.

We’ve implemented a universal API with which we can write software to access 
industrial PLCs in a unified way.
In contrast to OPC-UA we do not require any changes on the devices as we 
implement the drivers for the individual protocols as main part of the PLC4X 
project.

We hope that this will help us use all our cool Apache software for building 
the next generation of Industry 4.0 solutions,
as currently it’s a real problem to communicate with these little gray boxes 
directly.

We not only work hard on the API and the matching drivers, but also on 
integration modules to easily use PLC4X in other frameworks.
We already have full Apache Camel and Apache Edgent support.

Even if we do have examples for reading and writing data from and to Kafka, we 
would really like to provide a Kafka-Connect adapter.
This would make it super easy to communicate with industrial controllers from 
inside the Kafka ecosystem.

I have already implemented a stub of a Kafka Connect plugin with Source and 
Sink partially implemented.
So this stub has all the code for reading and writing via PLC4X, but I am not 
deep enough into Kafka to think I should finish this implementation without 
support from people who actually know what they are doing on the Kafka side.

So is anyone here able and willing to help with this Kafka Connect Plugin for 
PLC4X?

Looking forward to positive responses ;-)

Chris
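[Editor's note] For anyone picking this up, a bare-bones sketch of the Kafka Connect source-connector entry points involved. The class names and config keys below are hypothetical placeholders, not the actual PLC4X module; real reads would go through a PLC4X PlcConnection.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

/** Hypothetical connector skeleton for illustration only. */
public class Plc4xSourceConnector extends SourceConnector {
    private Map<String, String> props;

    @Override public String version() { return "0.0.1-SNAPSHOT"; }
    @Override public void start(Map<String, String> props) { this.props = props; }
    @Override public Class<? extends Task> taskClass() { return Plc4xSourceTask.class; }
    @Override public List<Map<String, String>> taskConfigs(int maxTasks) {
        return Collections.singletonList(props); // one task keeps the sketch simple
    }
    @Override public void stop() { }
    @Override public ConfigDef config() {
        return new ConfigDef()
            .define("topic", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
                    "Target Kafka topic")
            .define("plc4x.connection-string", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
                    "PLC4X connection string (hypothetical config key)");
    }
}

class Plc4xSourceTask extends SourceTask {
    private String topic;

    @Override public String version() { return "0.0.1-SNAPSHOT"; }
    @Override public void start(Map<String, String> props) { this.topic = props.get("topic"); }
    @Override public List<SourceRecord> poll() throws InterruptedException {
        // In the real connector this would read field values from the PLC via PLC4X.
        String value = "42"; // placeholder for a polled PLC value
        return Collections.singletonList(new SourceRecord(
                Collections.singletonMap("plc", "demo"),   // source partition
                Collections.singletonMap("offset", 0L),    // source offset
                topic, Schema.STRING_SCHEMA, value));
    }
    @Override public void stop() { }
}
```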


Permission to create KIP

2018-08-15 Thread Attila Sasvári
Hi there,

Can you please grant me permission to create a KIP?

Wiki ID: asasvari

Thanks,
Attila


Re: [VOTE] KIP-344: The auto-generated client id should be passed to MetricsReporter

2018-08-15 Thread Kevin Lu
Hi All,

Forgot to post this earlier, but I cancelled this KIP as we determined this
is a bug fix.

Could a committer review/merge the PR (
https://github.com/apache/kafka/pull/5383)?

Colin has already finished and approved a review.

Thanks!

Regards,
Kevin

On Wed, Jul 25, 2018 at 10:12 AM Ted Yu  wrote:

> +1
>
> On Wed, Jul 25, 2018 at 9:49 AM Kevin Lu  wrote:
>
> > Hi All,
> >
> > I am calling a vote for KIP-344
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-344%3A+The+auto-generated+client+id+should+be+passed+to+MetricsReporter
> > .
> >
> > If any committer feels that this minor change does not need to go through
> > the KIP process, then feel free to leave a code review at
> > https://github.com/apache/kafka/pull/5383 and I will cancel the KIP.
> >
> > Regards,
> > Kevin
> >
>


Re: [DISCUSS] KIP-346 - Limit blast radius of log compaction failure

2018-08-15 Thread Stanislav Kozlovski
Hi Jason,

I was thinking about your suggestion. I agree that it makes sense to cap it
at a certain threshold and it doesn't sound *too* restrictive to me either,
considering the common case.

The issue with the __consumer_offsets topic is a real problem, that is true.
Nevertheless, I have some concerns with having a certain threshold of
`uncleanable.bytes`. There is now a chance that a single error in a big
partition (other than __consumer_offsets) marks the directory as offline
outright. To avoid this, we would need to have it be set to *at least* half
of the biggest compacted partition's size - this is since the default of
`log.cleaner.min.cleanable.ratio` is 0.5. Even then, that single partition
will quickly go over the threshold since it is not cleaned at all.

Ideally, we'd want to validate that more partitions are failing before
marking the disk as offline to best ensure it is an actual disk problem.
Having a threshold makes this tricky. Placing a reasonable default value
seems very hard too, as it would either be too small (mark too fast) or too
big (never mark offline) for some users, which would cause issues in the
former case. Perhaps the best approach would be to have the functionality
be disabled by default.

I am now left with the conclusion that it's best to have that functionality
be disabled by default. Since configs are relatively easy to add but hard
to take away, I believe it might be best to drop off that functionality in
this KIP. We could consider adding it later if the community believes it is
needed.
I consider that a reasonable approach, since the main perceived benefit of
this KIP is the isolation of partition failures and to some extent the new
metrics.

What are other people's thoughts on this? I have updated the KIP
accordingly.

Best,
Stanislav

On Wed, Aug 15, 2018 at 12:27 AM Jason Gustafson  wrote:

> Sorry for the noise. Let me try again:
>
> My initial suggestion was to *track *the uncleanable disk space.
> > I can see why marking a log directory as offline after a certain
> threshold
> > of uncleanable disk space is more useful.
> > I'm not sure if we can set that threshold to be of certain size (e.g
> 100GB)
> > as log directories might have different sizes.  Maybe a percentage would
> be
> > better then (e.g 30% of whole log dir size), WDYT?
>
>
> The two most common problems I am aware of when the log cleaner crashes are
> 1) running out of disk space and 2) excessive coordinator loading time. The
> problem in the latter case is that when the log cleaner is not running, the
> __consumer_offsets topics can become huge. If there is a failure which
> causes a coordinator change, then it can take a long time for the new
> coordinator to load the offset cache since it reads from the beginning.
> Consumers are effectively dead in the water when this happens since they
> cannot commit offsets. We've seen coordinator loading times in the hours
> for some users. If we could set a total cap on the uncleanable size, then
> we can reduce the impact from unbounded __consumer_offsets growth.
>
> Also it's true that log directories may have different sizes, but I'm not
> sure that is a common case. I don't think it would be too restrictive to
> use a single max size for all directories. I think the key is just having
> some way to cap the size of the uncleaned data.
>
> I feel it still makes sense to have a metric tracking how many uncleanable
> > partitions there are and the total amount of uncleanable disk space (per
> > log dir, via a JMX tag).
> > But now, rather than fail the log directory after a certain count of
> > uncleanable partitions, we could fail it after a certain percentage (or
> > size) of its storage is uncleanable.
>
>
> Yes, having the metric for uncleanable partitions could be useful. I was
> mostly concerned about the corresponding config since it didn't seem to
> address the main problems with the cleaner dying.
>
> Thanks,
> Jason
>
> On Tue, Aug 14, 2018 at 4:11 PM, Jason Gustafson 
> wrote:
>
> > Hey Stanislav, responses below:
> >
> > My initial suggestion was to *track *the uncleanable disk space.
> >> I can see why marking a log directory as offline after a certain
> threshold
> >> of uncleanable disk space is more useful.
> >> I'm not sure if we can set that threshold to be of certain size (e.g
> >> 100GB)
> >> as log directories might have different sizes.  Maybe a percentage would
> >> be
> >> better then (e.g 30% of whole log dir size), WDYT?
> >
> >
> >
> >
> >
> > On Fri, Aug 10, 2018 at 2:05 AM, Stanislav Kozlovski <
> > stanis...@confluent.io> wrote:
> >
> >> Hey Jason,
> >>
> >> My initial suggestion was to *track *the uncleanable disk space.
> >> I can see why marking a log directory as offline after a certain
> threshold
> >> of uncleanable disk space is more useful.
> >> I'm not sure if we can set that threshold to be of certain size (e.g
> >> 100GB)
> >> as log directories might have different sizes.  Maybe a percentage would
> >> be
> >> bett

Re: [DISCUSS] KIP-325: Extend Consumer Group Command to Show Beginning Offsets

2018-08-15 Thread Vahid S Hashemian
In the absence of additional feedback to upvote one option against the 
other, I decided to roll the KIP back to an earlier version without the 
"partition size" support.
I'll start another KIP to add the partition size info to the kafka-topics 
command output, which I think would be a better fit.

If there is no additional comment/feedback I'll start a vote soon.

Thanks.
--Vahid




From:   "Vahid S Hashemian" 
To: dev@kafka.apache.org
Date:   08/07/2018 01:58 PM
Subject:Re: [DISCUSS] KIP-325: Extend Consumer Group Command to 
Show Beginning Offsets



Any additional feedback on whether we should also include a partition size
column or not?

Options:

1. The current KIP (with a partition size column): 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-325%3A+Extend+Consumer+Group+Command+to+Show+Beginning+Offsets+and+Partition+Size

** Requires additional processing to extract topic configs
** Will not return a precise size for compacted topics
** Simplifies checking the consumption progress on each partition

2. The earlier version (without a partition size column): 
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=87296412

** Less processing overhead compared to Option 1
** Partition size can be provided via kafka-topics tool

For further info, please see the recent discussions on the thread.

Thanks!
--Vahid




From:   "Vahid S Hashemian" 
To: dev@kafka.apache.org
Date:   07/03/2018 09:11 AM
Subject:Re: [DISCUSS] KIP-325: Extend Consumer Group Command to 
Show Beginning Offsets



Hi Jason,

Thanks for the feedback. Your suggestions make sense to me. I think I'm 
more in favor of adding this info to kafka-topic tool (through another 
KIP) since it is not consumer group specific.
I'll wait for Gwen and others to comment before making changes to the KIP.

--Vahid




From:   Jason Gustafson 
To: dev 
Date:   06/28/2018 02:39 PM
Subject:Re: [DISCUSS] KIP-325: Extend Consumer Group Command to 
Show Beginning Offsets



Hey Gwen/Vahid,

I think that use case makes sense, but isn't it a little odd to go through
the consumer group tool? I would expect to find something like that from
the kafka-topics tool instead. I don't feel too strongly about it, but I
hate to complicate the tool by adding the need to query topic configs. If
we don't have a meaningful number to report for compacted topics anyway,
then it feels like only a half solution. I'd probably suggest leaving this
out or just reporting the absolute difference even if a topic is 
compacted.

-Jason



On Thu, Jun 28, 2018 at 1:05 PM, Vahid S Hashemian <
vahidhashem...@us.ibm.com> wrote:

> Hi James,
>
> Thanks for the feedback. I updated the KIP and added some of the benefits
> of this improvement (including some that you mentioned).
>
> Regards.
> --Vahid
>
>
> From:   James Cheng 
> To: dev@kafka.apache.org
> Date:   06/27/2018 09:13 PM
> Subject:    Re: [DISCUSS] KIP-325: Extend Consumer Group Command to
> Show Beginning Offsets
>
> The “Motivation” section of the KIP says that the starting offset will be
> useful but doesn’t say why. Can you add a use-case or two to describe how
> it will be useful?
>
> In our case (see
> https://github.com/wushujames/kafka-utilities/blob/master/ConsumerGroupLag/README.md
> ), we found the starting offset useful so that we could calculate
> partition size so that we could identify empty partitions (partitions
> where all the data had expired). In particular, we needed that info so
> that we could calculate “lag”. Consider that case where we are asked to
> mirror an abandoned topic where startOffset==endOffset==100. We would
> have no committed offset on it, and the topic has no data in it, so we
> won’t soon get any committed offset, and so “lag” is kind of undefined. We
> used the additional startOffset to allow us to detect this case.
>
> -James
>
> Sent from my iPhone
>
> > On Jun 26, 2018, at 11:23 AM, Vahid S Hashemian
> >  wrote:
> >
> > Hi everyone,
> >
> > I have created a trivial KIP to improve the offset reporting of the
> > consumer group command:
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-325%3A+Extend+Consumer+Group+Command+to+Show+Beginning+Offsets
> >
> > Looking forward to your feedback!
> >
> > Thanks.
> > --Vahid
> >
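[Editor's note] For readers who want the numbers James describes without waiting for the KIP, a minimal sketch using the plain consumer API. Note that, as discussed earlier in the thread, end minus beginning over-counts for compacted topics:

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class PartitionSizeCheck {
    // Reports endOffset - beginningOffset per partition. A result of 0 marks the
    // "empty partition" case James describes (all data expired or never produced).
    public static Map<TopicPartition, Long> partitionSizes(String bootstrapServers,
                                                           Collection<TopicPartition> partitions) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            Map<TopicPartition, Long> begin = consumer.beginningOffsets(partitions);
            Map<TopicPartition, Long> end = consumer.endOffsets(partitions);
            Map<TopicPartition, Long> sizes = new HashMap<>();
            end.forEach((tp, endOffset) -> sizes.put(tp, endOffset - begin.get(tp)));
            return sizes;
        }
    }
}
```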


Re: Permission to create KIP

2018-08-15 Thread Matthias J. Sax
Done.

On 8/15/18 8:55 AM, Attila Sasvári wrote:
> Hi there,
> 
> Can you please grant me permission to create a KIP?
> 
> Wiki ID: asasvari
> 
> Thanks,
> Attila
> 





[jira] [Resolved] (KAFKA-6974) Changes the interaction between request handler threads and fetcher threads into an ASYNC model

2018-08-15 Thread Lucas Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Wang resolved KAFKA-6974.
---
Resolution: Won't Fix

> Changes the interaction between request handler threads and fetcher threads 
> into an ASYNC model
> ---
>
> Key: KAFKA-6974
> URL: https://issues.apache.org/jira/browse/KAFKA-6974
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Lucas Wang
>Priority: Minor
>
> Problem Statement:
> At LinkedIn, occasionally our clients complain about receiving constant 
> NotLeaderForPartition exceptions.
> Investigations:
> For one investigated case, the cluster was going through a rolling bounce. 
> And we saw there was a ~8 minutes delay between an old partition leader 
> resigning and the new leader becoming active, based on entries of "Broker xxx 
> handling LeaderAndIsr request" in the state change log.
> Our monitoring shows the LeaderAndISR request local time during the incident 
> went up to ~4 minutes.
> Explanations:
> One possible explanation of the ~8 minutes of delay is:
> During controlled shutdown of a broker, the partitions whose leaders lie on 
> the shutting down broker need to go through leadership transitions. And the 
> controller processes partitions in batches with each batch having 
> config.controlledShutdownPartitionBatchSize partitions, e.g. 100.
> If the 1st LeaderAndISR sent to a new leader broker takes too long, e.g. 4 
> minutes, then the subsequent LeaderAndISR requests can have an accumulated 
> delay of maybe 4 minutes, 8 minutes, or even 12 minutes... The reason is that 
> subsequent LeaderAndISR requests are blocked in a muted channel, given only 
> one LeaderAndISR request can be processed at a time with a 
> maxInFlightRequestsPerConnection setting of 1. When that happens, no existing 
> metric would show the total delay of 8 or 12 minutes for muted requests.
> Now the question is why it took ~4 minutes for the 1st LeaderAndISR 
> request to finish.
> Explanation for the ~4 minutes of local time for LeaderAndISR request:
> During processing of an LeaderAndISR request, the request handler thread 
> needs to add partitions to or remove partitions from partitionStates field of 
> the ReplicaFetcherThread, also shutdown idle fetcher threads by checking the 
> size of the partitionStates field. On the other hand, background fetcher 
> threads need to iterate through all the partitions in partitionStates in 
> order to build fetch request, and process fetch responses. The 
> synchronization between request handler thread and the fetcher threads is 
> done through a partitionMapLock. 
> Specifically, the fetcher threads may acquire the partitionMapLock, and then 
> calls the following functions for processing the fetch response
> (1) processPartitionData, which in turn calls 
> (2) Replica.maybeIncrementLogStartOffset, which calls 
> (3) Log.maybeIncrementLogStartOffset, which calls 
> (4) LeaderEpochCache.clearAndFlushEarliest.
> Now two factors contribute to the long holding of the partitionMapLock,
> 1. function (4) above entails calling sync() to make sure data gets 
> persistent to the disk, which may potentially have a long latency
> 2. All the 4 functions above can potentially be called for each partition in 
> the fetch response, multiplying the sync() latency by a factor of n.
> The end result is that the request handler thread got blocked for a long time 
> trying to acquire the partitionMapLock of some fetcher inside 
> AbstractFetcherManager.shutdownIdleFetcherThreads since checking each 
> fetcher's partitionCount requires getting the partitionMapLock.
> In our testing environment, we reproduced the problem and confirmed the 
> explanation above with a request handler thread getting blocked for 10 
> seconds trying to acquire the partitionMapLock of one particular fetcher 
> thread, while there are many log entries showing "Incrementing log start 
> offset of partition..."
> Proposed change:
> We propose to change the interaction between the request handler threads and 
> the fetcher threads to an ASYNC model by using an event queue. All requests 
> to add or remove partitions, or shutdown idle fetcher threads are modeled as 
> items in the event queue. And only the fetcher threads can take items out of 
> the event queue and actually process them.
> In the new ASYNC model, in order to be able to process an infinite sequence 
> of FetchRequests, a fetcher thread initially has one FetchRequest, and after 
> it's done processing one FetchRequest, it enqueues one more into its own 
> event queue.
> Also since the current AbstractFetcherThread logic is inherited by both the 
> replica-fetcher-threads and the consumer-fetcher-threads for the old 
> consumer, and the latter has been deprecated, 

Re: [VOTE] KIP-353: Allow Users to Configure Multi-Streams Timestamp Synchronization Behavior

2018-08-15 Thread Guozhang Wang
+1 from myself (binding).


I'm closing this thread with 3 binding votes (Damian, Matthias, myself) and
3 non-binding votes (Bill, John, Ted).

Thanks!

Guozhang

On Wed, Aug 15, 2018 at 1:18 AM, Damian Guy  wrote:

> +1
>
> On Tue, 14 Aug 2018 at 19:35 Ted Yu  wrote:
>
> > +1
> >  Original message From: Bill Bejeck 
> > Date: 8/14/18  11:09 AM  (GMT-08:00) To: dev@kafka.apache.org Subject:
> > Re: [VOTE] KIP-353: Allow Users to Configure Multi-Streams Timestamp
> > Synchronization Behavior
> > +1
> >
> > Thanks,
> > Bill
> >
> > On Thu, Aug 9, 2018 at 4:20 PM John Roesler  wrote:
> >
> > > +1 non-binding
> > >
> > > On Thu, Aug 9, 2018 at 3:14 PM Matthias J. Sax 
> > > wrote:
> > >
> > > > +1 (binding)
> > > >
> > > > On 8/9/18 11:57 AM, Guozhang Wang wrote:
> > > > > Hello all,
> > > > >
> > > > > I would like to start the voting process on the following KIP, to
> > > > > allow users to control when a task can be processed based on its
> > > > > buffered records, and how the stream time of a task is advanced.
> > > > >
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > > 353%3A+Improve+Kafka+Streams+Timestamp+Synchronization
> > > > >
> > > > >
> > > > >
> > > > > Thanks,
> > > > > -- Guozhang
> > > > >
> > > >
> > > >
> > >
> >
>



-- 
-- Guozhang


[jira] [Created] (KAFKA-7296) Producer should handle COORDINATOR_LOADING error in TxnOffsetCommit

2018-08-15 Thread Jason Gustafson (JIRA)
Jason Gustafson created KAFKA-7296:
--

 Summary: Producer should handle COORDINATOR_LOADING error in 
TxnOffsetCommit
 Key: KAFKA-7296
 URL: https://issues.apache.org/jira/browse/KAFKA-7296
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.0.0
Reporter: Jason Gustafson
Assignee: Jason Gustafson
 Fix For: 2.0.1


The producer should check for the COORDINATOR_LOADING error when handling the 
TxnOffsetCommit response. We fixed the logic in GroupCoordinator to correctly 
check for offset loading in https://github.com/apache/kafka/pull/4788. Prior to 
this fix, it was not possible to see this error, so we were able to get away 
with not having the check.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: KIP-213 - Scalable/Usable Foreign-Key KTable joins - Rebooted.

2018-08-15 Thread Guozhang Wang
I think Headers are not meant for user-space only: in fact, KIP-258 also
proposes relying on Headers for changelog topic compatibility. But you have a
good point about how to avoid conflicting with the user header key space.

I think there is no absolutely safe way to avoid conflicts, but we can still
consider using some name patterns to reduce the likelihood as much as
possible, e.g. something like the internal topic naming convention:
"__internal_[name]"?

I'm cc'ing contributor of KIP-258 to share thoughts as well.


Guozhang
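[Editor's note] A small illustration of the header-key namespacing idea; the header name below is purely hypothetical, not something defined by KIP-213 or KIP-258:

```java
import java.nio.ByteBuffer;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;

public class InternalHeaderExample {
    // Prefixing internal header keys with a reserved-looking namespace reduces the chance
    // of colliding with user-supplied headers, though it cannot rule collisions out entirely.
    public static Headers withForeignKeyOffset(Headers headers, long offset) {
        headers.add("__streams.internal.foreignKeyOffset",
                    ByteBuffer.allocate(Long.BYTES).putLong(offset).array());
        return headers;
    }

    public static void main(String[] args) {
        Headers headers = new RecordHeaders();
        System.out.println(withForeignKeyOffset(headers, 10L));
    }
}
```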

On Tue, Aug 14, 2018 at 7:44 PM, Adam Bellemare 
wrote:

> @Jan
> Thanks Jan. I take it you mean "key-widening" somehow includes information
> about which record is processed first? I understand about a CombinedKey
> with both the Foreign and Primary key, but I don't see how you track
> ordering metadata in there unless you actually included a metadata field in
> the key type as well.
>
> @Guozhang
> As Jan mentioned earlier, are Record Headers meant to be used strictly in
> just the user-space? It seems possible that a collision on the
> (key,value) tuple I wish to add to it could occur. For instance, if I
> wanted to add a ("foreignKeyOffset",10) to the Headers but the user already
> specified their own header with the same key name, then it appears there
> would be a collision. (This is one of the issues I brought up in the KIP).
>
> 
>
> I will be posting a prototype PR against trunk within the next day or two.
> One thing I need to point out is that my design very strictly wraps the
> entire foreignKeyJoin process entirely within the DSL function. There is no
> exposure of CombinedKeys or widened keys, nothing to resolve with regards
> to out-of-order processing and no need for the DSL user to even know what's
> going on inside of the function. The code simply returns the results of the
> join, keyed by the original key. Currently my API mirrors identically the
> format of the data returned by the regular join function, and I believe
> that this is very useful to many users of the DSL. It is my understanding
> that one of the main design goals of the DSL is to provide higher level
> functionality without requiring the users to know exactly what's going on
> under the hood. With this in mind, I thought it best to solve ordering and
> partitioning problems within the function and eliminate the requirement for
> users to do additional work after the fact to resolve the results of their
> join. Basically, I am assuming that most users of the DSL just "want it to
> work" and want it to be easy. I did this operating under the assumption
> that if a user truly wants to optimize their own workflow down to the
> finest details then they will break from strictly using the DSL and move
> down to the processors API.
>
> I updated the KIP today with some points worth talking about, should anyone
> be so inclined to check it out. Currently we are running this code in
> production to handle relational joins from our Kafka Connect topics, as per
> the original motivation of the KIP.
>
>
>
>
>
>
>
>
>
>
> I believe the foreignKeyJoin should be responsible for. In my
>
>
>
> On Tue, Aug 14, 2018 at 5:22 PM, Guozhang Wang  wrote:
>
> > Hello Adam,
> >
> > As for your question regarding GraphNodes, it is for extending Streams
> > optimization framework. You can find more details on
> > https://issues.apache.org/jira/browse/KAFKA-6761.
> >
> > The main idea is that instead of directly building up the "physical
> > topology" (represented as Topology in the public package, and internally
> > built as the ProcessorTopology class) while users are specifying the
> > transformation operators, we first keep it as a "logical topology"
> > (represented as GraphNode inside InternalStreamsBuilder). And then only
> > execute the optimization and the construction of the "physical" Topology
> > when StreamsBuilder.build() is called.
> >
> > Back to your question, I think it makes more sense to add a new type of
> > StreamsGraphNode (maybe you can consider inheriting from the
> > BaseJoinProcessorNode). Note that although in the Topology we will have
> > multiple connected ProcessorNodes to represent a (foreign-key) join, we
> > still want to keep it as a single StreamsGraphNode, or just a couple of
> > them in the logical representation so that in the future we can construct
> > the physical topology differently (e.g. having another way than the
> current
> > distributed hash-join).
> >
> > ---
> >
> > Back to your questions to KIP-213, I think Jan has summarized it
> > pretty-well. Note that back then we do not have headers support so we
> have
> > to do such "key-widening" approach to ensure ordering.
> >
> >
> > Guozhang
> >
> >
> >
> > On Mon, Aug 13, 2018 at 11:39 PM, Jan Filipiak  >
> > wrote:
> >
> > > Hi Adam,
> > >
> > > I love how you are on to this already! I resolve this by
> "key-widening" I
> > > t

Re: [DISCUSS] KIP-354 Time-based log compaction policy

2018-08-15 Thread xiongqi wu
Eno, Dong,

I have updated the KIP.  We decided not to address the issue that we might
have for topics with both compaction and time-based retention enabled (see
rejected alternative item 2).  This KIP will only ensure a log can be
compacted after a specified time interval.

As suggested by Dong,  we will also enforce that "max.compaction.lag.ms" is not
less than "min.compaction.lag.ms".

https://cwiki.apache.org/confluence/display/KAFKA/KIP-354%3A+Time-based+log+compaction+policy
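[Editor's note] For illustration, the topic-level override proposed here could be applied at topic creation time roughly as below. "max.compaction.lag.ms" is the name proposed in the KIP and should be treated as pending; the rest is a generic AdminClient call with made-up topic settings:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CompactedTopicWithMaxLag {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            Map<String, String> configs = new HashMap<>();
            configs.put("cleanup.policy", "compact");
            // Proposed by KIP-354: upper bound on how long a record can stay uncompacted.
            configs.put("max.compaction.lag.ms", String.valueOf(7 * 24 * 60 * 60 * 1000L));
            NewTopic topic = new NewTopic("pii-events", 6, (short) 3).configs(configs);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}
```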


On Tue, Aug 14, 2018 at 5:01 PM, xiongqi wu  wrote:

>
> Per discussion with Dong,  he made a very good point that if compaction
> and time based retention are both enabled on a topic, the compaction might
> prevent records from being deleted on time.  The reason is when compacting
> multiple segments into one single segment, the newly created segment will
> have same lastmodified timestamp as latest original segment. We lose the
> timestamp of all original segments except the last one. As a result,
> records might not be deleted as it should be through time based retention.
>
> With the current KIP proposal,   if we want to ensure timely deletion,  we
> have the following configurations:
> 1) enable time based log compaction only :  deletion is done though
> overriding the same key
> 2) enable time based log retention only: deletion is done though
> time-based retention
> 3) enable both log compaction and time based retention:Deletion is not
> guaranteed.
>
> Not sure if we have use case 3 and also want deletion to happen on time.
> There are several options to address deletion issue when enable both
> compaction and retention:
> A) During log compaction, looking into record timestamp to delete expired
> records. This can be done in compaction logic itself or use
> AdminClient.deleteRecords() . But this assumes we have record timestamp.
> B) retain the lastModifed time of original segments during log compaction.
> This requires extra meta data to record the information or not grouping
> multiple segments into one during compaction.
>
> If we have use case 3 in general,  I would prefer solution A and rely on
> record timestamp.
>
>
> Two questions:
> Do we have use case 3? Is it nice to have or must have?
> If we have use case 3 and want to go with solution A,  should we introduce
> a new configuration to enforce deletion by timestamp?
>
>
> On Tue, Aug 14, 2018 at 1:52 PM, xiongqi wu  wrote:
>
>> Dong,
>>
>> Thanks for the comment.
>>
> >> There are two retention policies: log compaction and time based retention.
>>
>> Log compaction:
>>
>> we have use cases to keep infinite retention of a topic (only
>> compaction).  GDPR cares about deletion of PII  (personal identifiable
>> information) data.
>> Since Kafka doesn't know what records contain PII, it relies on upper
>> layer to delete those records.
> >> For those infinite retention use cases, Kafka needs to provide a way to
>> enforce compaction on time. This is what we try to address in this KIP.
>>
>> Time based retention,
>>
>> There are also use cases that users of Kafka might want to expire all
>> their data.
>> In those cases, they can use time based retention of their topics.
>>
>>
>> Regarding your first question,  if a user wants to delete a key in the
>> log compaction topic,  the user has to send a deletion using the same key.
> >> Kafka only makes sure the deletion will happen within a certain time
> >> period (like 2 days/7 days).
>>
>> Regarding your second question.  In most cases, we might want to delete
>> all duplicated keys at the same time.
>> Compaction might be more efficient since we need to scan the log and find
>> all duplicates.  However,  the expected use case is to set the time based
>> compaction interval on the order of days,  and be larger than 'min
>> compaction lag".  We don't want log compaction to happen frequently since
>> it is expensive.  The purpose is to help low production rate topic to get
>> compacted on time.  For the topic with "normal" incoming message message
>> rate, the "min dirty ratio" might have triggered the compaction before this
>> time based compaction policy takes effect.
>>
>>
>> Eno,
>>
>> For your question,  like I mentioned we have long time retention use case
>> for log compacted topic, but we want to provide ability to delete certain
>> PII records on time.
>> Kafka itself doesn't know whether a record contains sensitive information
>> and relies on the user for deletion.
>>
>>
>> On Mon, Aug 13, 2018 at 6:58 PM, Dong Lin  wrote:
>>
>>> Hey Xiongqi,
>>>
>>> Thanks for the KIP. I have two questions regarding the use-case for
>>> meeting
>>> GDPR requirement.
>>>
>>> 1) If I recall correctly, one of the GDPR requirement is that we can not
>>> keep messages longer than e.g. 30 days in storage (e.g. Kafka). Say there
>>> exists a partition p0 which contains message1 with key1 and message2 with
>>> key2. And then user keeps producing messages with key=key2 to this
>>> partition. Since message1 with key1 is never overridden, sooner or lat

[jira] [Created] (KAFKA-7297) Both read/write access to Log.segments should be protected by lock

2018-08-15 Thread Dong Lin (JIRA)
Dong Lin created KAFKA-7297:
---

 Summary: Both read/write access to Log.segments should be 
protected by lock
 Key: KAFKA-7297
 URL: https://issues.apache.org/jira/browse/KAFKA-7297
 Project: Kafka
  Issue Type: Improvement
Reporter: Dong Lin
Assignee: Dong Lin


Log.replaceSegments() updates segments in two steps. It first adds new segments 
and then remove old segments. Though this operation is protected by a lock, 
other read access such as Log.logSegments does not grab lock and thus these 
methods may return an inconsistent view of the segments.

As an example, say Log.replaceSegments() intends to replace segments [0, 100), 
[100, 200) with a new segment [0, 200). In this case if Log.logSegments is 
called right after the new segments are added, the method may return segments 
[0, 200), [100, 200) and messages in the range [100, 200) may be duplicated if 
caller choose to enumerate all messages in all segments returned by the method.

The solution is probably to protect read/write access to Log.segments with 
read/write lock.
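[Editor's note] A minimal sketch of the read/write-lock pattern being proposed; this is illustrative only and does not mirror the actual Log class, which has its own data structures:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class GuardedSegments<S> {
    private final List<S> segments = new ArrayList<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    // Writers swap segments atomically: readers never observe the intermediate
    // state where both the new segment and the segments it replaces are present.
    public void replace(List<S> oldSegments, S newSegment) {
        lock.writeLock().lock();
        try {
            segments.removeAll(oldSegments);
            segments.add(newSegment);
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Readers take a consistent snapshot under the read lock.
    public List<S> snapshot() {
        lock.readLock().lock();
        try {
            return new ArrayList<>(segments);
        } finally {
            lock.readLock().unlock();
        }
    }
}
```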



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] KIP-347: Enable batching in FindCoordinatorRequest

2018-08-15 Thread Yishun Guan
Hi, I am looking into AdminClient.scala and AdminClient.java, and also
looking into ApiVersionRequest.java and ApiVersionResponse.java, but I
don't see anywhere that contains the logic of the one-to-one mapping from
version to version. Am I looking at the right place?
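[Editor's note] To make the "one-to-many mapping" that Guozhang describes below a bit more concrete, here is a rough sketch. It is not based on any existing Kafka client code, and every name in it is hypothetical:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class FindCoordinatorFanOut {
    // Given the group ids the caller wants coordinators for, decide how many
    // FindCoordinator requests to send based on the broker's supported version.
    public static List<List<String>> toRequestBatches(List<String> groupIds,
                                                      short brokerMaxVersion) {
        if (brokerMaxVersion >= 3) {
            // Broker supports the batched request proposed by KIP-347:
            // one request carrying all group ids.
            return Collections.singletonList(groupIds);
        }
        // Older broker: fan one logical request out into many single-group (v2)
        // requests, and merge the responses afterwards.
        List<List<String>> batches = new ArrayList<>();
        for (String groupId : groupIds) {
            batches.add(Collections.singletonList(groupId));
        }
        return batches;
    }
}
```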

On Mon, Aug 13, 2018 at 1:30 PM Guozhang Wang  wrote:

> Regarding 3): Today we do not have this logic with the existing client,
> because defer the decision about the version to use (we always assume that
> an new versioned request need to be down-converted to a single old
> versioned request: i.e. an one-to-one mapping), but in principle, we should
> be able to modify the client make it work.
>
> Again this is not necessarily need to be included in this KIP, but I'd
> recommend you to look into AdminClient implementations around the
> ApiVersionRequest / Response and think about how that logic can be modified
> in the follow-up PR of this KIP.
>
>
>
> Guozhang
>
> On Mon, Aug 13, 2018 at 12:55 PM, Yishun Guan  wrote:
>
> > @Guozhang, thank you so much!
> > 1. I agree, fixed.
> > 2. Added.
> > 3. I see, that is something that I haven't think about. How does Kafka
> > handle other api's different version problem now? So we have a specific
> > convertor that convect a new version request to a old version one for
> each
> > API (is this what the ApiVersionsRequest supposed to do, does it only
> > handle the detection or it should handle the conversion too)? What will
> be
> > the consequence of not having such a transformer but the version is
> > incompatible?
> >
> > Best,
> > Yishun
> >
> > On Sat, Aug 11, 2018 at 11:27 AM Guozhang Wang 
> wrote:
> >
> > > Hello Yishun,
> > >
> > > Thanks for the proposed KIP. I made a pass over the wiki and here are
> > some
> > > comments:
> > >
> > > 1. "DESCRIBE_GROUPS_RESPONSE_MEMBER_V0", why we need to encode the full
> > > schema for the "COORDINATOR_GROUPIDS_KEY_NAME" field? Note it includes
> a
> > > lot of fields such as member id that is not needed for this case. I
> > think a
> > > "new ArrayOf(String)" for the group ids should be sufficient.
> > >
> > > 2. "schemaVersions" of the "FindCoordinatorRequest" needs to include
> > > FIND_COORDINATOR_REQUEST_V3 as well.
> > >
> > > 3. One thing you may need to consider is that, in the adminClient to
> > handle
> > > broker compatibility, how to transform a new (v3) request to a bunch of
> > > (v2) requests if it detects the broker is still in old version and
> hence
> > > cannot support v3 request (this logic is already implemented via
> > > ApiVersionsRequest in AdminClient, but may need to be extended to
> handle
> > > one-to-many mapping of different versions).
> > >
> > > This is not sth. that you need to implement under this KIP, but I'd
> > > recommend you think about this earlier than later and see if it may
> > affect
> > > this proposal.
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Sat, Aug 11, 2018 at 10:54 AM, Yishun Guan 
> wrote:
> > >
> > > > Hi, thank you Ted! I have addressed your comments:
> > > >
> > > > 1. Added more descriptions about later optimization.
> > > > 2. Yes, I will implement the V3 later when this KIP gets accepted.
> > > > 3. Fixed.
> > > >
> > > > Thanks,
> > > > Yishun
> > > >
> > > > On Fri, Aug 10, 2018 at 3:32 PM Ted Yu  wrote:
> > > >
> > > > > bq. this is the foundation of some later possible
> > optimizations(enable
> > > > > batching in *describeConsumerGroups ...*
> > > > >
> > > > > *Can you say more why this change lays the foundation for the
> future
> > > > > optimizations ?*
> > > > >
> > > > > *You mentioned **FIND_COORDINATOR_REQUEST_V3 in the wiki but I
> don't
> > > see
> > > > it
> > > > > in PR.*
> > > > > *I assume you would add that later.*
> > > > >
> > > > > *Please read your wiki and fix grammatical error such as the
> > > following:*
> > > > >
> > > > > bq. that need to be make
> > > > >
> > > > > Thanks
> > > > >
> > > > > On Wed, Aug 8, 2018 at 3:55 PM Yishun Guan 
> > wrote:
> > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > I would like to start a discussion on:
> > > > > >
> > > > > > KIP-347: Enable batching in FindCoordinatorRequest
> > > > > > https://cwiki.apache.org/confluence/x/CgZPBQ
> > > > > >
> > > > > > Thanks @Guozhang Wang  for his help and
> > > patience!
> > > > > >
> > > > > > Thanks,
> > > > > > Yishun
> > > > > >
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>


[jira] [Created] (KAFKA-7298) Concurrent DeleteRecords can lead to fatal OutOfSequence error in producer

2018-08-15 Thread Jason Gustafson (JIRA)
Jason Gustafson created KAFKA-7298:
--

 Summary: Concurrent DeleteRecords can lead to fatal OutOfSequence 
error in producer
 Key: KAFKA-7298
 URL: https://issues.apache.org/jira/browse/KAFKA-7298
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson
Assignee: Jason Gustafson


We have logic in the producer to handle unknown producer errors. Basically when 
the producer gets an unknown producer error, it checks whether the log start 
offset is larger than the last acknowledged offset. If it is, then we know the 
error is spurious and we reset the sequence number to 0, which the broker will 
then accept.

It can happen after a DeleteRecords call, however, that the only record 
remaining in the log is a transaction marker, which does not have a sequence 
number. The error we get in this case is OUT_OF_SEQUENCE rather than 
UNKNOWN_PRODUCER, which is fatal.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] KIP-354 Time-based log compaction policy

2018-08-15 Thread Brett Rann
We've been looking into this too.

Mailing list:
https://lists.apache.org/thread.html/ed7f6a6589f94e8c2a705553f364ef599cb6915e4c3ba9b561e610e4@%3Cdev.kafka.apache.org%3E
jira wish: https://issues.apache.org/jira/browse/KAFKA-7137
confluent slack discussion:
https://confluentcommunity.slack.com/archives/C49R61XMM/p153076012139

A person on my team has started on code so you might want to coordinate:
https://github.com/dongxiaohe/kafka/tree/dongxiaohe/log-cleaner-compaction-max-lifetime-2.0

 He's been working with Jason Gustafson and James Chen around the changes.
You can ping him on confluent slack as Xiaohe Dong.

It's great to know others are thinking on it as well.

You've added the requirement to force a segment roll which we hadn't gotten
to yet, which is great. I was content with it not including the active
segment.

> Adding topic level configuration "max.compaction.lag.ms",  and
corresponding broker configuration "log.cleaner.max.compaction.lag.ms",
which is set to 0 (disabled) by default.

Glancing at some other settings, the convention seems to me to be -1 for
disabled (or infinite, which is more meaningful here).  0 to me implies
instant, a little quicker than 1.

We've been trying to think about a way to trigger compaction as well
through an API call, which would need to be flagged somewhere (ZK admin/
space?) but we're struggling to think how that would be coordinated across
brokers and partitions.  Have you given any thought to that?






On Thu, Aug 16, 2018 at 8:44 AM xiongqi wu  wrote:

> Eno, Dong,
>
> I have updated the KIP. We decide not to address the issue that we might
> have for both compaction and time retention enabled topics (see the
> rejected alternative item 2). This KIP will only ensure log can be
> compacted after a specified time-interval.
>
> As suggested by Dong, we will also enforce "max.compaction.lag.ms" is not
> less than "min.compaction.lag.ms".
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-354%3A+Time-based+log+compaction+policy
>
>
> On Tue, Aug 14, 2018 at 5:01 PM, xiongqi wu  wrote:
>
> >
> > Per discussion with Dong, he made a very good point that if compaction
> > and time based retention are both enabled on a topic, the compaction
> might
> > prevent records from being deleted on time. The reason is when compacting
> > multiple segments into one single segment, the newly created segment will
> > have same lastmodified timestamp as latest original segment. We lose the
> > timestamp of all original segments except the last one. As a result,
> > records might not be deleted as it should be through time based
> retention.
> >
> > With the current KIP proposal, if we want to ensure timely deletion, we
> > have the following configurations:
> > 1) enable time based log compaction only : deletion is done though
> > overriding the same key
> > 2) enable time based log retention only: deletion is done though
> > time-based retention
> > 3) enable both log compaction and time based retention: Deletion is not
> > guaranteed.
> >
> > Not sure if we have use case 3 and also want deletion to happen on time.
> > There are several options to address deletion issue when enable both
> > compaction and retention:
> > A) During log compaction, looking into record timestamp to delete expired
> > records. This can be done in compaction logic itself or use
> > AdminClient.deleteRecords() . But this assumes we have record timestamp.
> > B) retain the lastModifed time of original segments during log
> compaction.
> > This requires extra meta data to record the information or not grouping
> > multiple segments into one during compaction.
> >
> > If we have use case 3 in general, I would prefer solution A and rely on
> > record timestamp.
> >
> >
> > Two questions:
> > Do we have use case 3? Is it nice to have or must have?
> > If we have use case 3 and want to go with solution A, should we introduce
> > a new configuration to enforce deletion by timestamp?
> >
> >
> > On Tue, Aug 14, 2018 at 1:52 PM, xiongqi wu  wrote:
> >
> >> Dong,
> >>
> >> Thanks for the comment.
> >>
> >> There are two retention policies: log compaction and time based retention.
> >>
> >> Log compaction:
> >>
> >> we have use cases to keep infinite retention of a topic (only
> >> compaction). GDPR cares about deletion of PII (personal identifiable
> >> information) data.
> >> Since Kafka doesn't know what records contain PII, it relies on upper
> >> layer to delete those records.
> >> For those infinite retention use cases, Kafka needs to provide a way to
> >> enforce compaction on time. This is what we try to address in this KIP.
> >>
> >> Time based retention,
> >>
> >> There are also use cases that users of Kafka might want to expire all
> >> their data.
> >> In those cases, they can use time based retention of their topics.
> >>
> >>
> >> Regarding your first question, if a user wants to delete a key in

Build failed in Jenkins: kafka-1.1-jdk7 #180

2018-08-15 Thread Apache Jenkins Server
See 


Changes:

[wangguoz] KAFKA-7284: streams should unwrap fenced exception (#5513)

--
[...truncated 423.07 KB...]
kafka.zk.KafkaZkClientTest > testCreateConfigChangeNotification STARTED

kafka.zk.KafkaZkClientTest > testCreateConfigChangeNotification PASSED

kafka.zk.KafkaZkClientTest > testDelegationTokenMethods STARTED

kafka.zk.KafkaZkClientTest > testDelegationTokenMethods PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testSizeInBytesWithCompression 
STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testSizeInBytesWithCompression 
PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > 
testIteratorIsConsistentWithCompression STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > 
testIteratorIsConsistentWithCompression PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testIteratorIsConsistent 
STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testIteratorIsConsistent PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testEqualsWithCompression 
STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testEqualsWithCompression 
PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testWrittenEqualsRead STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testWrittenEqualsRead PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testEquals STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testEquals PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testSizeInBytes STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testSizeInBytes PASSED

kafka.javaapi.consumer.ZookeeperConsumerConnectorTest > testBasic STARTED

kafka.javaapi.consumer.ZookeeperConsumerConnectorTest > testBasic PASSED

kafka.metrics.MetricsTest > testMetricsReporterAfterDeletingTopic STARTED

kafka.metrics.MetricsTest > testMetricsReporterAfterDeletingTopic PASSED

kafka.metrics.MetricsTest > testSessionExpireListenerMetrics STARTED

kafka.metrics.MetricsTest > testSessionExpireListenerMetrics PASSED

kafka.metrics.MetricsTest > 
testBrokerTopicMetricsUnregisteredAfterDeletingTopic STARTED

kafka.metrics.MetricsTest > 
testBrokerTopicMetricsUnregisteredAfterDeletingTopic PASSED

kafka.metrics.MetricsTest > testClusterIdMetric STARTED

kafka.metrics.MetricsTest > testClusterIdMetric PASSED

kafka.metrics.MetricsTest > testControllerMetrics STARTED

kafka.metrics.MetricsTest > testControllerMetrics PASSED

kafka.metrics.MetricsTest > testWindowsStyleTagNames STARTED

kafka.metrics.MetricsTest > testWindowsStyleTagNames PASSED

kafka.metrics.MetricsTest > testMetricsLeak STARTED

kafka.metrics.MetricsTest > testMetricsLeak PASSED

kafka.metrics.MetricsTest > testBrokerTopicMetricsBytesInOut STARTED

kafka.metrics.MetricsTest > testBrokerTopicMetricsBytesInOut PASSED

kafka.metrics.KafkaTimerTest > testKafkaTimer STARTED

kafka.metrics.KafkaTimerTest > testKafkaTimer PASSED

kafka.security.auth.ResourceTypeTest > testJavaConversions STARTED

kafka.security.auth.ResourceTypeTest > testJavaConversions PASSED

kafka.security.auth.ResourceTypeTest > testFromString STARTED

kafka.security.auth.ResourceTypeTest > testFromString PASSED

kafka.security.auth.OperationTest > testJavaConversions STARTED

kafka.security.auth.OperationTest > testJavaConversions PASSED

kafka.security.auth.ZkAuthorizationTest > testIsZkSecurityEnabled STARTED

kafka.security.auth.ZkAuthorizationTest > testIsZkSecurityEnabled PASSED

kafka.security.auth.ZkAuthorizationTest > testZkUtils STARTED

kafka.security.auth.ZkAuthorizationTest > testZkUtils PASSED

kafka.security.auth.ZkAuthorizationTest > testZkAntiMigration STARTED

kafka.security.auth.ZkAuthorizationTest > testZkAntiMigration PASSED

kafka.security.auth.ZkAuthorizationTest > testZkMigration STARTED

kafka.security.auth.ZkAuthorizationTest > testZkMigration PASSED

kafka.security.auth.ZkAuthorizationTest > testChroot STARTED

kafka.security.auth.ZkAuthorizationTest > testChroot PASSED

kafka.security.auth.ZkAuthorizationTest > testDelete STARTED

kafka.security.auth.ZkAuthorizationTest > testDelete PASSED

kafka.security.auth.ZkAuthorizationTest > testDeleteRecursive STARTED

kafka.security.auth.ZkAuthorizationTest > testDeleteRecursive PASSED

kafka.security.auth.AclTest > testAclJsonConversion STARTED

kafka.security.auth.AclTest > testAclJsonConversion PASSED

kafka.security.auth.PermissionTypeTest > testJavaConversions STARTED

kafka.security.auth.PermissionTypeTest > testJavaConversions PASSED

kafka.security.auth.PermissionTypeTest > testFromString STARTED

kafka.security.auth.PermissionTypeTest > testFromString PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testAllowAllAccess STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testAllowAllAccess PASSED

kafka.security.auth.SimpleAclAuthorizerTest > 
testLocalConcurrentModificationOfResourceAcls STARTED

kafka.security.auth.SimpleAclAuthorizerTest > 

Build failed in Jenkins: kafka-trunk-jdk10 #407

2018-08-15 Thread Apache Jenkins Server
See 


Changes:

[wangguoz] MINOR: (re)add equals/hashCode to *Windows (#5510)

--
[...truncated 1.98 MB...]
org.apache.kafka.streams.StreamsConfigTest > shouldAcceptExactlyOnce STARTED

org.apache.kafka.streams.StreamsConfigTest > shouldAcceptExactlyOnce PASSED

org.apache.kafka.streams.StreamsConfigTest > 
testGetGlobalConsumerConfigsWithGlobalConsumerOverridenPrefix STARTED

org.apache.kafka.streams.StreamsConfigTest > 
testGetGlobalConsumerConfigsWithGlobalConsumerOverridenPrefix PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSetInternalLeaveGroupOnCloseConfigToFalseInConsumer STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSetInternalLeaveGroupOnCloseConfigToFalseInConsumer PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportPrefixedPropertiesThatAreNotPartOfConsumerConfig STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportPrefixedPropertiesThatAreNotPartOfConsumerConfig PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldForwardCustomConfigsWithNoPrefixToAllClients STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldForwardCustomConfigsWithNoPrefixToAllClients PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldResetToDefaultIfRestoreConsumerAutoCommitIsOverridden STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldResetToDefaultIfRestoreConsumerAutoCommitIsOverridden PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSpecifyCorrectValueSerdeClassOnError STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSpecifyCorrectValueSerdeClassOnError PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldResetToDefaultIfGlobalConsumerAutoCommitIsOverridden STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldResetToDefaultIfGlobalConsumerAutoCommitIsOverridden PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldResetToDefaultIfConsumerIsolationLevelIsOverriddenIfEosEnabled STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldResetToDefaultIfConsumerIsolationLevelIsOverriddenIfEosEnabled PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldBeSupportNonPrefixedGlobalConsumerConfigs STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldBeSupportNonPrefixedGlobalConsumerConfigs PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportNonPrefixedAdminConfigs STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportNonPrefixedAdminConfigs PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldNotOverrideUserConfigCommitIntervalMsIfExactlyOnceEnabled STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldNotOverrideUserConfigCommitIntervalMsIfExactlyOnceEnabled PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportMultipleBootstrapServers STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportMultipleBootstrapServers PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSpecifyNoOptimizationWhenNotExplicitlyAddedToConfigs STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSpecifyNoOptimizationWhenNotExplicitlyAddedToConfigs PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldThrowConfigExceptionWhenOptimizationConfigNotValueInRange STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldThrowConfigExceptionWhenOptimizationConfigNotValueInRange PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldThrowStreamsExceptionIfKeySerdeConfigFails STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldThrowStreamsExceptionIfKeySerdeConfigFails PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportNonPrefixedProducerConfigs STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportNonPrefixedProducerConfigs PASSED

org.apache.kafka.streams.StreamsConfigTest > testGetRestoreConsumerConfigs 
STARTED

org.apache.kafka.streams.StreamsConfigTest > testGetRestoreConsumerConfigs 
PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldThrowExceptionIfBootstrapServersIsNotSet STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldThrowExceptionIfBootstrapServersIsNotSet PASSED

org.apache.kafka.streams.StreamsConfigTest > 
consumerConfigMustUseAdminClientConfigForRetries STARTED

org.apache.kafka.streams.StreamsConfigTest > 
consumerConfigMustUseAdminClientConfigForRetries PASSED

org.apache.kafka.streams.StreamsConfigTest > 
testGetMainConsumerConfigsWithMainConsumerOverridenPrefix STARTED

org.apache.kafka.streams.StreamsConfigTest > 
testGetMainConsumerConfigsWithMainConsumerOverridenPrefix PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldBeSupportNonPrefixedRestoreConsumerConfigs STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldBeSupportNonPrefixedRestoreConsumerConfigs PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportPrefixedRestoreConsumerConfigs STARTED

org.apache.kafka.streams.Streams

Re: [DISCUSS] KIP-354 Time-based log compaction policy

2018-08-15 Thread Brett Rann
Eno,

For us as well the requirement is around compacted topics, because they are
the topics that already facilitate selective deletes. Currently they allow
specifying a minimum lifetime, but lack the ability to specify a maximum
lifetime.

For non-compacted topics there's no ability to delete individual messages;
they're immutable logs. We treat those with hard rules: a max retention time
on the topic; accepting that the topic may get truncated; or not storing
information that may be subject to GDPR. (And I've read of others using tricks
with encryption and forgetting the decryption key.)

Enhancing compaction to support a max compaction time makes the compacted
topics more useful, especially in that it allows the dirty ratio to be used
for its intended purpose while allowing automatic cleaning based on a new
time config.
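
To make those knobs concrete, here is a minimal sketch (Java AdminClient) of
creating a compacted topic with the existing min.compaction.lag.ms and
min.cleanable.dirty.ratio settings plus the max.compaction.lag.ms config this
KIP proposes. The topic name and values are illustrative only, and a broker
that does not implement the KIP would reject the unknown config:

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateCompactedTopicSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        Map<String, String> configs = new HashMap<>();
        configs.put("cleanup.policy", "compact");
        // Existing: a record must be at least this old before it can be compacted away.
        configs.put("min.compaction.lag.ms", "3600000");      // 1 hour
        // Existing: the dirty ratio still drives "normal" compaction scheduling.
        configs.put("min.cleanable.dirty.ratio", "0.5");
        // Proposed by KIP-354 (hypothetical until the KIP lands): upper bound on how
        // long a record can sit uncompacted, e.g. 2 days for a deletion SLA.
        configs.put("max.compaction.lag.ms", "172800000");

        NewTopic topic = new NewTopic("events-by-user", 12, (short) 3).configs(configs);
        try (AdminClient admin = AdminClient.create(props)) {
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}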

On Tue, Aug 14, 2018 at 9:00 PM Eno Thereska  wrote:

> Adding to this, what about topics that are not log compacted? As Dong says,
> "one of the GDPR requirement is that we can not keep messages longer than
> e.g. 30 days in storage (e.g. Kafka)". The GDPR requirement must hold
> irrespective of the low level details, on whether the topic is compacted or
> not, right?
>
> Thanks
> Eno
>
>
> On Mon, Aug 13, 2018 at 6:58 PM, Dong Lin  wrote:
>
> > Hey Xiongqi,
> >
> > Thanks for the KIP. I have two questions regarding the use-case for
> meeting
> > GDPR requirement.
> >
> > 1) If I recall correctly, one of the GDPR requirement is that we can not
> > keep messages longer than e.g. 30 days in storage (e.g. Kafka). Say there
> > exists a partition p0 which contains message1 with key1 and message2 with
> > key2. And then user keeps producing messages with key=key2 to this
> > partition. Since message1 with key1 is never overridden, sooner or later
> we
> > will want to delete message1 and keep the latest message with key=key2.
> But
> > currently it looks like log compact logic in Kafka will always put these
> > messages in the same segment. Will this be an issue?
> >
> > 2) The current KIP intends to provide the capability to delete a given
> > message in log compacted topic. Does such use-case also require Kafka to
> > keep the messages produced before the given message? If yes, then we can
> > probably just use AdminClient.deleteRecords() or time-based log retention
> > to meet the use-case requirement. If no, do you know what is the GDPR's
> > requirement on time-to-deletion after user explicitly requests the
> deletion
> > (e.g. 1 hour, 1 day, 7 day)?
> >
> > Thanks,
> > Dong
> >
> >
> > On Mon, Aug 13, 2018 at 3:44 PM, xiongqi wu  wrote:
> >
> > > Hi Eno,
> > >
> > > The GDPR request we are getting here at linkedin is if we get a request
> > to
> > > delete a record through a null key on a log compacted topic,
> > > we want to delete the record via compaction in a given time period
> like 2
> > > days (whatever is required by the policy).
> > >
> > > There might be other issues (such as orphan log segments under certain
> > > conditions) that lead to GDPR problem but they are more like something
> > we
> > > need to fix anyway regardless of GDPR.
> > >
> > >
> > > -- Xiongqi (Wesley) Wu
> > >
> > > On Mon, Aug 13, 2018 at 2:56 PM, Eno Thereska 
> > > wrote:
> > >
> > > > Hello,
> > > >
> > > > Thanks for the KIP. I'd like to see a more precise definition of what
> > > part
> > > > of GDPR you are targeting as well as some sort of verification that
> > this
> > > > KIP actually addresses the problem. Right now I find this a bit
> vague:
> > > >
> > > > "Ability to delete a log message through compaction in a timely
> manner
> > > has
> > > > become an important requirement in some use cases (e.g., GDPR)"
> > > >
> > > >
> > > > Is there any guarantee that after this KIP the GDPR problem is solved
> > or
> > > do
> > > > we need to do something else as well, e.g., more KIPs?
> > > >
> > > >
> > > > Thanks
> > > >
> > > > Eno
> > > >
> > > >
> > > >
> > > > On Thu, Aug 9, 2018 at 4:18 PM, xiongqi wu 
> > wrote:
> > > >
> > > > > Hi Kafka,
> > > > >
> > > > > This KIP tries to address GDPR concern to fulfill deletion request
> on
> > > > time
> > > > > through time-based log compaction on a compaction enabled topic:
> > > > >
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 
> > > > > 354%3A+Time-based+log+compaction+policy
> > > > >
> > > > > Any feedback will be appreciated.
> > > > >
> > > > >
> > > > > Xiongqi (Wesley) Wu
> > > > >
> > > >
> > >
> >
>


-- 

Brett Rann

Senior DevOps Engineer


Zendesk International Ltd

395 Collins Street, Melbourne VIC 3000 Australia

Mobile: +61 (0) 418 826 017


Re: [DISCUSS] KIP-354 Time-based log compaction policy

2018-08-15 Thread xiongqi wu
Brett,

Thank you for your comments.
I was thinking that, since we already have an immediate-compaction setting (by
setting the min dirty ratio to 0), I would use "0" as the disabled state.
I am OK to go with the -1 (disabled), 0 (immediate) options.

For the implementation, there are a few differences between mine and
Xiaohe Dong's:
1) I use the estimated creation time of a log segment, instead of the largest
timestamp of a log, to determine compaction eligibility, because a log
segment might stay as the active segment for up to the "max compaction lag"
(see the KIP for details).
2) I measure how many bytes we must clean to honor the "max compaction lag"
rule, and use that to determine the order of compaction.
3) I force the active segment to roll to honor the "max compaction lag".

I can share my code so we can coordinate.

I haven't thought about a new API to force a compaction. What is the use case
for that one?
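
To make (1) and (2) above concrete, here is a rough, self-contained sketch of
the eligibility check in plain Java (the broker code is Scala; the Segment
class and the creation-time estimate here are illustrative only, not the
actual LogCleaner internals):

import java.util.Arrays;
import java.util.List;

public class MaxCompactionLagSketch {

    // Minimal stand-in for a log segment; the real LogCleaner keeps far more state.
    static class Segment {
        final long sizeInBytes;
        final long estimatedCreateTimeMs; // an estimate of when the segment was rolled
                                          // (see the KIP for how this is derived)
        Segment(long sizeInBytes, long estimatedCreateTimeMs) {
            this.sizeInBytes = sizeInBytes;
            this.estimatedCreateTimeMs = estimatedCreateTimeMs;
        }
    }

    // A segment must be cleaned once it has existed longer than the max compaction lag.
    static boolean mustClean(Segment s, long maxCompactionLagMs, long nowMs) {
        return maxCompactionLagMs > 0 && nowMs - s.estimatedCreateTimeMs > maxCompactionLagMs;
    }

    // Bytes that have overstayed the lag; comparing this across logs gives the
    // compaction order described in point (2).
    static long overdueBytes(List<Segment> dirtySegments, long maxCompactionLagMs, long nowMs) {
        long bytes = 0L;
        for (Segment s : dirtySegments) {
            if (mustClean(s, maxCompactionLagMs, nowMs)) {
                bytes += s.sizeInBytes;
            }
        }
        return bytes;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long day = 24L * 60 * 60 * 1000;
        List<Segment> dirty = Arrays.asList(
                new Segment(1000000L, now - 3 * day),   // rolled 3 days ago
                new Segment(2000000L, now - day / 2));  // rolled 12 hours ago
        // With a 2-day max compaction lag, only the first segment is overdue: 1000000
        System.out.println(overdueBytes(dirty, 2 * day, now));
    }
}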


On Wed, Aug 15, 2018 at 5:33 PM, Brett Rann 
wrote:

> We've been looking into this too.
>
> Mailing list:
> https://lists.apache.org/thread.html/ed7f6a6589f94e8c2a705553f364ef
> 599cb6915e4c3ba9b561e610e4@%3Cdev.kafka.apache.org%3E
> jira wish: https://issues.apache.org/jira/browse/KAFKA-7137
> confluent slack discussion:
> https://confluentcommunity.slack.com/archives/C49R61XMM/p153076012139
>
> A person on my team has started on code so you might want to coordinate:
> https://github.com/dongxiaohe/kafka/tree/dongxiaohe/log-
> cleaner-compaction-max-lifetime-2.0
>
>  He's been working with Jason Gustafson and James Chen around the changes.
> You can ping him on confluent slack as Xiaohe Dong.
>
> It's great to know others are thinking on it as well.
>
> You've added the requirement to force a segment roll which we hadn't gotten
> to yet, which is great. I was content with it not including the active
> segment.
>
> > Adding topic level configuration "max.compaction.lag.ms",  and
> corresponding broker configuration "log.cleaner.max.compaction.lag.ms",
> which is set to 0 (disabled) by default.
>
> Glancing at some other settings convention seems to me to be -1 for
> disabled (or infinite, which is more meaningful here).  0 to me implies
> instant, a little quicker than 1.
>
> We've been trying to think about a way to trigger compaction as well
> through an API call, which would need to be flagged somewhere (ZK admin/
> space?) but we're struggling to think how that would be coordinated across
> brokers and partitions.  Have you given any thought to that?
>
>
>
>
>
>
> On Thu, Aug 16, 2018 at 8:44 AM xiongqi wu  wrote:
>
> > Eno, Dong,
> >
> > I have updated the KIP. We decide not to address the issue that we might
> > have for both compaction and time retention enabled topics (see the
> > rejected alternative item 2). This KIP will only ensure log can be
> > compacted after a specified time-interval.
> >
> > As suggested by Dong, we will also enforce "max.compaction.lag.ms" is
> not
> > less than "min.compaction.lag.ms".
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-354: Time-based
> log
> > compaction policy
> >  log compaction policy>
> >
> >
> > On Tue, Aug 14, 2018 at 5:01 PM, xiongqi wu  wrote:
> >
> > >
> > > Per discussion with Dong, he made a very good point that if compaction
> > > and time based retention are both enabled on a topic, the compaction
> > might
> > > prevent records from being deleted on time. The reason is when
> compacting
> > > multiple segments into one single segment, the newly created segment
> will
> > > have same lastmodified timestamp as latest original segment. We lose
> the
> > > timestamp of all original segments except the last one. As a result,
> > > records might not be deleted as it should be through time based
> > retention.
> > >
> > > With the current KIP proposal, if we want to ensure timely deletion, we
> > > have the following configurations:
> > > 1) enable time based log compaction only: deletion is done through
> > > overriding the same key
> > > 2) enable time based log retention only: deletion is done through
> > > time-based retention
> > > 3) enable both log compaction and time based retention: Deletion is not
> > > guaranteed.
> > >
> > > Not sure if we have use case 3 and also want deletion to happen on
> time.
> > > There are several options to address deletion issue when enable both
> > > compaction and retention:
> > > A) During log compaction, looking into record timestamp to delete
> expired
> > > records. This can be done in compaction logic itself or use
> > > AdminClient.deleteRecords() . But this assumes we have record
> timestamp.
> > > B) retain the lastModifed time of original segments during log
> > compaction.
> > > This requires extra meta data to record the information or not grouping
> > > multiple segments into one during compaction.
> > >
> > > If we have use case 3 in general, I would prefer solution A and rely on
> > > record timestamp.
> > >
> > >
> > > 

Re: [DISCUSS] KIP-354 Time-based log compaction policy

2018-08-15 Thread Brett Rann
An API was suggested by Gwen and James when I discussed it with them. For
me, I can think of it as a use case for scheduling compaction rather than
relying on a config-based time trigger.  We're looking at creating some
potentially very large compacted topics for event sourcing, and from an
operator's perspective I'm concerned about the potential impact of long-running
compaction, especially if multiple topics run back to back.  Having the
ability to schedule them at a time of my own choosing gives some peace of mind
for that concern.  Another use case might be a more urgent delete. That could
be done manually now by just lowering the max time and waiting for compaction
to run (see the sketch below). But hitting an API endpoint is a bit cleaner.

But in thinking about what that mechanism would be, it started to feel like
it would be a complicated implementation, so we've put it aside for now. But
maybe we just haven't seen the clean way yet.
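
For the "more urgent delete" workaround mentioned above, a minimal AdminClient
sketch might look like the following. It assumes a broker that implements the
KIP's proposed max.compaction.lag.ms topic config, and alterConfigs replaces
the whole topic config set, so a real tool should describeConfigs first and
merge:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class UrgentCompactionOverrideSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "events-by-user");
        // Temporarily tighten the (proposed, hypothetical until KIP-354 lands) max
        // compaction lag so tombstoned keys are cleaned sooner; restore it afterwards.
        Config override = new Config(Collections.singleton(
                new ConfigEntry("max.compaction.lag.ms", "3600000"))); // 1 hour

        try (AdminClient admin = AdminClient.create(props)) {
            // Caution: alterConfigs is not incremental -- it replaces the topic's
            // dynamic config set, so merge with describeConfigs() output in practice.
            admin.alterConfigs(Collections.singletonMap(topic, override)).all().get();
        }
    }
}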



On Thu, Aug 16, 2018 at 11:22 AM xiongqi wu  wrote:

> Brett,
>
> Thank you for your comments.
> I was thinking since we already has immediate compaction setting by setting
> min dirty ratio to 0, so I decide to use "0" as disabled state.
> I am ok to go with -1(disable), 0 (immediate) options.
>
> For the implementation, there are a few differences between mine and
> "Xiaohe Dong"'s :
> 1) I used the estimated creation time of a log segment instead of largest
> timestamp of a log to determine the compaction eligibility, because a log
> segment might stay as an active segment up to "max compaction lag". (see
> the KIP for detail).
> 2) I measure how much bytes that we must clean to follow the "max
> compaction lag" rule, and use that to determine the order of compaction.
> 3) force active segment to roll to follow the "max compaction lag"
>
> I can share my code so we can coordinate.
>
> I haven't think about a new API to force a compaction. what is the use case
> for this one?
>
>
> On Wed, Aug 15, 2018 at 5:33 PM, Brett Rann 
> wrote:
>
> > We've been looking into this too.
> >
> > Mailing list:
> > https://lists.apache.org/thread.html/ed7f6a6589f94e8c2a705553f364ef
> 
> > 599cb6915e4c3ba9b561e610e4@%3Cdev.kafka.apache.org%3E
> > jira wish: https://issues.apache.org/jira/browse/KAFKA-7137
> 
> > confluent slack discussion:
> >
> https://confluentcommunity.slack.com/archives/C49R61XMM/p153076012139
> 
> >
> > A person on my team has started on code so you might want to coordinate:
> > https://github.com/dongxiaohe/kafka/tree/dongxiaohe/log-
> 
> > cleaner-compaction-max-lifetime-2.0
> >
> > He's been working with Jason Gustafson and James Chen around the changes.
> > You can ping him on confluent slack as Xiaohe Dong.
> >
> > It's great to know others are thinking on it as well.
> >
> > You've added the requirement to force a segment roll which we hadn't
> gotten
> > to yet, which is great. I was content with it not including the active
> > segment.
> >
> > > Adding topic level configuration "max.compaction.lag.ms", and
> > corresponding broker configuration "log.cleaner.max.compaction.lag.ms",
> > which is set to 0 (disabled) by default.
> >
> > Glancing at some other settings convention seems to me to be -1 for
> > disabled (or infinite, which is more meaningful here). 0 to me implies
> > instant, a little quicker than 1.
> >
> > We've been trying to think about a way to trigger compaction as well
> > through an API call, which would need to be flagged somewhere (ZK admin/
> > space?) but we're struggling to think how that would be coordinated
> across
> > brokers and partitions. Have you given any thought to that?
> >
> >
> >
> >
> >
> >
> > On Thu, Aug 16, 2018 at 8:44 AM xiongqi wu  wrote:
> >
> > > Eno, Dong,
> > >
> > > I have updated the KIP. We decide not to address the issue that we
> might
> > > have for both compaction and time retention enabled topics (see the
> > > rejected alternative item 2). This KIP will only ensure log can be
> > > compacted after a specified time-interval.
> > >
> > > As suggested by Dong, we will also enforce "max.compaction.lag.ms" is
> > not
> > > less than "min.compaction.lag.ms".
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-354
>  Time-based
> > log
> > > compaction policy
> > >   Time-based
> > log compaction policy>
> > >
> > >
> > > On Tue, Aug 14, 2018 at 5:01 PM, xiongqi wu 
> wrote:
> > >
> > > >
> > > > Per discussion with Dong, he made a very good point that if
> compaction
> > > > and time based retention are both enabled on a topic, the compaction
> > > might
> > > > prevent records from being deleted on time.

[jira] [Created] (KAFKA-7299) batch LeaderAndIsr requests during auto preferred leader election

2018-08-15 Thread Jun Rao (JIRA)
Jun Rao created KAFKA-7299:
--

 Summary: batch LeaderAndIsr requests during auto preferred leader 
election
 Key: KAFKA-7299
 URL: https://issues.apache.org/jira/browse/KAFKA-7299
 Project: Kafka
  Issue Type: Sub-task
  Components: core
Affects Versions: 2.0.0
Reporter: Jun Rao


Currently, in KafkaController.checkAndTriggerAutoLeaderRebalance(), we call 
onPreferredReplicaElection() one partition at a time. This means that the 
controller will be sending LeaderAndIsrRequest one partition at a time. It 
would be more efficient to call onPreferredReplicaElection() for a batch of 
partitions to reduce the number of LeaderAndIsrRequests.
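
For illustration only, the shape of the change in plain Java terms (the real
code is the Scala controller; electPreferredLeaders here is a hypothetical
stand-in for onPreferredReplicaElection, not an actual API):

import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.common.TopicPartition;

public class BatchedElectionSketch {
    // Hypothetical stand-in for the controller's onPreferredReplicaElection().
    static void electPreferredLeaders(Set<TopicPartition> partitions) {
        System.out.println("one election round for " + partitions.size() + " partitions");
    }

    public static void main(String[] args) {
        Set<TopicPartition> imbalanced = new HashSet<>();
        imbalanced.add(new TopicPartition("orders", 0));
        imbalanced.add(new TopicPartition("orders", 3));
        imbalanced.add(new TopicPartition("payments", 1));

        // Before: one call per partition, hence one LeaderAndIsrRequest at a time.
        // After: a single call for the whole batch, so the controller can merge the
        // resulting LeaderAndIsrRequests per broker.
        electPreferredLeaders(imbalanced);
    }
}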



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Build failed in Jenkins: kafka-trunk-jdk10 #408

2018-08-15 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-7285: Create new producer on each rebalance if EOS enabled 
(#5501)

--
[...truncated 1.54 MB...]
kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldAppendNewMetadataToLogOnAddPartitionsWhenPartitionsAdded PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldRespondWithInvalidRequestOnEndTxnWhenTransactionalIdIsNull STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldRespondWithInvalidRequestOnEndTxnWhenTransactionalIdIsNull PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldRespondWithCoordinatorLoadInProgressOnEndTxnWhenCoordinatorIsLoading 
STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldRespondWithCoordinatorLoadInProgressOnEndTxnWhenCoordinatorIsLoading 
PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldRespondWithSuccessOnAddPartitionsWhenStateIsCompleteAbort STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldRespondWithSuccessOnAddPartitionsWhenStateIsCompleteAbort PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldWaitForCommitToCompleteOnHandleInitPidAndExistingTransactionInPrepareAbortState
 STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldWaitForCommitToCompleteOnHandleInitPidAndExistingTransactionInPrepareAbortState
 PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldRespondWithErrorsNoneOnAddPartitionWhenNoErrorsAndPartitionsTheSame 
STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldRespondWithErrorsNoneOnAddPartitionWhenNoErrorsAndPartitionsTheSame PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldRespondWithInvalidRequestOnEndTxnWhenTransactionalIdIsEmpty STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldRespondWithInvalidRequestOnEndTxnWhenTransactionalIdIsEmpty PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldRespondWithInvalidRequestAddPartitionsToTransactionWhenTransactionalIdIsEmpty
 STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldRespondWithInvalidRequestAddPartitionsToTransactionWhenTransactionalIdIsEmpty
 PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldAppendPrepareAbortToLogOnEndTxnWhenStatusIsOngoingAndResultIsAbort STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldAppendPrepareAbortToLogOnEndTxnWhenStatusIsOngoingAndResultIsAbort PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldAcceptInitPidAndReturnNextPidWhenTransactionalIdIsNull STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldAcceptInitPidAndReturnNextPidWhenTransactionalIdIsNull PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldRemoveTransactionsForPartitionOnEmigration STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldRemoveTransactionsForPartitionOnEmigration PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldWaitForCommitToCompleteOnHandleInitPidAndExistingTransactionInPrepareCommitState
 STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldWaitForCommitToCompleteOnHandleInitPidAndExistingTransactionInPrepareCommitState
 PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldAbortExpiredTransactionsInOngoingStateAndBumpEpoch STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldAbortExpiredTransactionsInOngoingStateAndBumpEpoch PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldReturnInvalidTxnRequestOnEndTxnRequestWhenStatusIsCompleteCommitAndResultIsNotCommit
 STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldReturnInvalidTxnRequestOnEndTxnRequestWhenStatusIsCompleteCommitAndResultIsNotCommit
 PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldReturnOkOnEndTxnWhenStatusIsCompleteCommitAndResultIsCommit STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldReturnOkOnEndTxnWhenStatusIsCompleteCommitAndResultIsCommit PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldRespondWithConcurrentTransactionsOnAddPartitionsWhenStateIsPrepareCommit 
STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldRespondWithConcurrentTransactionsOnAddPartitionsWhenStateIsPrepareCommit 
PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldIncrementEpochAndUpdateMetadataOnHandleInitPidWhenExistingCompleteTransaction
 STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldIncrementEpochAndUpdateMetadataOnHandleInitPidWhenExistingCompleteTransaction
 PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldGenerateNewProducerI

Re: [DISCUSS] KIP-354 Time-based log compaction policy

2018-08-15 Thread Brett Rann
> (segment.largestTimestamp is lastModified time of the log segment or max
timestamp we see for the log segment. Due to the lack of record timestamp,
segment.largestTimestamp might be earlier than the actual timestamp of
latest record of that segment.).

I'm curious about the mention of the last modified time of the segment.  As
noted back here,
https://cwiki-test.apache.org/confluence/display/KAFKA/KIP-33+-+Add+a+time+based+log+index
using the creation/modified time of files is unreliable in cases where
partitions are moved. It's why all those .timeindex files for each
partition appeared in 0.10.*. I wonder if there's a better way, using those,
to get at a timestamp?

On Thu, Aug 16, 2018 at 11:30 AM Brett Rann  wrote:

> An API was suggested by Gwen and James when I discussed it with them. For
> me I can think of it as a use case for scheduling compaction rather than
> relying on an config based time trigger.  We're looking at creating some
> potentially very large compacted topics for event sourcing and from an
> operators perspective I'm concerned about potential impact of long running
> compaction, especially if multiple topics run back to back.  Having the
> ability to schedule them at my own time gives some peace of mind for that
> concern.  Another use case might be a more urgent delete. That could be
> done manually now by just modifying the max time and waiting for compaction
> to run. But hitting an API end point is a bit cleaner.
>
> But in thinking about what that mechanism would be it started to feel like
> it would be a complicated implementation so we've put it aside for now. But
> maybe we just haven't seen the clean way yet.
>
>
>
> On Thu, Aug 16, 2018 at 11:22 AM xiongqi wu  wrote:
>
>> Brett,
>>
>> Thank you for your comments.
>> I was thinking since we already has immediate compaction setting by
>> setting
>> min dirty ratio to 0, so I decide to use "0" as disabled state.
>> I am ok to go with -1(disable), 0 (immediate) options.
>>
>> For the implementation, there are a few differences between mine and
>> "Xiaohe Dong"'s :
>> 1) I used the estimated creation time of a log segment instead of largest
>> timestamp of a log to determine the compaction eligibility, because a log
>> segment might stay as an active segment up to "max compaction lag". (see
>> the KIP for detail).
>> 2) I measure how much bytes that we must clean to follow the "max
>> compaction lag" rule, and use that to determine the order of compaction.
>> 3) force active segment to roll to follow the "max compaction lag"
>>
>> I can share my code so we can coordinate.
>>
>> I haven't think about a new API to force a compaction. what is the use
>> case
>> for this one?
>>
>>
>> On Wed, Aug 15, 2018 at 5:33 PM, Brett Rann 
>> wrote:
>>
>> > We've been looking into this too.
>> >
>> > Mailing list:
>> > https://lists.apache.org/thread.html/ed7f6a6589f94e8c2a705553f364ef
>> 
>> > 599cb6915e4c3ba9b561e610e4@%3Cdev.kafka.apache.org%3E
>> > jira wish: https://issues.apache.org/jira/browse/KAFKA-7137
>> 
>> > confluent slack discussion:
>> >
>> https://confluentcommunity.slack.com/archives/C49R61XMM/p153076012139
>> 
>> >
>> > A person on my team has started on code so you might want to coordinate:
>> > https://github.com/dongxiaohe/kafka/tree/dongxiaohe/log-
>> 
>> > cleaner-compaction-max-lifetime-2.0
>> >
>> > He's been working with Jason Gustafson and James Chen around the
>> changes.
>> > You can ping him on confluent slack as Xiaohe Dong.
>> >
>> > It's great to know others are thinking on it as well.
>> >
>> > You've added the requirement to force a segment roll which we hadn't
>> gotten
>> > to yet, which is great. I was content with it not including the active
>> > segment.
>> >
>> > > Adding topic level configuration "max.compaction.lag.ms", and
>> > corresponding broker configuration "log.cleaner.max.compaction.lag.ms",
>> > which is set to 0 (disabled) by default.
>> >
>> > Glancing at some other settings convention seems to me to be -1 for
>> > disabled (or infinite, which is more meaningful here). 0 to me implies
>> > instant, a little quicker than 1.
>> >
>> > We've been trying to think about a way to trigger compaction as well
>> > through an API call, which would need to be flagged somewhere (ZK admin/
>> > space?) but we're struggling to think how that would be coordinated
>> across
>> > brokers and partitions. Have you given any thought to that?
>> >
>> >
>> >
>> >
>> >
>> >
>> > On Thu, Aug 16, 2018 at 8:44 AM xiongqi wu  wrote:
>> >
>> > > Eno, Dong,
>> > >
>> > > I have updated the KIP. We decide not to address the issue that we
>> might
>> > > have for both compaction and time retention enabled topics (see the
>> > > rejected alternative item 2). This KIP will onl

Build failed in Jenkins: kafka-1.1-jdk7 #181

2018-08-15 Thread Apache Jenkins Server
See 


Changes:

[matthias] KAFKA-7285: Create new producer on each rebalance if EOS enabled 
(#5501)

--
[...truncated 1.93 MB...]

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldNotAllowNullNameWhenAddingSink STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldNotAllowNullNameWhenAddingSink PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testNamedTopicMatchesAlreadyProvidedPattern STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testNamedTopicMatchesAlreadyProvidedPattern PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldAddTimestampExtractorWithOffsetResetAndPatternPerSource STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldAddTimestampExtractorWithOffsetResetAndPatternPerSource PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddSinkWithSameName STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddSinkWithSameName PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddSinkWithSelfParent STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddSinkWithSelfParent PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddProcessorWithSelfParent STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddProcessorWithSelfParent PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldAssociateStateStoreNameWhenStateStoreSupplierIsInternal STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldAssociateStateStoreNameWhenStateStoreSupplierIsInternal PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddStateStoreWithSink STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddStateStoreWithSink PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > testTopicGroups STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > testTopicGroups PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > testBuild STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > testBuild PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldNotAllowOffsetResetSourceWithoutTopics STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldNotAllowOffsetResetSourceWithoutTopics PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldNotAddNullStateStoreSupplier STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldNotAddNullStateStoreSupplier PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldNotAllowNullNameWhenAddingSource STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldNotAllowNullNameWhenAddingSource PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldNotAllowNullTopicWhenAddingSink STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldNotAllowNullTopicWhenAddingSink PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldNotAllowToAddGlobalStoreWithSourceNameEqualsProcessorName STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldNotAllowToAddGlobalStoreWithSourceNameEqualsProcessorName PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldAddSourceWithOffsetReset STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldAddSourceWithOffsetReset PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddStateStoreWithSource STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddStateStoreWithSource PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldThroughOnUnassignedStateStoreAccess STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldThroughOnUnassignedStateStoreAccess PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldAddInternalTopicConfigForRepartitionTopics STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldAddInternalTopicConfigForRepartitionTopics PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldNotAllowNullProcessorNameWhenConnectingProcessorAndStateStores STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldNotAllowNullProcessorNameWhenConnectingProcessorAndStateStores PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddSourceWithSameName STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddSourceWithSameName PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldAddTimestampExtractorWithOffsetResetPerSource STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldAddTimestampExtractorWithOffsetResetPerSource PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddMoreThanOnePatternSourceNode STARTED

org.apach

Re: KIP-213 - Scalable/Usable Foreign-Key KTable joins - Rebooted.

2018-08-15 Thread Jan Filipiak
Even before message headers, the option always existed for me to just
wrap the messages into my own custom envelope.
So I of course thought this through. One sentence in your last email
triggered again all the thought process I put in back then
to design it in what I think is the "Kafka way". It ended up
ranting a little about what happened in the past.


I see plenty of colleagues of mine falling into traps in the API that I
warned about in the 1.0 DSL rewrite. I have the same
feeling again. So I hope this gives you some insight into my thought
process. I am aware that, since I never ported KIP-213 to a higher
Streams version, I don't really have a stake here, and initially I didn't
feel like actually sending it. But maybe you can pull

something good from it.

Best jan


On 15.08.2018 04:44, Adam Bellemare wrote:

@Jan
Thanks Jan. I take it you mean "key-widening" somehow includes information
about which record is processed first? I understand about a CombinedKey
with both the Foreign and Primary key, but I don't see how you track
ordering metadata in there unless you actually included a metadata field in
the key type as well.

@Guozhang
As Jan mentioned earlier, is Record Headers mean to strictly be used in
just the user-space? It seems that it is possible that a collision on the
(key,value) tuple I wish to add to it could occur. For instance, if I
wanted to add a ("foreignKeyOffset",10) to the Headers but the user already
specified their own header with the same key name, then it appears there
would be a collision. (This is one of the issues I brought up in the KIP).



I will be posting a prototype PR against trunk within the next day or two.
One thing I need to point out is that my design very strictly wraps the
entire foreignKeyJoin process entirely within the DSL function. There is no
exposure of CombinedKeys or widened keys, nothing to resolve with regards
to out-of-order processing and no need for the DSL user to even know what's
going on inside of the function. The code simply returns the results of the
join, keyed by the original key. Currently my API mirrors identically the
format of the data returned by the regular join function, and I believe
that this is very useful to many users of the DSL. It is my understanding
that one of the main design goals of the DSL is to provide higher level
functionality without requiring the users to know exactly what's going on
under the hood. With this in mind, I thought it best to solve ordering and
partitioning problems within the function and eliminate the requirement for
users to do additional work after the fact to resolve the results of their
join. Basically, I am assuming that most users of the DSL just "want it to
work" and want it to be easy. I did this operating under the assumption
that if a user truly wants to optimize their own workflow down to the
finest details then they will break from strictly using the DSL and move
down to the processors API.

I think the abstraction is not powerful enough
to avoid having Kafka specifics leak up. The leak I currently think this has
is that you cannot reliably prevent the delete coming out first,
before you emit the correct new record, as it is an abstraction entirely
around Kafka.
I can only recommend not to. Honesty and simplicity should always be
the first priority;
trying to hide this just makes it more complex, less understandable, and
will lead to mistakes

in usage.

This is exactly why I am also strongly in disfavour of GraphNodes and later
optimization stages.
Can someone give me an example of an optimisation that really can't be
handled by the user

constructing their topology differently?
Having reusable Processor API components accessible from the DSL and
composable as
one likes is exactly where the DSL should max out, and KSQL should do the
next step.
I find it very unprofessional, from a software engineering standpoint, to
run software where
you cannot at least meaningfully reason about the inner workings of the
libraries used.

Given that people have to read and understand this anyway, why try to hide it?

It really misses the beauty of the 0.10 DSL.
Apparently that is not a thing I can influence, only warn about.

@Guozhang
You can't imagine how many extra IQ state stores I constantly prune from
stream apps
because people just keep passing Materialized into all the operations.
:D :'-(
I regret that I couldn't convince you guys back then. Plus this whole
topology as a floating

interface chain; I've never seen it anywhere else :-/ :'(

I don't know. I guess this is just me regretting only having 24h/day.



I updated the KIP today with some points worth talking about, should anyone
be so inclined to check it out. Currently we are running this code in
production to handle relational joins from our Kafka Connect topics, as per
the original motivation of the KIP.










I believe the foreignKeyJoin should be responsible for. In my



On Tue, Aug 14, 2018 at 5:22 PM, Guozhang Wang  wrote

[jira] [Created] (KAFKA-7300) Add Fetcher fetch-error-rate and fetch-error-total metrics

2018-08-15 Thread Kevin Lu (JIRA)
Kevin Lu created KAFKA-7300:
---

 Summary: Add Fetcher fetch-error-rate and fetch-error-total 
metrics 
 Key: KAFKA-7300
 URL: https://issues.apache.org/jira/browse/KAFKA-7300
 Project: Kafka
  Issue Type: New Feature
  Components: clients, consumer, metrics
Reporter: Kevin Lu
Assignee: Kevin Lu


The KafkaConsumer is a complex client that requires many different components 
to function properly. When a consumer fails, it can be difficult to identify 
the root cause and which component failed (ConsumerCoordinator, Fetcher, 
ConsumerNetworkClient, etc).

 

This aims to improve the monitoring and detection of KafkaConsumer’s Fetcher 
component.

 

Fetcher will send a fetch request for each node that the consumer has assigned 
partitions for.

 

This fetch request may fail under the following cases:
 * Intermittent network issues (goes to onFailure)
 * Node sent an invalid full/incremental fetch response (FetchSessionHandler’s 
handleResponse returns false)
 * FetchSessionIdNotFound
 * InvalidFetchSessionEpochException

 

These cases are logged, but it would be valuable to provide a corresponding 
metric that allows for monitoring and alerting.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Build failed in Jenkins: kafka-trunk-jdk8 #2900

2018-08-15 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-7285: Create new producer on each rebalance if EOS enabled 
(#5501)

--
[...truncated 2.48 MB...]
org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
shouldNotAllowNullNameWhenAddingSink STARTED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
shouldNotAllowNullNameWhenAddingSink PASSED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
testAddSinkWithNullParents STARTED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
testAddSinkWithNullParents PASSED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
testNamedTopicMatchesAlreadyProvidedPattern STARTED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
testNamedTopicMatchesAlreadyProvidedPattern PASSED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
testAddSinkWithSameName STARTED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
testAddSinkWithSameName PASSED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
testAddSinkWithSelfParent STARTED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
testAddSinkWithSelfParent PASSED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
testAddProcessorWithSelfParent STARTED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
testAddProcessorWithSelfParent PASSED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
shouldAssociateStateStoreNameWhenStateStoreSupplierIsInternal STARTED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
shouldAssociateStateStoreNameWhenStateStoreSupplierIsInternal PASSED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
testAddStateStoreWithSink STARTED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
testAddStateStoreWithSink PASSED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
testTopicGroups STARTED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
testTopicGroups PASSED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
shouldSortProcessorNodesCorrectly STARTED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
shouldSortProcessorNodesCorrectly PASSED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
testBuild STARTED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
testBuild PASSED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
shouldNotAllowOffsetResetSourceWithoutTopics STARTED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
shouldNotAllowOffsetResetSourceWithoutTopics PASSED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
shouldNotAddNullStateStoreSupplier STARTED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
shouldNotAddNullStateStoreSupplier PASSED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
testAddProcessorWithNullParents STARTED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
testAddProcessorWithNullParents PASSED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
testAddSinkWithEmptyParents STARTED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
testAddSinkWithEmptyParents PASSED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
shouldNotAllowNullNameWhenAddingSource STARTED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
shouldNotAllowNullNameWhenAddingSource PASSED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
shouldNotAllowNullTopicWhenAddingSink STARTED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
shouldNotAllowNullTopicWhenAddingSink PASSED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
shouldNotAllowToAddGlobalStoreWithSourceNameEqualsProcessorName STARTED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
shouldNotAllowToAddGlobalStoreWithSourceNameEqualsProcessorName PASSED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
shouldAddSourceWithOffsetReset STARTED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
shouldAddSourceWithOffsetReset PASSED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
testAddStateStoreWithSource STARTED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
testAddStateStoreWithSource PASSED

org.apache.k

[DISCUSS] KIP-356: Add KafkaConsumer fetch-error-rate and fetch-error-total metrics

2018-08-15 Thread Kevin Lu
Hi friends! :)

I believe we currently have a gap in KafkaConsumer metrics for errors, since
the KafkaConsumer is complex and there are many places where things can go wrong.
Currently, these failures are logged, and certain ones can be inferred from
the existing metrics (e.g. heartbeat-rate).

This KIP seeks to improve monitoring and alerting for the consumer by
providing metrics for the Fetcher class.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-356%3A+Add+KafkaConsumer+fetch-error-rate+and+fetch-error-total+metrics

There are also a few other places in the Fetcher where errors may happen
(parsing completed fetches, offset requests, etc.), but it may be appropriate
to monitor them in separate metrics.
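
For illustration, a minimal, self-contained sketch of how such a sensor could
be registered with the client metrics library; the metric and group names here
are placeholders rather than whatever the KIP finally settles on:

import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.Total;

public class FetchErrorMetricsSketch {
    public static void main(String[] args) {
        Metrics metrics = new Metrics();
        Sensor fetchErrors = metrics.sensor("fetch-errors");

        MetricName rate = metrics.metricName("fetch-error-rate",
                "consumer-fetch-manager-metrics",
                "The number of fetch failures per second");
        MetricName total = metrics.metricName("fetch-error-total",
                "consumer-fetch-manager-metrics",
                "The total number of fetch failures");

        fetchErrors.add(rate, new Rate(TimeUnit.SECONDS));
        fetchErrors.add(total, new Total());

        // Wherever the Fetcher observes a failure (onFailure, an invalid fetch
        // session epoch, FetchSessionIdNotFound, ...) it would record one error:
        fetchErrors.record();

        System.out.println(metrics.metrics().get(total).metricValue());
        metrics.close();
    }
}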

Any thoughts?

Thanks!

Regards,
Kevin