This is an automated email from the ASF dual-hosted git repository.
yqm pushed a commit to branch 37.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/37.0.0 by this push:
new 89c01efce0e docs: Recommend Overlord-based auto-compaction and mark
useIncrementalCache production ready (#19252)
89c01efce0e is described below
commit 89c01efce0e81b70f00bd879659062dd8604559c
Author: Cece Mei <[email protected]>
AuthorDate: Fri Apr 10 11:19:12 2026 -0700
docs: Recommend Overlord-based auto-compaction and mark useIncrementalCache
production ready (#19252)
* auto
* DruidRunStats
* doc
* check
* default
* legacy
* default-native
* format
* Revert "DruidRunStats"
This reverts commit d3751364cce2da5c384400d0e4209797bfc2f09e.
* format
* cascading
* doc
* link
* spell
* test
* style
---
docs/configuration/index.md | 10 +-
docs/data-management/automatic-compaction.md | 299 ++++++++++-----------
docs/data-management/cascading-reindexing.md | 2 -
docs/data-management/delete.md | 2 +-
docs/operations/metrics.md | 2 +-
.../embedded/compact/AutoCompactionTest.java | 114 ++++----
.../compact/AutoCompactionUpgradeTest.java | 16 +-
.../compact/CompactionResourceTestClient.java | 2 +
.../embedded/server/OverlordClientTest.java | 2 +-
.../catalog/compact/CatalogCompactionTest.java | 4 +-
.../druid/k8s/overlord/common/K8sTaskIdTest.java | 12 +-
.../common/KubernetesOverlordUtilsTest.java | 4 +-
.../metadata/SegmentsMetadataManagerConfig.java | 2 +-
.../coordinator/ClusterCompactionConfig.java | 74 ++++-
.../server/coordinator/DruidCompactionConfig.java | 8 +
.../server/coordinator/duty/CompactSegments.java | 2 +-
.../coordinator/ClusterCompactionConfigTest.java | 139 ++++++++++
.../coordinator/DruidCompactionConfigTest.java | 8 +
.../CoordinatorCompactionConfigsResourceTest.java | 2 +-
19 files changed, 457 insertions(+), 247 deletions(-)
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index 15d05d00b9c..172d8062a5e 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -804,7 +804,7 @@ These Coordinator static configurations can be defined in
the `coordinator/runti
|--------|-----------|-------|
|`druid.manager.config.pollDuration`|How often the manager polls the config
table for updates.|`PT1M`|
|`druid.manager.segments.pollDuration`|The duration between polls the
Coordinator does for updates to the set of active segments. Generally defines
the amount of lag time it can take for the Coordinator to notice new
segments.|`PT1M`|
-|`druid.manager.segments.useIncrementalCache`|(Experimental) Denotes the usage
mode of the segment metadata incremental cache. This cache provides a
performance improvement over the polling mechanism currently employed by the
Coordinator as it retrieves payloads of only updated segments. Possible cache
modes are: (a) `never`: Incremental cache is disabled. (b) `always`:
Incremental cache is enabled. Service start-up will be blocked until cache has
synced with the metadata store at least [...]
+|`druid.manager.segments.useIncrementalCache`|Denotes the usage mode of the
segment metadata incremental cache. This cache provides a performance
improvement over the polling mechanism currently employed by the Coordinator as
it retrieves payloads of only updated segments. Possible cache modes are: (a)
`never`: Incremental cache is disabled. (b) `always`: Incremental cache is
enabled. Service start-up will be blocked until cache has synced with the
metadata store at least once. (c) `ifSy [...]
|`druid.manager.rules.pollDuration`|The duration between polls the Coordinator
does for updates to the set of active rules. Generally defines the amount of
lag time it can take for the Coordinator to notice rules.|`PT1M`|
|`druid.manager.rules.defaultRule`|The default rule for the cluster|`_default`|
|`druid.manager.rules.alertThreshold`|The duration after a failed poll upon
which an alert should be emitted.|`PT10M`|
@@ -1053,14 +1053,14 @@ If autoscaling is enabled, you can set these additional
configs:
The `druid.supervisor.idleConfig.*` specification in the Overlord runtime
properties defines the default behavior for the entire cluster. See [Idle
Configuration in Kafka Supervisor
IOConfig](../ingestion/kinesis-ingestion.md#io-configuration) to override it
for an individual supervisor.
-##### Segment metadata cache (Experimental)
+##### Segment metadata cache
The following properties pertain to segment metadata caching on the Overlord
that may be used to speed up segment allocation and other metadata operations.
|Property|Description|Default|
|--------|-----------|-------|
-|`druid.manager.segments.useIncrementalCache`|Denotes the usage mode of the
segment metadata incremental cache. Possible modes are: (a) `never`: Cache is
disabled. (b) `always`: Reads are always done from the cache. Service start-up
will be blocked until cache has synced with the metadata store at least once.
Transactions will block until cache has synced with the metadata store at least
once after becoming leader. (c) `ifSynced`: Reads are done from the cache only
if it has already sync [...]
-|`druid.manager.segments.pollDuration`|Duration (in ISO 8601 format) between
successive syncs of the cache with the metadata store. This property is used
only when `druid.manager.segments.useIncrementalCache` is set to `always` or
`ifSynced`.|`PT1M` (1 minute)|
+|`druid.manager.segments.useIncrementalCache`|Denotes the usage mode of the
segment metadata incremental cache. Possible modes are: (a) `never`: Cache is
disabled. (b) `always`: Reads are always done from the cache. Service start-up
will be blocked until cache has synced with the metadata store at least once.
Transactions will block until cache has synced with the metadata store at least
once after becoming leader. (c) `ifSynced`: Reads are done from the cache only
if it has already sync [...]
+|`druid.manager.segments.pollDuration`|Duration (in ISO 8601 format) between
successive syncs of the cache with the metadata store. This property is used
only when the incremental cache usage mode is set to `always` or `ifSynced`.|`PT1M` (1
minute)|
##### Auto-kill unused segments (Experimental)
@@ -1069,7 +1069,7 @@ None of the configs that apply to [auto-kill performed by
the Coordinator](../da
|Property|Description|Default|
|--------|-----------|-------|
-|`druid.manager.segments.killUnused.enabled`|Boolean flag to enable auto-kill
of eligible unused segments on the Overlord. This feature can be used only when
[segment metadata caching](#segment-metadata-cache-experimental) is enabled on
the Overlord and MUST NOT be enabled if `druid.coordinator.kill.on` is already
set to `true` on the Coordinator.|`true`|
+|`druid.manager.segments.killUnused.enabled`|Boolean flag to enable auto-kill
of eligible unused segments on the Overlord. This feature can be used only when
[segment metadata caching](#segment-metadata-cache) is enabled on the Overlord
and MUST NOT be enabled if `druid.coordinator.kill.on` is already set to `true`
on the Coordinator.|`true`|
|`druid.manager.segments.killUnused.bufferPeriod`|Period after which a segment
marked as unused becomes eligible for auto-kill on the Overlord. This config is
effective only if `druid.manager.segments.killUnused.enabled` is set to
`true`.|`P30D` (30 days)|
#### Overlord dynamic configuration
diff --git a/docs/data-management/automatic-compaction.md
b/docs/data-management/automatic-compaction.md
index 3f144e05a95..e455319d924 100644
--- a/docs/data-management/automatic-compaction.md
+++ b/docs/data-management/automatic-compaction.md
@@ -28,6 +28,11 @@ In Apache Druid, compaction is a special type of ingestion
task that reads data
Auto-compaction skips datasources that have a segment granularity of `ALL`.
:::
+There are two ways to run automatic compaction in Druid:
+
+* **Compaction supervisors on the Overlord** (recommended): Provides better
reactivity, support for the MSQ task engine, and easier management through the
supervisor framework. See [Auto-compaction using compaction
supervisors](#auto-compaction-using-compaction-supervisors).
+* **Coordinator duties**: An alternative approach that runs compaction as a
Coordinator duty. See [Auto-compaction using Coordinator
duties](#auto-compaction-using-coordinator-duties).
+
As a best practice, you should set up auto-compaction for all Druid
datasources. You can run compaction tasks manually for cases where you want to
allocate more system resources. For example, you may choose to run multiple
compaction tasks in parallel to compact an existing datasource for the first
time. See [Compaction](compaction.md) for additional details and use cases.
This topic guides you through setting up automatic compaction for your Druid
cluster. See the [examples](#examples) for common use cases for automatic
compaction.
@@ -52,14 +57,6 @@ The automatic compaction system uses the following syntax:
}
```
-:::info
-
-The MSQ task engine is available as a compaction engine when you run automatic
compaction as a compaction supervisor. For more information, see
[Auto-compaction using compaction
supervisors](#auto-compaction-using-compaction-supervisors).
-
-:::
-
-For automatic compaction using Coordinator duties, you submit the spec to the
[Compaction config UI](#manage-auto-compaction-using-the-web-console) or the
[Compaction configuration API](#manage-auto-compaction-using-coordinator-apis).
-
Most fields in the auto-compaction configuration correlate to a typical [Druid
ingestion spec](../ingestion/ingestion-spec.md).
The following properties only apply to auto-compaction:
* `skipOffsetFromLatest`
@@ -68,9 +65,9 @@ The following properties only apply to auto-compaction:
Since the automatic compaction system provides a management layer on top of
manual compaction tasks,
the auto-compaction configuration does not include task-specific properties
found in a typical Druid ingestion spec.
-The following properties are automatically set by the Coordinator:
+The following properties are automatically set by the auto-compaction system:
* `type`: Set to `compact`.
-* `id`: Generated using the task type, datasource name, interval, and
timestamp. The task ID is prefixed with `coordinator-issued`.
+* `id`: Generated using the task type, datasource name, interval, and
timestamp. The task ID is prefixed with `auto`.
* `context`: Set according to the user-provided `taskContext`.
Compaction tasks typically fetch all [relevant
segments](manual-compaction.md#compaction-io-configuration) prior to launching
any subtasks,
@@ -83,9 +80,136 @@ maximize performance and minimize disk usage of the
`compact` tasks launched by
For more details on each of the specs in an auto-compaction configuration, see
[Automatic compaction dynamic
configuration](../configuration/index.md#automatic-compaction-dynamic-configuration).
+## Auto-compaction using compaction supervisors
+
+:::info
+For advanced time-based data lifecycle management — such as coarsening segment
granularity, deleting old rows, or changing compression as data ages — see
[Cascading reindexing](cascading-reindexing.md).
+:::
+
+The recommended way to run automatic compaction is using compaction
supervisors on the Overlord. Compaction supervisors provide the following
benefits:
+
+* Can use the supervisor framework to get information about the
auto-compaction, such as status or state
+* More easily suspend or resume compaction for a datasource
+* Can use either the native compaction engine or the [MSQ task
engine](#use-msq-for-auto-compaction)
+* More reactive and submits tasks as soon as a compaction slot is available
+* Tracks compaction task status to avoid re-compacting an interval repeatedly
+* Can be configured to store compact fingerprints instead of full compaction
state in metadata storage
+* Supports minor compaction through the [Most fragmented first
policy](../api-reference/automatic-compaction-api.md#compaction-policy-mostfragmentedfirst)
+
+To use a compaction supervisor, submit the auto-compaction configuration as a
supervisor spec. Set `type` to `autocompact` and include the auto-compaction
config in the `spec`.
+
+To enable automatic compaction for a datasource, submit a supervisor spec
through the [web console](#manage-compaction-supervisors-with-the-web-console)
or the [supervisor API](#manage-compaction-supervisors-with-supervisor-apis).
+
+
+### Manage compaction supervisors with the web console
+
+To submit a compaction supervisor spec (using either the native or the MSQ task engine), perform
the following steps:
+
+1. In the web console, go to the **Supervisors** tab.
+1. Click **...** > **Submit JSON supervisor**.
+1. In the dialog, include the following:
+ - The type of supervisor spec by setting `"type": "autocompact"`
+ - The compaction configuration by adding it to the `spec` field
+ ```json
+ {
+ "type": "autocompact",
+ "spec": {
+ "dataSource": YOUR_DATASOURCE,
+ "tuningConfig": {...},
+ "granularitySpec": {...},
+ "engine": <native|msq>,
+ ...
+ }
+ }
+ ```
+1. Submit the supervisor.
+
+To stop the automatic compaction task, suspend or terminate the supervisor
through the UI or API.
+
+### Manage compaction supervisors with supervisor APIs
+
+Submitting an automatic compaction as a supervisor task uses the same endpoint
as supervisor tasks for streaming ingestion.
+
+The following example configures auto-compaction for the `wikipedia`
datasource:
+
+```sh
+curl --location --request POST
'http://localhost:8081/druid/indexer/v1/supervisor' \
+--header 'Content-Type: application/json' \
+--data-raw '{
+ "type": "autocompact", // required
+ "suspended": false, // optional
+ "spec": { // required
+ "dataSource": "wikipedia", // required
+ "tuningConfig": {...}, // optional
+ "granularitySpec": {...}, // optional
+ "engine": <native|msq>, // optional
+ ...
+ }
+}'
+```
+
+Note that if you omit `spec.engine`, Druid uses the default compaction engine.
You can control the default compaction engine by setting `engine` in the
[compaction dynamic
config](../api-reference/automatic-compaction-api.md#update-cluster-level-compaction-config).
If `spec.engine` and `engine` in compaction dynamic config are omitted, Druid
defaults to the `native` engine.
+
+To stop the automatic compaction task, suspend or terminate the supervisor
through the UI or API.
+
+### Use MSQ for auto-compaction
+
+The MSQ task engine is available as a compaction engine if you configure
auto-compaction to use compaction supervisors. To use the MSQ task engine for
automatic compaction, make sure the following requirements are met:
+
+* Enable [incremental segment metadata
caching](../configuration/index.md#metadata-retrieval) on the Overlord.
+* Enable [Auto-compaction using compaction
supervisors](#auto-compaction-using-compaction-supervisors).
+* Update the [compaction dynamic
config](../api-reference/automatic-compaction-api.md#update-cluster-level-compaction-config)
and set `engine` to `msq` to use the MSQ task engine as the default compaction
engine for all compaction supervisors.
+ * Alternatively, you may override the default engine by setting
`spec.engine` to `msq` in the compaction supervisor for the relevant datasource.
+* Have at least two compaction task slots available or set
`spec.taskContext.maxNumTasks` to two or more. The MSQ task engine requires at
least two tasks to run, one controller task and one worker task.
+
+You can use [MSQ task engine context
parameters](../multi-stage-query/reference.md#context-parameters) in
`spec.taskContext` when configuring your datasource for automatic compaction,
such as setting the maximum number of tasks using the
`spec.taskContext.maxNumTasks` parameter. Some of the MSQ task engine context
parameters overlap with automatic compaction parameters. When these settings
overlap, set one or the other.
+
+
+#### MSQ task engine limitations
+
+<!--This list also exists in multi-stage-query/known-issues-->
+
+When using the MSQ task engine for auto-compaction, keep the following
limitations in mind:
+
+- The `metricSpec` field is only supported for certain aggregators. For more
information, see [Supported aggregators](#supported-aggregators).
+- Only dynamic and range-based partitioning are supported.
+- Set `rollup` to `true` if and only if `metricSpec` is not empty or null.
+- You can only partition on string dimensions. However, multi-valued string
dimensions are not supported.
+- The `maxTotalRows` config is not supported in `DynamicPartitionsSpec`. Use
`maxRowsPerSegment` instead.
+- Segments can only be sorted on `__time` as the first column.
+
+#### Supported aggregators
+
+Auto-compaction using the MSQ task engine supports only aggregators that
satisfy the following properties:
+* __Mergeability__: can combine partial aggregates
+* __Idempotency__: produces the same results on repeated runs of the
aggregator on previously aggregated values in a column
+
+This is exemplified by the following `longSum` aggregator:
+
+```
+{"name": "added", "type": "longSum", "fieldName": "added"}
+```
+
+Here, `longSum` satisfies mergeability because it can combine partial results,
and using the same column (`added`) for both input and output ensures
idempotency.
+
+The following are some examples of aggregators that aren't supported since at
least one of the required conditions isn't satisfied:
+
+* `longSum` aggregator where the `added` column rolls up into `sum_added`
column discarding the input `added` column, violating idempotency, as
subsequent runs would no longer find the `added` column:
+ ```
+ {"name": "sum_added", "type": "longSum", "fieldName": "added"}
+ ```
+* Partial sketches which cannot themselves be used to combine partial
aggregates and need merging aggregators -- such as `HLLSketchMerge` required
for `HLLSketchBuild` aggregator below -- violating mergeability:
+ ```
+ {"name": "added", "type": "HLLSketchBuild", "fieldName": "added"}
+ ```
+* Count aggregator since it cannot be used to combine partial aggregates and
it rolls up into a different `count` column discarding the input column(s),
violating both mergeability and idempotency.
+ ```
+ {"type": "count", "name": "count"}
+ ```
+
+
## Auto-compaction using Coordinator duties
-You can control how often the Coordinator checks to see if auto-compaction is
needed. The Coordinator [indexing
period](../configuration/index.md#data-management),
`druid.coordinator.period.indexingPeriod`, controls the frequency of compaction
tasks.
+As an alternative to compaction supervisors, you can run automatic compaction
using Coordinator duties. The Coordinator [indexing
period](../configuration/index.md#data-management),
`druid.coordinator.period.indexingPeriod`, controls the frequency of compaction
tasks.
The default indexing period is 30 minutes, meaning that the Coordinator first
checks for segments to compact at most 30 minutes from when auto-compaction is
enabled.
This time period also affects other Coordinator duties such as cleanup of
unused segments and stale pending segments.
To configure the auto-compaction time period without interfering with
`indexingPeriod`, see [Set frequency of compaction
runs](#change-compaction-frequency).
@@ -94,10 +218,15 @@ At every invocation of auto-compaction, the Coordinator
initiates a [segment sea
When there are eligible segments to compact, the Coordinator issues compaction
tasks based on available worker capacity.
If a compaction task takes longer than the indexing period, the Coordinator
waits for it to finish before resuming the period for segment search.
-No additional configuration is needed to run automatic compaction tasks using
the Coordinator and native engine. This is the default behavior for Druid.
-You can configure it for a datasource through the web console or
programmatically via an API.
+You can configure Coordinator-based auto-compaction for a datasource through
the web console or programmatically via an API.
This process differs for manual compaction tasks, which can be submitted from
the [Tasks view of the web console](../operations/web-console.md) or the [Tasks
API](../api-reference/tasks-api.md).
+To use Coordinator-based auto-compaction, the following configuration
requirements must be met:
+
* Update the [compaction dynamic
config](../api-reference/automatic-compaction-api.md#update-cluster-level-compaction-config)
and set:
+ * `useSupervisors` to `false`
+ * `engine` to `native` to use the native engine, since the MSQ task engine is
not supported as a compaction engine for Coordinator duties.
+
### Manage auto-compaction using the web console
Use the web console to enable automatic compaction for a datasource as follows:
@@ -106,14 +235,14 @@ Use the web console to enable automatic compaction for a
datasource as follows:
2. In the **Compaction** column, click the edit icon for the datasource to
compact.
3. In the **Compaction config** dialog, configure the auto-compaction
settings. The dialog offers a form view as well as a JSON view. Editing the
form updates the JSON specification, and editing the JSON updates the form
field, if present. Form fields not present in the JSON indicate default values.
You may add additional properties to the JSON for auto-compaction settings not
displayed in the form. See [Configure automatic
compaction](#auto-compaction-syntax) for supported settings for [...]
4. Click **Submit**.
-5. Refresh the **Datasources** view. The **Compaction** column for the
datasource changes from “Not enabled” to “Awaiting first run.”
+5. Refresh the **Datasources** view. The **Compaction** column for the
datasource changes from "Not enabled" to "Awaiting first run."
The following screenshot shows the compaction config dialog for a datasource
with auto-compaction enabled.

To disable auto-compaction for a datasource, click **Delete** from the
**Compaction config** dialog. Druid does not retain your auto-compaction
configuration.
-### Manage auto-compaction using Coordinator APIs
+### Manage auto-compaction using Coordinator APIs
Use the [Automatic compaction
API](../api-reference/automatic-compaction-api.md#manage-automatic-compaction)
to configure automatic compaction.
To enable auto-compaction for a datasource, create a JSON object with the
desired auto-compaction settings.
@@ -201,7 +330,7 @@ The following auto-compaction configuration compacts
existing `HOUR` segments in
"granularitySpec": {
"segmentGranularity": "DAY"
},
- "skipOffsetFromLatest": "P1W",
+ "skipOffsetFromLatest": "P1W"
}
```
@@ -228,144 +357,6 @@ The following auto-compaction configuration compacts
updates the `wikipedia` seg
}
```
-## Auto-compaction using compaction supervisors
-
-:::info
-For advanced time-based data lifecycle management — such as coarsening segment
granularity, deleting old rows, or changing compression as data ages — see
[Cascading reindexing](cascading-reindexing.md).
-:::
-
-You can run automatic compaction using compaction supervisors on the Overlord
rather than Coordinator duties. Compaction supervisors provide the following
benefits over Coordinator duties:
-
-* Can use the supervisor framework to get information about the
auto-compaction, such as status or state
-* More easily suspend or resume compaction for a datasource
-* Can use either the native compaction engine or the [MSQ task
engine](#use-msq-for-auto-compaction)
-* More reactive and submits tasks as soon as a compaction slot is available
-* Tracked compaction task status to avoid re-compacting an interval repeatedly
-* Uses new Indexing State Fingerprinting mechanisms to store less data per
segment in metadata storage
-
-
-To use compaction supervisors, the following configuration requirements must
be met:
-
-* You must be using incremental segment metadata caching:
- * `druid.manager.segments.useIncrementalCache` set to `always` or `ifSynced`
in your Overlord and Coordinator runtime properties.
- * See [Segment metadata
caching](../configuration/index.md#metadata-retrieval) for full configuration
documentation.
-
-* update the [compaction dynamic
config](../api-reference/automatic-compaction-api.md#update-cluster-level-compaction-config)
and set:
- * `useSupervisors` to `true` so that compaction tasks can be run as
supervisor tasks
- * `engine` to `msq` to use the MSQ task engine as the compaction engine or
to `native` (default value) to use the native engine.
-
-Compaction supervisors use the same syntax as auto-compaction using
Coordinator duties with one key difference: you submit the auto-compaction as a
supervisor spec. In the spec, set the `type` to `autocompact` and include the
auto-compaction config in the `spec`.
-
-To submit an automatic compaction task, you can submit a supervisor spec
through the [web console](#manage-compaction-supervisors-with-the-web-console)
or the [supervisor API](#manage-compaction-supervisors-with-supervisor-apis).
-
-
-### Manage compaction supervisors with the web console
-
-To submit a supervisor spec for MSQ task engine automatic compaction, perform
the following steps:
-
-1. In the web console, go to the **Supervisors** tab.
-1. Click **...** > **Submit JSON supervisor**.
-1. In the dialog, include the following:
- - The type of supervisor spec by setting `"type": "autocompact"`
- - The compaction configuration by adding it to the `spec` field
- ```json
- {
- "type": "autocompact",
- "spec": {
- "dataSource": YOUR_DATASOURCE,
- "tuningConfig": {...},
- "granularitySpec": {...},
- "engine": <native|msq>,
- ...
- }
- ```
-1. Submit the supervisor.
-
-To stop the automatic compaction task, suspend or terminate the supervisor
through the UI or API.
-
-### Manage compaction supervisors with supervisor APIs
-
-Submitting an automatic compaction as a supervisor task uses the same endpoint
as supervisor tasks for streaming ingestion.
-
-The following example configures auto-compaction for the `wikipedia`
datasource:
-
-```sh
-curl --location --request POST
'http://localhost:8081/druid/indexer/v1/supervisor' \
---header 'Content-Type: application/json' \
---data-raw '{
- "type": "autocompact", // required
- "suspended": false, // optional
- "spec": { // required
- "dataSource": "wikipedia", // required
- "tuningConfig": {...}, // optional
- "granularitySpec": {...}, // optional
- "engine": <native|msq>, // optional
- ...
- }
-}'
-```
-
-Note that if you omit `spec.engine`, Druid uses the default compaction engine.
You can control the default compaction engine by setting `engine` in the
[compaction dynamic
config](../api-reference/automatic-compaction-api.md#update-cluster-level-compaction-config).
If `spec.engine` and `engine` in compaction dynamic config are omitted, Druid
defaults to the `native` engine.
-
-To stop the automatic compaction task, suspend or terminate the supervisor
through the UI or API.
-
-### Use MSQ for auto-compaction
-
-The MSQ task engine is available as a compaction engine if you configure
auto-compaction to use compaction supervisors. To use the MSQ task engine for
automatic compaction, make sure the following requirements are met:
-
-* Enable [incremental segment metadata
caching](../configuration/index.md#metadata-retrieval) on the Overlord.
-* Enable [Auto-compaction using compaction
supervisors](#auto-compaction-using-compaction-supervisors).
-* Update the [compaction dynamic
config](../api-reference/automatic-compaction-api.md#update-cluster-level-compaction-config)
and set `engine` to `msq` to use the MSQ task engine as the default compaction
engine for all compaction supervisors.
- * Alternatively, you may override the default engine by setting
`spec.engine` to `msq` in the compaction supervisor for the relevant datasource.
-* Have at least two compaction task slots available or set
`spec.taskContext.maxNumTasks` to two or more. The MSQ task engine requires at
least two tasks to run, one controller task and one worker task.
-
-You can use [MSQ task engine context
parameters](../multi-stage-query/reference.md#context-parameters) in
`spec.taskContext` when configuring your datasource for automatic compaction,
such as setting the maximum number of tasks using the
`spec.taskContext.maxNumTasks` parameter. Some of the MSQ task engine context
parameters overlap with automatic compaction parameters. When these settings
overlap, set one or the other.
-
-
-#### MSQ task engine limitations
-
-<!--This list also exists in multi-stage-query/known-issues-->
-
-When using the MSQ task engine for auto-compaction, keep the following
limitations in mind:
-
-- The `metricSpec` field is only supported for certain aggregators. For more
information, see [Supported aggregators](#supported-aggregators).
-- Only dynamic and range-based partitioning are supported.
-- Set `rollup` to `true` if and only if `metricSpec` is not empty or null.
-- You can only partition on string dimensions. However, multi-valued string
dimensions are not supported.
-- The `maxTotalRows` config is not supported in `DynamicPartitionsSpec`. Use
`maxRowsPerSegment` instead.
-- Segments can only be sorted on `__time` as the first column.
-
-#### Supported aggregators
-
-Auto-compaction using the MSQ task engine supports only aggregators that
satisfy the following properties:
-* __Mergeability__: can combine partial aggregates
-* __Idempotency__: produces the same results on repeated runs of the
aggregator on previously aggregated values in a column
-
-This is exemplified by the following `longSum` aggregator:
-
-```
-{"name": "added", "type": "longSum", "fieldName": "added"}
-```
-
-where `longSum` being capable of combining partial results satisfies
mergeability, while input and output column being the same (`added`) ensures
idempotency.
-
-The following are some examples of aggregators that aren't supported since at
least one of the required conditions aren't satisfied:
-
-* `longSum` aggregator where the `added` column rolls up into `sum_added`
column discarding the input `added` column, violating idempotency, as
subsequent runs would no longer find the `added` column:
- ```
- {"name": "sum_added", "type": "longSum", "fieldName": "added"}
- ```
-* Partial sketches which cannot themselves be used to combine partial
aggregates and need merging aggregators -- such as `HLLSketchMerge` required
for `HLLSketchBuild` aggregator below -- violating mergeability:
- ```
- {"name": "added", "type": "HLLSketchBuild", "fieldName": "added"}
- ```
-* Count aggregator since it cannot be used to combine partial aggregates and
it rolls up into a different `count` column discarding the input column(s),
violating both mergeability and idempotency.
- ```
- {"type": "count", "name": "count"}
- ```
-
-
-
## Learn more
See the following topics for more information:
diff --git a/docs/data-management/cascading-reindexing.md
b/docs/data-management/cascading-reindexing.md
index 5f7b7f37b09..1cc11764ad0 100644
--- a/docs/data-management/cascading-reindexing.md
+++ b/docs/data-management/cascading-reindexing.md
@@ -40,9 +40,7 @@ Cascading reindexing handles all of this automatically by
generating a timeline
Before using cascading reindexing, ensure your cluster meets the following
requirements:
-- **Compaction supervisors enabled**: Set `useSupervisors` to `true` in the
[compaction dynamic
config](../api-reference/automatic-compaction-api.md#update-cluster-level-compaction-config).
- **MSQ compaction engine**: Set `engine` to `msq` in the compaction dynamic
config or in the supervisor spec.
-- **Incremental segment metadata caching**: Set
`druid.manager.segments.useIncrementalCache` to `always` or `ifSynced` in your
Overlord and Coordinator runtime properties. See [Segment metadata
caching](../configuration/index.md#metadata-retrieval).
- **At least two compaction task slots**: The MSQ task engine requires at
least two tasks (one controller, one worker).
## How cascading reindexing works
diff --git a/docs/data-management/delete.md b/docs/data-management/delete.md
index e37ba48b544..db87e3768d9 100644
--- a/docs/data-management/delete.md
+++ b/docs/data-management/delete.md
@@ -127,7 +127,7 @@ Refer to [Data management on the
Coordinator](../configuration/index.md#data-man
:::info
This is an experimental feature that:
-- Can be used only if [segment metadata
caching](../configuration/index.md#segment-metadata-cache-experimental) is
enabled on the Overlord.
+- Can be used only if [segment metadata
caching](../configuration/index.md#segment-metadata-cache) is enabled on the
Overlord.
- MUST NOT be used if auto-kill of unused segments is already enabled on the
Coordinator.
:::
diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index 5b99f1be5b0..6f87b7f5c96 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -364,7 +364,7 @@ If the JVM does not support CPU time measurement for the
current thread, `ingest
### Segment metadata cache
-The following metrics are emitted only when [segment metadata
caching](../configuration/index.md#segment-metadata-cache-experimental) is
enabled on the Overlord.
+The following metrics are emitted only when [segment metadata
caching](../configuration/index.md#segment-metadata-cache) is enabled on the
Overlord.
|Metric|Description|Dimensions|
|------|-----------|----------|
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/AutoCompactionTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/AutoCompactionTest.java
index d7d12b397dc..63eb0bbaff1 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/AutoCompactionTest.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/AutoCompactionTest.java
@@ -49,6 +49,7 @@ import
org.apache.druid.java.util.common.granularity.PeriodGranularity;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
@@ -98,7 +99,6 @@ import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
-import org.junit.jupiter.params.provider.ValueSource;
import java.io.Closeable;
import java.util.ArrayList;
@@ -188,14 +188,24 @@ public class AutoCompactionTest extends CompactionTestBase
)
);
- private static final int MAX_ROWS_PER_SEGMENT_COMPACTED = 10000;
+ private static final int MAX_ROWS_PER_SEGMENT_COMPACTED = 10_000;
private static final Period NO_SKIP_OFFSET = Period.seconds(0);
private static final FixedIntervalOrderPolicy COMPACT_NOTHING_POLICY = new
FixedIntervalOrderPolicy(List.of());
+ private static final ClusterCompactionConfig
DEFAULT_LEGACY_COMPACTION_CONFIG = new ClusterCompactionConfig(0.5, 10, null,
false, CompactionEngine.NATIVE, true);
public static List<CompactionEngine> getEngine()
{
return List.of(CompactionEngine.NATIVE);
}
+
+ public static List<ClusterCompactionConfig> getClusterCompactionConfig()
+ {
+ return List.of(
+ DEFAULT_LEGACY_COMPACTION_CONFIG,
+ new ClusterCompactionConfig(0.5, 10, null, true,
CompactionEngine.NATIVE, true),
+ new ClusterCompactionConfig(0.5, 10, null, true, CompactionEngine.MSQ,
true)
+ );
+ }
@Override
protected EmbeddedDruidCluster createCluster()
@@ -203,14 +213,14 @@ public class AutoCompactionTest extends CompactionTestBase
// Use timeout required for hash partitioning task
return EmbeddedDruidCluster.withEmbeddedDerbyAndZookeeper()
.useLatchableEmitter()
- .useDefaultTimeoutForLatchableEmitter(30)
+ .useDefaultTimeoutForLatchableEmitter(300)
.addExtension(SketchModule.class)
.addExtension(HllSketchModule.class)
.addExtension(DoublesSketchModule.class)
.addServer(overlord)
.addServer(coordinator)
.addServer(broker)
- .addServer(new
EmbeddedIndexer().addProperty("druid.worker.capacity", "10"))
+ .addServer(new
EmbeddedIndexer().addProperty("druid.worker.capacity",
"10").setServerMemory(2_000_000_000L))
.addServer(new EmbeddedHistorical())
.addServer(new EmbeddedRouter());
}
@@ -226,10 +236,9 @@ public class AutoCompactionTest extends CompactionTestBase
}
@BeforeEach
- public void resetCompactionTaskSlots()
+ public void resetClusterCompactionConfig()
{
- // Set compaction slot to 5
- updateCompactionTaskSlot(0.5, 10);
+ updateClusterConfig(DEFAULT_LEGACY_COMPACTION_CONFIG);
fullDatasourceName = dataSource;
}
@@ -353,7 +362,7 @@ public class AutoCompactionTest extends CompactionTestBase
}
}
- @Test()
+ @Test
public void testAutoCompactionOnlyRowsWithoutMetricShouldAddNewMetrics()
throws Exception
{
// added = 31, count = null, sum_added = null
@@ -1504,12 +1513,11 @@ public class AutoCompactionTest extends
CompactionTestBase
}
}
- @ValueSource(booleans = {false})
- @ParameterizedTest(name = "useSupervisors={0}")
- public void testAutoCompactionDutyWithFilter(boolean useSupervisors) throws
Exception
+ @MethodSource("getClusterCompactionConfig")
+ @ParameterizedTest(name = "clusterCompactionConfig={0}")
+ public void testAutoCompactionDutyWithFilter(ClusterCompactionConfig config)
throws Exception
{
- updateClusterConfig(new ClusterCompactionConfig(0.5, 10, null,
useSupervisors, null, true));
-
+ updateClusterConfig(config);
loadData(INDEX_TASK);
try (final Closeable ignored = unloader(fullDatasourceName)) {
final List<Interval> intervalsBeforeCompaction = getSegmentIntervals();
@@ -1533,7 +1541,7 @@ public class AutoCompactionTest extends CompactionTestBase
false,
CompactionEngine.NATIVE
);
- forceTriggerAutoCompaction(intervalsBeforeCompaction, useSupervisors, 2);
+ forceTriggerOrWaitForCompaction(intervalsBeforeCompaction, config, 2);
// For dim "page", result should only contain value "Striker Eureka"
verifyScanResult("added", "459.0");
@@ -1542,17 +1550,17 @@ public class AutoCompactionTest extends
CompactionTestBase
List<TaskStatusPlus> compactTasksBefore =
getCompleteTasksForDataSource(fullDatasourceName);
// Verify compacted segments does not get compacted again
- forceTriggerAutoCompaction(intervalsBeforeCompaction, useSupervisors, 2);
+ forceTriggerOrWaitForCompaction(intervalsBeforeCompaction, config, 2);
List<TaskStatusPlus> compactTasksAfter =
getCompleteTasksForDataSource(fullDatasourceName);
Assertions.assertEquals(compactTasksAfter.size(),
compactTasksBefore.size());
}
}
- @ValueSource(booleans = {false})
- @ParameterizedTest(name = "useSupervisors={0}")
- public void testAutoCompationDutyWithMetricsSpec(boolean useSupervisors)
throws Exception
+ @MethodSource("getClusterCompactionConfig")
+ @ParameterizedTest(name = "clusterCompactionConfig={0}")
+ public void testAutoCompationDutyWithMetricsSpec(ClusterCompactionConfig
config) throws Exception
{
- updateClusterConfig(new ClusterCompactionConfig(0.5, 10, null,
useSupervisors, null, true));
+ updateClusterConfig(config);
loadData(INDEX_TASK);
try (final Closeable ignored = unloader(fullDatasourceName)) {
@@ -1576,7 +1584,7 @@ public class AutoCompactionTest extends CompactionTestBase
false,
CompactionEngine.NATIVE
);
- forceTriggerAutoCompaction(intervalsBeforeCompaction, useSupervisors, 2);
+ forceTriggerOrWaitForCompaction(intervalsBeforeCompaction, config, 2);
// Result should be the same with the addition of new metrics,
"double_sum_added" and "long_sum_added".
// These new metrics should have the same value as the input field
"added"
@@ -1587,7 +1595,7 @@ public class AutoCompactionTest extends CompactionTestBase
List<TaskStatusPlus> compactTasksBefore =
getCompleteTasksForDataSource(fullDatasourceName);
// Verify compacted segments does not get compacted again
- forceTriggerAutoCompaction(intervalsBeforeCompaction, useSupervisors, 2);
+ forceTriggerOrWaitForCompaction(intervalsBeforeCompaction, config, 2);
List<TaskStatusPlus> compactTasksAfter =
getCompleteTasksForDataSource(fullDatasourceName);
Assertions.assertEquals(compactTasksAfter.size(),
compactTasksBefore.size());
}
@@ -1775,40 +1783,23 @@ public class AutoCompactionTest extends
CompactionTestBase
CompactionEngine engine
)
{
+ UserCompactionTaskQueryTuningConfig tuningConfig =
+ UserCompactionTaskQueryTuningConfig.builder()
+ .partitionsSpec(partitionsSpec)
+ .splitHintSpec(new
MaxSizeSplitHintSpec(null, 1))
+ .maxNumConcurrentSubTasks(1)
+ .totalNumMergeTasks(1)
+ .build();
DataSourceCompactionConfig dataSourceCompactionConfig =
InlineSchemaDataSourceCompactionConfig.builder()
.forDataSource(fullDatasourceName)
.withSkipOffsetFromLatest(skipOffsetFromLatest)
- .withTuningConfig(
- new
UserCompactionTaskQueryTuningConfig(
- null,
- null,
- null,
- null,
- new
MaxSizeSplitHintSpec(null, 1),
- partitionsSpec,
- null,
- null,
- null,
- null,
- null,
- 1,
- null,
- null,
- null,
- null,
- null,
- 1,
- null
- )
- )
+ .withTuningConfig(tuningConfig)
.withGranularitySpec(granularitySpec)
.withDimensionsSpec(dimensionsSpec)
.withMetricsSpec(metricsSpec)
.withTransformSpec(transformSpec)
- .withIoConfig(
- !dropExisting ? null : new
UserCompactionTaskIOConfig(true)
- )
+ .withIoConfig(!dropExisting ?
null : new UserCompactionTaskIOConfig(true))
.withEngine(engine)
.withTaskContext(ImmutableMap.of("maxNumTasks", 2))
.build();
@@ -1840,32 +1831,31 @@ public class AutoCompactionTest extends
CompactionTestBase
* {@link #fullDatasourceName}, and verifies the total number of segments in
* the datasource after compaction.
*/
- private void forceTriggerAutoCompaction(
+ private void forceTriggerOrWaitForCompaction(
List<Interval> intervals,
- boolean useSupervisors,
+ ClusterCompactionConfig config,
int numExpectedSegmentsAfterCompaction
- ) throws Exception
+ )
{
- if (useSupervisors) {
+ if (config.isUseSupervisors()) {
// Enable compaction for the requested intervals
final FixedIntervalOrderPolicy policy = new FixedIntervalOrderPolicy(
intervals.stream().map(
interval -> new
FixedIntervalOrderPolicy.Candidate(fullDatasourceName, interval)
).collect(Collectors.toList())
);
- updateClusterConfig(
- new ClusterCompactionConfig(0.5, intervals.size(), policy, true,
null, true)
- );
+ updateClusterConfig(config.toBuilder().compactionPolicy(policy).build());
- // Wait for scheduler to pick up the compaction job
- // Instead of sleep, we can latch on a relevant metric later
- Thread.sleep(30_000);
+ // Wait for all compaction jobs to be submitted
+ overlord.latchableEmitter().waitForEvent(
+ event -> event.hasMetricName("interval/waitCompact/count")
+ .hasDimension(DruidMetrics.DATASOURCE, dataSource)
+ .hasValueMatching(Matchers.equalTo(0L))
+ );
waitForCompactionToFinish(numExpectedSegmentsAfterCompaction);
// Disable all compaction
- updateClusterConfig(
- new ClusterCompactionConfig(0.5, intervals.size(),
COMPACT_NOTHING_POLICY, true, null, true)
- );
+
updateClusterConfig(config.toBuilder().compactionPolicy(COMPACT_NOTHING_POLICY).build());
} else {
forceTriggerAutoCompaction(numExpectedSegmentsAfterCompaction);
}
@@ -1908,7 +1898,7 @@ public class AutoCompactionTest extends CompactionTestBase
Assertions.assertEquals(actualTombstoneCount,
expectedCompactedTombstoneCount);
}
- private void verifySegmentsCompacted(PartitionsSpec partitionsSpec, int
expectedCompactedSegmentCount)
+ private void verifySegmentsCompacted(PartitionsSpec expectedPartitionsSpec,
int expectedCompactedSegmentCount)
{
Set<DataSegment> segments =
cluster.callApi().getVisibleUsedSegments(dataSource, overlord);
List<DataSegment> foundCompactedSegments = new ArrayList<>();
@@ -1921,7 +1911,7 @@ public class AutoCompactionTest extends CompactionTestBase
for (DataSegment compactedSegment : foundCompactedSegments) {
Assertions.assertNotNull(compactedSegment.getLastCompactionState());
Assertions.assertNotNull(compactedSegment.getLastCompactionState().getPartitionsSpec());
-
Assertions.assertEquals(compactedSegment.getLastCompactionState().getPartitionsSpec(),
partitionsSpec);
+ Assertions.assertEquals(expectedPartitionsSpec,
compactedSegment.getLastCompactionState().getPartitionsSpec());
}
}
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/AutoCompactionUpgradeTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/AutoCompactionUpgradeTest.java
index 708ad28607e..9aca257f663 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/AutoCompactionUpgradeTest.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/AutoCompactionUpgradeTest.java
@@ -22,9 +22,11 @@ package org.apache.druid.testing.embedded.compact;
import org.apache.druid.data.input.MaxSizeSplitHintSpec;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
+import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.metadata.TestDerbyConnector;
+import org.apache.druid.server.coordinator.CatalogDataSourceCompactionConfig;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.DruidCompactionConfig;
import
org.apache.druid.server.coordinator.InlineSchemaDataSourceCompactionConfig;
@@ -73,8 +75,8 @@ public class AutoCompactionUpgradeTest extends
EmbeddedClusterTestBase
overlord.start();
// Verify that compaction config already exist. This config was inserted
manually into the database using SQL script.
- DruidCompactionConfig coordinatorCompactionConfig =
DruidCompactionConfig.empty()
- .withDatasourceConfigs(compactionResource.getAllCompactionConfigs());
+ DruidCompactionConfig coordinatorCompactionConfig =
+
DruidCompactionConfig.empty().withDatasourceConfigs(compactionResource.getAllCompactionConfigs());
DataSourceCompactionConfig foundDataSourceCompactionConfig
=
coordinatorCompactionConfig.findConfigForDatasource(dataSource).orNull();
Assertions.assertNotNull(foundDataSourceCompactionConfig);
@@ -132,18 +134,16 @@ public class AutoCompactionUpgradeTest extends
EmbeddedClusterTestBase
*/
private void insertMinimalCompactionConfig(TestDerbyConnector sqlConnector)
{
- final String configJson = StringUtils.format(
- "{\"compactionConfigs\":[{\"dataSource\":\"%s\"}]}",
- dataSource
- );
-
+ DataSourceCompactionConfig dataSourceCompactionConfig =
+ new CatalogDataSourceCompactionConfig(dataSource, null, Period.ZERO,
null, null, null, null);
+ DruidCompactionConfig config =
DruidCompactionConfig.legacy().withDatasourceConfig(dataSourceCompactionConfig);
sqlConnector.retryWithHandle(
handle -> handle.insert(
StringUtils.format(
"INSERT INTO %s (name, payload) VALUES
('coordinator.compaction.config',?)",
sqlConnector.getMetadataTablesConfig().getConfigTable()
),
- configJson.getBytes(StandardCharsets.UTF_8)
+ new
DefaultObjectMapper().writeValueAsString(config).getBytes(StandardCharsets.UTF_8)
)
);
}
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionResourceTestClient.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionResourceTestClient.java
index 0229add690c..2abd48a5291 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionResourceTestClient.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionResourceTestClient.java
@@ -34,6 +34,7 @@ import org.apache.druid.server.http.ServletResourceUtils;
import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
import org.apache.druid.testing.embedded.EmbeddedServiceClient;
import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.junit.jupiter.api.Assertions;
import java.util.List;
import java.util.Map;
@@ -124,6 +125,7 @@ public class CompactionResourceTestClient
clusterConfig.getCompactionTaskSlotRatio(),
clusterConfig.getMaxCompactionTaskSlots()
);
+ Assertions.assertFalse(clusterConfig.isUseSupervisors());
final CompactionSimulateResult simulateResult = simulateRunOnCoordinator();
log.info(
"Triggering compaction duty on Coordinator. Expected jobs: %s",
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/OverlordClientTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/OverlordClientTest.java
index c445d52a383..eae3a492f6a 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/OverlordClientTest.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/OverlordClientTest.java
@@ -275,7 +275,7 @@ public class OverlordClientTest extends
EmbeddedClusterTestBase
public void test_isCompactionSupervisorEnabled()
{
Boolean result =
cluster.callApi().onLeaderOverlord(OverlordClient::isCompactionSupervisorEnabled);
- Assertions.assertFalse(result);
+ Assertions.assertTrue(result);
}
@Test
diff --git
a/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/compact/CatalogCompactionTest.java
b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/compact/CatalogCompactionTest.java
index d5093d4ffaf..13793def20d 100644
---
a/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/compact/CatalogCompactionTest.java
+++
b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/compact/CatalogCompactionTest.java
@@ -26,6 +26,7 @@ import org.apache.druid.catalog.model.TableMetadata;
import org.apache.druid.catalog.model.table.TableBuilder;
import org.apache.druid.catalog.sync.CatalogClient;
import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.indexer.CompactionEngine;
import org.apache.druid.indexing.common.task.IndexTask;
import org.apache.druid.indexing.common.task.TaskBuilder;
import org.apache.druid.indexing.compact.CompactionSupervisorSpec;
@@ -112,7 +113,7 @@ public class CatalogCompactionTest extends
EmbeddedClusterTestBase
// Create a catalog compaction config
CatalogDataSourceCompactionConfig compactionConfig =
- new CatalogDataSourceCompactionConfig(dataSource, null, Period.ZERO,
null, null, null, null);
+ new CatalogDataSourceCompactionConfig(dataSource,
CompactionEngine.NATIVE, Period.ZERO, null, null, null, null);
final CompactionSupervisorSpec compactionSupervisor
= new CompactionSupervisorSpec(compactionConfig, false, null);
@@ -123,6 +124,7 @@ public class CatalogCompactionTest extends
EmbeddedClusterTestBase
event -> event.hasMetricName("task/run/time")
.hasDimension(DruidMetrics.TASK_TYPE, "compact")
.hasDimension(DruidMetrics.DATASOURCE, dataSource)
+ .hasDimension(DruidMetrics.TASK_STATUS, "SUCCESS")
);
// Verify that segments are now compacted to MONTH granularity
diff --git
a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTaskIdTest.java
b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTaskIdTest.java
index b3b759a64c2..0a7ee7c49cb 100644
---
a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTaskIdTest.java
+++
b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTaskIdTest.java
@@ -28,25 +28,25 @@ public class K8sTaskIdTest
@Test
public void testModifyingTaskIDToBeK8sCompliant()
{
- String original =
"coordinator-issued_compact_k8smetrics_aeifmefd_2022-08-18T15:33:26.094Z";
+ String original =
"auto_compact_k8smetrics_aeifmefd_2022-08-18T15:33:26.094Z";
String result = new K8sTaskId(null, original).getK8sJobName();
-
Assertions.assertEquals("coordinatorissuedcompactk8smet-2e2c1862cb7ad1d01f4794b27a4438b0",
result);
+
Assertions.assertEquals("autocompactk8smetricsaeifmefd2-4b1aea2eb0593c194d7b4046df1bdd22",
result);
}
@Test
public void testModifyingTaskIDWithEmptyK8sTaskPrefixToBeK8sCompliant()
{
- String original =
"coordinator-issued_compact_k8smetrics_aeifmefd_2022-08-18T15:33:26.094Z";
+ String original =
"auto_compact_k8smetrics_aeifmefd_2022-08-18T15:33:26.094Z";
String result = new K8sTaskId("", original).getK8sJobName();
-
Assertions.assertEquals("coordinatorissuedcompactk8smet-2e2c1862cb7ad1d01f4794b27a4438b0",
result);
+
Assertions.assertEquals("autocompactk8smetricsaeifmefd2-4b1aea2eb0593c194d7b4046df1bdd22",
result);
}
@Test
public void testModifyingTaskIDWithLongK8sTaskPrefixToBeK8sCompliant()
{
- String original =
"coordinator-issued_compact_k8smetrics_aeifmefd_2022-08-18T15:33:26.094Z";
+ String original =
"auto_compact_k8smetrics_aeifmefd_2022-08-18T15:33:26.094Z";
String result = new K8sTaskId("this-is_a:VERY-very-long-task-name-prefix",
original).getK8sJobName();
-
Assertions.assertEquals("thisisaveryverylongtasknamepre-2e2c1862cb7ad1d01f4794b27a4438b0",
result);
+
Assertions.assertEquals("thisisaveryverylongtasknamepre-4b1aea2eb0593c194d7b4046df1bdd22",
result);
}
@Test
diff --git
a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesOverlordUtilsTest.java
b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesOverlordUtilsTest.java
index b5919efc092..f449fb31ca1 100644
---
a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesOverlordUtilsTest.java
+++
b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesOverlordUtilsTest.java
@@ -71,8 +71,8 @@ public class KubernetesOverlordUtilsTest
@Test
public void test_stripJobName_avoidDuplicatesWithLongDataSourceName()
{
- String jobName1 =
KubernetesOverlordUtils.convertTaskIdToJobName("coordinator-issued_compact_1234_telemetry_wikipedia_geteditfailuresinnorthamerica_agg_summ_116_pcgkebcl_2023-07-19T16:53:11.416Z");
- String jobName2 =
KubernetesOverlordUtils.convertTaskIdToJobName("coordinator-issued_compact_1234_telemetry_wikipedia_geteditfailuresinnorthamerica_agg_summ_117_pcgkebcl_2023-07-19T16:53:11.416Z");
+ String jobName1 =
KubernetesOverlordUtils.convertTaskIdToJobName("auto_compact_1234_telemetry_wikipedia_geteditfailuresinnorthamerica_agg_summ_116_pcgkebcl_2023-07-19T16:53:11.416Z");
+ String jobName2 =
KubernetesOverlordUtils.convertTaskIdToJobName("auto_compact_1234_telemetry_wikipedia_geteditfailuresinnorthamerica_agg_summ_117_pcgkebcl_2023-07-19T16:53:11.416Z");
Assertions.assertNotEquals(jobName1, jobName2);
}
diff --git
a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManagerConfig.java
b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManagerConfig.java
index ffbc7b992e2..8fd23da5e15 100644
---
a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManagerConfig.java
+++
b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManagerConfig.java
@@ -51,7 +51,7 @@ public class SegmentsMetadataManagerConfig
)
{
this.pollDuration = Configs.valueOrDefault(pollDuration,
Period.minutes(1));
- this.useIncrementalCache = Configs.valueOrDefault(useIncrementalCache,
SegmentMetadataCache.UsageMode.NEVER);
+ this.useIncrementalCache = Configs.valueOrDefault(useIncrementalCache,
SegmentMetadataCache.UsageMode.IF_SYNCED);
this.killUnused = Configs.valueOrDefault(killUnused, new
UnusedSegmentKillerConfig(null, null, null));
if (this.killUnused.isEnabled() && this.useIncrementalCache ==
SegmentMetadataCache.UsageMode.NEVER) {
throw DruidException
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/ClusterCompactionConfig.java
b/server/src/main/java/org/apache/druid/server/coordinator/ClusterCompactionConfig.java
index 08e543504a6..5de14f594f4 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/ClusterCompactionConfig.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/ClusterCompactionConfig.java
@@ -61,7 +61,7 @@ public class ClusterCompactionConfig
this.maxCompactionTaskSlots =
Configs.valueOrDefault(maxCompactionTaskSlots, Integer.MAX_VALUE);
this.compactionPolicy = Configs.valueOrDefault(compactionPolicy,
DEFAULT_COMPACTION_POLICY);
this.engine = Configs.valueOrDefault(engine, CompactionEngine.NATIVE);
- this.useSupervisors = Configs.valueOrDefault(useSupervisors, false);
+ this.useSupervisors = Configs.valueOrDefault(useSupervisors, true);
this.storeCompactionStatePerSegment = Configs.valueOrDefault(
storeCompactionStatePerSegment,
true
@@ -158,4 +158,94 @@ public class ClusterCompactionConfig
            ", storeCompactionStatePerSegment=" + storeCompactionStatePerSegment +
            '}';
   }
+
+  /**
+   * Returns a {@link Builder} pre-populated with every field of this config, so that
+   * a copy differing only in the fields subsequently set on the builder can be created.
+   */
+  public Builder toBuilder()
+  {
+    return new Builder()
+        .compactionTaskSlotRatio(compactionTaskSlotRatio)
+        .maxCompactionTaskSlots(maxCompactionTaskSlots)
+        .compactionPolicy(compactionPolicy)
+        .useSupervisors(useSupervisors)
+        // Must be copied too; otherwise build() passes null and the constructor falls back to NATIVE.
+        .engine(engine)
+        .storeCompactionStatePerSegment(storeCompactionStatePerSegment);
+  }
+
+  /**
+   * Returns an empty {@link Builder}. Fields left unset are passed as {@code null} to the
+   * {@link ClusterCompactionConfig} constructor, which substitutes its defaults.
+   */
+  public static Builder builder()
+  {
+    return new Builder();
+  }
+
+  /**
+   * Fluent builder for {@link ClusterCompactionConfig}.
+   */
+  public static class Builder
+  {
+    private Double compactionTaskSlotRatio;
+    private Integer maxCompactionTaskSlots;
+    private CompactionCandidateSearchPolicy compactionPolicy;
+    private Boolean useSupervisors;
+    private CompactionEngine engine;
+    private Boolean storeCompactionStatePerSegment;
+
+    public Builder compactionTaskSlotRatio(double compactionTaskSlotRatio)
+    {
+      this.compactionTaskSlotRatio = compactionTaskSlotRatio;
+      return this;
+    }
+
+    public Builder maxCompactionTaskSlots(int maxCompactionTaskSlots)
+    {
+      this.maxCompactionTaskSlots = maxCompactionTaskSlots;
+      return this;
+    }
+
+    public Builder compactionPolicy(CompactionCandidateSearchPolicy compactionPolicy)
+    {
+      this.compactionPolicy = compactionPolicy;
+      return this;
+    }
+
+    public Builder useSupervisors(boolean useSupervisors)
+    {
+      this.useSupervisors = useSupervisors;
+      return this;
+    }
+
+    public Builder engine(CompactionEngine engine)
+    {
+      this.engine = engine;
+      return this;
+    }
+
+    public Builder storeCompactionStatePerSegment(boolean storeCompactionStatePerSegment)
+    {
+      this.storeCompactionStatePerSegment = storeCompactionStatePerSegment;
+      return this;
+    }
+
+    /**
+     * Creates the config. Defaults and validation are applied by the constructor, e.g. the
+     * MSQ engine is rejected when supervisors are disabled.
+     */
+    public ClusterCompactionConfig build()
+    {
+      return new ClusterCompactionConfig(
+          compactionTaskSlotRatio,
+          maxCompactionTaskSlots,
+          compactionPolicy,
+          useSupervisors,
+          engine,
+          storeCompactionStatePerSegment
+      );
+    }
+  }
 }
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCompactionConfig.java
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCompactionConfig.java
index 3af188d9a03..54ca14ae260 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCompactionConfig.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCompactionConfig.java
@@ -42,6 +42,9 @@ public class DruidCompactionConfig
private static final DruidCompactionConfig EMPTY_INSTANCE
= new DruidCompactionConfig(List.of(), null, null, null, null, null,
null);
+ private static final DruidCompactionConfig LEGACY_RUN_AS_COORDINATOR_DUTY =
+ new DruidCompactionConfig(List.of(), null, null, null, false,
CompactionEngine.NATIVE, null);
+
private final List<DataSourceCompactionConfig> compactionConfigs;
private final ClusterCompactionConfig clusterConfig;
@@ -79,6 +82,11 @@ public class DruidCompactionConfig
return EMPTY_INSTANCE;
}
+ public static DruidCompactionConfig legacy()
+ {
+ return LEGACY_RUN_AS_COORDINATOR_DUTY;
+ }
+
@JsonCreator
public DruidCompactionConfig(
@JsonProperty("compactionConfigs") List<DataSourceCompactionConfig>
compactionConfigs,
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
index e60c512e6a5..cbb72762651 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
@@ -94,7 +94,7 @@ public class CompactSegments implements CoordinatorCustomDuty
private static final Logger LOG = new Logger(CompactSegments.class);
- private static final String TASK_ID_PREFIX = "coordinator-issued";
+ private static final String TASK_ID_PREFIX = "auto";
private final CompactionStatusTracker statusTracker;
private final OverlordClient overlordClient;
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/ClusterCompactionConfigTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/ClusterCompactionConfigTest.java
new file mode 100644
index 00000000000..74dcd21f036
--- /dev/null
+++
b/server/src/test/java/org/apache/druid/server/coordinator/ClusterCompactionConfigTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.indexer.CompactionEngine;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.server.compaction.NewestSegmentFirstPolicy;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ClusterCompactionConfigTest
+{
+ private static final ObjectMapper MAPPER = new DefaultObjectMapper();
+
+ @Test
+ public void testDefaults()
+ {
+ ClusterCompactionConfig config = new ClusterCompactionConfig(null, null,
null, null, null, null);
+
+ Assert.assertEquals(0.1, config.getCompactionTaskSlotRatio(), 0.0001);
+ Assert.assertEquals(Integer.MAX_VALUE, config.getMaxCompactionTaskSlots());
+ Assert.assertTrue(config.isUseSupervisors());
+ Assert.assertEquals(CompactionEngine.NATIVE, config.getEngine());
+ Assert.assertNotNull(config.getCompactionPolicy());
+ Assert.assertTrue(config.isStoreCompactionStatePerSegment());
+ }
+
+ @Test
+ public void testSerde() throws Exception
+ {
+ ClusterCompactionConfig config = ClusterCompactionConfig.builder()
+
.compactionTaskSlotRatio(0.5)
+
.maxCompactionTaskSlots(10)
+
.useSupervisors(false)
+
.engine(CompactionEngine.NATIVE)
+
.storeCompactionStatePerSegment(false)
+ .build();
+
+ String json = MAPPER.writeValueAsString(config);
+ ClusterCompactionConfig deserialized = MAPPER.readValue(json,
ClusterCompactionConfig.class);
+
+ Assert.assertEquals(config, deserialized);
+ }
+
+ @Test
+ public void testBuilder()
+ {
+ NewestSegmentFirstPolicy policy = new NewestSegmentFirstPolicy(null);
+ ClusterCompactionConfig config = ClusterCompactionConfig.builder()
+
.compactionTaskSlotRatio(0.3)
+
.maxCompactionTaskSlots(5)
+
.useSupervisors(true)
+
.engine(CompactionEngine.MSQ)
+
.compactionPolicy(policy)
+
.storeCompactionStatePerSegment(true)
+ .build();
+
+ Assert.assertEquals(0.3, config.getCompactionTaskSlotRatio(), 0.0001);
+ Assert.assertEquals(5, config.getMaxCompactionTaskSlots());
+ Assert.assertTrue(config.isUseSupervisors());
+ Assert.assertEquals(CompactionEngine.MSQ, config.getEngine());
+ Assert.assertEquals(policy, config.getCompactionPolicy());
+ Assert.assertTrue(config.isStoreCompactionStatePerSegment());
+ }
+
+ @Test
+ public void testToBuilder()
+ {
+ ClusterCompactionConfig original = ClusterCompactionConfig.builder()
+
.compactionTaskSlotRatio(0.7)
+
.maxCompactionTaskSlots(20)
+
.useSupervisors(false)
+
.engine(CompactionEngine.NATIVE)
+
.storeCompactionStatePerSegment(false)
+ .build();
+
+ ClusterCompactionConfig modified = original.toBuilder()
+ .compactionTaskSlotRatio(0.8)
+ .build();
+
+ Assert.assertEquals(0.8, modified.getCompactionTaskSlotRatio(), 0.0001);
+ Assert.assertEquals(20, modified.getMaxCompactionTaskSlots());
+ Assert.assertFalse(modified.isUseSupervisors());
+ }
+
+ @Test
+ public void testMsqEngineRequiresSupervisors()
+ {
+ DruidException e = Assert.assertThrows(
+ DruidException.class,
+ () ->
ClusterCompactionConfig.builder().useSupervisors(false).engine(CompactionEngine.MSQ).build()
+ );
+ Assert.assertEquals("MSQ Compaction engine can be used only with
compaction supervisors.", e.getMessage());
+ Assert.assertEquals(DruidException.Category.INVALID_INPUT,
e.getCategory());
+ }
+
+ @Test
+ public void testEqualsAndHashCode()
+ {
+ ClusterCompactionConfig config1 = ClusterCompactionConfig.builder()
+
.compactionTaskSlotRatio(0.5)
+
.maxCompactionTaskSlots(10)
+ .build();
+
+ ClusterCompactionConfig config2 = ClusterCompactionConfig.builder()
+
.compactionTaskSlotRatio(0.5)
+
.maxCompactionTaskSlots(10)
+ .build();
+
+ ClusterCompactionConfig config3 = ClusterCompactionConfig.builder()
+
.compactionTaskSlotRatio(0.6)
+
.maxCompactionTaskSlots(10)
+ .build();
+
+ Assert.assertEquals(config1, config2);
+ Assert.assertEquals(config1.hashCode(), config2.hashCode());
+ Assert.assertNotEquals(config1, config3);
+ Assert.assertNotEquals(config3, config2);
+ }
+}
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/DruidCompactionConfigTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/DruidCompactionConfigTest.java
index 8fae5623db4..29520111642 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/DruidCompactionConfigTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/DruidCompactionConfigTest.java
@@ -45,6 +45,13 @@ public class DruidCompactionConfigTest
Assert.assertEquals(defaultConfig, deserialized);
}
+  @Test
+  public void testSerdeWithLegacyConfig() throws Exception // JSON with useSupervisors=false and the native engine deserializes to DruidCompactionConfig.legacy().
+  {
+    final String json = "{\"compactionConfigs\":[],\"useSupervisors\":false,\"engine\":\"native\"}";
+    Assert.assertEquals(DruidCompactionConfig.legacy(), MAPPER.readValue(json, DruidCompactionConfig.class));
+  }
+
@Test
public void testSerdeWithDatasourceConfigs() throws Exception
{
@@ -118,6 +125,7 @@ public class DruidCompactionConfigTest
Assert.assertTrue(config.getCompactionConfigs().isEmpty());
Assert.assertTrue(config.getCompactionPolicy() instanceof
NewestSegmentFirstPolicy);
Assert.assertEquals(CompactionEngine.NATIVE, config.getEngine());
+ Assert.assertTrue(config.isUseSupervisors());
Assert.assertEquals(0.1, config.getCompactionTaskSlotRatio(), 1e-9);
Assert.assertEquals(Integer.MAX_VALUE, config.getMaxCompactionTaskSlots());
Assert.assertTrue(config.isStoreCompactionStatePerSegment());
diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java
index 75aefc82289..e96d800bec8 100644
--- a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java
+++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java
@@ -105,7 +105,7 @@ public class CoordinatorCompactionConfigsResourceTest
Assert.assertEquals(0.1, defaultConfig.getCompactionTaskSlotRatio(),
DELTA);
Assert.assertEquals(Integer.MAX_VALUE,
defaultConfig.getMaxCompactionTaskSlots());
Assert.assertTrue(defaultConfig.getCompactionConfigs().isEmpty());
- Assert.assertFalse(defaultConfig.isUseSupervisors());
+ Assert.assertTrue(defaultConfig.isUseSupervisors());
Assert.assertEquals(CompactionEngine.NATIVE, defaultConfig.getEngine());
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]