github-advanced-security[bot] commented on code in PR #19379:
URL: https://github.com/apache/druid/pull/19379#discussion_r3145112307


##########
processing/src/test/java/org/apache/druid/segment/transform/ScanTransformTest.java:
##########
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.transform;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.MapBasedInputRow;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.math.expr.ExprMacroTable;
+import org.apache.druid.query.expression.TestExprMacroTable;
+import org.apache.druid.query.filter.SelectorDimFilter;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class ScanTransformTest extends InitializedNullHandlingTest
+{
+  private static final long TIMESTAMP = DateTimes.of("2024-01-01").getMillis();
+
+  private static InputRow makeRow(Object... kvPairs)
+  {
+    final java.util.LinkedHashMap<String, Object> event = new 
java.util.LinkedHashMap<>();
+    final List<String> dimensions = new ArrayList<>();
+    for (int i = 0; i < kvPairs.length; i += 2) {
+      final String key = (String) kvPairs[i];
+      event.put(key, kvPairs[i + 1]);

Review Comment:
   ## CodeQL / Array index out of bounds
   
   This array access might be out of bounds, as the index might be equal to the 
array length.
   
   [Show more 
details](https://github.com/apache/druid/security/code-scanning/11140)



##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaScanTransformTest.java:
##########
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.indexing;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexer.granularity.UniformGranularitySpec;
+import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
+import org.apache.druid.indexing.kafka.simulate.KafkaResource;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpecBuilder;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.math.expr.ExprMacroTable;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.transform.ScanTransform;
+import org.apache.druid.segment.transform.TransformSpec;
+import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
+import org.apache.druid.testing.embedded.EmbeddedBroker;
+import org.apache.druid.testing.embedded.EmbeddedClusterApis;
+import org.apache.druid.testing.embedded.EmbeddedCoordinator;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.EmbeddedIndexer;
+import org.apache.druid.testing.embedded.EmbeddedOverlord;
+import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
+import org.joda.time.Period;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+/**
+ * Verifies ScanTransform unnests array columns during Kafka ingestion.
+ * Uses two scan transforms to unnest both "tags" (string array) and 
"services" (object array)
+ * into a single datasource, producing a cross join of tag x service for each 
input row.
+ */
+public class KafkaScanTransformTest extends EmbeddedClusterTestBase
+{
+  // alice: 2 tags x 2 services = 4, bob: 1 tag x 3 services = 3 = 7 unnested 
rows
+  // carol (null arrays) and dave (missing columns) each produce 1 passthrough 
row = 2
+  // total: 9
+  private static final int EXPECTED_ROWS = 9;
+
+  private final KafkaResource kafka = new KafkaResource();
+  private final EmbeddedBroker broker = new EmbeddedBroker();
+  private final EmbeddedIndexer indexer = new EmbeddedIndexer();
+  private final EmbeddedOverlord overlord = new EmbeddedOverlord();
+  private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator();
+
+  private String topic;
+
+  @Override
+  public EmbeddedDruidCluster createCluster()
+  {
+    coordinator.addProperty("druid.manager.segments.useIncrementalCache", 
"always");
+
+    indexer.setServerMemory(300_000_000)
+           .addProperty("druid.segment.handoff.pollDuration", "PT0.1s")
+           .addProperty("druid.processing.numThreads", "2")
+           .addProperty("druid.worker.capacity", "2");
+
+    return EmbeddedDruidCluster
+        .withEmbeddedDerbyAndZookeeper()
+        .addExtension(KafkaIndexTaskModule.class)
+        .addResource(kafka)
+        .addCommonProperty("druid.monitoring.emissionPeriod", "PT0.1s")
+        .useLatchableEmitter()
+        .useDefaultTimeoutForLatchableEmitter(30)
+        .addServer(coordinator)
+        .addServer(overlord)
+        .addServer(broker)
+        .addServer(indexer);
+  }
+
+  @Override
+  protected void refreshDatasourceName()
+  {
+    // Do not refresh — datasource is set once in setupAll
+  }
+
+  @BeforeAll
+  void setupAll() throws JsonProcessingException
+  {
+    topic = EmbeddedClusterApis.createTestDatasourceName();
+    kafka.createTopicWithPartitions(topic, 1);
+
+    super.refreshDatasourceName();
+    submitSupervisor();
+    publishTestData();
+
+    indexer.latchableEmitter().waitForEventAggregate(
+        event -> event.hasMetricName("ingest/events/processed")
+                      .hasDimension(DruidMetrics.DATASOURCE, dataSource),
+        agg -> agg.hasSumAtLeast(EXPECTED_ROWS)
+    );
+  }
+
+  private void submitSupervisor()
+  {
+    final TransformSpec transformSpec = new TransformSpec(
+        null,
+        ImmutableList.of(
+            new ScanTransform(
+                "tag",
+                new ExpressionVirtualColumn("tag", "\"tags\"", 
ColumnType.STRING, ExprMacroTable.nil()),
+                null
+            ),
+            new ScanTransform(
+                "svc",
+                new ExpressionVirtualColumn("svc", "\"services\"", 
ColumnType.NESTED_DATA, ExprMacroTable.nil()),
+                null
+            )
+        )
+    );
+
+    final KafkaSupervisorSpec spec = new KafkaSupervisorSpecBuilder()
+        .withDataSchema(
+            schema -> schema
+                .withTimestamp(new TimestampSpec("__time", "auto", null))
+                .withGranularity(new UniformGranularitySpec(Granularities.DAY, 
null, null))
+                
.withDimensions(DimensionsSpec.builder().useSchemaDiscovery(true).build())
+                .withTransform(transformSpec)
+        )
+        .withIoConfig(
+            ioConfig -> ioConfig
+                .withJsonInputFormat()
+                .withTaskCount(1)
+                .withTaskDuration(Period.hours(1))
+                .withConsumerProperties(kafka.consumerProperties())
+                .withStartDelay(Period.millis(10))
+                .withSupervisorRunPeriod(Period.millis(500))
+                .withUseEarliestSequenceNumber(true)
+                .withCompletionTimeout(Period.seconds(5))
+        )
+        .build(dataSource, topic);
+
+    Assertions.assertEquals(
+        dataSource,
+        cluster.callApi().postSupervisor(spec)
+    );
+  }
+
+  private void publishTestData() throws JsonProcessingException
+  {
+    // alice: 2 tags x 2 services = 4 rows
+    // bob: 1 tag x 3 services = 3 rows
+    // carol: null tags x null services => 1 passthrough row
+    // dave: missing tags & services columns => 1 passthrough row
+    // total: 9 rows
+    final List<Map<String, Object>> records = new ArrayList<>();
+    records.add(Map.of(
+        "__time", "2024-01-01T00:00:00Z",
+        "user", "alice",
+        "tags", List.of("sports", "news"),
+        "services", List.of(
+            Map.of("type", "web", "dc", "us-east1"),
+            Map.of("type", "api", "dc", "us-west2")
+        )
+    ));
+    records.add(Map.of(
+        "__time", "2024-01-01T00:01:00Z",
+        "user", "bob",
+        "tags", List.of("music"),
+        "services", List.of(
+            Map.of("type", "cdn", "dc", "eu-west1"),
+            Map.of("type", "cache", "dc", "eu-west1"),
+            Map.of("type", "db", "dc", "us-east1")
+        )
+    ));
+
+    // carol: explicit null values for both array columns
+    final HashMap<String, Object> carolRecord = new HashMap<>();
+    carolRecord.put("__time", "2024-01-01T00:02:00Z");
+    carolRecord.put("user", "carol");
+    carolRecord.put("tags", null);
+    carolRecord.put("services", null);
+    records.add(carolRecord);
+
+    // dave: columns not present at all
+    records.add(Map.of(
+        "__time", "2024-01-01T00:03:00Z",
+        "user", "dave"
+    ));
+
+    final List<byte[]> recordBytes = new ArrayList<>();
+    for (Map<String, Object> record : records) {
+      recordBytes.add(TestHelper.JSON_MAPPER.writeValueAsBytes(record));
+    }
+    kafka.publishRecordsToTopic(topic, recordBytes);
+  }
+
+  @Test
+  @Timeout(60)
+  public void test_countRows()
+  {
+    final long count = Long.parseLong(cluster.runSql(
+        StringUtils.format("SELECT COUNT(*) FROM \"%s\"", dataSource)
+    ).trim());

Review Comment:
   ## CodeQL / Missing catch of NumberFormatException
   
   Potential uncaught 'java.lang.NumberFormatException'.
   
   [Show more 
details](https://github.com/apache/druid/security/code-scanning/11141)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to