[jira] [Commented] (KAFKA-6554) Broker doesn't reject Produce request with inconsistent state
[ https://issues.apache.org/jira/browse/KAFKA-6554?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16369845#comment-16369845 ]

ASF GitHub Bot commented on KAFKA-6554:
---

rajinisivaram closed pull request #4585: KAFKA-6554; Missing lastOffsetDelta validation before log append
URL: https://github.com/apache/kafka/pull/4585

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
index ff8d3b990ba..71e668e45da 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
@@ -106,7 +106,7 @@
     static final int CRC_LENGTH = 4;
     static final int ATTRIBUTES_OFFSET = CRC_OFFSET + CRC_LENGTH;
     static final int ATTRIBUTE_LENGTH = 2;
-    static final int LAST_OFFSET_DELTA_OFFSET = ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH;
+    public static final int LAST_OFFSET_DELTA_OFFSET = ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH;
     static final int LAST_OFFSET_DELTA_LENGTH = 4;
     static final int FIRST_TIMESTAMP_OFFSET = LAST_OFFSET_DELTA_OFFSET + LAST_OFFSET_DELTA_LENGTH;
     static final int FIRST_TIMESTAMP_LENGTH = 8;
@@ -118,7 +118,7 @@
     static final int PRODUCER_EPOCH_LENGTH = 2;
     static final int BASE_SEQUENCE_OFFSET = PRODUCER_EPOCH_OFFSET + PRODUCER_EPOCH_LENGTH;
     static final int BASE_SEQUENCE_LENGTH = 4;
-    static final int RECORDS_COUNT_OFFSET = BASE_SEQUENCE_OFFSET + BASE_SEQUENCE_LENGTH;
+    public static final int RECORDS_COUNT_OFFSET = BASE_SEQUENCE_OFFSET + BASE_SEQUENCE_LENGTH;
     static final int RECORDS_COUNT_LENGTH = 4;
     static final int RECORDS_OFFSET = RECORDS_COUNT_OFFSET + RECORDS_COUNT_LENGTH;
     public static final int RECORD_BATCH_OVERHEAD = RECORDS_OFFSET;
diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index 15750e9cd06..1beb2bdda37 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -74,15 +74,27 @@ private[kafka] object LogValidator extends Logging {
   private def validateBatch(batch: RecordBatch, isFromClient: Boolean, toMagic: Byte): Unit = {
     if (isFromClient) {
+      if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
+        val countFromOffsets = batch.lastOffset - batch.baseOffset + 1
+        if (countFromOffsets <= 0)
+          throw new InvalidRecordException(s"Batch has an invalid offset range: [${batch.baseOffset}, ${batch.lastOffset}]")
+
+        // v2 and above messages always have a non-null count
+        val count = batch.countOrNull
+        if (count <= 0)
+          throw new InvalidRecordException(s"Invalid reported count for record batch: $count")
+
+        if (countFromOffsets != batch.countOrNull)
+          throw new InvalidRecordException(s"Inconsistent batch offset range [${batch.baseOffset}, ${batch.lastOffset}] " +
+            s"and count of records $count")
+      }
+
       if (batch.hasProducerId && batch.baseSequence < 0)
         throw new InvalidRecordException(s"Invalid sequence number ${batch.baseSequence} in record batch " +
           s"with producerId ${batch.producerId}")
       if (batch.isControlBatch)
         throw new InvalidRecordException("Clients are not allowed to write control records")
-
-      if (Option(batch.countOrNull).contains(0))
-        throw new InvalidRecordException("Record batches must contain at least one record")
     }

     if (batch.isTransactional && toMagic < RecordBatch.MAGIC_VALUE_V2)
diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
index 131152af430..04e89528b12 100644
--- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -26,6 +26,7 @@
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.test.TestUtils
 import org.junit.Assert._
 import org.junit.Test
+import org.scalatest.Assertions.intercept

 import scala.collection.JavaConverters._

@@ -147,6 +148,50 @@ class LogValidatorTest {
       compressed = true)
   }
+  @Test
+  def testInvalidOffsetRangeAndRecordCount(): Unit = {
+    // The batch to be written contains 3 records, so the correct lastOffsetDelta is 2
+    validateRecordBatchWithCountOverrides(lastOffsetDelta = 2, count = 3)
+
+    // Count and offset range are inconsistent or invalid
+    assertInvalidBatchCountOverrides(lastOffsetDelta = 0, count = 3)
+    assertIn
[jira] [Commented] (KAFKA-6111) Tests for KafkaZkClient
[ https://issues.apache.org/jira/browse/KAFKA-6111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16369862#comment-16369862 ]

ASF GitHub Bot commented on KAFKA-6111:
---

smurakozi opened a new pull request #4596: KAFKA-6111: Improve test coverage of KafkaZkClient, fix bugs found by new tests
URL: https://github.com/apache/kafka/pull/4596

New test cases and checks were added to cover most of the functionality in KafkaZkClient. The new tests found two issues:
- deleteLogDirEventNotifications used wrong paths when it attempted to delete notifications
- updateBrokerInfoInZk did not throw an exception if the update was not successful.

These issues are also fixed in this PR. New tests were added; they discovered the issues mentioned above.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Tests for KafkaZkClient
> ---
>
> Key: KAFKA-6111
> URL: https://issues.apache.org/jira/browse/KAFKA-6111
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Ismael Juma
> Assignee: Sandor Murakozi
> Priority: Major
> Fix For: 1.2.0
>
> Some methods in KafkaZkClient have no tests at the moment and we need to fix
> that.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (KAFKA-6493) Cannot use kafka.metrics.KafkaCSVMetricsReporter
[ https://issues.apache.org/jira/browse/KAFKA-6493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16369959#comment-16369959 ]

SivaPrasad commented on KAFKA-6493:
---

Hi,

I am facing the same issue. I am also looking to enable only a few of the CSV files instead of all of them. Is that possible? Can you explain how to fix it using metrics-core?

{color:#8eb021}*I found a new version of metrics-core at [https://github.com/dropwizard/metrics].*{color}

Cheers,
Siva

> Cannot use kafka.metrics.KafkaCSVMetricsReporter
>
>
> Key: KAFKA-6493
> URL: https://issues.apache.org/jira/browse/KAFKA-6493
> Project: Kafka
> Issue Type: Bug
> Components: core, metrics
> Affects Versions: 1.0.0
> Environment: $ uname -a
> Linux enerian 4.14.0-3-amd64 #1 SMP Debian 4.14.12-2 (2018-01-06) x86_64
> GNU/Linux
> $ java -version
> openjdk version "1.8.0_151"
> OpenJDK Runtime Environment (build 1.8.0_151-8u151-b12-1-b12)
> OpenJDK 64-Bit Server VM (build 25.151-b12, mixed mode)
> Reporter: okkez
> Priority: Major
>
> I cannot use kafka.metrics.KafkaCSVMetricsReporter as follows with Kafka
> 1.0.0.
> I added the following configuration to server.properties:
> {noformat}
> kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter
> kafka.metrics.polling.interval.secs=5
> kafka.csv.metrics.dir=/tmp/csv_metrics
> kafka.csv.metrics.reporter.enabled=true
> {noformat}
> And I got the following errors in logs/kafkaServer.out every 5 seconds:
> {noformat}
> java.io.IOException: Unable to create /tmp/csv_metrics/RequestsPerSec.csv
>     at com.yammer.metrics.reporting.CsvReporter.createStreamForMetric(CsvReporter.java:141)
>     at com.yammer.metrics.reporting.CsvReporter.getPrintStream(CsvReporter.java:257)
>     at com.yammer.metrics.reporting.CsvReporter.access$000(CsvReporter.java:22)
>     at com.yammer.metrics.reporting.CsvReporter$1.getStream(CsvReporter.java:156)
>     at com.yammer.metrics.reporting.CsvReporter.processMeter(CsvReporter.java:173)
>     at com.yammer.metrics.reporting.CsvReporter.processMeter(CsvReporter.java:22)
>     at com.yammer.metrics.core.Meter.processWith(Meter.java:131)
>     at com.yammer.metrics.reporting.CsvReporter.run(CsvReporter.java:163)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> {noformat}
> I found
> https://github.com/infusionsoft/yammer-metrics/blob/v2.2.0/metrics-core/src/main/java/com/yammer/metrics/reporting/CsvReporter.java#L136-L142.
> This means that an IOException is thrown when the CSV files used to store metrics already exist.
> I removed all CSV files and restarted the server, but I got the same error...
> I found a new version of metrics-core at https://github.com/dropwizard/metrics.
> Kafka should use the new version of metrics-core, I think.
> The new version of metrics-core does not seem to have such a problem.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
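A possible workaround until this is fixed in Kafka itself: since CsvReporter fails when a metric's .csv file already exists, clearing the output directory before the reporter starts avoids the IOException. The sketch below is illustrative only — it assumes the yammer metrics-core 2.x API that Kafka 1.0.0 ships (CsvReporter.enable on the default registry) and the /tmp/csv_metrics directory from the configuration above; for the broker's built-in KafkaCSVMetricsReporter the equivalent is simply emptying kafka.csv.metrics.dir before startup.

{code:java}
import com.yammer.metrics.reporting.CsvReporter;

import java.io.File;
import java.util.concurrent.TimeUnit;

public class CsvMetricsWorkaround {
    public static void main(String[] args) {
        // Hypothetical output directory; mirrors kafka.csv.metrics.dir=/tmp/csv_metrics above
        File csvDir = new File("/tmp/csv_metrics");
        csvDir.mkdirs();

        // CsvReporter refuses to create a stream if the .csv file already exists,
        // so delete stale files left over from a previous run first.
        File[] stale = csvDir.listFiles((dir, name) -> name.endsWith(".csv"));
        if (stale != null) {
            for (File f : stale) {
                f.delete();
            }
        }

        // Poll the default metrics registry every 5 seconds, mirroring
        // kafka.metrics.polling.interval.secs=5
        CsvReporter.enable(csvDir, 5, TimeUnit.SECONDS);
    }
}
{code}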
[jira] [Commented] (KAFKA-6568) LogCleanerManager.doneDeleting() should check the partition state before deleting the in progress partition
[ https://issues.apache.org/jira/browse/KAFKA-6568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16370133#comment-16370133 ]

ASF GitHub Bot commented on KAFKA-6568:
---

ijuma closed pull request #4592: MINOR follow-up for KAFKA-6568
URL: https://github.com/apache/kafka/pull/4592

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index b23107be491..223c6119654 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -314,6 +314,8 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
       case Some(LogCleaningAborted) =>
         inProgress.put(topicPartition, LogCleaningPaused)
         pausedCleaningCond.signalAll()
+      case None =>
+        throw new IllegalStateException(s"State for partition $topicPartition should exist.")
       case s =>
         throw new IllegalStateException(s"In-progress partition $topicPartition cannot be in $s state.")
     }
@@ -328,6 +330,8 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
       case Some(LogCleaningAborted) =>
         inProgress.put(topicPartition, LogCleaningPaused)
         pausedCleaningCond.signalAll()
+      case None =>
+        throw new IllegalStateException(s"State for partition $topicPartition should exist.")
       case s =>
         throw new IllegalStateException(s"In-progress partition $topicPartition cannot be in $s state.")
     }
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index 42a447a2b16..7455763f5b7 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -226,20 +226,10 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
     val cleanerManager: LogCleanerManager = createCleanerManager(log)
     val tp = new TopicPartition("log", 0)

-    try {
-      cleanerManager.doneCleaning(tp, log.dir, 1)
-    } catch {
-      case _ : IllegalStateException =>
-      case _ : Throwable => fail("Should have thrown IllegalStateException.")
-    }
-
-    try {
-      cleanerManager.setCleaningState(tp, LogCleaningPaused)
-      cleanerManager.doneCleaning(tp, log.dir, 1)
-    } catch {
-      case _ : IllegalStateException =>
-      case _ : Throwable => fail("Should have thrown IllegalStateException.")
-    }
+    intercept[IllegalStateException](cleanerManager.doneCleaning(tp, log.dir, 1))
+
+    cleanerManager.setCleaningState(tp, LogCleaningPaused)
+    intercept[IllegalStateException](cleanerManager.doneCleaning(tp, log.dir, 1))

     cleanerManager.setCleaningState(tp, LogCleaningInProgress)
     cleanerManager.doneCleaning(tp, log.dir, 1)
@@ -260,20 +250,10 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
     val tp = new TopicPartition("log", 0)

-    try {
-      cleanerManager.doneDeleting(tp)
-    } catch {
-      case _ : IllegalStateException =>
-      case _ : Throwable => fail("Should have thrown IllegalStateException.")
-    }
-
-    try {
-      cleanerManager.setCleaningState(tp, LogCleaningPaused)
-      cleanerManager.doneDeleting(tp)
-    } catch {
-      case _ : IllegalStateException =>
-      case _ : Throwable => fail("Should have thrown IllegalStateException.")
-    }
+    intercept[IllegalStateException](cleanerManager.doneDeleting(tp))
+
+    cleanerManager.setCleaningState(tp, LogCleaningPaused)
+    intercept[IllegalStateException](cleanerManager.doneDeleting(tp))

     cleanerManager.setCleaningState(tp, LogCleaningInProgress)
     cleanerManager.doneDeleting(tp)

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> LogCleanerManager.doneDeleting() should check the partition state before
> deleting the in progress partition
> ---
>
> Key: KAFKA-6568
> URL: https://issues.apache.org/jira/browse/KAFKA-6568
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.10.1.1, 0.10.2.1, 1.0.0, 0.11.0.2
> Reporter: Jiangjie Qin
> Assignee: Jiangjie Qin
> Priority: Blocker
>
[jira] [Resolved] (KAFKA-5624) Unsafe use of expired sensors
[ https://issues.apache.org/jira/browse/KAFKA-5624?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jason Gustafson resolved KAFKA-5624.
Resolution: Fixed
Fix Version/s: 2.0.0

> Unsafe use of expired sensors
> -
>
> Key: KAFKA-5624
> URL: https://issues.apache.org/jira/browse/KAFKA-5624
> Project: Kafka
> Issue Type: Bug
> Reporter: Jason Gustafson
> Assignee: Manikumar
> Priority: Major
> Fix For: 2.0.0
>
> There seem to be a couple of unhandled cases following sensor expiration:
> 1. Static sensors (such as {{ClientQuotaManager.delayQueueSensor}}) can be
> expired due to inactivity, but the references will remain valid and usable.
> Probably a good idea to either ensure we use a "get or create" pattern when
> accessing the sensor or add a new static registration option which makes the
> sensor ineligible for expiration.
> 2. It is possible to register metrics through the sensor even after it is
> expired. We should probably raise an exception instead.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
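For case 1, the "get or create" pattern looks roughly like the sketch below, written against the public org.apache.kafka.common.metrics API. This is only an illustration of the pattern — the class, sensor and metric names here are hypothetical and are not the actual ClientQuotaManager code.

{code:java}
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Rate;

public class DelayQueueMetrics {
    private final Metrics metrics;

    public DelayQueueMetrics(Metrics metrics) {
        this.metrics = metrics;
    }

    // Re-resolve the sensor on every use instead of caching a long-lived (static)
    // reference: getSensor() returns null once the sensor has been expired and
    // removed, and sensor(name) then creates a fresh one.
    private Sensor delayQueueSensor() {
        Sensor sensor = metrics.getSensor("delay-queue-size");
        if (sensor == null) {
            sensor = metrics.sensor("delay-queue-size");
            MetricName name = metrics.metricName("delay-queue-rate", "quota-metrics",
                    "Rate of additions to the delay queue");
            sensor.add(name, new Rate());
        }
        return sensor;
    }

    public void recordEnqueue() {
        delayQueueSensor().record(1.0);
    }
}
{code}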
[jira] [Commented] (KAFKA-5624) Unsafe use of expired sensors
[ https://issues.apache.org/jira/browse/KAFKA-5624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16370258#comment-16370258 ]

ASF GitHub Bot commented on KAFKA-5624:
---

hachikuji closed pull request #4404: KAFKA-5624: Add expiry check to sensor.add() methods
URL: https://github.com/apache/kafka/pull/4404

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
index 837ac2e4b43..06c8c7f362b 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
@@ -207,9 +207,11 @@ public void checkQuotas(long timeMs) {
     /**
      * Register a compound statistic with this sensor with no config override
+     * @param stat The stat to register
+     * @return true if stat is added to sensor, false if sensor is expired
      */
-    public void add(CompoundStat stat) {
-        add(stat, null);
+    public boolean add(CompoundStat stat) {
+        return add(stat, null);
     }

     /**
@@ -217,8 +219,12 @@ public void add(CompoundStat stat) {
      * @param stat The stat to register
      * @param config The configuration for this stat. If null then the stat will use the default configuration for this
      *               sensor.
+     * @return true if stat is added to sensor, false if sensor is expired
      */
-    public synchronized void add(CompoundStat stat, MetricConfig config) {
+    public synchronized boolean add(CompoundStat stat, MetricConfig config) {
+        if (hasExpired())
+            return false;
+
         this.stats.add(Utils.notNull(stat));
         Object lock = new Object();
         for (NamedMeasurable m : stat.stats()) {
@@ -226,15 +232,17 @@ public synchronized void add(CompoundStat stat, MetricConfig config) {
             this.registry.registerMetric(metric);
             this.metrics.add(metric);
         }
+        return true;
     }

     /**
      * Register a metric with this sensor
      * @param metricName The name of the metric
      * @param stat The statistic to keep
+     * @return true if metric is added to sensor, false if sensor is expired
      */
-    public void add(MetricName metricName, MeasurableStat stat) {
-        add(metricName, stat, null);
+    public boolean add(MetricName metricName, MeasurableStat stat) {
+        return add(metricName, stat, null);
     }

     /**
@@ -242,8 +250,12 @@ public void add(MetricName metricName, MeasurableStat stat) {
      * @param metricName The name of the metric
      * @param stat The statistic to keep
      * @param config A special configuration for this metric. If null use the sensor default configuration.
+     * @return true if metric is added to sensor, false if sensor is expired
      */
-    public synchronized void add(MetricName metricName, MeasurableStat stat, MetricConfig config) {
+    public synchronized boolean add(MetricName metricName, MeasurableStat stat, MetricConfig config) {
+        if (hasExpired())
+            return false;
+
         KafkaMetric metric = new KafkaMetric(new Object(),
                                              Utils.notNull(metricName),
                                              Utils.notNull(stat),
@@ -252,6 +264,7 @@ public synchronized void add(MetricName metricName, MeasurableStat stat, MetricC
         this.registry.registerMetric(metric);
         this.metrics.add(metric);
         this.stats.add(stat);
+        return true;
     }

     /**
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
index 8bcc775df56..23fc5411b3c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
@@ -256,7 +256,7 @@ public void testCleanupMemoryAvailabilityOnMetricsException() throws Exception {
         mockedSensor.record(anyDouble(), anyLong());
         expectLastCall().andThrow(new OutOfMemoryError());
         expect(mockedMetrics.metricName(anyString(), eq(metricGroup), anyString())).andReturn(metricName);
-        mockedSensor.add(new Meter(TimeUnit.NANOSECONDS, rateMetricName, totalMetricName));
+        expect(mockedSensor.add(new Meter(TimeUnit.NANOSECONDS, rateMetricName, totalMetricName))).andReturn(true);

         replay(mockedMetrics, mockedSensor, metricName);

diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java b/c
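With this change, callers can detect that a sensor has already been expired and removed instead of silently registering metrics against it. A minimal usage sketch (names are illustrative, not taken from the PR):

{code:java}
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;

public class SensorAddExample {
    public static void main(String[] args) {
        Metrics metrics = new Metrics();
        Sensor sensor = metrics.sensor("request-latency");

        MetricName avgName = metrics.metricName("request-latency-avg", "demo-metrics");
        // After the patch, add() returns false when the sensor has expired,
        // rather than registering the metric on a sensor that is being removed.
        if (!sensor.add(avgName, new Avg())) {
            // Expired: re-resolve via get-or-create and register the metric again.
            sensor = metrics.sensor("request-latency");
            sensor.add(avgName, new Avg());
        }

        sensor.record(42.0);
        metrics.close();
    }
}
{code}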
[jira] [Resolved] (KAFKA-6554) Broker doesn't reject Produce request with inconsistent state
[ https://issues.apache.org/jira/browse/KAFKA-6554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jason Gustafson resolved KAFKA-6554.
Resolution: Fixed
Fix Version/s: 1.1.0

> Broker doesn't reject Produce request with inconsistent state
> -
>
> Key: KAFKA-6554
> URL: https://issues.apache.org/jira/browse/KAFKA-6554
> Project: Kafka
> Issue Type: Bug
> Components: producer
> Affects Versions: 1.0.0
> Reporter: Simon Fell
> Assignee: Jason Gustafson
> Priority: Minor
> Fix For: 1.1.0
>
> Attachments: produce_v3.txt
>
> Produce messages of type v3 have offset deltas in each record along with a
> LastOffsetDelta for the topic/partition set. In investigating an issue with
> missing offsets, I found a bug in a producer library where it would send
> multiple records but leave LastOffsetDelta at 0. This causes various
> problems, including holes in the offsets fetched by the consumer.
> As lastOffsetDelta can be computed by looking at the records, it seems like
> the broker should at least validate the LastOffsetDelta field against the
> contained records to stop this bad data from getting in.
> I've attached a decoded v3 produce message that was causing the problems and
> was accepted by the broker.
> Here's a link to the issue in the Kafka library we were using, which has more
> context if you need it:
> https://github.com/Shopify/sarama/issues/1032

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
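For reference, the check the broker now performs (see the LogValidator change in the GitHub comment above) boils down to comparing the batch's declared record count with the count implied by its offset range. Below is a rough Java rendering of that logic against the public RecordBatch interface — an illustration only, not the actual broker code, and it assumes the 1.1-era package location of InvalidRecordException (org.apache.kafka.common.record):

{code:java}
import org.apache.kafka.common.record.InvalidRecordException;
import org.apache.kafka.common.record.RecordBatch;

public final class BatchCountValidation {

    private BatchCountValidation() { }

    // Sketch of the consistency check applied to v2 (magic >= 2) batches from clients.
    public static void validateOffsetRangeAndCount(RecordBatch batch) {
        if (batch.magic() < RecordBatch.MAGIC_VALUE_V2)
            return;

        long countFromOffsets = batch.lastOffset() - batch.baseOffset() + 1;
        if (countFromOffsets <= 0)
            throw new InvalidRecordException("Batch has an invalid offset range: ["
                    + batch.baseOffset() + ", " + batch.lastOffset() + "]");

        // v2 and above batches always carry an explicit record count
        Integer count = batch.countOrNull();
        if (count == null || count <= 0)
            throw new InvalidRecordException("Invalid reported count for record batch: " + count);

        if (countFromOffsets != count)
            throw new InvalidRecordException("Inconsistent batch offset range ["
                    + batch.baseOffset() + ", " + batch.lastOffset()
                    + "] and count of records " + count);
    }
}
{code}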
[jira] [Created] (KAFKA-6573) KafkaController.brokerInfo not updated on dynamic update
Rajini Sivaram created KAFKA-6573:
-

Summary: KafkaController.brokerInfo not updated on dynamic update
Key: KAFKA-6573
URL: https://issues.apache.org/jira/browse/KAFKA-6573
Project: Kafka
Issue Type: Bug
Components: controller
Affects Versions: 1.1.0
Reporter: Rajini Sivaram
Assignee: Rajini Sivaram
Fix For: 1.1.0

KafkaController.brokerInfo is cached in-memory and used to re-register the broker in ZooKeeper if the ZK session expires. It should be kept up-to-date if listeners are dynamically updated.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (KAFKA-5285) Optimize upper / lower byte range for key range scan on windowed stores
[ https://issues.apache.org/jira/browse/KAFKA-5285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16370505#comment-16370505 ]

Guozhang Wang commented on KAFKA-5285:
--

[~davispw] [~xvrl] Thanks for your input. I think there is some mis-communication here about my proposal. I understand the issue that you raised, and hence propose to always use keyFrom only for the lower range. For the upper range I think Xavier's point is valid and I'd like to slightly modify my proposal accordingly. This time I'll try to state it in a clearer way:

0. The following bullet points 1) and 2) are ONLY discussed for window stores, NOT session stores.

1. Note that within the [lowerRange, upperRange] iteration, we will still check the validity of each key within the range to determine whether or not it should be returned. So the goal of setting lowerRange and upperRange is to make sure they are "safe", i.e. they include all possible keys. From that point of view, setting lowerRange to keyFrom should be safe, since lexicographically all keys larger than keyFrom have a [key, timestamp, sequence] byte representation larger than just the [keyFrom] byte representation. Our current lowerRange, [keyFrom + ], is also safe, but it is not "more efficient" than [keyFrom] since we know there will be no records at all in between these two lower bounds. On the other hand, [keyFrom + timestampFrom] is not safe, as you pointed out:

{code}
The smallest byte sequence for that key will be keyFrom + S + minSuffix which is greater than keyFrom, but still smaller than keyFrom + minSuffix
{code}

As you mentioned, this key is still larger than the {{keyFrom}} bytes. *I think {{keyFrom}} should actually be safe under all circumstances: any key larger than keyFrom has its complete [key, timestamp, sequence] representation larger than the keyFrom bytes alone, and hence would be included in the range for validation. Please let me know if this is wrong.*

2. For the upper range, again the goal is to be "safe" first, and then see if we can do better on efficiency. What I was proposing is MAX({{keyFrom + timestampFrom}}, {{keyTo + timestampTo}}); note it is NOT MAX({{keyFrom + timeEndEarliest}}, {{keyTo + timeStartLatest}}), as {{timeStartLatest}} and {{timeEndEarliest}} only apply to session windows. So going back to your example, {{keyFrom + timestampFrom = A1000}} and {{keyTo + timestampTo = AABDFFF}}, so the larger value, {{AABDFFF}}, would be selected as the upper range. But Xavier's example exposed that {{AABDFFF}} is not a safe upper range either, since records like {{AADFFF}} should be included in consideration but their bytes are larger than {{AABDFFF}}. *So I'm proposing to consider the following combinations, and pick the largest*:

a) keyFrom + timestampTo
b) keyTo + timestampTo

Please let me know if you think this works.

3. This bullet point is for session stores: for session stores the current byte representation is {{key, session-start-time, session-end-time}}, where {{session-end-time >= session-start-time}}. Similarly to window stores, for the lower range I'm proposing to use only the {{key}} bytes, which is safe.

For the upper range, similarly to my previous modification for window stores, *we can consider the following combinations and choose the largest one*:

a) keyFrom + timeStartLatest + MAX_LONG
b) keyTo + timeStartLatest + MAX_LONG

> Optimize upper / lower byte range for key range scan on windowed stores
> ---
>
> Key: KAFKA-5285
> URL: https://issues.apache.org/jira/browse/KAFKA-5285
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Xavier Léauté
> Assignee: Guozhang Wang
> Priority: Major
> Labels: performance
>
> The current implementation of {{WindowKeySchema}} / {{SessionKeySchema}}
> {{upperRange}} and {{lowerRange}} does not make any assumptions with respect
> to the other key bound (e.g. the upper byte bound does not depend on the lower
> key bound).
> It should be possible to optimize the byte range somewhat further using the
> information provided by the lower bound.
> More specifically, by incorporating that information, we should be able to
> eliminate the corresponding {{upperRangeFixedSize}} and
> {{lowerRangeFixedSize}}, since the result should be the same if we implement
> that optimization.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
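To make the proposal concrete, here is an illustrative Java sketch of the window-store bounds described above (this is not the actual WindowKeySchema code): the lower bound uses only the raw from-key bytes, and the upper bound is the lexicographically larger of keyFrom and keyTo, each suffixed with the upper timestamp bound and a maximal sequence number.

{code:java}
import java.nio.ByteBuffer;

public final class WindowRangeBounds {

    private static final int TIMESTAMP_SIZE = 8;
    private static final int SEQNUM_SIZE = 4;

    private WindowRangeBounds() { }

    // Lower bound: just the from-key bytes. Any stored composite key
    // [key, timestamp, seqnum] with key >= keyFrom is lexicographically >= these
    // bytes, so nothing is missed; per-record validation in the iterator then
    // filters out keys outside the requested range.
    public static byte[] lowerRange(byte[] keyFrom) {
        return keyFrom;
    }

    // Upper bound: the larger of (keyFrom + maxSuffix) and (keyTo + maxSuffix),
    // where maxSuffix encodes the upper timestamp bound plus a maximal sequence
    // number; taking the max covers the case where keyFrom is a longer prefix
    // than keyTo.
    public static byte[] upperRange(byte[] keyFrom, byte[] keyTo, long timeTo) {
        byte[] fromBound = withMaxSuffix(keyFrom, timeTo);
        byte[] toBound = withMaxSuffix(keyTo, timeTo);
        return compareUnsigned(fromBound, toBound) >= 0 ? fromBound : toBound;
    }

    private static byte[] withMaxSuffix(byte[] key, long timeTo) {
        return ByteBuffer.allocate(key.length + TIMESTAMP_SIZE + SEQNUM_SIZE)
                .put(key)
                .putLong(timeTo)
                .putInt(Integer.MAX_VALUE)
                .array();
    }

    // Unsigned lexicographic comparison, matching the byte ordering of the store.
    private static int compareUnsigned(byte[] a, byte[] b) {
        int min = Math.min(a.length, b.length);
        for (int i = 0; i < min; i++) {
            int cmp = (a[i] & 0xff) - (b[i] & 0xff);
            if (cmp != 0)
                return cmp;
        }
        return a.length - b.length;
    }
}
{code}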
[jira] [Created] (KAFKA-6574) Support Headers in console-consumer and console-producer
Vik Gamov created KAFKA-6574:

Summary: Support Headers in console-consumer and console-producer
Key: KAFKA-6574
URL: https://issues.apache.org/jira/browse/KAFKA-6574
Project: Kafka
Issue Type: Bug
Components: consumer
Affects Versions: 1.0.0
Reporter: Vik Gamov

Message headers were introduced in 1.0.0, but the console producer and consumer don't support them.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (KAFKA-4651) Improve test coverage of Stores
[ https://issues.apache.org/jira/browse/KAFKA-4651?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16370587#comment-16370587 ]

ASF GitHub Bot commented on KAFKA-4651:
---

guozhangwang closed pull request #4555: KAFKA-4651: improve test coverage of stores
URL: https://github.com/apache/kafka/pull/4555

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
index 77b1abbed20..c9267dc8fda 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.streams.state.internals;

 import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.SessionKeySerde;
@@ -38,33 +37,6 @@
     protected StateSerdes serdes;
     protected String topic;

-    // this is optimizing the case when this store is already a bytes store, in which we can avoid Bytes.wrap() costs
-    private static class RocksDBSessionBytesStore extends RocksDBSessionStore {
-        RocksDBSessionBytesStore(final SegmentedBytesStore inner) {
-            super(inner, Serdes.Bytes(), Serdes.ByteArray());
-        }
-
-        @Override
-        public KeyValueIterator, byte[]> findSessions(final Bytes key, final long earliestSessionEndTime, final long latestSessionStartTime) {
-            final KeyValueIterator bytesIterator = bytesStore.fetch(key, earliestSessionEndTime, latestSessionStartTime);
-            return WrappedSessionStoreIterator.bytesIterator(bytesIterator, serdes);
-        }
-
-        @Override
-        public void remove(final Windowed key) {
-            bytesStore.remove(SessionKeySerde.bytesToBinary(key));
-        }
-
-        @Override
-        public void put(final Windowed sessionKey, final byte[] aggregate) {
-            bytesStore.put(SessionKeySerde.bytesToBinary(sessionKey), aggregate);
-        }
-    }
-
-    static RocksDBSessionStore bytesStore(final SegmentedBytesStore inner) {
-        return new RocksDBSessionBytesStore(inner);
-    }
-
     RocksDBSessionStore(final SegmentedBytesStore bytesStore,
                         final Serde keySerde,
                         final Serde aggSerde) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index a2e45e00e56..f54c783aae1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -363,28 +363,6 @@ public void putAll(final List> entries) {
         return rocksDbIterator;
     }

-    public synchronized KeyValue first() {
-        validateStoreOpen();
-
-        final RocksIterator innerIter = db.newIterator();
-        innerIter.seekToFirst();
-        final KeyValue pair = new KeyValue<>(new Bytes(innerIter.key()), innerIter.value());
-        innerIter.close();
-
-        return pair;
-    }
-
-    public synchronized KeyValue last() {
-        validateStoreOpen();
-
-        final RocksIterator innerIter = db.newIterator();
-        innerIter.seekToLast();
-        final KeyValue pair = new KeyValue<>(new Bytes(innerIter.key()), innerIter.value());
-        innerIter.close();
-
-        return pair;
-    }
-
     /**
      * Return an approximate count of key-value mappings in this store.
      *
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
index 0583e916511..937b1d0b0bf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
@@ -30,12 +30,18 @@
 import org.junit.Before;
 import org.junit.Test;

+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;

+import static org.hamcrest.core.IsEqual.equalTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import sta
[jira] [Resolved] (KAFKA-4651) Improve test coverage of Stores
[ https://issues.apache.org/jira/browse/KAFKA-4651?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang resolved KAFKA-4651.
--
Resolution: Fixed
Assignee: Bill Bejeck
Fix Version/s: 1.2.0

> Improve test coverage of Stores
> ---
>
> Key: KAFKA-4651
> URL: https://issues.apache.org/jira/browse/KAFKA-4651
> Project: Kafka
> Issue Type: Sub-task
> Components: streams
> Reporter: Damian Guy
> Assignee: Bill Bejeck
> Priority: Minor
> Labels: newbie
> Fix For: 1.2.0
>
> Some factory methods aren't tested

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (KAFKA-5660) Don't throw TopologyBuilderException during runtime
[ https://issues.apache.org/jira/browse/KAFKA-5660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16370702#comment-16370702 ]

ASF GitHub Bot commented on KAFKA-5660:
---

nafshartous opened a new pull request #4605: KAFKA-5660 Don't throw TopologyBuilderException during runtime
URL: https://github.com/apache/kafka/pull/4605

TopologyBuilderException is a pre-runtime exception that should only be thrown before KafkaStreams#start() is called.

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Don't throw TopologyBuilderException during runtime
> ---
>
> Key: KAFKA-5660
> URL: https://issues.apache.org/jira/browse/KAFKA-5660
> Project: Kafka
> Issue Type: Task
> Components: streams
> Affects Versions: 0.11.0.0
> Reporter: Matthias J. Sax
> Assignee: Nick Afshartous
> Priority: Major
>
> {{TopologyBuilderException}} is a pre-runtime exception that should only be
> thrown before {{KafkaStreams#start()}} is called.
> However, we do throw {{TopologyBuilderException}} within
> - `SourceNodeFactory#getTopics`
> - `ProcessorContextImpl#getStateStore`
> - `StreamPartitionAssignor#prepareTopic`
> (and maybe somewhere else: we should double check if there are other places
> in the code like those).
> We should replace those exceptions with either {{StreamsException}} or with a
> new exception type.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Resolved] (KAFKA-1457) Nginx Kafka Integration
[ https://issues.apache.org/jira/browse/KAFKA-1457?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sönke Liebau resolved KAFKA-1457.
-
Resolution: Not A Problem

I think this issue can safely be closed; there are numerous solutions out there for pushing log files into Kafka:
* Kafka Connect Filesource
* Filebeat
* Logstash
* Heka
* Collectd

We could actually consider closing this as "fixed", since with Kafka Connect one might argue that Kafka now offers a component for what [~darion] wants to do :)

> Nginx Kafka Integration
> ---
>
> Key: KAFKA-1457
> URL: https://issues.apache.org/jira/browse/KAFKA-1457
> Project: Kafka
> Issue Type: Wish
> Components: tools
> Reporter: darion yaphet
> Priority: Minor
> Labels: DataSource
>
> Some use Kafka as a log data collector with nginx as the web server. We need to
> push nginx logs into Kafka, but I don't know how to write the plugin.
> Would Apache Kafka provide an nginx module to push nginx logs into Kafka?
> Thanks a lot.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Resolved] (KAFKA-1689) automatic migration of log dirs to new locations
[ https://issues.apache.org/jira/browse/KAFKA-1689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sönke Liebau resolved KAFKA-1689.
-
Resolution: Fixed
Fix Version/s: 1.1.0

The requested functionality is available as part of KAFKA-5163.

> automatic migration of log dirs to new locations
>
>
> Key: KAFKA-1689
> URL: https://issues.apache.org/jira/browse/KAFKA-1689
> Project: Kafka
> Issue Type: New Feature
> Components: config, core
> Affects Versions: 0.8.1.1
> Reporter: Javier Alba
> Priority: Minor
> Labels: newbie++
> Fix For: 1.1.0
>
> There is no automated way in Kafka 0.8.1.1 to migrate log data if
> we want to reconfigure our cluster nodes to use several data directories
> where we have mounted new disks, instead of our original data directory.
> For example, say we have our brokers configured with:
> log.dirs = /tmp/kafka-logs
> And we added 3 new disks and now we want our brokers to use them as log.dirs:
> log.dirs = /srv/data/1,/srv/data/2,/srv/data/3
> It would be great to have an automated way of doing such a migration, of
> course without losing current data in the cluster.
> It would be ideal to be able to do this migration without losing service.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (KAFKA-6574) Support Headers in console-consumer and console-producer
[ https://issues.apache.org/jira/browse/KAFKA-6574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16371048#comment-16371048 ]

Nikita Salnikov-Tarnovski commented on KAFKA-6574:
--

We were upgrading to Kafka 1.0 and had some trouble with our existing codebase. The inability to see message headers with the console consumer was quite a significant problem for us. Please make it possible :)

> Support Headers in console-consumer and console-producer
>
>
> Key: KAFKA-6574
> URL: https://issues.apache.org/jira/browse/KAFKA-6574
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Affects Versions: 1.0.0
> Reporter: Vik Gamov
> Priority: Minor
>
> Message headers were introduced in 1.0.0, but the console producer and consumer
> don't support them.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
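Until the console tools support headers natively, a small custom consumer can print them. The sketch below is illustrative only — it uses the standard Java consumer API available since 0.11 (ConsumerRecord#headers()); the bootstrap server, group id and topic name are placeholders:

{code:java}
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.serialization.StringDeserializer;

public class HeaderPrintingConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder
        props.put("group.id", "header-printer");            // placeholder
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));  // placeholder topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000L);
                for (ConsumerRecord<String, String> record : records) {
                    StringBuilder headers = new StringBuilder();
                    for (Header header : record.headers()) {
                        byte[] value = header.value();
                        headers.append(header.key())
                               .append('=')
                               .append(value == null ? "null" : new String(value, StandardCharsets.UTF_8))
                               .append(' ');
                    }
                    System.out.printf("offset=%d headers=[%s] value=%s%n",
                            record.offset(), headers.toString().trim(), record.value());
                }
            }
        }
    }
}
{code}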