FrankChen021 commented on code in PR #19547: URL: https://github.com/apache/druid/pull/19547#discussion_r3356164347
########## web-console/src/helpers/supervisor-conversion.ts: ########## @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { C, F, L, SqlExpression, SqlQuery } from 'druid-query-toolkit'; + +interface MetricSpec { + type: string; + name?: string; + fieldName?: string; + maxStringBytes?: number; + size?: number; + lgK?: number; + tgtHllType?: string; + k?: number; +} + +function extraArgs(...args: Array<[any, any]>): string { + const filtered = args.filter(([value, defaultValue]) => value !== undefined && value !== defaultValue); + if (filtered.length === 0) return ''; + return ', ' + filtered.map(([value]) => (typeof value === 'string' ? 
`'${value}'` : value)).join(', '); +} + +function metricSpecToSqlExpression(metricSpec: MetricSpec): SqlExpression | undefined { + if (metricSpec.type === 'count') { + return SqlExpression.parse('COUNT(*)'); + } + + if (!metricSpec.fieldName) return undefined; + const column = C(metricSpec.fieldName); + + switch (metricSpec.type) { + case 'longSum': + case 'floatSum': + case 'doubleSum': + return F('SUM', column); + + case 'longMin': + case 'floatMin': + case 'doubleMin': + return F('MIN', column); + + case 'longMax': + case 'floatMax': + case 'doubleMax': + return F('MAX', column); + + case 'doubleFirst': + case 'floatFirst': + case 'longFirst': + return F('EARLIEST', column); + + case 'stringFirst': + return F('EARLIEST', column, L(metricSpec.maxStringBytes || 128)); + + case 'doubleLast': + case 'floatLast': + case 'longLast': + return F('LATEST', column); + + case 'stringLast': + return F('LATEST', column, L(metricSpec.maxStringBytes || 128)); + + case 'thetaSketch': + return SqlExpression.parse(`APPROX_COUNT_DISTINCT_DS_THETA(${column}${extraArgs([metricSpec.size, 16384])})`); + + case 'HLLSketchBuild': + case 'HLLSketchMerge': + return SqlExpression.parse(`APPROX_COUNT_DISTINCT_DS_HLL(${column}${extraArgs([metricSpec.lgK, 12], [metricSpec.tgtHllType, 'HLL_4'])})`); + + case 'quantilesDoublesSketch': + return SqlExpression.parse(`DS_QUANTILES_SKETCH(${column}${extraArgs([metricSpec.k, 128])})`); + + case 'hyperUnique': + return F('APPROX_COUNT_DISTINCT_BUILTIN', column); + + default: + // Unsupported: tDigestSketch, momentSketch, fixedBucketsHistogram + return undefined; + } +} + +export interface SupervisorSpec { + type: string; + spec: { + dataSchema: { + dataSource: string; + timestampSpec: { + column: string; + format: string; + }; + dimensionsSpec?: { + dimensions: Array<string | { name: string; type: string }>; + }; + metricsSpec?: Array<{ name?: string; fieldName?: string; type: string }>; + }; + ioConfig?: { + topic?: string; + inputSource?: { + type: 
string; + uris?: string[]; + baseDir?: string; + }; + }; + }; +} + +export interface SupervisorConversionOptions { + fileLocation: string; + fileType: string; +} + +interface QueryWithContext { + queryString: string; + queryContext: Record<string, any>; +} + +function extractDimensionName(dimension: string | { name: string; type: string }): string { + return typeof dimension === 'string' ? dimension : dimension.name; +} + +export function convertSupervisorToSql( + supervisorSpec: SupervisorSpec, + options: SupervisorConversionOptions, +): QueryWithContext { + const { fileLocation, fileType } = options; + const { dataSchema } = supervisorSpec.spec; + + if (!dataSchema) { + throw new Error('Supervisor spec missing dataSchema'); + } + + const datasource = dataSchema.dataSource; + if (!datasource) { + throw new Error('Supervisor spec missing dataSource'); + } + + const timestampColumn = dataSchema.timestampSpec?.column || '__time'; + const timestampFormat = dataSchema.timestampSpec?.format || 'auto'; + + // Extract dimensions + const dimensions = (dataSchema.dimensionsSpec?.dimensions || []).map(extractDimensionName); + + // Extract and convert metrics to SQL aggregations + const metricSpecs = dataSchema.metricsSpec || []; + const metricExpressions: Array<{ expr: SqlExpression; name: string }> = []; + + for (const metricSpec of metricSpecs) { + const sqlExpr = metricSpecToSqlExpression(metricSpec as MetricSpec); + if (sqlExpr && metricSpec.name) { + metricExpressions.push({ expr: sqlExpr, name: metricSpec.name }); + } + } + + // Determine if we need GROUP BY (if we have aggregations) + const hasAggregations = metricExpressions.length > 0; + + // Build column list for SELECT + // If no aggregations, just select dimensions as-is + // If aggregations exist, dimensions become GROUP BY and we add aggregations + const selectColumns = dimensions; + + // All columns for EXTERN (includes timestamp + all raw input columns) + // For EXTERN, we need all the fieldNames that metrics 
reference + const metricFieldNames = metricSpecs + .map(m => m.fieldName) + .filter((name): name is string => !!name); + const allExternColumns = [timestampColumn, ...dimensions, ...metricFieldNames]; + const uniqueExternColumns = Array.from(new Set(allExternColumns)); + + // Create input format based on file type + const inputFormatType = fileType === 'json' ? 'json' : fileType === 'csv' ? 'csv' : fileType; + + // Build proper Druid input source + let inputSource: any; + if (fileLocation.startsWith('s3://')) { + inputSource = { + type: 's3', + uris: [fileLocation], + }; + // Add objectGlob based on file type if it's a directory + if (fileLocation.endsWith('/')) { + inputSource.objectGlob = `**.${inputFormatType}`; + } + } else if (fileLocation.startsWith('gs://')) { + inputSource = { + type: 'google', + uris: [fileLocation], + }; + } else if (fileLocation.startsWith('http://') || fileLocation.startsWith('https://')) { + inputSource = { + type: 'http', + uris: [fileLocation], + }; + } else { + // Default to local for file:// or absolute paths + inputSource = { + type: 'local', + baseDir: fileLocation.replace('file://', ''), + filter: `*.${inputFormatType}`, + }; + } + + // Build EXTERN expression with proper escaping + const columnSchema = uniqueExternColumns.map(col => ({name: col, type: 'string'})); Review Comment: [P1] EXTERN declares numeric inputs as strings. The generated EXTERN signature marks every raw column as string, including the metric input fields consumed by SUM, MIN, MAX, and the sketch aggregators, as well as any typed dimensions. A common longSum or doubleSum supervisor will therefore generate numeric aggregations over VARCHAR inputs, which can either fail during query planning or ingest values with the wrong types. Preserve the native types of metric and dimension columns, as the existing spec converter does. ########## web-console/src/helpers/supervisor-conversion.ts: ########## @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { C, F, L, SqlExpression, SqlQuery } from 'druid-query-toolkit'; + +interface MetricSpec { + type: string; + name?: string; + fieldName?: string; + maxStringBytes?: number; + size?: number; + lgK?: number; + tgtHllType?: string; + k?: number; +} + +function extraArgs(...args: Array<[any, any]>): string { + const filtered = args.filter(([value, defaultValue]) => value !== undefined && value !== defaultValue); + if (filtered.length === 0) return ''; + return ', ' + filtered.map(([value]) => (typeof value === 'string' ? 
`'${value}'` : value)).join(', '); +} + +function metricSpecToSqlExpression(metricSpec: MetricSpec): SqlExpression | undefined { + if (metricSpec.type === 'count') { + return SqlExpression.parse('COUNT(*)'); + } + + if (!metricSpec.fieldName) return undefined; + const column = C(metricSpec.fieldName); + + switch (metricSpec.type) { + case 'longSum': + case 'floatSum': + case 'doubleSum': + return F('SUM', column); + + case 'longMin': + case 'floatMin': + case 'doubleMin': + return F('MIN', column); + + case 'longMax': + case 'floatMax': + case 'doubleMax': + return F('MAX', column); + + case 'doubleFirst': + case 'floatFirst': + case 'longFirst': + return F('EARLIEST', column); + + case 'stringFirst': + return F('EARLIEST', column, L(metricSpec.maxStringBytes || 128)); + + case 'doubleLast': + case 'floatLast': + case 'longLast': + return F('LATEST', column); + + case 'stringLast': + return F('LATEST', column, L(metricSpec.maxStringBytes || 128)); + + case 'thetaSketch': + return SqlExpression.parse(`APPROX_COUNT_DISTINCT_DS_THETA(${column}${extraArgs([metricSpec.size, 16384])})`); + + case 'HLLSketchBuild': + case 'HLLSketchMerge': + return SqlExpression.parse(`APPROX_COUNT_DISTINCT_DS_HLL(${column}${extraArgs([metricSpec.lgK, 12], [metricSpec.tgtHllType, 'HLL_4'])})`); + + case 'quantilesDoublesSketch': + return SqlExpression.parse(`DS_QUANTILES_SKETCH(${column}${extraArgs([metricSpec.k, 128])})`); + + case 'hyperUnique': + return F('APPROX_COUNT_DISTINCT_BUILTIN', column); + + default: + // Unsupported: tDigestSketch, momentSketch, fixedBucketsHistogram + return undefined; + } +} + +export interface SupervisorSpec { + type: string; + spec: { + dataSchema: { + dataSource: string; + timestampSpec: { + column: string; + format: string; + }; + dimensionsSpec?: { + dimensions: Array<string | { name: string; type: string }>; + }; + metricsSpec?: Array<{ name?: string; fieldName?: string; type: string }>; + }; + ioConfig?: { + topic?: string; + inputSource?: { + type: 
string; + uris?: string[]; + baseDir?: string; + }; + }; + }; +} + +export interface SupervisorConversionOptions { + fileLocation: string; + fileType: string; +} + +interface QueryWithContext { + queryString: string; + queryContext: Record<string, any>; +} + +function extractDimensionName(dimension: string | { name: string; type: string }): string { + return typeof dimension === 'string' ? dimension : dimension.name; +} + +export function convertSupervisorToSql( + supervisorSpec: SupervisorSpec, + options: SupervisorConversionOptions, +): QueryWithContext { + const { fileLocation, fileType } = options; + const { dataSchema } = supervisorSpec.spec; + + if (!dataSchema) { + throw new Error('Supervisor spec missing dataSchema'); + } + + const datasource = dataSchema.dataSource; + if (!datasource) { + throw new Error('Supervisor spec missing dataSource'); + } + + const timestampColumn = dataSchema.timestampSpec?.column || '__time'; + const timestampFormat = dataSchema.timestampSpec?.format || 'auto'; + + // Extract dimensions + const dimensions = (dataSchema.dimensionsSpec?.dimensions || []).map(extractDimensionName); + + // Extract and convert metrics to SQL aggregations + const metricSpecs = dataSchema.metricsSpec || []; + const metricExpressions: Array<{ expr: SqlExpression; name: string }> = []; + + for (const metricSpec of metricSpecs) { + const sqlExpr = metricSpecToSqlExpression(metricSpec as MetricSpec); + if (sqlExpr && metricSpec.name) { + metricExpressions.push({ expr: sqlExpr, name: metricSpec.name }); + } + } + + // Determine if we need GROUP BY (if we have aggregations) + const hasAggregations = metricExpressions.length > 0; + + // Build column list for SELECT + // If no aggregations, just select dimensions as-is + // If aggregations exist, dimensions become GROUP BY and we add aggregations + const selectColumns = dimensions; + + // All columns for EXTERN (includes timestamp + all raw input columns) + // For EXTERN, we need all the fieldNames that metrics 
reference + const metricFieldNames = metricSpecs + .map(m => m.fieldName) + .filter((name): name is string => !!name); + const allExternColumns = [timestampColumn, ...dimensions, ...metricFieldNames]; + const uniqueExternColumns = Array.from(new Set(allExternColumns)); + + // Create input format based on file type + const inputFormatType = fileType === 'json' ? 'json' : fileType === 'csv' ? 'csv' : fileType; + + // Build proper Druid input source + let inputSource: any; + if (fileLocation.startsWith('s3://')) { + inputSource = { + type: 's3', + uris: [fileLocation], + }; + // Add objectGlob based on file type if it's a directory + if (fileLocation.endsWith('/')) { + inputSource.objectGlob = `**.${inputFormatType}`; + } + } else if (fileLocation.startsWith('gs://')) { + inputSource = { + type: 'google', + uris: [fileLocation], + }; + } else if (fileLocation.startsWith('http://') || fileLocation.startsWith('https://')) { + inputSource = { + type: 'http', + uris: [fileLocation], + }; + } else { + // Default to local for file:// or absolute paths + inputSource = { + type: 'local', + baseDir: fileLocation.replace('file://', ''), + filter: `*.${inputFormatType}`, + }; + } + + // Build EXTERN expression with proper escaping + const columnSchema = uniqueExternColumns.map(col => ({name: col, type: 'string'})); + const columnSchemaJson = JSON.stringify(columnSchema); + const inputSourceJson = JSON.stringify(inputSource); + const inputFormatJson = JSON.stringify({type: inputFormatType}); Review Comment: [P2] Input format details are discarded. The input format is reduced to only {type: fileType}, dropping the supervisor's ioConfig.inputFormat settings such as CSV columns/header handling, JSON flattenSpec, Kafka metadata formats, Avro/Protobuf schemas, and other parser options. For supervisors that rely on any of these settings, the generated SQL either reads different columns or cannot parse the backfill files at all.
########## web-console/src/helpers/supervisor-conversion.ts: ########## @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { C, F, L, SqlExpression, SqlQuery } from 'druid-query-toolkit'; + +interface MetricSpec { + type: string; + name?: string; + fieldName?: string; + maxStringBytes?: number; + size?: number; + lgK?: number; + tgtHllType?: string; + k?: number; +} + +function extraArgs(...args: Array<[any, any]>): string { + const filtered = args.filter(([value, defaultValue]) => value !== undefined && value !== defaultValue); + if (filtered.length === 0) return ''; + return ', ' + filtered.map(([value]) => (typeof value === 'string' ? 
`'${value}'` : value)).join(', '); +} + +function metricSpecToSqlExpression(metricSpec: MetricSpec): SqlExpression | undefined { + if (metricSpec.type === 'count') { + return SqlExpression.parse('COUNT(*)'); + } + + if (!metricSpec.fieldName) return undefined; + const column = C(metricSpec.fieldName); + + switch (metricSpec.type) { + case 'longSum': + case 'floatSum': + case 'doubleSum': + return F('SUM', column); + + case 'longMin': + case 'floatMin': + case 'doubleMin': + return F('MIN', column); + + case 'longMax': + case 'floatMax': + case 'doubleMax': + return F('MAX', column); + + case 'doubleFirst': + case 'floatFirst': + case 'longFirst': + return F('EARLIEST', column); + + case 'stringFirst': + return F('EARLIEST', column, L(metricSpec.maxStringBytes || 128)); + + case 'doubleLast': + case 'floatLast': + case 'longLast': + return F('LATEST', column); + + case 'stringLast': + return F('LATEST', column, L(metricSpec.maxStringBytes || 128)); + + case 'thetaSketch': + return SqlExpression.parse(`APPROX_COUNT_DISTINCT_DS_THETA(${column}${extraArgs([metricSpec.size, 16384])})`); + + case 'HLLSketchBuild': + case 'HLLSketchMerge': + return SqlExpression.parse(`APPROX_COUNT_DISTINCT_DS_HLL(${column}${extraArgs([metricSpec.lgK, 12], [metricSpec.tgtHllType, 'HLL_4'])})`); + + case 'quantilesDoublesSketch': + return SqlExpression.parse(`DS_QUANTILES_SKETCH(${column}${extraArgs([metricSpec.k, 128])})`); + + case 'hyperUnique': + return F('APPROX_COUNT_DISTINCT_BUILTIN', column); + + default: + // Unsupported: tDigestSketch, momentSketch, fixedBucketsHistogram + return undefined; + } +} + +export interface SupervisorSpec { + type: string; + spec: { + dataSchema: { + dataSource: string; + timestampSpec: { + column: string; + format: string; + }; + dimensionsSpec?: { + dimensions: Array<string | { name: string; type: string }>; + }; + metricsSpec?: Array<{ name?: string; fieldName?: string; type: string }>; + }; + ioConfig?: { + topic?: string; + inputSource?: { + type: 
string; + uris?: string[]; + baseDir?: string; + }; + }; + }; +} + +export interface SupervisorConversionOptions { + fileLocation: string; + fileType: string; +} + +interface QueryWithContext { + queryString: string; + queryContext: Record<string, any>; +} + +function extractDimensionName(dimension: string | { name: string; type: string }): string { + return typeof dimension === 'string' ? dimension : dimension.name; +} + +export function convertSupervisorToSql( + supervisorSpec: SupervisorSpec, + options: SupervisorConversionOptions, +): QueryWithContext { + const { fileLocation, fileType } = options; + const { dataSchema } = supervisorSpec.spec; + + if (!dataSchema) { + throw new Error('Supervisor spec missing dataSchema'); + } + + const datasource = dataSchema.dataSource; + if (!datasource) { + throw new Error('Supervisor spec missing dataSource'); + } + + const timestampColumn = dataSchema.timestampSpec?.column || '__time'; + const timestampFormat = dataSchema.timestampSpec?.format || 'auto'; + + // Extract dimensions + const dimensions = (dataSchema.dimensionsSpec?.dimensions || []).map(extractDimensionName); + + // Extract and convert metrics to SQL aggregations + const metricSpecs = dataSchema.metricsSpec || []; + const metricExpressions: Array<{ expr: SqlExpression; name: string }> = []; + + for (const metricSpec of metricSpecs) { + const sqlExpr = metricSpecToSqlExpression(metricSpec as MetricSpec); + if (sqlExpr && metricSpec.name) { + metricExpressions.push({ expr: sqlExpr, name: metricSpec.name }); + } + } + + // Determine if we need GROUP BY (if we have aggregations) + const hasAggregations = metricExpressions.length > 0; + + // Build column list for SELECT + // If no aggregations, just select dimensions as-is + // If aggregations exist, dimensions become GROUP BY and we add aggregations + const selectColumns = dimensions; + + // All columns for EXTERN (includes timestamp + all raw input columns) + // For EXTERN, we need all the fieldNames that metrics 
reference + const metricFieldNames = metricSpecs + .map(m => m.fieldName) + .filter((name): name is string => !!name); + const allExternColumns = [timestampColumn, ...dimensions, ...metricFieldNames]; + const uniqueExternColumns = Array.from(new Set(allExternColumns)); + + // Create input format based on file type + const inputFormatType = fileType === 'json' ? 'json' : fileType === 'csv' ? 'csv' : fileType; + + // Build proper Druid input source + let inputSource: any; + if (fileLocation.startsWith('s3://')) { + inputSource = { + type: 's3', + uris: [fileLocation], + }; + // Add objectGlob based on file type if it's a directory + if (fileLocation.endsWith('/')) { + inputSource.objectGlob = `**.${inputFormatType}`; + } + } else if (fileLocation.startsWith('gs://')) { + inputSource = { + type: 'google', + uris: [fileLocation], + }; + } else if (fileLocation.startsWith('http://') || fileLocation.startsWith('https://')) { + inputSource = { + type: 'http', + uris: [fileLocation], + }; + } else { + // Default to local for file:// or absolute paths + inputSource = { + type: 'local', + baseDir: fileLocation.replace('file://', ''), + filter: `*.${inputFormatType}`, + }; + } + + // Build EXTERN expression with proper escaping + const columnSchema = uniqueExternColumns.map(col => ({name: col, type: 'string'})); + const columnSchemaJson = JSON.stringify(columnSchema); + const inputSourceJson = JSON.stringify(inputSource); + const inputFormatJson = JSON.stringify({type: inputFormatType}); + + const externExpression = F( + 'EXTERN', + SqlExpression.parse(`'${inputSourceJson.replace(/'/g, "''")}'`), + SqlExpression.parse(`'${inputFormatJson.replace(/'/g, "''")}'`), + SqlExpression.parse(`'${columnSchemaJson.replace(/'/g, "''")}'`), + ); + + // Build SELECT expressions + const selectExpressions: SqlExpression[] = selectColumns.map(col => C(col)); + + // Add metric aggregations + for (const { expr, name } of metricExpressions) { + selectExpressions.push(expr.as(name)); + } + + // 
Add timestamp parsing + const timeParseExpression = + timestampFormat === 'auto' + ? F('TIME_PARSE', C(timestampColumn)) + : F('TIME_PARSE', C(timestampColumn), SqlExpression.parse(`'${timestampFormat}'`)); + + selectExpressions.push(timeParseExpression.as('__time')); + + // Build the query using druid-query-toolkit + let query = SqlQuery.from(F('TABLE', externExpression)); + + // Add select expressions one by one + for (const expr of selectExpressions) { + query = query.addSelect(expr); + } + + // Add GROUP BY if we have aggregations + if (hasAggregations) { + const groupByExprs: any[] = [timeParseExpression]; + dimensions.forEach(d => groupByExprs.push(C(d))); + query = query.changeGroupByExpressions(groupByExprs as any); + } + + // Convert to string and manually add INSERT, PARTITIONED BY, and CLUSTERED BY + // because the query builder API is giving us trouble + let sqlString = query.toString(); + + // Prepend INSERT INTO + sqlString = `INSERT INTO ${C(datasource)}\n${sqlString}`; + + // Append PARTITIONED BY + sqlString += `\nPARTITIONED BY DAY`; Review Comment: [P1] Supervisor granularity is ignored. The conversion always appends PARTITIONED BY DAY, ignoring dataSchema.granularitySpec.segmentGranularity, and it never applies dataSchema.granularitySpec.queryGranularity. Supervisors commonly use HOUR segment granularity or a query granularity other than none, so the generated batch SQL can produce a different segment layout and different rollup grouping than the streaming supervisor. ########## web-console/src/dialogs/supervisor-to-sql-dialog/supervisor-to-sql-dialog.tsx: ########## @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { Button, Classes, Dialog, FormGroup, Intent, Radio, RadioGroup, TextArea, InputGroup } from '@blueprintjs/core'; +import { IconNames } from '@blueprintjs/icons'; +import React, { useState } from 'react'; + +import { ExternalLink } from '../../components'; +import { convertSupervisorToSql } from '../../helpers/supervisor-conversion'; +import { AppToaster } from '../../singletons'; +import { Api } from '../../singletons'; +import { deepGet } from '../../utils'; + +import './supervisor-to-sql-dialog.scss'; + +interface SupervisorSpec { + type: string; + spec: { + dataSchema: { + dataSource: string; + timestampSpec: { + column: string; + format: string; + }; + dimensionsSpec: { + dimensions: Array<string | { name: string; type: string }>; + }; + metricsSpec: Array<{ name?: string; fieldName?: string; type: string }>; + }; + ioConfig?: { + topic?: string; + inputSource?: { + type: string; + uris?: string[]; + baseDir?: string; + }; + }; + }; +} + +export interface SupervisorToSqlDialogProps { + onConvert(sql: string): void; + onClose(): void; +} + +export const SupervisorToSqlDialog = React.memo(function SupervisorToSqlDialog( + props: SupervisorToSqlDialogProps, +) { + const { onConvert, onClose } = props; + + const [supervisorSource, setSupervisorSource] = useState<'select' | 'paste'>('select'); + const [selectedSupervisor, setSelectedSupervisor] = useState<string>(''); + const [pastedSupervisor, setPastedSupervisor] = useState<string>(''); + const [availableSupervisors, setAvailableSupervisors] = useState<string[]>([]); + const 
[supervisorSpec, setSupervisorSpec] = useState<SupervisorSpec | undefined>(); + + const [fileLocation, setFileLocation] = useState<string>(''); + const [fileType, setFileType] = useState<string>('json'); + + const [loading, setLoading] = useState(false); + const [error, setError] = useState<string | undefined>(); + + React.useEffect(() => { + void loadSupervisors(); + }, []); + + async function loadSupervisors() { + try { + const supervisors = await Api.instance.get<string[]>('/druid/indexer/v1/supervisor'); + setAvailableSupervisors(supervisors.data); + if (supervisors.data.length > 0) { + setSelectedSupervisor(supervisors.data[0]); + } + } catch (e) { + setError(`Failed to load supervisors: ${e.message}`); + } + } + + async function loadSupervisorSpec(supervisorId: string) { + if (!supervisorId) return; + + setLoading(true); + setError(undefined); + + try { + const resp = await Api.instance.get<SupervisorSpec>( + `/druid/indexer/v1/supervisor/${Api.encodePath(supervisorId)}`, + ); + setSupervisorSpec(resp.data); + + // Auto-populate file location from ioConfig if available + const ioConfig = deepGet(resp.data, 'spec.ioConfig'); + if (ioConfig?.inputSource?.uris) { + setFileLocation(ioConfig.inputSource.uris[0] || ''); + } else if (ioConfig?.inputSource?.baseDir) { + setFileLocation(ioConfig.inputSource.baseDir); + } + } catch (e) { + setError(`Failed to load supervisor spec: ${e.message}`); + } finally { + setLoading(false); + } + } + + function parsePastedSupervisor() { + if (!pastedSupervisor.trim()) { Review Comment: [P2] Paste mode can submit stale specs. When the pasted JSON is empty, parsePastedSupervisor returns without clearing supervisorSpec. Switching from selected-supervisor mode to paste mode, or clearing a previously valid paste, can leave Generate SQL enabled and convert a hidden, stale supervisor spec instead of the visible input.
########## web-console/src/helpers/supervisor-conversion.ts: ########## @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { C, F, L, SqlExpression, SqlQuery } from 'druid-query-toolkit'; + +interface MetricSpec { + type: string; + name?: string; + fieldName?: string; + maxStringBytes?: number; + size?: number; + lgK?: number; + tgtHllType?: string; + k?: number; +} + +function extraArgs(...args: Array<[any, any]>): string { + const filtered = args.filter(([value, defaultValue]) => value !== undefined && value !== defaultValue); + if (filtered.length === 0) return ''; + return ', ' + filtered.map(([value]) => (typeof value === 'string' ? 
`'${value}'` : value)).join(', '); +} + +function metricSpecToSqlExpression(metricSpec: MetricSpec): SqlExpression | undefined { + if (metricSpec.type === 'count') { + return SqlExpression.parse('COUNT(*)'); + } + + if (!metricSpec.fieldName) return undefined; + const column = C(metricSpec.fieldName); + + switch (metricSpec.type) { + case 'longSum': + case 'floatSum': + case 'doubleSum': + return F('SUM', column); + + case 'longMin': + case 'floatMin': + case 'doubleMin': + return F('MIN', column); + + case 'longMax': + case 'floatMax': + case 'doubleMax': + return F('MAX', column); + + case 'doubleFirst': + case 'floatFirst': + case 'longFirst': + return F('EARLIEST', column); + + case 'stringFirst': + return F('EARLIEST', column, L(metricSpec.maxStringBytes || 128)); + + case 'doubleLast': + case 'floatLast': + case 'longLast': + return F('LATEST', column); + + case 'stringLast': + return F('LATEST', column, L(metricSpec.maxStringBytes || 128)); + + case 'thetaSketch': + return SqlExpression.parse(`APPROX_COUNT_DISTINCT_DS_THETA(${column}${extraArgs([metricSpec.size, 16384])})`); + + case 'HLLSketchBuild': + case 'HLLSketchMerge': + return SqlExpression.parse(`APPROX_COUNT_DISTINCT_DS_HLL(${column}${extraArgs([metricSpec.lgK, 12], [metricSpec.tgtHllType, 'HLL_4'])})`); + + case 'quantilesDoublesSketch': + return SqlExpression.parse(`DS_QUANTILES_SKETCH(${column}${extraArgs([metricSpec.k, 128])})`); + + case 'hyperUnique': + return F('APPROX_COUNT_DISTINCT_BUILTIN', column); + + default: + // Unsupported: tDigestSketch, momentSketch, fixedBucketsHistogram + return undefined; + } +} + +export interface SupervisorSpec { + type: string; + spec: { + dataSchema: { + dataSource: string; + timestampSpec: { + column: string; + format: string; + }; + dimensionsSpec?: { + dimensions: Array<string | { name: string; type: string }>; + }; + metricsSpec?: Array<{ name?: string; fieldName?: string; type: string }>; + }; + ioConfig?: { + topic?: string; + inputSource?: { + type: 
string; + uris?: string[]; + baseDir?: string; + }; + }; + }; +} + +export interface SupervisorConversionOptions { + fileLocation: string; + fileType: string; +} + +interface QueryWithContext { + queryString: string; + queryContext: Record<string, any>; +} + +function extractDimensionName(dimension: string | { name: string; type: string }): string { + return typeof dimension === 'string' ? dimension : dimension.name; +} + +export function convertSupervisorToSql( + supervisorSpec: SupervisorSpec, + options: SupervisorConversionOptions, +): QueryWithContext { + const { fileLocation, fileType } = options; + const { dataSchema } = supervisorSpec.spec; + + if (!dataSchema) { + throw new Error('Supervisor spec missing dataSchema'); + } + + const datasource = dataSchema.dataSource; + if (!datasource) { + throw new Error('Supervisor spec missing dataSource'); + } + + const timestampColumn = dataSchema.timestampSpec?.column || '__time'; + const timestampFormat = dataSchema.timestampSpec?.format || 'auto'; + + // Extract dimensions + const dimensions = (dataSchema.dimensionsSpec?.dimensions || []).map(extractDimensionName); + + // Extract and convert metrics to SQL aggregations + const metricSpecs = dataSchema.metricsSpec || []; + const metricExpressions: Array<{ expr: SqlExpression; name: string }> = []; + + for (const metricSpec of metricSpecs) { + const sqlExpr = metricSpecToSqlExpression(metricSpec as MetricSpec); + if (sqlExpr && metricSpec.name) { + metricExpressions.push({ expr: sqlExpr, name: metricSpec.name }); + } + } + + // Determine if we need GROUP BY (if we have aggregations) + const hasAggregations = metricExpressions.length > 0; + + // Build column list for SELECT + // If no aggregations, just select dimensions as-is + // If aggregations exist, dimensions become GROUP BY and we add aggregations + const selectColumns = dimensions; + + // All columns for EXTERN (includes timestamp + all raw input columns) + // For EXTERN, we need all the fieldNames that metrics 
reference + const metricFieldNames = metricSpecs + .map(m => m.fieldName) + .filter((name): name is string => !!name); + const allExternColumns = [timestampColumn, ...dimensions, ...metricFieldNames]; + const uniqueExternColumns = Array.from(new Set(allExternColumns)); + + // Create input format based on file type + const inputFormatType = fileType === 'json' ? 'json' : fileType === 'csv' ? 'csv' : fileType; + + // Build proper Druid input source + let inputSource: any; + if (fileLocation.startsWith('s3://')) { + inputSource = { + type: 's3', + uris: [fileLocation], + }; + // Add objectGlob based on file type if it's a directory + if (fileLocation.endsWith('/')) { + inputSource.objectGlob = `**.${inputFormatType}`; + } + } else if (fileLocation.startsWith('gs://')) { + inputSource = { + type: 'google', + uris: [fileLocation], + }; + } else if (fileLocation.startsWith('http://') || fileLocation.startsWith('https://')) { + inputSource = { + type: 'http', + uris: [fileLocation], + }; + } else { + // Default to local for file:// or absolute paths + inputSource = { + type: 'local', + baseDir: fileLocation.replace('file://', ''), + filter: `*.${inputFormatType}`, + }; + } + + // Build EXTERN expression with proper escaping + const columnSchema = uniqueExternColumns.map(col => ({name: col, type: 'string'})); + const columnSchemaJson = JSON.stringify(columnSchema); + const inputSourceJson = JSON.stringify(inputSource); + const inputFormatJson = JSON.stringify({type: inputFormatType}); + + const externExpression = F( + 'EXTERN', + SqlExpression.parse(`'${inputSourceJson.replace(/'/g, "''")}'`), + SqlExpression.parse(`'${inputFormatJson.replace(/'/g, "''")}'`), + SqlExpression.parse(`'${columnSchemaJson.replace(/'/g, "''")}'`), + ); + + // Build SELECT expressions + const selectExpressions: SqlExpression[] = selectColumns.map(col => C(col)); + + // Add metric aggregations + for (const { expr, name } of metricExpressions) { + selectExpressions.push(expr.as(name)); + } + + // 
Add timestamp parsing + const timeParseExpression = + timestampFormat === 'auto' + ? F('TIME_PARSE', C(timestampColumn)) + : F('TIME_PARSE', C(timestampColumn), SqlExpression.parse(`'${timestampFormat}'`)); Review Comment: [P2] Timestamp formats are not SQL-escaped. Custom timestamp formats are interpolated directly into a quoted SQL literal. Valid Druid/Joda formats often contain single quotes (for example, to quote a literal `T`, as in `yyyy-MM-dd'T'HH:mm:ss`), and such formats produce invalid SQL here. Build this argument with the query toolkit's literal helper (`L(...)`) or otherwise escape single quotes. ########## web-console/src/views/workbench-view/workbench-view.tsx: ########## @@ -496,6 +507,21 @@ export class WorkbenchView extends React.PureComponent<WorkbenchViewProps, Workb ); } + private renderSupervisorToSqlDialog() { + const { supervisorToSqlDialogOpen } = this.state; + if (!supervisorToSqlDialogOpen) return; + + return ( + <SupervisorToSqlDialog + onConvert={(sql: string) => { + this.handleQueryStringChange(sql); Review Comment: [P2] Conversion overwrites the active tab. Supervisor conversion writes the generated SQL into the current workbench tab, unlike the existing ingestion-spec conversion, which opens a new tab. This can replace an unsaved query, and it also keeps the current tab's explicit engine/context, so an INSERT/EXTERN query can remain on `sql-native` or another unsuitable engine. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
