FrankChen021 commented on code in PR #19547:
URL: https://github.com/apache/druid/pull/19547#discussion_r3356164347


##########
web-console/src/helpers/supervisor-conversion.ts:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { C, F, L, SqlExpression, SqlQuery } from 'druid-query-toolkit';
+
+interface MetricSpec {
+  type: string;
+  name?: string;
+  fieldName?: string;
+  maxStringBytes?: number;
+  size?: number;
+  lgK?: number;
+  tgtHllType?: string;
+  k?: number;
+}
+
+function extraArgs(...args: Array<[any, any]>): string {
+  const filtered = args.filter(([value, defaultValue]) => value !== undefined 
&& value !== defaultValue);
+  if (filtered.length === 0) return '';
+  return ', ' + filtered.map(([value]) => (typeof value === 'string' ? 
`'${value}'` : value)).join(', ');
+}
+
+function metricSpecToSqlExpression(metricSpec: MetricSpec): SqlExpression | 
undefined {
+  if (metricSpec.type === 'count') {
+    return SqlExpression.parse('COUNT(*)');
+  }
+
+  if (!metricSpec.fieldName) return undefined;
+  const column = C(metricSpec.fieldName);
+
+  switch (metricSpec.type) {
+    case 'longSum':
+    case 'floatSum':
+    case 'doubleSum':
+      return F('SUM', column);
+
+    case 'longMin':
+    case 'floatMin':
+    case 'doubleMin':
+      return F('MIN', column);
+
+    case 'longMax':
+    case 'floatMax':
+    case 'doubleMax':
+      return F('MAX', column);
+
+    case 'doubleFirst':
+    case 'floatFirst':
+    case 'longFirst':
+      return F('EARLIEST', column);
+
+    case 'stringFirst':
+      return F('EARLIEST', column, L(metricSpec.maxStringBytes || 128));
+
+    case 'doubleLast':
+    case 'floatLast':
+    case 'longLast':
+      return F('LATEST', column);
+
+    case 'stringLast':
+      return F('LATEST', column, L(metricSpec.maxStringBytes || 128));
+
+    case 'thetaSketch':
+      return 
SqlExpression.parse(`APPROX_COUNT_DISTINCT_DS_THETA(${column}${extraArgs([metricSpec.size,
 16384])})`);
+
+    case 'HLLSketchBuild':
+    case 'HLLSketchMerge':
+      return 
SqlExpression.parse(`APPROX_COUNT_DISTINCT_DS_HLL(${column}${extraArgs([metricSpec.lgK,
 12], [metricSpec.tgtHllType, 'HLL_4'])})`);
+
+    case 'quantilesDoublesSketch':
+      return 
SqlExpression.parse(`DS_QUANTILES_SKETCH(${column}${extraArgs([metricSpec.k, 
128])})`);
+
+    case 'hyperUnique':
+      return F('APPROX_COUNT_DISTINCT_BUILTIN', column);
+
+    default:
+      // Unsupported: tDigestSketch, momentSketch, fixedBucketsHistogram
+      return undefined;
+  }
+}
+
+export interface SupervisorSpec {
+  type: string;
+  spec: {
+    dataSchema: {
+      dataSource: string;
+      timestampSpec: {
+        column: string;
+        format: string;
+      };
+      dimensionsSpec?: {
+        dimensions: Array<string | { name: string; type: string }>;
+      };
+      metricsSpec?: Array<{ name?: string; fieldName?: string; type: string }>;
+    };
+    ioConfig?: {
+      topic?: string;
+      inputSource?: {
+        type: string;
+        uris?: string[];
+        baseDir?: string;
+      };
+    };
+  };
+}
+
+export interface SupervisorConversionOptions {
+  fileLocation: string;
+  fileType: string;
+}
+
+interface QueryWithContext {
+  queryString: string;
+  queryContext: Record<string, any>;
+}
+
+function extractDimensionName(dimension: string | { name: string; type: string 
}): string {
+  return typeof dimension === 'string' ? dimension : dimension.name;
+}
+
+export function convertSupervisorToSql(
+  supervisorSpec: SupervisorSpec,
+  options: SupervisorConversionOptions,
+): QueryWithContext {
+  const { fileLocation, fileType } = options;
+  const { dataSchema } = supervisorSpec.spec;
+
+  if (!dataSchema) {
+    throw new Error('Supervisor spec missing dataSchema');
+  }
+
+  const datasource = dataSchema.dataSource;
+  if (!datasource) {
+    throw new Error('Supervisor spec missing dataSource');
+  }
+
+  const timestampColumn = dataSchema.timestampSpec?.column || '__time';
+  const timestampFormat = dataSchema.timestampSpec?.format || 'auto';
+
+  // Extract dimensions
+  const dimensions = (dataSchema.dimensionsSpec?.dimensions || 
[]).map(extractDimensionName);
+
+  // Extract and convert metrics to SQL aggregations
+  const metricSpecs = dataSchema.metricsSpec || [];
+  const metricExpressions: Array<{ expr: SqlExpression; name: string }> = [];
+  
+  for (const metricSpec of metricSpecs) {
+    const sqlExpr = metricSpecToSqlExpression(metricSpec as MetricSpec);
+    if (sqlExpr && metricSpec.name) {
+      metricExpressions.push({ expr: sqlExpr, name: metricSpec.name });
+    }
+  }
+
+  // Determine if we need GROUP BY (if we have aggregations)
+  const hasAggregations = metricExpressions.length > 0;
+
+  // Build column list for SELECT
+  // If no aggregations, just select dimensions as-is
+  // If aggregations exist, dimensions become GROUP BY and we add aggregations
+  const selectColumns = dimensions;
+  
+  // All columns for EXTERN (includes timestamp + all raw input columns)
+  // For EXTERN, we need all the fieldNames that metrics reference
+  const metricFieldNames = metricSpecs
+    .map(m => m.fieldName)
+    .filter((name): name is string => !!name);
+  const allExternColumns = [timestampColumn, ...dimensions, 
...metricFieldNames];
+  const uniqueExternColumns = Array.from(new Set(allExternColumns));
+
+  // Create input format based on file type
+  const inputFormatType = fileType === 'json' ? 'json' : fileType === 'csv' ? 
'csv' : fileType;
+
+  // Build proper Druid input source
+  let inputSource: any;
+  if (fileLocation.startsWith('s3://')) {
+    inputSource = {
+      type: 's3',
+      uris: [fileLocation],
+    };
+    // Add objectGlob based on file type if it's a directory
+    if (fileLocation.endsWith('/')) {
+      inputSource.objectGlob = `**.${inputFormatType}`;
+    }
+  } else if (fileLocation.startsWith('gs://')) {
+    inputSource = {
+      type: 'google',
+      uris: [fileLocation],
+    };
+  } else if (fileLocation.startsWith('http://') || 
fileLocation.startsWith('https://')) {
+    inputSource = {
+      type: 'http',
+      uris: [fileLocation],
+    };
+  } else {
+    // Default to local for file:// or absolute paths
+    inputSource = {
+      type: 'local',
+      baseDir: fileLocation.replace('file://', ''),
+      filter: `*.${inputFormatType}`,
+    };
+  }
+
+  // Build EXTERN expression with proper escaping
+  const columnSchema = uniqueExternColumns.map(col => ({name: col, type: 
'string'}));

Review Comment:
   [P1] EXTERN declares numeric inputs as strings
   
   The generated EXTERN signature marks every raw column as string, including 
the metric input fields used by SUM, MIN, MAX, and the sketch aggregations, as 
well as typed dimensions. A common longSum or doubleSum supervisor will 
therefore generate numeric aggregations over VARCHAR inputs, which can fail at 
planning time or ingest values with the wrong types. Preserve the native types 
of metrics and dimensions, as the existing spec converter does.



##########
web-console/src/helpers/supervisor-conversion.ts:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { C, F, L, SqlExpression, SqlQuery } from 'druid-query-toolkit';
+
+interface MetricSpec {
+  type: string;
+  name?: string;
+  fieldName?: string;
+  maxStringBytes?: number;
+  size?: number;
+  lgK?: number;
+  tgtHllType?: string;
+  k?: number;
+}
+
+function extraArgs(...args: Array<[any, any]>): string {
+  const filtered = args.filter(([value, defaultValue]) => value !== undefined 
&& value !== defaultValue);
+  if (filtered.length === 0) return '';
+  return ', ' + filtered.map(([value]) => (typeof value === 'string' ? 
`'${value}'` : value)).join(', ');
+}
+
+function metricSpecToSqlExpression(metricSpec: MetricSpec): SqlExpression | 
undefined {
+  if (metricSpec.type === 'count') {
+    return SqlExpression.parse('COUNT(*)');
+  }
+
+  if (!metricSpec.fieldName) return undefined;
+  const column = C(metricSpec.fieldName);
+
+  switch (metricSpec.type) {
+    case 'longSum':
+    case 'floatSum':
+    case 'doubleSum':
+      return F('SUM', column);
+
+    case 'longMin':
+    case 'floatMin':
+    case 'doubleMin':
+      return F('MIN', column);
+
+    case 'longMax':
+    case 'floatMax':
+    case 'doubleMax':
+      return F('MAX', column);
+
+    case 'doubleFirst':
+    case 'floatFirst':
+    case 'longFirst':
+      return F('EARLIEST', column);
+
+    case 'stringFirst':
+      return F('EARLIEST', column, L(metricSpec.maxStringBytes || 128));
+
+    case 'doubleLast':
+    case 'floatLast':
+    case 'longLast':
+      return F('LATEST', column);
+
+    case 'stringLast':
+      return F('LATEST', column, L(metricSpec.maxStringBytes || 128));
+
+    case 'thetaSketch':
+      return 
SqlExpression.parse(`APPROX_COUNT_DISTINCT_DS_THETA(${column}${extraArgs([metricSpec.size,
 16384])})`);
+
+    case 'HLLSketchBuild':
+    case 'HLLSketchMerge':
+      return 
SqlExpression.parse(`APPROX_COUNT_DISTINCT_DS_HLL(${column}${extraArgs([metricSpec.lgK,
 12], [metricSpec.tgtHllType, 'HLL_4'])})`);
+
+    case 'quantilesDoublesSketch':
+      return 
SqlExpression.parse(`DS_QUANTILES_SKETCH(${column}${extraArgs([metricSpec.k, 
128])})`);
+
+    case 'hyperUnique':
+      return F('APPROX_COUNT_DISTINCT_BUILTIN', column);
+
+    default:
+      // Unsupported: tDigestSketch, momentSketch, fixedBucketsHistogram
+      return undefined;
+  }
+}
+
+export interface SupervisorSpec {
+  type: string;
+  spec: {
+    dataSchema: {
+      dataSource: string;
+      timestampSpec: {
+        column: string;
+        format: string;
+      };
+      dimensionsSpec?: {
+        dimensions: Array<string | { name: string; type: string }>;
+      };
+      metricsSpec?: Array<{ name?: string; fieldName?: string; type: string }>;
+    };
+    ioConfig?: {
+      topic?: string;
+      inputSource?: {
+        type: string;
+        uris?: string[];
+        baseDir?: string;
+      };
+    };
+  };
+}
+
+export interface SupervisorConversionOptions {
+  fileLocation: string;
+  fileType: string;
+}
+
+interface QueryWithContext {
+  queryString: string;
+  queryContext: Record<string, any>;
+}
+
+function extractDimensionName(dimension: string | { name: string; type: string 
}): string {
+  return typeof dimension === 'string' ? dimension : dimension.name;
+}
+
+export function convertSupervisorToSql(
+  supervisorSpec: SupervisorSpec,
+  options: SupervisorConversionOptions,
+): QueryWithContext {
+  const { fileLocation, fileType } = options;
+  const { dataSchema } = supervisorSpec.spec;
+
+  if (!dataSchema) {
+    throw new Error('Supervisor spec missing dataSchema');
+  }
+
+  const datasource = dataSchema.dataSource;
+  if (!datasource) {
+    throw new Error('Supervisor spec missing dataSource');
+  }
+
+  const timestampColumn = dataSchema.timestampSpec?.column || '__time';
+  const timestampFormat = dataSchema.timestampSpec?.format || 'auto';
+
+  // Extract dimensions
+  const dimensions = (dataSchema.dimensionsSpec?.dimensions || 
[]).map(extractDimensionName);
+
+  // Extract and convert metrics to SQL aggregations
+  const metricSpecs = dataSchema.metricsSpec || [];
+  const metricExpressions: Array<{ expr: SqlExpression; name: string }> = [];
+  
+  for (const metricSpec of metricSpecs) {
+    const sqlExpr = metricSpecToSqlExpression(metricSpec as MetricSpec);
+    if (sqlExpr && metricSpec.name) {
+      metricExpressions.push({ expr: sqlExpr, name: metricSpec.name });
+    }
+  }
+
+  // Determine if we need GROUP BY (if we have aggregations)
+  const hasAggregations = metricExpressions.length > 0;
+
+  // Build column list for SELECT
+  // If no aggregations, just select dimensions as-is
+  // If aggregations exist, dimensions become GROUP BY and we add aggregations
+  const selectColumns = dimensions;
+  
+  // All columns for EXTERN (includes timestamp + all raw input columns)
+  // For EXTERN, we need all the fieldNames that metrics reference
+  const metricFieldNames = metricSpecs
+    .map(m => m.fieldName)
+    .filter((name): name is string => !!name);
+  const allExternColumns = [timestampColumn, ...dimensions, 
...metricFieldNames];
+  const uniqueExternColumns = Array.from(new Set(allExternColumns));
+
+  // Create input format based on file type
+  const inputFormatType = fileType === 'json' ? 'json' : fileType === 'csv' ? 
'csv' : fileType;
+
+  // Build proper Druid input source
+  let inputSource: any;
+  if (fileLocation.startsWith('s3://')) {
+    inputSource = {
+      type: 's3',
+      uris: [fileLocation],
+    };
+    // Add objectGlob based on file type if it's a directory
+    if (fileLocation.endsWith('/')) {
+      inputSource.objectGlob = `**.${inputFormatType}`;
+    }
+  } else if (fileLocation.startsWith('gs://')) {
+    inputSource = {
+      type: 'google',
+      uris: [fileLocation],
+    };
+  } else if (fileLocation.startsWith('http://') || 
fileLocation.startsWith('https://')) {
+    inputSource = {
+      type: 'http',
+      uris: [fileLocation],
+    };
+  } else {
+    // Default to local for file:// or absolute paths
+    inputSource = {
+      type: 'local',
+      baseDir: fileLocation.replace('file://', ''),
+      filter: `*.${inputFormatType}`,
+    };
+  }
+
+  // Build EXTERN expression with proper escaping
+  const columnSchema = uniqueExternColumns.map(col => ({name: col, type: 
'string'}));
+  const columnSchemaJson = JSON.stringify(columnSchema);
+  const inputSourceJson = JSON.stringify(inputSource);
+  const inputFormatJson = JSON.stringify({type: inputFormatType});

Review Comment:
   [P2] Input format details are discarded
   
   The input format is reduced to only {type: fileType}, dropping the 
supervisor's ioConfig.inputFormat settings such as CSV columns/header handling, 
JSON flattenSpec, Kafka metadata formats, Avro/Protobuf schemas, and other 
parser options. Supervisors that rely on these settings are converted to SQL 
that reads different columns or cannot parse the backfill files at all.



##########
web-console/src/helpers/supervisor-conversion.ts:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { C, F, L, SqlExpression, SqlQuery } from 'druid-query-toolkit';
+
+interface MetricSpec {
+  type: string;
+  name?: string;
+  fieldName?: string;
+  maxStringBytes?: number;
+  size?: number;
+  lgK?: number;
+  tgtHllType?: string;
+  k?: number;
+}
+
+function extraArgs(...args: Array<[any, any]>): string {
+  const filtered = args.filter(([value, defaultValue]) => value !== undefined 
&& value !== defaultValue);
+  if (filtered.length === 0) return '';
+  return ', ' + filtered.map(([value]) => (typeof value === 'string' ? 
`'${value}'` : value)).join(', ');
+}
+
+function metricSpecToSqlExpression(metricSpec: MetricSpec): SqlExpression | 
undefined {
+  if (metricSpec.type === 'count') {
+    return SqlExpression.parse('COUNT(*)');
+  }
+
+  if (!metricSpec.fieldName) return undefined;
+  const column = C(metricSpec.fieldName);
+
+  switch (metricSpec.type) {
+    case 'longSum':
+    case 'floatSum':
+    case 'doubleSum':
+      return F('SUM', column);
+
+    case 'longMin':
+    case 'floatMin':
+    case 'doubleMin':
+      return F('MIN', column);
+
+    case 'longMax':
+    case 'floatMax':
+    case 'doubleMax':
+      return F('MAX', column);
+
+    case 'doubleFirst':
+    case 'floatFirst':
+    case 'longFirst':
+      return F('EARLIEST', column);
+
+    case 'stringFirst':
+      return F('EARLIEST', column, L(metricSpec.maxStringBytes || 128));
+
+    case 'doubleLast':
+    case 'floatLast':
+    case 'longLast':
+      return F('LATEST', column);
+
+    case 'stringLast':
+      return F('LATEST', column, L(metricSpec.maxStringBytes || 128));
+
+    case 'thetaSketch':
+      return 
SqlExpression.parse(`APPROX_COUNT_DISTINCT_DS_THETA(${column}${extraArgs([metricSpec.size,
 16384])})`);
+
+    case 'HLLSketchBuild':
+    case 'HLLSketchMerge':
+      return 
SqlExpression.parse(`APPROX_COUNT_DISTINCT_DS_HLL(${column}${extraArgs([metricSpec.lgK,
 12], [metricSpec.tgtHllType, 'HLL_4'])})`);
+
+    case 'quantilesDoublesSketch':
+      return 
SqlExpression.parse(`DS_QUANTILES_SKETCH(${column}${extraArgs([metricSpec.k, 
128])})`);
+
+    case 'hyperUnique':
+      return F('APPROX_COUNT_DISTINCT_BUILTIN', column);
+
+    default:
+      // Unsupported: tDigestSketch, momentSketch, fixedBucketsHistogram
+      return undefined;
+  }
+}
+
+export interface SupervisorSpec {
+  type: string;
+  spec: {
+    dataSchema: {
+      dataSource: string;
+      timestampSpec: {
+        column: string;
+        format: string;
+      };
+      dimensionsSpec?: {
+        dimensions: Array<string | { name: string; type: string }>;
+      };
+      metricsSpec?: Array<{ name?: string; fieldName?: string; type: string }>;
+    };
+    ioConfig?: {
+      topic?: string;
+      inputSource?: {
+        type: string;
+        uris?: string[];
+        baseDir?: string;
+      };
+    };
+  };
+}
+
+export interface SupervisorConversionOptions {
+  fileLocation: string;
+  fileType: string;
+}
+
+interface QueryWithContext {
+  queryString: string;
+  queryContext: Record<string, any>;
+}
+
+function extractDimensionName(dimension: string | { name: string; type: string 
}): string {
+  return typeof dimension === 'string' ? dimension : dimension.name;
+}
+
+export function convertSupervisorToSql(
+  supervisorSpec: SupervisorSpec,
+  options: SupervisorConversionOptions,
+): QueryWithContext {
+  const { fileLocation, fileType } = options;
+  const { dataSchema } = supervisorSpec.spec;
+
+  if (!dataSchema) {
+    throw new Error('Supervisor spec missing dataSchema');
+  }
+
+  const datasource = dataSchema.dataSource;
+  if (!datasource) {
+    throw new Error('Supervisor spec missing dataSource');
+  }
+
+  const timestampColumn = dataSchema.timestampSpec?.column || '__time';
+  const timestampFormat = dataSchema.timestampSpec?.format || 'auto';
+
+  // Extract dimensions
+  const dimensions = (dataSchema.dimensionsSpec?.dimensions || 
[]).map(extractDimensionName);
+
+  // Extract and convert metrics to SQL aggregations
+  const metricSpecs = dataSchema.metricsSpec || [];
+  const metricExpressions: Array<{ expr: SqlExpression; name: string }> = [];
+  
+  for (const metricSpec of metricSpecs) {
+    const sqlExpr = metricSpecToSqlExpression(metricSpec as MetricSpec);
+    if (sqlExpr && metricSpec.name) {
+      metricExpressions.push({ expr: sqlExpr, name: metricSpec.name });
+    }
+  }
+
+  // Determine if we need GROUP BY (if we have aggregations)
+  const hasAggregations = metricExpressions.length > 0;
+
+  // Build column list for SELECT
+  // If no aggregations, just select dimensions as-is
+  // If aggregations exist, dimensions become GROUP BY and we add aggregations
+  const selectColumns = dimensions;
+  
+  // All columns for EXTERN (includes timestamp + all raw input columns)
+  // For EXTERN, we need all the fieldNames that metrics reference
+  const metricFieldNames = metricSpecs
+    .map(m => m.fieldName)
+    .filter((name): name is string => !!name);
+  const allExternColumns = [timestampColumn, ...dimensions, 
...metricFieldNames];
+  const uniqueExternColumns = Array.from(new Set(allExternColumns));
+
+  // Create input format based on file type
+  const inputFormatType = fileType === 'json' ? 'json' : fileType === 'csv' ? 
'csv' : fileType;
+
+  // Build proper Druid input source
+  let inputSource: any;
+  if (fileLocation.startsWith('s3://')) {
+    inputSource = {
+      type: 's3',
+      uris: [fileLocation],
+    };
+    // Add objectGlob based on file type if it's a directory
+    if (fileLocation.endsWith('/')) {
+      inputSource.objectGlob = `**.${inputFormatType}`;
+    }
+  } else if (fileLocation.startsWith('gs://')) {
+    inputSource = {
+      type: 'google',
+      uris: [fileLocation],
+    };
+  } else if (fileLocation.startsWith('http://') || 
fileLocation.startsWith('https://')) {
+    inputSource = {
+      type: 'http',
+      uris: [fileLocation],
+    };
+  } else {
+    // Default to local for file:// or absolute paths
+    inputSource = {
+      type: 'local',
+      baseDir: fileLocation.replace('file://', ''),
+      filter: `*.${inputFormatType}`,
+    };
+  }
+
+  // Build EXTERN expression with proper escaping
+  const columnSchema = uniqueExternColumns.map(col => ({name: col, type: 
'string'}));
+  const columnSchemaJson = JSON.stringify(columnSchema);
+  const inputSourceJson = JSON.stringify(inputSource);
+  const inputFormatJson = JSON.stringify({type: inputFormatType});
+  
+  const externExpression = F(
+    'EXTERN',
+    SqlExpression.parse(`'${inputSourceJson.replace(/'/g, "''")}'`),
+    SqlExpression.parse(`'${inputFormatJson.replace(/'/g, "''")}'`),
+    SqlExpression.parse(`'${columnSchemaJson.replace(/'/g, "''")}'`),
+  );
+
+  // Build SELECT expressions
+  const selectExpressions: SqlExpression[] = selectColumns.map(col => C(col));
+
+  // Add metric aggregations
+  for (const { expr, name } of metricExpressions) {
+    selectExpressions.push(expr.as(name));
+  }
+
+  // Add timestamp parsing
+  const timeParseExpression =
+    timestampFormat === 'auto'
+      ? F('TIME_PARSE', C(timestampColumn))
+      : F('TIME_PARSE', C(timestampColumn), 
SqlExpression.parse(`'${timestampFormat}'`));
+
+  selectExpressions.push(timeParseExpression.as('__time'));
+
+  // Build the query using druid-query-toolkit
+  let query = SqlQuery.from(F('TABLE', externExpression));
+  
+  // Add select expressions one by one
+  for (const expr of selectExpressions) {
+    query = query.addSelect(expr);
+  }
+
+  // Add GROUP BY if we have aggregations
+  if (hasAggregations) {
+    const groupByExprs: any[] = [timeParseExpression];
+    dimensions.forEach(d => groupByExprs.push(C(d)));
+    query = query.changeGroupByExpressions(groupByExprs as any);
+  }
+
+  // Convert to string and manually add INSERT, PARTITIONED BY, and CLUSTERED 
BY
+  // because the query builder API is giving us trouble
+  let sqlString = query.toString();
+  
+  // Prepend INSERT INTO
+  sqlString = `INSERT INTO ${C(datasource)}\n${sqlString}`;
+
+  // Append PARTITIONED BY
+  sqlString += `\nPARTITIONED BY DAY`;

Review Comment:
   [P1] Supervisor granularity is ignored
   
   The conversion always appends PARTITIONED BY DAY, ignoring 
dataSchema.granularitySpec.segmentGranularity, and never applies 
dataSchema.granularitySpec.queryGranularity to the timestamp. Supervisors 
commonly use HOUR segment granularity or a non-none query granularity, so the 
generated batch SQL can create a different segment layout and different rollup 
grouping than the streaming supervisor.



##########
web-console/src/dialogs/supervisor-to-sql-dialog/supervisor-to-sql-dialog.tsx:
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { Button, Classes, Dialog, FormGroup, Intent, Radio, RadioGroup, 
TextArea, InputGroup } from '@blueprintjs/core';
+import { IconNames } from '@blueprintjs/icons';
+import React, { useState } from 'react';
+
+import { ExternalLink } from '../../components';
+import { convertSupervisorToSql } from '../../helpers/supervisor-conversion';
+import { AppToaster } from '../../singletons';
+import { Api } from '../../singletons';
+import { deepGet } from '../../utils';
+
+import './supervisor-to-sql-dialog.scss';
+
+interface SupervisorSpec {
+  type: string;
+  spec: {
+    dataSchema: {
+      dataSource: string;
+      timestampSpec: {
+        column: string;
+        format: string;
+      };
+      dimensionsSpec: {
+        dimensions: Array<string | { name: string; type: string }>;
+      };
+      metricsSpec: Array<{ name?: string; fieldName?: string; type: string }>;
+    };
+    ioConfig?: {
+      topic?: string;
+      inputSource?: {
+        type: string;
+        uris?: string[];
+        baseDir?: string;
+      };
+    };
+  };
+}
+
+export interface SupervisorToSqlDialogProps {
+  onConvert(sql: string): void;
+  onClose(): void;
+}
+
+export const SupervisorToSqlDialog = React.memo(function SupervisorToSqlDialog(
+  props: SupervisorToSqlDialogProps,
+) {
+  const { onConvert, onClose } = props;
+
+  const [supervisorSource, setSupervisorSource] = useState<'select' | 
'paste'>('select');
+  const [selectedSupervisor, setSelectedSupervisor] = useState<string>('');
+  const [pastedSupervisor, setPastedSupervisor] = useState<string>('');
+  const [availableSupervisors, setAvailableSupervisors] = 
useState<string[]>([]);
+  const [supervisorSpec, setSupervisorSpec] = useState<SupervisorSpec | 
undefined>();
+  
+  const [fileLocation, setFileLocation] = useState<string>('');
+  const [fileType, setFileType] = useState<string>('json');
+  
+  const [loading, setLoading] = useState(false);
+  const [error, setError] = useState<string | undefined>();
+
+  React.useEffect(() => {
+    void loadSupervisors();
+  }, []);
+
+  async function loadSupervisors() {
+    try {
+      const supervisors = await 
Api.instance.get<string[]>('/druid/indexer/v1/supervisor');
+      setAvailableSupervisors(supervisors.data);
+      if (supervisors.data.length > 0) {
+        setSelectedSupervisor(supervisors.data[0]);
+      }
+    } catch (e) {
+      setError(`Failed to load supervisors: ${e.message}`);
+    }
+  }
+
+  async function loadSupervisorSpec(supervisorId: string) {
+    if (!supervisorId) return;
+    
+    setLoading(true);
+    setError(undefined);
+    
+    try {
+      const resp = await Api.instance.get<SupervisorSpec>(
+        `/druid/indexer/v1/supervisor/${Api.encodePath(supervisorId)}`,
+      );
+      setSupervisorSpec(resp.data);
+      
+      // Auto-populate file location from ioConfig if available
+      const ioConfig = deepGet(resp.data, 'spec.ioConfig');
+      if (ioConfig?.inputSource?.uris) {
+        setFileLocation(ioConfig.inputSource.uris[0] || '');
+      } else if (ioConfig?.inputSource?.baseDir) {
+        setFileLocation(ioConfig.inputSource.baseDir);
+      }
+    } catch (e) {
+      setError(`Failed to load supervisor spec: ${e.message}`);
+    } finally {
+      setLoading(false);
+    }
+  }
+
+  function parsePastedSupervisor() {
+    if (!pastedSupervisor.trim()) {

Review Comment:
   [P2] Paste mode can submit stale specs
   
   When the pasted JSON is empty, parsePastedSupervisor returns without 
clearing supervisorSpec. Switching from selected-supervisor mode to paste mode, 
or clearing a previously valid paste, can leave Generate SQL enabled and 
convert a hidden stale supervisor instead of the visible input.



##########
web-console/src/helpers/supervisor-conversion.ts:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { C, F, L, SqlExpression, SqlQuery } from 'druid-query-toolkit';
+
+interface MetricSpec {
+  type: string;
+  name?: string;
+  fieldName?: string;
+  maxStringBytes?: number;
+  size?: number;
+  lgK?: number;
+  tgtHllType?: string;
+  k?: number;
+}
+
+function extraArgs(...args: Array<[any, any]>): string {
+  const filtered = args.filter(([value, defaultValue]) => value !== undefined 
&& value !== defaultValue);
+  if (filtered.length === 0) return '';
+  return ', ' + filtered.map(([value]) => (typeof value === 'string' ? 
`'${value}'` : value)).join(', ');
+}
+
+function metricSpecToSqlExpression(metricSpec: MetricSpec): SqlExpression | 
undefined {
+  if (metricSpec.type === 'count') {
+    return SqlExpression.parse('COUNT(*)');
+  }
+
+  if (!metricSpec.fieldName) return undefined;
+  const column = C(metricSpec.fieldName);
+
+  switch (metricSpec.type) {
+    case 'longSum':
+    case 'floatSum':
+    case 'doubleSum':
+      return F('SUM', column);
+
+    case 'longMin':
+    case 'floatMin':
+    case 'doubleMin':
+      return F('MIN', column);
+
+    case 'longMax':
+    case 'floatMax':
+    case 'doubleMax':
+      return F('MAX', column);
+
+    case 'doubleFirst':
+    case 'floatFirst':
+    case 'longFirst':
+      return F('EARLIEST', column);
+
+    case 'stringFirst':
+      return F('EARLIEST', column, L(metricSpec.maxStringBytes || 128));
+
+    case 'doubleLast':
+    case 'floatLast':
+    case 'longLast':
+      return F('LATEST', column);
+
+    case 'stringLast':
+      return F('LATEST', column, L(metricSpec.maxStringBytes || 128));
+
+    case 'thetaSketch':
+      return SqlExpression.parse(`APPROX_COUNT_DISTINCT_DS_THETA(${column}${extraArgs([metricSpec.size, 16384])})`);
+
+    case 'HLLSketchBuild':
+    case 'HLLSketchMerge':
+      return SqlExpression.parse(`APPROX_COUNT_DISTINCT_DS_HLL(${column}${extraArgs([metricSpec.lgK, 12], [metricSpec.tgtHllType, 'HLL_4'])})`);
+
+    case 'quantilesDoublesSketch':
+      return SqlExpression.parse(`DS_QUANTILES_SKETCH(${column}${extraArgs([metricSpec.k, 128])})`);
+
+    case 'hyperUnique':
+      return F('APPROX_COUNT_DISTINCT_BUILTIN', column);
+
+    default:
+      // Unsupported: tDigestSketch, momentSketch, fixedBucketsHistogram
+      return undefined;
+  }
+}
+
+export interface SupervisorSpec {
+  type: string;
+  spec: {
+    dataSchema: {
+      dataSource: string;
+      timestampSpec: {
+        column: string;
+        format: string;
+      };
+      dimensionsSpec?: {
+        dimensions: Array<string | { name: string; type: string }>;
+      };
+      metricsSpec?: Array<{ name?: string; fieldName?: string; type: string }>;
+    };
+    ioConfig?: {
+      topic?: string;
+      inputSource?: {
+        type: string;
+        uris?: string[];
+        baseDir?: string;
+      };
+    };
+  };
+}
+
+export interface SupervisorConversionOptions {
+  fileLocation: string;
+  fileType: string;
+}
+
+interface QueryWithContext {
+  queryString: string;
+  queryContext: Record<string, any>;
+}
+
+function extractDimensionName(dimension: string | { name: string; type: string 
}): string {
+  return typeof dimension === 'string' ? dimension : dimension.name;
+}
+
+export function convertSupervisorToSql(
+  supervisorSpec: SupervisorSpec,
+  options: SupervisorConversionOptions,
+): QueryWithContext {
+  const { fileLocation, fileType } = options;
+  const { dataSchema } = supervisorSpec.spec;
+
+  if (!dataSchema) {
+    throw new Error('Supervisor spec missing dataSchema');
+  }
+
+  const datasource = dataSchema.dataSource;
+  if (!datasource) {
+    throw new Error('Supervisor spec missing dataSource');
+  }
+
+  const timestampColumn = dataSchema.timestampSpec?.column || '__time';
+  const timestampFormat = dataSchema.timestampSpec?.format || 'auto';
+
+  // Extract dimensions
+  const dimensions = (dataSchema.dimensionsSpec?.dimensions || 
[]).map(extractDimensionName);
+
+  // Extract and convert metrics to SQL aggregations
+  const metricSpecs = dataSchema.metricsSpec || [];
+  const metricExpressions: Array<{ expr: SqlExpression; name: string }> = [];
+  
+  for (const metricSpec of metricSpecs) {
+    const sqlExpr = metricSpecToSqlExpression(metricSpec as MetricSpec);
+    if (sqlExpr && metricSpec.name) {
+      metricExpressions.push({ expr: sqlExpr, name: metricSpec.name });
+    }
+  }
+
+  // Determine if we need GROUP BY (if we have aggregations)
+  const hasAggregations = metricExpressions.length > 0;
+
+  // Build column list for SELECT
+  // If no aggregations, just select dimensions as-is
+  // If aggregations exist, dimensions become GROUP BY and we add aggregations
+  const selectColumns = dimensions;
+  
+  // All columns for EXTERN (includes timestamp + all raw input columns)
+  // For EXTERN, we need all the fieldNames that metrics reference
+  const metricFieldNames = metricSpecs
+    .map(m => m.fieldName)
+    .filter((name): name is string => !!name);
+  const allExternColumns = [timestampColumn, ...dimensions, 
...metricFieldNames];
+  const uniqueExternColumns = Array.from(new Set(allExternColumns));
+
+  // Create input format based on file type
+  const inputFormatType = fileType === 'json' ? 'json' : fileType === 'csv' ? 
'csv' : fileType;
+
+  // Build proper Druid input source
+  let inputSource: any;
+  if (fileLocation.startsWith('s3://')) {
+    inputSource = {
+      type: 's3',
+      uris: [fileLocation],
+    };
+    // Add objectGlob based on file type if it's a directory
+    if (fileLocation.endsWith('/')) {
+      inputSource.objectGlob = `**.${inputFormatType}`;
+    }
+  } else if (fileLocation.startsWith('gs://')) {
+    inputSource = {
+      type: 'google',
+      uris: [fileLocation],
+    };
+  } else if (fileLocation.startsWith('http://') || 
fileLocation.startsWith('https://')) {
+    inputSource = {
+      type: 'http',
+      uris: [fileLocation],
+    };
+  } else {
+    // Default to local for file:// or absolute paths
+    inputSource = {
+      type: 'local',
+      baseDir: fileLocation.replace('file://', ''),
+      filter: `*.${inputFormatType}`,
+    };
+  }
+
+  // Build EXTERN expression with proper escaping
+  const columnSchema = uniqueExternColumns.map(col => ({name: col, type: 
'string'}));
+  const columnSchemaJson = JSON.stringify(columnSchema);
+  const inputSourceJson = JSON.stringify(inputSource);
+  const inputFormatJson = JSON.stringify({type: inputFormatType});
+  
+  const externExpression = F(
+    'EXTERN',
+    SqlExpression.parse(`'${inputSourceJson.replace(/'/g, "''")}'`),
+    SqlExpression.parse(`'${inputFormatJson.replace(/'/g, "''")}'`),
+    SqlExpression.parse(`'${columnSchemaJson.replace(/'/g, "''")}'`),
+  );
+
+  // Build SELECT expressions
+  const selectExpressions: SqlExpression[] = selectColumns.map(col => C(col));
+
+  // Add metric aggregations
+  for (const { expr, name } of metricExpressions) {
+    selectExpressions.push(expr.as(name));
+  }
+
+  // Add timestamp parsing
+  const timeParseExpression =
+    timestampFormat === 'auto'
+      ? F('TIME_PARSE', C(timestampColumn))
+      : F('TIME_PARSE', C(timestampColumn), SqlExpression.parse(`'${timestampFormat}'`));

Review Comment:
   [P2] Timestamp formats are not SQL-escaped
   
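   Custom timestamp formats are interpolated directly into a single-quoted SQL 
literal. Valid Druid/Joda formats often contain single quotes of their own, for 
example yyyy-MM-dd'T'HH:mm:ss, which quotes a literal T, and those produce 
invalid SQL here. Build this argument with the query toolkit literal helper 
(L(timestampFormat); L is already imported in this file) or otherwise escape 
single quotes by doubling them, as the EXTERN arguments above already do.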



##########
web-console/src/views/workbench-view/workbench-view.tsx:
##########
@@ -496,6 +507,21 @@ export class WorkbenchView extends 
React.PureComponent<WorkbenchViewProps, Workb
     );
   }
 
+  private renderSupervisorToSqlDialog() {
+    const { supervisorToSqlDialogOpen } = this.state;
+    if (!supervisorToSqlDialogOpen) return;
+
+    return (
+      <SupervisorToSqlDialog
+        onConvert={(sql: string) => {
+          this.handleQueryStringChange(sql);

Review Comment:
   [P2] Conversion overwrites the active tab
   
   Supervisor conversion writes the generated SQL into the current workbench 
tab, unlike the existing ingestion-spec conversion that opens a new tab. This 
can replace an unsaved query and also preserves the current tab's explicit 
engine/context, so an INSERT/EXTERN query can remain on sql-native or another 
unsuitable engine.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to