This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new a1615105d7a refactor: migrate influx-extensions to InputFormat, remove InfluxParser/InfluxParseSpec (#19229)
a1615105d7a is described below

commit a1615105d7ad7e14b8f1e445707eb271df87a901
Author: Clint Wylie <[email protected]>
AuthorDate: Mon Mar 30 19:49:25 2026 -0700

    refactor: migrate influx-extensions to InputFormat, remove InfluxParser/InfluxParseSpec (#19229)
---
 docs/development/extensions-contrib/influx.md      |  41 +--
 extensions-contrib/influx-extensions/pom.xml       |   5 +
 .../data/input/influx/InfluxExtensionsModule.java  |   2 +-
 .../druid/data/input/influx/InfluxInputFormat.java | 101 ++++++++
 ...uxParser.java => InfluxLineProtocolReader.java} |  85 +++---
 .../druid/data/input/influx/InfluxParseSpec.java   |  64 -----
 .../data/input/influx/InfluxInputFormatTest.java   | 285 +++++++++++++++++++++
 .../druid/data/input/influx/InfluxParserTest.java  | 231 -----------------
 8 files changed, 466 insertions(+), 348 deletions(-)

diff --git a/docs/development/extensions-contrib/influx.md b/docs/development/extensions-contrib/influx.md
index eec9fb555ec..f40db9973fb 100644
--- a/docs/development/extensions-contrib/influx.md
+++ b/docs/development/extensions-contrib/influx.md
@@ -40,28 +40,29 @@ which contains four parts:
   - measurements: one or more key-value pairs; values can be numeric, boolean, or string
   - timestamp: nanoseconds since Unix epoch (the parser truncates it to milliseconds)
 
-The parser extracts these fields into a map, giving the measurement the key `measurement` and the timestamp the key `_ts`. The tag and measurement keys are copied verbatim, so users should take care to avoid name collisions. It is up to the ingestion spec to decide which fields should be treated as dimensions and which should be treated as metrics (typically tags correspond to dimensions and measurements correspond to metrics).
+The input format extracts these fields into a map, giving the measurement the key `measurement` and the timestamp the key `__ts`. The tag and measurement keys are copied verbatim, so users should take care to avoid name collisions. It is up to the ingestion spec to decide which fields should be treated as dimensions and which should be treated as metrics (typically tags correspond to dimensions and measurements correspond to metrics).
 
-The parser is configured like so:
+The input format is configured like so:
 
 ```json
-"parser": {
-      "type": "string",
-      "parseSpec": {
-        "format": "influx",
-        "timestampSpec": {
-          "column": "__ts",
-          "format": "millis"
-        },
-        "dimensionsSpec": {
-          "dimensionExclusions": [
-            "__ts"
-          ]
-        },
-        "whitelistMeasurements": [
-          "cpu"
-        ]
-      }
+"inputFormat": {
+  "type": "influx",
+  "whitelistMeasurements": [
+    "cpu"
+  ]
+}
 ```
 
-The `whitelistMeasurements` field is an optional list of strings. If present, measurements that do not match one of the strings in the list will be ignored.
+|Field|Type|Description|Required|
+|-----|----|-----------|--------|
+|`type`|String|Must be `influx`.|yes|
+|`whitelistMeasurements`|List of String|If present, lines whose measurement name does not match one of the strings in the list are not ingested; each such line raises a parse exception.|no|
+
+When using the `influx` input format, the timestamp spec should be configured with column `__ts` and format `millis`:
+
+```json
+"timestampSpec": {
+  "column": "__ts",
+  "format": "millis"
+}
+```
\ No newline at end of file
diff --git a/extensions-contrib/influx-extensions/pom.xml b/extensions-contrib/influx-extensions/pom.xml
index b28e0c072d0..a9917865999 100644
--- a/extensions-contrib/influx-extensions/pom.xml
+++ b/extensions-contrib/influx-extensions/pom.xml
@@ -82,6 +82,11 @@
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>nl.jqno.equalsverifier</groupId>
+      <artifactId>equalsverifier</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.hamcrest</groupId>
       <artifactId>hamcrest</artifactId>
diff --git 
a/extensions-contrib/influx-extensions/src/main/java/org/apache/druid/data/input/influx/InfluxExtensionsModule.java
 
b/extensions-contrib/influx-extensions/src/main/java/org/apache/druid/data/input/influx/InfluxExtensionsModule.java
index da0eba6a4b6..8559cb5c447 100644
--- 
a/extensions-contrib/influx-extensions/src/main/java/org/apache/druid/data/input/influx/InfluxExtensionsModule.java
+++ 
b/extensions-contrib/influx-extensions/src/main/java/org/apache/druid/data/input/influx/InfluxExtensionsModule.java
@@ -40,7 +40,7 @@ public class InfluxExtensionsModule implements DruidModule
     return Collections.singletonList(
         new SimpleModule("InfluxInputRowParserModule")
             .registerSubtypes(
-                new NamedType(InfluxParseSpec.class, "influx")
+                new NamedType(InfluxInputFormat.class, 
InfluxInputFormat.TYPE_KEY)
             )
     );
   }
diff --git 
a/extensions-contrib/influx-extensions/src/main/java/org/apache/druid/data/input/influx/InfluxInputFormat.java
 
b/extensions-contrib/influx-extensions/src/main/java/org/apache/druid/data/input/influx/InfluxInputFormat.java
new file mode 100644
index 00000000000..c91a9c88db6
--- /dev/null
+++ 
b/extensions-contrib/influx-extensions/src/main/java/org/apache/druid/data/input/influx/InfluxInputFormat.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.influx;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.data.input.InputEntity;
+import org.apache.druid.data.input.InputEntityReader;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.utils.CompressionUtils;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+
+public class InfluxInputFormat implements InputFormat
+{
+  public static final String TYPE_KEY = "influx";
+
+  @Nullable
+  private final List<String> whitelistMeasurements;
+
+  @JsonCreator
+  public InfluxInputFormat(
+      @JsonProperty("whitelistMeasurements") @Nullable List<String> 
whitelistMeasurements
+  )
+  {
+    this.whitelistMeasurements = whitelistMeasurements;
+  }
+
+  @Nullable
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public List<String> getWhitelistMeasurements()
+  {
+    return whitelistMeasurements;
+  }
+
+  @Override
+  public boolean isSplittable()
+  {
+    return false;
+  }
+
+  @Override
+  public InputEntityReader createReader(InputRowSchema inputRowSchema, 
InputEntity source, File temporaryDirectory)
+  {
+    Set<String> whitelist = null;
+    if (whitelistMeasurements != null && !whitelistMeasurements.isEmpty()) {
+      whitelist = new HashSet<>(whitelistMeasurements);
+    }
+    return new InfluxLineProtocolReader(inputRowSchema, source, whitelist);
+  }
+
+  @Override
+  public long getWeightedSize(String path, long size)
+  {
+    return size * 
CompressionUtils.estimatedCompressionFactor(CompressionUtils.Format.fromFileName(path));
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    InfluxInputFormat that = (InfluxInputFormat) o;
+    return Objects.equals(whitelistMeasurements, that.whitelistMeasurements);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(whitelistMeasurements);
+  }
+}
diff --git 
a/extensions-contrib/influx-extensions/src/main/java/org/apache/druid/data/input/influx/InfluxParser.java
 
b/extensions-contrib/influx-extensions/src/main/java/org/apache/druid/data/input/influx/InfluxLineProtocolReader.java
similarity index 67%
rename from 
extensions-contrib/influx-extensions/src/main/java/org/apache/druid/data/input/influx/InfluxParser.java
rename to 
extensions-contrib/influx-extensions/src/main/java/org/apache/druid/data/input/influx/InfluxLineProtocolReader.java
index a1520668613..16d8770cf6a 100644
--- 
a/extensions-contrib/influx-extensions/src/main/java/org/apache/druid/data/input/influx/InfluxParser.java
+++ 
b/extensions-contrib/influx-extensions/src/main/java/org/apache/druid/data/input/influx/InfluxLineProtocolReader.java
@@ -19,44 +19,78 @@
 
 package org.apache.druid.data.input.influx;
 
-import com.google.common.collect.ImmutableList;
 import org.antlr.v4.runtime.ANTLRInputStream;
 import org.antlr.v4.runtime.CharStream;
 import org.antlr.v4.runtime.CommonTokenStream;
 import org.antlr.v4.runtime.TokenStream;
+import org.apache.druid.data.input.InputEntity;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.TextReader;
+import org.apache.druid.data.input.impl.MapInputRowParser;
 import org.apache.druid.java.util.common.parsers.ParseException;
-import org.apache.druid.java.util.common.parsers.Parser;
 
 import javax.annotation.Nullable;
+import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.regex.Pattern;
 
-public class InfluxParser implements Parser<String, Object>
+public class InfluxLineProtocolReader extends TextReader.Strings
 {
-  public static final String TIMESTAMP_KEY = "__ts";
-  private static final String MEASUREMENT_KEY = "measurement";
+  static final String TIMESTAMP_KEY = "__ts";
+  static final String MEASUREMENT_KEY = "measurement";
 
   private static final Pattern BACKSLASH_PATTERN = Pattern.compile("\\\\\"");
   private static final Pattern IDENTIFIER_PATTERN = Pattern.compile("\\\\([,= 
])");
 
+  @Nullable
   private final Set<String> measurementWhitelist;
 
-  public InfluxParser(Set<String> measurementWhitelist)
+  InfluxLineProtocolReader(
+      InputRowSchema inputRowSchema,
+      InputEntity source,
+      @Nullable Set<String> measurementWhitelist
+  )
   {
+    super(inputRowSchema, source);
     this.measurementWhitelist = measurementWhitelist;
   }
 
   @Override
-  public void startFileFromBeginning()
+  public List<InputRow> parseInputRows(String intermediateRow) throws 
ParseException
   {
+    final Map<String, Object> parsed = parseLineToMap(intermediateRow);
+    return 
Collections.singletonList(MapInputRowParser.parse(getInputRowSchema(), parsed));
+  }
+
+  @Override
+  protected List<Map<String, Object>> toMap(String intermediateRow)
+  {
+    return Collections.singletonList(parseLineToMap(intermediateRow));
+  }
+
+  @Override
+  public int getNumHeaderLinesToSkip()
+  {
+    return 0;
+  }
+
+  @Override
+  public boolean needsToProcessHeaderLine()
+  {
+    return false;
   }
 
-  @Nullable
   @Override
-  public Map<String, Object> parseToMap(String input)
+  public void processHeaderLine(String line)
+  {
+    // no header lines in influx line protocol
+  }
+
+  private Map<String, Object> parseLineToMap(String input)
   {
     CharStream charStream = new ANTLRInputStream(input);
     InfluxLineProtocolLexer lexer = new InfluxLineProtocolLexer(charStream);
@@ -65,10 +99,10 @@ public class InfluxParser implements Parser<String, Object>
 
     List<InfluxLineProtocolParser.LineContext> lines = parser.lines().line();
     if (parser.getNumberOfSyntaxErrors() != 0) {
-      throw new ParseException(null, "Unable to parse line.");
+      throw new ParseException(input, "Unable to parse line.");
     }
     if (lines.size() != 1) {
-      throw new ParseException(null, "Multiple lines present; unable to parse 
more than one per record.");
+      throw new ParseException(input, "Multiple lines present; unable to parse 
more than one per record.");
     }
 
     Map<String, Object> out = new LinkedHashMap<>();
@@ -77,7 +111,7 @@ public class InfluxParser implements Parser<String, Object>
     String measurement = parseIdentifier(line.identifier());
 
     if (!checkWhitelist(measurement)) {
-      throw new ParseException(null, "Metric [%s] not whitelisted.", 
measurement);
+      throw new ParseException(input, "Metric [%s] not whitelisted.", 
measurement);
     }
 
     out.put(MEASUREMENT_KEY, measurement);
@@ -94,14 +128,14 @@ public class InfluxParser implements Parser<String, Object>
     return out;
   }
 
-  private void parseTag(InfluxLineProtocolParser.Tag_pairContext tag, 
Map<String, Object> out)
+  private static void parseTag(InfluxLineProtocolParser.Tag_pairContext tag, 
Map<String, Object> out)
   {
     String key = parseIdentifier(tag.identifier(0));
     String value = parseIdentifier(tag.identifier(1));
     out.put(key, value);
   }
 
-  private void parseField(InfluxLineProtocolParser.Field_pairContext field, 
Map<String, Object> out)
+  private static void parseField(InfluxLineProtocolParser.Field_pairContext 
field, Map<String, Object> out)
   {
     String key = parseIdentifier(field.identifier());
     InfluxLineProtocolParser.Field_valueContext valueContext = 
field.field_value();
@@ -116,21 +150,20 @@ public class InfluxParser implements Parser<String, 
Object>
     out.put(key, value);
   }
 
-  private Object parseQuotedString(String text)
+  private static Object parseQuotedString(String text)
   {
     return BACKSLASH_PATTERN.matcher(text.substring(1, text.length() - 
1)).replaceAll("\"");
   }
 
-  private Object parseNumber(String raw)
+  private static Object parseNumber(String raw)
   {
     if (raw.endsWith("i")) {
       return Long.valueOf(raw.substring(0, raw.length() - 1));
     }
-
     return Double.valueOf(raw);
   }
 
-  private Object parseBool(String raw)
+  private static Object parseBool(String raw)
   {
     char first = raw.charAt(0);
     if (first == 't' || first == 'T') {
@@ -140,12 +173,11 @@ public class InfluxParser implements Parser<String, 
Object>
     }
   }
 
-  private String parseIdentifier(InfluxLineProtocolParser.IdentifierContext 
ctx)
+  private static String 
parseIdentifier(InfluxLineProtocolParser.IdentifierContext ctx)
   {
     if (ctx.BOOLEAN() != null || ctx.NUMBER() != null) {
       return ctx.getText();
     }
-
     return 
IDENTIFIER_PATTERN.matcher(ctx.IDENTIFIER_STRING().getText()).replaceAll("$1");
   }
 
@@ -154,7 +186,7 @@ public class InfluxParser implements Parser<String, Object>
     return (measurementWhitelist == null) || measurementWhitelist.contains(m);
   }
 
-  private void parseTimestamp(String timestamp, Map<String, Object> dest)
+  private static void parseTimestamp(String timestamp, Map<String, Object> 
dest)
   {
     // Influx timestamps come in nanoseconds; treat anything less than 1 ms as 0
     if (timestamp.length() < 7) {
@@ -165,15 +197,4 @@ public class InfluxParser implements Parser<String, Object>
       dest.put(TIMESTAMP_KEY, timestampMillis);
     }
   }
-
-  @Override
-  public List<String> getFieldNames()
-  {
-    return ImmutableList.of();
-  }
-
-  @Override
-  public void setFieldNames(Iterable<String> fieldNames)
-  {
-  }
 }
diff --git 
a/extensions-contrib/influx-extensions/src/main/java/org/apache/druid/data/input/influx/InfluxParseSpec.java
 
b/extensions-contrib/influx-extensions/src/main/java/org/apache/druid/data/input/influx/InfluxParseSpec.java
deleted file mode 100644
index 39d37dc1884..00000000000
--- 
a/extensions-contrib/influx-extensions/src/main/java/org/apache/druid/data/input/influx/InfluxParseSpec.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.data.input.influx;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.collect.Sets;
-import org.apache.druid.data.input.impl.DimensionsSpec;
-import org.apache.druid.data.input.impl.ParseSpec;
-import org.apache.druid.data.input.impl.TimestampSpec;
-import org.apache.druid.java.util.common.parsers.Parser;
-
-import java.util.List;
-
-public class InfluxParseSpec extends ParseSpec
-{
-  private List<String> measurementWhitelist;
-
-  @JsonCreator
-  public InfluxParseSpec(
-      @JsonProperty("dimensionsSpec") DimensionsSpec dimensionsSpec,
-      @JsonProperty("whitelistMeasurements") List<String> measurementWhitelist
-  )
-  {
-    super(
-        new TimestampSpec(InfluxParser.TIMESTAMP_KEY, "millis", null),
-        dimensionsSpec != null ? dimensionsSpec : DimensionsSpec.EMPTY
-    );
-    this.measurementWhitelist = measurementWhitelist;
-  }
-
-  @Override
-  public Parser<String, Object> makeParser()
-  {
-    if (measurementWhitelist != null && measurementWhitelist.size() > 0) {
-      return new InfluxParser(Sets.newHashSet(measurementWhitelist));
-    } else {
-      return new InfluxParser(null);
-    }
-  }
-
-  @Override
-  public ParseSpec withDimensionsSpec(DimensionsSpec spec)
-  {
-    return new InfluxParseSpec(spec, measurementWhitelist);
-  }
-}
diff --git 
a/extensions-contrib/influx-extensions/src/test/java/org/apache/druid/data/input/influx/InfluxInputFormatTest.java
 
b/extensions-contrib/influx-extensions/src/test/java/org/apache/druid/data/input/influx/InfluxInputFormatTest.java
new file mode 100644
index 00000000000..0b857b42717
--- /dev/null
+++ 
b/extensions-contrib/influx-extensions/src/test/java/org/apache/druid/data/input/influx/InfluxInputFormatTest.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.influx;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.data.input.ColumnsFilter;
+import org.apache.druid.data.input.InputEntityReader;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.impl.ByteEntity;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.java.util.common.parsers.ParseException;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class InfluxInputFormatTest
+{
+  private InputRowSchema schema;
+
+  @Before
+  public void setUp()
+  {
+    schema = new InputRowSchema(
+        new TimestampSpec(InfluxLineProtocolReader.TIMESTAMP_KEY, "millis", 
null),
+        DimensionsSpec.EMPTY,
+        ColumnsFilter.all()
+    );
+  }
+
+  @Test
+  public void testParseRealSample() throws IOException
+  {
+    InputRow row = readSingleRow(
+        new InfluxInputFormat(null),
+        "cpu,host=foo.bar.baz,region=us-east-1,application=echo 
pct_idle=99.3,pct_user=88.8,m1_load=2i 1465839830100400200"
+    );
+    Assert.assertEquals("cpu", row.getDimension("measurement").get(0));
+    Assert.assertEquals(1465839830100L, row.getTimestampFromEpoch());
+    Assert.assertEquals("foo.bar.baz", row.getDimension("host").get(0));
+    Assert.assertEquals("us-east-1", row.getDimension("region").get(0));
+    Assert.assertEquals("echo", row.getDimension("application").get(0));
+    Assert.assertEquals(99.3, row.getMetric("pct_idle").doubleValue(), 0.0);
+    Assert.assertEquals(88.8, row.getMetric("pct_user").doubleValue(), 0.0);
+    Assert.assertEquals(2L, row.getMetric("m1_load").longValue());
+  }
+
+  @Test
+  public void testParseNegativeTimestamp() throws IOException
+  {
+    InputRow row = readSingleRow(
+        new InfluxInputFormat(null),
+        "foo,region=us-east-1,host=127.0.0.1 m=1.0,n=3.0,o=500i -123456789"
+    );
+    Assert.assertEquals("foo", row.getDimension("measurement").get(0));
+    Assert.assertEquals(-123L, row.getTimestampFromEpoch());
+    Assert.assertEquals("us-east-1", row.getDimension("region").get(0));
+    Assert.assertEquals("127.0.0.1", row.getDimension("host").get(0));
+    Assert.assertEquals(1.0, row.getMetric("m").doubleValue(), 0.0);
+    Assert.assertEquals(3.0, row.getMetric("n").doubleValue(), 0.0);
+    Assert.assertEquals(500L, row.getMetric("o").longValue());
+  }
+
+  @Test
+  public void testParseTruncatedTimestamp() throws IOException
+  {
+    InputRow row = readSingleRow(
+        new InfluxInputFormat(null),
+        "foo,region=us-east-1,host=127.0.0.1 m=1.0,n=3.0,o=500i 123"
+    );
+    Assert.assertEquals("foo", row.getDimension("measurement").get(0));
+    Assert.assertEquals(0L, row.getTimestampFromEpoch());
+  }
+
+  @Test
+  public void testParseNoTags() throws IOException
+  {
+    InputRow row = readSingleRow(
+        new InfluxInputFormat(null),
+        "foo m=1.0,n=3.0 123456789"
+    );
+    Assert.assertEquals("foo", row.getDimension("measurement").get(0));
+    Assert.assertEquals(123L, row.getTimestampFromEpoch());
+    Assert.assertEquals(1.0, row.getMetric("m").doubleValue(), 0.0);
+    Assert.assertEquals(3.0, row.getMetric("n").doubleValue(), 0.0);
+  }
+
+  @Test
+  public void testParseQuotedStringFieldValue() throws IOException
+  {
+    InputRow row = readSingleRow(
+        new InfluxInputFormat(null),
+        "foo,region=us-east-1,host=127.0.0.1 m=1.0,n=3.0,o=\"something 
\\\"cool\\\" \" 123456789"
+    );
+    Assert.assertEquals("foo", row.getDimension("measurement").get(0));
+    Assert.assertEquals(123L, row.getTimestampFromEpoch());
+    Assert.assertEquals("something \"cool\" ", row.getDimension("o").get(0));
+  }
+
+  @Test
+  public void testParseUnicodeCharacters() throws IOException
+  {
+    InputRow row = readSingleRow(
+        new InfluxInputFormat(null),
+        "\uD83D\uDE00,\uD83D\uDE05=\uD83D\uDE06 
\uD83D\uDE0B=100i,b=\"\uD83D\uDE42\" 123456789"
+    );
+    Assert.assertEquals("\uD83D\uDE00", 
row.getDimension("measurement").get(0));
+    Assert.assertEquals(123L, row.getTimestampFromEpoch());
+    Assert.assertEquals("\uD83D\uDE06", 
row.getDimension("\uD83D\uDE05").get(0));
+    Assert.assertEquals(100L, row.getMetric("\uD83D\uDE0B").longValue());
+    Assert.assertEquals("\uD83D\uDE42", row.getDimension("b").get(0));
+  }
+
+  @Test
+  public void testParseEscapedCharactersInIdentifiers() throws IOException
+  {
+    InputRow row = readSingleRow(
+        new InfluxInputFormat(null),
+        "f\\,oo\\ \\=,bar=baz m=1.0,n=3.0 123456789"
+    );
+    Assert.assertEquals("f,oo =", row.getDimension("measurement").get(0));
+    Assert.assertEquals(123L, row.getTimestampFromEpoch());
+    Assert.assertEquals("baz", row.getDimension("bar").get(0));
+    Assert.assertEquals(1.0, row.getMetric("m").doubleValue(), 0.0);
+    Assert.assertEquals(3.0, row.getMetric("n").doubleValue(), 0.0);
+  }
+
+  @Test
+  public void testWhitelistPass() throws IOException
+  {
+    InputRow row = readSingleRow(
+        new InfluxInputFormat(ImmutableList.of("cpu")),
+        "cpu,host=foo.bar.baz,region=us-east 
pct_idle=99.3,pct_user=88.8,m1_load=2 1465839830100400200"
+    );
+    Assert.assertEquals("cpu", row.getDimension("measurement").get(0));
+  }
+
+  @Test
+  public void testWhitelistFail()
+  {
+    Assert.assertThrows(
+        ParseException.class,
+        () -> readSingleRow(
+            new InfluxInputFormat(ImmutableList.of("mem")),
+            "cpu,host=foo.bar.baz,region=us-east 
pct_idle=99.3,pct_user=88.8,m1_load=2 1465839830100400200"
+        )
+    );
+  }
+
+  @Test
+  public void testParseEmptyLine() throws IOException
+  {
+    // empty line is skipped (this was a parse exception in old InfluxParser)
+    List<InputRow> rows = readAllRows(new InfluxInputFormat(null), "");
+    Assert.assertTrue(rows.isEmpty());
+  }
+
+  @Test
+  public void testParseInvalidMeasurement()
+  {
+    Assert.assertThrows(
+        ParseException.class,
+        () -> readSingleRow(new InfluxInputFormat(null), "invalid measurement")
+    );
+  }
+
+  @Test
+  public void testParseInvalidTimestamp()
+  {
+    Assert.assertThrows(
+        ParseException.class,
+        () -> readSingleRow(new InfluxInputFormat(null), "foo i=123 123x")
+    );
+  }
+
+  @Test
+  public void testMultipleLines() throws IOException
+  {
+    String input = "cpu,host=a pct_idle=99.3 1465839830100400200\n"
+                   + "mem,host=b used=1024i 1465839831100400200";
+    InfluxInputFormat format = new InfluxInputFormat(null);
+    List<InputRow> rows = readAllRows(format, input);
+    Assert.assertEquals(2, rows.size());
+    Assert.assertEquals("cpu", rows.get(0).getDimension("measurement").get(0));
+    Assert.assertEquals("a", rows.get(0).getDimension("host").get(0));
+    Assert.assertEquals("mem", rows.get(1).getDimension("measurement").get(0));
+    Assert.assertEquals("b", rows.get(1).getDimension("host").get(0));
+  }
+
+  @Test
+  public void testSerde() throws Exception
+  {
+    ObjectMapper mapper = new DefaultObjectMapper();
+    for (Module module : new InfluxExtensionsModule().getJacksonModules()) {
+      mapper.registerModule(module);
+    }
+
+    InfluxInputFormat format = new InfluxInputFormat(ImmutableList.of("cpu", 
"mem"));
+    String json = mapper.writeValueAsString(format);
+    InfluxInputFormat deserialized = (InfluxInputFormat) 
mapper.readValue(json, InputFormat.class);
+
+    Assert.assertEquals(format, deserialized);
+    Assert.assertEquals(format.getWhitelistMeasurements(), 
deserialized.getWhitelistMeasurements());
+  }
+
+  @Test
+  public void testSerdeNoWhitelist() throws Exception
+  {
+    ObjectMapper mapper = new DefaultObjectMapper();
+    for (Module module : new InfluxExtensionsModule().getJacksonModules()) {
+      mapper.registerModule(module);
+    }
+
+    InfluxInputFormat format = new InfluxInputFormat(null);
+    String json = mapper.writeValueAsString(format);
+    InfluxInputFormat deserialized = (InfluxInputFormat) 
mapper.readValue(json, InputFormat.class);
+
+    Assert.assertEquals(format, deserialized);
+    Assert.assertNull(deserialized.getWhitelistMeasurements());
+  }
+
+  @Test
+  public void testIsSplittable()
+  {
+    Assert.assertFalse(new InfluxInputFormat(null).isSplittable());
+  }
+
+  @Test
+  public void testEquals()
+  {
+    EqualsVerifier.forClass(InfluxInputFormat.class).usingGetClass().verify();
+  }
+
+  private InputRow readSingleRow(InfluxInputFormat format, String line) throws 
IOException
+  {
+    InputEntityReader reader = format.createReader(schema, new 
ByteEntity(StringUtils.toUtf8(line)), null);
+    try (CloseableIterator<InputRow> iterator = reader.read()) {
+      Assert.assertTrue(iterator.hasNext());
+      InputRow row = iterator.next();
+      Assert.assertFalse(iterator.hasNext());
+      return row;
+    }
+  }
+
+  private List<InputRow> readAllRows(InfluxInputFormat format, String data) 
throws IOException
+  {
+    InputEntityReader reader = format.createReader(schema, new 
ByteEntity(StringUtils.toUtf8(data)), null);
+    List<InputRow> rows = new ArrayList<>();
+    try (CloseableIterator<InputRow> iterator = reader.read()) {
+      while (iterator.hasNext()) {
+        rows.add(iterator.next());
+      }
+    }
+    return rows;
+  }
+}
diff --git 
a/extensions-contrib/influx-extensions/src/test/java/org/apache/druid/data/input/influx/InfluxParserTest.java
 
b/extensions-contrib/influx-extensions/src/test/java/org/apache/druid/data/input/influx/InfluxParserTest.java
deleted file mode 100644
index 49307f91e0a..00000000000
--- 
a/extensions-contrib/influx-extensions/src/test/java/org/apache/druid/data/input/influx/InfluxParserTest.java
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.data.input.influx;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import junitparams.JUnitParamsRunner;
-import junitparams.Parameters;
-import org.apache.druid.java.util.common.Pair;
-import org.apache.druid.java.util.common.parsers.ParseException;
-import org.apache.druid.java.util.common.parsers.Parser;
-import org.hamcrest.MatcherAssert;
-import org.hamcrest.Matchers;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-
-import java.util.HashMap;
-import java.util.Map;
-
-@RunWith(JUnitParamsRunner.class)
-public class InfluxParserTest
-{
-  @SuppressWarnings("unused")
-  private String name;
-  @SuppressWarnings("unused")
-  private String input;
-  @SuppressWarnings("unused")
-  private Map<String, Object> expected;
-
-  private static Object[] testCase(String name, String input, Parsed expected)
-  {
-    return Lists.newArrayList(name, input, expected).toArray();
-  }
-
-  public Object[] testData()
-  {
-    return Lists.newArrayList(
-        testCase(
-            "real sample",
-            "cpu,host=foo.bar.baz,region=us-east-1,application=echo pct_idle=99.3,pct_user=88.8,m1_load=2i 1465839830100400200",
-            Parsed.row("cpu", 1465839830100L)
-                  .with("host", "foo.bar.baz")
-                  .with("region", "us-east-1")
-                  .with("application", "echo")
-                  .with("pct_idle", 99.3)
-                  .with("pct_user", 88.8)
-                  .with("m1_load", 2L)
-        ),
-        testCase(
-            "negative timestamp",
-            "foo,region=us-east-1,host=127.0.0.1 m=1.0,n=3.0,o=500i -123456789",
-            Parsed.row("foo", -123L)
-                  .with("region", "us-east-1")
-                  .with("host", "127.0.0.1")
-                  .with("m", 1.0)
-                  .with("n", 3.0)
-                  .with("o", 500L)
-        ),
-        testCase(
-            "truncated timestamp",
-            "foo,region=us-east-1,host=127.0.0.1 m=1.0,n=3.0,o=500i 123",
-            Parsed.row("foo", 0L)
-                  .with("region", "us-east-1")
-                  .with("host", "127.0.0.1")
-                  .with("m", 1.0)
-                  .with("n", 3.0)
-                  .with("o", 500L)
-        ),
-        testCase(
-            "special characters",
-            "!@#$%^&*()_-\\=+,+++\\ +++=--\\ --- __**__=\"ü\" 123456789",
-            Parsed.row("!@#$%^&*()_-=+", 123L)
-                  .with("+++ +++", "-- ---")
-                  .with("__**__", "127.0.0.1")
-                  .with("__**__", "ü")
-        ),
-        testCase(
-            "unicode characters",
-            "\uD83D\uDE00,\uD83D\uDE05=\uD83D\uDE06 \uD83D\uDE0B=100i,b=\"\uD83D\uDE42\" 123456789",
-            Parsed.row("\uD83D\uDE00", 123L)
-                  .with("\uD83D\uDE05", "\uD83D\uDE06")
-                  .with("\uD83D\uDE0B", 100L)
-                  .with("b", "\uD83D\uDE42")
-        ),
-        testCase(
-            "quoted string measurement value",
-            "foo,region=us-east-1,host=127.0.0.1 m=1.0,n=3.0,o=\"something \\\"cool\\\" \" 123456789",
-            Parsed.row("foo", 123L)
-                  .with("region", "us-east-1")
-                  .with("host", "127.0.0.1")
-                  .with("m", 1.0)
-                  .with("n", 3.0)
-                  .with("o", "something \"cool\" ")
-        ),
-        testCase(
-            "no tags",
-            "foo m=1.0,n=3.0 123456789",
-            Parsed.row("foo", 123L)
-                  .with("m", 1.0)
-                  .with("n", 3.0)
-        ),
-        testCase(
-            "Escaped characters in identifiers",
-            "f\\,oo\\ \\=,bar=baz m=1.0,n=3.0 123456789",
-            Parsed.row("f,oo =", 123L)
-                  .with("bar", "baz")
-                  .with("m", 1.0)
-                  .with("n", 3.0)
-        ),
-        testCase(
-            "Escaped characters in identifiers",
-            "foo\\ \\=,bar=baz m=1.0,n=3.0 123456789",
-            Parsed.row("foo =", 123L)
-                  .with("bar", "baz")
-                  .with("m", 1.0)
-                  .with("n", 3.0)
-        )
-    ).toArray();
-  }
-
-  @Test
-  @Parameters(method = "testData")
-  public void testParse(String name, String input, Parsed expected)
-  {
-    Parser<String, Object> parser = new InfluxParser(null);
-    Map<String, Object> parsed = parser.parseToMap(input);
-    MatcherAssert.assertThat(
-        "correct measurement name",
-        parsed.get("measurement"),
-        Matchers.equalTo(expected.measurement)
-    );
-    MatcherAssert.assertThat(
-        "correct timestamp",
-        parsed.get(InfluxParser.TIMESTAMP_KEY),
-        Matchers.equalTo(expected.timestamp)
-    );
-    expected.kv.forEach((k, v) -> MatcherAssert.assertThat("correct field " + k, parsed.get(k), Matchers.equalTo(v)));
-    parsed.remove("measurement");
-    parsed.remove(InfluxParser.TIMESTAMP_KEY);
-    MatcherAssert.assertThat("No extra keys in parsed data", parsed.keySet(), Matchers.equalTo(expected.kv.keySet()));
-  }
-
-  @Test
-  public void testParseWhitelistPass()
-  {
-    Parser<String, Object> parser = new InfluxParser(Sets.newHashSet("cpu"));
-    String input = "cpu,host=foo.bar.baz,region=us-east,application=echo pct_idle=99.3,pct_user=88.8,m1_load=2 1465839830100400200";
-    Map<String, Object> parsed = parser.parseToMap(input);
-    MatcherAssert.assertThat(parsed.get("measurement"), Matchers.equalTo("cpu"));
-  }
-
-  @Test
-  public void testParseWhitelistFail()
-  {
-    Parser<String, Object> parser = new InfluxParser(Sets.newHashSet("mem"));
-    String input = "cpu,host=foo.bar.baz,region=us-east,application=echo pct_idle=99.3,pct_user=88.8,m1_load=2 1465839830100400200";
-    try {
-      parser.parseToMap(input);
-    }
-    catch (ParseException t) {
-      MatcherAssert.assertThat(t, Matchers.isA(ParseException.class));
-      return;
-    }
-
-    Assert.fail("Exception not thrown");
-  }
-
-  public Object[] failureTestData()
-  {
-    return Lists.newArrayList(
-        Pair.of("Empty line", ""),
-        Pair.of("Invalid measurement", "invalid measurement"),
-        Pair.of("Invalid timestamp", "foo i=123 123x")
-    ).toArray();
-  }
-
-  @Test
-  @Parameters(method = "failureTestData")
-  public void testParseFailures(Pair<String, String> testCase)
-  {
-    Parser<String, Object> parser = new InfluxParser(null);
-    try {
-      parser.parseToMap(testCase.rhs);
-    }
-    catch (ParseException t) {
-      MatcherAssert.assertThat(t, Matchers.isA(ParseException.class));
-      return;
-    }
-
-    Assert.fail(testCase.rhs + ": exception not thrown");
-  }
-
-  private static class Parsed
-  {
-    private String measurement;
-    private Long timestamp;
-    private final Map<String, Object> kv = new HashMap<>();
-
-    static Parsed row(String measurement, Long timestamp)
-    {
-      Parsed e = new Parsed();
-      e.measurement = measurement;
-      e.timestamp = timestamp;
-      return e;
-    }
-
-    Parsed with(String k, Object v)
-    {
-      kv.put(k, v);
-      return this;
-    }
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to