Hello,
I've become interested in Apache Druid and have been studying it. As an exercise, I decided to 
re-ingest data from one datasource into another. For this I'm using the 
ingestSegment firehose, and in the parser description I'm adding a 
flattenSpec (the parser type is string, the format is json).
Here is the configuration:
{
  "type" : "index",
  "spec" : {
    "dataSchema" : {
      "dataSource" : "cp37-data8",
      "parser" : {
        "type" : "string",
        "parseSpec" : {
          "format" : "json",
          "timestampSpec" : {
            "column" : "__time",
            "format" : "auto"
          },
          "flattenSpec": {
            "useFieldDiscovery": true,
            "fields": [
              {
                "type": "jq",
                "name": "resourceItemStatusDetails_updateDateTime",
                "expr": ".fullDocument_data | 
fromjson.resourceItemStatusDetails.updateDateTime.\"$date\""
              }
            ]
          },
          "dimensionsSpec" : {
            "dimensions": [
              "operationType",
              "databaseName",
              "collectionName",
              "fullDocument_id",
              "fullDocument_docId",
              "resourceItemStatusDetails_updateDateTime",
              {
                "type": "long",
                "name": "clusterTime"
              }
            ],
            "dimensionExclusions" : [

            ],
            "spatialDimensions" : []
          }
        }
      },
      "metricsSpec" : [
        {
          "type" : "count",
          "name" : "count"
        }
      ],
      "granularitySpec" : {
        "type" : "uniform",
        "segmentGranularity" : "DAY",
        "queryGranularity" : "NONE"
      }
    },
    "ioConfig" : {
      "type" : "index",
      "firehose" : {
        "type" : "ingestSegment",
        "dataSource" : "cp-all-buffer",
        "interval" : "2018-01-01/2020-01-03"
      },
      "appendToExisting" : false
    },
    "tuningConfig" : {
      "type" : "index",
      "maxRowsPerSegment" : 100000,
      "maxRowsInMemory" : 1000
    }
  }
}

The task itself completes successfully, but the settings I specified in the 
parser are ignored during execution. I've looked through the Druid source 
code, and it seems I may have found a bug.
If you look at the IngestSegmentFirehoseFactory class, you'll see that only the 
TransformSpec (extracted from the parser) is passed to the 
IngestSegmentFirehose constructor, not the parser itself:
final TransformSpec transformSpec = TransformSpec.fromInputRowParser(inputRowParser);
return new IngestSegmentFirehose(adapters, transformSpec, dims, metricsList, dimFilter);
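
For reference, the IngestSegmentFirehose constructor does not take a parser at all; its 
signature is roughly the following (paraphrased from memory, parameter names may differ 
between versions):

IngestSegmentFirehose(
    List<WindowedStorageAdapter> adapters,
    TransformSpec transformSpec,
    List<String> dims,
    List<String> metrics,
    DimFilter dimFilter
)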

Next, in IngestSegmentFirehose a transformer is created from that TransformSpec, and 
each row is run through it:

final InputRow inputRow = rowYielder.get();
rowYielder = rowYielder.next(null);
return transformer.transform(inputRow);

At this stage the parser's parse method is never called, which explains why the parser 
settings in my example were ignored.
This raises the question: why not pass the parser itself to the 
IngestSegmentFirehose constructor? If you look at the 
TransformSpec.fromInputRowParser implementation, it either unwraps a decorating parser 
that carries a transformer, or throws an error; and in such decorating parsers the 
transformer is always invoked as part of parsing anyway.
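
For reference, fromInputRowParser looks roughly like this (paraphrased from the source; 
exact code may differ between Druid versions):

public static TransformSpec fromInputRowParser(final InputRowParser<?> parser)
{
  if (parser instanceof TransformingInputRowParser) {
    return ((TransformingInputRowParser<?>) parser).getTransformSpec();
  } else if (parser instanceof TransformingStringInputRowParser) {
    return ((TransformingStringInputRowParser) parser).getTransformSpec();
  } else {
    // no transforming decorator -> exception (message paraphrased)
    throw new ISE("Parser was not transformable: %s", parser);
  }
}

And the decorating parsers apply the transformer on top of the parsed rows, e.g. in 
TransformingInputRowParser.parseBatch: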

parser.parseBatch(row).stream().map(transformer::transform).collect(Collectors.toList());
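
For completeness, the decorating parser itself is approximately the following (again 
paraphrased; constructor and the remaining overrides omitted):

public class TransformingInputRowParser<T> implements InputRowParser<T>
{
  private final InputRowParser<T> parser;      // the user-configured parser
  private final TransformSpec transformSpec;   // what fromInputRowParser() extracts
  private final Transformer transformer;

  @Override
  public List<InputRow> parseBatch(final T row)
  {
    // parse first, then run every parsed row through the transformer
    return parser.parseBatch(row).stream().map(transformer::transform).collect(Collectors.toList());
  }

  public TransformSpec getTransformSpec()
  {
    return transformSpec;
  }

  // ...
}

So, as far as I can tell, passing the parser through and calling it would not lose the 
transform step.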

Could anyone please clarify whether this is intentional behaviour or a bug? Thanks!
