This is an automated email from the ASF dual-hosted git repository.

jtuglu1 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 7426949b213 fix: thrownAway reason incorrectly defaulting to null for 
streaming tasks (#19540)
7426949b213 is described below

commit 7426949b2134770c3a37ec0c54b5293d7a57eb30
Author: jtuglu1 <[email protected]>
AuthorDate: Fri Jun 5 10:47:10 2026 -0700

    fix: thrownAway reason incorrectly defaulting to null for streaming tasks 
(#19540)
    
    Streaming ingestion tasks were incorrectly reporting thrown-away reason as 
null for filtered rows.
---
 .../druid/indexing/kafka/KafkaIndexTaskTest.java   |   4 +-
 .../indexing/kinesis/KinesisIndexTaskTest.java     |   2 +-
 .../seekablestream/SettableByteEntityReader.java   |   8 +-
 .../indexing/seekablestream/StreamChunkReader.java |  13 ++-
 .../seekablestream/StreamChunkReaderTest.java      | 125 +++++++++++++++++++++
 .../druid/segment/transform/Transformer.java       |  40 +++++--
 .../transform/TransformingInputEntityReader.java   |  27 ++++-
 .../druid/segment/transform/TransformerTest.java   |  91 +++++++++++++++
 8 files changed, 289 insertions(+), 21 deletions(-)

diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index bb40602be7b..5be3f60e6ae 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -1223,7 +1223,7 @@ public class KafkaIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
 
     // Wait for task to exit
     Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
-    verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(0, 
5)).thrownAwayByReason(InputRowFilterResult.NULL_OR_EMPTY_RECORD, 
4).totalProcessed(1));
+    verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(0, 
5)).thrownAwayByReason(InputRowFilterResult.CUSTOM_FILTER, 
4).totalProcessed(1));
 
     // Check published metadata
     final List<SegmentDescriptor> publishedDescriptors = 
publishedDescriptors();
@@ -3406,7 +3406,7 @@ public class KafkaIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
 
     // Wait for task to exit
     Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
-    verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(0, 
5)).thrownAwayByReason(InputRowFilterResult.NULL_OR_EMPTY_RECORD, 
4).totalProcessed(1));
+    verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(0, 
5)).thrownAwayByReason(InputRowFilterResult.CUSTOM_FILTER, 
4).totalProcessed(1));
 
     // Check published metadata
     final List<SegmentDescriptor> publishedDescriptors = 
publishedDescriptors();
diff --git 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index f4ccfd4fc70..669e7136bfa 100644
--- 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++ 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -876,7 +876,7 @@ public class KinesisIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
     verifyAll();
 
     verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSize(RECORDS, 0, 5))
-                                     
.thrownAwayByReason(InputRowFilterResult.NULL_OR_EMPTY_RECORD, 
4).totalProcessed(1));
+                                     
.thrownAwayByReason(InputRowFilterResult.CUSTOM_FILTER, 4).totalProcessed(1));
 
     // Check published metadata
     assertEqualsExceptVersion(ImmutableList.of(sdd("2009/P1D", 0)), 
publishedDescriptors());
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SettableByteEntityReader.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SettableByteEntityReader.java
index 2314d740842..883a798f24d 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SettableByteEntityReader.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SettableByteEntityReader.java
@@ -28,7 +28,7 @@ import org.apache.druid.data.input.InputRowSchema;
 import org.apache.druid.data.input.impl.ByteEntity;
 import org.apache.druid.data.input.impl.JsonInputFormat;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
-import org.apache.druid.segment.transform.TransformSpec;
+import org.apache.druid.segment.transform.Transformer;
 import org.apache.druid.segment.transform.TransformingInputEntityReader;
 
 import java.io.File;
@@ -46,16 +46,16 @@ class SettableByteEntityReader<T extends ByteEntity> 
implements InputEntityReade
   SettableByteEntityReader(
       InputFormat inputFormat,
       InputRowSchema inputRowSchema,
-      TransformSpec transformSpec,
+      Transformer transformer,
       File indexingTmpDir
   )
   {
     Preconditions.checkNotNull(inputFormat, "inputFormat");
     final InputFormat format = JsonInputFormat.withLineSplittable(inputFormat, 
false);
     this.entity = new SettableByteEntity<>();
-    this.delegate = new TransformingInputEntityReader(
+    this.delegate = TransformingInputEntityReader.withoutFilter(
         format.createReader(inputRowSchema, entity, indexingTmpDir),
-        transformSpec.toTransformer()
+        transformer
     );
   }
 
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkReader.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkReader.java
index a0ac1f01ea5..b7bad274822 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkReader.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkReader.java
@@ -32,6 +32,7 @@ import 
org.apache.druid.segment.incremental.InputRowFilterResult;
 import org.apache.druid.segment.incremental.ParseExceptionHandler;
 import org.apache.druid.segment.incremental.RowIngestionMeters;
 import org.apache.druid.segment.transform.TransformSpec;
+import org.apache.druid.segment.transform.Transformer;
 
 import javax.annotation.Nullable;
 import java.io.File;
@@ -64,13 +65,14 @@ class StreamChunkReader<RecordType extends ByteEntity>
   )
   {
     InvalidInput.notNull(inputFormat, "inputFormat");
+    final Transformer transformer = transformSpec.toTransformer();
     this.byteEntityReader = new SettableByteEntityReader<>(
         inputFormat,
         inputRowSchema,
-        transformSpec,
+        transformer,
         indexingTmpDir
     );
-    this.rowFilter = rowFilter;
+    this.rowFilter = transformer.hasFilter() ? 
withTransformFilter(transformer, rowFilter) : rowFilter;
     this.rowIngestionMeters = rowIngestionMeters;
     this.parseExceptionHandler = parseExceptionHandler;
   }
@@ -89,6 +91,13 @@ class StreamChunkReader<RecordType extends ByteEntity>
     this.parseExceptionHandler = parseExceptionHandler;
   }
 
+  private static InputRowFilter withTransformFilter(final Transformer 
transformer, final InputRowFilter rowFilter)
+  {
+    final InputRowFilter transformFilter = row ->
+        transformer.rowMatchesFilter(row) ? InputRowFilterResult.ACCEPTED : 
InputRowFilterResult.CUSTOM_FILTER;
+    return transformFilter.and(rowFilter);
+  }
+
   List<InputRow> parse(@Nullable List<RecordType> streamChunk, boolean 
isEndOfShard) throws IOException
   {
     if (streamChunk == null || streamChunk.isEmpty()) {
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkReaderTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkReaderTest.java
index 7e414ad506b..2b5bc49d1b8 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkReaderTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkReaderTest.java
@@ -37,6 +37,10 @@ import org.apache.druid.java.util.common.RE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.parsers.JSONPathSpec;
 import org.apache.druid.java.util.common.parsers.ParseException;
+import org.apache.druid.query.filter.AndDimFilter;
+import org.apache.druid.query.filter.NotDimFilter;
+import org.apache.druid.query.filter.SelectorDimFilter;
+import org.apache.druid.segment.incremental.InputRowFilterResult;
 import org.apache.druid.segment.incremental.ParseExceptionHandler;
 import org.apache.druid.segment.incremental.RowIngestionMeters;
 import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
@@ -158,6 +162,127 @@ public class StreamChunkReaderTest
     Assert.assertEquals(0, rowIngestionMeters.getThrownAway());
   }
 
+  @Test
+  public void testTransformSpecFilterIncrementsCustomFilterReason() throws 
IOException
+  {
+    final JsonInputFormat inputFormat = new JsonInputFormat(
+        JSONPathSpec.DEFAULT,
+        Collections.emptyMap(),
+        null,
+        null,
+        null
+    );
+    final TransformSpec transformSpec = new TransformSpec(
+        new AndDimFilter(
+            new SelectorDimFilter("column_a", "y", null),
+            new NotDimFilter(new SelectorDimFilter("column_b", "other", null))
+        ),
+        null
+    );
+    final StreamChunkReader<ByteEntity> chunkParser = new StreamChunkReader<>(
+        inputFormat,
+        new InputRowSchema(TimestampSpec.DEFAULT, DimensionsSpec.EMPTY, 
ColumnsFilter.all()),
+        transformSpec,
+        temporaryFolder.newFolder(),
+        InputRowFilter.allowAll(),
+        rowIngestionMeters,
+        parseExceptionHandler
+    );
+    final List<InputRow> parsedRows = chunkParser.parse(
+        Arrays.asList(
+            new ByteEntity(
+                "{\"timestamp\": \"2020-01-01\", \"column_a\": \"y\", 
\"column_b\": \"other\"}"
+                    .getBytes(StringUtils.UTF8_STRING)
+            ),
+            new ByteEntity(
+                "{\"timestamp\": \"2020-01-01\", \"column_a\": \"y\", 
\"column_b\": \"title1\"}"
+                    .getBytes(StringUtils.UTF8_STRING)
+            )
+        ),
+        false
+    );
+
+    Assert.assertEquals(1, parsedRows.size());
+    Assert.assertEquals("title1", 
Iterables.getOnlyElement(parsedRows.get(0).getDimension("column_b")));
+    Assert.assertEquals(1, rowIngestionMeters.getThrownAway());
+
+    final Map<String, Long> thrownAwayByReason = 
rowIngestionMeters.getThrownAwayByReason();
+    Assert.assertEquals(Long.valueOf(1), 
thrownAwayByReason.get(InputRowFilterResult.CUSTOM_FILTER.getReason()));
+    
Assert.assertFalse(thrownAwayByReason.containsKey(InputRowFilterResult.NULL_OR_EMPTY_RECORD.getReason()));
+  }
+
+  @Test
+  public void testTransformSpecFilterPreservesOtherRejectionReasons() throws 
IOException
+  {
+    final JsonInputFormat inputFormat = new JsonInputFormat(
+        JSONPathSpec.DEFAULT,
+        Collections.emptyMap(),
+        null,
+        null,
+        null
+    );
+    final TransformSpec transformSpec = new TransformSpec(
+        new AndDimFilter(
+            new SelectorDimFilter("column_a", "y", null),
+            new NotDimFilter(new SelectorDimFilter("column_b", "other", null))
+        ),
+        null
+    );
+    final InputRowFilter rowFilter = row -> {
+      if (row == null) {
+        return InputRowFilterResult.NULL_OR_EMPTY_RECORD;
+      } else if ("late".equals(row.getRaw("column_b"))) {
+        return InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME;
+      } else if ("early".equals(row.getRaw("column_b"))) {
+        return InputRowFilterResult.AFTER_MAX_MESSAGE_TIME;
+      }
+      return InputRowFilterResult.ACCEPTED;
+    };
+    final StreamChunkReader<ByteEntity> chunkParser = new StreamChunkReader<>(
+        inputFormat,
+        new InputRowSchema(TimestampSpec.DEFAULT, DimensionsSpec.EMPTY, 
ColumnsFilter.all()),
+        transformSpec,
+        temporaryFolder.newFolder(),
+        rowFilter,
+        rowIngestionMeters,
+        parseExceptionHandler
+    );
+
+    chunkParser.parse(ImmutableList.of(), false);
+    final List<InputRow> parsedRows = chunkParser.parse(
+        Arrays.asList(
+            new ByteEntity(
+                "{\"timestamp\": \"2020-01-01\", \"column_a\": \"y\", 
\"column_b\": \"other\"}"
+                    .getBytes(StringUtils.UTF8_STRING)
+            ),
+            new ByteEntity(
+                "{\"timestamp\": \"2020-01-01\", \"column_a\": \"y\", 
\"column_b\": \"late\"}"
+                    .getBytes(StringUtils.UTF8_STRING)
+            ),
+            new ByteEntity(
+                "{\"timestamp\": \"2020-01-01\", \"column_a\": \"y\", 
\"column_b\": \"early\"}"
+                    .getBytes(StringUtils.UTF8_STRING)
+            ),
+            new ByteEntity(
+                "{\"timestamp\": \"2020-01-01\", \"column_a\": \"y\", 
\"column_b\": \"title1\"}"
+                    .getBytes(StringUtils.UTF8_STRING)
+            )
+        ),
+        false
+    );
+
+    Assert.assertEquals(1, parsedRows.size());
+    Assert.assertEquals("title1", 
Iterables.getOnlyElement(parsedRows.get(0).getDimension("column_b")));
+    Assert.assertEquals(4, rowIngestionMeters.getThrownAway());
+
+    final Map<String, Long> thrownAwayByReason = 
rowIngestionMeters.getThrownAwayByReason();
+    Assert.assertEquals(Long.valueOf(1), 
thrownAwayByReason.get(InputRowFilterResult.NULL_OR_EMPTY_RECORD.getReason()));
+    Assert.assertEquals(Long.valueOf(1), 
thrownAwayByReason.get(InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME.getReason()));
+    Assert.assertEquals(Long.valueOf(1), 
thrownAwayByReason.get(InputRowFilterResult.AFTER_MAX_MESSAGE_TIME.getReason()));
+    Assert.assertEquals(Long.valueOf(1), 
thrownAwayByReason.get(InputRowFilterResult.CUSTOM_FILTER.getReason()));
+    
Assert.assertFalse(thrownAwayByReason.containsKey(InputRowFilterResult.UNKNOWN.getReason()));
+  }
+
   @Test
   public void 
testParseMalformedDataWithAllowedParseExceptions_thenNoException() throws 
IOException
   {
diff --git 
a/processing/src/main/java/org/apache/druid/segment/transform/Transformer.java 
b/processing/src/main/java/org/apache/druid/segment/transform/Transformer.java
index 2ff263a6473..04186bd7014 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/transform/Transformer.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/transform/Transformer.java
@@ -72,26 +72,46 @@ public class Transformer
   @Nullable
   public InputRow transform(@Nullable final InputRow row)
   {
-    if (row == null) {
+    final InputRow transformedRow = transformWithoutFilter(row);
+    if (!rowMatchesFilter(transformedRow)) {
       return null;
     }
+    return transformedRow;
+  }
 
-    final InputRow transformedRow;
+  @Nullable
+  public InputRow transformWithoutFilter(@Nullable final InputRow row)
+  {
+    if (row == null) {
+      return null;
+    }
 
     if (transforms.isEmpty()) {
-      transformedRow = row;
+      return row;
     } else {
-      transformedRow = new TransformedInputRow(row, transforms);
+      return new TransformedInputRow(row, transforms);
     }
+  }
 
-    if (valueMatcher != null) {
-      rowSupplierForValueMatcher.set(transformedRow);
-      if (!valueMatcher.matches(false)) {
-        return null;
-      }
+  /**
+   * Returns true if this transformer has a {@link TransformSpec} filter to 
apply via {@link #rowMatchesFilter}.
+   */
+  public boolean hasFilter()
+  {
+    return valueMatcher != null;
+  }
+
+  /**
+   * Applies the {@link TransformSpec} filter to a row that has already had 
transforms applied.
+   */
+  public boolean rowMatchesFilter(@Nullable final InputRow transformedRow)
+  {
+    if (transformedRow == null || valueMatcher == null) {
+      return true;
     }
 
-    return transformedRow;
+    rowSupplierForValueMatcher.set(transformedRow);
+    return valueMatcher.matches(false);
   }
 
   @Nullable
diff --git 
a/processing/src/main/java/org/apache/druid/segment/transform/TransformingInputEntityReader.java
 
b/processing/src/main/java/org/apache/druid/segment/transform/TransformingInputEntityReader.java
index 33bed465869..945168cd5dd 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/transform/TransformingInputEntityReader.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/transform/TransformingInputEntityReader.java
@@ -30,17 +30,40 @@ public class TransformingInputEntityReader implements 
InputEntityReader
 {
   private final InputEntityReader delegate;
   private final Transformer transformer;
+  private final boolean applyFilter;
 
-  public TransformingInputEntityReader(InputEntityReader delegate, Transformer 
transformer)
+  public TransformingInputEntityReader(final InputEntityReader delegate, final 
Transformer transformer)
+  {
+    this(delegate, transformer, true);
+  }
+
+  public static TransformingInputEntityReader withoutFilter(
+      final InputEntityReader delegate,
+      final Transformer transformer
+  )
+  {
+    return new TransformingInputEntityReader(delegate, transformer, false);
+  }
+
+  private TransformingInputEntityReader(
+      final InputEntityReader delegate,
+      final Transformer transformer,
+      final boolean applyFilter
+  )
   {
     this.delegate = delegate;
     this.transformer = transformer;
+    this.applyFilter = applyFilter;
   }
 
   @Override
   public CloseableIterator<InputRow> read() throws IOException
   {
-    return delegate.read().map(transformer::transform);
+    if (applyFilter) {
+      return delegate.read().map(transformer::transform);
+    } else {
+      return delegate.read().map(transformer::transformWithoutFilter);
+    }
   }
 
   @Override
diff --git 
a/processing/src/test/java/org/apache/druid/segment/transform/TransformerTest.java
 
b/processing/src/test/java/org/apache/druid/segment/transform/TransformerTest.java
index 099c9184514..1f8ec37ddf6 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/transform/TransformerTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/transform/TransformerTest.java
@@ -21,13 +21,16 @@ package org.apache.druid.segment.transform;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import org.apache.druid.data.input.InputEntityReader;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.InputRowListPlusRawValues;
 import org.apache.druid.data.input.MapBasedInputRow;
 import org.apache.druid.data.input.MapBasedRow;
 import org.apache.druid.data.input.Row;
 import org.apache.druid.error.DruidExceptionMatcher;
+import org.apache.druid.java.util.common.CloseableIterators;
 import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.java.util.common.parsers.ParseException;
 import org.apache.druid.query.expression.TestExprMacroTable;
 import org.apache.druid.query.filter.SelectorDimFilter;
@@ -40,6 +43,7 @@ import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
 import javax.annotation.Nullable;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -58,6 +62,62 @@ public class TransformerTest extends 
InitializedNullHandlingTest
     Assert.assertNull(transformer.transform((InputRowListPlusRawValues) null));
   }
 
+  @Test
+  public void testTransformWithoutFilter()
+  {
+    final Transformer transformer = new Transformer(
+        new TransformSpec(new SelectorDimFilter("dim", "keep", null), null)
+    );
+    final InputRow keepRow = makeRow("keep");
+    final InputRow dropRow = makeRow("drop");
+
+    Assert.assertSame(keepRow, transformer.transformWithoutFilter(keepRow));
+    Assert.assertSame(dropRow, transformer.transformWithoutFilter(dropRow));
+    Assert.assertNull(transformer.transformWithoutFilter(null));
+
+    Assert.assertTrue(transformer.hasFilter());
+    Assert.assertTrue(transformer.rowMatchesFilter(keepRow));
+    Assert.assertFalse(transformer.rowMatchesFilter(dropRow));
+    Assert.assertTrue(transformer.rowMatchesFilter(null));
+  }
+
+  @Test
+  public void testTransformingInputEntityReaderCanSkipFilter() throws 
IOException
+  {
+    final Transformer transformer = new Transformer(
+        new TransformSpec(new SelectorDimFilter("dim", "keep", null), null)
+    );
+    final InputRow dropRow = makeRow("drop");
+    final InputRow keepRow = makeRow("keep");
+    final TransformingInputEntityReader reader = 
TransformingInputEntityReader.withoutFilter(
+        new TestInputEntityReader(dropRow, keepRow),
+        transformer
+    );
+
+    try (final CloseableIterator<InputRow> iterator = reader.read()) {
+      Assert.assertSame(dropRow, iterator.next());
+      Assert.assertSame(keepRow, iterator.next());
+      Assert.assertFalse(iterator.hasNext());
+    }
+  }
+
+  @Test
+  public void 
testTransformingInputEntityReaderReturnsNullForFilteredRowsByDefault() throws 
IOException
+  {
+    final Transformer transformer = new Transformer(
+        new TransformSpec(new SelectorDimFilter("dim", "keep", null), null)
+    );
+    final TransformingInputEntityReader reader = new 
TransformingInputEntityReader(
+        new TestInputEntityReader(makeRow("drop")),
+        transformer
+    );
+
+    try (final CloseableIterator<InputRow> iterator = reader.read()) {
+      Assert.assertNull(iterator.next());
+      Assert.assertFalse(iterator.hasNext());
+    }
+  }
+
   @Test
   public void testTransformTimeColumn()
   {
@@ -579,4 +639,35 @@ public class TransformerTest extends 
InitializedNullHandlingTest
         () -> new ExpressionTransform("__time", "now() + 1000", 
TestExprMacroTable.INSTANCE)
     );
   }
+
+  private static InputRow makeRow(final String dim)
+  {
+    return new MapBasedInputRow(
+        DateTimes.nowUtc(),
+        ImmutableList.of("dim"),
+        ImmutableMap.of("dim", dim)
+    );
+  }
+
+  private static class TestInputEntityReader implements InputEntityReader
+  {
+    private final List<InputRow> rows;
+
+    private TestInputEntityReader(final InputRow... rows)
+    {
+      this.rows = Arrays.asList(rows);
+    }
+
+    @Override
+    public CloseableIterator<InputRow> read()
+    {
+      return CloseableIterators.withEmptyBaggage(rows.iterator());
+    }
+
+    @Override
+    public CloseableIterator<InputRowListPlusRawValues> sample()
+    {
+      return 
CloseableIterators.withEmptyBaggage(ImmutableList.<InputRowListPlusRawValues>of().iterator());
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to