[jira] [Created] (KAFKA-7052) ExtractField SMT throws NPE - needs clearer error message

2018-06-13 Thread Robin Moffatt (JIRA)
Robin Moffatt created KAFKA-7052:


 Summary: ExtractField SMT throws NPE - needs clearer error message
 Key: KAFKA-7052
 URL: https://issues.apache.org/jira/browse/KAFKA-7052
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Robin Moffatt


With the following Single Message Transform: 
{code:java}
"transforms.ExtractId.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.ExtractId.field":"id"{code}
Kafka Connect errors with : 
{code:java}
java.lang.NullPointerException
at org.apache.kafka.connect.transforms.ExtractField.apply(ExtractField.java:61)
at 
org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38){code}
There should be a better error message here, identifying the reason for the NPE.

Version: Confluent Platform 4.1.1
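
For illustration, a minimal sketch (assuming the NPE comes from looking up a field that the key's schema does not contain; this is not the actual patch) of the kind of check that would turn the NPE into a clear error:
{code:java}
// Illustrative sketch only (not the actual ExtractField code or patch): an explicit
// null check on the field lookup turns the bare NPE into an actionable message.
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;

public class ExtractFieldSketch {
    public static Object extract(Struct keyOrValue, String fieldName) {
        Schema schema = keyOrValue.schema();
        Field field = schema.field(fieldName);
        if (field == null) {
            // Report the missing field and the schema instead of failing with an NPE
            throw new DataException("Field '" + fieldName + "' does not exist in schema " + schema
                    + "; check the 'field' setting of the ExtractField transform.");
        }
        return keyOrValue.get(field);
    }
}
{code}
A check along these lines would report a misconfigured "field" or an unexpected key shape explicitly rather than as a bare NullPointerException.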



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7053) Spring Kafka SSL producers throwing TimeoutExceptions

2018-06-13 Thread Guido Josquin (JIRA)
Guido Josquin created KAFKA-7053:


 Summary: Spring Kafka SSL producers throwing TimeoutExceptions
 Key: KAFKA-7053
 URL: https://issues.apache.org/jira/browse/KAFKA-7053
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 1.0.1
Reporter: Guido Josquin


# Problem description

Spring Kafka SSL producers are throwing timeouts when producing to a Kafka 
cluster with 3 brokers. I set up the cluster on Kubernetes according to 
[https://github.com/Yolean/kubernetes-kafka] , altering only the broker config 
and services to arrange outside access.

Some observations:

- The SSL configuration seems correct, since messages are sent and received for 
large periods of time (e.g. hours) before showing intermittent problems. 
- Both the server and clients have plenty of resources left.
- The network is stable and powerful enough (200 mbit, no downtime in the last 
year).
- The errors do not appear in a PLAINTEXT setup.
- Configurations for the three brokers are identical apart from their broker ID 
and listeners.
- No errors appear in the broker log or Spring Kafka consumers.

`2018-06-06 11:15:44.103 ERROR 1 --- [ad | producer-1] 
o.s.k.support.LoggingProducerListener : Exception thrown when sending a message 
with key='null' and payload='[...redacted...]':
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for 
topicname-0: 30001 ms has passed since last append`

# Detailed description

https://stackoverflow.com/questions/50725643/spring-kafka-producers-throwing-timeoutexceptions

# Question

How can I find the cause of this problem?
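
Not a diagnosis, but for reference these are the 1.0.x producer settings that govern the expiry path behind that message (values shown are the defaults; the bootstrap address is hypothetical):
{code:java}
// Producer settings that control when a batch is expired with
// "Expiring N record(s) ... ms has passed since last append" in 1.0.x.
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class ProducerTimeoutKnobs {
    public static Properties sketch() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-0:9093"); // hypothetical SSL listener
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000"); // batches older than this are expired
        props.put(ProducerConfig.RETRIES_CONFIG, "0");                // retries only help before a batch expires
        props.put(ProducerConfig.LINGER_MS_CONFIG, "0");              // how long records wait to be batched
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "60000");       // how long send()/metadata may block
        return props;
    }
}
{code}
In 1.0.x (before delivery.timeout.ms existed), batches sitting in the accumulator longer than request.timeout.ms since the last append are expired with this message, so transient broker or TLS slowness can show up here even when the configuration is otherwise correct.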



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7053) Spring Kafka SSL producers throwing TimeoutExceptions

2018-06-13 Thread Guido Josquin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guido Josquin updated KAFKA-7053:
-
Description: 
h1. Problem description

Spring Kafka SSL producers are throwing timeouts when producing to a Kafka 
cluster with 3 brokers. I set up the cluster on Kubernetes according to 
[https://github.com/Yolean/kubernetes-kafka] , altering only the broker config 
and services to arrange outside access.

Some observations:
 - The SSL configuration seems correct, since messages are sent and received 
for large periods of time (e.g. hours) before showing intermittent problems.
 - Both the server and clients have plenty of resources left.
 - The network is stable and powerful enough (200 mbit, no downtime in the last 
year).
 - The errors do not appear in a PLAINTEXT setup.
 - Configurations for the three brokers are identical apart from their broker 
ID and listeners.
 - No errors appear in the broker log or Spring Kafka consumers.

{code:java}
2018-06-06 11:15:44.103 ERROR 1 --- [ad | producer-1] 
o.s.k.support.LoggingProducerListener : Exception thrown when sending a message 
with key='null' and payload='[...redacted...]':
 org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for 
topicname-0: 30001 ms has passed since last append{code}
h1. Detailed description

[https://stackoverflow.com/questions/50725643/spring-kafka-producers-throwing-timeoutexceptions]
h1. Question

How can I find the cause of this problem?

  was:
# Problem description

Spring Kafka SSL producers are throwing timeouts when producing to a Kafka 
cluster with 3 brokers. I set up the cluster on Kubernetes according to 
[https://github.com/Yolean/kubernetes-kafka] , altering only the broker config 
and services to arrange outside access.

Some observations:

- The SSL configuration seems correct, since messages are sent and received for 
large periods of time (e.g. hours) before showing intermittent problems. 
- Both the server and clients have plenty of resources left.
- The network is stable and powerful enough (200 mbit, no downtime in the last 
year).
- The errors do not appear in a PLAINTEXT setup.
- Configurations for the three brokers are identical apart from their broker ID 
and listeners.
- No errors appear in the broker log or Spring Kafka consumers.

`2018-06-06 11:15:44.103 ERROR 1 --- [ad | producer-1] 
o.s.k.support.LoggingProducerListener : Exception thrown when sending a message 
with key='null' and payload='[...redacted...]':
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for 
topicname-0: 30001 ms has passed since last append`

# Detailed description

https://stackoverflow.com/questions/50725643/spring-kafka-producers-throwing-timeoutexceptions

# Question

How can I find the cause of this problem?


> Spring Kafka SSL producers throwing TimeoutExceptions
> -
>
> Key: KAFKA-7053
> URL: https://issues.apache.org/jira/browse/KAFKA-7053
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 1.0.1
>Reporter: Guido Josquin
>Priority: Major
>
> h1. Problem description
> Spring Kafka SSL producers are throwing timeouts when producing to a Kafka 
> cluster with 3 brokers. I set up the cluster on Kubernetes according to 
> [https://github.com/Yolean/kubernetes-kafka] , altering only the broker 
> config and services to arrange outside access.
> Some observations:
>  - The SSL configuration seems correct, since messages are sent and received 
> for large periods of time (e.g. hours) before showing intermittent problems.
>  - Both the server and clients have plenty of resources left.
>  - The network is stable and powerful enough (200 mbit, no downtime in the 
> last year).
>  - The errors do not appear in a PLAINTEXT setup.
>  - Configurations for the three brokers are identical apart from their broker 
> ID and listeners.
>  - No errors appear in the broker log or Spring Kafka consumers.
> {code:java}
> 2018-06-06 11:15:44.103 ERROR 1 --- [ad | producer-1] 
> o.s.k.support.LoggingProducerListener : Exception thrown when sending a 
> message with key='null' and payload='[...redacted...]':
>  org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for 
> topicname-0: 30001 ms has passed since last append{code}
> h1. Detailed description
> [https://stackoverflow.com/questions/50725643/spring-kafka-producers-throwing-timeoutexceptions]
> h1. Question
> How can I find the cause of this problem?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7052) ExtractField SMT throws NPE - needs clearer error message

2018-06-13 Thread Robin Moffatt (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16510814#comment-16510814
 ] 

Robin Moffatt commented on KAFKA-7052:
--

Tested on Debezium 0.8 (AK 1.1.0) and Debezium 0.7.4 (AK 1.0.0) docker images. 

 

Connector: 
{code:java}
curl -i -X POST -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/ \
-d '{
"name": "mysql-source-demo-customers-raw",
"config": {
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "43",
"database.server.name": "asgard",
"table.whitelist": "demo.customers",
"database.history.kafka.bootstrap.servers": "kafka:29092",
"database.history.kafka.topic": "dbhistory.demo-raw" ,
"include.schema.changes": "true",
"transforms": "ExtractId",
"transforms.ExtractId.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.ExtractId.field":"id" }
}'{code}
Source schema + data in MySQL: 
{code:java}
create table CUSTOMERS (
id INT PRIMARY KEY,
first_name VARCHAR(50),
last_name VARCHAR(50),
email VARCHAR(50),
gender VARCHAR(50),
club_status VARCHAR(8),
comments VARCHAR(90),
create_ts timestamp DEFAULT CURRENT_TIMESTAMP ,
update_ts timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);

insert into CUSTOMERS (id, first_name, last_name, email, gender, club_status, 
comments) values (1, 'Rica', 'Blaisdell', 'rblaisde...@rambler.ru', 'Female', 
'bronze', 'Universal optimal hierarchy');
{code}

> ExtractField SMT throws NPE - needs clearer error message
> -
>
> Key: KAFKA-7052
> URL: https://issues.apache.org/jira/browse/KAFKA-7052
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Robin Moffatt
>Priority: Major
>
> With the following Single Message Transform: 
> {code:java}
> "transforms.ExtractId.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
> "transforms.ExtractId.field":"id"{code}
> Kafka Connect errors with : 
> {code:java}
> java.lang.NullPointerException
> at 
> org.apache.kafka.connect.transforms.ExtractField.apply(ExtractField.java:61)
> at 
> org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38){code}
> There should be a better error message here, identifying the reason for the 
> NPE.
> Version: Confluent Platform 4.1.1



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7054) Kafka describe command should throw topic doesn't exist exception.

2018-06-13 Thread Manohar Vanam (JIRA)
Manohar Vanam created KAFKA-7054:


 Summary: Kafka describe command should throw topic doesn't exist 
exception.
 Key: KAFKA-7054
 URL: https://issues.apache.org/jira/browse/KAFKA-7054
 Project: Kafka
  Issue Type: Improvement
  Components: admin
Reporter: Manohar Vanam


If topic doesn't exist then Kafka describe command should throw topic doesn't 
exist exception.
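
A rough illustration of the requested behaviour using the Java AdminClient (topic name and address are placeholders; the CLI itself goes through the ZooKeeper-based TopicCommand):
{code:java}
// Rough illustration: describing a missing topic already yields an explicit
// UnknownTopicOrPartitionException that the describe command could report,
// mirroring what alter and delete do today.
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;

public class DescribeMissingTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            try {
                admin.describeTopics(Collections.singleton("manu")).all().get();
            } catch (ExecutionException e) {
                if (e.getCause() instanceof UnknownTopicOrPartitionException) {
                    // The explicit failure the describe command could report
                    System.err.println("Topic manu does not exist");
                }
            }
        }
    }
}
{code}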



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7054) Kafka describe command should throw topic doesn't exist exception.

2018-06-13 Thread Manohar Vanam (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16510858#comment-16510858
 ] 

Manohar Vanam commented on KAFKA-7054:
--

I am not able to assign this issue to myself; can someone please add me as a 
contributor?

> Kafka describe command should throw topic doesn't exist exception.
> --
>
> Key: KAFKA-7054
> URL: https://issues.apache.org/jira/browse/KAFKA-7054
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Reporter: Manohar Vanam
>Priority: Minor
>
> If topic doesn't exist then Kafka describe command should throw topic doesn't 
> exist exception.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7054) Kafka describe command should throw topic doesn't exist exception.

2018-06-13 Thread Manohar Vanam (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7054?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manohar Vanam updated KAFKA-7054:
-
Description: 
If topic doesn't exist then Kafka describe command should throw topic doesn't 
exist exception.

like alter and delete commands :
{code:java}
local:bin mvanam$ ./kafka-topics.sh --zookeeper localhost:2181 --delete --topic 
manu
Error while executing topic command : Topic manu does not exist on ZK path 
localhost:2181
[2018-06-13 15:08:13,111] ERROR java.lang.IllegalArgumentException: Topic manu 
does not exist on ZK path localhost:2181
 at kafka.admin.TopicCommand$.getTopics(TopicCommand.scala:91)
 at kafka.admin.TopicCommand$.deleteTopic(TopicCommand.scala:184)
 at kafka.admin.TopicCommand$.main(TopicCommand.scala:71)
 at kafka.admin.TopicCommand.main(TopicCommand.scala)
 (kafka.admin.TopicCommand$)
local:bin mvanam$ ./kafka-topics.sh --zookeeper localhost:2181 --alter --topic 
manu
Error while executing topic command : Topic manu does not exist on ZK path 
localhost:2181
[2018-06-13 15:08:43,663] ERROR java.lang.IllegalArgumentException: Topic manu 
does not exist on ZK path localhost:2181
 at kafka.admin.TopicCommand$.getTopics(TopicCommand.scala:91)
 at kafka.admin.TopicCommand$.alterTopic(TopicCommand.scala:125)
 at kafka.admin.TopicCommand$.main(TopicCommand.scala:65)
 at kafka.admin.TopicCommand.main(TopicCommand.scala)
 (kafka.admin.TopicCommand$){code}

  was:If topic doesn't exist then Kafka describe command should throw topic 
doesn't exist exception.


> Kafka describe command should throw topic doesn't exist exception.
> --
>
> Key: KAFKA-7054
> URL: https://issues.apache.org/jira/browse/KAFKA-7054
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Reporter: Manohar Vanam
>Priority: Minor
>
> If topic doesn't exist then Kafka describe command should throw topic doesn't 
> exist exception.
> like alter and delete commands :
> {code:java}
> local:bin mvanam$ ./kafka-topics.sh --zookeeper localhost:2181 --delete 
> --topic manu
> Error while executing topic command : Topic manu does not exist on ZK path 
> localhost:2181
> [2018-06-13 15:08:13,111] ERROR java.lang.IllegalArgumentException: Topic 
> manu does not exist on ZK path localhost:2181
>  at kafka.admin.TopicCommand$.getTopics(TopicCommand.scala:91)
>  at kafka.admin.TopicCommand$.deleteTopic(TopicCommand.scala:184)
>  at kafka.admin.TopicCommand$.main(TopicCommand.scala:71)
>  at kafka.admin.TopicCommand.main(TopicCommand.scala)
>  (kafka.admin.TopicCommand$)
> local:bin mvanam$ ./kafka-topics.sh --zookeeper localhost:2181 --alter 
> --topic manu
> Error while executing topic command : Topic manu does not exist on ZK path 
> localhost:2181
> [2018-06-13 15:08:43,663] ERROR java.lang.IllegalArgumentException: Topic 
> manu does not exist on ZK path localhost:2181
>  at kafka.admin.TopicCommand$.getTopics(TopicCommand.scala:91)
>  at kafka.admin.TopicCommand$.alterTopic(TopicCommand.scala:125)
>  at kafka.admin.TopicCommand$.main(TopicCommand.scala:65)
>  at kafka.admin.TopicCommand.main(TopicCommand.scala)
>  (kafka.admin.TopicCommand$){code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6614) kafka-streams to configure internal topics message.timestamp.type=CreateTime

2018-06-13 Thread Meghana Gupta (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6614?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16510897#comment-16510897
 ] 

Meghana Gupta commented on KAFKA-6614:
--

Hi, I'm new to contributing to Kafka. Can I take this up?

> kafka-streams to configure internal topics message.timestamp.type=CreateTime
> 
>
> Key: KAFKA-6614
> URL: https://issues.apache.org/jira/browse/KAFKA-6614
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Dmitry Vsekhvalnov
>Priority: Minor
>  Labels: newbie
>
> After fixing KAFKA-4785, all internal topics are read using the built-in 
> *RecordMetadataTimestampExtractor* for timestamps.
> This doesn't seem to work correctly out of the box with Kafka brokers configured 
> with *log.message.timestamp.type=LogAppendTime* when using a custom message 
> timestamp extractor.
> Example use-case windowed grouping + aggregation on late data:
> {code:java}
> KTable<Windowed<String>, Long> summaries = in
>    .groupBy((key, value) -> ..)
>    .windowedBy(TimeWindows.of(TimeUnit.HOURS.toMillis(1l)))
>    .count();{code}
> when processing late events:
>  # the custom timestamp extractor will pick up a timestamp in the past from the message 
> (let's say an hour ago)
>  # re-partition topic during grouping phase will be written back to kafka 
> using timestamp from (1)
>  # kafka broker will ignore provided timestamp in (2) to favor ingestion time
>  # streams lib will read re-partitioned topic back with 
> RecordMetadataTimestampExtractor
>  # and will get the ingestion timestamp (3), which is usually close to "now"
>  # window start/end will be incorrectly set based on "now" instead of 
> original timestamp from payload
> I understand there are ways to configure the per-topic timestamp type on the kafka 
> brokers to solve this, but it would be really nice if the kafka-streams library 
> could take care of it itself,
> to follow the "least-surprise" principle: if the library relies on timestamp.type 
> for a topic it manages, it should enforce it.
> CC [~guozhang] based on user group email discussion.
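
As a possible stop-gap, a sketch of a per-application workaround, assuming "topic."-prefixed overrides are applied to the repartition and changelog topics Streams creates (this is not the library-level fix requested above):
{code:java}
// Sketch: pin the timestamp type of Streams-managed internal topics to CreateTime.
import java.util.Properties;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.streams.StreamsConfig;

public class InternalTopicTimestampType {
    public static Properties sketch() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // hypothetical application id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical bootstrap
        // "topic."-prefixed configs are forwarded to internal topics Streams creates
        props.put(StreamsConfig.topicPrefix(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG), "CreateTime");
        return props;
    }
}
{code}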



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap

2018-06-13 Thread Ben Stopford (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16510911#comment-16510911
 ] 

Ben Stopford commented on KAFKA-4113:
-

Whilst I like the 'time-aligned' approach to loading KTables very much, it 
definitely catches people out. I think this is compounded by the fact that 
GKTables don't behave like this (they bootstrap themselves on startup rather 
than being time aligned).

Different use cases actually suit one or the other better (as noted above). For 
example, if you're joining Orders to Customers and doing reprocessing, you 
might want the 'as at' version of the customer (say, with an old email address) 
or the latest version of the customer (with their most recent email).

So I think KStreams should ideally support both (a) preloaded and (b) event-time 
alignment in both types of table, letting the user define the behaviour.

I've tried to explain the background to this in a bit more detail 
[here|http://www.benstopford.com/2018/06/13/things-can-trip-building-streams-apps/].
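
For reference, a small sketch of the two constructs being contrasted (default serdes assumed; topic names are made up):
{code:java}
// A GlobalKTable bootstraps fully on startup, while a KTable is read time-aligned
// with the other input streams.
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KTable;

public class TableTypesSketch {
    public static void build(StreamsBuilder builder) {
        // Time-aligned: during reprocessing a join sees the "as at" version of a customer
        KTable<String, String> customers = builder.table("customers");
        // Bootstrapped before processing starts: a join sees the latest version of a customer
        GlobalKTable<String, String> customersGlobal = builder.globalTable("customers-global");
    }
}
{code}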
 

> Allow KTable bootstrap
> --
>
> Key: KAFKA-4113
> URL: https://issues.apache.org/jira/browse/KAFKA-4113
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Guozhang Wang
>Priority: Major
>
> On the mailing list, there are multiple requests about the possibility to 
> "fully populate" a KTable before actual stream processing starts.
> Even if it is somewhat difficult to define when the initial populating phase 
> should end, there are multiple possibilities:
> The main idea is that there is a rarely updated topic that contains the 
> data. Only after this topic has been read completely and the KTable is ready 
> should the application start processing. This would indicate that, on startup, 
> the current partition sizes must be fetched and stored, and after the KTable has 
> been populated up to those offsets, stream processing can start.
> Other discussed ideas are:
> 1) an initial fixed time period for populating
> (it might be hard for a user to estimate the correct value)
> 2) an "idle" period, ie, if no update to a KTable for a certain time is
> done, we consider it as populated
> 3) a timestamp cut off point, ie, all records with an older timestamp
> belong to the initial populating phase
> The API change is not decided yet, and the API design is part of this JIRA.
> One suggestion (for option (4)) was:
> {noformat}
> KTable table = builder.table("topic", 1000); // populate the table without 
> reading any other topics until see one record with timestamp 1000.
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7054) Kafka describe command should throw topic doesn't exist exception.

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16510930#comment-16510930
 ] 

ASF GitHub Bot commented on KAFKA-7054:
---

ManoharVanam opened a new pull request #5211: [KAFKA-7054] Kafka describe 
command should throw topic doesn't exist exception
URL: https://github.com/apache/kafka/pull/5211
 
 
   User Interface Improvement : If topic doesn't exist then Kafka describe 
command should throw topic doesn't exist exception, like alter and delete 
commands
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Kafka describe command should throw topic doesn't exist exception.
> --
>
> Key: KAFKA-7054
> URL: https://issues.apache.org/jira/browse/KAFKA-7054
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Reporter: Manohar Vanam
>Priority: Minor
>
> If topic doesn't exist then Kafka describe command should throw topic doesn't 
> exist exception.
> like alter and delete commands :
> {code:java}
> local:bin mvanam$ ./kafka-topics.sh --zookeeper localhost:2181 --delete 
> --topic manu
> Error while executing topic command : Topic manu does not exist on ZK path 
> localhost:2181
> [2018-06-13 15:08:13,111] ERROR java.lang.IllegalArgumentException: Topic 
> manu does not exist on ZK path localhost:2181
>  at kafka.admin.TopicCommand$.getTopics(TopicCommand.scala:91)
>  at kafka.admin.TopicCommand$.deleteTopic(TopicCommand.scala:184)
>  at kafka.admin.TopicCommand$.main(TopicCommand.scala:71)
>  at kafka.admin.TopicCommand.main(TopicCommand.scala)
>  (kafka.admin.TopicCommand$)
> local:bin mvanam$ ./kafka-topics.sh --zookeeper localhost:2181 --alter 
> --topic manu
> Error while executing topic command : Topic manu does not exist on ZK path 
> localhost:2181
> [2018-06-13 15:08:43,663] ERROR java.lang.IllegalArgumentException: Topic 
> manu does not exist on ZK path localhost:2181
>  at kafka.admin.TopicCommand$.getTopics(TopicCommand.scala:91)
>  at kafka.admin.TopicCommand$.alterTopic(TopicCommand.scala:125)
>  at kafka.admin.TopicCommand$.main(TopicCommand.scala:65)
>  at kafka.admin.TopicCommand.main(TopicCommand.scala)
>  (kafka.admin.TopicCommand$){code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7048) NPE when creating connector

2018-06-13 Thread Chia-Ping Tsai (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511008#comment-16511008
 ] 

Chia-Ping Tsai commented on KAFKA-7048:
---

This issue effectively breaks Kafka Connect in 2.x, since only the first connector 
creation avoids the NPE. All subsequent requests to create connectors will 
fail because of the NPE.

> NPE when creating connector
> ---
>
> Key: KAFKA-7048
> URL: https://issues.apache.org/jira/browse/KAFKA-7048
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Blocker
> Fix For: 2.0.0
>
>
> KAFKA-6886 introduced the ConfigTransformer to transform the given 
> configuration data. ConfigTransformer#transform(Map<String, String>) expects 
> the passed config not to be null, but DistributedHerder#putConnectorConfig calls 
> #transform before updating the snapshot (see below). Hence, it causes the 
> NPE. 
> {code:java}
> // Note that we use the updated connector config despite the fact that we 
> don't have an updated
> // snapshot yet. The existing task info should still be accurate.
> Map<String, String> map = configState.connectorConfig(connName);
> ConnectorInfo info = new ConnectorInfo(connName, config, 
> configState.tasks(connName),
> map == null ? null : 
> connectorTypeForClass(map.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG)));
> callback.onCompletion(null, new Created<>(!exists, info));
> return null;{code}
> We can add a null check to "configs" (see below) to resolve the NPE. It means 
> we WON'T pass the null configs to configTransformer
> {code:java}
> public Map<String, String> connectorConfig(String connector) {
> Map<String, String> configs = connectorConfigs.get(connector);
> if (configTransformer != null) { // add a condition "configs != null"
> configs = configTransformer.transform(connector, configs);
> }
> return configs;
> }{code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7035) Kafka Processor's init() method sometimes is not called

2018-06-13 Thread Antony Stubbs (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511063#comment-16511063
 ] 

Antony Stubbs commented on KAFKA-7035:
--

Call your punctuate processor from the close() method?

> Kafka Processor's init() method sometimes is not called
> ---
>
> Key: KAFKA-7035
> URL: https://issues.apache.org/jira/browse/KAFKA-7035
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Oleksandr Konopko
>Priority: Critical
> Attachments: TransformProcessor.java
>
>
> Scenario:
> 1. We have processing of Kafka Topic which is implemented with Processor API
> 2. We want to collect metrics (let's say just count the number of processed 
> entities for simplicity)
> 3. How we tried to organize this
>  * process data with process() method and send it down the stream with context
>  * on each call of process() method update the counter
>  * schedule a punctuate function which will send the metric to a special topic. The 
> metric is built from the counter
> You can find the code (we removed all business-sensitive code from it, so 
> it should be easy to read) in the attachment
>  
> Problematic Kafka Streams behaviour that I can see by logging every step:
> 1. We have 80 messages in the input topic
> 2. Kafka Streams creates 4 Processor instances. Let's name them: ProcessorA, 
> ProcessorB, ProcessorC and ProcessorD
> 3. ProcessorA and ProcessorB receive 1-5% of the data. The data is processed 
> correctly, results are sent down the stream, and the counter is updated
> 4. The init() method was not called for ProcessorA and ProcessorB
> 5. ProcessorC and ProcessorD are created and they start to receive all the 
> rest of the data, 95-99%
> 6. The init() method is called for both ProcessorC and ProcessorD. It initiates 
> punctuation, which causes a Metrics message to be created and sent down the metric 
> stream periodically
> 7. ProcessorA and ProcessorB are closed. init() was never called for them, so 
> their Metric entity was not sent to the metrics topic
> 8. Processing is finished.
>  
> In the end:
> Expected:
>  * 80 entities were processed and sent to the Sink
>  * Metrics entities contain counters which sum up to 80
> Actual results:
>  * 80 entities were processed and sent to the Sink
>  * Metrics entities contain counters which sum up to some number 3-6% less 
> than 80, for example 786543
>  
> Problem:
>  * init() method call is not guaranteed
>  * there is no way to guarantee that all work was done by punctuate method 
> before close()
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7055) Kafka Streams Processor API allows you to add sinks and processors without parent

2018-06-13 Thread Nikki Thean (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nikki Thean updated KAFKA-7055:
---
Description: 
The Kafka Streams Processor API allows you to define a Topology and connect 
sources, processors, and sinks. From reading through the code, it seems that 
you cannot forward a message to a downstream node unless it is explicitly 
connected to the upstream node (from which you are forwarding the message) as a 
child. Here is an 
[example|[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L117]]
 where you forward using name of downstream node rather than child index.

However, I've been able to connect processors and sinks to the topology without 
including parent names, i.e with empty vararg, using [this 
method|[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/Topology.java#L423]].

As any attempt to forward a message to those nodes will throw a 
StreamsException, I suggest throwing an exception if a processor or sink is 
added without at least one upstream node. There is a method in 
`InternalTopologyBuilder` that allows you to connect processors by name after 
you add them to the topology, but it is not part of the external Processor API.

In addition (or alternatively), I suggest making [the error message for when 
users try to forward messages to a node that is not 
connected|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L119]
 more descriptive, like [this one for when a user attempts to access a state 
store that is not connected to the 
processor|[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L75-L81].]

  was:
The Kafka Streams Processor API allows you to define a Topology and connect 
sources, processors, and sinks. From reading through the code, it seems that 
you cannot forward a message to a downstream node unless it is explicitly 
connected to the upstream node (from which you are forwarding the message) as a 
child. Here is an 
[example|[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L117]]
 where you forward using name of downstream node rather than child index.

However, I've been able to connect processors and sinks to the topology without 
including parent names, i.e with empty vararg, using [this 
method|[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/Topology.java#L423].]

As any attempt to forward a message to those nodes will throw a 
StreamsException, I suggest throwing an exception if a processor or sink is 
added without at least one upstream node. There is a method in 
`InternalTopologyBuilder` that allows you to connect processors by name after 
you add them to the topology, but it is not part of the external Processor API.

In addition (or alternatively), I suggest making [the error message for when 
users try to forward messages to a node that is not 
connected|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L119]
 more descriptive, like [this one for when a user attempts to access a state 
store that is not connected to the 
processor|[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L75-L81].]


> Kafka Streams Processor API allows you to add sinks and processors without 
> parent
> -
>
> Key: KAFKA-7055
> URL: https://issues.apache.org/jira/browse/KAFKA-7055
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Nikki Thean
>Assignee: Nikki Thean
>Priority: Minor
>  Labels: easyfix
>
> The Kafka Streams Processor API allows you to define a Topology and connect 
> sources, processors, and sinks. From reading through the code, it seems that 
> you cannot forward a message to a downstream node unless it is explicitly 
> connected to the upstream node (from which you are forwarding the message) as 
> a child. Here is an 
> [example|[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L117]]
>  where you forward using name of downstream node rather than child index.
> However, I've been able to connect processors and sinks to the topology 
> without including parent names, i.e with empty vararg, using [this 
> method|[https://github.com/apache/kafka/blob/trunk/

[jira] [Created] (KAFKA-7055) Kafka Streams Processor API allows you to add sinks and processors without parent

2018-06-13 Thread Nikki Thean (JIRA)
Nikki Thean created KAFKA-7055:
--

 Summary: Kafka Streams Processor API allows you to add sinks and 
processors without parent
 Key: KAFKA-7055
 URL: https://issues.apache.org/jira/browse/KAFKA-7055
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 1.0.0
Reporter: Nikki Thean
Assignee: Nikki Thean


The Kafka Streams Processor API allows you to define a Topology and connect 
sources, processors, and sinks. From reading through the code, it seems that 
you cannot forward a message to a downstream node unless it is explicitly 
connected to the upstream node (from which you are forwarding the message) as a 
child. 
([example|[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L117]]
 where you forward using name of downstream node rather than child index)

However, I've been able to connect processors and sinks to the topology without 
including parent names, i.e with empty vararg (using [this 
method|[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/Topology.java#L423]).]

As any attempt to forward a message to those nodes will throw a 
StreamsException, I suggest throwing an exception if a processor or sink is 
added without at least one upstream node. There is a method in 
`InternalTopologyBuilder` that allows you to connect processors by name after 
you add them to the topology, but it is not part of the external Processor API.

In addition (or alternatively), I suggest making [the error message for when 
users try to forward messages to a node that is not 
connected|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L119]
 more descriptive, like [this one for when a user attempts to access a state 
store that is not connected to the 
processor|[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L75-L81].]
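
A minimal sketch of the situation described above (node and topic names are made up; this just restates the report in code):
{code:java}
// The topology builds even though the sink has no parent; the problem only appears
// later, when process() tries to forward to the unconnected sink.
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

public class OrphanSinkExample {
    public static Topology build() {
        Topology topology = new Topology();
        topology.addSource("source", "input-topic");
        topology.addProcessor("forwarder", ForwardingProcessor::new, "source");
        // Empty parent vararg: accepted here, but the sink is never connected upstream
        topology.addSink("orphan-sink", "output-topic");
        return topology;
    }

    static class ForwardingProcessor implements Processor<byte[], byte[]> {
        private ProcessorContext context;

        @Override
        public void init(final ProcessorContext context) {
            this.context = context;
        }

        @Override
        public void process(final byte[] key, final byte[] value) {
            // Per the report above, this throws a StreamsException because
            // "orphan-sink" was never connected as a child of "forwarder".
            context.forward(key, value, "orphan-sink");
        }

        @Override
        public void punctuate(final long timestamp) { }

        @Override
        public void close() { }
    }
}
{code}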



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7055) Kafka Streams Processor API allows you to add sinks and processors without parent

2018-06-13 Thread Nikki Thean (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nikki Thean updated KAFKA-7055:
---
Description: 
The Kafka Streams Processor API allows you to define a Topology and connect 
sources, processors, and sinks. From reading through the code, it seems that 
you cannot forward a message to a downstream node unless it is explicitly 
connected to the upstream node (from which you are forwarding the message) as a 
child. Here is an 
[example|[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L117]]
 where you forward using name of downstream node rather than child index.

However, I've been able to connect processors and sinks to the topology without 
including parent names, i.e with empty vararg, using 
[this|[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/Topology.java#L423]]
 method.

As any attempt to forward a message to those nodes will throw a 
StreamsException, I suggest throwing an exception if a processor or sink is 
added without at least one upstream node. There is a method in 
`InternalTopologyBuilder` that allows you to connect processors by name after 
you add them to the topology, but it is not part of the external Processor API.

In addition (or alternatively), I suggest making [the error message for when 
users try to forward messages to a node that is not 
connected|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L119]
 more descriptive, like 
[this|[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L75-L81]]
 one for when a user attempts to access a state store that is not connected to 
the processor.

  was:
The Kafka Streams Processor API allows you to define a Topology and connect 
sources, processors, and sinks. From reading through the code, it seems that 
you cannot forward a message to a downstream node unless it is explicitly 
connected to the upstream node (from which you are forwarding the message) as a 
child. Here is an 
[example|[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L117]]
 where you forward using name of downstream node rather than child index.

However, I've been able to connect processors and sinks to the topology without 
including parent names, i.e with empty vararg, using [this 
method|[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/Topology.java#L423]].

As any attempt to forward a message to those nodes will throw a 
StreamsException, I suggest throwing an exception if a processor or sink is 
added without at least one upstream node. There is a method in 
`InternalTopologyBuilder` that allows you to connect processors by name after 
you add them to the topology, but it is not part of the external Processor API.

In addition (or alternatively), I suggest making [the error message for when 
users try to forward messages to a node that is not 
connected|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L119]
 more descriptive, like [this one for when a user attempts to access a state 
store that is not connected to the 
processor|[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L75-L81].]


> Kafka Streams Processor API allows you to add sinks and processors without 
> parent
> -
>
> Key: KAFKA-7055
> URL: https://issues.apache.org/jira/browse/KAFKA-7055
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Nikki Thean
>Assignee: Nikki Thean
>Priority: Minor
>  Labels: easyfix
>
> The Kafka Streams Processor API allows you to define a Topology and connect 
> sources, processors, and sinks. From reading through the code, it seems that 
> you cannot forward a message to a downstream node unless it is explicitly 
> connected to the upstream node (from which you are forwarding the message) as 
> a child. Here is an 
> [example|[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L117]]
>  where you forward using name of downstream node rather than child index.
> However, I've been able to connect processors and sinks to the topology 
> without including parent names, i.e with empty vararg, using 
> [this|[https://github.com/apache/kafka/blob/trunk/strea

[jira] [Commented] (KAFKA-7055) Kafka Streams Processor API allows you to add sinks and processors without parent

2018-06-13 Thread Nikki Thean (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511195#comment-16511195
 ] 

Nikki Thean commented on KAFKA-7055:


If project reviewers feel this is a reasonable approach to take, I'm happy to 
submit my own pull request and link it to the ticket!

> Kafka Streams Processor API allows you to add sinks and processors without 
> parent
> -
>
> Key: KAFKA-7055
> URL: https://issues.apache.org/jira/browse/KAFKA-7055
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Nikki Thean
>Assignee: Nikki Thean
>Priority: Minor
>  Labels: easyfix
>
> The Kafka Streams Processor API allows you to define a Topology and connect 
> sources, processors, and sinks. From reading through the code, it seems that 
> you cannot forward a message to a downstream node unless it is explicitly 
> connected to the upstream node (from which you are forwarding the message) as 
> a child. 
> ([example|[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L117]]
>  where you forward using name of downstream node rather than child index)
> However, I've been able to connect processors and sinks to the topology 
> without including parent names, i.e with empty vararg (using [this 
> method|[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/Topology.java#L423]).]
> As any attempt to forward a message to those nodes will throw a 
> StreamsException, I suggest throwing an exception if a processor or sink is 
> added without at least one upstream node. There is a method in 
> `InternalTopologyBuilder` that allows you to connect processors by name after 
> you add them to the topology, but it is not part of the external Processor 
> API.
> In addition (or alternatively), I suggest making [the error message for when 
> users try to forward messages to a node that is not 
> connected|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L119]
>  more descriptive, like [this one for when a user attempts to access a state 
> store that is not connected to the 
> processor|[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L75-L81].]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7055) Kafka Streams Processor API allows you to add sinks and processors without parent

2018-06-13 Thread Nikki Thean (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nikki Thean updated KAFKA-7055:
---
Description: 
The Kafka Streams Processor API allows you to define a Topology and connect 
sources, processors, and sinks. From reading through the code, it seems that 
you cannot forward a message to a downstream node unless it is explicitly 
connected to the upstream node (from which you are forwarding the message) as a 
child. Here is an example where you forward using name of downstream node 
rather than child index 
(https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L117).

However, I've been able to connect processors and sinks to the topology without 
including parent names, i.e with empty vararg, using this method: 
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/Topology.java#L423.

As any attempt to forward a message to those nodes will throw a 
StreamsException, I suggest throwing an exception if a processor or sink is 
added without at least one upstream node. There is a method in 
`InternalTopologyBuilder` that allows you to connect processors by name after 
you add them to the topology, but it is not part of the external Processor API.

In addition (or alternatively), I suggest making [the error message for when 
users try to forward messages to a node that is not 
connected|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L119]
 more descriptive, like this one for when a user attempts to access a state 
store that is not connected to the processor: 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L75-L81]]

  was:
The Kafka Streams Processor API allows you to define a Topology and connect 
sources, processors, and sinks. From reading through the code, it seems that 
you cannot forward a message to a downstream node unless it is explicitly 
connected to the upstream node (from which you are forwarding the message) as a 
child. Here is an 
[example|[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L117]]
 where you forward using name of downstream node rather than child index.

However, I've been able to connect processors and sinks to the topology without 
including parent names, i.e with empty vararg, using 
[this|[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/Topology.java#L423]]
 method.

As any attempt to forward a message to those nodes will throw a 
StreamsException, I suggest throwing an exception if a processor or sink is 
added without at least one upstream node. There is a method in 
`InternalTopologyBuilder` that allows you to connect processors by name after 
you add them to the topology, but it is not part of the external Processor API.

In addition (or alternatively), I suggest making [the error message for when 
users try to forward messages to a node that is not 
connected|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L119]
 more descriptive, like 
[this|[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L75-L81]]
 one for when a user attempts to access a state store that is not connected to 
the processor.


> Kafka Streams Processor API allows you to add sinks and processors without 
> parent
> -
>
> Key: KAFKA-7055
> URL: https://issues.apache.org/jira/browse/KAFKA-7055
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Nikki Thean
>Assignee: Nikki Thean
>Priority: Minor
>  Labels: easyfix
>
> The Kafka Streams Processor API allows you to define a Topology and connect 
> sources, processors, and sinks. From reading through the code, it seems that 
> you cannot forward a message to a downstream node unless it is explicitly 
> connected to the upstream node (from which you are forwarding the message) as 
> a child. Here is an example where you forward using name of downstream node 
> rather than child index 
> (https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L117).
> However, I've been able to connect processors and sinks to the topology 
> without including parent names, i.e with empty vararg, using this method: 
> https://github.com/apache/kafka/blob/trunk/streams

[jira] [Updated] (KAFKA-7055) Kafka Streams Processor API allows you to add sinks and processors without parent

2018-06-13 Thread Nikki Thean (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nikki Thean updated KAFKA-7055:
---
Description: 
The Kafka Streams Processor API allows you to define a Topology and connect 
sources, processors, and sinks. From reading through the code, it seems that 
you cannot forward a message to a downstream node unless it is explicitly 
connected to the upstream node (from which you are forwarding the message) as a 
child. Here is an 
[example|[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L117]]
 where you forward using name of downstream node rather than child index.

However, I've been able to connect processors and sinks to the topology without 
including parent names, i.e with empty vararg, using [this 
method|[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/Topology.java#L423].]

As any attempt to forward a message to those nodes will throw a 
StreamsException, I suggest throwing an exception if a processor or sink is 
added without at least one upstream node. There is a method in 
`InternalTopologyBuilder` that allows you to connect processors by name after 
you add them to the topology, but it is not part of the external Processor API.

In addition (or alternatively), I suggest making [the error message for when 
users try to forward messages to a node that is not 
connected|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L119]
 more descriptive, like [this one for when a user attempts to access a state 
store that is not connected to the 
processor|[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L75-L81].]

  was:
The Kafka Streams Processor API allows you to define a Topology and connect 
sources, processors, and sinks. From reading through the code, it seems that 
you cannot forward a message to a downstream node unless it is explicitly 
connected to the upstream node (from which you are forwarding the message) as a 
child. 
([example|[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L117]]
 where you forward using name of downstream node rather than child index)

However, I've been able to connect processors and sinks to the topology without 
including parent names, i.e with empty vararg (using [this 
method|[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/Topology.java#L423]).]

As any attempt to forward a message to those nodes will throw a 
StreamsException, I suggest throwing an exception if a processor or sink is 
added without at least one upstream node. There is a method in 
`InternalTopologyBuilder` that allows you to connect processors by name after 
you add them to the topology, but it is not part of the external Processor API.

In addition (or alternatively), I suggest making [the error message for when 
users try to forward messages to a node that is not 
connected|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L119]
 more descriptive, like [this one for when a user attempts to access a state 
store that is not connected to the 
processor|[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L75-L81].]


> Kafka Streams Processor API allows you to add sinks and processors without 
> parent
> -
>
> Key: KAFKA-7055
> URL: https://issues.apache.org/jira/browse/KAFKA-7055
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Nikki Thean
>Assignee: Nikki Thean
>Priority: Minor
>  Labels: easyfix
>
> The Kafka Streams Processor API allows you to define a Topology and connect 
> sources, processors, and sinks. From reading through the code, it seems that 
> you cannot forward a message to a downstream node unless it is explicitly 
> connected to the upstream node (from which you are forwarding the message) as 
> a child. Here is an 
> [example|[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L117]]
>  where you forward using name of downstream node rather than child index.
> However, I've been able to connect processors and sinks to the topology 
> without including parent names, i.e with empty vararg, using [this 
> method|[https://github.com/apache/kafka/blob/trunk/streams/s

[jira] [Updated] (KAFKA-7055) Kafka Streams Processor API allows you to add sinks and processors without parent

2018-06-13 Thread Nikki Thean (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nikki Thean updated KAFKA-7055:
---
Description: 
The Kafka Streams Processor API allows you to define a Topology and connect 
sources, processors, and sinks. From reading through the code, it seems that 
you cannot forward a message to a downstream node unless it is explicitly 
connected to the upstream node (from which you are forwarding the message) as a 
child. Here is an example where you forward using name of downstream node 
rather than child index 
([https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L117]).

However, I've been able to connect processors and sinks to the topology without 
including parent names, i.e with empty vararg, using this method: 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/Topology.java#L423].

As any attempt to forward a message to those nodes will throw a 
StreamsException, I suggest throwing an exception if a processor or sink is 
added without at least one upstream node. There is a method in 
`InternalTopologyBuilder` that allows you to connect processors by name after 
you add them to the topology, but it is not part of the external Processor API.

In addition (or alternatively), I suggest making [the error message for when 
users try to forward messages to a node that is not 
connected|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L119]
 more descriptive, like this one for when a user attempts to access a state 
store that is not connected to the processor: 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L75-L81]

  was:
The Kafka Streams Processor API allows you to define a Topology and connect 
sources, processors, and sinks. From reading through the code, it seems that 
you cannot forward a message to a downstream node unless it is explicitly 
connected to the upstream node (from which you are forwarding the message) as a 
child. Here is an example where you forward using name of downstream node 
rather than child index 
(https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L117).

However, I've been able to connect processors and sinks to the topology without 
including parent names, i.e with empty vararg, using this method: 
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/Topology.java#L423.

As any attempt to forward a message to those nodes will throw a 
StreamsException, I suggest throwing an exception if a processor or sink is 
added without at least one upstream node. There is a method in 
`InternalTopologyBuilder` that allows you to connect processors by name after 
you add them to the topology, but it is not part of the external Processor API.

In addition (or alternatively), I suggest making [the error message for when 
users try to forward messages to a node that is not 
connected|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L119]
 more descriptive, like this one for when a user attempts to access a state 
store that is not connected to the processor: 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L75-L81]]


> Kafka Streams Processor API allows you to add sinks and processors without 
> parent
> -
>
> Key: KAFKA-7055
> URL: https://issues.apache.org/jira/browse/KAFKA-7055
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Nikki Thean
>Assignee: Nikki Thean
>Priority: Minor
>  Labels: easyfix
>
> The Kafka Streams Processor API allows you to define a Topology and connect 
> sources, processors, and sinks. From reading through the code, it seems that 
> you cannot forward a message to a downstream node unless it is explicitly 
> connected to the upstream node (from which you are forwarding the message) as 
> a child. Here is an example where you forward using name of downstream node 
> rather than child index 
> ([https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L117]).
> However, I've been able to connect processors and sinks to the topology 
> without including parent names, i.e with empty vararg, using this method: 
> [https://github.com/apache/kafka/blob/trunk/streams/s

[jira] [Commented] (KAFKA-7055) Kafka Streams Processor API allows you to add sinks and processors without parent

2018-06-13 Thread Nikki Thean (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511202#comment-16511202
 ] 

Nikki Thean commented on KAFKA-7055:


Sorry about the inconsistent link format; Jira decided to convert only one of 
the links I formatted.

> Kafka Streams Processor API allows you to add sinks and processors without 
> parent
> -
>
> Key: KAFKA-7055
> URL: https://issues.apache.org/jira/browse/KAFKA-7055
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Nikki Thean
>Assignee: Nikki Thean
>Priority: Minor
>  Labels: easyfix
>
> The Kafka Streams Processor API allows you to define a Topology and connect 
> sources, processors, and sinks. From reading through the code, it seems that 
> you cannot forward a message to a downstream node unless it is explicitly 
> connected to the upstream node (from which you are forwarding the message) as 
> a child. Here is an example where you forward using name of downstream node 
> rather than child index 
> ([https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L117]).
> However, I've been able to connect processors and sinks to the topology 
> without including parent names, i.e with empty vararg, using this method: 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/Topology.java#L423].
> As any attempt to forward a message to those nodes will throw a 
> StreamsException, I suggest throwing an exception if a processor or sink is 
> added without at least one upstream node. There is a method in 
> `InternalTopologyBuilder` that allows you to connect processors by name after 
> you add them to the topology, but it is not part of the external Processor 
> API.
> In addition (or alternatively), I suggest making [the error message for when 
> users try to forward messages to a node that is not 
> connected|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L119]
>  more descriptive, like this one for when a user attempts to access a state 
> store that is not connected to the processor: 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L75-L81]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7007) Use JSON for /kafka-acl-extended-changes path

2018-06-13 Thread Andy Coates (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7007?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511296#comment-16511296
 ] 

Andy Coates commented on KAFKA-7007:


Final design was not, as the Jira requested, to move all ACL changes to a 
single path. This proved unworkable. ACL changes are still on two paths, but 
now the new path has JSON values to help with any future changes.

> Use JSON for /kafka-acl-extended-changes path
> -
>
> Key: KAFKA-7007
> URL: https://issues.apache.org/jira/browse/KAFKA-7007
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core, security
>Reporter: Andy Coates
>Assignee: Andy Coates
>Priority: Major
> Fix For: 2.0.0
>
>
> Relating to one of the outstanding work items in PR 
> [#5117|https://github.com/apache/kafka/pull/5117]...
>  
> Keep Literal ACLs on the old paths, using the old formats, to maintain 
> backwards compatibility.
> Have Prefixed, and any later types, go on new paths, using JSON (old 
> brokers are not aware of them).
> Add checks to reject any adminClient requests to add prefixed acls before the 
> cluster is fully upgraded.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-6614) kafka-streams to configure internal topics message.timestamp.type=CreateTime

2018-06-13 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6614?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-6614:
--

Assignee: Meghana Gupta

> kafka-streams to configure internal topics message.timestamp.type=CreateTime
> 
>
> Key: KAFKA-6614
> URL: https://issues.apache.org/jira/browse/KAFKA-6614
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Dmitry Vsekhvalnov
>Assignee: Meghana Gupta
>Priority: Minor
>  Labels: newbie
>
> After fixing KAFKA-4785, all internal topics use the built-in 
> *RecordMetadataTimestampExtractor* to read timestamps.
> This doesn't seem to work correctly out of the box with kafka brokers configured 
> with *log.message.timestamp.type=LogAppendTime* when using a custom message 
> timestamp extractor.
> Example use-case windowed grouping + aggregation on late data:
> {code:java}
> KTable, Long> summaries = in
>    .groupBy((key, value) -> ..)
>    .windowedBy(TimeWindows.of(TimeUnit.HOURS.toMillis(1l)))
>    .count();{code}
> when processing late events:
>  # custom timestamp extractor will pick up timestamp in the past from message 
> (let's say hour ago)
>  # re-partition topic during grouping phase will be written back to kafka 
> using timestamp from (1)
>  # kafka broker will ignore provided timestamp in (2) to favor ingestion time
>  # streams lib will read re-partitioned topic back with 
> RecordMetadataTimestampExtractor
>  # and will get ingestion timestamp (3), which is usually close to "now"
>  # window start/end will be incorrectly set based on "now" instead of 
> original timestamp from payload
> I understand there are ways to configure the per-topic timestamp type in kafka 
> brokers to solve this, but it would be really nice if the kafka-streams library 
> could take care of it itself, following the "least-surprise" principle: if the 
> library relies on timestamp.type for a topic it manages, it should enforce it.
> CC [~guozhang] based on user group email discussion.
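
A possible interim workaround, sketched under the assumption that Streams applies {{topic.}}-prefixed configs to the internal topics it creates (application id and bootstrap servers below are placeholders):
{code:java}
import java.util.Properties;

import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.streams.StreamsConfig;

public class TimestampTypeOverride {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Ask the brokers to keep producer-provided (CreateTime) timestamps on the
        // application's internal topics, overriding a LogAppendTime broker default.
        props.put(StreamsConfig.topicPrefix(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG), "CreateTime");
        // ... build the topology and start KafkaStreams with these props
    }
}
{code}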



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6614) kafka-streams to configure internal topics message.timestamp.type=CreateTime

2018-06-13 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6614?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511386#comment-16511386
 ] 

Matthias J. Sax commented on KAFKA-6614:


Thanks for your interest in contributing to Kafka! I added you to the list of 
contributors and assigned the ticket to you. You can now also self-assign 
tickets. Looking forward to your PR.

> kafka-streams to configure internal topics message.timestamp.type=CreateTime
> 
>
> Key: KAFKA-6614
> URL: https://issues.apache.org/jira/browse/KAFKA-6614
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Dmitry Vsekhvalnov
>Assignee: Meghana Gupta
>Priority: Minor
>  Labels: newbie
>
> After fixing KAFKA-4785, all internal topics use the built-in 
> *RecordMetadataTimestampExtractor* to read timestamps.
> This doesn't seem to work correctly out of the box with kafka brokers configured 
> with *log.message.timestamp.type=LogAppendTime* when using a custom message 
> timestamp extractor.
> Example use-case windowed grouping + aggregation on late data:
> {code:java}
> KTable, Long> summaries = in
>    .groupBy((key, value) -> ..)
>    .windowedBy(TimeWindows.of(TimeUnit.HOURS.toMillis(1l)))
>    .count();{code}
> when processing late events:
>  # custom timestamp extractor will pick up timestamp in the past from message 
> (let's say hour ago)
>  # re-partition topic during grouping phase will be written back to kafka 
> using timestamp from (1)
>  # kafka broker will ignore provided timestamp in (2) to favor ingestion time
>  # streams lib will read re-partitioned topic back with 
> RecordMetadataTimestampExtractor
>  # and will get ingestion timestamp (3), which is usually close to "now"
>  # window start/end will be incorrectly set based on "now" instead of 
> original timestamp from payload
> I understand there are ways to configure the per-topic timestamp type in kafka 
> brokers to solve this, but it would be really nice if the kafka-streams library 
> could take care of it itself, following the "least-surprise" principle: if the 
> library relies on timestamp.type for a topic it manages, it should enforce it.
> CC [~guozhang] based on user group email discussion.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7056) Connect's new numeric converters should be in a different package

2018-06-13 Thread Randall Hauch (JIRA)
Randall Hauch created KAFKA-7056:


 Summary: Connect's new numeric converters should be in a different 
package
 Key: KAFKA-7056
 URL: https://issues.apache.org/jira/browse/KAFKA-7056
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 2.0.0
Reporter: Randall Hauch
Assignee: Randall Hauch
 Fix For: 2.0.0


KIP-305 added several new primitive converters, but placed them alongside 
{{StringConverter}} in the {{...connect.storage}} package rather than alongside 
{{ByteArrayConverter}} in the {{...connect.converters}} package. We should move 
them to the {{converters}} package.

Need to also update the plugins whitelist (see KAFKA-7043).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7055) Kafka Streams Processor API allows you to add sinks and processors without parent

2018-06-13 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511401#comment-16511401
 ] 

Guozhang Wang commented on KAFKA-7055:
--

[~nthean] Thanks for reporting the issue, and I think it is a better idea to 
throw an exception at the parsing time, not at runtime (i.e. your first 
option). More specifically in `InternalTopologyBuilder#addProcessor / #addSink` 
we should check that `predecessorNames` size is at least one.

> Kafka Streams Processor API allows you to add sinks and processors without 
> parent
> -
>
> Key: KAFKA-7055
> URL: https://issues.apache.org/jira/browse/KAFKA-7055
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Nikki Thean
>Assignee: Nikki Thean
>Priority: Minor
>  Labels: easyfix
>
> The Kafka Streams Processor API allows you to define a Topology and connect 
> sources, processors, and sinks. From reading through the code, it seems that 
> you cannot forward a message to a downstream node unless it is explicitly 
> connected to the upstream node (from which you are forwarding the message) as 
> a child. Here is an example where you forward using name of downstream node 
> rather than child index 
> ([https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L117]).
> However, I've been able to connect processors and sinks to the topology 
> without including parent names, i.e with empty vararg, using this method: 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/Topology.java#L423].
> As any attempt to forward a message to those nodes will throw a 
> StreamsException, I suggest throwing an exception if a processor or sink is 
> added without at least one upstream node. There is a method in 
> `InternalTopologyBuilder` that allows you to connect processors by name after 
> you add them to the topology, but it is not part of the external Processor 
> API.
> In addition (or alternatively), I suggest making [the error message for when 
> users try to forward messages to a node that is not 
> connected|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L119]
>  more descriptive, like this one for when a user attempts to access a state 
> store that is not connected to the processor: 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L75-L81]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7056) Connect's new numeric converters should be in a different package

2018-06-13 Thread Randall Hauch (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7056?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-7056:
-
Description: 
KIP-305 added several new primitive converters, but placed them alongside 
{{StringConverter}} in the {{...connect.storage}} package rather than alongside 
{{ByteArrayConverter}} in the {{...connect.converters}} package. We should move 
them to the {{converters}} package. See 
https://github.com/apache/kafka/pull/5198 for a discussion.

Need to also update the plugins whitelist (see KAFKA-7043).

  was:
KIP-305 added several new primitive converters, but placed them alongside 
{{StringConverter}} in the {{...connect.storage}} package rather than alongside 
{{ByteArrayConverter}} in the {{...connect.converters}} package. We should move 
them to the {{converters}} package.

Need to also update the plugins whitelist (see KAFKA-7043).


> Connect's new numeric converters should be in a different package
> -
>
> Key: KAFKA-7056
> URL: https://issues.apache.org/jira/browse/KAFKA-7056
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.0.0
>Reporter: Randall Hauch
>Assignee: Randall Hauch
>Priority: Critical
> Fix For: 2.0.0
>
>
> KIP-305 added several new primitive converters, but placed them alongside 
> {{StringConverter}} in the {{...connect.storage}} package rather than 
> alongside {{ByteArrayConverter}} in the {{...connect.converters}} package. We 
> should move them to the {{converters}} package. See 
> https://github.com/apache/kafka/pull/5198 for a discussion.
> Need to also update the plugins whitelist (see KAFKA-7043).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-4113) Allow KTable bootstrap

2018-06-13 Thread Ben Stopford (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16510911#comment-16510911
 ] 

Ben Stopford edited comment on KAFKA-4113 at 6/13/18 4:49 PM:
--

Whilst I like the 'time-aligned' approach to loading KTables very much, it 
definitely catches people out. I think this is compounded by the fact that 
GKTables don't behave like this (they bootstrap themselves on startup rather 
than being time aligned).

Different use cases actually better suit one or the other (as noted above). So 
for example, if you're joining Orders to Customers and doing reprocessing you 
might want the 'as at' version of the customer (say with an old email address) 
or the latest version of the customer (with their most recent email).

So I think KStreams should support both (a) preloaded or (b) event time ideally 
in both types of table, letting the user define the behaviour.


was (Author: benstopford):
Whilst I like the 'time-aligned' approach to loading KTables very much, it 
definitely catches people out. I think this is compounded by the fact that 
GKTables don't behave like this (they bootstrap themselves on startup rather 
than being time aligned).

Different use cases actually better suit one or the other (as noted above). So 
for example, if you're joining Orders to Customers and doing reprocessing you 
might want the 'as at' version of the customer (say with an old email address) 
or the latest version of the customer (with their most recent email).

So I think KStreams should support both (a) preloaded or (b) event time ideally 
in both types of table, letting the user define the behaviour.

I've tried to explain the background to this in a bit more detail 
[here|http://www.benstopford.com/2018/06/13/things-can-trip-building-streams-apps/].
 

> Allow KTable bootstrap
> --
>
> Key: KAFKA-4113
> URL: https://issues.apache.org/jira/browse/KAFKA-4113
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Guozhang Wang
>Priority: Major
>
> On the mailing list, there are multiple requests about the possibility to 
> "fully populate" a KTable before actual stream processing starts.
> Even if it is somewhat difficult to define when the initial populating phase 
> should end, there are multiple possibilities:
> The main idea is that there is a rarely updated topic that contains the 
> data. Only after this topic has been read completely and the KTable is ready 
> should the application start processing. This would indicate that, on startup, 
> the current partition sizes must be fetched and stored, and after the KTable has 
> been populated up to those offsets, stream processing can start.
> Other discussed ideas are:
> 1) an initial fixed time period for populating
> (it might be hard for a user to estimate the correct value)
> 2) an "idle" period, ie, if no update to a KTable for a certain time is
> done, we consider it as populated
> 3) a timestamp cut off point, ie, all records with an older timestamp
> belong to the initial populating phase
> The API change is not decided yet, and the API design is part of this JIRA.
> One suggestion (for option (4)) was:
> {noformat}
> KTable table = builder.table("topic", 1000); // populate the table without 
> reading any other topics until see one record with timestamp 1000.
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7055) Kafka Streams Processor API allows you to add sinks and processors without parent

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511436#comment-16511436
 ] 

ASF GitHub Bot commented on KAFKA-7055:
---

nixsticks opened a new pull request #5214: KAFKA-7055: Update 
InternalTopologyBuilder to throw TopologyException…
URL: https://github.com/apache/kafka/pull/5214
 
 
   … if a processor or sink is added with no upstream node attached
   
   InternalTopologyBuilder throws an exception if a sink or a processor is 
added without at least one upstream node, as records cannot be forwarded 
downstream to an unconnected node. This does not prevent users from attempting 
to forward to unconnected nodes, but it does prevent them from attaching 
effectively useless downstream nodes, and the error message for forwarding to 
an unconnected node has been updated to be slightly more specific.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Kafka Streams Processor API allows you to add sinks and processors without 
> parent
> -
>
> Key: KAFKA-7055
> URL: https://issues.apache.org/jira/browse/KAFKA-7055
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Nikki Thean
>Assignee: Nikki Thean
>Priority: Minor
>  Labels: easyfix
>
> The Kafka Streams Processor API allows you to define a Topology and connect 
> sources, processors, and sinks. From reading through the code, it seems that 
> you cannot forward a message to a downstream node unless it is explicitly 
> connected to the upstream node (from which you are forwarding the message) as 
> a child. Here is an example where you forward using name of downstream node 
> rather than child index 
> ([https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L117]).
> However, I've been able to connect processors and sinks to the topology 
> without including parent names, i.e with empty vararg, using this method: 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/Topology.java#L423].
> As any attempt to forward a message to those nodes will throw a 
> StreamsException, I suggest throwing an exception if a processor or sink is 
> added without at least one upstream node. There is a method in 
> `InternalTopologyBuilder` that allows you to connect processors by name after 
> you add them to the topology, but it is not part of the external Processor 
> API.
> In addition (or alternatively), I suggest making [the error message for when 
> users try to forward messages to a node that is not 
> connected|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L119]
>  more descriptive, like this one for when a user attempts to access a state 
> store that is not connected to the processor: 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L75-L81]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6474) Rewrite test to use new public TopologyTestDriver

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511455#comment-16511455
 ] 

ASF GitHub Bot commented on KAFKA-6474:
---

guozhangwang closed pull request #4986: KAFKA-6474: Rewrite tests to use new 
public TopologyTestDriver [part 2]
URL: https://github.com/apache/kafka/pull/4986
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/src/test/java/org/apache/kafka/streams/TopologyTestDriverWrapper.java 
b/streams/src/test/java/org/apache/kafka/streams/TopologyTestDriverWrapper.java
new file mode 100644
index 000..bec4b5f79ff
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/TopologyTestDriverWrapper.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
+import org.apache.kafka.streams.processor.internals.ProcessorNode;
+
+import java.util.Properties;
+
+/**
+ * This class provides access to {@link TopologyTestDriver} protected methods.
+ * It should only be used for internal testing, in the rare occasions where the
+ * necessary functionality is not supported by {@link TopologyTestDriver}.
+ */
+public class TopologyTestDriverWrapper extends TopologyTestDriver {
+
+
+public TopologyTestDriverWrapper(final Topology topology,
+ final Properties config) {
+super(topology, config);
+}
+
+/**
+ * Get the processor context, setting the processor whose name is given as 
current node
+ *
+ * @param processorName processor name to set as current node
+ * @return the processor context
+ */
+public ProcessorContext setCurrentNodeForProcessorContext(final String 
processorName) {
+final ProcessorContext context = task.context();
+((ProcessorContextImpl) 
context).setCurrentNode(getProcessor(processorName));
+return context;
+}
+
+/**
+ * Get a processor by name
+ *
+ * @param name the name to search for
+ * @return the processor matching the search name
+ */
+public ProcessorNode getProcessor(final String name) {
+for (final ProcessorNode node : processorTopology.processors()) {
+if (node.name().equals(name)) {
+return node;
+}
+}
+for (final ProcessorNode node : globalTopology.processors()) {
+if (node.name().equals(name)) {
+return node;
+}
+}
+throw new StreamsException("Could not find a processor named '" + name 
+ "'");
+}
+}
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
index c37078df99c..2cf192b9f45 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
@@ -16,45 +16,40 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyTestDriverWrapper;
+import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.s

[jira] [Commented] (KAFKA-7055) Kafka Streams Processor API allows you to add sinks and processors without parent

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511476#comment-16511476
 ] 

ASF GitHub Bot commented on KAFKA-7055:
---

nixsticks closed pull request #5214: KAFKA-7055: Update InternalTopologyBuilder 
to throw TopologyException…
URL: https://github.com/apache/kafka/pull/5214
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 36a2edc6766..cf50c1c4bed 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -442,6 +442,10 @@ public final void addSource(final Topology.AutoOffsetReset 
offsetReset,
  final String... predecessorNames) {
 Objects.requireNonNull(name, "name must not be null");
 Objects.requireNonNull(topic, "topic must not be null");
+if (predecessorNames.length == 0) {
+throw new TopologyException("Sink " + name + " must have at least 
one parent");
+}
+
 addSink(name, new StaticTopicNameExtractor(topic), 
keySerializer, valSerializer, partitioner, predecessorNames);
 nodeToSinkTopic.put(name, topic);
 }
@@ -457,6 +461,9 @@ public final void addSource(final Topology.AutoOffsetReset 
offsetReset,
 if (nodeFactories.containsKey(name)) {
 throw new TopologyException("Processor " + name + " is already 
added.");
 }
+if (predecessorNames.length == 0) {
+throw new TopologyException("Sink " + name + " must have at least 
one parent");
+}
 
 for (final String predecessor : predecessorNames) {
 Objects.requireNonNull(predecessor, "predecessor name can't be 
null");
@@ -484,6 +491,9 @@ public final void addProcessor(final String name,
 if (nodeFactories.containsKey(name)) {
 throw new TopologyException("Processor " + name + " is already 
added.");
 }
+if (predecessorNames.length == 0) {
+throw new TopologyException("Processor " + name + " must have at 
least one parent");
+}
 
 for (final String predecessor : predecessorNames) {
 Objects.requireNonNull(predecessor, "predecessor name must not be 
null");
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index a539a1bcda0..f1ee81ff367 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -116,7 +116,8 @@ public StateStore getStateStore(final String name) {
 if (sendTo != null) {
 final ProcessorNode child = currentNode().getChild(sendTo);
 if (child == null) {
-throw new StreamsException("Unknown processor name: " + 
sendTo);
+throw new StreamsException("Unknown downstream node: " + 
sendTo + " either does not exist or is not" +
+" connected to this processor.");
 }
 forward(child, key, value);
 } else {
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java 
b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index 8b478852c41..63d5b18a3ae 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -174,6 +174,15 @@ public void shouldNotAllowToAddProcessorWithSameName() {
 } catch (final TopologyException expected) { }
 }
 
+@Test
+public void shouldNotAllowToAddProcessorWithoutAtLeastOneParent() {
+topology.addSource("source", "topic-1");
+try {
+topology.addProcessor("processor", new MockProcessorSupplier());
+fail("Should throw TopologyException for processor without at 
least one parent node");
+} catch (final TopologyException expected) { }
+}
+
 @Test(expected = TopologyException.class)
 public void shouldFailOnUnknownSource() {
 topology.addProcessor("processor", new MockProcessorSupplier(), 
"source");
@@ -194,6 +203,16 @@ public void shouldNotAllowToAddSinkWithSameName() {
 } catch (final TopologyE

[jira] [Updated] (KAFKA-6917) Request handler deadlocks attempting to acquire group metadata lock

2018-06-13 Thread Ismael Juma (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6917?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-6917:
---
Affects Version/s: (was: 1.0.1)
   0.11.0.0
   1.0.0

> Request handler deadlocks attempting to acquire group metadata lock
> ---
>
> Key: KAFKA-6917
> URL: https://issues.apache.org/jira/browse/KAFKA-6917
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Affects Versions: 0.11.0.0, 1.0.0, 1.1.0
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Blocker
> Fix For: 2.0.0, 0.11.0.3, 1.0.2, 1.1.1
>
>
> We have noticed another deadlock with the group metadata lock with version 
> 1.1.
> {quote}
> Found one Java-level deadlock:
> =
> "executor-Heartbeat":
>   waiting for ownable synchronizer 0x0005ce477080, (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync),
>   which is held by "kafka-request-handler-3"
> "kafka-request-handler-3":
>   waiting for ownable synchronizer 0x0005cbe7f698, (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync),
>   which is held by "kafka-request-handler-4"
> "kafka-request-handler-4":
>   waiting for ownable synchronizer 0x0005ce477080, (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync),
>   which is held by "kafka-request-handler-3"
> Java stack information for the threads listed above:
> ===
> "executor-Heartbeat":
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0x0005ce477080> (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
> at 
> java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
> at 
> java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:248)
> at 
> kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:188)
> at 
> kafka.coordinator.group.GroupCoordinator.onExpireHeartbeat(GroupCoordinator.scala:833)
> at 
> kafka.coordinator.group.DelayedHeartbeat.onExpiration(DelayedHeartbeat.scala:34)
> at kafka.server.DelayedOperation.run(DelayedOperation.scala:144)
> at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> "kafka-request-handler-3":
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0x0005cbe7f698> (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
> at 
> java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
> at 
> java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:248)
> at 
> kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:188)
> at 
> kafka.coordinator.group.GroupMetadataManager$$anonfun$handleTxnCompletion$1.apply(GroupMetadataManager.scala:801)
> at 
> kafka.coordinator.group.GroupMetadataManager$$anonfun$handleTxnCompletion$1.apply(GroupMetadataManager.scala:799)
> at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
> at 
> kafka.coordinator.group.GroupMetadataManager.handleTxnCompletion(GroupMetadataManager.scala:799)
> at 
> kafka.coordinator.group.GroupCoordinator.handleTxnCompletion(GroupCoordinator.scala:496)
> at 
> kafka.server.KafkaApis.kafka$server$KafkaApis$$maybeSendResponseCallback$1(KafkaApis.scala:1633)
> at 
> kafka.server.KafkaApis$

[jira] [Commented] (KAFKA-6906) Kafka Streams does not commit transactions if data is produced via wall-clock punctuation

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511485#comment-16511485
 ] 

ASF GitHub Bot commented on KAFKA-6906:
---

mjsax closed pull request #5196: MINOR: code cleanup follow up for KAFKA-6906
URL: https://github.com/apache/kafka/pull/5196
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 4cea5280f86..c4305624ec1 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -438,11 +438,6 @@ private void commitOffsets(final boolean 
startNewTransaction) {
 transactionInFlight = true;
 }
 }
-
-if (eosEnabled && !startNewTransaction && transactionInFlight) { 
// need to make sure to commit txn for suspend case
-producer.commitTransaction();
-transactionInFlight = false;
-}
 } catch (final CommitFailedException | ProducerFencedException fatal) {
 throw new TaskMigratedException(this, fatal);
 }


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Kafka Streams does not commit transactions if data is produced via wall-clock 
> punctuation
> -
>
> Key: KAFKA-6906
> URL: https://issues.apache.org/jira/browse/KAFKA-6906
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Matthias J. Sax
>Assignee: Jagadesh Adireddi
>Priority: Major
> Fix For: 2.0.0, 0.11.0.3, 1.0.2, 1.1.1
>
>
> Committing in Kafka Streams happens in regular intervals. However, committing 
> only happens if new input records got processed since the last commit (via 
> setting flag `commitOffsetNeeded` within `StreamTask#process()`)
> However, data could also be emitted via wall-clock based punctuation calls. 
> Especially if EOS is enabled, this is an issue (maybe also for non-EOS) 
> because the current running transaction is not committed and thus might time 
> out leading to a fatal error.
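
A minimal sketch (processor name and interval are hypothetical) of the pattern described above, where output is produced purely from a wall-clock punctuation and process() may not run between commit intervals:
{code:java}
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;

public class WallClockEmitter extends AbstractProcessor<String, String> {

    @Override
    public void init(final ProcessorContext context) {
        super.init(context);
        // Emit a record every 10 seconds of wall-clock time, even if no input
        // record has been processed since the last commit interval.
        context.schedule(10_000L, PunctuationType.WALL_CLOCK_TIME,
            timestamp -> context.forward("tick", Long.toString(timestamp)));
    }

    @Override
    public void process(final String key, final String value) {
        // Input is only inspected here; all output comes from the punctuation.
    }
}
{code}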



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7055) Kafka Streams Processor API allows you to add sinks and processors without parent

2018-06-13 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511489#comment-16511489
 ] 

Matthias J. Sax commented on KAFKA-7055:


[~nthean] I just double checked the code. I think we can remove 
`InternalTopologyBuilder#connectProcessors()`, too. Can we piggy-back this 
cleanup onto your PR? Thx.

> Kafka Streams Processor API allows you to add sinks and processors without 
> parent
> -
>
> Key: KAFKA-7055
> URL: https://issues.apache.org/jira/browse/KAFKA-7055
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Nikki Thean
>Assignee: Nikki Thean
>Priority: Minor
>  Labels: easyfix
>
> The Kafka Streams Processor API allows you to define a Topology and connect 
> sources, processors, and sinks. From reading through the code, it seems that 
> you cannot forward a message to a downstream node unless it is explicitly 
> connected to the upstream node (from which you are forwarding the message) as 
> a child. Here is an example where you forward using name of downstream node 
> rather than child index 
> ([https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L117]).
> However, I've been able to connect processors and sinks to the topology 
> without including parent names, i.e with empty vararg, using this method: 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/Topology.java#L423].
> As any attempt to forward a message to those nodes will throw a 
> StreamsException, I suggest throwing an exception if a processor or sink is 
> added without at least one upstream node. There is a method in 
> `InternalTopologyBuilder` that allows you to connect processors by name after 
> you add them to the topology, but it is not part of the external Processor 
> API.
> In addition (or alternatively), I suggest making [the error message for when 
> users try to forward messages to a node that is not 
> connected|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L119]
>  more descriptive, like this one for when a user attempts to access a state 
> store that is not connected to the processor: 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L75-L81]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7055) Kafka Streams Processor API allows you to add sinks and processors without parent

2018-06-13 Thread Nikki Thean (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511496#comment-16511496
 ] 

Nikki Thean commented on KAFKA-7055:


[~mjsax] Absolutely! Re-running tests, then will re-open with that change.

> Kafka Streams Processor API allows you to add sinks and processors without 
> parent
> -
>
> Key: KAFKA-7055
> URL: https://issues.apache.org/jira/browse/KAFKA-7055
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Nikki Thean
>Assignee: Nikki Thean
>Priority: Minor
>  Labels: easyfix
>
> The Kafka Streams Processor API allows you to define a Topology and connect 
> sources, processors, and sinks. From reading through the code, it seems that 
> you cannot forward a message to a downstream node unless it is explicitly 
> connected to the upstream node (from which you are forwarding the message) as 
> a child. Here is an example where you forward using name of downstream node 
> rather than child index 
> ([https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L117]).
> However, I've been able to connect processors and sinks to the topology 
> without including parent names, i.e with empty vararg, using this method: 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/Topology.java#L423].
> As any attempt to forward a message to those nodes will throw a 
> StreamsException, I suggest throwing an exception if a processor or sink is 
> added without at least one upstream node. There is a method in 
> `InternalTopologyBuilder` that allows you to connect processors by name after 
> you add them to the topology, but it is not part of the external Processor 
> API.
> In addition (or alternatively), I suggest making [the error message for when 
> users try to forward messages to a node that is not 
> connected|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L119]
>  more descriptive, like this one for when a user attempts to access a state 
> store that is not connected to the processor: 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L75-L81]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7037) delete topic command replaces '+' from the topic name which leads incorrect topic deletion

2018-06-13 Thread Vahid Hashemian (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511567#comment-16511567
 ] 

Vahid Hashemian commented on KAFKA-7037:


The reason is that the {{--topic}} option also accepts regular expressions. In your 
example {{+}} is a regex symbol, so {{--delete --topic test+topic}} would 
delete topics {{testtopic}}, {{testttopic}}, etc. To escape the 
regular expression, the topic name should be placed in double quotes, and regex 
symbols should be prefixed with a {{\}}.
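
For example, reusing the environment from the report above, the intended topic can be targeted literally by quoting the name and escaping the {{+}}:
{code:java}
[kafka@localhost~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh \
  --zookeeper `hostname`:2181 --delete --topic "test\+topic"
{code}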

> delete topic command replaces '+' from the topic name which leads incorrect 
> topic deletion
> --
>
> Key: KAFKA-7037
> URL: https://issues.apache.org/jira/browse/KAFKA-7037
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0, 1.0.0
>Reporter: Sandeep Nemuri
>Assignee: Vahid Hashemian
>Priority: Major
>
> While executing a delete command kafka cli tool is removing the "+" symbol 
> and deleting the incorrect topic. In below case if  _"*test+topic"*_ is 
> deleted kafka deletes  _*testtopic.*_
> {code:java}
> [kafka@localhost~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh 
> --create --zookeeper `hostname`:2181 --replication-factor 1 --partitions 1 
> --topic testtopic
> Created topic "testtopic".
> [kafka@localhost~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh 
> --zookeeper `hostname`:2181 --topic test+topic --delete
> Topic testtopic is marked for deletion.{code}
>  delete topic replaces '+' from the topic name  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7055) Kafka Streams Processor API allows you to add sinks and processors without parent

2018-06-13 Thread Nikki Thean (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511595#comment-16511595
 ] 

Nikki Thean commented on KAFKA-7055:


[~mjsax] [~guozhang] I may have screwed up the GitHub Bot by closing and 
opening a new PR :(

Sorry. PR is here: https://github.com/apache/kafka/pull/5215

> Kafka Streams Processor API allows you to add sinks and processors without 
> parent
> -
>
> Key: KAFKA-7055
> URL: https://issues.apache.org/jira/browse/KAFKA-7055
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Nikki Thean
>Assignee: Nikki Thean
>Priority: Minor
>  Labels: easyfix
>
> The Kafka Streams Processor API allows you to define a Topology and connect 
> sources, processors, and sinks. From reading through the code, it seems that 
> you cannot forward a message to a downstream node unless it is explicitly 
> connected to the upstream node (from which you are forwarding the message) as 
> a child. Here is an example where you forward using name of downstream node 
> rather than child index 
> ([https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L117]).
> However, I've been able to connect processors and sinks to the topology 
> without including parent names, i.e with empty vararg, using this method: 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/Topology.java#L423].
> As any attempt to forward a message to those nodes will throw a 
> StreamsException, I suggest throwing an exception if a processor or sink is 
> added without at least one upstream node. There is a method in 
> `InternalTopologyBuilder` that allows you to connect processors by name after 
> you add them to the topology, but it is not part of the external Processor 
> API.
> In addition (or alternatively), I suggest making [the error message for when 
> users try to forward messages to a node that is not 
> connected|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L119]
>  more descriptive, like this one for when a user attempts to access a state 
> store that is not connected to the processor: 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L75-L81]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7037) delete topic command replaces '+' from the topic name which leads incorrect topic deletion

2018-06-13 Thread Vahid Hashemian (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511567#comment-16511567
 ] 

Vahid Hashemian edited comment on KAFKA-7037 at 6/13/18 7:58 PM:
-

The reason is the {{--topic}} option also accepts regular expressions. In your 
example {{+}} is a regex symbol and the {{–delete --topic test+topic}} would 
delete topics {{testtopic}}, {{testttopic}}, {{tesopic}}, etc. To escape 
regular expression, the topic name should be places in double quotes, and regex 
symbols should be prefixed with a "\".


was (Author: vahid):
The reason is the {{--topic}} option also accepts regular expressions. In your 
example `+` is a regex symbol and the {{–delete --topic test+topic}} would 
delete topics {{testtopic}}, {{testttopic}}, {{tesopic}}, etc. To escape 
regular expression, the topic name should be places in double quotes, and regex 
symbols should be prefixed with a '\'.

> delete topic command replaces '+' from the topic name which leads incorrect 
> topic deletion
> --
>
> Key: KAFKA-7037
> URL: https://issues.apache.org/jira/browse/KAFKA-7037
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0, 1.0.0
>Reporter: Sandeep Nemuri
>Assignee: Vahid Hashemian
>Priority: Major
>
> While executing a delete command kafka cli tool is removing the "+" symbol 
> and deleting the incorrect topic. In below case if  _"*test+topic"*_ is 
> deleted kafka deletes  _*testtopic.*_
> {code:java}
> [kafka@localhost~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh 
> --create --zookeeper `hostname`:2181 --replication-factor 1 --partitions 1 
> --topic testtopic
> Created topic "testtopic".
> [kafka@localhost~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh 
> --zookeeper `hostname`:2181 --topic test+topic --delete
> Topic testtopic is marked for deletion.{code}
>  delete topic replaces '+' from the topic name  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7037) delete topic command replaces '+' from the topic name which leads incorrect topic deletion

2018-06-13 Thread Vahid Hashemian (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511567#comment-16511567
 ] 

Vahid Hashemian edited comment on KAFKA-7037 at 6/13/18 7:58 PM:
-

The reason is the {{--topic}} option also accepts regular expressions. In your 
example `+` is a regex symbol and the {{–delete --topic test+topic}} would 
delete topics {{testtopic}}, {{testttopic}}, {{tesopic}}, etc. To escape 
regular expression, the topic name should be places in double quotes, and regex 
symbols should be prefixed with a '\'.


was (Author: vahid):
The reason is the {{--topic}} option also accepts regular expressions. In your 
example `+` is a regex symbol and the {{–delete --topic test+topic}} would 
delete topics {{testtopic}}, {{testttopic}}, {{tesopic}}, etc. To escape 
regular expression, the topic name should be places in double quotes, and regex 
symbols should be prefixed with a \{{\\}}.

> delete topic command replaces '+' from the topic name which leads incorrect 
> topic deletion
> --
>
> Key: KAFKA-7037
> URL: https://issues.apache.org/jira/browse/KAFKA-7037
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0, 1.0.0
>Reporter: Sandeep Nemuri
>Assignee: Vahid Hashemian
>Priority: Major
>
> While executing a delete command kafka cli tool is removing the "+" symbol 
> and deleting the incorrect topic. In below case if  _"*test+topic"*_ is 
> deleted kafka deletes  _*testtopic.*_
> {code:java}
> [kafka@localhost~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh 
> --create --zookeeper `hostname`:2181 --replication-factor 1 --partitions 1 
> --topic testtopic
> Created topic "testtopic".
> [kafka@localhost~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh 
> --zookeeper `hostname`:2181 --topic test+topic --delete
> Topic testtopic is marked for deletion.{code}
>  delete topic replaces '+' from the topic name  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7037) delete topic command replaces '+' from the topic name which leads incorrect topic deletion

2018-06-13 Thread Vahid Hashemian (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511567#comment-16511567
 ] 

Vahid Hashemian edited comment on KAFKA-7037 at 6/13/18 7:58 PM:
-

The reason is the {{--topic}} option also accepts regular expressions. In your 
example `+` is a regex symbol and the {{–delete --topic test+topic}} would 
delete topics {{testtopic}}, {{testttopic}}, {{tesopic}}, etc. To escape 
regular expression, the topic name should be places in double quotes, and regex 
symbols should be prefixed with a \{{\\}}.


was (Author: vahid):
The reason is the {{--topic}} option also accepts regular expressions. In your 
example `+` is a regex symbol and the {{–delete --topic test+topic}} would 
delete topics {{testtopic}}, {{testttopic}}, {{tesopic}}, etc. To escape 
regular expression, the topic name should be places in double quotes, and regex 
symbols should be prefixed with a {{\}}.

> delete topic command replaces '+' from the topic name which leads incorrect 
> topic deletion
> --
>
> Key: KAFKA-7037
> URL: https://issues.apache.org/jira/browse/KAFKA-7037
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0, 1.0.0
>Reporter: Sandeep Nemuri
>Assignee: Vahid Hashemian
>Priority: Major
>
> While executing a delete command kafka cli tool is removing the "+" symbol 
> and deleting the incorrect topic. In below case if  _"*test+topic"*_ is 
> deleted kafka deletes  _*testtopic.*_
> {code:java}
> [kafka@localhost~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh 
> --create --zookeeper `hostname`:2181 --replication-factor 1 --partitions 1 
> --topic testtopic
> Created topic "testtopic".
> [kafka@localhost~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh 
> --zookeeper `hostname`:2181 --topic test+topic --delete
> Topic testtopic is marked for deletion.{code}
>  delete topic replaces '+' from the topic name  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7055) Kafka Streams Processor API allows you to add sinks and processors without parent

2018-06-13 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511616#comment-16511616
 ] 

Matthias J. Sax commented on KAFKA-7055:


 No worries. All good :)

> Kafka Streams Processor API allows you to add sinks and processors without 
> parent
> -
>
> Key: KAFKA-7055
> URL: https://issues.apache.org/jira/browse/KAFKA-7055
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Nikki Thean
>Assignee: Nikki Thean
>Priority: Minor
>  Labels: easyfix
>
> The Kafka Streams Processor API allows you to define a Topology and connect 
> sources, processors, and sinks. From reading through the code, it seems that 
> you cannot forward a message to a downstream node unless it is explicitly 
> connected to the upstream node (from which you are forwarding the message) as 
> a child. Here is an example where you forward using name of downstream node 
> rather than child index 
> ([https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L117]).
> However, I've been able to connect processors and sinks to the topology 
> without including parent names, i.e with empty vararg, using this method: 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/Topology.java#L423].
> As any attempt to forward a message to those nodes will throw a 
> StreamsException, I suggest throwing an exception if a processor or sink is 
> added without at least one upstream node. There is a method in 
> `InternalTopologyBuilder` that allows you to connect processors by name after 
> you add them to the topology, but it is not part of the external Processor 
> API.
> In addition (or alternatively), I suggest making [the error message for when 
> users try to forward messages to a node that is not 
> connected|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L119]
>  more descriptive, like this one for when a user attempts to access a state 
> store that is not connected to the processor: 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L75-L81]
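
For illustration, a minimal hypothetical sketch of the reported behavior (node and topic names are made up, not taken from the report): a sink added with an empty parent vararg is accepted by the Topology, even though forwarding to it later can only fail.
{code:java}
import org.apache.kafka.streams.Topology;

public class OrphanSinkExample {
    public static void main(String[] args) {
        Topology topology = new Topology();
        topology.addSource("source", "input-topic");
        // Accepted today even though no parent name is supplied; any later attempt
        // to forward a record to "orphan-sink" from a processor will throw a
        // StreamsException at runtime.
        topology.addSink("orphan-sink", "output-topic");
        System.out.println(topology.describe());
    }
}
{code}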



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6749) TopologyTestDriver fails when topology under test uses EXACTLY_ONCE

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6749?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511683#comment-16511683
 ] 

ASF GitHub Bot commented on KAFKA-6749:
---

mjsax closed pull request #4912: KAFKA-6749: Fixed TopologyTestDriver to 
process stream processing guarantee as exactly once
URL: https://github.com/apache/kafka/pull/4912
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 7f752652da4..723780110ff 100644
--- 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -195,6 +195,7 @@
 private final Map offsetsByTopicPartition = 
new HashMap<>();
 
 private final Map>> 
outputRecordsByTopic = new HashMap<>();
+private final boolean eosEnabled;
 
 /**
  * Create a new test diver instance.
@@ -345,6 +346,7 @@ public void onRestoreEnd(final TopicPartition 
topicPartition, final String store
 task = null;
 context = null;
 }
+eosEnabled = 
streamsConfig.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG).equals(StreamsConfig.EXACTLY_ONCE);
 }
 
 /**
@@ -439,6 +441,10 @@ private void captureOutputRecords() {
 // Capture all the records sent to the producer ...
 final List> output = producer.history();
 producer.clear();
+if (eosEnabled && !producer.closed()) {
+producer.initTransactions();
+producer.beginTransaction();
+}
 for (final ProducerRecord record : output) {
 outputRecordsByTopic.computeIfAbsent(record.topic(), k -> new 
LinkedList<>()).add(record);
 
@@ -666,6 +672,9 @@ public void close() {
 }
 }
 captureOutputRecords();
+if (!eosEnabled) {
+producer.close();
+}
 stateDirectory.clean();
 }
 
diff --git 
a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
 
b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index 7552637dc26..135fb3ffd8a 100644
--- 
a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ 
b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -50,6 +50,8 @@
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -61,6 +63,7 @@
 import java.util.Objects;
 import java.util.Properties;
 import java.util.Set;
+import java.util.Arrays;
 import java.util.regex.Pattern;
 
 import static org.hamcrest.CoreMatchers.equalTo;
@@ -70,6 +73,7 @@
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+@RunWith(value = Parameterized.class)
 public class TopologyTestDriverTest {
 private final static String SOURCE_TOPIC_1 = "source-topic-1";
 private final static String SOURCE_TOPIC_2 = "source-topic-2";
@@ -108,6 +112,23 @@
 new StringSerializer(),
 new LongSerializer());
 
+private final boolean eosEnabled;
+
+@Parameterized.Parameters(name = "Eos enabled = {0}")
+public static Collection data() {
+final List values = new ArrayList<>();
+for (final boolean eosEnabled : Arrays.asList(true, false)) {
+values.add(new Object[] {eosEnabled});
+}
+return values;
+}
+
+public TopologyTestDriverTest(final boolean eosEnabled) {
+this.eosEnabled = eosEnabled;
+if (eosEnabled) {
+config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
StreamsConfig.EXACTLY_ONCE);
+}
+}
 
 private final static class Record {
 private final Object key;
@@ -353,6 +374,8 @@ public void shouldCloseProcessor() {
 
 testDriver.close();
 assertTrue(mockProcessors.get(0).closed);
+// As testDriver is already closed, bypassing @After tearDown 
testDriver.close().
+testDriver = null;
 }
 
 @Test
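
For context, a hedged sketch of the scenario this patch addresses (application id, bootstrap servers, and topic names are assumptions): a TopologyTestDriver built for a topology whose config requests EXACTLY_ONCE, which failed before the fix above initialized transactions on the driver's mock producer.
{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;

public class EosDriverSketch {
    public static void main(String[] args) {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "eos-test-app");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

        Topology topology = new Topology();
        topology.addSource("source", "input-topic");
        topology.addSink("sink", "output-topic", "source");

        // Before this fix, using the test driver with EXACTLY_ONCE failed; with it,
        // the mock producer's transactions are initialized and begun as needed.
        TopologyTestDriver driver = new TopologyTestDriver(topology, config);
        driver.close();
    }
}
{code}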


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> TopologyTestDriver fails when topology under test uses EXACTLY_ONCE
> 

[jira] [Commented] (KAFKA-6860) NPE when reinitializeStateStores with eos enabled

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511726#comment-16511726
 ] 

ASF GitHub Bot commented on KAFKA-6860:
---

mjsax closed pull request #5187: KAFKA-6860: Fix NPE in Kafka Streams with EOS 
enabled
URL: https://github.com/apache/kafka/pull/5187
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java
index b270e03f2e0..66ddec950c8 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java
@@ -36,17 +36,18 @@
 static final String CHECKPOINT_FILE_NAME = ".checkpoint";
 
 final File baseDir;
-final Map checkpointableOffsets = new HashMap<>();
-
+private final boolean eosEnabled;
 OffsetCheckpoint checkpoint;
 
+final Map checkpointableOffsets = new HashMap<>();
 final Map stores = new LinkedHashMap<>();
 final Map globalStores = new LinkedHashMap<>();
 
-AbstractStateManager(final File baseDir) {
+AbstractStateManager(final File baseDir,
+ final boolean eosEnabled) {
 this.baseDir = baseDir;
+this.eosEnabled = eosEnabled;
 this.checkpoint = new OffsetCheckpoint(new File(baseDir, 
CHECKPOINT_FILE_NAME));
-
 }
 
 public void reinitializeStateStoresForPartitions(final Logger log,
@@ -62,11 +63,14 @@ public void reinitializeStateStoresForPartitions(final 
Logger log,
 checkpointableOffsets.remove(topicPartition);
 
storeToBeReinitialized.add(changelogTopicToStore.get(topicPartition.topic()));
 }
-try {
-checkpoint.write(checkpointableOffsets);
-} catch (final IOException fatalException) {
-log.error("Failed to write offset checkpoint file to {} while 
re-initializing {}: {}", checkpoint, stateStores, fatalException);
-throw new StreamsException("Failed to reinitialize global store.", 
fatalException);
+
+if (!eosEnabled) {
+try {
+checkpoint.write(checkpointableOffsets);
+} catch (final IOException fatalException) {
+log.error("Failed to write offset checkpoint file to {} while 
re-initializing {}: {}", checkpoint, stateStores, fatalException);
+throw new StreamsException("Failed to reinitialize global 
store.", fatalException);
+}
 }
 
 for (final Map.Entry entry : 
storesCopy.entrySet()) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index 79088d98806..78c4a363f29 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -69,7 +69,7 @@ public GlobalStateManagerImpl(final LogContext logContext,
   final StateDirectory stateDirectory,
   final StateRestoreListener 
stateRestoreListener,
   final StreamsConfig config) {
-super(stateDirectory.globalStateDir());
+super(stateDirectory.globalStateDir(), 
StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)));
 
 this.log = logContext.logger(GlobalStateManagerImpl.class);
 this.topology = topology;
@@ -92,16 +92,16 @@ public void setGlobalProcessorContext(final 
InternalProcessorContext processorCo
 if (!stateDirectory.lockGlobalState()) {
 throw new LockException(String.format("Failed to lock the 
global state directory: %s", baseDir));
 }
-} catch (IOException e) {
+} catch (final IOException e) {
 throw new LockException(String.format("Failed to lock the global 
state directory: %s", baseDir));
 }
 
 try {
 this.checkpointableOffsets.putAll(checkpoint.read());
-} catch (IOException e) {
+} catch (final IOException e) {
 try {
 stateDirectory.unlockGlobalState();
-} catch (IOException e1) {
+} catch (final IOException e1) {
 log.error("Failed to unlock the global state directory", e);
 }
 throw new St

[jira] [Commented] (KAFKA-6978) Make Streams Window retention time strict

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511762#comment-16511762
 ] 

ASF GitHub Bot commented on KAFKA-6978:
---

vvcephei opened a new pull request #5218: KAFKA-6978: make window retention 
time strict
URL: https://github.com/apache/kafka/pull/5218
 
 
   Enforce window retention times strictly:
   * records for windows that are expired get dropped
   * queries for timestamps old enough to be expired are immediately answered with 
`null`
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Make Streams Window retention time strict
> -
>
> Key: KAFKA-6978
> URL: https://issues.apache.org/jira/browse/KAFKA-6978
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>
> Currently, the configured retention time for windows is a lower bound. We 
> actually keep the window around until it's time to roll a new segment. At 
> that time, we drop all windows in the oldest segment.
> As long as a window is still in a segment, we will continue to add 
> late-arriving records to it and also serve IQ queries from it. This is sort 
> of nice, because it makes optimistic use of the fact that the windows live 
> for some time after their retention expires. However, it is also a source of 
> (apparent) non-determinism, and it's arguably better for programmability if we 
> adhere strictly to the configured constraints.
> Therefore, the new behavior will be:
>  * once the retention time for a window passes, Streams will drop any 
> later-arriving records (with a warning log and a metric)
>  * likewise, IQ will first check whether the window is younger than its 
> retention time before answering queries.
> No changes need to be made to the underlying segment management, this is 
> purely to make the behavior more strict wrt the configuration.
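
As an illustration only (topic name and time values are assumptions, using the TimeWindows#until retention setting available at the time), a windowed aggregation whose late records and IQ lookups would be rejected under the stricter behavior once the 5-minute retention has passed:
{code:java}
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.TimeWindows;

public class StrictRetentionSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("input-topic")
               .groupByKey()
               // 1-minute windows retained for 5 minutes; with strict retention,
               // records arriving for windows older than 5 minutes are dropped
               // (with a warning log and a metric) and IQ answers null for them.
               .windowedBy(TimeWindows.of(60_000L).until(300_000L))
               .count();
        System.out.println(builder.build().describe());
    }
}
{code}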



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7050) Decrease consumer request timeout to 30s

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511784#comment-16511784
 ] 

ASF GitHub Bot commented on KAFKA-7050:
---

hachikuji closed pull request #5203: KAFKA-7050: Decrease default consumer 
request timeout to 30s
URL: https://github.com/apache/kafka/pull/5203
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java 
b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
index 9b62946fb96..7b44ca3ad3c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
@@ -31,6 +31,7 @@
 private final String clientId;
 private final long createdTimeMs;
 private final boolean expectResponse;
+private final int requestTimeoutMs;
 private final RequestCompletionHandler callback;
 
 /**
@@ -48,6 +49,7 @@ public ClientRequest(String destination,
  String clientId,
  long createdTimeMs,
  boolean expectResponse,
+ int requestTimeoutMs,
  RequestCompletionHandler callback) {
 this.destination = destination;
 this.requestBuilder = requestBuilder;
@@ -55,6 +57,7 @@ public ClientRequest(String destination,
 this.clientId = clientId;
 this.createdTimeMs = createdTimeMs;
 this.expectResponse = expectResponse;
+this.requestTimeoutMs = requestTimeoutMs;
 this.callback = callback;
 }
 
@@ -101,4 +104,8 @@ public long createdTimeMs() {
 public int correlationId() {
 return correlationId;
 }
+
+public int requestTimeoutMs() {
+return requestTimeoutMs;
+}
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
index 5caee2d4c87..5b7ba611714 100644
--- a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
+++ b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
@@ -17,11 +17,11 @@
 package org.apache.kafka.clients;
 
 import java.util.ArrayDeque;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Deque;
 import java.util.HashMap;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -162,25 +162,28 @@ public boolean isEmpty() {
 }
 }
 
+private Boolean hasExpiredRequest(long now, 
Deque deque) {
+for (NetworkClient.InFlightRequest request : deque) {
+long timeSinceSend = Math.max(0, now - request.sendTimeMs);
+if (timeSinceSend > request.requestTimeoutMs)
+return true;
+}
+return false;
+}
+
 /**
  * Returns a list of nodes with pending in-flight request, that need to be 
timed out
  *
  * @param now current time in milliseconds
- * @param requestTimeoutMs max time to wait for the request to be completed
  * @return list of nodes
  */
-public List getNodesWithTimedOutRequests(long now, int 
requestTimeoutMs) {
-List nodeIds = new LinkedList<>();
+public List nodesWithTimedOutRequests(long now) {
+List nodeIds = new ArrayList<>();
 for (Map.Entry> 
requestEntry : requests.entrySet()) {
 String nodeId = requestEntry.getKey();
 Deque deque = 
requestEntry.getValue();
-
-if (!deque.isEmpty()) {
-NetworkClient.InFlightRequest request = deque.peekLast();
-long timeSinceSend = now - request.sendTimeMs;
-if (timeSinceSend > requestTimeoutMs)
-nodeIds.add(nodeId);
-}
+if (hasExpiredRequest(now, deque))
+nodeIds.add(nodeId);
 }
 return nodeIds;
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
index 49bf3a3eab9..448932e358b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
@@ -185,9 +185,16 @@ ClientRequest newClientRequest(String nodeId, 
AbstractRequest.Builder request
  * @param requestBuilder the request builder to use
  * @param createdTimeMs the time in milliseconds to use as the creation 
time of the request
  * @param expectResponse true iff we expect a response
+ * @param requestTimeoutMs Upper bo

[jira] [Resolved] (KAFKA-7050) Decrease consumer request timeout to 30s

2018-06-13 Thread Jason Gustafson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-7050.

Resolution: Fixed

> Decrease consumer request timeout to 30s
> 
>
> Key: KAFKA-7050
> URL: https://issues.apache.org/jira/browse/KAFKA-7050
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 2.0.0
>
>
> Per KIP-266 discussion, we should lower the request timeout. We should also 
> add new logic to override this timeout for the JoinGroup request.
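
For context, a hedged sketch (bootstrap servers, group id, and topic name are assumptions) showing where the affected setting lives on the consumer; with this change, 30 seconds becomes the default rather than something users need to set explicitly:
{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class RequestTimeoutSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // request.timeout.ms: the per-request upper bound that this change lowers to 30s by default.
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30_000);
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("example-topic"));
            // poll as usual; nothing else about the consumer changes
        }
    }
}
{code}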



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6711) GlobalStateManagerImpl should not write offsets of in-memory stores in checkpoint file

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511822#comment-16511822
 ] 

ASF GitHub Bot commented on KAFKA-6711:
---

mjsax opened a new pull request #5219: KAFKA-6711: GlobalStateManagerImpl 
should not write offsets of in-memory stores in checkpoint file
URL: https://github.com/apache/kafka/pull/5219
 
 
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> GlobalStateManagerImpl should not write offsets of in-memory stores in 
> checkpoint file
> --
>
> Key: KAFKA-6711
> URL: https://issues.apache.org/jira/browse/KAFKA-6711
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.1
>Reporter: Cemalettin Koç
>Assignee: Cemalettin Koç
>Priority: Major
>  Labels: newbie
>
> We are using an InMemoryStore along with a GlobalKTable and I noticed that 
> after each restart I am losing all my data. When I debugged it, the 
> `/tmp/kafka-streams/category-client-1/global/.checkpoint` file contained an 
> offset for my GlobalKTable topic. I checked the GlobalStateManagerImpl 
> implementation and noticed that it is not guarded against cases similar to mine. 
> I am fairly new to Kafka land and there might well be another way to fix the 
> issue. 
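
A hedged sketch of the kind of setup described above (topic and store names are assumptions): a GlobalKTable materialized in an in-memory store, whose offsets arguably should not be written to the checkpoint file because the store starts empty on every restart.
{code:java}
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.Stores;

public class InMemoryGlobalTableSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        // The in-memory store backing this GlobalKTable is empty after a restart,
        // so a checkpointed offset for its topic would skip the restore it needs.
        builder.globalTable(
                "category-topic",
                Materialized.<String, String>as(Stores.inMemoryKeyValueStore("category-store")));
        System.out.println(builder.build().describe());
    }
}
{code}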



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7019) Reduce the contention between metadata update and metadata read operation

2018-06-13 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7019?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-7019:

Description: 
Currently MetadataCache.updateCache() grabs a write lock in order to process 
the UpdateMetadataRequest from the controller, and a read lock is needed in order 
to handle the MetadataRequest from clients. Thus the handling of 
MetadataRequest and UpdateMetadataRequest blocks each other, and the broker can 
only process one such request at a time even if there are multiple request handler 
threads. Note that the broker cannot process MetadataRequest in parallel if there 
is an UpdateMetadataRequest waiting for the write lock, even though MetadataRequest 
only requires the read lock to be processed.

For a large cluster with tens of thousands of partitions, it can take e.g. 
200 ms to process an UpdateMetadataRequest or a MetadataRequest from large clients 
(e.g. MM). While a user is rebalancing the cluster, the leadership 
changes will cause both UpdateMetadataRequests from the controller and 
MetadataRequests from clients. If a broker receives 10 MetadataRequests per second 
and 2 UpdateMetadataRequests per second on average, then, since these requests need to 
be processed one at a time, the request handler thread idle ratio can drop 
to 0, which makes the broker unavailable to users.

We can address this problem by removing the read lock in MetadataCache. The 
idea is that MetadataCache.updateCache() can instantiate a new copy of the 
cache as a method-local variable while it is processing the UpdateMetadataRequest 
and replace the class-private variable with the newly instantiated method-local 
variable at the end of MetadataCache.updateCache(). The handling of 
MetadataRequest then only requires access to the read-only class-private variable. 
  was:
Currently MetadataCache.updateCache() grabs a write lock in order to process 
the UpdateMetadataRequest from controller. And a read lock is needed in order 
to handle the MetadataRequest from clients. Thus the handling of 
MetadataRequest and UpdateMetadataRequest blocks each other and the broker can 
only process such request at a time even if there are multiple request handler 
threads. Note that broker can not process MetadataRequest in parallel if there 
is a UpdateMetadataRequest waiting for the write lock, even if MetadataRequest 
only requires the read lock to e processed.

For large cluster which has tens of thousands of partitions, it can take e.g. 
200 ms to process UpdateMetadataRequest and MetadataRequest from large clients 
(e.g. MM). During the period when user is rebalancinng cluster, the leadership 
change will cause both UpdateMetadataRequest from controller and also 
MetadataRequest from client. If a broker receives 10 MetadataRequest per second 
and 2 UpdateMetadataRequest per second on average, since these requests need to 
be processed one-at-a-time, it can reduce the request handler thread idle ratio 
to 0 which makes this broker unavailable to user.

We can address this problem by removing the read/write lock in MetadataCache. 
The idea is that MetadataCache.updateCache() can instantiate a new copy of the 
cache as method local variable when it is processing the UpdateMetadataRequest 
and replace the class private varaible with newly instantiated method local 
varaible at the end of MetadataCache.updateCache(). All these can be done 
without grabbing any lock. The handling of MetadataRequest only requires access 
to the read-only class-private variable.

 


> Reduce the contention between metadata update and metadata read operation
> 
>
> Key: KAFKA-7019
> URL: https://issues.apache.org/jira/browse/KAFKA-7019
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Dong Lin
>Assignee: Radai Rosenblatt
>Priority: Major
> Fix For: 2.1.0
>
>
> Currently MetadataCache.updateCache() grabs a write lock in order to process 
> the UpdateMetadataRequest from the controller, and a read lock is needed in order 
> to handle the MetadataRequest from clients. Thus the handling of 
> MetadataRequest and UpdateMetadataRequest blocks each other, and the broker 
> can only process one such request at a time even if there are multiple request 
> handler threads. Note that the broker cannot process MetadataRequest in parallel 
> if there is an UpdateMetadataRequest waiting for the write lock, even though 
> MetadataRequest only requires the read lock to be processed.
> For a large cluster with tens of thousands of partitions, it can take e.g. 
> 200 ms to process UpdateMetadataRequest and MetadataRequest from large 
> clients (e.g. MM). While a user is rebalancing the cluster, the 
> leadership change will cause both UpdateMetadataRequest from controller and 
> also MetadataReq

[jira] [Updated] (KAFKA-5235) GetOffsetShell: retrieve offsets for all given topics and partitions with single request to the broker

2018-06-13 Thread Ismael Juma (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5235?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-5235:
---
Labels: kip tool  (was: tool)

> GetOffsetShell: retrieve offsets for all given topics and partitions with 
> single request to the broker
> --
>
> Key: KAFKA-5235
> URL: https://issues.apache.org/jira/browse/KAFKA-5235
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Reporter: Arseniy Tashoyan
>Priority: Major
>  Labels: kip, tool
> Fix For: 2.1.0
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> GetOffsetShell is implemented on old SimpleConsumer. It needs Zookeeper to 
> retrieve metadata about topics and partitions. At present, GetOffsetShell 
> does the following:
> - get metadata from Zookeeper
> - iterate over partitions
> - for each partition, connect to its leader broker and request offsets
> Instead, GetOffsetShell can use new KafkaConsumer and retrieve offsets by 
> means of endOffsets(), beginningOffsets() and offsetsForTimes() methods. One 
> request is sufficient for all topics and partitions.
> Once GetOffsetShell is re-implemented with the new KafkaConsumer API, it 
> will no longer depend on obsolete APIs: SimpleConsumer and the old producer API.
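
A hedged sketch of the KafkaConsumer-based approach described above (bootstrap servers and topic name are assumptions): a single client resolves the partitions and fetches beginning and end offsets for all of them in one call each.
{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class OffsetsSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            List<TopicPartition> partitions = new ArrayList<>();
            for (PartitionInfo info : consumer.partitionsFor("topic1")) {
                partitions.add(new TopicPartition(info.topic(), info.partition()));
            }
            // A single client call covers every partition at once; no per-partition
            // connections to leader brokers and no ZooKeeper lookups are needed.
            Map<TopicPartition, Long> begin = consumer.beginningOffsets(partitions);
            Map<TopicPartition, Long> end = consumer.endOffsets(partitions);
            end.forEach((tp, offset) ->
                    System.out.println(tp + ": " + begin.get(tp) + " -> " + offset));
        }
    }
}
{code}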



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5235) GetOffsetShell: retrieve offsets for all given topics and partitions with single request to the broker

2018-06-13 Thread Ismael Juma (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5235?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511854#comment-16511854
 ] 

Ismael Juma commented on KAFKA-5235:


KIP link:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-308%3A+GetOffsetShell%3A+new+KafkaConsumer+API%2C+support+for+multiple+topics%2C+minimize+the+number+of+requests+to+server

> GetOffsetShell: retrieve offsets for all given topics and partitions with 
> single request to the broker
> --
>
> Key: KAFKA-5235
> URL: https://issues.apache.org/jira/browse/KAFKA-5235
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Reporter: Arseniy Tashoyan
>Priority: Major
>  Labels: kip, tool
> Fix For: 2.1.0
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> GetOffsetShell is implemented on old SimpleConsumer. It needs Zookeeper to 
> retrieve metadata about topics and partitions. At present, GetOffsetShell 
> does the following:
> - get metadata from Zookeeper
> - iterate over partitions
> - for each partition, connect to its leader broker and request offsets
> Instead, GetOffsetShell can use new KafkaConsumer and retrieve offsets by 
> means of endOffsets(), beginningOffsets() and offsetsForTimes() methods. One 
> request is sufficient for all topics and partitions.
> Once GetOffsetShell is re-implemented with the new KafkaConsumer API, it 
> will no longer depend on obsolete APIs: SimpleConsumer and the old producer API.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7019) Reduce the contention between metadata update and metadata read operation

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511899#comment-16511899
 ] 

ASF GitHub Bot commented on KAFKA-7019:
---

radai-rosenblatt opened a new pull request #5221: KAFKA-7019 - make reading 
metadata lock-free by maintaining an atomically-updated read snapshot
URL: https://github.com/apache/kafka/pull/5221
 
 
   we've seen cases where under a constant metadata update load (automated 
partition rebalance) request handling threads can block for a significant 
amount of time on the metadata lock.
   
   the reason is that for large enough clusters (~200,000 topic partitions) a 
read operation can actually take a long while to compose a response. under a 
constant stream of reads + writes we see situations where a reader is currently 
in, a writer is pending (blocked) and then a big pile-up of more readers that 
are blocked behind the pending writer.
   
   this patch makes the read path lock-free. the metadata is now stored in a 
logically-immutable snapshot. all read operations grab a snapshot and serve 
data out of it. write paths create an entirely new snapshot and atomically 
assign it. writers are still under a lock, for mutual exclusion.
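   
    a stripped-down, hypothetical illustration of that pattern (not the actual MetadataCache code): readers dereference an immutable snapshot without locking, while writers copy-on-write under a lock and publish the new snapshot with a single volatile write.
    ```java
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    public class SnapshotCache {
        // volatile so a newly published snapshot is immediately visible to readers
        private volatile Map<String, Integer> snapshot = Collections.emptyMap();
        private final Object writeLock = new Object();

        // lock-free read path: grab the current snapshot and serve from it
        public Integer read(String key) {
            return snapshot.get(key);
        }

        // write path: build a new snapshot under a lock (writers still exclude each
        // other) and then swap it in atomically
        public void update(String key, Integer value) {
            synchronized (writeLock) {
                Map<String, Integer> next = new HashMap<>(snapshot);
                next.put(key, value);
                snapshot = Collections.unmodifiableMap(next);
            }
        }
    }
    ```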
   
   here's the benchmark code i used to measure the effects of this patch:
   ```java
   public class MetadataCacheBenchmark {
   private volatile boolean running = true;
   
   int numBrokers = 150;
   int numTopics = 3500;
   int maxPartitionsPerTopic = 100;
   int replicationFactor = 2;
   int numUpdaters = 1;
   double updateRateLimit = 10.0; //qps
   int numReaders = 5;
   boolean partialUpdate = true;
   
   private final ListenerName listener = new ListenerName("listener");
   
   private final AtomicLong updateCounter = new AtomicLong();
   private final AtomicLong readCounter = new AtomicLong();
   
   @Test
   public void benchmarkAllTheThings() throws Exception {
   //long seed = System.currentTimeMillis();
   long seed = 666;
   System.err.println("seed is " + seed);
   Random r = new Random(seed);
   
   MetadataCache cache = new MetadataCache(666);
   UpdateMetadataRequest fullRequest = buildRequest(r, -1);
   UpdateMetadataRequest partialRequest = buildRequest(r, 1);
   
   cache.updateCache(0, fullRequest); //initial data (useful in case 
there are no writers)
   
   Set allTopics = new HashSet<>();
   for (int i = 0; i < numTopics; i++) {
   allTopics.add("topic-" + i);
   }
   scala.collection.mutable.Set topicsScalaSet = 
JavaConverters.asScalaSetConverter(allTopics).asScala();
   
   Thread.UncaughtExceptionHandler exceptionHandler = new 
Thread.UncaughtExceptionHandler() {
   @Override
   public void uncaughtException(Thread t, Throwable e) {
   running = false;
   System.err.println("thread " + t + " died");
   e.printStackTrace(System.err);
   System.exit(1);
   }
   };
   List threads = new ArrayList<>();
   
   for (int i = 0; i < numUpdaters; i++) {
   UpdateMetadataRequest req = partialUpdate ? partialRequest : 
fullRequest;
   
   Runnable updaterRunnable;
   if (updateRateLimit > 0) {
   updaterRunnable = new 
RateLimitedUpdateRunnable(updateRateLimit, cache, req);
   } else {
   updaterRunnable = new UpdateRunnable(cache, req);
   }
   Thread updaterThread = new Thread(updaterRunnable, "updater-" + 
i);
   updaterThread.setDaemon(true);
   updaterThread.setUncaughtExceptionHandler(exceptionHandler);
   threads.add(updaterThread);
   }
   
   for (int i = 0; i < numReaders; i++) {
   ReadRunnable readRunnable = new ReadRunnable(cache, 
topicsScalaSet);
   Thread readerThread = new Thread(readRunnable, "reader-" + i);
   readerThread.setDaemon(true);
   readerThread.setUncaughtExceptionHandler(exceptionHandler);
   threads.add(readerThread);
   }
   
   for (Thread t : threads) {
   t.start();
   }
   
   long prevTime = System.currentTimeMillis();
   long prevUpdates = 0;
   long prevReads = 0;
   
   long now;
   long updates;
   long reads;
   
   long timeDiff;
   long updateDiff;
   long readDiff;
   
   double updateQps;
   double readQps;
   
   while (running) {
   Thread.sleep(TimeUnit.SECONDS.toMillis(30));
   now = System.currentTimeMillis();
   updates = updateCounter.longValue();
   reads = readCounter.long

[jira] [Commented] (KAFKA-7056) Connect's new numeric converters should be in a different package

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511952#comment-16511952
 ] 

ASF GitHub Bot commented on KAFKA-7056:
---

rhauch opened a new pull request #5222: KAFKA-7056: Moved Connect’s new numeric 
converters to runtime (KIP-305)
URL: https://github.com/apache/kafka/pull/5222
 
 
   KIP-305 added numeric converters to Connect, but these were added in 
Connect’s API module in the same package as the `StringConverter`. This commit 
moves them into the Runtime module and into the `converters` package where the 
`ByteArrayConverter` already lives. These numeric converters have not yet been 
included in a release, and so they can be moved without concern.
   
   All of Connect’s converters must be referenced in worker / connector 
configurations and are therefore part of the API, but otherwise do not need to 
be in the “api” module as they do not need to be instantiated or directly used 
by extensions. This change makes them more similar to and aligned with the 
`ByteArrayConverter`.
   
   It also gives us the opportunity to move them into the “api” module in the 
future (keeping the same package name), should we ever want or need to do so. 
However, if we were to start out with them in the “api” module, we would never 
be able to move them out into the “runtime” module, even if we kept the same 
package name. Therefore, moving them to “runtime” now gives us a bit more 
flexibility.
   
   This PR moves the unit tests for the numeric converters accordingly, and 
updates the `PluginsUtil` and `PluginUtilsTest` as well.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Connect's new numeric converters should be in a different package
> -
>
> Key: KAFKA-7056
> URL: https://issues.apache.org/jira/browse/KAFKA-7056
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.0.0
>Reporter: Randall Hauch
>Assignee: Randall Hauch
>Priority: Critical
> Fix For: 2.0.0
>
>
> KIP-305 added several new primitive converters, but placed them alongside 
> {{StringConverter}} in the {{...connect.storage}} package rather than 
> alongside {{ByteArrayConverter}} in the {{...connect.converters}} package. We 
> should move them to the {{converters}} package. See 
> https://github.com/apache/kafka/pull/5198 for a discussion.
> Need to also update the plugins whitelist (see KAFKA-7043).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7030) Add configuration to disable message down-conversion

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511971#comment-16511971
 ] 

ASF GitHub Bot commented on KAFKA-7030:
---

hachikuji closed pull request #5192: KAFKA-7030: Add configuration to disable 
message down-conversion (KIP-283)
URL: https://github.com/apache/kafka/pull/5192
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java 
b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
index d6b70032626..fb2208c0328 100755
--- a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
@@ -165,4 +165,11 @@
 "the timestamp when a broker receives a message and the timestamp 
specified in the message. If " +
 "message.timestamp.type=CreateTime, a message will be rejected if the 
difference in timestamp " +
 "exceeds this threshold. This configuration is ignored if 
message.timestamp.type=LogAppendTime.";
+
+public static final String MESSAGE_DOWNCONVERSION_ENABLE_CONFIG = 
"message.downconversion.enable";
+public static final String MESSAGE_DOWNCONVERSION_ENABLE_DOC = "This 
configuration controls whether " +
+"down-conversion of message formats is enabled to satisfy consume 
requests. When set to false, " +
+"broker will not perform down-conversion for consumers expecting an 
older message format. The broker responds " +
+"with UNSUPPORTED_VERSION error for consume requests from 
such older clients. This configuration" +
+"does not apply to any message format conversion that might be 
required for replication to followers.";
 }
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala 
b/core/src/main/scala/kafka/log/LogConfig.scala
index 158209a1fc0..59269fe18d3 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -63,6 +63,7 @@ object Defaults {
   val LeaderReplicationThrottledReplicas = Collections.emptyList[String]()
   val FollowerReplicationThrottledReplicas = Collections.emptyList[String]()
   val MaxIdMapSnapshots = kafka.server.Defaults.MaxIdMapSnapshots
+  val MessageDownConversionEnable = 
kafka.server.Defaults.MessageDownConversionEnable
 }
 
 case class LogConfig(props: java.util.Map[_, _], overriddenConfigs: 
Set[String] = Set.empty)
@@ -96,6 +97,7 @@ case class LogConfig(props: java.util.Map[_, _], 
overriddenConfigs: Set[String]
   val messageTimestampDifferenceMaxMs = 
getLong(LogConfig.MessageTimestampDifferenceMaxMsProp).longValue
   val LeaderReplicationThrottledReplicas = 
getList(LogConfig.LeaderReplicationThrottledReplicasProp)
   val FollowerReplicationThrottledReplicas = 
getList(LogConfig.FollowerReplicationThrottledReplicasProp)
+  val messageDownConversionEnable = 
getBoolean(LogConfig.MessageDownConversionEnableProp)
 
   def randomSegmentJitter: Long =
 if (segmentJitterMs == 0) 0 else Utils.abs(scala.util.Random.nextInt()) % 
math.min(segmentJitterMs, segmentMs)
@@ -131,6 +133,7 @@ object LogConfig {
   val MessageFormatVersionProp = TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG
   val MessageTimestampTypeProp = TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG
   val MessageTimestampDifferenceMaxMsProp = 
TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG
+  val MessageDownConversionEnableProp = 
TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_CONFIG
 
   // Leave these out of TopicConfig for now as they are replication quota 
configs
   val LeaderReplicationThrottledReplicasProp = 
"leader.replication.throttled.replicas"
@@ -158,6 +161,7 @@ object LogConfig {
   val MessageFormatVersionDoc = TopicConfig.MESSAGE_FORMAT_VERSION_DOC
   val MessageTimestampTypeDoc = TopicConfig.MESSAGE_TIMESTAMP_TYPE_DOC
   val MessageTimestampDifferenceMaxMsDoc = 
TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_DOC
+  val MessageDownConversionEnableDoc = 
TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_DOC
 
   val LeaderReplicationThrottledReplicasDoc = "A list of replicas for which 
log replication should be throttled on " +
 "the leader side. The list should describe a set of replicas in the form " 
+
@@ -262,6 +266,8 @@ object LogConfig {
 LeaderReplicationThrottledReplicasDoc, 
LeaderReplicationThrottledReplicasProp)
   .define(FollowerReplicationThrottledReplicasProp, LIST, 
Defaults.FollowerReplicationThrottledReplicas, ThrottledReplicaListValidator, 
MEDIUM,
 FollowerReplicationThrottledReplicasDoc, 
FollowerReplicationThrottledReplicasProp)
+  .define(MessageDownConversionEnableProp, BOOLEAN, 
Defaults.MessageD
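
For illustration, a hedged sketch of toggling the new topic-level setting via the AdminClient (bootstrap servers and topic name are assumptions):
{code:java}
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;

public class DisableDownConversion {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "example-topic");
            // message.downconversion.enable=false: consumers that need an older message
            // format get an UNSUPPORTED_VERSION error instead of a down-converted fetch.
            Map<ConfigResource, Config> update = Collections.singletonMap(
                    topic,
                    new Config(Collections.singletonList(
                            new ConfigEntry(TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_CONFIG, "false"))));
            admin.alterConfigs(update).all().get();
        }
    }
}
{code}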

[jira] [Updated] (KAFKA-5234) GetOffsetShell: retrieve offsets for multiple topics with single request

2018-06-13 Thread Ismael Juma (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-5234:
---
Fix Version/s: (was: 2.1.0)

> GetOffsetShell: retrieve offsets for multiple topics with single request
> 
>
> Key: KAFKA-5234
> URL: https://issues.apache.org/jira/browse/KAFKA-5234
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Reporter: Arseniy Tashoyan
>Priority: Major
>  Labels: tool
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> At present, GetOffsetShell is able to retrieve offsets for one topic only:
> --topic  REQUIRED: The topic to get offsets from.
> If a user wants to get offsets for several topics, they have to call 
> GetOffsetShell as many times as there are topics to explore. Some 
> solutions may have dozens of topics. Monitoring a large Kafka cluster with 
> GetOffsetShell therefore requires additional scripting effort and incurs a visible 
> performance penalty due to the multiple requests to the broker.
> Instead, GetOffsetShell should support multiple topics, for example:
> --topics topic1,topic2,topic3
> Moreover, GetOffsetShell should be able to retrieve offsets for _all_ topics 
> when the user specifies no topics on the command line.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-3355) GetOffsetShell command doesn't work with SASL enabled Kafka

2018-06-13 Thread Ismael Juma (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma resolved KAFKA-3355.

   Resolution: Duplicate
Fix Version/s: (was: 2.1.0)

This will be done as part of KAFKA-5235, so marking as duplicate.

> GetOffsetShell command doesn't work with SASL enabled Kafka
> ---
>
> Key: KAFKA-3355
> URL: https://issues.apache.org/jira/browse/KAFKA-3355
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.9.0.1
> Environment: Kafka 0.9.0.1
>Reporter: TAO XIAO
>Assignee: Ashish Singh
>Priority: Major
>
> I found that GetOffsetShell doesn't work with SASL-enabled Kafka. I believe 
> this is due to the old producer being used in GetOffsetShell.
> Kafka version 0.9.0.1
> Exception
> % bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 
> localhost:9092 --topic test --time -1
> [2016-03-04 21:43:56,597] INFO Verifying properties 
> (kafka.utils.VerifiableProperties)
> [2016-03-04 21:43:56,613] INFO Property client.id is overridden to 
> GetOffsetShell (kafka.utils.VerifiableProperties)
> [2016-03-04 21:43:56,613] INFO Property metadata.broker.list is overridden to 
> localhost:9092 (kafka.utils.VerifiableProperties)
> [2016-03-04 21:43:56,613] INFO Property request.timeout.ms is overridden to 
> 1000 (kafka.utils.VerifiableProperties)
> [2016-03-04 21:43:56,674] INFO Fetching metadata from broker 
> BrokerEndPoint(0,localhost,9092) with correlation id 0 for 1 topic(s) 
> Set(test) (kafka.client.ClientUtils$)
> [2016-03-04 21:43:56,689] INFO Connected to localhost:9092 for producing 
> (kafka.producer.SyncProducer)
> [2016-03-04 21:43:56,705] WARN Fetching topic metadata with correlation id 0 
> for topics [Set(test)] from broker [BrokerEndPoint(0,localhost,9092)] failed 
> (kafka.client.ClientUtils$)
> java.nio.BufferUnderflowException
>   at java.nio.Buffer.nextGetIndex(Buffer.java:498)
>   at java.nio.HeapByteBuffer.getShort(HeapByteBuffer.java:304)
>   at kafka.api.ApiUtils$.readShortString(ApiUtils.scala:36)
>   at kafka.cluster.BrokerEndPoint$.readFrom(BrokerEndPoint.scala:52)
>   at 
> kafka.api.TopicMetadataResponse$$anonfun$1.apply(TopicMetadataResponse.scala:28)
>   at 
> kafka.api.TopicMetadataResponse$$anonfun$1.apply(TopicMetadataResponse.scala:28)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
>   at scala.collection.immutable.Range.foreach(Range.scala:166)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at 
> kafka.api.TopicMetadataResponse$.readFrom(TopicMetadataResponse.scala:28)
>   at kafka.producer.SyncProducer.send(SyncProducer.scala:120)
>   at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
>   at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
>   at kafka.tools.GetOffsetShell$.main(GetOffsetShell.scala:78)
>   at kafka.tools.GetOffsetShell.main(GetOffsetShell.scala)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-5234) GetOffsetShell: retrieve offsets for multiple topics with single request

2018-06-13 Thread Ismael Juma (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma resolved KAFKA-5234.

Resolution: Duplicate

Let's consolidate GetOffsetShell improvements under KAFKA-5235.

> GetOffsetShell: retrieve offsets for multiple topics with single request
> 
>
> Key: KAFKA-5234
> URL: https://issues.apache.org/jira/browse/KAFKA-5234
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Reporter: Arseniy Tashoyan
>Priority: Major
>  Labels: tool
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> At present, GetOffsetShell is able to retrieve offsets for one topic only:
> --topic  REQUIRED: The topic to get offsets from.
> If a user wants to get offsets for several topics, they have to call 
> GetOffsetShell as many times as there are topics to explore. Some 
> solutions may have dozens of topics. Monitoring a large Kafka cluster with 
> GetOffsetShell therefore requires additional scripting effort and incurs a visible 
> performance penalty due to the multiple requests to the broker.
> Instead, GetOffsetShell should support multiple topics, for example:
> --topics topic1,topic2,topic3
> Moreover, GetOffsetShell should be able to retrieve offsets for _all_ topics 
> when the user specifies no topics on the command line.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5235) GetOffsetShell: retrieve offsets for all given topics and partitions with single request to the broker

2018-06-13 Thread Ismael Juma (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5235?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511986#comment-16511986
 ] 

Ismael Juma commented on KAFKA-5235:


Note that https://github.com/apache/kafka/pull/5220 does a small part of what's 
described in this JIRA. We should use this JIRA for the remaining KIP-308 work 
(the user visible parts).

> GetOffsetShell: retrieve offsets for all given topics and partitions with 
> single request to the broker
> --
>
> Key: KAFKA-5235
> URL: https://issues.apache.org/jira/browse/KAFKA-5235
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Reporter: Arseniy Tashoyan
>Priority: Major
>  Labels: kip, tool
> Fix For: 2.1.0
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> GetOffsetShell is implemented on old SimpleConsumer. It needs Zookeeper to 
> retrieve metadata about topics and partitions. At present, GetOffsetShell 
> does the following:
> - get metadata from Zookeeper
> - iterate over partitions
> - for each partition, connect to its leader broker and request offsets
> Instead, GetOffsetShell can use new KafkaConsumer and retrieve offsets by 
> means of endOffsets(), beginningOffsets() and offsetsForTimes() methods. One 
> request is sufficient for all topics and partitions.
> Once GetOffsetShell is re-implemented with the new KafkaConsumer API, it 
> will no longer depend on obsolete APIs: SimpleConsumer and the old producer API.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)