vvcephei commented on a change in pull request #11099:
URL: https://github.com/apache/kafka/pull/11099#discussion_r674847014
##########
File path:
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
##########
@@ -92,33 +98,62 @@ public void init(final
org.apache.kafka.streams.processor.ProcessorContext conte
}
@Override
- public void process(final K key, final V value) {
+ public void process(final Record<KIn, VIn> record) {
// if the key is null, then ignore the record
- if (key == null) {
- LOG.warn(
- "Skipping record due to null key. topic=[{}]
partition=[{}] offset=[{}]",
- context().topic(), context().partition(),
context().offset()
- );
+ if (record.key() == null) {
+ if (context.recordMetadata().isPresent()) {
+ final RecordMetadata recordMetadata =
context.recordMetadata().get();
+ LOG.warn(
+ "Skipping record due to null key. "
+ + "value=[{}] topic=[{}] partition=[{}]
offset=[{}]",
+ record.value(),
+ recordMetadata.topic(), recordMetadata.partition(),
recordMetadata.offset()
+ );
+ } else {
+ LOG.warn(
+ "Skipping record due to null key. "
+ + "value=[{}]. Topic, partition, and offset not
known.",
+ record.value()
+ );
+ }
droppedRecordsSensor.record();
return;
}
if (queryableName != null) {
- final ValueAndTimestamp<V> oldValueAndTimestamp =
store.get(key);
- final V oldValue;
+ final ValueAndTimestamp<VIn> oldValueAndTimestamp =
store.get(record.key());
+ final VIn oldValue;
if (oldValueAndTimestamp != null) {
oldValue = oldValueAndTimestamp.value();
- if (context().timestamp() <
oldValueAndTimestamp.timestamp()) {
- LOG.warn("Detected out-of-order KTable update for {}
at offset {}, partition {}.",
- store.name(), context().offset(),
context().partition());
+ if (record.timestamp() < oldValueAndTimestamp.timestamp())
{
+ if (context.recordMetadata().isPresent()) {
+ final RecordMetadata recordMetadata =
context.recordMetadata().get();
+ LOG.warn(
+ "Detected out-of-order KTable update for {}, "
+ + "old timestamp=[{}] new timestamp=[{}]. "
+ + "value=[{}] topic=[{}] partition=[{}]
offset=[{}].",
Review comment:
also here
##########
File path:
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
##########
@@ -92,33 +98,62 @@ public void init(final
org.apache.kafka.streams.processor.ProcessorContext conte
}
@Override
- public void process(final K key, final V value) {
+ public void process(final Record<KIn, VIn> record) {
// if the key is null, then ignore the record
- if (key == null) {
- LOG.warn(
- "Skipping record due to null key. topic=[{}]
partition=[{}] offset=[{}]",
- context().topic(), context().partition(),
context().offset()
- );
+ if (record.key() == null) {
+ if (context.recordMetadata().isPresent()) {
+ final RecordMetadata recordMetadata =
context.recordMetadata().get();
+ LOG.warn(
+ "Skipping record due to null key. "
+ + "value=[{}] topic=[{}] partition=[{}]
offset=[{}]",
Review comment:
Oh, I'm sorry, but it looks like we need one more revision. Useful as it
would be at times, we can't log any data (keys, values, or headers) because it
might leak sensitive information into the logs.
##########
File path:
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
##########
@@ -156,7 +156,7 @@ public void kTableShouldLogAndMeterOnSkippedRecords() {
.filter(e -> e.getLevel().equals("WARN"))
.map(Event::getMessage)
.collect(Collectors.toList()),
- hasItem("Skipping record due to null key. topic=[topic]
partition=[0] offset=[0]")
+ hasItem("Skipping record due to null key. value=[value]
topic=[topic] partition=[0] offset=[0]")
Review comment:
I probably don't need to point this out, but this will have to change
back when you remove the value from the production code.
On another note, I guess we could add a test for the other (new) code path
when the metadata is absent. I'll leave it up to you.
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -704,7 +696,7 @@ public void validateCopartition() {
private void validateGlobalStoreArguments(final String sourceName,
final String topic,
final String processorName,
- final ProcessorSupplier<?, ?,
Void, Void> stateUpdateSupplier,
+ final ProcessorSupplier<?, ?, ?,
?> stateUpdateSupplier,
Review comment:
This should be able to roll back as well, right?
##########
File path:
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
##########
@@ -92,33 +98,62 @@ public void init(final
org.apache.kafka.streams.processor.ProcessorContext conte
}
@Override
- public void process(final K key, final V value) {
+ public void process(final Record<KIn, VIn> record) {
// if the key is null, then ignore the record
- if (key == null) {
- LOG.warn(
- "Skipping record due to null key. topic=[{}]
partition=[{}] offset=[{}]",
- context().topic(), context().partition(),
context().offset()
- );
+ if (record.key() == null) {
+ if (context.recordMetadata().isPresent()) {
+ final RecordMetadata recordMetadata =
context.recordMetadata().get();
+ LOG.warn(
+ "Skipping record due to null key. "
+ + "value=[{}] topic=[{}] partition=[{}]
offset=[{}]",
+ record.value(),
+ recordMetadata.topic(), recordMetadata.partition(),
recordMetadata.offset()
+ );
+ } else {
+ LOG.warn(
+ "Skipping record due to null key. "
+ + "value=[{}]. Topic, partition, and offset not
known.",
+ record.value()
+ );
+ }
droppedRecordsSensor.record();
return;
}
if (queryableName != null) {
- final ValueAndTimestamp<V> oldValueAndTimestamp =
store.get(key);
- final V oldValue;
+ final ValueAndTimestamp<VIn> oldValueAndTimestamp =
store.get(record.key());
+ final VIn oldValue;
if (oldValueAndTimestamp != null) {
oldValue = oldValueAndTimestamp.value();
- if (context().timestamp() <
oldValueAndTimestamp.timestamp()) {
- LOG.warn("Detected out-of-order KTable update for {}
at offset {}, partition {}.",
- store.name(), context().offset(),
context().partition());
+ if (record.timestamp() < oldValueAndTimestamp.timestamp())
{
+ if (context.recordMetadata().isPresent()) {
+ final RecordMetadata recordMetadata =
context.recordMetadata().get();
+ LOG.warn(
+ "Detected out-of-order KTable update for {}, "
+ + "old timestamp=[{}] new timestamp=[{}]. "
+ + "value=[{}] topic=[{}] partition=[{}]
offset=[{}].",
+ store.name(),
+ oldValueAndTimestamp.timestamp(),
record.timestamp(),
+ record.value(),
+ recordMetadata.topic(),
recordMetadata.offset(), recordMetadata.partition()
+ );
+ } else {
+ LOG.warn(
+ "Detected out-of-order KTable update for {}, "
+ + "old timestamp=[{}] new timestamp=[{}]. "
+ + "value=[{}]. Topic, partition and offset
not known.",
Review comment:
and here
##########
File path:
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
##########
@@ -90,6 +92,13 @@ public String newStoreName(final String prefix) {
"store-"
);
+ final ProcessorSupplier<Object, Object, Void, Void> processorSupplier
= () ->
+ new ContextualProcessor<Object, Object, Void, Void>() {
+ @Override
+ public void process(final Record<Object, Object> record) {
+ }
Review comment:
Huh, I'm surprised this works; I would have expected that the processor
has to put the record in the store. If the test does still pass this way, it
might reveal that the test is actually not evaluating anything.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]