[jira] [Created] (KAFKA-5560) LogManager should be able to create new logs based on free disk space
huxihx created KAFKA-5560: - Summary: LogManager should be able to create new logs based on free disk space Key: KAFKA-5560 URL: https://issues.apache.org/jira/browse/KAFKA-5560 Project: Kafka Issue Type: Improvement Components: log Affects Versions: 0.11.0.0 Reporter: huxihx Currently, the log manager chooses a directory configured in `log.dirs` by counting the partitions in each directory and picking the one with the fewest partitions. But in some real production scenarios where data volumes are not evenly distributed across partitions, some disks become nearly full while others still have a lot of free space, which leads to poor data distribution. We should offer a new strategy that lets the log manager honor actual free disk space and choose the directory with the most space available. Maybe a new broker configuration parameter is needed, `log.directory.strategy` for instance. Perhaps this also needs a new KIP. Does it make sense? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
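A minimal sketch of the proposed strategy, assuming directories are compared by usable space; the class and method names below are illustrative only (the actual LogManager is Scala and currently compares partition counts):
{code:java}
import java.io.File;
import java.util.Comparator;
import java.util.List;

// Illustrative sketch only: pick the log directory with the most usable space.
public class FreeSpaceLogDirSelector {
    public static File selectLogDir(List<File> logDirs) {
        // File.getUsableSpace() reports the bytes available to this JVM on that volume
        return logDirs.stream()
                .max(Comparator.comparingLong(File::getUsableSpace))
                .orElseThrow(() -> new IllegalStateException("no log.dirs configured"));
    }
}
{code}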
[jira] [Commented] (KAFKA-5464) StreamsKafkaClient should not use StreamsConfig.POLL_MS_CONFIG
[ https://issues.apache.org/jira/browse/KAFKA-5464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16076174#comment-16076174 ] ASF GitHub Bot commented on KAFKA-5464: --- Github user asfgit closed the pull request at: https://github.com/apache/kafka/pull/3439 > StreamsKafkaClient should not use StreamsConfig.POLL_MS_CONFIG > -- > > Key: KAFKA-5464 > URL: https://issues.apache.org/jira/browse/KAFKA-5464 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.1, 0.11.0.0 >Reporter: Matthias J. Sax >Assignee: Matthias J. Sax > Fix For: 0.10.2.2, 0.11.0.1, 0.11.1.0 > > > In {{StreamsKafkaClient}} we use a {{NetworkClient}} internally and call > {{poll}} using {{StreamsConfig.POLL_MS_CONFIG}} as the timeout. > However, {{StreamsConfig.POLL_MS_CONFIG}} is solely meant to be applied to > {{KafkaConsumer.poll()}} and it's incorrect to use it for the > {{NetworkClient}}. If the config is increased, this can lead to an infinite > rebalance: the rebalance time on the client side increases and thus the client > is not able to meet broker-enforced timeouts anymore. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5372) Unexpected state transition Dead to PendingShutdown
[ https://issues.apache.org/jira/browse/KAFKA-5372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16076203#comment-16076203 ] ASF GitHub Bot commented on KAFKA-5372: --- Github user asfgit closed the pull request at: https://github.com/apache/kafka/pull/3432 > Unexpected state transition Dead to PendingShutdown > --- > > Key: KAFKA-5372 > URL: https://issues.apache.org/jira/browse/KAFKA-5372 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Jason Gustafson >Assignee: Eno Thereska > Fix For: 0.11.1.0 > > > I often see this running integration tests: > {code} > [2017-06-02 15:36:03,411] WARN stream-thread > [appId-1-c382ef0a-adbd-422b-9717-9b2bc52b55eb-StreamThread-13] Unexpected > state transition from DEAD to PENDING_SHUTDOWN. > (org.apache.kafka.streams.processor.internals.StreamThread:976) > {code} > Maybe a race condition on shutdown or something? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (KAFKA-5508) Documentation for altering topics
[ https://issues.apache.org/jira/browse/KAFKA-5508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy resolved KAFKA-5508. --- Resolution: Fixed Fix Version/s: 0.11.0.1 0.11.1.0 Issue resolved by pull request 3429 [https://github.com/apache/kafka/pull/3429] > Documentation for altering topics > - > > Key: KAFKA-5508 > URL: https://issues.apache.org/jira/browse/KAFKA-5508 > Project: Kafka > Issue Type: Bug > Components: documentation >Reporter: Tom Bentley >Assignee: huxihx >Priority: Minor > Fix For: 0.11.1.0, 0.11.0.1 > > > According to the upgrade documentation: > bq. Altering topic configuration from the kafka-topics.sh script > (kafka.admin.TopicCommand) has been deprecated. Going forward, please use the > kafka-configs.sh script (kafka.admin.ConfigCommand) for this functionality. > But the Operations documentation still tells people to use kafka-topics.sh to > alter their topic configurations. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5508) Documentation for altering topics
[ https://issues.apache.org/jira/browse/KAFKA-5508?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16076207#comment-16076207 ] ASF GitHub Bot commented on KAFKA-5508: --- Github user asfgit closed the pull request at: https://github.com/apache/kafka/pull/3429 > Documentation for altering topics > - > > Key: KAFKA-5508 > URL: https://issues.apache.org/jira/browse/KAFKA-5508 > Project: Kafka > Issue Type: Bug > Components: documentation >Reporter: Tom Bentley >Assignee: huxihx >Priority: Minor > Fix For: 0.11.0.1, 0.11.1.0 > > > According to the upgrade documentation: > bq. Altering topic configuration from the kafka-topics.sh script > (kafka.admin.TopicCommand) has been deprecated. Going forward, please use the > kafka-configs.sh script (kafka.admin.ConfigCommand) for this functionality. > But the Operations documentation still tells people to use kafka-topics.sh to > alter their topic configurations. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5561) Rewrite TopicCommand using the new Admin client
Paolo Patierno created KAFKA-5561: - Summary: Rewrite TopicCommand using the new Admin client Key: KAFKA-5561 URL: https://issues.apache.org/jira/browse/KAFKA-5561 Project: Kafka Issue Type: Improvement Components: tools Reporter: Paolo Patierno Assignee: Paolo Patierno Hi, as suggested in the https://issues.apache.org/jira/browse/KAFKA-3331, it could be great to have the TopicCommand using the new Admin client instead of the way it works today. As pushed by [~gwenshap] in the above JIRA, I'm going to work on it. Thanks, Paolo -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-3331) Refactor TopicCommand to make it testable and add unit tests
[ https://issues.apache.org/jira/browse/KAFKA-3331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16076252#comment-16076252 ] Paolo Patierno commented on KAFKA-3331: --- [~gwenshap] I opened this JIRA https://issues.apache.org/jira/browse/KAFKA-5561 for working on that. Thanks ! > Refactor TopicCommand to make it testable and add unit tests > > > Key: KAFKA-3331 > URL: https://issues.apache.org/jira/browse/KAFKA-3331 > Project: Kafka > Issue Type: Wish >Affects Versions: 0.9.0.1 >Reporter: Ashish Singh >Assignee: Ashish Singh > Fix For: 0.11.1.0 > > > TopicCommand has become a functionality packed, hard to read, class. Adding > or changing it with confidence requires some unit tests around it. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5562) Do streams state directory cleanup on a single thread
Damian Guy created KAFKA-5562: - Summary: Do streams state directory cleanup on a single thread Key: KAFKA-5562 URL: https://issues.apache.org/jira/browse/KAFKA-5562 Project: Kafka Issue Type: Bug Reporter: Damian Guy Assignee: Damian Guy Currently in streams we clean up old state directories every so often (as defined by {{state.cleanup.delay.ms}}). However, every StreamThread runs the cleanup, which is both unnecessary and can potentially lead to race conditions. It would be better to perform the state cleanup on a single thread and only when the {{KafkaStreams}} instance is in a running state. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
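A minimal sketch of the proposed approach, assuming a single shared scheduled executor owned by the {{KafkaStreams}} instance; the names here are illustrative, not the actual implementation:
{code:java}
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative sketch: run state-directory cleanup on one scheduled thread
// instead of having every StreamThread perform it.
public class StateCleanupScheduler {
    private final ScheduledExecutorService cleanupExecutor =
            Executors.newSingleThreadScheduledExecutor();

    public void start(final long cleanupDelayMs, final Runnable cleanupTask) {
        // cleanupTask should verify the KafkaStreams instance is in RUNNING state
        // before deleting obsolete task directories
        cleanupExecutor.scheduleAtFixedRate(cleanupTask, cleanupDelayMs, cleanupDelayMs, TimeUnit.MILLISECONDS);
    }

    public void shutdown() {
        cleanupExecutor.shutdownNow();
    }
}
{code}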
[jira] [Updated] (KAFKA-5529) ConsoleProducer uses deprecated BaseProducer
[ https://issues.apache.org/jira/browse/KAFKA-5529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-5529: --- Fix Version/s: 0.12.0.0 > ConsoleProducer uses deprecated BaseProducer > > > Key: KAFKA-5529 > URL: https://issues.apache.org/jira/browse/KAFKA-5529 > Project: Kafka > Issue Type: Improvement > Components: tools >Reporter: Evgeny Veretennikov >Assignee: Evgeny Veretennikov >Priority: Minor > Fix For: 0.12.0.0 > > > BaseProducer is deprecated, should use > org.apache.kafka.clients.producer.KafkaProducer instead. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-5512) KafkaConsumer: High memory allocation rate when idle
[ https://issues.apache.org/jira/browse/KAFKA-5512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-5512: --- Fix Version/s: 0.10.2.2 > KafkaConsumer: High memory allocation rate when idle > > > Key: KAFKA-5512 > URL: https://issues.apache.org/jira/browse/KAFKA-5512 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 0.10.1.1 >Reporter: Stephane Roset > Labels: performance > Fix For: 0.10.2.2, 0.11.0.1 > > > Hi, > We noticed in our application that the memory allocation rate increased > significantly when we have no Kafka messages to consume. We isolated the > issue by using a JVM that simply runs 128 Kafka consumers. These consumers > consume 128 partitions (so each consumer consumes one partition). The > partitions are empty and no message has been sent during the test. The > consumers were configured with default values (session.timeout.ms=3, > fetch.max.wait.ms=500, receive.buffer.bytes=65536, > heartbeat.interval.ms=3000, max.poll.interval.ms=30, > max.poll.records=500). The Kafka cluster was made of 3 brokers. Within this > context, the allocation rate was about 55 MiB/s. This high allocation rate > generates a lot of GC activity (to collect the young generation) and was an issue > for our project. > We profiled the JVM with JProfiler. We noticed that there was a huge > quantity of ArrayList$Itr instances in memory. These collections were mainly > instantiated by the methods handleCompletedReceives, handleCompletedSends, > handleConnections and handleDisconnections of the class NetworkClient. We also > noticed that we had a lot of calls to the method pollOnce of the class > KafkaConsumer. > So we decided to run only one consumer and to profile the calls to the method > pollOnce. We noticed that a huge number of calls is regularly made to this > method, up to 268000 calls within 100ms. The pollOnce method calls the > NetworkClient.handle* methods. These methods iterate over collections (even if > they are empty), which explains the huge number of iterators in memory. > The large number of calls is related to the heartbeat mechanism. The pollOnce > method calculates the poll timeout; if a heartbeat needs to be done, the > timeout will be set to 0. The problem is that the heartbeat thread checks > every 100 ms (default value of retry.backoff.ms) whether a heartbeat should be > sent, so the KafkaConsumer will call the poll method in a loop without a > timeout until the heartbeat thread wakes up. For example: the heartbeat thread > just started to wait and will wake up in 99ms. So during those 99ms, the > KafkaConsumer will call the pollOnce method in a loop with a timeout > of 0. That explains how we can have 268000 calls within 100ms. > The heartbeat thread calls the method AbstractCoordinator.wait() to sleep, so > I think the Kafka consumer should wake the heartbeat thread with a notify > when needed. > We made two quick fixes to solve this issue: > - In NetworkClient.handle*(), we don't iterate over collections if they are > empty (to avoid unnecessary iterator instantiations). > - In KafkaConsumer.pollOnce(), if the poll timeout is equal to 0 we notify > the heartbeat thread to wake it up (a dirty fix because we don't handle the > autocommit case). > With these 2 quick fixes and 128 consumers, the allocation rate drops > from 55 MiB/s to 4 MiB/s. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
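An illustration of the first quick fix described in the report above (not the actual NetworkClient code): skip the handler loop entirely when there is nothing to process, so no iterator is allocated on idle polls.
{code:java}
import java.util.List;

// Illustrative sketch of the empty-collection guard.
class CompletedReceiveHandler<R> {
    void handleCompletedReceives(final List<R> completedReceives) {
        if (completedReceives.isEmpty()) {
            return; // avoids allocating an ArrayList$Itr on every idle poll
        }
        for (final R receive : completedReceives) {
            process(receive);
        }
    }

    void process(final R receive) {
        // handle a single completed receive
    }
}
{code}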
[jira] [Commented] (KAFKA-4468) Correctly calculate the window end timestamp after read from state stores
[ https://issues.apache.org/jira/browse/KAFKA-4468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16076281#comment-16076281 ] Evgeny Veretennikov commented on KAFKA-4468: I have researched windowed stores a bit more and noticed that {{WindowedDeserializer}} isn't used in the Kafka source code, including internals: {noformat}
$ grep -Rl WindowedDeserializer | grep java | grep -v test
streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedDeserializer.java
{noformat} It's not part of the public API either (as it's in the {{internals}} package). {{RocksDBWindowStore}} doesn't use {{WindowedDeserializer}} to deserialize windowed keys, but uses {{WindowStoreUtils}} static methods instead, including the {{timeWindowForSize()}} method, which already uses {{windowSize}} to calculate the proper {{TimeWindow}}. So, shouldn't we just remove {{WindowedDeserializer}}? > Correctly calculate the window end timestamp after read from state stores > - > > Key: KAFKA-4468 > URL: https://issues.apache.org/jira/browse/KAFKA-4468 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Guozhang Wang > Labels: architecture > > When storing the WindowedStore on the persistent KV store, we only use the > start timestamp of the window as part of the combo-key as (start-timestamp, > key). The reason that we do not add the end-timestamp as well is that we can > always calculate it from the start timestamp + window_length, and hence we > can save 8 bytes per key on the persistent KV store. > However, after reading it back (via {{WindowedDeserializer}}) we do not set its end > timestamp correctly but just read it as an {{UnlimitedWindow}}. We should fix > this by calculating its end timestamp as mentioned above. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5563) Clarify handling of connector name in config
Sönke Liebau created KAFKA-5563: --- Summary: Clarify handling of connector name in config Key: KAFKA-5563 URL: https://issues.apache.org/jira/browse/KAFKA-5563 Project: Kafka Issue Type: Bug Components: KafkaConnect Affects Versions: 0.11.0.0 Reporter: Sönke Liebau Priority: Minor The connector name is currently being stored in two places, once at the root level of the connector and once in the config: {code:java} { "name": "test", "config": { "connector.class": "org.apache.kafka.connect.tools.MockSinkConnector", "tasks.max": "3", "topics": "test-topic", "name": "test" }, "tasks": [ { "connector": "test", "task": 0 } ] } {code} If no name is provided in the "config" element, then the name from the root level is [copied there when the connector is being created|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java#L95]. If however a name is provided in the config then it is not touched, which means it is possible to create a connector with a different name at the root level and in the config like this: {code:java} { "name": "test1", "config": { "connector.class": "org.apache.kafka.connect.tools.MockSinkConnector", "tasks.max": "3", "topics": "test-topic", "name": "differentname" }, "tasks": [ { "connector": "test1", "task": 0 } ] } {code} I am not aware of any issues that this currently causes, but it is at least confusing and probably not intended behavior and definitely bears potential for bugs, if different functions take the name from different places. Would it make sense to add a check to reject requests that provide different names in the request and the config section? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
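A minimal sketch of the check suggested at the end of the description; the class and method names are hypothetical and not part of the actual Connect REST resource:
{code:java}
import java.util.Map;

// Hypothetical validation: reject a create request whose root-level name
// differs from the "name" entry inside the config map.
class ConnectorNameValidator {
    static void validate(final String rootLevelName, final Map<String, String> config) {
        final String configName = config.get("name");
        if (configName == null) {
            config.put("name", rootLevelName); // current behavior: copy the root-level name
        } else if (!configName.equals(rootLevelName)) {
            throw new IllegalArgumentException("Connector name '" + rootLevelName
                    + "' does not match name '" + configName + "' in config");
        }
    }
}
{code}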
[jira] [Commented] (KAFKA-4468) Correctly calculate the window end timestamp after read from state stores
[ https://issues.apache.org/jira/browse/KAFKA-4468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16076373#comment-16076373 ] Evgeny Veretennikov commented on KAFKA-4468: Sorry, guys, I missed the javadoc comment on {{WindowedDeserializer}}... :) {noformat} The inner deserializer class can be specified by setting the property key.deserializer.inner.class, value.deserializer.inner.class or deserializer.inner.class {noformat} So users actually can use this class via a property. By the way, why are Kafka users allowed to use internal Kafka Streams classes? Doesn't it make sense to move such user-facing classes outside of the {{internals}} package? > Correctly calculate the window end timestamp after read from state stores > - > > Key: KAFKA-4468 > URL: https://issues.apache.org/jira/browse/KAFKA-4468 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Guozhang Wang > Labels: architecture > > When storing the WindowedStore on the persistent KV store, we only use the > start timestamp of the window as part of the combo-key as (start-timestamp, > key). The reason that we do not add the end-timestamp as well is that we can > always calculate it from the start timestamp + window_length, and hence we > can save 8 bytes per key on the persistent KV store. > However, after reading it back (via {{WindowedDeserializer}}) we do not set its end > timestamp correctly but just read it as an {{UnlimitedWindow}}. We should fix > this by calculating its end timestamp as mentioned above. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-5534) KafkaConsumer offsetsForTimes should include partitions in result even if no offset could be found
[ https://issues.apache.org/jira/browse/KAFKA-5534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-5534: --- Fix Version/s: 0.10.2.2 > KafkaConsumer offsetsForTimes should include partitions in result even if no > offset could be found > -- > > Key: KAFKA-5534 > URL: https://issues.apache.org/jira/browse/KAFKA-5534 > Project: Kafka > Issue Type: Bug > Components: consumer >Reporter: Jason Gustafson >Assignee: Vahid Hashemian > Fix For: 0.10.2.2, 0.11.0.1 > > > From the javadoc for {{offsetsForTimes}}: > {code} > * @return a mapping from partition to the timestamp and offset of the > first message with timestamp greater > * than or equal to the target timestamp. {@code null} will be > returned for the partition if there is no > * such message. > {code} > If the topic does not support timestamp search (i.e. magic 1 and above), we > include the partition in the map with a null value, as described above. If > the topic supports timestamp search but no offset could be found, we just > leave the partition out of the map. We should make this behavior consistent > and include the partition with a null value in the result. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
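A usage sketch showing why the consistent behavior matters for callers, assuming the proposed semantics where every requested partition appears in the result and {{null}} means no offset was found; the class and method names are illustrative:
{code:java}
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

// Sketch: callers can iterate the full result and treat null uniformly.
class OffsetsForTimesExample {
    static void printOffsets(final KafkaConsumer<?, ?> consumer,
                             final Map<TopicPartition, Long> timestampsToSearch) {
        final Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(timestampsToSearch);
        for (final Map.Entry<TopicPartition, OffsetAndTimestamp> entry : result.entrySet()) {
            if (entry.getValue() == null) {
                System.out.println(entry.getKey() + ": no offset found for timestamp");
            } else {
                System.out.println(entry.getKey() + ": offset " + entry.getValue().offset());
            }
        }
    }
}
{code}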
[jira] [Commented] (KAFKA-3362) Update protocol schema and field doc strings
[ https://issues.apache.org/jira/browse/KAFKA-3362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16076459#comment-16076459 ] ASF GitHub Bot commented on KAFKA-3362: --- GitHub user andrasbeni opened a pull request: https://github.com/apache/kafka/pull/3495 KAFKA-3362: Update protocol schema and field doc strings Added doc to protocol fields You can merge this pull request into a Git repository by running: $ git pull https://github.com/andrasbeni/kafka KAFKA-3362 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/3495.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3495 commit ad144271aa8a491faa31be998bf2bfa32ac87847 Author: Andras Beni Date: 2017-06-22T07:10:43Z KAFKA-3362: Update protocol schema and field doc strings Added doc to protocol fields > Update protocol schema and field doc strings > > > Key: KAFKA-3362 > URL: https://issues.apache.org/jira/browse/KAFKA-3362 > Project: Kafka > Issue Type: Sub-task >Reporter: Grant Henke >Assignee: Andras Beni > > In KAFKA-3361, auto generation of docs based on the definitions in > Protocol.java was added. There are some inconsistencies and missing > information in the docs strings in the code vs. the [wiki > page|https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol]. > > The code documentation strings should be reviewed and updated to be complete > and accurate. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-4468) Correctly calculate the window end timestamp after read from state stores
[ https://issues.apache.org/jira/browse/KAFKA-4468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16076472#comment-16076472 ] Evgeny Veretennikov commented on KAFKA-4468: Let me show you an example: {code:java}
final Serde<Windowed<String>> windowedSerde = new WrapperSerde<>(
    new WindowedSerializer<>(new StringSerializer()),
    new WindowedDeserializer<>(new StringDeserializer())
);
final String topic = "name";
final RocksDBStore<Windowed<String>, String> store = new RocksDBStore<>(topic, windowedSerde, Serdes.String());

final MockProcessorContext context = ...;
context.setRecordContext(...);
store.init(context, store);

store.put(new Windowed<>("key1", new TimeWindow(100, 123)), "value1");
store.put(new Windowed<>("key2", new TimeWindow(101, 456)), "value2");

final KeyValueIterator<Windowed<String>, String> all = store.all();
all.next(); // KeyValue([key1@100/9223372036854775807], value1)
all.next(); // KeyValue([key2@101/9223372036854775807], value2)
{code} We are able to put into the store two time windows with different window sizes. When we try to get them back from the store, we get two windows with the proper start timestamps, but broken ends ({{Long.MAX_VALUE}}, as in {{WindowedDeserializer}}). So, we are unable to calculate the window end without saving it in {{WindowedSerializer}}. Now it seems that [~bbejeck] was actually correct about this: {noformat} Unless I'm missing something, this task implies we'll need to include the window_size (and forgo the 8 bytes per key storage savings) on serialization with WindowedSerializer. As after we've read it via the WindowedDeserializer we only have the key and the start timestamp and don't have access to the original window_size to do the calculation. {noformat} > Correctly calculate the window end timestamp after read from state stores > - > > Key: KAFKA-4468 > URL: https://issues.apache.org/jira/browse/KAFKA-4468 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Guozhang Wang > Labels: architecture > > When storing the WindowedStore on the persistent KV store, we only use the > start timestamp of the window as part of the combo-key as (start-timestamp, > key). The reason that we do not add the end-timestamp as well is that we can > always calculate it from the start timestamp + window_length, and hence we > can save 8 bytes per key on the persistent KV store. > However, after reading it back (via {{WindowedDeserializer}}) we do not set its end > timestamp correctly but just read it as an {{UnlimitedWindow}}. We should fix > this by calculating its end timestamp as mentioned above. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Comment Edited] (KAFKA-4468) Correctly calculate the window end timestamp after read from state stores
[ https://issues.apache.org/jira/browse/KAFKA-4468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16076472#comment-16076472 ] Evgeny Veretennikov edited comment on KAFKA-4468 at 7/6/17 1:13 PM: Let me show you an example: {code:java}
final Serde<Windowed<String>> windowedSerde = new WrapperSerde<>(
    new WindowedSerializer<>(new StringSerializer()),
    new WindowedDeserializer<>(new StringDeserializer())
);
final String topic = "name";
final RocksDBStore<Windowed<String>, String> store = new RocksDBStore<>(topic, windowedSerde, Serdes.String());

final MockProcessorContext context = ...;
context.setRecordContext(...);
store.init(context, store);

store.put(new Windowed<>("key1", new TimeWindow(100, 123)), "value1");
store.put(new Windowed<>("key2", new TimeWindow(101, 456)), "value2");

final KeyValueIterator<Windowed<String>, String> all = store.all();
all.next(); // KeyValue([key1@100/9223372036854775807], value1)
all.next(); // KeyValue([key2@101/9223372036854775807], value2)
{code} We are able to put into the store two time windows with different window sizes. When we try to get them back from the store, we get two windows with the proper start timestamps, but broken ends ({{Long.MAX_VALUE}}, as in {{WindowedDeserializer}}). So, we are unable to calculate the window end without saving it in {{WindowedSerializer}}. Now it seems that [~bbejeck] was actually correct about this: {noformat} Unless I'm missing something, this task implies we'll need to include the window_size (and forgo the 8 bytes per key storage savings) on serialization with WindowedSerializer. As after we've read it via the WindowedDeserializer we only have the key and the start timestamp and don't have access to the original window_size to do the calculation. {noformat} was (Author: evis): Let me show you an example: {code:java}
final Serde<Windowed<String>> windowedSerde = new WrapperSerde<>(
    new WindowedSerializer<>(new StringSerializer()),
    new WindowedDeserializer<>(new StringDeserializer())
);
final String topic = "name";
final RocksDBStore<Windowed<String>, String> store = new RocksDBStore<>(topic, windowedSerde, Serdes.String());

final MockProcessorContext context = ...;
context.setRecordContext(...);
store.init(context, store);

store.put(new Windowed<>("key1", new TimeWindow(100, 123)), "value1");
store.put(new Windowed<>("key2", new TimeWindow(101, 456)), "value2");

final KeyValueIterator<Windowed<String>, String> all = store.all();
all.next(); // KeyValue([key1@100/9223372036854775807], value1)
all.next(); // KeyValue([key2@101/9223372036854775807], value2)
{code} We are able to put into the store two time windows with different window sizes. When we try to get them back from the store, we get two windows with the proper start timestamps, but broken ends ({{Long.MAX_VALUE}}, as in {{WindowedDeserializer}}). So, we are unable to calculate the window end without saving it in {{WindowedSerializer}}. Now it seems that [~bbejeck] was actually correct about this: {noformat} Unless I'm missing something, this task implies we'll need to include the window_size (and forgo the 8 bytes per key storage savings) on serialization with WindowedSerializer. As after we've read it via the WindowedDeserializer we only have the key and the start timestamp and don't have access to the original window_size to do the calculation. {noformat} > Correctly calculate the window end timestamp after read from state stores > - > > Key: KAFKA-4468 > URL: https://issues.apache.org/jira/browse/KAFKA-4468 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Guozhang Wang > Labels: architecture > > When storing the WindowedStore on the persistent KV store, we only use the > start timestamp of the window as part of the combo-key as (start-timestamp, > key). The reason that we do not add the end-timestamp as well is that we can > always calculate it from the start timestamp + window_length, and hence we > can save 8 bytes per key on the persistent KV store. > However, after reading it back (via {{WindowedDeserializer}}) we do not set its end > timestamp correctly but just read it as an {{UnlimitedWindow}}. We should fix > this by calculating its end timestamp as mentioned above. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5564) Fail to create topics with error 'While recording the replica LEO, the partition [topic2,0] hasn't been created'
Klearchos Chaloulos created KAFKA-5564: -- Summary: Fail to create topics with error 'While recording the replica LEO, the partition [topic2,0] hasn't been created' Key: KAFKA-5564 URL: https://issues.apache.org/jira/browse/KAFKA-5564 Project: Kafka Issue Type: Bug Affects Versions: 0.9.0.1 Reporter: Klearchos Chaloulos Hello, *Short version* we have seen sporadic occurrences of the following issue: Topics whose leader is a specific broker fail to be created properly, and it is impossible to produce to them or consume from them. The following log appears in the broker that is the leader of the faulty topics: {noformat} [2017-07-05 05:22:15,564] WARN [Replica Manager on Broker 3]: While recording the replica LEO, the partition [topic2,0] hasn't been created. (kafka.server.ReplicaManager) {noformat} *Detailed version*: Our setup consists of three brokers with ids 1, 2, 3. Broker 2 is the controller. We create 7 topics called topic1, topic2, topic3, topic4, topic5, topic6, topic7. Sometimes (sporadically) some of the topics are faulty. In the particular example I describe here the faulty topics are topic6, topic4, topic2, topic3. The faulty topics all have the same leader, broker 3. If we do a kafka-topics.sh --describe on the topics we see that for topics that do not have broker 3 as leader, the in-sync replicas report that broker 3 is not in sync: {noformat}
bin/kafka-topics.sh --describe --zookeeper zookeeper:2181/kafka
Topic:topic6    PartitionCount:1    ReplicationFactor:3    Configs:
    Topic: topic6    Partition: 0    Leader: 3    Replicas: 3,1,2    Isr: 3,1,2
Topic:topic5    PartitionCount:1    ReplicationFactor:3    Configs:retention.ms=30
    Topic: topic5    Partition: 0    Leader: 2    Replicas: 2,3,1    Isr: 2,1
Topic:topic7    PartitionCount:1    ReplicationFactor:3    Configs:
    Topic: topic7    Partition: 0    Leader: 1    Replicas: 1,3,2    Isr: 1,2
Topic:topic4    PartitionCount:1    ReplicationFactor:3    Configs:
    Topic: topic4    Partition: 0    Leader: 3    Replicas: 3,1,2    Isr: 3,1,2
Topic:topic1    PartitionCount:1    ReplicationFactor:3    Configs:
    Topic: topic1    Partition: 0    Leader: 2    Replicas: 2,1,3    Isr: 2,1
Topic:topic2    PartitionCount:1    ReplicationFactor:3    Configs:
    Topic: topic2    Partition: 0    Leader: 3    Replicas: 3,1,2    Isr: 3,1,2
Topic:topic3    PartitionCount:1    ReplicationFactor:3    Configs:
    Topic: topic3    Partition: 0    Leader: 3    Replicas: 3,1,2    Isr: 3,1,2
{noformat} For the faulty topics, however, it is reported that all replicas are in sync. Also, the topic directories under the log.dir folder were not created on the faulty broker 3. We see the following log in broker 3, which is the leader of the faulty topics: {noformat} [2017-07-05 05:22:15,564] WARN [Replica Manager on Broker 3]: While recording the replica LEO, the partition [topic2,0] hasn't been created. (kafka.server.ReplicaManager) {noformat} The above log is logged continuously. We see the following error log in the other two brokers, the replicas: {noformat} ERROR [ReplicaFetcherThread-0-3], Error for partition [topic3,0] to broker 3:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition {noformat} Again, the above log is logged continuously. The issue described above occurs immediately after the deployment of the Kafka cluster. A restart of the faulty broker (3 in this case) fixes the problem and the faulty topics work normally. I have also attached the broker configuration we use. Do you have any idea what might cause this issue? Best regards, Klearchos -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5070) org.apache.kafka.streams.errors.LockException: task [0_18] Failed to lock the state directory: /opt/rocksdb/pulse10/0_18
[ https://issues.apache.org/jira/browse/KAFKA-5070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16076489#comment-16076489 ] Damian Guy commented on KAFKA-5070: --- [~mjsax] https://issues.apache.org/jira/browse/KAFKA-5167 hasn't fixed the issue. > org.apache.kafka.streams.errors.LockException: task [0_18] Failed to lock the > state directory: /opt/rocksdb/pulse10/0_18 > > > Key: KAFKA-5070 > URL: https://issues.apache.org/jira/browse/KAFKA-5070 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.0 > Environment: Linux Version >Reporter: Dhana >Assignee: Matthias J. Sax > Attachments: RocksDB_LockStateDirec.7z > > > Notes: we run two instance of consumer in two difference machines/nodes. > we have 400 partitions. 200 stream threads/consumer, with 2 consumer. > We perform HA test(on rebalance - shutdown of one of the consumer/broker), we > see this happening > Error: > 2017-04-05 11:36:09.352 WARN StreamThread:1184 StreamThread-66 - Could not > create task 0_115. Will retry. > org.apache.kafka.streams.errors.LockException: task [0_115] Failed to lock > the state directory: /opt/rocksdb/pulse10/0_115 > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.(ProcessorStateManager.java:102) > at > org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:73) > at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:108) > at > org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834) > at > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207) > at > org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180) > at > org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937) > at > org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69) > at > org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5563) Clarify handling of connector name in config
[ https://issues.apache.org/jira/browse/KAFKA-5563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16076494#comment-16076494 ] Sönke Liebau commented on KAFKA-5563: - The fix for this issue blacklists a few problematic characters for Connector names as a first step. > Clarify handling of connector name in config > - > > Key: KAFKA-5563 > URL: https://issues.apache.org/jira/browse/KAFKA-5563 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 0.11.0.0 >Reporter: Sönke Liebau >Priority: Minor > > The connector name is currently being stored in two places, once at the root > level of the connector and once in the config: > {code:java} > { > "name": "test", > "config": { > "connector.class": > "org.apache.kafka.connect.tools.MockSinkConnector", > "tasks.max": "3", > "topics": "test-topic", > "name": "test" > }, > "tasks": [ > { > "connector": "test", > "task": 0 > } > ] > } > {code} > If no name is provided in the "config" element, then the name from the root > level is [copied there when the connector is being > created|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java#L95]. > If however a name is provided in the config then it is not touched, which > means it is possible to create a connector with a different name at the root > level and in the config like this: > {code:java} > { > "name": "test1", > "config": { > "connector.class": > "org.apache.kafka.connect.tools.MockSinkConnector", > "tasks.max": "3", > "topics": "test-topic", > "name": "differentname" > }, > "tasks": [ > { > "connector": "test1", > "task": 0 > } > ] > } > {code} > I am not aware of any issues that this currently causes, but it is at least > confusing and probably not intended behavior and definitely bears potential > for bugs, if different functions take the name from different places. > Would it make sense to add a check to reject requests that provide different > names in the request and the config section? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Issue Comment Deleted] (KAFKA-5070) org.apache.kafka.streams.errors.LockException: task [0_18] Failed to lock the state directory: /opt/rocksdb/pulse10/0_18
[ https://issues.apache.org/jira/browse/KAFKA-5070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5070: -- Comment: was deleted (was: [~mjsax] https://issues.apache.org/jira/browse/KAFKA-5167 hasn't fixed the issue.) > org.apache.kafka.streams.errors.LockException: task [0_18] Failed to lock the > state directory: /opt/rocksdb/pulse10/0_18 > > > Key: KAFKA-5070 > URL: https://issues.apache.org/jira/browse/KAFKA-5070 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.0 > Environment: Linux Version >Reporter: Dhana >Assignee: Matthias J. Sax > Attachments: RocksDB_LockStateDirec.7z > > > Notes: we run two instance of consumer in two difference machines/nodes. > we have 400 partitions. 200 stream threads/consumer, with 2 consumer. > We perform HA test(on rebalance - shutdown of one of the consumer/broker), we > see this happening > Error: > 2017-04-05 11:36:09.352 WARN StreamThread:1184 StreamThread-66 - Could not > create task 0_115. Will retry. > org.apache.kafka.streams.errors.LockException: task [0_115] Failed to lock > the state directory: /opt/rocksdb/pulse10/0_115 > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.(ProcessorStateManager.java:102) > at > org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:73) > at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:108) > at > org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834) > at > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207) > at > org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180) > at > org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937) > at > org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69) > at > org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-4490) Add Global Table support to Kafka Streams
[ https://issues.apache.org/jira/browse/KAFKA-4490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16076675#comment-16076675 ] frank t commented on KAFKA-4490: In my project we need this: KTable join(final GlobalKTable globalTable, final KeyValueMapper keyMapper, final ValueJoiner joiner); When will this be released, please? It is strictly necessary for us. Is there another way to achieve the same result? > Add Global Table support to Kafka Streams > - > > Key: KAFKA-4490 > URL: https://issues.apache.org/jira/browse/KAFKA-4490 > Project: Kafka > Issue Type: New Feature > Components: streams >Affects Versions: 0.10.2.0 >Reporter: Damian Guy >Assignee: Damian Guy > Fix For: 0.10.2.0 > > > As per KIP-99 > https://cwiki.apache.org/confluence/display/KAFKA/KIP-99%3A+Add+Global+Tables+to+Kafka+Streams > Add support for Global Tables -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-4468) Correctly calculate the window end timestamp after read from state stores
[ https://issues.apache.org/jira/browse/KAFKA-4468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16076830#comment-16076830 ] Matthias J. Sax commented on KAFKA-4468: Thanks for following up. About storing the window size -- it would be good if we could avoid this. The example you give uses windows with different sizes -- but at the DSL level, we know that all windows of a single operator have the same size. Thus, we might be able to avoid storing the size. Of course, it would be more flexible if we just store the size and sacrifice the space saving. I don't have a strong opinion on this. Would be nice to get the opinion of others, too. > Correctly calculate the window end timestamp after read from state stores > - > > Key: KAFKA-4468 > URL: https://issues.apache.org/jira/browse/KAFKA-4468 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Guozhang Wang > Labels: architecture > > When storing the WindowedStore on the persistent KV store, we only use the > start timestamp of the window as part of the combo-key as (start-timestamp, > key). The reason that we do not add the end-timestamp as well is that we can > always calculate it from the start timestamp + window_length, and hence we > can save 8 bytes per key on the persistent KV store. > However, after reading it back (via {{WindowedDeserializer}}) we do not set its end > timestamp correctly but just read it as an {{UnlimitedWindow}}. We should fix > this by calculating its end timestamp as mentioned above. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
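A minimal sketch of the alternative discussed above, assuming the operator's fixed window size is known at the DSL level; the class and method names are hypothetical:
{code:java}
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.TimeWindow;

// Hypothetical sketch: recompute the window end on read instead of storing it,
// given the operator's known window size.
class WindowEndRecovery {
    static <K> Windowed<K> withRecoveredEnd(final K key, final long startMs, final long windowSizeMs) {
        // end = start + window size, as proposed in the ticket description
        return new Windowed<>(key, new TimeWindow(startMs, startMs + windowSizeMs));
    }
}
{code}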
[jira] [Commented] (KAFKA-4490) Add Global Table support to Kafka Streams
[ https://issues.apache.org/jira/browse/KAFKA-4490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16076901#comment-16076901 ] Matthias J. Sax commented on KAFKA-4490: See: https://issues.apache.org/jira/browse/KAFKA-4628 > Add Global Table support to Kafka Streams > - > > Key: KAFKA-4490 > URL: https://issues.apache.org/jira/browse/KAFKA-4490 > Project: Kafka > Issue Type: New Feature > Components: streams >Affects Versions: 0.10.2.0 >Reporter: Damian Guy >Assignee: Damian Guy > Fix For: 0.10.2.0 > > > As per KIP-99 > https://cwiki.apache.org/confluence/display/KAFKA/KIP-99%3A+Add+Global+Tables+to+Kafka+Streams > Add support for Global Tables -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5464) StreamsKafkaClient should not use StreamsConfig.POLL_MS_CONFIG
[ https://issues.apache.org/jira/browse/KAFKA-5464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16076907#comment-16076907 ] ASF GitHub Bot commented on KAFKA-5464: --- GitHub user mjsax opened a pull request: https://github.com/apache/kafka/pull/3496 KAFKA-5464: Follow up. Increase poll timeout You can merge this pull request into a Git repository by running: $ git pull https://github.com/mjsax/kafka KAFKA-5464-follow-up Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/3496.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3496 commit 4fdca2b613d0934ded999ca8a1d445954fdcb6c4 Author: Matthias J. Sax Date: 2017-07-06T17:04:37Z KAFKA-5464: Follow up. Increase poll timeout > StreamsKafkaClient should not use StreamsConfig.POLL_MS_CONFIG > -- > > Key: KAFKA-5464 > URL: https://issues.apache.org/jira/browse/KAFKA-5464 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.1, 0.11.0.0 >Reporter: Matthias J. Sax >Assignee: Matthias J. Sax > Fix For: 0.10.2.2, 0.11.0.1, 0.11.1.0 > > > In {{StreamsKafkaClient}} we use an {{NetworkClient}} internally and call > {{poll}} using {{StreamsConfig.POLL_MS_CONFIG}} as timeout. > However, {{StreamsConfig.POLL_MS_CONFIG}} is solely meant to be applied to > {{KafkaConsumer.poll()}} and it's incorrect to use it for the > {{NetworkClient}}. If the config is increased, this can lead to a infinite > rebalance and rebalance on the client side is increased and thus, the client > is not able to meet broker enforced timeouts anymore. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-4107) Support offset reset capability in Kafka Connect
[ https://issues.apache.org/jira/browse/KAFKA-4107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16076915#comment-16076915 ] Randall Hauch commented on KAFKA-4107: -- This is definitely a good tool to have. There's a new Consumer Reset Tool in Apache Kafka 0.11, but Kafka Connect source connector offsets are a different beast. We'll need a KIP that explains the proposal for the new tool and how it's used. > Support offset reset capability in Kafka Connect > > > Key: KAFKA-4107 > URL: https://issues.apache.org/jira/browse/KAFKA-4107 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Jason Gustafson > > It would be useful in some cases to be able to reset connector offsets. For > example, if a topic in Kafka corresponding to a source database is > accidentally deleted (or deleted because of corrupt data), an administrator > may want to reset offsets and reproduce the log from the beginning. It may > also be useful to have support for overriding offsets, but that seems like a > less likely use case. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5565) Add a broker metric specifying the number of consumer group rebalances in progress
Colin P. McCabe created KAFKA-5565: -- Summary: Add a broker metric specifying the number of consumer group rebalances in progress Key: KAFKA-5565 URL: https://issues.apache.org/jira/browse/KAFKA-5565 Project: Kafka Issue Type: Improvement Reporter: Colin P. McCabe We should add a broker metric specifying the number of consumer group rebalances in progress. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5566) Instable test QueryableStateIntegrationTest.shouldAllowToQueryAfterThreadDied
Matthias J. Sax created KAFKA-5566: -- Summary: Instable test QueryableStateIntegrationTest.shouldAllowToQueryAfterThreadDied Key: KAFKA-5566 URL: https://issues.apache.org/jira/browse/KAFKA-5566 Project: Kafka Issue Type: Bug Components: streams, unit tests Reporter: Matthias J. Sax Assignee: Eno Thereska This test failed about 4 times in the last 24h. Always the same stack trace so far: {noformat} java.lang.AssertionError: Condition not met within timeout 3. wait for agg to be '123' at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:274) at org.apache.kafka.streams.integration.QueryableStateIntegrationTest.shouldAllowToQueryAfterThreadDied(QueryableStateIntegrationTest.java:793) {noformat} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (KAFKA-3745) Consider adding join key to ValueJoiner interface
[ https://issues.apache.org/jira/browse/KAFKA-3745?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reassigned KAFKA-3745: -- Assignee: Jeyhun Karimov (was: Sreepathi Prasanna) > Consider adding join key to ValueJoiner interface > - > > Key: KAFKA-3745 > URL: https://issues.apache.org/jira/browse/KAFKA-3745 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.0.0 >Reporter: Greg Fodor >Assignee: Jeyhun Karimov >Priority: Minor > Labels: api, needs-kip, newbie > > In working with Kafka Stream joining, it's sometimes the case that a join key > is not actually present in the values of the joins themselves (if, for > example, a previous transform generated an ephemeral join key.) In such > cases, the actual key of the join is not available in the ValueJoiner > implementation to be used to construct the final joined value. This can be > worked around by explicitly threading the join key into the value if needed, > but it seems like extending the interface to pass the join key along as well > would be helpful. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (KAFKA-3745) Consider adding join key to ValueJoiner interface
[ https://issues.apache.org/jira/browse/KAFKA-3745?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-3745. Resolution: Duplicate See https://cwiki.apache.org/confluence/display/KAFKA/KIP-149%3A+Enabling+key+access+in+ValueTransformer%2C+ValueMapper%2C+and+ValueJoiner > Consider adding join key to ValueJoiner interface > - > > Key: KAFKA-3745 > URL: https://issues.apache.org/jira/browse/KAFKA-3745 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.0.0 >Reporter: Greg Fodor >Assignee: Jeyhun Karimov >Priority: Minor > Labels: api, needs-kip, newbie > > In working with Kafka Stream joining, it's sometimes the case that a join key > is not actually present in the values of the joins themselves (if, for > example, a previous transform generated an ephemeral join key.) In such > cases, the actual key of the join is not available in the ValueJoiner > implementation to be used to construct the final joined value. This can be > worked around by explicitly threading the join key into the value if needed, > but it seems like extending the interface to pass the join key along as well > would be helpful. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-4726) ValueMapper should have (read) access to key
[ https://issues.apache.org/jira/browse/KAFKA-4726?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-4726: --- Labels: kip (was: ) > ValueMapper should have (read) access to key > > > Key: KAFKA-4726 > URL: https://issues.apache.org/jira/browse/KAFKA-4726 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.1.1 >Reporter: Steven Schlansker >Assignee: Jeyhun Karimov > Labels: kip > > {{ValueMapper}} should have read-only access to the key for the value it is > mapping. Sometimes the value transformation will depend on the key. > It is possible to do this with a full blown {{KeyValueMapper}} but that loses > the promise that you won't change the key -- so you might introduce a > re-keying phase that is totally unnecessary. It also requires you to return > an identity KeyValue object which costs something to construct (unless we are > lucky and the optimizer elides it). > [ If mapValues() is guaranteed to be no less efficient than map() the issue > may be moot, but I presume there are some optimizations that are valid with > the former but not latter. ] -- This message was sent by Atlassian JIRA (v6.4.14#64029)
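A sketch of the workaround described in the issue, assuming {{String}} keys and values: using {{map()}} with a {{KeyValueMapper}} that leaves the key unchanged, at the cost of allocating a {{KeyValue}} per record and losing the guarantee that the key is untouched.
{code:java}
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;

// Sketch of the workaround: the value mapping needs the key, so map() is used
// and the key is passed through unchanged.
class KeyAwareValueMapping {
    static KStream<String, String> enrichWithKey(final KStream<String, String> input) {
        return input.map((key, value) -> KeyValue.pair(key, key + ":" + value));
    }
}
{code}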
[jira] [Resolved] (KAFKA-4726) ValueMapper should have (read) access to key
[ https://issues.apache.org/jira/browse/KAFKA-4726?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-4726. Resolution: Duplicate See https://cwiki.apache.org/confluence/display/KAFKA/KIP-149%3A+Enabling+key+access+in+ValueTransformer%2C+ValueMapper%2C+and+ValueJoiner > ValueMapper should have (read) access to key > > > Key: KAFKA-4726 > URL: https://issues.apache.org/jira/browse/KAFKA-4726 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.1.1 >Reporter: Steven Schlansker >Assignee: Jeyhun Karimov > Labels: kip > > {{ValueMapper}} should have read-only access to the key for the value it is > mapping. Sometimes the value transformation will depend on the key. > It is possible to do this with a full blown {{KeyValueMapper}} but that loses > the promise that you won't change the key -- so you might introduce a > re-keying phase that is totally unnecessary. It also requires you to return > an identity KeyValue object which costs something to construct (unless we are > lucky and the optimizer elides it). > [ If mapValues() is guaranteed to be no less efficient than map() the issue > may be moot, but I presume there are some optimizations that are valid with > the former but not latter. ] -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (KAFKA-4726) ValueMapper should have (read) access to key
[ https://issues.apache.org/jira/browse/KAFKA-4726?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reassigned KAFKA-4726: -- Assignee: Jeyhun Karimov > ValueMapper should have (read) access to key > > > Key: KAFKA-4726 > URL: https://issues.apache.org/jira/browse/KAFKA-4726 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.1.1 >Reporter: Steven Schlansker >Assignee: Jeyhun Karimov > Labels: kip > > {{ValueMapper}} should have read-only access to the key for the value it is > mapping. Sometimes the value transformation will depend on the key. > It is possible to do this with a full blown {{KeyValueMapper}} but that loses > the promise that you won't change the key -- so you might introduce a > re-keying phase that is totally unnecessary. It also requires you to return > an identity KeyValue object which costs something to construct (unless we are > lucky and the optimizer elides it). > [ If mapValues() is guaranteed to be no less efficient than map() the issue > may be moot, but I presume there are some optimizations that are valid with > the former but not latter. ] -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-3745) Consider adding join key to ValueJoiner interface
[ https://issues.apache.org/jira/browse/KAFKA-3745?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-3745: --- Labels: api kip (was: api needs-kip newbie) > Consider adding join key to ValueJoiner interface > - > > Key: KAFKA-3745 > URL: https://issues.apache.org/jira/browse/KAFKA-3745 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.0.0 >Reporter: Greg Fodor >Assignee: Jeyhun Karimov >Priority: Minor > Labels: api, kip > > In working with Kafka Stream joining, it's sometimes the case that a join key > is not actually present in the values of the joins themselves (if, for > example, a previous transform generated an ephemeral join key.) In such > cases, the actual key of the join is not available in the ValueJoiner > implementation to be used to construct the final joined value. This can be > worked around by explicitly threading the join key into the value if needed, > but it seems like extending the interface to pass the join key along as well > would be helpful. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-4218) Enable access to key in ValueTransformer
[ https://issues.apache.org/jira/browse/KAFKA-4218?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-4218: --- Labels: api kip (was: api needs-kip newbie) > Enable access to key in ValueTransformer > > > Key: KAFKA-4218 > URL: https://issues.apache.org/jira/browse/KAFKA-4218 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.0.1 >Reporter: Elias Levy >Assignee: Jeyhun Karimov > Labels: api, kip > > While transforming values via {{KStream.transformValues}} and > {{ValueTransformer}}, the key associated with the value may be needed, even > if it is not changed. For instance, it may be used to access stores. > As of now, the key is not available within these methods and interfaces, > leading to the use of {{KStream.transform}} and {{Transformer}}, and the > unnecessary creation of new {{KeyValue}} objects. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-4218) Enable access to key in ValueTransformer
[ https://issues.apache.org/jira/browse/KAFKA-4218?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-4218: --- Description: While transforming values via {{KStream.transformValues}} and {{ValueTransformer}}, the key associated with the value may be needed, even if it is not changed. For instance, it may be used to access stores. As of now, the key is not available within these methods and interfaces, leading to the use of {{KStream.transform}} and {{Transformer}}, and the unnecessary creation of new {{KeyValue}} objects. KIP-149: https://cwiki.apache.org/confluence/display/KAFKA/KIP-149%3A+Enabling+key+access+in+ValueTransformer%2C+ValueMapper%2C+and+ValueJoiner was: While transforming values via {{KStream.transformValues}} and {{ValueTransformer}}, the key associated with the value may be needed, even if it is not changed. For instance, it may be used to access stores. As of now, the key is not available within these methods and interfaces, leading to the use of {{KStream.transform}} and {{Transformer}}, and the unnecessary creation of new {{KeyValue}} objects. > Enable access to key in ValueTransformer > > > Key: KAFKA-4218 > URL: https://issues.apache.org/jira/browse/KAFKA-4218 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.0.1 >Reporter: Elias Levy >Assignee: Jeyhun Karimov > Labels: api, kip > > While transforming values via {{KStream.transformValues}} and > {{ValueTransformer}}, the key associated with the value may be needed, even > if it is not changed. For instance, it may be used to access stores. > As of now, the key is not available within these methods and interfaces, > leading to the use of {{KStream.transform}} and {{Transformer}}, and the > unnecessary creation of new {{KeyValue}} objects. > KIP-149: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-149%3A+Enabling+key+access+in+ValueTransformer%2C+ValueMapper%2C+and+ValueJoiner -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip
[ https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16077303#comment-16077303 ] Guozhang Wang commented on KAFKA-5545: -- I am investigating why the thread does not end its current loop and move on to the shutdown process; while I'm doing that, could you share what you meant above by saying {code} If I do close with in connection timeout all goes well. But if I issue close after connection timeout the threads are stuck {code} What "connection timeout" are you referring to here? > Kafka Stream not able to successfully restart over new broker ip > > > Key: KAFKA-5545 > URL: https://issues.apache.org/jira/browse/KAFKA-5545 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Yogesh BG >Priority: Critical > Attachments: kafkastreams.log > > > Hi > I have one kafka broker and one kafka stream application > initially kafka stream connected and starts processing data. Then i restart > the broker. When broker restarts new ip will be assigned. > In kafka stream i have a 5min interval thread which checks if broker ip > changed and if changed, we cleanup the stream, rebuild topology(tried with > reusing topology) and start the stream again. I end up with the following > exceptions. > 11:04:08.032 [StreamThread-38] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-38] Creating active task 0_5 with assigned > partitions [PR-5] > 11:04:08.033 [StreamThread-41] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-41] Creating active task 0_1 with assigned > partitions [PR-1] > 11:04:08.036 [StreamThread-34] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-34] Creating active task 0_7 with assigned > partitions [PR-7] > 11:04:08.036 [StreamThread-37] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-37] Creating active task 0_3 with assigned > partitions [PR-3] > 11:04:08.036 [StreamThread-45] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-45] Creating active task 0_0 with assigned > partitions [PR-0] > 11:04:08.037 [StreamThread-36] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-36] Creating active task 0_4 with assigned > partitions [PR-4] > 11:04:08.037 [StreamThread-43] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-43] Creating active task 0_6 with assigned > partitions [PR-6] > 11:04:08.038 [StreamThread-48] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-48] Creating active task 0_2 with assigned > partitions [PR-2] > 11:04:09.034 [StreamThread-38] WARN o.a.k.s.p.internals.StreamThread - Could > not create task 0_5. Will retry. 
> org.apache.kafka.streams.errors.LockException: task [0_5] Failed to lock the > state directory for task 0_5 > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.(ProcessorStateManager.java:100) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:73) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:108) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352) >
[jira] [Commented] (KAFKA-5440) Kafka Streams report state RUNNING even if all threads are dead
[ https://issues.apache.org/jira/browse/KAFKA-5440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16077334#comment-16077334 ] Matthias J. Sax commented on KAFKA-5440: Is this resolved with KAFKA-5372 ? \cc [~enothereska] > Kafka Streams report state RUNNING even if all threads are dead > --- > > Key: KAFKA-5440 > URL: https://issues.apache.org/jira/browse/KAFKA-5440 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.1, 0.11.0.0 >Reporter: Matthias J. Sax > > From the mailing list: > {quote} > Hi All, > We recently implemented a health check for a Kafka Streams based application. > The health check is simply checking the state of Kafka Streams by calling > KafkaStreams.state(). It reports healthy if it’s not in PENDING_SHUTDOWN or > NOT_RUNNING states. > We truly appreciate having the possibility to easily check the state of Kafka > Streams but to our surprise we noticed that KafkaStreams.state() returns > RUNNING even though all StreamThreads has crashed and reached NOT_RUNNING > state. Is this intended behaviour or is it a bug? Semantically it seems weird > to me that KafkaStreams would say it’s RUNNING when it is in fact not > consuming anything since all underlying working threads has crashed. > If this is intended behaviour I would appreciate an explanation of why that > is the case. Also in that case, how could I determine if the consumption from > Kafka hasn’t crashed? > If this is not intended behaviour, how fast could I expect it to be fixed? I > wouldn’t mind fixing it myself but I’m not sure if this is considered trivial > or big enough to require a JIRA. Also, if I would implement a fix I’d like > your input on what would be a reasonable solution. By just inspecting to code > I have an idea but I’m not sure I understand all the implication so I’d be > happy to hear your thoughts first. > {quote} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
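For reference, a minimal sketch of the health check described in the quoted mail, assuming a {{KafkaStreams}} instance named {{streams}}; it only illustrates the check that currently keeps reporting RUNNING after all threads have died.
{code}
import org.apache.kafka.streams.KafkaStreams;

// Reports unhealthy only in PENDING_SHUTDOWN or NOT_RUNNING -- which, per the report,
// is never reached when the StreamThreads crash individually.
static boolean isHealthy(final KafkaStreams streams) {
    final KafkaStreams.State state = streams.state();
    return state != KafkaStreams.State.PENDING_SHUTDOWN
            && state != KafkaStreams.State.NOT_RUNNING;
}
{code}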
[jira] [Commented] (KAFKA-5318) Streams state may be misleading
[ https://issues.apache.org/jira/browse/KAFKA-5318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16077336#comment-16077336 ] Matthias J. Sax commented on KAFKA-5318: Is this contained by KAFKA-5372 ? \cc [~enothereska] > Streams state may be misleading > --- > > Key: KAFKA-5318 > URL: https://issues.apache.org/jira/browse/KAFKA-5318 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.1 >Reporter: Joao > > I rely on "org.apache.kafka.streams.KafkaStreams#state" to know if my current > stream instance is properly running. If it becomes unhealthy my provisioning > system (Kubernetes) automatically restarts/replaces the instance. > One of such instance encountered bug > https://issues.apache.org/jira/browse/KAFKA-5167. > The issue is that during the whole time my instance was affected by the > linked bug, the stream state was considered healthy when in fact it was not. > My instance did not recover automatically from the LockException and I > happened to notice something was wrong because I monitor the stream delay, > which went into abnormal values. > This ultimately means that the kafka stream state is unreliable at describing > if an instance is actually running as intended. > There are some improvements in the works from what I was told, such as > https://cwiki.apache.org/confluence/display/KAFKA/KIP+130%3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5562) Do streams state directory cleanup on a single thread
[ https://issues.apache.org/jira/browse/KAFKA-5562?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16077341#comment-16077341 ] Matthias J. Sax commented on KAFKA-5562: Seems to be related to https://issues.apache.org/jira/browse/KAFKA-4890 > Do streams state directory cleanup on a single thread > - > > Key: KAFKA-5562 > URL: https://issues.apache.org/jira/browse/KAFKA-5562 > Project: Kafka > Issue Type: Bug >Reporter: Damian Guy >Assignee: Damian Guy > > Currently in streams we clean up old state directories every so often (as > defined by {{state.cleanup.delay.ms}}). However, every StreamThread runs the > cleanup, which is both unnecessary and can potentially lead to race > conditions. > It would be better to perform the state cleanup on a single thread and only > when the {{KafkaStreams}} instance is in a running state. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
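A rough sketch of the single-threaded approach proposed above, assuming the interval from {{state.cleanup.delay.ms}} is in scope as {{cleanupDelayMs}}, a {{KafkaStreams}} instance as {{streams}}, and {{cleanUpStateDirectories()}} as a placeholder for the existing cleanup logic; this is not the actual Streams internals.
{code}
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final ScheduledExecutorService cleaner = Executors.newSingleThreadScheduledExecutor();
cleaner.scheduleAtFixedRate(() -> {
    // A single thread runs the cleanup, and only while the instance is RUNNING.
    if (streams.state() == KafkaStreams.State.RUNNING) {
        cleanUpStateDirectories();  // placeholder for the current per-StreamThread cleanup
    }
}, cleanupDelayMs, cleanupDelayMs, TimeUnit.MILLISECONDS);
{code}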
[jira] [Updated] (KAFKA-4830) Augment KStream.print() to allow users pass in extra parameters in the printed string
[ https://issues.apache.org/jira/browse/KAFKA-4830?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-4830: --- Fix Version/s: 0.11.1.0 > Augment KStream.print() to allow users pass in extra parameters in the > printed string > - > > Key: KAFKA-4830 > URL: https://issues.apache.org/jira/browse/KAFKA-4830 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Guozhang Wang >Assignee: james chien > Labels: kip, newbie > Fix For: 0.11.1.0 > > > Today {{KStream.print}} use the hard-coded result string as: > {code} > "[" + this.streamName + "]: " + keyToPrint + " , " + valueToPrint > {code} > And some users are asking to augment this so that they can customize the > output string as {{KStream.print(KeyValueMapper)}} : > {code} > "[" + this.streamName + "]: " + mapper.apply(keyToPrint, valueToPrint) > {code} > Original KIP-132: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-132%3A+Augment+KStream.print+to+allow+extra+parameters+in+the+printed+string > Duplicating KIP-160: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-160%3A+Augment+KStream.print%28%29+to+allow+users+pass+in+extra+parameters+in+the+printed+string -- This message was sent by Atlassian JIRA (v6.4.14#64029)
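A usage sketch of the augmentation being discussed; the exact overload is subject to KIP-160, so treat {{print(KeyValueMapper)}} below as an assumption rather than a released method.
{code}
// Today: fixed "[streamName]: key , value" output.
stream.print();

// Proposed: let the caller build the printed string, e.g. with a KeyValueMapper lambda.
stream.print((key, value) -> key + " --> " + value);
{code}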
[jira] [Assigned] (KAFKA-5565) Add a broker metric specifying the number of consumer group rebalances in progress
[ https://issues.apache.org/jira/browse/KAFKA-5565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin P. McCabe reassigned KAFKA-5565: -- Assignee: Colin P. McCabe > Add a broker metric specifying the number of consumer group rebalances in > progress > -- > > Key: KAFKA-5565 > URL: https://issues.apache.org/jira/browse/KAFKA-5565 > Project: Kafka > Issue Type: Improvement >Reporter: Colin P. McCabe >Assignee: Colin P. McCabe > > We should add a broker metric specifying the number of consumer group > rebalances in progress. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (KAFKA-4125) Provide low-level Processor API meta data in DSL layer
[ https://issues.apache.org/jira/browse/KAFKA-4125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reassigned KAFKA-4125: -- Assignee: Jeyhun Karimov (was: Bill Bejeck) > Provide low-level Processor API meta data in DSL layer > -- > > Key: KAFKA-4125 > URL: https://issues.apache.org/jira/browse/KAFKA-4125 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Matthias J. Sax >Assignee: Jeyhun Karimov >Priority: Minor > Labels: kip > Fix For: 0.11.1.0 > > > For Processor API, user can get meta data like record offset, timestamp etc > via the provided {{Context}} object. It might be useful to allow uses to > access this information in DSL layer, too. > The idea would be, to do it "the Flink way", ie, by providing > RichFunctions; {{mapValue()}} for example. > Is takes a {{ValueMapper}} that only has method > {noformat} > V2 apply(V1 value); > {noformat} > Thus, you cannot get any meta data within apply (it's completely "blind"). > We would add two more interfaces: {{RichFunction}} with a method > {{open(Context context)}} and > {noformat} > RichValueMapper extends ValueMapper, RichFunction > {noformat} > This way, the user can chose to implement Rich- or Standard-function and > we do not need to change existing APIs. Both can be handed into > {{KStream.mapValues()}} for example. Internally, we check if a Rich > function is provided, and if yes, hand in the {{Context}} object once, to > make it available to the user who can now access it within {{apply()}} -- or > course, the user must set a member variable in {{open()}} to hold the > reference to the Context object. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-4125) Provide low-level Processor API meta data in DSL layer
[ https://issues.apache.org/jira/browse/KAFKA-4125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-4125: --- Labels: kip (was: ) > Provide low-level Processor API meta data in DSL layer > -- > > Key: KAFKA-4125 > URL: https://issues.apache.org/jira/browse/KAFKA-4125 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Matthias J. Sax >Assignee: Bill Bejeck >Priority: Minor > Labels: kip > Fix For: 0.11.1.0 > > > For Processor API, user can get meta data like record offset, timestamp etc > via the provided {{Context}} object. It might be useful to allow uses to > access this information in DSL layer, too. > The idea would be, to do it "the Flink way", ie, by providing > RichFunctions; {{mapValue()}} for example. > Is takes a {{ValueMapper}} that only has method > {noformat} > V2 apply(V1 value); > {noformat} > Thus, you cannot get any meta data within apply (it's completely "blind"). > We would add two more interfaces: {{RichFunction}} with a method > {{open(Context context)}} and > {noformat} > RichValueMapper extends ValueMapper, RichFunction > {noformat} > This way, the user can chose to implement Rich- or Standard-function and > we do not need to change existing APIs. Both can be handed into > {{KStream.mapValues()}} for example. Internally, we check if a Rich > function is provided, and if yes, hand in the {{Context}} object once, to > make it available to the user who can now access it within {{apply()}} -- or > course, the user must set a member variable in {{open()}} to hold the > reference to the Context object. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-4125) Provide low-level Processor API meta data in DSL layer
[ https://issues.apache.org/jira/browse/KAFKA-4125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-4125: --- Fix Version/s: 0.11.1.0 > Provide low-level Processor API meta data in DSL layer > -- > > Key: KAFKA-4125 > URL: https://issues.apache.org/jira/browse/KAFKA-4125 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Matthias J. Sax >Assignee: Bill Bejeck >Priority: Minor > Labels: kip > Fix For: 0.11.1.0 > > > For Processor API, user can get meta data like record offset, timestamp etc > via the provided {{Context}} object. It might be useful to allow uses to > access this information in DSL layer, too. > The idea would be, to do it "the Flink way", ie, by providing > RichFunctions; {{mapValue()}} for example. > Is takes a {{ValueMapper}} that only has method > {noformat} > V2 apply(V1 value); > {noformat} > Thus, you cannot get any meta data within apply (it's completely "blind"). > We would add two more interfaces: {{RichFunction}} with a method > {{open(Context context)}} and > {noformat} > RichValueMapper extends ValueMapper, RichFunction > {noformat} > This way, the user can chose to implement Rich- or Standard-function and > we do not need to change existing APIs. Both can be handed into > {{KStream.mapValues()}} for example. Internally, we check if a Rich > function is provided, and if yes, hand in the {{Context}} object once, to > make it available to the user who can now access it within {{apply()}} -- or > course, the user must set a member variable in {{open()}} to hold the > reference to the Context object. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-4125) Provide low-level Processor API meta data in DSL layer
[ https://issues.apache.org/jira/browse/KAFKA-4125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-4125: --- Description: For Processor API, user can get meta data like record offset, timestamp etc via the provided {{Context}} object. It might be useful to allow uses to access this information in DSL layer, too. The idea would be, to do it "the Flink way", ie, by providing RichFunctions; {{mapValue()}} for example. Is takes a {{ValueMapper}} that only has method {noformat} V2 apply(V1 value); {noformat} Thus, you cannot get any meta data within apply (it's completely "blind"). We would add two more interfaces: {{RichFunction}} with a method {{open(Context context)}} and {noformat} RichValueMapper extends ValueMapper, RichFunction {noformat} This way, the user can chose to implement Rich- or Standard-function and we do not need to change existing APIs. Both can be handed into {{KStream.mapValues()}} for example. Internally, we check if a Rich function is provided, and if yes, hand in the {{Context}} object once, to make it available to the user who can now access it within {{apply()}} -- or course, the user must set a member variable in {{open()}} to hold the reference to the Context object. KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-159%3A+Introducing+Rich+functions+to+Streams was: For Processor API, user can get meta data like record offset, timestamp etc via the provided {{Context}} object. It might be useful to allow uses to access this information in DSL layer, too. The idea would be, to do it "the Flink way", ie, by providing RichFunctions; {{mapValue()}} for example. Is takes a {{ValueMapper}} that only has method {noformat} V2 apply(V1 value); {noformat} Thus, you cannot get any meta data within apply (it's completely "blind"). We would add two more interfaces: {{RichFunction}} with a method {{open(Context context)}} and {noformat} RichValueMapper extends ValueMapper, RichFunction {noformat} This way, the user can chose to implement Rich- or Standard-function and we do not need to change existing APIs. Both can be handed into {{KStream.mapValues()}} for example. Internally, we check if a Rich function is provided, and if yes, hand in the {{Context}} object once, to make it available to the user who can now access it within {{apply()}} -- or course, the user must set a member variable in {{open()}} to hold the reference to the Context object. > Provide low-level Processor API meta data in DSL layer > -- > > Key: KAFKA-4125 > URL: https://issues.apache.org/jira/browse/KAFKA-4125 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Matthias J. Sax >Assignee: Jeyhun Karimov >Priority: Minor > Labels: kip > Fix For: 0.11.1.0 > > > For Processor API, user can get meta data like record offset, timestamp etc > via the provided {{Context}} object. It might be useful to allow uses to > access this information in DSL layer, too. > The idea would be, to do it "the Flink way", ie, by providing > RichFunctions; {{mapValue()}} for example. > Is takes a {{ValueMapper}} that only has method > {noformat} > V2 apply(V1 value); > {noformat} > Thus, you cannot get any meta data within apply (it's completely "blind"). > We would add two more interfaces: {{RichFunction}} with a method > {{open(Context context)}} and > {noformat} > RichValueMapper extends ValueMapper, RichFunction > {noformat} > This way, the user can chose to implement Rich- or Standard-function and > we do not need to change existing APIs. 
Both can be handed into > {{KStream.mapValues()}} for example. Internally, we check if a Rich > function is provided, and if yes, hand in the {{Context}} object once, to > make it available to the user who can now access it within {{apply()}} -- or > course, the user must set a member variable in {{open()}} to hold the > reference to the Context object. > KIP: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-159%3A+Introducing+Rich+functions+to+Streams -- This message was sent by Atlassian JIRA (v6.4.14#64029)
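Transcribed into compilable form, the interfaces sketched in the description look roughly like this (generic parameters added for readability; {{Context}} is written as in the ticket and the final KIP-159 signatures may differ):
{code}
// Marker for DSL functions that want lifecycle and metadata access.
public interface RichFunction {
    void open(Context context);  // "Context" as in the ticket (in practice the ProcessorContext)
}

// Rich variant that can be handed to KStream.mapValues() next to a plain ValueMapper.
public interface RichValueMapper<V, VR> extends ValueMapper<V, VR>, RichFunction {
    // apply(V value) is inherited from ValueMapper; an implementation keeps the Context
    // it received in open() in a member field and reads record metadata from it there.
}
{code}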
[jira] [Created] (KAFKA-5567) With transformations that mutate the topic-partition committing offsets should to refer to the original topic-partition
Konstantine Karantasis created KAFKA-5567: - Summary: With transformations that mutate the topic-partition committing offsets should to refer to the original topic-partition Key: KAFKA-5567 URL: https://issues.apache.org/jira/browse/KAFKA-5567 Project: Kafka Issue Type: Bug Components: KafkaConnect Affects Versions: 0.11.0.0, 0.10.2.1, 0.10.2.0 Reporter: Konstantine Karantasis Assignee: Konstantine Karantasis Fix For: 0.11.0.1 When a chain of transformations (SMTs) that mutate a record's topic-partition is applied, then Connect is unable to map the transformed record to its original topic-partition. This affects committing offsets. Currently, in order to reproduce the issue one could use the {{TimestampRouter}} transformation with a sink connector such as the {{FileStreamSinkConnector}}. In this ticket we'll address the issue for connectors that don't manage/commit their offsets themselves. For the connectors that do such management, broader API changes are required to supply the connectors with the necessary information that will allow them to map a transformed record to the original. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
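A small sketch of how the mismatch can be observed with the {{TimestampRouter}} transform mentioned above; the record contents are illustrative, and the rewritten topic suffix depends on the transform's {{timestamp.format}} setting (default {{yyyyMMdd}}).
{code}
import java.util.Collections;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.transforms.TimestampRouter;

TimestampRouter<SinkRecord> router = new TimestampRouter<>();
router.configure(Collections.<String, Object>emptyMap());  // default topic.format: "${topic}-${timestamp}"

SinkRecord original = new SinkRecord("events", 0, null, "key", null, "value", 42L,
        System.currentTimeMillis(), TimestampType.CREATE_TIME);
SinkRecord routed = router.apply(original);

// Offsets were consumed from "events"/partition 0, but after the SMT the task only
// sees the rewritten topic, e.g. "events-20170707", so the commit mapping is lost.
System.out.println(original.topic() + " -> " + routed.topic());
{code}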
[jira] [Created] (KAFKA-5568) Transformations that mutate topic-partitions break sink connectors that manage their own configuration
Ewen Cheslack-Postava created KAFKA-5568: Summary: Transformations that mutate topic-partitions break sink connectors that manage their own configuration Key: KAFKA-5568 URL: https://issues.apache.org/jira/browse/KAFKA-5568 Project: Kafka Issue Type: Bug Components: KafkaConnect Affects Versions: 0.11.0.0, 0.10.2.1, 0.10.2.0 Reporter: Ewen Cheslack-Postava KAFKA-5567 describes how offset commits for sink connectors are broken if a record's topic-partition is mutated by an SMT, e.g RegexRouter or TimestampRouter. This is also a problem for sink connectors that manage their own offsets, i.e. those that store offsets elsewhere and call SinkTaskContext.rewind(). In this case, the transformation has already been applied by the time the SinkTask sees it, so there is no way it could correctly track offsets and call rewind() with valid values. For example, this would make the offset tracking that Confluent's HDFS connector does by working with filenames no longer work. Even if they were stored separately in a file rather than relying on filenames, it still wouldn't have ever had the correct offsets to write to that file. There are a couple of options: 1. Decide that this is an acceptable consequence of combining SMTs with sink connectors and it's a limitation we accept. You can either transform the data via Kafka Streams instead or accept that you can't do these "routing" type operations in the sink connector unless it supports it natively. This *might* not be the wrong choice since we think there are very few connectors that track their own offsets. In the case of HDFS, we might rarely hit this issue because it supports its own file/directory partitioning schemes anyway so doing this via SMTs isn't as necessary there. 2. Try to expose the original record information to the sink connector via the records. I can think of 2 ways this could be done. The first is to attach the original record to each SinkRecord. The cost here is relatively high in terms of memory, especially for sink connectors that need to buffer data. The second is to add fields to SinkRecords for originalTopic() and originalPartition(). This feels a bit ugly to me but might be the least intrusive change API-wise and we can guarantee those fields aren't overwritten by not allowing public constructors to set them. 3. Try to expose the original record information to the sink connector via a new pre-processing callback. The idea is similar to preCommit, but instead would happen before any processing occurs. Taken to its logical conclusion this turns into a sort of interceptor interface (preConversion, preTransformation, put, and preCommit). 4. Add something to the Context that allows the connector to get back at the original information. Maybe some sort of IdentityMap originalPutRecords() that would let you get a mapping back to the original records. One nice aspect of this is that the connector can hold onto the original only if it needs it. 5. A very intrusive change/extension to the SinkTask API that passes in pairs of records. Accomplishes the same as 2 but requires what I think are more complicated changes. Mentioned for completeness. 6. Something else I haven't thought of? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
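To make option 2 above concrete, a hypothetical shape of the added accessors; nothing below exists in the current {{SinkRecord}} API and the names are assumptions.
{code}
// Hypothetical additions to SinkRecord (option 2): populated by the framework before
// any transformation runs, and not settable through public constructors.
public interface OriginalTopicPartitionAccess {
    String originalTopic();        // topic the record was actually consumed from
    Integer originalPartition();   // partition the record was actually consumed from
}
{code}
A sink task that manages its own offsets could then key its bookkeeping off {{originalTopic()}}/{{originalPartition()}} instead of the possibly rewritten {{topic()}}/{{kafkaPartition()}}.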
[jira] [Commented] (KAFKA-5407) Mirrormaker dont start after upgrade
[ https://issues.apache.org/jira/browse/KAFKA-5407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16077458#comment-16077458 ] Fernando Vega commented on KAFKA-5407: -- [~huxi_2b] So we managed to bring the mirrormakers up. However, there are a few things I don't get, and I'd like your help with them. Basically we upgraded from 0.8.2-beta to 0.10.2.1. Producers :=> We use the new producer config with the bootstrap parameter and it works fine. Brokers :=> Using the config file, which is pretty much the same. Consumers :=> With this one, if I use the bootstrap parameter it doesn't work; however, the old configuration using the zookeeper parameter does work fine. That's the first thing I don't get. The second thing is that one of the clusters we replicate to has 6 hosts; if we bring all of them up it takes forever to rebalance and eventually the hosts die. If we bring only 2 up they seem to take the load, but that is not optimal. This has been going on for 2 weeks or more and we cannot figure out what is happening. If you can provide some guidelines that would be great, thanks. > Mirrormaker dont start after upgrade > > > Key: KAFKA-5407 > URL: https://issues.apache.org/jira/browse/KAFKA-5407 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 0.10.2.1 > Environment: Operating system > CentOS 6.8 > HW > Board Mfg : HP > Board Product : ProLiant DL380p Gen8 > CPU's x2 > Product Manufacturer : Intel > Product Name : Intel(R) Xeon(R) CPU E5-2660 v2 @ 2.20GHz > Memory Type : DDR3 SDRAM > SDRAM Capacity: 2048 MB > Total Memory: : 64GB > Hardrives size and layout: > 9 drives using jbod > drive size 3.6TB each >Reporter: Fernando Vega >Priority: Critical > > Currently Im upgrading the cluster from 0.8.2-beta to 0.10.2.1 > So I followed the rolling procedure: > Here the config files: > Consumer > {noformat} > # > # Cluster: repl > # Topic list(goes into command line): > REPL-ams1-global,REPL-atl1-global,REPL-sjc2-global,REPL-ams1-global-PN_HXIDMAP_.*,REPL-atl1-global-PN_HXIDMAP_.*,REPL-sjc2-global-PN_HXIDMAP_.*,REPL-ams1-global-PN_HXCONTEXTUALV2_.*,REPL-atl1-global-PN_HXCONTEXTUALV2_.*,REPL-sjc2-global-PN_HXCONTEXTUALV2_.* > bootstrap.servers=app001:9092,app002:9092,app003:9092,app004:9092 > group.id=hkg1_cluster > auto.commit.interval.ms=6 > partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor > {noformat} > Producer > {noformat} > hkg1 > # # Producer > # # hkg1 > bootstrap.servers=app001:9092,app002:9092,app003:9092,app004:9092 > compression.type=gzip > acks=0 > {noformat} > Broker > {noformat} > auto.leader.rebalance.enable=true > delete.topic.enable=true > socket.receive.buffer.bytes=1048576 > socket.send.buffer.bytes=1048576 > default.replication.factor=2 > auto.create.topics.enable=true > num.partitions=1 > num.network.threads=8 > num.io.threads=40 > log.retention.hours=1 > log.roll.hours=1 > num.replica.fetchers=8 > zookeeper.connection.timeout.ms=3 > zookeeper.session.timeout.ms=3 > inter.broker.protocol.version=0.10.2 > log.message.format.version=0.8.2 > {noformat} > I tried also using stock configuraiton with no luck.
> The error that I get is this: > {noformat} > 2017-06-07 12:24:45,476] INFO ConsumerConfig values: > auto.commit.interval.ms = 6 > auto.offset.reset = latest > bootstrap.servers = [app454.sjc2.mytest.com:9092, > app455.sjc2.mytest.com:9092, app456.sjc2.mytest.com:9092, > app457.sjc2.mytest.com:9092, app458.sjc2.mytest.com:9092, > app459.sjc2.mytest.com:9092] > check.crcs = true > client.id = MirrorMaker_hkg1-1 > connections.max.idle.ms = 54 > enable.auto.commit = false > exclude.internal.topics = true > fetch.max.bytes = 52428800 > fetch.max.wait.ms = 500 > fetch.min.bytes = 1 > group.id = MirrorMaker_hkg1 > heartbeat.interval.ms = 3000 > interceptor.classes = null > key.deserializer = class > org.apache.kafka.common.serialization.ByteArrayDeserializer > max.partition.fetch.bytes = 1048576 > max.poll.interval.ms = 30 > max.poll.records = 500 > metadata.max.age.ms = 30 > metric.reporters = [] > metrics.num.samples = 2 > metrics.recording.level = INFO > metrics.sample.window.ms = 3 > partition.assignment.strategy = > [org.apache.kafka.clients.consumer.RoundRobinAssignor] > receive.buffer.bytes = 65536 > reconnect.backoff.ms = 50 > request.timeout.ms = 305000 > retry.backoff.ms = 100 > sasl.jaas.config = null > sasl.kerberos.kinit.cmd = /usr/bin/kinit > sasl.kerberos.min.time.before.relogin = 6 > sasl.kerberos.service.name = nu
[jira] [Commented] (KAFKA-5407) Mirrormaker dont start after upgrade
[ https://issues.apache.org/jira/browse/KAFKA-5407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16077460#comment-16077460 ] Fernando Vega commented on KAFKA-5407: -- listed are the config files: Broker: {noformat} ### ### This file is managed by Puppet. ### # See http://kafka.apache.org/documentation.html#brokerconfigs for default values. # The id of the broker. This must be set to a unique integer for each broker. broker.id=37 # The port the socket server listens on port=9092 # A comma seperated list of directories under which to store log files log.dirs=/kafka1/datalog,/kafka2/datalog,/kafka3/datalog,/kafka4/datalog,/kafka5/datalog,/kafka6/datalog,/kafka7/datalog,/kafka8/datalog,/kafka9/datalog,/kafka10/datalog # Zookeeper connection string (see zookeeper docs for details). # This is a comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". # You can also append an optional chroot string to the urls to specify the # root directory for all kafka znodes. zookeeper.connect=zookeeper1-repl:2181,zookeeper2-repl:2181,zookeeper3-repl:2181,zookeeper4-repl:2181,zookeeper5-repl:2181/replication/kafka # Additional configuration options may follow here auto.leader.rebalance.enable=true delete.topic.enable=true socket.receive.buffer.bytes=1048576 socket.send.buffer.bytes=1048576 default.replication.factor=2 auto.create.topics.enable=true num.partitions=1 num.network.threads=8 num.io.threads=40 log.retention.hours=1 log.roll.hours=1 num.replica.fetchers=8 zookeeper.connection.timeout.ms=3 zookeeper.session.timeout.ms=3 inter.broker.protocol.version=0.10.1 log.message.format.version=0.8.2 {noformat} Producer {noformat} # Producer # ams1 bootstrap.servers=app454.ams1.com:9092,app455.ams1.com:9092,app456.ams1.com:9092,app457.ams1.com:9092,app458.ams1.com:9092,app459.ams1.com:9092 acks=0 compression.type=gzip {noformat} consumer {noformat} zookeeper.connect=zookeeper1-repl.atl1.com:2181,zookeeper2-repl.atl1.com:2181,zookeeper3-repl.atl1.com:2181,zookeeper4-repl.atl1.com:2181,zookeeper5-repl.atl1.com:2181/replication/kafka zookeeper.connection.timeout.ms=3 zookeeper.session.timeout.ms=3 group.id=MirrorMaker_ams1 auto.commit.interval.ms=6 partition.assignment.strategy=roundrobin rebalance.backoff.ms=1 rebalance.max.retries=4 socket.receive.buffer.bytes=262144 zookeeper.sync.time.ms=6000 {noformat} > Mirrormaker dont start after upgrade > > > Key: KAFKA-5407 > URL: https://issues.apache.org/jira/browse/KAFKA-5407 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 0.10.2.1 > Environment: Operating system > CentOS 6.8 > HW > Board Mfg : HP > Board Product : ProLiant DL380p Gen8 > CPU's x2 > Product Manufacturer : Intel > Product Name : Intel(R) Xeon(R) CPU E5-2660 v2 @ 2.20GHz > Memory Type : DDR3 SDRAM > SDRAM Capacity: 2048 MB > Total Memory: : 64GB > Hardrives size and layout: > 9 drives using jbod > drive size 3.6TB each >Reporter: Fernando Vega >Priority: Critical > > Currently Im upgrading the cluster from 0.8.2-beta to 0.10.2.1 > So I followed the rolling procedure: > Here the config files: > Consumer > {noformat} > # > # Cluster: repl > # Topic list(goes into command line): > REPL-ams1-global,REPL-atl1-global,REPL-sjc2-global,REPL-ams1-global-PN_HXIDMAP_.*,REPL-atl1-global-PN_HXIDMAP_.*,REPL-sjc2-global-PN_HXIDMAP_.*,REPL-ams1-global-PN_HXCONTEXTUALV2_.*,REPL-atl1-global-PN_HXCONTEXTUALV2_.*,REPL-sjc2-global-PN_HXCONTEXTUALV2_.* > 
bootstrap.servers=app001:9092,app002:9092,app003:9092,app004:9092 > group.id=hkg1_cluster > auto.commit.interval.ms=6 > partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor > {noformat} > Producer > {noformat} > hkg1 > # # Producer > # # hkg1 > bootstrap.servers=app001:9092,app002:9092,app003:9092,app004:9092 > compression.type=gzip > acks=0 > {noformat} > Broker > {noformat} > auto.leader.rebalance.enable=true > delete.topic.enable=true > socket.receive.buffer.bytes=1048576 > socket.send.buffer.bytes=1048576 > default.replication.factor=2 > auto.create.topics.enable=true > num.partitions=1 > num.network.threads=8 > num.io.threads=40 > log.retention.hours=1 > log.roll.hours=1 > num.replica.fetchers=8 > zookeeper.connection.timeout.ms=3 > zookeeper.session.timeout.ms=3 > inter.broker.protocol.version=0.10.2 > log.message.format.version=0.8.2 > {noformat} > I tried also using stock configuraiton with no luck. > The error that I get is this: > {noformat} > 2017-06-07 12:24:45,476] INFO ConsumerConfig values: > auto.commit.interval.ms = 6 > auto.offset.reset = latest > bootstrap.servers = [app454.sjc2.mytest.com:9092, >
[jira] [Commented] (KAFKA-5568) Transformations that mutate topic-partitions break sink connectors that manage their own configuration
[ https://issues.apache.org/jira/browse/KAFKA-5568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16077473#comment-16077473 ] Randall Hauch commented on KAFKA-5568: -- Option #2 seems like it's the cleanest, IMO. It still allows connectors to know what the original topic and partition were for each record, though connectors that rely upon this would need to change. The impact on memory is slight, especially if we take a bit of care to ensure the fields refer to the same objects when the topic and partitions were not changed. Option #3 is a bit more flexible and a bit more efficient (memory and GC-wise) at the cost of being harder for connectors to correlate the two `Collection`. And technically, the API uses Collection, which doesn't imply order. Option #4 is better than #3 IMO. Option #5 could be done, but it'd be a complicated migration. Option #6: How about a slight variation of #2, but instead have SinkRecord have an `originalRecord()` method that returns the original record. Overhead is a single field, but since the transformations are already creating new SinkRecords there's almost no additional GC impact or computational cost. > Transformations that mutate topic-partitions break sink connectors that > manage their own configuration > -- > > Key: KAFKA-5568 > URL: https://issues.apache.org/jira/browse/KAFKA-5568 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 0.10.2.0, 0.10.2.1, 0.11.0.0 >Reporter: Ewen Cheslack-Postava > Labels: needs-discussion, needs-kip > > KAFKA-5567 describes how offset commits for sink connectors are broken if a > record's topic-partition is mutated by an SMT, e.g RegexRouter or > TimestampRouter. > This is also a problem for sink connectors that manage their own offsets, > i.e. those that store offsets elsewhere and call SinkTaskContext.rewind(). In > this case, the transformation has already been applied by the time the > SinkTask sees it, so there is no way it could correctly track offsets and > call rewind() with valid values. For example, this would make the offset > tracking that Confluent's HDFS connector does by working with filenames no > longer work. Even if they were stored separately in a file rather than > relying on filenames, it still wouldn't have ever had the correct offsets to > write to that file. > There are a couple of options: > 1. Decide that this is an acceptable consequence of combining SMTs with sink > connectors and it's a limitation we accept. You can either transform the data > via Kafka Streams instead or accept that you can't do these "routing" type > operations in the sink connector unless it supports it natively. This *might* > not be the wrong choice since we think there are very few connectors that > track their own offsets. In the case of HDFS, we might rarely hit this issue > because it supports its own file/directory partitioning schemes anyway so > doing this via SMTs isn't as necessary there. > 2. Try to expose the original record information to the sink connector via > the records. I can think of 2 ways this could be done. The first is to attach > the original record to each SinkRecord. The cost here is relatively high in > terms of memory, especially for sink connectors that need to buffer data. The > second is to add fields to SinkRecords for originalTopic() and > originalPartition(). 
This feels a bit ugly to me but might be the least > intrusive change API-wise and we can guarantee those fields aren't > overwritten by not allowing public constructors to set them. > 3. Try to expose the original record information to the sink connector via a > new pre-processing callback. The idea is similar to preCommit, but instead > would happen before any processing occurs. Taken to its logical conclusion > this turns into a sort of interceptor interface (preConversion, > preTransformation, put, and preCommit). > 4. Add something to the Context that allows the connector to get back at the > original information. Maybe some sort of IdentityMap > originalPutRecords() that would let you get a mapping back to the original > records. One nice aspect of this is that the connector can hold onto the > original only if it needs it. > 5. A very intrusive change/extension to the SinkTask API that passes in pairs > of records. Accomplishes the same as 2 but requires > what I think are more complicated changes. Mentioned for completeness. > 6. Something else I haven't thought of? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
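A sketch of the {{originalRecord()}} variation floated in the comment above, again purely hypothetical; it assumes the framework keeps a reference to the record as consumed whenever the SMT chain produces a new one.
{code}
import org.apache.kafka.connect.sink.SinkRecord;

// Hypothetical accessor (not in the current API).
public interface HasOriginalRecord {
    SinkRecord originalRecord();  // the record before any transformation was applied
}

// Inside put(), a task managing its own offsets could then do:
//   SinkRecord source = ((HasOriginalRecord) record).originalRecord();
//   track(source.topic(), source.kafkaPartition(), source.kafkaOffset());
{code}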
[jira] [Comment Edited] (KAFKA-5568) Transformations that mutate topic-partitions break sink connectors that manage their own configuration
[ https://issues.apache.org/jira/browse/KAFKA-5568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16077473#comment-16077473 ] Randall Hauch edited comment on KAFKA-5568 at 7/7/17 2:17 AM: -- Option #2 isn't too bad, IMO. It still allows connectors to know what the original topic and partition were for each record, though connectors that rely upon this would need to change. The impact on memory is slight, especially if we take a bit of care to ensure the fields refer to the same objects when the topic and partitions were not changed. Option #3 is a bit more flexible and a bit more efficient (memory and GC-wise) at the cost of being harder for connectors to correlate the two `Collection`. And technically, the API uses Collection, which doesn't imply order. Option #4 is better than #3 IMO. Option #5 could be done, but it'd be a complicated migration. Option #6: How about a slight variation of #2, but instead have SinkRecord have an `originalRecord()` method that returns the original record. Overhead is a single field, but since the transformations are already creating new SinkRecords there's almost no additional GC impact or computational cost. was (Author: rhauch): Option #2 seems like it's the cleanest, IMO. It still allows connectors to know what the original topic and partition were for each record, though connectors that rely upon this would need to change. The impact on memory is slight, especially if we take a bit of care to ensure the fields refer to the same objects when the topic and partitions were not changed. Option #3 is a bit more flexible and a bit more efficient (memory and GC-wise) at the cost of being harder for connectors to correlate the two `Collection`. And technically, the API uses Collection, which doesn't imply order. Option #4 is better than #3 IMO. Option #5 could be done, but it'd be a complicated migration. Option #6: How about a slight variation of #2, but instead have SinkRecord have an `originalRecord()` method that returns the original record. Overhead is a single field, but since the transformations are already creating new SinkRecords there's almost no additional GC impact or computational cost. > Transformations that mutate topic-partitions break sink connectors that > manage their own configuration > -- > > Key: KAFKA-5568 > URL: https://issues.apache.org/jira/browse/KAFKA-5568 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 0.10.2.0, 0.10.2.1, 0.11.0.0 >Reporter: Ewen Cheslack-Postava > Labels: needs-discussion, needs-kip > > KAFKA-5567 describes how offset commits for sink connectors are broken if a > record's topic-partition is mutated by an SMT, e.g RegexRouter or > TimestampRouter. > This is also a problem for sink connectors that manage their own offsets, > i.e. those that store offsets elsewhere and call SinkTaskContext.rewind(). In > this case, the transformation has already been applied by the time the > SinkTask sees it, so there is no way it could correctly track offsets and > call rewind() with valid values. For example, this would make the offset > tracking that Confluent's HDFS connector does by working with filenames no > longer work. Even if they were stored separately in a file rather than > relying on filenames, it still wouldn't have ever had the correct offsets to > write to that file. > There are a couple of options: > 1. Decide that this is an acceptable consequence of combining SMTs with sink > connectors and it's a limitation we accept. 
You can either transform the data > via Kafka Streams instead or accept that you can't do these "routing" type > operations in the sink connector unless it supports it natively. This *might* > not be the wrong choice since we think there are very few connectors that > track their own offsets. In the case of HDFS, we might rarely hit this issue > because it supports its own file/directory partitioning schemes anyway so > doing this via SMTs isn't as necessary there. > 2. Try to expose the original record information to the sink connector via > the records. I can think of 2 ways this could be done. The first is to attach > the original record to each SinkRecord. The cost here is relatively high in > terms of memory, especially for sink connectors that need to buffer data. The > second is to add fields to SinkRecords for originalTopic() and > originalPartition(). This feels a bit ugly to me but might be the least > intrusive change API-wise and we can guarantee those fields aren't > overwritten by not allowing public constructors to set them. > 3. Try to expose the original record information to the sink connector via a > new pre-proce
[jira] [Commented] (KAFKA-5407) Mirrormaker dont start after upgrade
[ https://issues.apache.org/jira/browse/KAFKA-5407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16077505#comment-16077505 ] huxihx commented on KAFKA-5407: --- [~fvegaucr] Did you find any exceptions thrown when new consumer was used? And, how did you specify `--whitelist`? > Mirrormaker dont start after upgrade > > > Key: KAFKA-5407 > URL: https://issues.apache.org/jira/browse/KAFKA-5407 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 0.10.2.1 > Environment: Operating system > CentOS 6.8 > HW > Board Mfg : HP > Board Product : ProLiant DL380p Gen8 > CPU's x2 > Product Manufacturer : Intel > Product Name : Intel(R) Xeon(R) CPU E5-2660 v2 @ 2.20GHz > Memory Type : DDR3 SDRAM > SDRAM Capacity: 2048 MB > Total Memory: : 64GB > Hardrives size and layout: > 9 drives using jbod > drive size 3.6TB each >Reporter: Fernando Vega >Priority: Critical > > Currently Im upgrading the cluster from 0.8.2-beta to 0.10.2.1 > So I followed the rolling procedure: > Here the config files: > Consumer > {noformat} > # > # Cluster: repl > # Topic list(goes into command line): > REPL-ams1-global,REPL-atl1-global,REPL-sjc2-global,REPL-ams1-global-PN_HXIDMAP_.*,REPL-atl1-global-PN_HXIDMAP_.*,REPL-sjc2-global-PN_HXIDMAP_.*,REPL-ams1-global-PN_HXCONTEXTUALV2_.*,REPL-atl1-global-PN_HXCONTEXTUALV2_.*,REPL-sjc2-global-PN_HXCONTEXTUALV2_.* > bootstrap.servers=app001:9092,app002:9092,app003:9092,app004:9092 > group.id=hkg1_cluster > auto.commit.interval.ms=6 > partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor > {noformat} > Producer > {noformat} > hkg1 > # # Producer > # # hkg1 > bootstrap.servers=app001:9092,app002:9092,app003:9092,app004:9092 > compression.type=gzip > acks=0 > {noformat} > Broker > {noformat} > auto.leader.rebalance.enable=true > delete.topic.enable=true > socket.receive.buffer.bytes=1048576 > socket.send.buffer.bytes=1048576 > default.replication.factor=2 > auto.create.topics.enable=true > num.partitions=1 > num.network.threads=8 > num.io.threads=40 > log.retention.hours=1 > log.roll.hours=1 > num.replica.fetchers=8 > zookeeper.connection.timeout.ms=3 > zookeeper.session.timeout.ms=3 > inter.broker.protocol.version=0.10.2 > log.message.format.version=0.8.2 > {noformat} > I tried also using stock configuraiton with no luck. 
> The error that I get is this: > {noformat} > 2017-06-07 12:24:45,476] INFO ConsumerConfig values: > auto.commit.interval.ms = 6 > auto.offset.reset = latest > bootstrap.servers = [app454.sjc2.mytest.com:9092, > app455.sjc2.mytest.com:9092, app456.sjc2.mytest.com:9092, > app457.sjc2.mytest.com:9092, app458.sjc2.mytest.com:9092, > app459.sjc2.mytest.com:9092] > check.crcs = true > client.id = MirrorMaker_hkg1-1 > connections.max.idle.ms = 54 > enable.auto.commit = false > exclude.internal.topics = true > fetch.max.bytes = 52428800 > fetch.max.wait.ms = 500 > fetch.min.bytes = 1 > group.id = MirrorMaker_hkg1 > heartbeat.interval.ms = 3000 > interceptor.classes = null > key.deserializer = class > org.apache.kafka.common.serialization.ByteArrayDeserializer > max.partition.fetch.bytes = 1048576 > max.poll.interval.ms = 30 > max.poll.records = 500 > metadata.max.age.ms = 30 > metric.reporters = [] > metrics.num.samples = 2 > metrics.recording.level = INFO > metrics.sample.window.ms = 3 > partition.assignment.strategy = > [org.apache.kafka.clients.consumer.RoundRobinAssignor] > receive.buffer.bytes = 65536 > reconnect.backoff.ms = 50 > request.timeout.ms = 305000 > retry.backoff.ms = 100 > sasl.jaas.config = null > sasl.kerberos.kinit.cmd = /usr/bin/kinit > sasl.kerberos.min.time.before.relogin = 6 > sasl.kerberos.service.name = null > sasl.kerberos.ticket.renew.jitter = 0.05 > sasl.kerberos.ticket.renew.window.factor = 0.8 > sasl.mechanism = GSSAPI > security.protocol = PLAINTEXT > send.buffer.bytes = 131072 > session.timeout.ms = 1 > ssl.cipher.suites = null > ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] > ssl.endpoint.identification.algorithm = null > ssl.key.password = null > ssl.keymanager.algorithm = SunX509 > ssl.keystore.location = null > ssl.keystore.password = null > ssl.keystore.type = JKS > ssl.protocol = TLS > ssl.provider = null > ssl.secure.random.implementation = null > ssl.trustmanager.algorithm = PKIX > ssl.truststore.location = null > ssl.truststore.password = null > ssl.truststore.type
[jira] [Commented] (KAFKA-5560) LogManager should be able to create new logs based on free disk space
[ https://issues.apache.org/jira/browse/KAFKA-5560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16077506#comment-16077506 ] huxihx commented on KAFKA-5560: --- [~junrao] Do you see it as a reasonable requirement? > LogManager should be able to create new logs based on free disk space > - > > Key: KAFKA-5560 > URL: https://issues.apache.org/jira/browse/KAFKA-5560 > Project: Kafka > Issue Type: Improvement > Components: log >Affects Versions: 0.11.0.0 >Reporter: huxihx > > Currently, log manager chooses a directory configured in `log.dirs` by > calculating the number partitions in each directory and then choosing the one > with the fewest partitions. But in some real production scenarios where data > volumes of partitions are not even, some disks nearly become full whereas the > others have a lot of spaces which lead to a poor data distribution. > We should offer a new strategy to users to have log manager honor the real > disk free spaces and choose the directory with the most disk space. Maybe a > new broker configuration parameter is needed, `log.directory.strategy` for > instance. Perhaps this needs a new KIP also. > Does it make sense? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
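A minimal sketch of the "most free space" strategy the ticket asks for, assuming the configured {{log.dirs}} are available as {{java.io.File}} objects; the real LogManager is Scala and currently selects by partition count, so this only illustrates the proposed policy.
{code}
import java.io.File;
import java.util.Comparator;
import java.util.List;

// Pick the log directory with the most usable bytes instead of the fewest partitions.
static File chooseByFreeSpace(final List<File> logDirs) {
    return logDirs.stream()
            .max(Comparator.comparingLong(File::getUsableSpace))
            .orElseThrow(() -> new IllegalStateException("log.dirs is empty"));
}
{code}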
[jira] [Commented] (KAFKA-5467) setting offset retention minutes to a lower value is not reflecting
[ https://issues.apache.org/jira/browse/KAFKA-5467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16077536#comment-16077536 ] huxihx commented on KAFKA-5467: --- Log cleaner does not clean the active log segment. How many log segments do you have for your __consumer_offsets topic? > setting offset retention minutes to a lower value is not reflecting > --- > > Key: KAFKA-5467 > URL: https://issues.apache.org/jira/browse/KAFKA-5467 > Project: Kafka > Issue Type: Bug > Components: offset manager >Affects Versions: 0.10.1.1 >Reporter: Divya > > We have been observing offsets to be unknown and saw that our offset > retention time was less than the log retention period. In order to recreate > the same in a test environment, we set offset.retention.minutes to 1 minute > and the log retention time to 168 hours. There were no events written for > more than an hour but still the offsets were not turning to unknown. (The > offset clean interval was 10 minutes.) I would like to know why the offsets > did not turn to unknown within an hour. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5567) With transformations that mutate the topic-partition committing offsets should to refer to the original topic-partition
[ https://issues.apache.org/jira/browse/KAFKA-5567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16077603#comment-16077603 ] ASF GitHub Bot commented on KAFKA-5567: --- GitHub user kkonstantine opened a pull request: https://github.com/apache/kafka/pull/3499 KAFKA-5567: Connect sink worker should commit offsets of original topic partitions You can merge this pull request into a Git repository by running: $ git pull https://github.com/kkonstantine/kafka KAFKA-5567-With-transformations-that-mutate-the-topic-partition-committing-offsets-should-to-refer-to-the-original-topic-partition Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/3499.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3499 commit b9eeeb12812f0b237b4356717dfc5bf0e252e8e6 Author: Konstantine Karantasis Date: 2017-07-07T05:13:50Z KAFKA-5567: Connect sink worker should commit offsets of original topic partitions. > With transformations that mutate the topic-partition committing offsets > should to refer to the original topic-partition > --- > > Key: KAFKA-5567 > URL: https://issues.apache.org/jira/browse/KAFKA-5567 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 0.10.2.0, 0.10.2.1, 0.11.0.0 >Reporter: Konstantine Karantasis >Assignee: Konstantine Karantasis > Fix For: 0.11.0.1 > > > When a chain of transformations (SMTs) that mutate a record's > topic-partition is applied, then Connect is unable to map the transformed > record to its original topic-partition. This affects committing offsets. > Currently, in order to reproduce the issue one could use the > {{TimestampRouter}} transformation with a sink connector such as the > {{FileStreamSinkConnector}}. > In this ticket we'll address the issue for connectors that don't > manage/commit their offsets themselves. For the connectors that do such > management, broader API changes are required to supply the connectors with > the necessary information that will allow them to map a transformed record to > the original. -- This message was sent by Atlassian JIRA (v6.4.14#64029)