Re: Request to become a contributor

2019-07-29 Thread Manikumar
Hi,

Thanks for your interest. Added you to the contributor list.

Thanks,

On Mon, Jul 29, 2019 at 11:35 AM Amrutha Shanbhag <
amrutha.shanb...@instaclustr.com> wrote:

> Hi team,
>
> Please add me to the contributor list.
> JIRA username: amrutha.shanbhag
>
> Thanks,
>
>
> *Amrutha Shanbhag*
> *Senior Software Engineer*
>
>
>
> Read our latest technical blog posts here.
>
> This email has been sent on behalf of Instaclustr Pty. Limited (Australia)
> and Instaclustr Inc (USA).
>
> This email and any attachments may contain confidential and legally
> privileged information.  If you are not the intended recipient, do not copy
> or disclose its content, but please reply to this email immediately and
> highlight the error to the sender and then immediately delete the message.
>
> Instaclustr values your privacy. Our privacy policy can be found at
> https://www.instaclustr.com/company/policies/privacy-policy
>


[jira] [Created] (KAFKA-8725) Improve LogCleaner error handling when failing to grab the filthiest log

2019-07-29 Thread Stanislav Kozlovski (JIRA)
Stanislav Kozlovski created KAFKA-8725:
--

 Summary: Improve LogCleaner error handling when failing to grab 
the filthiest log
 Key: KAFKA-8725
 URL: https://issues.apache.org/jira/browse/KAFKA-8725
 Project: Kafka
  Issue Type: Improvement
Reporter: Stanislav Kozlovski
Assignee: Stanislav Kozlovski


https://issues.apache.org/jira/browse/KAFKA-7215 improved error handling in the 
log cleaner with the goal of not having the whole thread die when an exception 
happens: instead, the partition that caused it is marked as uncleanable and 
cleaning continues for the error-free partitions.

Unfortunately, the current code can still bubble up an exception and kill the 
thread when an error happens before we have grabbed the filthiest log and 
started cleaning it. At that point, we don't have a clear reference to the log 
that caused the exception, so we throw an IllegalStateException - 
[https://github.com/apache/kafka/blob/39bcc8447c906506d63b8df156cf90174bbb8b78/core/src/main/scala/kafka/log/LogCleaner.scala#L346]
 (as seen in https://issues.apache.org/jira/browse/KAFKA-8724).

Essentially, exceptions in `grabFilthiestCompactedLog` still cause the thread 
to die. This can be improved by identifying which log caused the exception 
inside that function and marking it uncleanable.
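
A rough sketch of that hardening (written in Java for illustration; the real 
LogCleaner is Scala, and all names below are hypothetical):

```
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Evaluate each candidate log inside its own try/catch so that a failure is
// attributed to a single partition, which is then marked uncleanable, instead
// of the exception killing the cleaner thread.
class FilthiestLogSelector {
    private final Set<String> uncleanablePartitions = new HashSet<>();

    interface LogStats {
        double cleanableRatio(); // may throw, e.g. on corrupted checkpoints
    }

    Optional<String> grabFilthiestLog(Map<String, LogStats> logs) {
        String filthiest = null;
        double bestRatio = -1.0;
        for (Map.Entry<String, LogStats> entry : logs.entrySet()) {
            if (uncleanablePartitions.contains(entry.getKey()))
                continue;
            try {
                double ratio = entry.getValue().cleanableRatio();
                if (ratio > bestRatio) {
                    bestRatio = ratio;
                    filthiest = entry.getKey();
                }
            } catch (RuntimeException e) {
                // We now know exactly which partition failed: quarantine it
                // and keep evaluating the remaining logs.
                uncleanablePartitions.add(entry.getKey());
            }
        }
        return Optional.ofNullable(filthiest);
    }
}
```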



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (KAFKA-8726) Producer can't abort a transaction after some send errors

2019-07-29 Thread Mattia Barbon (JIRA)
Mattia Barbon created KAFKA-8726:


 Summary: Producer can't abort a transaction after some send errors
 Key: KAFKA-8726
 URL: https://issues.apache.org/jira/browse/KAFKA-8726
 Project: Kafka
  Issue Type: Bug
  Components: clients, producer 
Affects Versions: 2.3.0
Reporter: Mattia Barbon


I am following the producer-with-transactions example in 
[https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html]
 and, on a KafkaException, I use abortTransaction and retry.

 

In some cases, abortTransaction fails with:

```
org.apache.kafka.common.KafkaException: Cannot execute transactional method 
because we are in an error state
```

As far as I can tell, this is caused by:

```
org.apache.kafka.common.KafkaException: The client hasn't received 
acknowledgment for some previously sent messages and can no longer retry them. 
It isn't safe to continue.
```

 

Since both are KafkaException, the example seems to imply they are retriable, 
but they appear not to be. Ideally, I would expect abortTransaction to succeed 
in this case (the broker will abort the transaction anyway because it can't be 
committed), but at the very least, I would expect a way to determine that the 
producer is unusable and cannot recover.
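
For context, the javadoc example being followed is essentially the following 
(topic name and serializers here are illustrative); the failure reported above 
surfaces in the final catch block:

```
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionalSend {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("transactional.id", "my-transactional-id");
        Producer<String, String> producer =
                new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

        producer.initTransactions();
        try {
            producer.beginTransaction();
            for (int i = 0; i < 100; i++)
                producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
            producer.commitTransaction();
        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
            // Fatal errors: the only option is to close the producer.
            producer.close();
        } catch (KafkaException e) {
            // The javadoc treats any other KafkaException as abortable --
            // but, as this report shows, abortTransaction() itself can throw
            // once the producer has entered an unrecoverable error state.
            producer.abortTransaction();
        }
    }
}
```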



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


Re: [DISCUSS] KIP-481: SerDe Improvements for Connect Decimal type in JSON

2019-07-29 Thread Andy Coates
The way I see it, we need to control two separate things:

1. How do we _deserialize_ a decimal type if we encounter a text node in
the JSON? (We should _always_ be able to deserialize a standard JSON
number as a decimal).
2. How do we choose how we want decimals to be _serialized_?

This fits well with your second suggestion of slightly different
config names for serialization vs deserialization.
a. For deserialization we only care about how to handle text nodes: `
deserialization.decimal.*text*.format`, which should only have two valid
values: BINARY | TEXT.
b. For serialization we need all three: `serialization.decimal.format`,
which should support all three options: BINARY | TEXT | NUMERIC.

Implementation-wise, I think these should be two separate enums, rather
than one shared enum and throwing an error if the deserializer is set to
NUMERIC.  Mainly as this means the enums reflect the options available,
rather than this being hidden in config checking code.  But that's a minor
implementation detail.
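
A minimal sketch of that two-enum shape (the config keys are the ones proposed 
above; the enum names are hypothetical):

```
// Valid values for the proposed serialization.decimal.format config.
enum DecimalSerializationFormat { BINARY, TEXT, NUMERIC }

// Valid values for the proposed deserialization.decimal.text.format config.
// NUMERIC is deliberately absent: a JSON number can always be deserialized
// as a decimal, so this config only governs how text nodes are interpreted.
enum DecimalDeserializationTextFormat { BINARY, TEXT }
```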

Personally, I'd be tempted to have the BINARY value named something like
`LEGACY` or `LEGACY_BINARY` as a way of encouraging users to move away from
it.

It's a real shame that both of these settings require a default of BINARY
for backwards compatibility, but I agree that discussions / plans around
switching the defaults should not block this KIP.

Andy


On Thu, 25 Jul 2019 at 18:26, Almog Gavra  wrote:

> Thanks for the replies Andy and Andrew (2x Andy?)!
>
> > Is the text decimal a base16 encoded number, or is it base16 encoded
> binary
> > form of the number?
>
> The conversion happens as decimal.unscaledValue().toByteArray() and then
> the byte array is converted to a hex string, so it's definitely the binary
> form of the number converted to base16. Whether or not that's the same as
> the base16 encoded number is a good question (toByteArray returns a byte
> array containing a signed, big-endian, two's complement representation of
> the big integer).
>
> > One suggestion I have is to change the proposed new config to only affect
> > decimals stored as text, i.e. to switch between the current base16 and
> the
> > more common base10.   Then add another config to the serializer only that
> > controls if decimals should be serialized as text or numeric.
>
> I think we need to be able to handle all mappings from serialization format
> to deserialization format (e.g. read in BINARY and output TEXT), which I
> think would be impossible with the alternative suggestion. I agree that
> automatically deserializing numerics is valuable. I see two other ways to
> get this, both keeping the serialization.format config the same:
>
> - have json.decimal.deserialization.format accept all three formats. if set
> to BINARY/TEXT, numerics would be automatically supported. If set to
> NUMERIC, then any string coming in would result in deserialization error
> (defaults to BINARY for backwards compatibility)
> - change json.decimal.deserialization.format to
> json.decimal.deserialization.string.format which accepts only BINARY/TEXT
> (defaults to BINARY for backwards compatibility)
>
> > would be a breaking change in that things that previously failed would
> > suddenly start deserializing.  This is a price I'm willing to pay.
>
> I agree. I'm willing to pay this price too.
>
> > IMHO, we should then plan to switch the default of decimal serialization
> to
> > numeric, and text serialization to base 10 in the next major release.
>
> I think that can be a separate discussion, I don't want to block this KIP
> on it.
>
> Thoughts?
>
> On Thu, Jul 25, 2019 at 6:35 AM Andrew Otto  wrote:
>
> > This is a bit orthogonal, but in JsonSchemaConverter I use JSONSchemas to
> > indicate whether a JSON number should be deserialized as an integer or a
> > decimal
> > <
> >
> https://github.com/ottomata/kafka-connect-jsonschema/blob/master/src/main/java/org/wikimedia/kafka/connect/jsonschema/JsonSchemaConverter.java#L251-L261
> > >.
> > Not everyone is going to have JSONSchemas available when converting, but
> if
> > you do, it is an easy way to support JSON numbers as decimals.
> >
> > Carry on! :)
> >
> > On Thu, Jul 25, 2019 at 9:12 AM Andy Coates  wrote:
> >
> > > Hi Almog,
> > >
> > > Like the KIP - I think being able to support decimals in JSON in the
> same
> > > way most other systems do is a great improvement.
> > >
> > > It's not 100% clear to me from the KIP what the current format is.  Is
> > the
> > > text decimal a base16 encoded number, or is it base16 encoded binary
> form
> > > of the number? (I've not tried to get my head around if these two are
> > even
> > > different!)
> > >
> > > One suggestion I have is to change the proposed new config to only
> affect
> > > decimals stored as text, i.e. to switch between the current base16 and
> > the
> > > more common base10.   Then add another config to the serializer only
> that
> > > controls if decimals should be serialized as text or numeric.  The
> > benefit
> 

Re: [DISCUSS] KIP-490: log when consumer groups lose a message because offset has been deleted

2019-07-29 Thread Stanislav Kozlovski
Hey Jose,

Thanks for sharing your use cases.
From my experience, it is uncommon to run with a retention.ms setting small
enough that it can make you lose messages when your consumers can't catch
up. If you are concerned with data loss, I think the cost investment into
hardware is generally worth it.
I think your use case might benefit from setting `retention.bytes` to
ensure you don't go over a specific size, combined with a higher retention.ms.
I assume that would be more deterministic, as you likely have a better
idea of how large these files will be (and can grow) than of how
long they'd take to process.
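
For instance, a sketch of capping a topic by size alongside a generous time 
limit via the admin client (topic name and values are illustrative):

```
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class SetTopicRetention {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "file-ingest");
            Collection<AlterConfigOp> ops = Arrays.asList(
                    // Hard size cap: 100 GiB per partition.
                    new AlterConfigOp(new ConfigEntry("retention.bytes", "107374182400"),
                            AlterConfigOp.OpType.SET),
                    // Generous time limit: 7 days.
                    new AlterConfigOp(new ConfigEntry("retention.ms", "604800000"),
                            AlterConfigOp.OpType.SET));
            admin.incrementalAlterConfigs(Collections.singletonMap(topic, ops)).all().get();
        }
    }
}
```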

In any case, I think having to manually configure and modify retention.ms in
real time according to consumer lag is an exceptional case. This metric (if
enabled) would be the highest-cardinality metric in the server, as it is
per consumer group *and* partition. I know the current proposal suggests we
enable it through a whitelist config, but I think that would not be intuitive
to users, and I'm not sure it's a good idea to guard metrics behind
configurations.
In general, I believe we should aim to limit the raw number of metrics
exposed by the broker when there is another way to solve the problem.

> I think the metric should go on the broker side, in case the consumer
> is not even instantiated, or it is crashing in a loop.

We would need *all* consumers in the consumer group to be unavailable in
order for the information not to be exposed. Also, consumers are generally
expected to run all the time (Kafka is a real-time streaming platform);
batch use cases are the exception.
If they are all crashing in a loop, there is an outage to be solved, and you
should increase your retention if there is a chance unconsumed data was
deleted.
Because it is rare to need this information in real time, I still
think having it in the consumer is a good approach.

Let me know if that makes sense.

Thanks,
Stanislav

On Sun, Jul 28, 2019 at 5:00 PM Jose M  wrote:

> Hello,
>
> Thanks for taking the time to review my KIP!
>
> I will describe some production scenarios I faced to better explain
> the reasons for this KIP.
>
> * Usecase 1: batch processing of files.
> A batch job produces huge files that must be processed. Each line of
> the file becomes a message produced to a topic. This means the topic
> storing these messages will go from 0 lag to, let's say, 5 million lag in
> a few seconds. I will adjust the retention time on the topic based on
> the processing rate of the consumer of this topic. Ex: 5 million
> messages at 100 TPS needs ~14 hours of retention time. In practice we set
> up a bigger retention time, just in case. If a second file arrives
> before the first one has been processed and the processing rate is
> slower than I thought, I will lose the end of the first file, without
> notice.
>
> * Usecase 2: application facing network errors.
> The application consumes messages from an input topic, processes them and
> pushes them to an external system (ex: a webservice). If there are
> connectivity problems between my Kafka consumer and the external
> webservice, the lag of the application will grow. As I have alerting
> rules on records-max-lag, I will be aware that the backlog of the topic is
> above a limit. I will take action as in the previous example, and
> adjust the retention time on the topic based on the processing rate.
> If the processing rate is not constant, due to the network
> connectivity problem, the retention time may not be enough and I will
> lose messages.
>
> In both cases, I don't know if I've lost messages or not. I suspect
> that yes, but I can not give an accurate number of messages lost, or
> guarantee I have not lost any of them.
>
> I could solve both use cases by setting up oversized retention times for
> the topics, but in practice I'm limited by hardware resources.
>
> One of the reasons I've opened this KIP is because I think the
> implementation should be doable. The broker has all the information
> needed (expired offset and last consumed offset). Though I have
> questions about the performance impact; that's why I hesitate
> to propose this new metric as a default for all consumers, rather than only
> for the consumers that request it through configuration.
>
> As you said, the motivation is to know when (and which), and how many
> messages a consumer has missed because they have been deleted. I
> think it is possible to return the exact number of messages missed due
> to the retention policy.
>
> I think the metric should go on the broker side, in case the consumer
> is not even instantiated, or it is crashing in a loop.
>
> Please let me know what you think.
>
>
> Thanks,
> Jose M
>
>
>
> On Sat, Jul 27, 2019 at 4:22 PM Stanislav Kozlovski
>  wrote:
> >
> > Hey Jose,
> >
> > Thanks for the KIP.
> >
> > I think that Colin was referring to an existing client metric called
> >
> "kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{client-id}",name=records-lag-max"

Re: [VOTE] KIP-467: Augment ProduceResponse error messaging

2019-07-29 Thread Bill Bejeck
Hi Guozhang,

Thanks for the KIP, looks like a great improvement.  +1 for me.

-Bill

On Fri, Jul 26, 2019 at 8:18 PM Jun Rao  wrote:

> Hi, Guozhang,
>
> Thanks for the KIP. +1
>
> Jun
>
> On Fri, Jul 26, 2019 at 4:40 PM Jason Gustafson 
> wrote:
>
> > Hi Guozhang,
> >
> > I agree it is misleading to suggest corruption in these cases. Have you
> > considered alternative error codes? I think INVALID_REQUEST may be more
> > suggestive that the server has rejected the request for some reason.
> >
> > In any case, it's a small point that need not block the KIP. I'm +1
> > overall.
> >
> > Thanks,
> > Jason
> >
> > On Fri, Jul 26, 2019 at 4:24 PM Guozhang Wang 
> wrote:
> >
> > > Hi Jason, thanks for the comments.
> > >
> > > 1. Yes that's a good point. Will move it to `errors`.
> > >
> > > 2. The point is that when the broker returns the new error code
> > > INVALID_RECORD to old versioned clients that do not recognize the code,
> > > it would be translated to an UnknownServerException, whereas today
> > > (without this KIP) the client would see a CorruptRecordException that
> > > covers a bunch of scenarios that are not actually related to corrupted
> > > records at all.
> > >
> > > I feel that the new behavior is actually better, i.e. let clients
> > > report an UnknownServerException rather than a more concrete, but
> > > incorrect, CorruptRecordException. If we want to maintain compatibility
> > > we can let brokers return the same error code to old versioned clients,
> > > but I'm not sure that is actually better.
> > >
> > >
> > > Guozhang
> > >
> > > On Thu, Jul 25, 2019 at 5:08 PM Jason Gustafson 
> > > wrote:
> > >
> > > > Hi Guozhang,
> > > >
> > > > The proposal looks good. A couple minor questions.
> > > >
> > > > 1. InvalidRecordException is currently located in
> > > > `org.apache.kafka.common.record`, which is not a public package.
> Shall
> > we
> > > > move it to `org.apache.kafka.common.errors`?
> > > > 2. I'm not sure I understand the point about UnknownServerException
> in
> > > the
> > > > compatibility section. Are you suggesting that we would use the new
> > error
> > > > code even for old versions of the produce request?
> > > >
> > > > Thanks,
> > > > Jason
> > > >
> > > >
> > > >
> > > >
> > > > On Tue, Jul 16, 2019 at 3:46 PM Guozhang Wang 
> > > wrote:
> > > >
> > > > > Hello folks,
> > > > >
> > > > > I'd like to start a voting thread on KIP-467 to improve error
> > > > communication
> > > > > and handling for producer response:
> > > > >
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-467
> > > > >
> > >
> %3A+Augment+ProduceResponse+error+messaging+for+specific+culprit+records
> > > > >
> > > > >
> > > > > Thanks,
> > > > >
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>


Kafka Group Coordinator Failover

2019-07-29 Thread Nakul Goyal
Hello There,

I am using Kafka and ZooKeeper in a cluster setup, with two Kafka nodes
and three ZooKeeper nodes, alongside my clustered Spring Boot application.

The issue I am facing: when I stop both Kafka nodes and then start one of
them (the one that is not the group coordinator), the consumers in my Spring
Boot application stop receiving the messages in the queue.

Spring Kafka Version - 2.1.9.RELEASE
Kafka Version - 2.11-2.1.0
Zookeeper Version - 3.4.13

Please help or suggest a way out of it.

Regards,
Nakul Goyal
8949552129


[jira] [Created] (KAFKA-8727) Control over standby tasks host assignment

2019-07-29 Thread Levani Kokhreidze (JIRA)
Levani Kokhreidze created KAFKA-8727:


 Summary: Control over standby tasks host assignment
 Key: KAFKA-8727
 URL: https://issues.apache.org/jira/browse/KAFKA-8727
 Project: Kafka
  Issue Type: New Feature
  Components: streams
Reporter: Levani Kokhreidze


*Motivation*

As of now, a Kafka Streams user has no control over which host the Kafka 
Streams application will assign a standby task to. In production deployments 
(especially in Kubernetes) it's quite common to have multiple instances of the 
same Kafka Streams application deployed across more than one "cluster" in order 
to have high availability of the system.

For example, if we have 6 Kafka Streams instances deployed across two clusters, 
we'll get 3 Kafka Streams instances per cluster. With the current 
implementation, the Kafka Streams application may create a "standby task" in 
the same cluster as the active task, which is not optimal: in case of a cluster 
failure, recovery time will be much longer. This is especially problematic for 
Kafka Streams applications that manage large state.

 

*Possible Solution*

It would be great if in the Kafka Streams configuration we could have the 
possibility to inject dynamic environment variables and use those environment 
variables to control where standby tasks should be created.

For example, suppose I have active task *1_1* with environment variable 
*CLUSTER_ID: main01*; then the standby task for *1_1* should be created where 
*CLUSTER_ID* *!=* *main01*.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (KAFKA-8728) Flaky Test KTableSourceTopicRestartIntegrationTest #shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosDisabled

2019-07-29 Thread Matthias J. Sax (JIRA)
Matthias J. Sax created KAFKA-8728:
--

 Summary: Flaky Test KTableSourceTopicRestartIntegrationTest 
#shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosDisabled
 Key: KAFKA-8728
 URL: https://issues.apache.org/jira/browse/KAFKA-8728
 Project: Kafka
  Issue Type: Bug
  Components: streams, unit tests
Affects Versions: 2.4.0
Reporter: Matthias J. Sax


[https://builds.apache.org/blue/organizations/jenkins/kafka-trunk-jdk11/detail/kafka-trunk-jdk11/719/tests]
{quote}java.lang.AssertionError: Condition not met within timeout 3. Table 
did not read all values
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:376)
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:353)
at 
org.apache.kafka.streams.integration.KTableSourceTopicRestartIntegrationTest.assertNumberValuesRead(KTableSourceTopicRestartIntegrationTest.java:187)
at 
org.apache.kafka.streams.integration.KTableSourceTopicRestartIntegrationTest.shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosDisabled(KTableSourceTopicRestartIntegrationTest.java:113){quote}
STDOUT
{quote}[2019-07-29 04:08:45,009] ERROR [Controller id=2 epoch=3] Controller 2 
epoch 3 failed to change state for partition __transaction_state-23 from 
OnlinePartition to OnlinePartition (state.change.logger:76)
kafka.common.StateChangeFailedException: Failed to elect leader for partition 
__transaction_state-23 under strategy 
ControlledShutdownPartitionLeaderElectionStrategy
at 
kafka.controller.ZkPartitionStateMachine.$anonfun$doElectLeaderForPartitions$7(PartitionStateMachine.scala:424)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at 
kafka.controller.ZkPartitionStateMachine.doElectLeaderForPartitions(PartitionStateMachine.scala:421)
at 
kafka.controller.ZkPartitionStateMachine.electLeaderForPartitions(PartitionStateMachine.scala:335)
at 
kafka.controller.ZkPartitionStateMachine.doHandleStateChanges(PartitionStateMachine.scala:236)
at 
kafka.controller.ZkPartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:157)
at 
kafka.controller.KafkaController.doControlledShutdown(KafkaController.scala:1091)
at 
kafka.controller.KafkaController.$anonfun$processControlledShutdown$1(KafkaController.scala:1053)
at scala.util.Try$.apply(Try.scala:213)
at 
kafka.controller.KafkaController.processControlledShutdown(KafkaController.scala:1053)
at kafka.controller.KafkaController.process(KafkaController.scala:1624)
at kafka.controller.QueuedEvent.process(ControllerEventManager.scala:53)
at 
kafka.controller.ControllerEventManager$ControllerEventThread.$anonfun$doWork$1(ControllerEventManager.scala:137)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
at 
kafka.controller.ControllerEventManager$ControllerEventThread.doWork(ControllerEventManager.scala:137)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:89){quote}



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


Re: [DISCUSS] KIP-496: Administrative API to delete consumer offsets

2019-07-29 Thread Guozhang Wang
Thanks for the replies Jason!

2. No I do not see any problems, just trying to understand how strictly we
are applying this rule :) Piggy-backing on the existing background thread
and check interval mechanism means we are not "eagerly" expiring either,
but I think this is fine.
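
For reference, a sketch of how the offset-deletion API this KIP proposes might 
be invoked (the method shape is taken from the KIP and could change before 
release):

```
import java.util.Collections;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.TopicPartition;

public class DeleteGroupOffsets {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Drop committed offsets for a partition the group no longer
            // subscribes to, e.g. so lag monitoring stops reporting it.
            Set<TopicPartition> partitions =
                    Collections.singleton(new TopicPartition("retired-topic", 0));
            admin.deleteConsumerGroupOffsets("my-group", partitions).all().get();
        }
    }
}
```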


Guozhang


On Thu, Jul 25, 2019 at 3:16 PM Jason Gustafson  wrote:

> 1. Fixed, thanks!
>
> 2. Yes, that is what I was thinking. Do you see any problems?
>
> 3. Good point. Do you think a meter for expired and deleted offsets would
> be sufficient?
>
> 4. I considered it. I thought that might be a little dangerous for dynamic
> groups which have subscriptions changing. If the first member to discover a
> subscription change falls out, then offsets would be lost. Also, it seemed
> a little more consistent with empty group expiration. From an offset
> expiration perspective, an empty group is just treated as a case where the
> subscription is empty, which makes all offsets subject to expiration.
>
>
> Thanks,
> Jason
>
> On Thu, Jul 25, 2019 at 1:41 PM Guozhang Wang  wrote:
>
> > Hi Jason,
> >
> > Thanks for the KIP! I've made a pass on it and here are few comments:
> >
> > 1. " before the clients which ." --> incomplete sentence?
> >
> > 2. " Any committed offset for a partition which is not currently
> subscribed
> > to is subject to expiration." --> this may be an implementation detail,
> but
> > are we going to piggy-back on the same offsetsRetentionCheckIntervalMs to
> > check for expirable offsets as well?
> >
> > Some meta comment:
> >
> > 3. Looking into the current broker-side metrics, we do not have a good
> user
> > visibility yet for offset removal either due to expiration or deletion,
> > maybe we should consider adding one?
> >
> > 4. Playing the devil's advocate here: for cases where deleting expirable
> > offsets is needed (like you mentioned lag monitoring), should we just
> > by-pass the offset retention ms (by default it's one day) and remove
> > immediately? What scenarios would require those non-subscribed partition
> > offsets to be retained longer?
> >
> >
> > Guozhang
> >
> >
> > On Tue, Jul 23, 2019 at 10:11 AM Jason Gustafson 
> > wrote:
> >
> > > Hi All,
> > >
> > > I have a short KIP to add an api for consumer offset deletion:
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-496%3A+Administrative+API+to+delete+consumer+offsets
> > > .
> > > Please take a look and let me know what you think.
> > >
> > > Thanks,
> > > Jason
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


-- 
-- Guozhang


Re: [DISCUSS] KIP-497: Add inter-broker API to alter ISR

2019-07-29 Thread Guozhang Wang
Hi Jason,

Thanks for the KIP. It looks good to me overall.

1. Just clarifying the "CurrentVersion" field in the newly proposed
protocol. Does it contain the same value as the zkVersion that the controller
gets from ZK?

2. As for the comment in the KIP: "We can either await the update or we can
send a new update with the current version. If we did the latter, then we
would have multiple pending inflights using the same version." My
understanding is that the controller acts as the source of truth
for "currentVersion", in which case I think there's little latency benefit
in sending multiple pending requests with the same version, since whichever
arrives at the controller first would cause the zkVersion to be bumped and
therefore the rest of the requests would be rejected with
"INVALID_ISR_VERSION". So I'd favor just waiting for the response to the
current inflight request before sending out the next one -- admittedly
this requires a somewhat more complicated implementation on the brokers, but
maybe we can generalize the controller's request queue module for this
purpose?
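
A hypothetical sketch of that single-inflight policy on the leader side (all 
names invented; a real implementation would sit on the inter-broker channel):

```
// Keeps at most one AlterIsr request in flight and always sends the latest
// desired ISR state once the previous response arrives, so retries never
// resurrect a stale state change.
class AlterIsrSender {
    interface IsrState {}

    private boolean inflight = false;
    private IsrState pending;

    synchronized void propose(IsrState newState) {
        pending = newState;   // newer proposals overwrite older unsent ones
        maybeSend();
    }

    private synchronized void maybeSend() {
        if (inflight || pending == null)
            return;
        inflight = true;
        IsrState toSend = pending;
        pending = null;
        sendAlterIsr(toSend, this::onResponse);
    }

    private synchronized void onResponse() {
        inflight = false;
        maybeSend();          // if a newer state arrived meanwhile, send it now
    }

    // Placeholder for the actual asynchronous inter-broker request.
    private void sendAlterIsr(IsrState state, Runnable onResponse) {}
}
```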


Guozhang


On Sun, Jul 28, 2019 at 10:32 AM Colin McCabe  wrote:

> Hi Jason,
>
> This looks good.
>
> If the AlterIsr request returns a higher ZK version than the one the
> broker currently has, will the broker use that as its new ZK version?  I
> suppose this could happen if some of the updates the controller pushed out
> were not received or not received yet by the broker in question.
>
> best,
> Colin
>
>
> On Fri, Jul 26, 2019, at 09:43, Jason Gustafson wrote:
> > Hi All,
> >
> > I have written a proposal to change the way leaders make ISR
> > modifications:
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-497%3A+Add+inter-broker+API+to+alter+ISR
> .
> > Have a look and let me know what you think.
> >
> > Thanks,
> > Jason
> >
>


-- 
-- Guozhang


Re: [DISCUSS] KIP-481: SerDe Improvements for Connect Decimal type in JSON

2019-07-29 Thread Almog Gavra
I'm mostly happy with your current suggestion (two configs, one for
serialization and one for deserialization) and your implementation
suggestion. One thing to note:

> We should _always_ be able to deserialize a standard JSON
> number as a decimal

I was doing some research into decimals and JSON, and I can imagine a
compelling reason to require string representations: to avoid losing
precision and to be certain that whoever is sending the data isn't losing
precision (e.g. https://stackoverflow.com/a/38357877/2258040).

I'm okay with always allowing numerics, but thought it's worth raising the
thought.
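
A quick illustration of the concern: round-tripping a high-precision decimal 
through a binary double, as a consumer of plain JSON numbers might do, silently 
truncates it:

```
import java.math.BigDecimal;

public class DecimalPrecisionLoss {
    public static void main(String[] args) {
        BigDecimal original = new BigDecimal("0.12345678901234567890123456789");
        // Many JSON pipelines parse numbers into a 64-bit double first.
        double viaDouble = Double.parseDouble(original.toPlainString());
        BigDecimal roundTripped = new BigDecimal(viaDouble);
        System.out.println(original.equals(roundTripped)); // false
        // Everything beyond ~17 significant digits has been lost, which is
        // why a string representation can be the safer wire format.
    }
}
```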

On Mon, Jul 29, 2019 at 4:57 AM Andy Coates  wrote:

> The way I see it, we need to control two separate things:
>
> 1. How do we _deserialize_ a decimal type if we encounter a text node in
> the JSON? (We should _always_ be able to deserialize a standard JSON
> number as a decimal).
> 2. How do we choose how we want decimals to be _serialized_?
>
> This fits well with your second suggestion of slightly different
> config names for serialization vs deserialization.
> a. For deserialization we only care about how to handle text nodes: `
> deserialization.decimal.*text*.format`, which should only have two valid
> values: BINARY | TEXT.
> b. For serialization we need all three: `serialization.decimal.format`,
> which should support all three options: BINARY | TEXT | NUMERIC.
>
> Implementation-wise, I think these should be two separate enums, rather
> than one shared enum and throwing an error if the deserializer is set to
> NUMERIC.  Mainly as this means the enums reflect the options available,
> rather than this being hidden in config checking code.  But that's a minor
> implementation detail.
>
> Personally, I'd be tempted to have the BINARY value named something like
> `LEGACY` or `LEGACY_BINARY` as a way of encouraging users to move away from
> it.
>
> It's a real shame that both of these settings require a default of BINARY
> for backwards compatibility, but I agree that discussions / plans around
> switching the defaults should not block this KIP.
>
> Andy
>
>
> On Thu, 25 Jul 2019 at 18:26, Almog Gavra  wrote:
>
> > Thanks for the replies Andy and Andrew (2x Andy?)!
> >
> > > Is the text decimal a base16 encoded number, or is it base16 encoded
> > binary
> > > form of the number?
> >
> > The conversion happens as decimal.unscaledValue().toByteArray() and then
> > the byte array is converted to a hex string, so it's definitely the
> binary
> > form of the number converted to base16. Whether or not that's the same as
> > the base16 encoded number is a good question (toByteArray returns a byte
> > array containing a signed, big-endian, two's complement representation of
> > the big integer).
> >
> > > One suggestion I have is to change the proposed new config to only
> affect
> > > decimals stored as text, i.e. to switch between the current base16 and
> > the
> > > more common base10.   Then add another config to the serializer only
> that
> > > controls if decimals should be serialized as text or numeric.
> >
> > I think we need to be able to handle all mappings from serialization
> format
> > to deserialization format (e.g. read in BINARY and output TEXT), which I
> > think would be impossible with the alternative suggestion. I agree that
> > automatically deserializing numerics is valuable. I see two other ways to
> > get this, both keeping the serialization.format config the same:
> >
> > - have json.decimal.deserialization.format accept all three formats. if
> set
> > to BINARY/TEXT, numerics would be automatically supported. If set to
> > NUMERIC, then any string coming in would result in deserialization error
> > (defaults to BINARY for backwards compatibility)
> > - change json.decimal.deserialization.format to
> > json.decimal.deserialization.string.format which accepts only BINARY/TEXT
> > (defaults to BINARY for backwards compatibility)
> >
> > > would be a breaking change in that things that previously failed would
> > > suddenly start deserializing.  This is a price I'm willing to pay.
> >
> > I agree. I'm willing to pay this price too.
> >
> > > IMHO, we should then plan to switch the default of decimal
> serialization
> > to
> > > numeric, and text serialization to base 10 in the next major release.
> >
> > I think that can be a separate discussion, I don't want to block this KIP
> > on it.
> >
> > Thoughts?
> >
> > On Thu, Jul 25, 2019 at 6:35 AM Andrew Otto  wrote:
> >
> > > This is a bit orthogonal, but in JsonSchemaConverter I use JSONSchemas
> to
> > > indicate whether a JSON number should be deserialized as an integer or
> a
> > > decimal
> > > <
> > >
> >
> https://github.com/ottomata/kafka-connect-jsonschema/blob/master/src/main/java/org/wikimedia/kafka/connect/jsonschema/JsonSchemaConverter.java#L251-L261
> > > >.
> > > Not everyone is going to have JSONSchemas available when converting,
> but
> > if
> > > you do, it is an easy way to support JSON numbers as decimals.
>

Re: [VOTE] KIP-467: Augment ProduceResponse error messaging

2019-07-29 Thread Guozhang Wang
Yeah, my thinking is that changing the return error code away from
CORRUPTED_RECORD is really a bug fix, so we should just do it anyway
without considering compatibility. I like returning INVALID_REQUEST too;
I'll change it in the wiki.

Guozhang

On Fri, Jul 26, 2019 at 4:40 PM Jason Gustafson  wrote:

> Hi Guozhang,
>
> I agree it is misleading to suggest corruption in these cases. Have you
> considered alternative error codes? I think INVALID_REQUEST may be more
> suggestive that the server has rejected the request for some reason.
>
> In any case, it's a small point that need not block the KIP. I'm +1
> overall.
>
> Thanks,
> Jason
>
> On Fri, Jul 26, 2019 at 4:24 PM Guozhang Wang  wrote:
>
> > Hi Jason, thanks for the comments.
> >
> > 1. Yes that's a good point. Will move it to `errors`.
> >
> > 2. The point is that when the broker returns the new error code
> > INVALID_RECORD to old versioned clients that do not recognize the code,
> > it would be translated to an UnknownServerException, whereas today
> > (without this KIP) the client would see a CorruptRecordException that
> > covers a bunch of scenarios that are not actually related to corrupted
> > records at all.
> >
> > I feel that the new behavior is actually better, i.e. let clients report
> > an UnknownServerException rather than a more concrete, but incorrect,
> > CorruptRecordException. If we want to maintain compatibility we can let
> > brokers return the same error code to old versioned clients, but I'm not
> > sure that is actually better.
> >
> >
> > Guozhang
> >
> > On Thu, Jul 25, 2019 at 5:08 PM Jason Gustafson 
> > wrote:
> >
> > > Hi Guozhang,
> > >
> > > The proposal looks good. A couple minor questions.
> > >
> > > 1. InvalidRecordException is currently located in
> > > `org.apache.kafka.common.record`, which is not a public package. Shall
> we
> > > move it to `org.apache.kafka.common.errors`?
> > > 2. I'm not sure I understand the point about UnknownServerException in
> > the
> > > compatibility section. Are you suggesting that we would use the new
> error
> > > code even for old versions of the produce request?
> > >
> > > Thanks,
> > > Jason
> > >
> > >
> > >
> > >
> > > On Tue, Jul 16, 2019 at 3:46 PM Guozhang Wang 
> > wrote:
> > >
> > > > Hello folks,
> > > >
> > > > I'd like to start a voting thread on KIP-467 to improve error
> > > communication
> > > > and handling for producer response:
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-467
> > > >
> > %3A+Augment+ProduceResponse+error+messaging+for+specific+culprit+records
> > > >
> > > >
> > > > Thanks,
> > > >
> > > > -- Guozhang
> > > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


-- 
-- Guozhang


Re: [VOTE] KIP-467: Augment ProduceResponse error messaging

2019-07-29 Thread Guozhang Wang
+1 from myself as well (binding).


I'm closing this vote thread with the following votes:

binding +1s: 4 (Guozhang, Jun, Jason, Bill).


Thanks everyone who've reviewed and voted on the KIP!

Guozhang

On Mon, Jul 29, 2019 at 9:30 AM Guozhang Wang  wrote:

> Yeah, my thinking is that changing the return error code away from
> CORRUPTED_RECORD is really a bug fix, so we should just do it anyway
> without considering compatibility. I like returning INVALID_REQUEST too;
> I'll change it in the wiki.
>
> Guozhang
>
> On Fri, Jul 26, 2019 at 4:40 PM Jason Gustafson 
> wrote:
>
>> Hi Guozhang,
>>
>> I agree it is misleading to suggest corruption in these cases. Have you
>> considered alternative error codes? I think INVALID_REQUEST may be more
>> suggestive that the server has rejected the request for some reason.
>>
>> In any case, it's a small point that need not block the KIP. I'm +1
>> overall.
>>
>> Thanks,
>> Jason
>>
>> On Fri, Jul 26, 2019 at 4:24 PM Guozhang Wang  wrote:
>>
>> > Hi Jason, thanks for the comments.
>> >
>> > 1. Yes that's a good point. Will move it to `errors`.
>> >
>> > 2. The point is that when the broker returns the new error code
>> > INVALID_RECORD to old versioned clients that do not recognize the code,
>> > it would be translated to an UnknownServerException, whereas today
>> > (without this KIP) the client would see a CorruptRecordException that
>> > covers a bunch of scenarios that are not actually related to corrupted
>> > records at all.
>> >
>> > I feel that the new behavior is actually better, i.e. let clients
>> > report an UnknownServerException rather than a more concrete, but
>> > incorrect, CorruptRecordException. If we want to maintain compatibility
>> > we can let brokers return the same error code to old versioned clients,
>> > but I'm not sure that is actually better.
>> >
>> >
>> > Guozhang
>> >
>> > On Thu, Jul 25, 2019 at 5:08 PM Jason Gustafson 
>> > wrote:
>> >
>> > > Hi Guozhang,
>> > >
>> > > The proposal looks good. A couple minor questions.
>> > >
>> > > 1. InvalidRecordException is currently located in
>> > > `org.apache.kafka.common.record`, which is not a public package.
>> Shall we
>> > > move it to `org.apache.kafka.common.errors`?
>> > > 2. I'm not sure I understand the point about UnknownServerException in
>> > the
>> > > compatibility section. Are you suggesting that we would use the new
>> error
>> > > code even for old versions of the produce request?
>> > >
>> > > Thanks,
>> > > Jason
>> > >
>> > >
>> > >
>> > >
>> > > On Tue, Jul 16, 2019 at 3:46 PM Guozhang Wang 
>> > wrote:
>> > >
>> > > > Hello folks,
>> > > >
>> > > > I'd like to start a voting thread on KIP-467 to improve error
>> > > communication
>> > > > and handling for producer response:
>> > > >
>> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-467
>> > > >
>> > %3A+Augment+ProduceResponse+error+messaging+for+specific+culprit+records
>> > > >
>> > > >
>> > > > Thanks,
>> > > >
>> > > > -- Guozhang
>> > > >
>> > >
>> >
>> >
>> > --
>> > -- Guozhang
>> >
>>
>
>
> --
> -- Guozhang
>


-- 
-- Guozhang


[jira] [Created] (KAFKA-8729) Augment ProduceResponse error messaging for specific culprit records

2019-07-29 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-8729:


 Summary: Augment ProduceResponse error messaging for specific 
culprit records
 Key: KAFKA-8729
 URL: https://issues.apache.org/jira/browse/KAFKA-8729
 Project: Kafka
  Issue Type: Bug
  Components: clients, core
Reporter: Guozhang Wang
Assignee: Tu Tran


1. We should replace the misleading CORRUPT_RECORD error code with a new 
INVALID_RECORD.

2. We should augment the ProduceResponse with a customizable error message and 
indicators of the culprit records.

3. We should change the client-side handling logic of non-retriable 
INVALID_RECORD to re-batch the records.

For details, see: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-467%3A+Augment+ProduceResponse+error+messaging+for+specific+culprit+records



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


Re: [DISCUSS] KIP-487: Automatic Topic Creation on Producer

2019-07-29 Thread Mickael Maison
Hi Justine,

Thanks for the KIP!

In case auto-creation is enabled on both the client and the server, will
the producer still use the AdminClient (CreateTopics request) to
create topics, rather than relying on broker auto-creation?
I'm guessing the answer is yes, but can you make it explicit?

Thank you

On Wed, Jul 24, 2019 at 6:23 PM Justine Olshan  wrote:
>
> Hi,
> Just a friendly reminder to take a look at this KIP if you have the time.
>
> I was thinking about broker vs. client default precedence, and I think it
> makes sense to keep the broker as the default used when both client-side
> and broker-side defaults are configured. The idea is that there would be
> pretty clear documentation, and that many systems with configurations that
> the client could not change would likely have the auto-create default off.
> (In cloud for example).
>
> It also seems like in most cases, the consumer config
> 'allow.auto.create.topics' was created to actually prevent the creation of
> topics, so the loss of creation functionality will not be a big problem.
>
>  I'm happy to discuss any other compatibility problems or components of
> this KIP.
>
> Thank you,
> Justine
>
> On Wed, Jul 17, 2019 at 9:11 AM Justine Olshan  wrote:
>
> > Hello all,
> >
> > I was looking at this KIP again, and there is a decision I made that I
> > think is worth discussing.
> >
> > In the case where both the broker and producer's
> > 'auto.create.topics.enable' are set to true, we have to choose either the
> > broker configs or the producer configs for the replication
> > factor/partitions.
> >
> > Currently, the decision is to have the broker defaults take precedence.
> > (It is easier to do this in the implementation.) It also makes some sense
> > for this behavior to take precedence since this behavior already occurs as
> > the default.
> >
> > However, I was wondering if it would be odd for those who can only see the
> > producer side to set configs for replication factor/partitions and see
> > different results. Currently the documentation for the config states that
> > the config values are only used when the broker config is not enabled, but
> > this might not always be clear to the user.  Changing the code to have the
> > producer's configurations take precedence is possible, but I was wondering
> > what everyone thought.
> >
> > Thank you,
> > Justine
> >
> > On Fri, Jul 12, 2019 at 2:49 PM Justine Olshan 
> > wrote:
> >
> >> Just a quick update--
> >>
> >> It seems that enabling both the broker and producer configs works fine,
> >> except that the broker configurations for partitions, replication factor
> >> take precedence.
> >> I don't know if that is something we would want to change, but I'll be
> >> updating the KIP for now to reflect this. Perhaps we would want to add more
> >> to the documentation of the the producer configs to clarify.
> >>
> >> Thank you,
> >> Justine
> >>
> >> On Fri, Jul 12, 2019 at 9:28 AM Justine Olshan 
> >> wrote:
> >>
> >>> Hi Colin,
> >>>
> >>> Thanks for looking at the KIP. I can definitely add to the title to make
> >>> it more clear.
> >>>
> >>> It makes sense that both configurations could be turned on since there
> >>> are many cases where the user can not control the server-side
> >>> configurations. I was a little concerned about how both interacting would
> >>> work out -- if there would be too many requests for new topics, for 
> >>> example.
> >>> But since it does make sense to allow both configurations enabled, I
> >>> will test out some scenarios and I'll change the KIP to support this.
> >>>
> >>> I also agree with documentation about distinguishing the differences. I
> >>> was having some trouble with the wording but I like the phrases
> >>> "server-side" and "client-side." That's a good distinction I can use when
> >>> describing.
> >>>
> >>> I'll try to update the KIP soon keeping everyone's input in mind.
> >>>
> >>> Thanks,
> >>> Justine
> >>>
> >>> On Thu, Jul 11, 2019 at 5:39 PM Colin McCabe  wrote:
> >>>
>  Hi Justine,
> 
>  Thanks for the KIP.  This seems like a good step towards removing
>  server-side topic auto-creation.
> 
>  We should add included "client-side" to the title of the KIP somewhere,
>  to make it clear that we're talking about client-side auto creation.
> 
>  The KIP says:
>  > In order to automatically create topics with the producer, the
>  producer's
>  > auto.create.topics.enable config must be set to true and the broker
>  config should be set to false
> 
>  From a user's point of view, this seems counter-intuitive.  In order to
>  auto-create topics the broker's auto.create.topics.enable config should 
>  be
>  set to false?  It seems like the server-side auto-create is unrelated to
>  the client-side auto-create.  We could have both turned on (and I'm sure
>  that in the real world, people will try this configuration...)  There's 
>  no
>  reason not to support this, I th

Re: [DISCUSS] KIP-487: Automatic Topic Creation on Producer

2019-07-29 Thread Justine Olshan
As currently implemented, the broker auto-creation configuration
takes precedence, and the producer will not use the CreateTopics request.
(Technically it can--but the topic will already have been created through the
broker, so the producer never ends up creating it.)
It is possible to change this however, and I'd be happy to discuss the
benefits of this alternative.
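
To make the interaction concrete, a sketch of the proposed configuration (note 
that the producer-side auto.create.topics.enable key is the new setting this 
KIP proposes, not an existing config):

```
import java.util.Properties;

public class ProposedProducerConfig {
    public static void main(String[] args) {
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092");
        // Proposed client-side setting from KIP-487 (not yet a real config):
        producerProps.put("auto.create.topics.enable", "true");
        // As described above: if the broker's auto.create.topics.enable is
        // also true, broker-side auto-creation wins, its default partition
        // count and replication factor apply, and the producer never sends
        // a CreateTopics request.
    }
}
```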

Thank you,
Justine

On Mon, Jul 29, 2019 at 10:26 AM Mickael Maison 
wrote:

> Hi Justine,
>
> Thanks for the KIP!
>
> In case auto creation is enabled on both the client and server, will
> the producer still use the AdminClient (CreateTopics request) to
> create topics? and not rely on the broker auto create.
> I'm guessing the answer is yes but can you make it explicit.
>
> Thank you
>
> On Wed, Jul 24, 2019 at 6:23 PM Justine Olshan 
> wrote:
> >
> > Hi,
> > Just a friendly reminder to take a look at this KIP if you have the time.
> >
> > I was thinking about broker vs. client default precedence, and I think it
> > makes sense to keep the broker as the default used when both client-side
> > and broker-side defaults are configured. The idea is that there would be
> > pretty clear documentation, and that many systems with configurations
> that
> > the client could not change would likely have the auto-create default
> off.
> > (In cloud for example).
> >
> > It also seems like in most cases, the consumer config
> > 'allow.auto.create.topics' was created to actually prevent the creation
> of
> > topics, so the loss of creation functionality will not be a big problem.
> >
> >  I'm happy to discuss any other compatibility problems or components of
> > this KIP.
> >
> > Thank you,
> > Justine
> >
> > On Wed, Jul 17, 2019 at 9:11 AM Justine Olshan 
> wrote:
> >
> > > Hello all,
> > >
> > > I was looking at this KIP again, and there is a decision I made that I
> > > think is worth discussing.
> > >
> > > In the case where both the broker and producer's
> > > 'auto.create.topics.enable' are set to true, we have to choose either
> the
> > > broker configs or the producer configs for the replication
> > > factor/partitions.
> > >
> > > Currently, the decision is to have the broker defaults take precedence.
> > > (It is easier to do this in the implementation.) It also makes some
> sense
> > > for this behavior to take precedence since this behavior already
> occurs as
> > > the default.
> > >
> > > However, I was wondering if it would be odd for those who can only see
> the
> > > producer side to set configs for replication factor/partitions and see
> > > different results. Currently the documentation for the config states
> that
> > > the config values are only used when the broker config is not enabled,
> but
> > > this might not always be clear to the user.  Changing the code to have
> the
> > > producer's configurations take precedence is possible, but I was
> wondering
> > > what everyone thought.
> > >
> > > Thank you,
> > > Justine
> > >
> > > On Fri, Jul 12, 2019 at 2:49 PM Justine Olshan 
> > > wrote:
> > >
> > >> Just a quick update--
> > >>
> > >> It seems that enabling both the broker and producer configs works
> fine,
> > >> except that the broker configurations for partitions, replication
> factor
> > >> take precedence.
> > >> I don't know if that is something we would want to change, but I'll be
> > >> updating the KIP for now to reflect this. Perhaps we would want to
> add more
> > >> to the documentation of the the producer configs to clarify.
> > >>
> > >> Thank you,
> > >> Justine
> > >>
> > >> On Fri, Jul 12, 2019 at 9:28 AM Justine Olshan 
> > >> wrote:
> > >>
> > >>> Hi Colin,
> > >>>
> > >>> Thanks for looking at the KIP. I can definitely add to the title to
> make
> > >>> it more clear.
> > >>>
> > >>> It makes sense that both configurations could be turned on since
> there
> > >>> are many cases where the user can not control the server-side
> > >>> configurations. I was a little concerned about how both interacting
> would
> > >>> work out -- if there would be too many requests for new topics, for
> example.
> > >>> But since it does make sense to allow both configurations
> enabled, I
> > >>> will test out some scenarios and I'll change the KIP to support this.
> > >>>
> > >>> I also agree with documentation about distinguishing the
> differences. I
> > >>> was having some trouble with the wording but I like the phrases
> > >>> "server-side" and "client-side." That's a good distinction I can use
> when
> > >>> describing.
> > >>>
> > >>> I'll try to update the KIP soon keeping everyone's input in mind.
> > >>>
> > >>> Thanks,
> > >>> Justine
> > >>>
> > >>> On Thu, Jul 11, 2019 at 5:39 PM Colin McCabe 
> wrote:
> > >>>
> >  Hi Justine,
> > 
> >  Thanks for the KIP.  This seems like a good step towards removing
> >  server-side topic auto-creation.
> > 
> >  We should add included "client-side" to the title of the KIP
> somewhere,
> >  to make it clear that we're talking about client-side auto c

[jira] [Resolved] (KAFKA-8727) Control over standby tasks host assignment

2019-07-29 Thread Levani Kokhreidze (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8727?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Levani Kokhreidze resolved KAFKA-8727.
--
Resolution: Duplicate

 Duplicate of KAFKA-6718

> Control over standby tasks host assignment
> --
>
> Key: KAFKA-8727
> URL: https://issues.apache.org/jira/browse/KAFKA-8727
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Levani Kokhreidze
>Priority: Minor
>
> *Motivation*
> As of now, a Kafka Streams user has no control over which host the Kafka 
> Streams application will assign a standby task to. In production deployments 
> (especially in Kubernetes) it's quite common to have multiple instances of the 
> same Kafka Streams application deployed across more than one "cluster" in 
> order to have high availability of the system.
> For example, if we have 6 Kafka Streams instances deployed across two 
> clusters, we'll get 3 Kafka Streams instances per cluster. With the current 
> implementation, the Kafka Streams application may create a "standby task" in 
> the same cluster as the active task. This is not optimal, since, in case of a 
> cluster failure, recovery time will be much longer. This is especially 
> problematic for Kafka Streams applications that manage large state.
>  
> *Possible Solution*
> It would be great if in the Kafka Streams configuration we could have the 
> possibility to inject dynamic environment variables and use those environment 
> variables to control where standby tasks should be created.
> For example, suppose I have active task *1_1* with environment variable 
> *CLUSTER_ID: main01*; then the standby task for *1_1* should be created where 
> *CLUSTER_ID* *!=* *main01*.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


Re: [DISCUSS] KIP-497: Add inter-broker API to alter ISR

2019-07-29 Thread Jason Gustafson
@Colin

Yeah, that's a good question. If the version is higher, I think it's not
safe for the leader to use the new version until it has received the full
LeaderAndIsr state. For example, there may be a reassignment in progress
which could alter the leader's expected state change. At least knowing
about the higher version saves the leader unnecessary retries and will be
useful for debugging missed LeaderAndIsr updates. I considered letting the
AlterIsr response include the full leader and ISR state so that the leader
could continue without relying on the update. However, I think this would
just tend to mask bugs in the controller's propagation logic. It seems
simpler to have one mechanism for propagation of leader and ISR state
rather than two.

@Guozhang

1. Yes, it is the same version field used in LeaderAndIsr requests. I will
make this explicit.
2. I think both options can work. The main thing I'm trying to avoid is
having the leader blocking on an ISR update. At some point, if the leader
doesn't receive an expected LeaderAndIsr update, then it must retry the
AlterIsr request. I thought it would be simpler if retries always reflected
the latest expectation on the ISR state rather than letting the leader be
stuck retrying a state change which may no longer be relevant. This is the
approach that I modeled. That being said, if it's ok with you, I'd prefer
to leave this decision to the implementation. I think the main point is
that once the leader receives the latest update, it can discard any pending
state.


Thanks,
Jason



On Mon, Jul 29, 2019 at 9:22 AM Guozhang Wang  wrote:

> Hi Jason,
>
> Thanks for the KIP. It looks good to me overall.
>
> 1. Just clarifying the "CurrentVersion" field in the newly proposed
> protocol. Does it contain the same value as the zkVersion that the controller
> gets from ZK?
>
> 2. As for the comment in the KIP: "We can either await the update or we can
> send a new update with the current version. If we did the latter, then we
> would have multiple pending inflights using the same version." My
> understanding is that the controller acts as the source of truth
> for "currentVersion", in which case I think there's little latency benefit
> in sending multiple pending requests with the same version, since whichever
> arrives at the controller first would cause the zkVersion to be bumped and
> therefore the rest of the requests would be rejected with
> "INVALID_ISR_VERSION". So I'd favor just waiting for the response to the
> current inflight request before sending out the next one -- admittedly
> this requires a somewhat more complicated implementation on the brokers, but
> maybe we can generalize the controller's request queue module for this
> purpose?
>
>
> Guozhang
>
>
> On Sun, Jul 28, 2019 at 10:32 AM Colin McCabe  wrote:
>
> > Hi Jason,
> >
> > This looks good.
> >
> > If the AlterIsr request returns a higher ZK version than the one the
> > broker currently has, will the broker use that as its new ZK version?  I
> > suppose this could happen if some of the updates the controller pushed
> out
> > were not received or not received yet by the broker in question.
> >
> > best,
> > Colin
> >
> >
> > On Fri, Jul 26, 2019, at 09:43, Jason Gustafson wrote:
> > > Hi All,
> > >
> > > I have written a proposal to change the way leaders make ISR
> > > modifications:
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-497%3A+Add+inter-broker+API+to+alter+ISR
> > .
> > > Have a look and let me know what you think.
> > >
> > > Thanks,
> > > Jason
> > >
> >
>
>
> --
> -- Guozhang
>


Re: [VOTE] KIP-478 Strongly Typed Streams Processor API

2019-07-29 Thread Bill Bejeck
Thanks for the KIP.

+1 (binding)

-Bill

On Wed, Jul 24, 2019 at 12:12 PM Guozhang Wang  wrote:

> Yeah I think I agree with you.
>
> +1 (binding) from me.
>
>
> Guozhang
>
>
> On Wed, Jul 24, 2019 at 7:43 AM John Roesler  wrote:
>
> > Hi Guozhang,
> >
> > Thanks! I just replied in the discuss thread. I agree with what you're
> > proposing, but would like to consider it outside the scope of this KIP,
> if
> > that's ok with you.
> >
> > -John
> >
> > On Tue, Jul 23, 2019 at 5:38 PM Guozhang Wang 
> wrote:
> >
> > > Hi John,
> > >
> > > I left another question regarding Transformer in the DISCUSS thread.
> > Other
> > > than that I think this KIP is ready. Thanks!
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Tue, Jul 16, 2019 at 9:01 AM John Roesler 
> wrote:
> > >
> > > > Hi Dev,
> > > >
> > > > After a good discussion, I'd like to start the vote for KIP-478
> > > > (https://cwiki.apache.org/confluence/x/2SkLBw).
> > > >
> > > > The proposal is to deprecate the existing interface
> > > > org.apache.kafka.streams.processor.Processor in favor of a
> > > > new one, org.apache.kafka.streams.processor.api.Processor<KIn, VIn,
> > > > KOut, VOut> that parameterizes both the input and output types.
> > > >
> > > > This change enables both the Streams DSL internal code and external
> > > > Processor API code to improve their type safety and protect
> themselves
> > > > from type-level bugs.
> > > >
> > > > Thanks,
> > > > -John
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
> --
> -- Guozhang
>
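
For reference, a minimal sketch of a processor against the proposed typed 
interface (method shapes assumed from the KIP; the final API may differ). Input 
here is <String, String> and output <String, Integer>, so the compiler rejects 
forwarding any other type:

```
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

public class WordLengthProcessor implements Processor<String, String, String, Integer> {
    private ProcessorContext<String, Integer> context;

    @Override
    public void init(ProcessorContext<String, Integer> context) {
        this.context = context;
    }

    @Override
    public void process(Record<String, String> record) {
        // Forwarding Record<String, Integer> type-checks; anything else
        // is a compile-time error rather than a runtime surprise.
        context.forward(record.withValue(record.value().length()));
    }
}
```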


Kafka connect task assignment Improvement ( New Feature )

2019-07-29 Thread Manjeet Duhan
Hi,

This is Manjeet from Operative Media. I have been working with Confluent Kafka 
for almost 4 years and have made many customized changes to Kafka Connect sink 
and source connectors, as well as changes to the Kafka code base itself for our 
requirements.

There is one feature I added recently, after discussing it with our architect 
Praveen Manvi, which I wanted to discuss with you for larger community usage.

Background :- We are running more than 30 connectors at Operative, but each 
connector requires a different machine specification. E.g. Kafka Connect S3 
requires more memory, while some of the in-house connectors require more network 
bandwidth (IO) and processing power (CPU). We were getting out-of-memory errors 
in a worker due to one connector. This affected entire processes and we had to 
pause this connector.

Issue :- We wanted each connector to run on a specific type of machine (in this 
case, we want 3 types of machines: memory, cpu and IO).

Existing Solution :- We can start 3 clusters and have a specific type of machine 
in each cluster, but this is difficult to manage.
  Pain points :-

1.   We have to consistently keep track of the cluster while starting a machine, 
otherwise it can start in a different cluster.

2.   We have to change the offset storage topic, otherwise connectors will be 
visible across clusters.

Proposed Solution :- We specify the type of machine in the distributed properties 
of each worker, so that when we specify a target machine type on connector start, 
tasks are started on exactly that type of machine. In this case we don't have to 
deal with the above pain points, and different types of machines can be part of 
the same cluster. (A config sketch follows the example below.)

Example :- I have 4 workers, typed as memory (worker 1), cpu (worker 2) and 
IO (worker 3 and worker 4).


a)   We start connector 1 with 2 tasks and specify the target machine type 
as IO. It will distribute the tasks equally on worker 3 and worker 4.

b)  We start connector 2 with 2 tasks and target machine type as memory. 
It will start both tasks on worker 1.
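
A sketch of what this configuration could look like (the property names below
are invented for illustration; they are not part of any released Kafka Connect
version):

   # in each worker's distributed properties file:
   machine.type=IO                     # e.g. "memory", "cpu" or "IO"

   # in the connector config submitted on start:
   target.machine.type=IO              # tasks are assigned only to workers
                                       # whose machine.type matches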

I have made the changes for this feature; it is working fine and we are pushing 
it to our production cluster in a few days.

Please let me know if it could be helpful for the larger community.


Thanks,
Manjeet Duhan




[VOTE] KIP-492 Add java security providers in Kafka Security config

2019-07-29 Thread Sandeep Mopuri
Hi all, after some good discussion about the KIP, I'm starting the voting.

This KIP proposes adding new security configuration to accept custom
security providers that can provide algorithms for SSL or SASL.
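
As a sketch, a custom provider plugged in through the proposed config could look
roughly like this (the interface and config names follow the KIP draft and may
still change before release):

import java.security.Provider;
import java.util.Map;
import org.apache.kafka.common.security.auth.SecurityProviderCreator; // per the KIP draft

// Hypothetical provider registering a custom SSL-related algorithm.
class MyCustomProvider extends Provider {
    MyCustomProvider() {
        super("MyProvider", 1.0, "Example provider with a custom algorithm");
        // put("KeyManagerFactory.MyKMF", "com.example.MyKeyManagerFactorySpi");
    }
}

public class MyProviderCreator implements SecurityProviderCreator {
    private Provider provider;

    @Override
    public void configure(Map<String, ?> configs) {
        provider = new MyCustomProvider();
    }

    @Override
    public Provider getProvider() {
        return provider;
    }
}

// and in the client/broker config (sketch):
//   security.providers=com.example.MyProviderCreator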

-- 
Thanks,
M.Sai Sandeep


Re: [DISCUSS] KIP-497: Add inter-broker API to alter ISR

2019-07-29 Thread Guozhang Wang
2. Sounds good. Thanks!

Guozhang

On Mon, Jul 29, 2019 at 11:43 AM Jason Gustafson  wrote:

> @Colin
>
> Yeah, that's a good question. If the version is higher, I think it's not
> safe for the leader to use the new version until it has received the full
> LeaderAndIsr state. For example, there may be a reassignment in progress
> which could alter the leader's expected state change. At least knowing
> about the higher version saves the leader unnecessary retries and will be
> useful for debugging missed LeaderAndIsr updates. I considered letting the
> AlterIsr response include the full leader and ISR state so that the leader
> could continue without relying on the update. However, I think this would
> just tend to mask bugs in the controller's propagation logic. It seems
> simpler to have one mechanism for propagation of leader and ISR state
> rather than two.
>
> @Guozhang
>
> 1. Yes, it is the same version field used in LeaderAndIsr requests. I will
> make this explicit.
> 2. I think both options can work. The main thing I'm trying to avoid is
> having the leader blocking on an ISR update. At some point, if the leader
> doesn't receive an expected LeaderAndIsr update, then it must retry the
> AlterIsr request. I thought it would be simpler if retries always reflected
> the latest expectation on the ISR state rather than letting the leader be
> stuck retrying a state change which may no longer be relevant. This is the
> approach that I modeled. That being said, if it's ok with you, I'd prefer
> to leave this decision to the implementation. I think the main point is
> that once the leader receives the latest update, it can discard any pending
> state.
>
>
> Thanks,
> Jason
>
>
>
> On Mon, Jul 29, 2019 at 9:22 AM Guozhang Wang  wrote:
>
> > Hi Jason,
> >
> > Thanks for the KIP. It looks good to me overall.
> >
> > 1. Just clarifying the "CurrentVersion" field in the newly proposed
> > protocol. Does it contain the same value as zkVersion that the controller
> gets
> > from ZK?
> >
> > 2. As for the comment in the KIP: "We can either await the update or we
> can
> > send a new update with the current version. If we did the latter, then we
> > would have multiple pending inflights using the same version." My
> > understanding is that it is the controller who acts as the source of
> truth
> > for "currentVersion", in which case I think there's little latency
> benefits
> > to send multiple pending requests with the same version, since which-ever
> > arrives controller first would cause the zkVersion to be bumped and
> > therefore the rest of the requests would be rejected with
> > "INVALID_ISR_VERSION". So I'd favor just wait the update response from
> the
> > current inflight request before sending out the next request --
> admittedly
> > this requires a bit more complicated implementation on the brokers, but
> > maybe we can generalize the request queue module on controller for this
> > purpose?
> >
> >
> > Guozhang
> >
> >
> > On Sun, Jul 28, 2019 at 10:32 AM Colin McCabe 
> wrote:
> >
> > > Hi Jason,
> > >
> > > This looks good.
> > >
> > > If the AlterIsr request returns a higher ZK version than the one the
> > > broker currently has, will the broker use that as its new ZK version?
> I
> > > suppose this could happen if some of the updates the controller pushed
> > out
> > > were not received or not received yet by the broker in question.
> > >
> > > best,
> > > Colin
> > >
> > >
> > > On Fri, Jul 26, 2019, at 09:43, Jason Gustafson wrote:
> > > > Hi All,
> > > >
> > > > I have written a proposal to change the way leaders make ISR
> > > > modifications:
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-497%3A+Add+inter-broker+API+to+alter+ISR
> > > .
> > > > Have a look and let me know what you think.
> > > >
> > > > Thanks,
> > > > Jason
> > > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


-- 
-- Guozhang


Re: [DISCUSS] KIP-496: Administrative API to delete consumer offsets

2019-07-29 Thread Jason Gustafson
Hi Guozhang,

I have added metrics to the KIP. Please take a look. This gave me an excuse
to also add a metric for the group rebalance rate, which probably would
have made detecting KAFKA-8653 easier.

Since this is a relatively straightforward KIP, I will go ahead and start a
vote later this week if there are no further comments.
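
For reference, a minimal sketch of how the proposed deletion API might be used
from the admin client (the method name follows the KIP; the final signature
could still differ):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.TopicPartition;

public class DeleteOffsetsExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Delete the committed offset of one partition for "my-group".
            // Per the KIP, this fails if the group still actively
            // subscribes to the topic.
            admin.deleteConsumerGroupOffsets("my-group",
                    Collections.singleton(new TopicPartition("my-topic", 0)))
                 .all()
                 .get();
        }
    }
}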

Thanks,
Jason

On Mon, Jul 29, 2019 at 9:10 AM Guozhang Wang  wrote:

> Thanks for the replies Jason!
>
> 2. No I do not see any problems, just trying to understand how strictly we
> are applying this rule :) Piggy-backing on the existing background thread
> and check interval mechanism means we are not "eagerly" expiring either,
> but I think this is fine.
>
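
For context, the existing broker configs behind that background mechanism, with
their defaults as of Kafka 2.3:

   offsets.retention.minutes=10080              # 7 days since Kafka 2.0 (1 day before that)
   offsets.retention.check.interval.ms=600000   # how often the expiry task runs
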
>
> Guozhang
>
>
> On Thu, Jul 25, 2019 at 3:16 PM Jason Gustafson 
> wrote:
>
> > 1. Fixed, thanks!
> >
> > 2. Yes, that is what I was thinking. Do you see any problems?
> >
> > 3. Good point. Do you think a meter for expired and deleted offsets would
> > be sufficient?
> >
> > 4. I considered it. I thought that might be a little dangerous for
> dynamic
> > groups which have subscriptions changing. If the first member to
> discover a
> > subscription change falls out, then offsets would be lost. Also, it
> seemed
> > a little more consistent with empty group expiration. From an offset
> > expiration perspective, an empty group is just treated as a case where
> the
> > subscription is empty, which makes all offsets subject to expiration.
> >
> >
> > Thanks,
> > Jason
> >
> > On Thu, Jul 25, 2019 at 1:41 PM Guozhang Wang 
> wrote:
> >
> > > Hi Jason,
> > >
> > > Thanks for the KIP! I've made a pass on it and here are few comments:
> > >
> > > 1. " before the clients which ." --> incomplete sentence?
> > >
> > > 2. " Any committed offset for a partition which is not currently
> > subscribed
> > > to is subject to expiration." --> this may be an implementation detail,
> > but
> > > are we going to piggy-back on the same offsetsRetentionCheckIntervalMs
> to
> > > check for expirable offsets as well?
> > >
> > > Some meta comment:
> > >
> > > 3. Looking into the current broker-side metrics, we do not have a good
> > user
> > > visibility yet for offset removal either due to expiration or deletion,
> > > maybe we should consider adding one?
> > >
> > > 4. Playing the devil's advocate here: for cases where deleting
> expirable
> > > offsets is needed (like you mentioned lag monitoring), should we just
> > > by-pass the offset retention ms (by default it's one day) and remove
> > > immediately? What scenarios would require those non-subscribed
> partition
> > > offsets to be retained longer?
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Tue, Jul 23, 2019 at 10:11 AM Jason Gustafson 
> > > wrote:
> > >
> > > > Hi All,
> > > >
> > > > I have a short KIP to add an api for consumer offset deletion:
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-496%3A+Administrative+API+to+delete+consumer+offsets
> > > > .
> > > > Please take a look and let me know what you think.
> > > >
> > > > Thanks,
> > > > Jason
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
> --
> -- Guozhang
>


Re: [DISCUSS] KIP-478 Strongly Typed Processor API

2019-07-29 Thread Matthias J. Sax
Thanks for the details!

Also talked to Guozhang about a potential upgrade path. This KIP seems
not to put us into a bad position to provide a clean upgrade path if we
change the `ProcessorContext` in the future.

Thus, I think we can move forward.


-Matthias

On 7/24/19 3:32 PM, John Roesler wrote:
> Hey again Matthias,
> 
> I think it might help to frame the evaluation of the Context question if we
> have a "spitball" proposal for what change we might want to make to the
> context.
> 
> Currently, the ProcessorContext is referenced in the following public
> interfaces:
> 
> org.apache.kafka.streams.errors.DeserializationExceptionHandler#handle
> org.apache.kafka.streams.kstream.Transformer#init
> org.apache.kafka.streams.kstream.ValueTransformer#init
> org.apache.kafka.streams.kstream.ValueTransformerWithKey#init
> org.apache.kafka.streams.processor.Processor#init
> org.apache.kafka.streams.processor.StateStore#init
> 
> We can sub-divide the ProcessorContext into broad categories:
> General Information:
> * a handle on the config
> * information about the execution context (what is the task id, the
> application id, etc)
> Record Information:
> * extra information about the current record
> Store Support:
> * the ability to register state restore callbacks
> Processor Support:
> * the ability to schedule punctuations
> * the ability to get registered state stores
> * the ability to forward records
> * the ability to request commits
> 
> We could imagine slicing the Processor Context into four new component
> interfaces, and making ProcessorContext just implement them. Then, we could
> mix-and-match those new component interfaces for use elsewhere (a rough
> sketch follows the list below).
> 
> E.g.,:
> org.apache.kafka.streams.errors.DeserializationExceptionHandler#handle
> * only gets the informational context
> 
> org.apache.kafka.streams.kstream.Transformer#init
> org.apache.kafka.streams.kstream.ValueTransformer#init
> org.apache.kafka.streams.kstream.ValueTransformerWithKey#init
> * information context
> * the ability to get registered state stores
> Also
> * the ability to schedule punctuations
> * restricted ability to forward (only obeying the rules of the particular
> interface, for example)
> Or maybe just:
> * no ability to forward
> * the ability to schedule special punctuators that can return one or more
> keys or values when fired
> 
> org.apache.kafka.streams.processor.Processor#init
> * all the contexts, except the ability to register state restore callbacks
> 
> org.apache.kafka.streams.processor.StateStore#init
> * information contexts
> * the ability to register state restore callbacks
> * maybe punctuations and forwards, could be discussed further
> 
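
A rough sketch of that slicing, as referenced above (every interface name here
is invented for illustration; the KIP itself does not define them):

import java.time.Duration;
import java.util.Map;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;

interface InfoContext {                        // general information
    String applicationId();
    TaskId taskId();
    Map<String, Object> appConfigs();
}

interface RecordInfoContext {                  // current-record information
    String topic();
    int partition();
    long offset();
    Headers headers();
}

interface StoreRegistrationContext {           // store support
    void register(StateStore store, StateRestoreCallback callback);
}

interface ProcessingContext<KForward, VForward> {  // processor support
    StateStore getStateStore(String name);
    Cancellable schedule(Duration interval, PunctuationType type, Punctuator callback);
    void forward(KForward key, VForward value);
    void commit();
}

// The existing ProcessorContext could then simply extend all four.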
> 
> The operative question for us right now is whether there is a clean path to
> something like this from the current KIP, or whether we'd be forced to
> deprecate an interface we are only just now adding. Note that the only
> interfaces we're adding right now are :
> * org.apache.kafka.streams.processor.api.Processor
> * org.apache.kafka.streams.processor.api.ProcessorSupplier
> And the only thing we need to make the above spitball proposal compatible
> with these proposed interfaces is to deprecate the ability to register
> state restore callbacks from the ProcessorContext.
> 
> Otherwise, we would at that time be able to propose new Transformer
> interfaces that take (e.g.) TransformerContexts, likewise with
> DeserializationExceptionHandler and StateStore.
> 
> In other words, I _think_ that we have a clean migration path to address
> the Context problem in follow-on work. But clearly my motivation may be
> corrupt. What do you think?
> 
> Thanks,
> -John
> 
> 
> On Wed, Jul 24, 2019 at 5:06 PM John Roesler  wrote:
> 
>> Hey Matthias,
>>
>> I agree, it's worth double-checking to make sure that the upgrade path
>> would be smooth. There's no point in putting ourselves in an awkward jam.
>> I'll look into it and report back.
>>
>> Regarding the global store logic, I confirmed that the "state update
>> processor" shouldn't be forwarding anything, so we can safely bound its
>> output type to `Void`. I've updated the KIP.
>>
>> Thanks,
>> -John
>>
>> On Wed, Jul 24, 2019 at 3:08 PM Matthias J. Sax 
>> wrote:
>>
>>> If we don't fix the `ProcessorContext` now, what would an upgrade path
>>> look like?
>>>
>>> We woudl deprecate existing `init()` and add a new `init()`, and during
>>> runtime need to call both? This sound rather error prone to me and might
>>> be confusing to users? Hence, it might be beneficial to fix it right now.
>>>
>>> If my concerns are not valid, and we think that the upgrade path will be
>>> smooth, we can of course do a follow-up KIP. Another possibility would
>>> be, to still do an extra KIP but ensure that both KIPs are contained in
>>> the same release.
>>>
>>> WDYT?
>>>
>>>
>>> -Matthias
>>>
>>> On 7/24/19 11:55 AM, John Roesler wrote:
 Hey Matthias,

 Thanks for the review!

 I agree about P

Re: [VOTE] KIP-478 Strongly Typed Streams Processor API

2019-07-29 Thread Matthias J. Sax
+1 (binding)

On 7/29/19 11:59 AM, Bill Bejeck wrote:
> Thanks for the KIP.
> 
> +1 (binding)
> 
> -Bill
> 
> On Wed, Jul 24, 2019 at 12:12 PM Guozhang Wang  wrote:
> 
>> Yeah I think I agree with you.
>>
>> +1 (binding) from me.
>>
>>
>> Guozhang
>>
>>
>> On Wed, Jul 24, 2019 at 7:43 AM John Roesler  wrote:
>>
>>> Hi Guozhang,
>>>
>>> Thanks! I just replied in the discuss thread. I agree with what you're
>>> proposing, but would like to consider it outside the scope of this KIP,
>> if
>>> that's ok with you.
>>>
>>> -John
>>>
>>> On Tue, Jul 23, 2019 at 5:38 PM Guozhang Wang 
>> wrote:
>>>
 Hi John,

 I left another question regarding Transformer in the DISCUSS thread.
>>> Other
 than that I think this KIP is ready. Thanks!


 Guozhang


 On Tue, Jul 16, 2019 at 9:01 AM John Roesler 
>> wrote:

> Hi Dev,
>
> After a good discussion, I'd like to start the vote for KIP-478
> (https://cwiki.apache.org/confluence/x/2SkLBw).
>
> The proposal is to deprecate the existing interface
> org.apache.kafka.streams.processor.Processor in favor of a
> new one, org.apache.kafka.streams.processor.api.Processor<KIn, VIn, KOut, VOut> that parameterizes both the input and output types.
>
> This change enables both the Streams DSL internal code and external
> Processor API code to improve their type safety and protect
>> themselves
> from type-level bugs.
>
> Thanks,
> -John
>


 --
 -- Guozhang

>>>
>>
>>
>> --
>> -- Guozhang
>>
> 





Re: [DISCUSS]KIP-216: IQ should throw different exceptions for different errors

2019-07-29 Thread Matthias J. Sax
Any update on this KIP Vito?


On 7/11/19 4:26 PM, Matthias J. Sax wrote:
> Thanks Vito! I think the KIP shapes out nicely!
> 
> 
> To answer the open question you raised (I also adjust my answers based
> on the latest KIP update)
> 
> 
> 
> About `StreamThreadNotStartedException`: I understand what you pointed
> out. However, I think we can consider the following: If a thread is not
> started yet, and `KafkaStreams#store()` throws this exception, we would
> not return a `CompositeReadOnlyXxxStore` to the user. Hence, `get()`
> cannot be called. And if we return `CompositeReadOnlyXxxStore` the
> thread was started and `get()` would never hit the condition to throw
> the exception? Or do I miss something (this part of the logic is a
> little tricky...)
> 
> However, thinking about it, what could happen IMHO is, that the
> corresponding thread crashes after we handed out the store handle. For
> this case, it would make sense to throw an exception from `get()` but it
> would be a different one IMHO. Maybe we need a new type
> (`StreamThreadDeadException` or similar?) or we should reuse
> `StoreMigratedException` because if a thread dies we would migrate the
> store to another thread. (The tricky part might be, to detect this
> condition correctly -- not 100% sure atm how we could do this.)
> 
> What do you think about this?
> 
> 
> 
> About `KafkaStreamsNotRunningException` vs
> `StreamThreadNotRunningException` -- I see your point. Atm, I think we
> don't allow querying at all if KafkaStreams is not in state RUNNING
> (correct me if I am wrong). Hence, if there is an instance with 2
> thread, and 1 thread is actually up and ready, but the other thread is
> not, you cannot query anything. Only if both threads are in state
> RUNNING we allow to query. It might be possible to change the code to
> allow querying if a thread is ready independent from the other threads.
> For this case, the name you suggest would make more sense. But I
> _think_, that the current behavior is different and thus,
> `KafkaStreamsNotRunningException` seems to reflect the current behavior
> better? -- I also want to add that we are talking about a fatal
> exception -- if a thread crashes, we would migrate the store to another
> thread and it would not be fatal, but the store can be re-discovered.
> Only if all threads would die, it would be fatal -- however, for this
> case KafkaStreams would transition to DEAD anyway.
> 
> 
> 
>> When the user passes a store name to `KafkaStreams#store()`, is there
>> a way to distinguish whether the store name is "a wrong name" or "migrated"
>> during `QueryableStoreProvider#getStore()`?
>> From my current understanding, I cannot distinguish these two.
> 
> This should be possible. In the private KafkaStreams constructor, we
> have access to `InternalTopologyBuilder` that can give us all known
> store names. Hence, we can get a set of all known store names, keep them
> as a member variable and use in `KafkaStreams#store()` in an initial
> check if the store name is valid or not.
> 
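
A minimal sketch of that initial check (fragment only; `StateStoreMigratedException`
is a name from this discussion, while `UnknownStateStoreException` and the
surrounding fields are invented, not actual KafkaStreams internals):

public <T> T store(final String storeName, final QueryableStoreType<T> type) {
    if (!knownStoreNames.contains(storeName)) {
        // "a wrong name": fatal, this topology has no such store
        throw new UnknownStateStoreException("no store named " + storeName);
    }
    final T store = queryableStoreProvider.getStore(storeName, type);
    if (store == null) {
        // known name but not locally available: likely migrated, retriable
        throw new StateStoreMigratedException("store " + storeName + " migrated");
    }
    return store;
}
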
> 
> 
>> Should we remove `StreamThreadNotRunningException` and throw
>> `FatalStateStoreException` directly?
> 
> I would keep both, because `FatalStateStoreException` is not very
> descriptive. Also, we should still have fatal exception
> `StateStoreNotAvailableException`? Not sure why you removed it?
> 
> 
> 
> Glad you found a way to avoid
> `QueryableStoreType#setStreams(KafkaStreams streams)`.
> 
> 
> 
> -Matthias
> 
> 
> On 7/5/19 8:03 AM, Vito Jeng wrote:
>> Hi, Matthias,
>>
>> Just completed the modification of KIP, please take a look when you are
>> available.
>>
>> ---
>> Vito
>>
>>
>> On Wed, Jul 3, 2019 at 9:07 PM Vito Jeng  wrote:
>>
>>> Hi, Matthias,
>>>
>>> This is second part.
>>>
 For the internal exceptions:

 `StateStoreClosedException` -- why can it be wrapped as
 `StreamThreadNotStartedException`? It seems that the latter would only
 be thrown by `KafkaStreams#store()` and thus would be thrown directly.
>>>
>>> Both `StateStoreClosedException` and `EmptyStateStoreException` can not be
>>> wrapped as `StreamThreadNotStartedException`.
>>> This is a mistake in the previous KIP. Thank you for pointing this out.
>>>
 A closed-exception should only happen after a store was successfully
 retrieved but cannot be queried any longer? Hence, converting/wrapping
 it into a `StateStoreMigratedException` make sense. I am also not sure,
 when a closed-exception would be wrapped by a
 `StateStoreNotAvailableException` (implying my understanding as describe
 above)?

 Same questions about `EmptyStateStoreException`.

 Thinking about both internal exceptions twice, I am wondering if it
 makes sense to have both internal exceptions at all? I have the
 impression that it only makes sense to wrap them with a
 `StateStoreMigratedException`, but if they are wrapped into the same
 exception all the time, we can just remove both and throw
 `StateStoreMigr

Re: [VOTE] KIP-479 Add Materialized to Join

2019-07-29 Thread Guozhang Wang
+1 (binding)

On Thu, Jul 25, 2019 at 7:39 PM Matthias J. Sax 
wrote:

> +1 (binding)
>
> On 7/25/19 1:05 PM, Bill Bejeck wrote:
> > All,
> >
> > After a great discussion on KIP-479 (
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-479%3A+Add+Materialized+to+Join
> )
> > I'd
> > like to start a vote.
> >
> > Thanks,
> > Bill
> >
>
>

-- 
-- Guozhang


Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-07-29 Thread Matthias J. Sax
What is the status of this KIP?

-Matthias

On 7/11/19 4:00 PM, Matthias J. Sax wrote:
> Ivan,
> 
> did you see my last reply? What do you think about my proposal to mix
> both approaches and try to get best-of-both worlds?
> 
> 
> -Matthias
> 
> On 6/11/19 3:56 PM, Matthias J. Sax wrote:
>> Thanks for the input John!
>>
>>> under your suggestion, it seems that the name is required
>>
>> If you want to get the `KStream` as part of the `Map` back using a
>> `Function`, yes. If you follow the "embedded chaining" pattern using a
>> `Consumer`, no.
>>
>> Allowing for a default name via `split()` can of course be done.
>> Similarly, using `Named` instead of `String` is possible.
>>
>> I wanted to sketch out a high level proposal to merge both patterns
>> only. Your suggestions to align the new API with the existing API make
>> totally sense.
>>
>>
>>
>> One follow up question: Would `Named` be optional or required in
>> `split()` and `branch()`? It's unclear from your example.
>>
>> If both are mandatory, what do we gain by it? The returned `Map` only
>> contains the corresponding branches, so why should we prefix all of
>> them? If only `Named` is mandatory in `branch()`, but optional in
>> `split()`, the same question arises?
>>
>> Requiring `Named` in `split()` seems only to make sense, if `Named` is
>> optional in `branch()` and we generate `-X` suffix using a counter for
>> different branch name. However, this might lead to the problem of
>> changing names if branches are added/removed. Also, how would the names
>> be generated if `Consumer` is mixed in (i.e., not all branches are
>> returned in the `Map`).
>>
>> If `Named` is optional for both, it could happen that a user fails to
>> specify a name for a branch, which would lead to runtime issues.
>>
>>
>> Hence, I am actually in favor to not allow a default name but keep
>> `split()` without parameter and make `Named` in `branch()` required if a
>> `Function` is used. This makes it explicit to the user that specifying a
>> name is required if a `Function` is used.
>>
>>
>>
>> About
>>
>>> KBranchedStream#branch(BranchConfig)
>>
>> I don't think that the branching predicate is a configuration and hence
>> would not include it in a configuration object.
>>
>>> withChain(...);
>>
>> Similarly, `withChain()` (which would only take a `Consumer`?) does not
>> seem to be a configuration. We can also not prevent a user to call
>> `withName()` in combination of `withChain()` what does not make sense
>> IMHO. We could of course throw an RTE but not have a compile time check
>> seems less appealing. Also, it could happen that neither `withChain()`
>> not `withName()` is called and the branch is missing in the returned
>> `Map` what lead to runtime issues, too.
>>
>> Hence, I don't think that we should add `BranchConfig`. A config object
>> is helpful if each configuration can be set independently of all others,
>> but this seems not to be the case here. If we add new configuration
>> later, we can also just move forward by deprecating the methods that
>> accept `Named` and add new methods that accepted `BranchConfig` (that
>> would of course implement `Named`).
>>
>>
>> Thoughts?
>>
>>
>> @Ivan, what do you think about the general idea to blend the two main
>> approaches of returning a `Map` plus an "embedded chaining"?
>>
>>
>>
>> -Matthias
>>
>>
>>
>> On 6/4/19 10:33 AM, John Roesler wrote:
>>> Thanks for the idea, Matthias, it does seem like this would satisfy
>>> everyone. Returning the map from the terminal operations also solves
>>> the problem of merging/joining the branched streams, if we want to add
>>> support for the compliment later on.
>>>
>>> Under your suggestion, it seems that the name is required. Otherwise,
>>> we wouldn't have keys for the map to return. I think this is actually
>>> not too bad, since experience has taught us that, although names for
>>> operations are not required to define stream processing logic, they do
>>> significantly improve the operational experience when you can map the
>>> topology, logs, metrics, etc. back to the source code. Since you
>>> wouldn't (have to) reference the name to chain extra processing onto
>>> the branch (thanks to the second argument), you can avoid the
>>> "unchecked name" problem that Ivan pointed out.
>>>
>>> In the current implementation of Branch, you can name the branch
>>> operator itself, and then all the branches get index-suffixed names
>>> built from the branch operator name. I guess under this proposal, we
>>> could naturally append the branch name to the branching operator name,
>>> like this:
>>>
>>>stream.split(Named.withName("mysplit")) //creates node "mysplit"
>>>   .branch(..., ..., "abranch") // creates node "mysplit-abranch"
>>>   .defaultBranch(...) // creates node "mysplit-default"
>>>
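
For illustration, the call site of such a blended API could read like this
(`Order`, `isUrgent` and `isNormal` are placeholders, and none of the method
shapes is settled):

   Map<String, KStream<String, Order>> branches =
       stream.split(Named.withName("mysplit"))
             // Consumer form: chained in place, not part of the returned Map
             .branch(isUrgent, branch -> branch.to("urgent-orders"))
             // Named form: the branch is returned in the Map as "mysplit-normal"
             .branch(isNormal, Named.withName("normal"))
             .defaultBranch();
   branches.get("mysplit-normal").to("normal-orders");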
>>> It does make me wonder about the DSL syntax itself, though.
>>>
>>> We don't have a defined grammar, so there's plenty of room to debate
>>> the "best" syntax in the 

Re: [DISCUSS] KIP-496: Administrative API to delete consumer offsets

2019-07-29 Thread Guozhang Wang
Sounds good.

Just pointing out that the metrics scope likely overlap with
https://issues.apache.org/jira/browse/KAFKA-3556, so may better document
that in the wiki page.


Guozhang

On Mon, Jul 29, 2019 at 2:48 PM Jason Gustafson  wrote:

> Hi Guozhang,
>
> I have added metrics to the KIP. Please take a look. This gave me an excuse
> to also add a metric for the group rebalance rate, which probably would
> have made detecting KAFKA-8653 easier.
>
> Since this is a relatively straightforward KIP, I will go ahead and start a
> vote later this week if there are no further comments.
>
> Thanks,
> Jason
>
> On Mon, Jul 29, 2019 at 9:10 AM Guozhang Wang  wrote:
>
> > Thanks for the replies Jason!
> >
> > 2. No I do not see any problems, just trying to understand how strictly
> we
> > are applying this rule :) Piggy-backing on the existing background thread
> > and check interval mechanism means we are not "eagerly" expiring either,
> > but I think this is fine.
> >
> >
> > Guozhang
> >
> >
> > On Thu, Jul 25, 2019 at 3:16 PM Jason Gustafson 
> > wrote:
> >
> > > 1. Fixed, thanks!
> > >
> > > 2. Yes, that is what I was thinking. Do you see any problems?
> > >
> > > 3. Good point. Do you think a meter for expired and deleted offsets
> would
> > > be sufficient?
> > >
> > > 4. I considered it. I thought that might be a little dangerous for
> > dynamic
> > > groups which have subscriptions changing. If the first member to
> > discover a
> > > subscription change falls out, then offsets would be lost. Also, it
> > seemed
> > > a little more consistent with empty group expiration. From an offset
> > > expiration perspective, an empty group is just treated as a case where
> > the
> > > subscription is empty, which makes all offsets subject to expiration.
> > >
> > >
> > > Thanks,
> > > Jason
> > >
> > > On Thu, Jul 25, 2019 at 1:41 PM Guozhang Wang 
> > wrote:
> > >
> > > > Hi Jason,
> > > >
> > > > Thanks for the KIP! I've made a pass on it and here are few comments:
> > > >
> > > > 1. " before the clients which ." --> incomplete sentence?
> > > >
> > > > 2. " Any committed offset for a partition which is not currently
> > > subscribed
> > > > to is subject to expiration." --> this may be an implementation
> detail,
> > > but
> > > > are we going to piggy-back on the same
> offsetsRetentionCheckIntervalMs
> > to
> > > > check for expirable offsets as well?
> > > >
> > > > Some meta comment:
> > > >
> > > > 3. Looking into the current broker-side metrics, we do not have a
> good
> > > user
> > > > visibility yet for offset removal either due to expiration or
> deletion,
> > > > maybe we should consider adding one?
> > > >
> > > > 4. Playing the devil's advocate here: for cases where deleting
> > expirable
> > > > offsets is needed (like you mentioned lag monitoring), should we just
> > > > by-pass the offset retention ms (by default it's one day) and remove
> > > > immediately? What scenarios would require those non-subscribed
> > partition
> > > > offsets to be retained longer?
> > > >
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Tue, Jul 23, 2019 at 10:11 AM Jason Gustafson  >
> > > > wrote:
> > > >
> > > > > Hi All,
> > > > >
> > > > > I have a short KIP to add an api for consumer offset deletion:
> > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-496%3A+Administrative+API+to+delete+consumer+offsets
> > > > > .
> > > > > Please take a look and let me know what you think.
> > > > >
> > > > > Thanks,
> > > > > Jason
> > > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


-- 
-- Guozhang


Re: [DISCUSS] KIP-497: Add inter-broker API to alter ISR

2019-07-29 Thread Jun Rao
Hi, Jason,

Thanks for the KIP. Looks good overall. A couple of comments below.

10. It's not very clear to me when the partition state (i.e., leader, isr,
leaderEpoch and zkVersion) is updated on the leader. My initial
understanding is that the leader updates the partition state on receiving a
> successful AlterIsr response (which will contain a higher zkVersion).
However, your response to Colin seems to indicate that the leader only
updates the partition state on LeaderAndIsr request. Both approaches are
possible. If we do the former, the partition state can be updated through
both AlterIsr response and LeaderAndIsr request. So, we need a way to
serialize the ordering. We can potentially use zkVersion to do that. If we
do the latter, we need to have the controller additionally send a
LeaderAndIsr request on completing the AlterIsrRequest. We also need some
way to make those temporary ISR changes (due to AlterIsr) permanent based
on the LeaderAndIsr request.

11. A question on an implementation detail. I assume that if a follower
fetchRequest causes an ISR expansion, the completion of the fetchRequest
doesn't block on the AlterIsr request? If so, are we going to introduce a
separate thread to handle the sending/receiving of AlterIsr request?
> Related to this, do we guarantee that all the AlterIsr requests from a
broker will be sent to the controller in order? Do we allow more than one
pending AlterIsr request or not?

Jun



On Mon, Jul 29, 2019 at 12:59 PM Guozhang Wang  wrote:

> 2. Sounds good. Thanks!
>
> Guozhang
>
> On Mon, Jul 29, 2019 at 11:43 AM Jason Gustafson 
> wrote:
>
> > @Colin
> >
> > Yeah, that's a good question. If the version is higher, I think it's not
> > safe for the leader to use the new version until it has received the full
> > LeaderAndIsr state. For example, there may be a reassignment in progress
> > which could alter the leader's expected state change. At least knowing
> > about the higher version saves the leader unnecessary retries and will be
> > useful for debugging missed LeaderAndIsr updates. I considered letting
> the
> > AlterIsr response include the full leader and ISR state so that the
> leader
> > could continue without relying on the update. However, I think this would
> > just tend to mask bugs in the controller's propagation logic. It seems
> > simpler to have one mechanism for propagation of leader and ISR state
> > rather than two.
> >
> > @Guozhang
> >
> > 1. Yes, it is the same version field used in LeaderAndIsr requests. I
> will
> > make this explicit.
> > 2. I think both options can work. The main thing I'm trying to avoid is
> > having the leader blocking on an ISR update. At some point, if the leader
> > doesn't receive an expected LeaderAndIsr update, then it must retry the
> > AlterIsr request. I thought it would be simpler if retries always
> reflected
> > the latest expectation on the ISR state rather than letting the leader be
> > stuck retrying a state change which may no longer be relevant. This is
> the
> > approach that I modeled. That being said, if it's ok with you, I'd prefer
> > to leave this decision to the implementation. I think the main point is
> > that once the leader receives the latest update, it can discard any
> pending
> > state.
> >
> >
> > Thanks,
> > Jason
> >
> >
> >
> > On Mon, Jul 29, 2019 at 9:22 AM Guozhang Wang 
> wrote:
> >
> > > Hi Jason,
> > >
> > > Thanks for the KIP. It looks good to me overall.
> > >
> > > 1. Just clarifying the "CurrentVersion" field in the newly proposed
> > > protocol. Does it contain the same value as zkVersion that the controller
> > gets
> > > from ZK?
> > >
> > > 2. As for the comment in the KIP: "We can either await the update or we
> > can
> > > send a new update with the current version. If we did the latter, then
> we
> > > would have multiple pending inflights using the same version." My
> > > understanding is that it is the controller who acts as the source of
> > truth
> > > for "currentVersion", in which case I think there's little latency
> > benefits
> > > to send multiple pending requests with the same version, since
> which-ever
> > > arrives controller first would cause the zkVersion to be bumped and
> > > therefore the rest of the requests would be rejected with
> > > "INVALID_ISR_VERSION". So I'd favor just wait the update response from
> > the
> > > current inflight request before sending out the next request --
> > admittedly
> > > this requires a bit more complicated implementation on the brokers, but
> > > maybe we can generalize the request queue module on controller for this
> > > purpose?
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Sun, Jul 28, 2019 at 10:32 AM Colin McCabe 
> > wrote:
> > >
> > > > Hi Jason,
> > > >
> > > > This looks good.
> > > >
> > > > If the AlterIsr request returns a higher ZK version than the one the
> > > > broker currently has, will the broker use that as its new ZK version?
> > I
> > > > suppose this could happen if some of the updates the c

Re: [VOTE] KIP-492 Add java security providers in Kafka Security config

2019-07-29 Thread Harsha Chintalapani
Thanks for the KIP Sandeep.

+1 (binding)

Thanks,
Harsha
On Jul 29, 2019, 12:22 PM -0700, Sandeep Mopuri , wrote:
> Hi all, after some good discussion about the KIP, I'm starting the voting.
>
> This KIP proposes adding new security configuration to accept custom
> security providers that can provide algorithms for SSL or SASL.
>
> --
> Thanks,
> M.Sai Sandeep


Re: [DISCUSS] KIP-497: Add inter-broker API to alter ISR

2019-07-29 Thread Jason Gustafson
Hi Jun,

Thanks for the comments.

10. My intent was to rely only on the LeaderAndIsr request. It seemed
simpler if there is just one way that ISR state is updated on leaders (e.g.
the request log is all you need to see the changes), but I'd be interested
if you can think of a reason to prefer giving AlterIsr synchronous
semantics. The zkVersion field in the AlterIsr response seems to be causing
some confusion, so I will probably take it out. I had thought it might be
useful for debugging, but it is not strictly needed since we have an error
code to indicate when the expected version does not match.

11. Yes, I was planning to introduce a dedicated thread for controller
requests. I think the key is that the leader only knows what it thinks to
be the latest version. It doesn't make much sense to try and reason about
multiple in-flight versions. So really this comes down to whether, in the
case of retries, we want to insist on always sending the same AlterIsr
request or if we allow it to be changed. It may be simplest to do the
former, but I thought it would be awkward to keep retrying a state which
had become obsolete. Maybe the simple approach will be a little easier to
explain in the end? In that case, if there is a pending ISR change, we can
ignore further changes until it is resolved. As documented in the KIP, when
the leader receives the updated state, it must check whether there are any
additional changes that are required.
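
To make that concrete, a sketch of the leader-side bookkeeping described above
(all names invented; this is not actual Kafka broker code):

import java.util.HashSet;
import java.util.Set;

class IsrStateSketch {
    interface ControllerChannel {
        void sendAlterIsr(Set<Integer> proposedIsr, int expectedZkVersion);
    }

    private Set<Integer> isr = new HashSet<>();
    private Set<Integer> pendingIsr;  // null when no AlterIsr is in flight
    private int zkVersion;
    private final ControllerChannel controller;

    IsrStateSketch(ControllerChannel controller) {
        this.controller = controller;
    }

    synchronized void proposeIsrChange(Set<Integer> newIsr) {
        if (pendingIsr != null) {
            return;  // one in-flight change at a time; re-evaluate later
        }
        pendingIsr = newIsr;
        controller.sendAlterIsr(newIsr, zkVersion);
    }

    // Invoked when the controller propagates the new state via LeaderAndIsr.
    synchronized void onLeaderAndIsr(Set<Integer> newIsr, int newZkVersion) {
        isr = newIsr;
        zkVersion = newZkVersion;
        pendingIsr = null;  // latest state received; discard pending state
        // per the KIP: now check whether further ISR changes are required
    }
}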

-Jason


On Mon, Jul 29, 2019 at 3:53 PM Jun Rao  wrote:

> Hi, Jason,
>
> Thanks for the KIP. Looks good overall. A couple of comments below.
>
> 10. It's not very clear to me when the partition state (i.e., leader, isr,
> leaderEpoch and zkVersion) is updated on the leader. My initial
> understanding is that the leader updates the partition state on receiving a
> successful AlterIsr response (which will contain a higher zkVersion).
> However, your response to Colin seems to indicate that the leader only
> updates the partition state on LeaderAndIsr request. Both approaches are
> possible. If we do the former, the partition state can be updated through
> both AlterIsr response and LeaderAndIsr request. So, we need a way to
> serialize the ordering. We can potentially use zkVersion to do that. If we
> do the latter, we need to have the controller additionally send a
> LeaderAndIsr request on completing the AlterIsrRequest. We also need some
> way to make those temporary ISR changes (due to AlterIsr) permanent based
> on the LeaderAndIsr request.
>
> 11. A question on an implementation detail. I assume that if a follower
> fetchRequest causes an ISR expansion, the completion of the fetchRequest
> doesn't block on the AlterIsr request? If so, are we going to introduce a
> separate thread to handle the sending/receiving of AlterIsr request?
> Related to this, do we guarantee that all the AlterIsr requests from a
> broker will be sent to the controller in order? Do we allow more than one
> pending AlterIsr request or not?
>
> Jun
>
>
>
> On Mon, Jul 29, 2019 at 12:59 PM Guozhang Wang  wrote:
>
> > 2. Sounds good. Thanks!
> >
> > Guozhang
> >
> > On Mon, Jul 29, 2019 at 11:43 AM Jason Gustafson 
> > wrote:
> >
> > > @Colin
> > >
> > > Yeah, that's a good question. If the version is higher, I think it's
> not
> > > safe for the leader to use the new version until it has received the
> full
> > > LeaderAndIsr state. For example, there may be a reassignment in
> progress
> > > which could alter the leader's expected state change. At least knowing
> > > about the higher version saves the leader unnecessary retries and will
> be
> > > useful for debugging missed LeaderAndIsr updates. I considered letting
> > the
> > > AlterIsr response include the full leader and ISR state so that the
> > leader
> > > could continue without relying on the update. However, I think this
> would
> > > just tend to mask bugs in the controller's propagation logic. It seems
> > > simpler to have one mechanism for propagation of leader and ISR state
> > > rather than two.
> > >
> > > @Guozhang
> > >
> > > 1. Yes, it is the same version field used in LeaderAndIsr requests. I
> > will
> > > make this explicit.
> > > 2. I think both options can work. The main thing I'm trying to avoid is
> > > having the leader blocking on an ISR update. At some point, if the
> leader
> > > doesn't receive an expected LeaderAndIsr update, then it must retry the
> > > AlterIsr request. I thought it would be simpler if retries always
> > reflected
> > > the latest expectation on the ISR state rather than letting the leader
> be
> > > stuck retrying a state change which may no longer be relevant. This is
> > the
> > > approach that I modeled. That being said, if it's ok with you, I'd
> prefer
> > > to leave this decision to the implementation. I think the main point is
> > > that once the leader receives the latest update, it can discard any
> > pending
> > > state.
> > >
> > >
> > > Thanks,
> > > Jason
> > >
> > >
> > >
> > > On M

[jira] [Created] (KAFKA-8730) Add API to delete consumer offsets (KIP-496)

2019-07-29 Thread Jason Gustafson (JIRA)
Jason Gustafson created KAFKA-8730:
--

 Summary: Add API to delete consumer offsets (KIP-496)
 Key: KAFKA-8730
 URL: https://issues.apache.org/jira/browse/KAFKA-8730
 Project: Kafka
  Issue Type: Improvement
Reporter: Jason Gustafson


Implements KIP-496 as documented here: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-496%3A+Administrative+API+to+delete+consumer+offsets].



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


Re: [VOTE] KIP-492 Add java security providers in Kafka Security config

2019-07-29 Thread Satish Duggana
+1 (non-binding)

Thanks,
Satish.

On Tue, Jul 30, 2019 at 5:18 AM Harsha Chintalapani  wrote:
>
> Thanks for the KIP Sandeep.
>
> +1 (binding)
>
> Thanks,
> Harsha
> On Jul 29, 2019, 12:22 PM -0700, Sandeep Mopuri , wrote:
> > Hi all, after some good discussion about the KIP, I'm starting the voting.
> >
> > This KIP proposes adding new security configuration to accept custom
> > security providers that can provide algorithms for SSL or SASL.
> >
> > --
> > Thanks,
> > M.Sai Sandeep


[jira] [Created] (KAFKA-8731) InMemorySessionStore throws NullPointerException on startup

2019-07-29 Thread Jonathan Gordon (JIRA)
Jonathan Gordon created KAFKA-8731:
--

 Summary: InMemorySessionStore throws NullPointerException on 
startup
 Key: KAFKA-8731
 URL: https://issues.apache.org/jira/browse/KAFKA-8731
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.3.0
Reporter: Jonathan Gordon


I receive a NullPointerException on startup when trying to use the new 
InMemorySessionStore via Stores.inMemorySessionStore(...) using the DSL.
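
For reproduction context, a minimal DSL setup along the lines described above
(topic names, serdes and windows are illustrative):

import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.state.Stores;

public class SessionStoreRepro {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()))
               .groupByKey()
               .windowedBy(SessionWindows.with(Duration.ofMinutes(5)))
               .count(Materialized.<String, Long>as(
                   Stores.inMemorySessionStore("session-counts", Duration.ofHours(1))));
        // Building and starting KafkaStreams with this topology, then letting
        // it restore from the changelog, is what triggered the NPE above.
    }
}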

Here's the stack trace:

{{ERROR [2019-07-29 21:56:52,246] 
org.apache.kafka.streams.processor.internals.StreamThread: stream-thread 
[trace_indexer-c8439020-12af-4db2-ad56-3e58cd56540f-StreamThread-1] Encountered 
the following error during processing:}}
{{! java.lang.NullPointerException: null}}
{{! at 
org.apache.kafka.streams.state.internals.InMemorySessionStore.remove(InMemorySessionStore.java:123)}}
{{! at 
org.apache.kafka.streams.state.internals.InMemorySessionStore.put(InMemorySessionStore.java:115)}}
{{! at 
org.apache.kafka.streams.state.internals.InMemorySessionStore.lambda$init$0(InMemorySessionStore.java:93)}}
{{! at 
org.apache.kafka.streams.processor.internals.StateRestoreCallbackAdapter.lambda$adapt$1(StateRestoreCallbackAdapter.java:47)}}
{{! at 
org.apache.kafka.streams.processor.internals.CompositeRestoreListener.restoreBatch(CompositeRestoreListener.java:89)}}
{{! at 
org.apache.kafka.streams.processor.internals.StateRestorer.restore(StateRestorer.java:92)}}
{{! at 
org.apache.kafka.streams.processor.internals.StoreChangelogReader.processNext(StoreChangelogReader.java:317)}}
{{! at 
org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:92)}}
{{! at 
org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:328)}}
{{! at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:867)}}
{{! at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)}}
{{! at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)}}

 

Here's the Slack thread:

[https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1564438647169600]

 

Here's a current PR aimed at fixing the issue:

[https://github.com/apache/kafka/pull/7132]

 



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


Build failed in Jenkins: kafka-trunk-jdk8 #3817

2019-07-29 Thread Apache Jenkins Server
See 


Changes:

[cmccabe] KAFKA-8345: KIP-455 Protocol changes (part 1) (#7114)

--
[...truncated 6.47 MB...]

[... test output elided: org.apache.kafka.connect.data and org.apache.kafka.connect.storage unit tests, all reported STARTED/PASSED; listing truncated ...]