This is an automated email from the ASF dual-hosted git repository.
sarutak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 325763fef23d [SPARK-55793][CORE] Add multiple log directories support
to SHS
325763fef23d is described below
commit 325763fef23d88ed1bf1b4a0f9922689d8f3c63e
Author: Kousuke Saruta <[email protected]>
AuthorDate: Fri Mar 6 14:53:19 2026 +0900
[SPARK-55793][CORE] Add multiple log directories support to SHS
### What changes were proposed in this pull request?
This PR proposes to add support for multiple log directories to SHS, allowing
it to monitor event logs from multiple directories simultaneously.
This PR extends `spark.history.fs.logDirectory` to accept a comma-separated
list of directories (e.g., `hdfs:///logs/prod,s3a://bucket/logs/staging`).
Directories can be on the same or different filesystems. Also, a new optional
config `spark.history.fs.logDirectory.names` is added which allows users to
assign display names to directories by position (e.g., `Production,Staging`).
Empty entries fall back to the full path. Duplicate display names are rejected
at startup.
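For illustration, a minimal `spark-defaults.conf` sketch (the paths and display
names below are hypothetical; the two keys are the ones introduced/extended by
this PR):

```
# Two log directories on different filesystems, scanned by one SHS instance
spark.history.fs.logDirectory        hdfs:///logs/prod,s3a://bucket/logs/staging
# Positional display names; an empty entry would fall back to the full path
spark.history.fs.logDirectory.names  Production,Staging
```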
**Behavior of existing `spark.history.fs.*` settings with multiple
directories:**
All existing settings apply globally — there are no per-directory
configurations.
| Setting | Behavior |
|---------|----------|
| `update.interval` | One scan cycle covers all directories sequentially |
| `cleaner.interval` | One cleaner cycle operates on the unified listing
across all directories |
| `cleaner.maxAge` | Applied to each log entry regardless of which
directory it belongs to |
| `cleaner.maxNum` | Total count across all directories; oldest entries are
removed first regardless of directory |
| `numReplayThreads` | Thread pool is shared across all directories |
| `numCompactThreads` | Thread pool is shared across all directories |
| `eventLog.rolling.maxFilesToRetain` | Applied per-directory independently
|
| `update.batchSize` | Applied per-directory independently |
Regarding UI changes, a "Log Source" column is added to the History UI
table showing the display name (or full path) for each application, with a
tooltip showing the full path.
<img width="1699" height="681" alt="all-log-dirs"
src="https://github.com/user-attachments/assets/fc67aca3-563c-4cec-8b9c-8e44e81caa04"
/>
Users can filter applications by their log directory using the `Filter by Log
Source` dropdown.
<img width="1714" height="390" alt="filter-by-log-dir"
src="https://github.com/user-attachments/assets/b590cedc-9c33-46ca-9314-fa50eb31777f"
/>
The `Event log directory` section in the History UI collapses into a
`<details>/<summary>` element when multiple directories are configured.
<img width="431" height="99" alt="unexpand"
src="https://github.com/user-attachments/assets/613ef9a0-348a-4e6a-b742-8b33b36142af"
/>
<img width="423" height="187" alt="expand"
src="https://github.com/user-attachments/assets/d3bbef26-13bd-408d-85e4-c0f9af2dfc5b"
/>
### Why are the changes needed?
Some organizations run multiple clusters, each with its own log directory. If
SHS supports multiple log directories, it can serve as a single endpoint for
viewing event logs, which helps such organizations.
### Does this PR introduce _any_ user-facing change?
Yes, but it will not affect existing users.
### How was this patch tested?
Manually confirmed the Web UI as shown in the screenshots above, and added new tests.
### Was this patch authored or co-authored using generative AI tooling?
Kiro CLI / Opus 4.6
Closes #54575 from sarutak/shs-multi-log-dirs.
Authored-by: Kousuke Saruta <[email protected]>
Signed-off-by: Kousuke Saruta <[email protected]>
---
.../spark/ui/static/historypage-template.html | 15 +-
.../org/apache/spark/ui/static/historypage.js | 53 ++-
.../spark/deploy/history/FsHistoryProvider.scala | 356 ++++++++++++++++-----
.../apache/spark/deploy/history/HistoryPage.scala | 18 +-
.../org/apache/spark/internal/config/History.scala | 13 +-
.../scala/org/apache/spark/status/api/v1/api.scala | 4 +-
.../application_list_json_expectation.json | 38 +++
.../completed_app_list_json_expectation.json | 38 +++
.../limit_app_list_json_expectation.json | 6 +
.../maxDate2_app_list_json_expectation.json | 2 +
.../maxDate_app_list_json_expectation.json | 4 +
.../maxEndDate_app_list_json_expectation.json | 14 +
...e_and_maxEndDate_app_list_json_expectation.json | 8 +
.../minDate_app_list_json_expectation.json | 34 ++
...e_and_maxEndDate_app_list_json_expectation.json | 8 +
.../minEndDate_app_list_json_expectation.json | 28 ++
.../one_app_json_expectation.json | 2 +
.../one_app_multi_attempt_json_expectation.json | 4 +
.../deploy/history/FsHistoryProviderSuite.scala | 260 ++++++++++++++-
.../spark/deploy/history/HistoryServerSuite.scala | 22 +-
docs/monitoring.md | 36 +++
project/MimaExcludes.scala | 6 +-
22 files changed, 874 insertions(+), 95 deletions(-)
diff --git
a/core/src/main/resources/org/apache/spark/ui/static/historypage-template.html
b/core/src/main/resources/org/apache/spark/ui/static/historypage-template.html
index 27daf08fed12..fd9003c4f91b 100644
---
a/core/src/main/resources/org/apache/spark/ui/static/historypage-template.html
+++
b/core/src/main/resources/org/apache/spark/ui/static/historypage-template.html
@@ -16,7 +16,15 @@
-->
<script id="history-summary-template" type="text/html">
-<table id="history-summary-table" class="table table-striped compact">
+<div class="row mb-2">
+ <div class="col-md-12">
+ <label for="log-path-filter" class="me-2">Filter by Log Source:</label>
+ <select id="log-path-filter" class="form-control d-inline-block w-auto">
+ <option value="">All Sources</option>
+ </select>
+ </div>
+</div>
+<table id="history-summary-table" class="table table-striped table-hover
compact">
<thead>
<tr>
<th>
@@ -34,6 +42,11 @@
App Name
</span>
</th>
+ <th>
+ <span data-bs-toggle="tooltip" title="Log directory where this
application's event log is stored.">
+ Log Source
+ </span>
+ </th>
{{#hasMultipleAttempts}}
<th>
<span data-bs-toggle="tooltip" title="The attempt ID of this
application since one application might be launched several times">
diff --git a/core/src/main/resources/org/apache/spark/ui/static/historypage.js
b/core/src/main/resources/org/apache/spark/ui/static/historypage.js
index b8d7cf2154f5..80d9ba050645 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/historypage.js
+++ b/core/src/main/resources/org/apache/spark/ui/static/historypage.js
@@ -188,6 +188,7 @@ $(document).ready(function() {
var startedColumnName = 'started';
var completedColumnName = 'completed';
var durationColumnName = 'duration';
+ var logPathColumnName = 'logPath';
var conf = {
"data": array,
"columns": [
@@ -203,6 +204,18 @@ $(document).ready(function() {
data: 'name',
render: (name) => stringAbbreviate(name, 60)
},
+ {
+ name: logPathColumnName,
+ data: 'logSourceName',
+ render: (logSourceName, type, row) => {
+ if (logSourceName && row && row.logSourceFullPath) {
+ const safeName = escapeHtml(logSourceName);
+ const safePath = escapeHtml(row.logSourceFullPath);
+ return `<span title="${safePath}">${safeName}</span>`;
+ }
+ return '';
+ }
+ },
{
name: attemptIdColumnName,
data: 'attemptId',
@@ -230,7 +243,7 @@ $(document).ready(function() {
],
"aoColumnDefs": [
{
- aTargets: [0, 1, 2],
+ aTargets: [0, 1, 2, 3],
fnCreatedCell: (nTd, _ignored_sData, _ignored_oData,
_ignored_iRow, _ignored_iCol) => {
if (hasMultipleAttempts) {
$(nTd).css('background-color', 'var(--bs-body-bg)');
@@ -246,7 +259,8 @@ $(document).ready(function() {
conf.rowsGroup = [
'appId:name',
'version:name',
- 'appName:name'
+ 'appName:name',
+ 'logPath:name'
];
} else {
conf.columns = removeColumnByName(conf.columns, attemptIdColumnName);
@@ -263,8 +277,41 @@ $(document).ready(function() {
{"searchable": false, "targets": [getColumnIndex(conf.columns,
durationColumnName)]}
];
historySummary.append(apps);
- apps.DataTable(conf);
+ var dataTable = apps.filter("table").DataTable(conf);
sibling.after(historySummary);
+
+ // Populate log path filter dropdown
+ var logPathNames = new Set();
+ array.forEach(function(row) {
+ if (row.logSourceName) {
+ logPathNames.add(row.logSourceName);
+ }
+ });
+
+ var logPathFilter = $('#log-path-filter');
+ Array.from(logPathNames).sort().forEach(function(name) {
+ logPathFilter.append($('<option></option>').val(name).text(name));
+ });
+
+ // Add custom filter function
+ $.fn.dataTable.ext.search.push(function(settings, data, dataIndex,
rowData) {
+ if (settings.nTable.id !== 'history-summary-table') {
+ return true; // Don't filter other tables
+ }
+ var selectedPath = logPathFilter.val();
+ if (!selectedPath) {
+ return true; // Show all if no filter selected
+ }
+ return rowData && typeof rowData.logSourceName === 'string' &&
+ rowData.logSourceName === selectedPath;
+ });
+
+ // Trigger filter on dropdown change
+ logPathFilter.on('change', function() {
+ dataTable.draw();
+ });
+
+ $('#history-summary [data-toggle="tooltip"]').tooltip();
});
});
});
diff --git
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index c723a8de8c44..5937d0d7ecb7 100644
---
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -106,7 +106,8 @@ private[history] class FsHistoryProvider(conf: SparkConf,
clock: Clock)
// Number of threads used to compact rolling event logs.
private val numCompactThreads = conf.get(History.NUM_COMPACT_THREADS)
- private val logDir = conf.get(History.HISTORY_LOG_DIR)
+ private val logDirs = conf.get(History.HISTORY_LOG_DIR)
+ .split(",").map(_.trim).filter(_.nonEmpty).toSeq
private val historyUiAclsEnable =
conf.get(History.HISTORY_SERVER_UI_ACLS_ENABLE)
private val historyUiAdminAcls =
conf.get(History.HISTORY_SERVER_UI_ADMIN_ACLS)
@@ -120,7 +121,44 @@ private[history] class FsHistoryProvider(conf: SparkConf,
clock: Clock)
private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
// Visible for testing
- private[history] val fs: FileSystem = new
Path(logDir).getFileSystem(hadoopConf)
+ private[history] val fs: FileSystem = new
Path(logDirs.head).getFileSystem(hadoopConf)
+
+ // Map from logDir to its FileSystem, for multi-path support.
+ // Visible for testing
+ private[history] val logDirFs: Map[String, FileSystem] = logDirs.map { dir =>
+ dir -> new Path(dir).getFileSystem(hadoopConf)
+ }.toMap
+
+ // Map from logDir to display name for UI
+ private val logDirToDisplayName: Map[String, String] = {
+ val names = conf.get(HISTORY_LOG_DIR_NAMES).map(_.split(",",
-1).map(_.trim))
+ names match {
+ case Some(n) if n.length == logDirs.length =>
+ val mapping = logDirs.zip(n).map { case (dir, name) =>
+ dir -> (if (name.nonEmpty) name else dir)
+ }
+ val displayNames = mapping.map(_._2)
+ val duplicates = displayNames.groupBy(identity).filter(_._2.size >
1).keys
+ require(duplicates.isEmpty,
+ s"Duplicate display names found in ${HISTORY_LOG_DIR_NAMES.key}: " +
+ s"${duplicates.mkString(", ")}")
+ mapping.toMap
+ case Some(n) =>
+ logWarning(s"${HISTORY_LOG_DIR_NAMES.key} has ${n.length} names but " +
+ s"${logDirs.length} directories were specified. Falling back to full
paths.")
+ logDirs.map(d => d -> d).toMap
+ case None =>
+ logDirs.map(d => d -> d).toMap
+ }
+ }
+
+ // Normalized logDir paths for efficient and safe matching
+ private val normalizedLogDirs: Seq[(Path, String, String)] = logDirs.map {
dir =>
+ val path = new Path(dir)
+ val dirFs = logDirFs(dir)
+ val normalized = path.makeQualified(dirFs.getUri,
dirFs.getWorkingDirectory)
+ (normalized, logDirToDisplayName(dir), dir)
+ }
// Used by check event thread and clean log thread.
// Scheduled thread pool size must be one, otherwise it will have concurrent
issues about fs
@@ -154,8 +192,12 @@ private[history] class FsHistoryProvider(conf: SparkConf,
clock: Clock)
memoryManager = new HistoryServerMemoryManager(conf)
}
- private val fileCompactor = new EventLogFileCompactor(conf, hadoopConf, fs,
- conf.get(EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN),
conf.get(EVENT_LOG_COMPACTION_SCORE_THRESHOLD))
+ private val fileCompactors: Map[String, EventLogFileCompactor] =
logDirFs.map {
+ case (dir, dirFs) =>
+ dir -> new EventLogFileCompactor(conf, hadoopConf, dirFs,
+ conf.get(EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN),
+ conf.get(EVENT_LOG_COMPACTION_SCORE_THRESHOLD))
+ }
// Used to store the paths, which are being processed. This enable the
replay log tasks execute
// asynchronously and make sure that checkForLogs would not process a path
repeatedly.
@@ -276,21 +318,33 @@ private[history] class FsHistoryProvider(conf: SparkConf,
clock: Clock)
memoryManager.initialize()
}
- // Validate the log directory.
- val path = new Path(logDir)
- try {
- if (!fs.getFileStatus(path).isDirectory) {
- throw new IllegalArgumentException(
- "Logging directory specified is not a directory: %s".format(logDir))
- }
- } catch {
- case f: FileNotFoundException =>
- var msg = s"Log directory specified does not exist: $logDir"
- if (logDir == DEFAULT_LOG_DIR) {
- msg += " Did you configure the correct one through
spark.history.fs.logDirectory?"
+ // Validate the log directories at startup.
+ // Invalid directories are logged as warnings but not removed from logDirs;
+ // checkForLogsInDir already handles per-directory errors with try-catch.
+ // We only fail if ALL directories are invalid.
+ val validDirs = logDirs.filter { dir =>
+ val path = new Path(dir)
+ val dirFs = logDirFs(dir)
+ try {
+ if (!dirFs.getFileStatus(path).isDirectory) {
+ logWarning(log"Logging directory specified is not a directory: " +
+ log"${MDC(HISTORY_DIR, dir)}, skipping.")
+ false
+ } else {
+ true
}
- throw new FileNotFoundException(msg).initCause(f)
+ } catch {
+ case f: FileNotFoundException =>
+ var msg = s"Log directory specified does not exist: $dir"
+ if (dir == DEFAULT_LOG_DIR) {
+ msg += " Did you configure the correct one through
spark.history.fs.logDirectory?"
+ }
+ logWarning(msg)
+ false
+ }
}
+ require(validDirs.nonEmpty,
+ s"None of the specified log directories exist: ${logDirs.mkString(",
")}")
// Disable the background thread during tests.
if (!conf.contains(IS_TESTING)) {
@@ -402,11 +456,14 @@ private[history] class FsHistoryProvider(conf: SparkConf,
clock: Clock)
: ApplicationInfoWrapper = {
val date = new Date(0)
val lastUpdate = new Date()
+ val (logSourceName, logSourceFullPath) = getLogDirInfo(logPath)
val info = ApplicationAttemptInfo(
- attemptId, date, date, lastUpdate, 0, "spark", false, "unknown")
+ attemptId, date, date, lastUpdate, 0, "spark", false, "unknown",
+ Some(logSourceName), Some(logSourceFullPath))
addListing(new ApplicationInfoWrapper(
ApplicationInfo(appId, appId, None, None, None, None, List.empty),
- List(new AttemptInfoWrapper(info, logPath, 0, Some(1), None, None, None,
None))))
+ List(new AttemptInfoWrapper(info, logPath, 0, Some(1), None, None, None,
None,
+ logSourceName, logSourceFullPath))))
load(appId)
}
@@ -432,7 +489,10 @@ private[history] class FsHistoryProvider(conf: SparkConf,
clock: Clock)
} else {
Map()
}
- Map("Event log directory" -> logDir) ++ safeMode ++ driverLog
+ Map("Event log directory" -> logDirs.map { dir =>
+ val name = logDirToDisplayName(dir)
+ if (name != dir) s"$name ($dir)" else dir
+ }.mkString(", ")) ++ safeMode ++ driverLog
}
override def start(): Unit = {
@@ -499,18 +559,53 @@ private[history] class FsHistoryProvider(conf: SparkConf,
clock: Clock)
* from running for too long which blocks updating newly appeared eventlog
files.
*/
private[history] def checkForLogs(): Unit = {
+ val newLastScanTime = clock.getTimeMillis()
+ val allNotStale = mutable.HashSet[String]()
+
+ logDirs.foreach { dir =>
+ try {
+ checkForLogsInDir(dir, newLastScanTime, allNotStale)
+ } catch {
+ case e: IOException =>
+ logError(log"Error checking for logs in directory ${MDC(HISTORY_DIR,
dir)}", e)
+ }
+ }
+
+ // Delete all information about applications whose log files disappeared
from storage.
+ // This is done after scanning ALL directories to avoid incorrectly
marking entries from
+ // other directories as stale.
+ val stale = listing.synchronized {
+ KVUtils.viewToSeq(listing.view(classOf[LogInfo])
+ .index("lastProcessed")
+ .last(newLastScanTime - 1))
+ }
+ stale.filterNot(isProcessing)
+ .filterNot(info => allNotStale.contains(info.logPath))
+ .foreach { log =>
+ log.appId.foreach { appId =>
+ cleanAppData(appId, log.attemptId, log.logPath)
+ listing.delete(classOf[LogInfo], log.logPath)
+ }
+ }
+
+ lastScanTime.set(newLastScanTime)
+ }
+
+ private def checkForLogsInDir(
+ dir: String,
+ newLastScanTime: Long,
+ notStale: mutable.HashSet[String]): Unit = {
+ val dirFs = logDirFs(dir)
var count: Int = 0
try {
- val newLastScanTime = clock.getTimeMillis()
- logDebug(log"Scanning ${MDC(HISTORY_DIR, logDir)} with " +
+ logDebug(log"Scanning ${MDC(HISTORY_DIR, dir)} with " +
log"lastScanTime=${MDC(LAST_SCAN_TIME, lastScanTime)}")
// Mark entries that are processing as not stale. Such entries do not
have a chance to be
// updated with the new 'lastProcessed' time and thus any entity that
completes processing
// right after this check and before the check for stale entities will
be identified as stale
// and will be deleted from the UI until the next 'checkForLogs' run.
- val notStale = mutable.HashSet[String]()
- val updated = Option(fs.listStatus(new Path(logDir)))
+ val updated = Option(dirFs.listStatus(new Path(dir)))
.map(_.toImmutableArraySeq).getOrElse(Seq.empty)
.filter { entry => isAccessible(entry.getPath) }
.filter { entry =>
@@ -521,7 +616,7 @@ private[history] class FsHistoryProvider(conf: SparkConf,
clock: Clock)
true
}
}
- .flatMap { entry => EventLogFileReader(fs, entry) }
+ .flatMap { entry => EventLogFileReader(dirFs, entry) }
.filter { reader =>
try {
reader.modificationTime
@@ -570,7 +665,9 @@ private[history] class FsHistoryProvider(conf: SparkConf,
clock: Clock)
attempt.adminAcls,
attempt.viewAcls,
attempt.adminAclsGroups,
- attempt.viewAclsGroups)
+ attempt.viewAclsGroups,
+ attempt.logSourceName,
+ attempt.logSourceFullPath)
} else {
attempt
}
@@ -597,7 +694,7 @@ private[history] class FsHistoryProvider(conf: SparkConf,
clock: Clock)
if (conf.get(CLEANER_ENABLED) && reader.modificationTime <
clock.getTimeMillis() - conf.get(MAX_LOG_AGE_S) * 1000) {
logInfo(log"Deleting expired event log ${MDC(PATH,
reader.rootPath.toString)}")
- deleteLog(fs, reader.rootPath)
+ deleteLog(dirFs, reader.rootPath)
// If the LogInfo read had succeeded, but the
ApplicationInafoWrapper
// read failure and throw the exception, we should also
cleanup the log
// info from listing db.
@@ -637,27 +734,6 @@ private[history] class FsHistoryProvider(conf: SparkConf,
clock: Clock)
}
}
- // Delete all information about applications whose log files disappeared
from storage.
- // This is done by identifying the event logs which were not touched by
the current
- // directory scan.
- //
- // Only entries with valid applications are cleaned up here. Cleaning up
invalid log
- // files is done by the periodic cleaner task.
- val stale = listing.synchronized {
- KVUtils.viewToSeq(listing.view(classOf[LogInfo])
- .index("lastProcessed")
- .last(newLastScanTime - 1))
- }
- stale.filterNot(isProcessing)
- .filterNot(info => notStale.contains(info.logPath))
- .foreach { log =>
- log.appId.foreach { appId =>
- cleanAppData(appId, log.attemptId, log.logPath)
- listing.delete(classOf[LogInfo], log.logPath)
- }
- }
-
- lastScanTime.set(newLastScanTime)
} catch {
case e: Exception => logError("Exception in checking for event log
updates", e)
}
@@ -743,8 +819,8 @@ private[history] class FsHistoryProvider(conf: SparkConf,
clock: Clock)
.map { id => app.attempts.filter(_.info.attemptId == Some(id)) }
.getOrElse(app.attempts)
.foreach { attempt =>
- val reader = EventLogFileReader(fs, new Path(logDir,
attempt.logPath),
- attempt.lastIndex)
+ val (logFs, logPath) = resolveLogPath(attempt.logPath,
attempt.logSourceFullPath)
+ val reader = EventLogFileReader(logFs, logPath, attempt.lastIndex)
reader.zipEventLogFiles(zipStream)
}
} finally {
@@ -752,6 +828,119 @@ private[history] class FsHistoryProvider(conf: SparkConf,
clock: Clock)
}
}
+ /**
+ * Resolve the log path by searching across all log directories.
+ * Returns the FileSystem and full Path for the given log path name.
+ *
+ * If logSourceFullPath is available, it is used first to avoid ambiguity
+ * when the same filename exists in multiple directories.
+ * Falls back to scanning all directories if the source path is not found.
+ */
+ private def resolveLogPath(
+ logPathName: String, logSourceFullPath: String): (FileSystem, Path) = {
+ // Try the known source directory first
+ if (logSourceFullPath.nonEmpty) {
+ try {
+ val sourceFs = logDirFs(logSourceFullPath)
+ val fullPath = new Path(logSourceFullPath, logPathName)
+ if (sourceFs.exists(fullPath)) {
+ return (sourceFs, fullPath)
+ }
+ } catch {
+ case _: Exception => // fall through to scan
+ }
+ }
+
+ // Fall back to scanning all directories
+ logDirs.iterator.map { dir =>
+ val dirFs = logDirFs(dir)
+ val fullPath = new Path(dir, logPathName)
+ (dirFs, fullPath)
+ }.find { case (dirFs, fullPath) =>
+ try {
+ dirFs.exists(fullPath)
+ } catch {
+ case _: Exception => false
+ }
+ }.getOrElse {
+ // Fall back to the first directory (preserves original behavior)
+ (fs, new Path(logDirs.head, logPathName))
+ }
+ }
+
+ /**
+ * Find the log directory string that contains the given path.
+ * Falls back to the first directory if no match is found.
+ */
+ private def logDirForPath(path: Path): String = {
+ // Try normalized path matching first (handles scheme differences like
file:// vs raw path)
+ normalizedLogDirs.find { case (dirPath, _, _) =>
+ isDescendantOf(path, dirPath)
+ }.orElse {
+ // Fallback: string-based matching for non-normalized paths
+ val pathStr = path.toUri.getPath
+ normalizedLogDirs.find { case (dirPath, _, _) =>
+ val dirStr = dirPath.toUri.getPath
+ pathStr == dirStr || pathStr.startsWith(dirStr.stripSuffix("/") + "/")
+ }
+ }.map(_._3).getOrElse(logDirs.head)
+ }
+
+ /**
+ * Get the FileSystem for a full path by matching against known log
directories.
+ * Falls back to the default FileSystem if no match is found.
+ */
+ private def fsForPath(path: Path): FileSystem = {
+ logDirFs(logDirForPath(path))
+ }
+
+ /**
+ * Get the display name and full path for a log path.
+ * Returns (displayName, fullPath) tuple.
+ * Uses FileSystem path resolution for security and handles symbolic links
properly.
+ */
+ private[history] def getLogDirInfo(logPath: String): (String, String) = {
+ try {
+ val path = new Path(logPath)
+ val logPathFs = fsForPath(path)
+ val normalized = path.makeQualified(logPathFs.getUri,
logPathFs.getWorkingDirectory)
+
+ // Find the matching log directory
+ normalizedLogDirs.find { case (dirPath, _, _) =>
+ isDescendantOf(normalized, dirPath)
+ } match {
+ case Some((_, displayName, fullPath)) => (displayName, fullPath)
+ case None =>
+ // Fallback to first directory
+ (logDirToDisplayName(logDirs.head), logDirs.head)
+ }
+ } catch {
+ case e: Exception =>
+ logWarning(log"Failed to resolve log dir info for path ${MDC(PATH,
logPath)}: " +
+ log"${MDC(ERROR, e.getMessage)}")
+ (logDirToDisplayName(logDirs.head), logDirs.head)
+ }
+ }
+
+ /**
+ * Check if a path is a descendant of another path.
+ * Handles both exact match and subdirectory cases safely.
+ */
+ private def isDescendantOf(path: Path, ancestor: Path): Boolean = {
+ if (path.equals(ancestor)) {
+ true
+ } else {
+ var current = path.getParent
+ while (current != null) {
+ if (current.equals(ancestor)) {
+ return true
+ }
+ current = current.getParent
+ }
+ false
+ }
+ }
+
private def mergeApplicationListing(
reader: EventLogFileReader,
scanTime: Long,
@@ -788,7 +977,7 @@ private[history] class FsHistoryProvider(conf: SparkConf,
clock: Clock)
if reader.rootPath.getName.endsWith(EventLogFileWriter.IN_PROGRESS)
=>
val finalFileName =
reader.rootPath.getName.stripSuffix(EventLogFileWriter.IN_PROGRESS)
val finalFilePath = new Path(reader.rootPath.getParent, finalFileName)
- if (fs.exists(finalFilePath)) {
+ if (fsForPath(finalFilePath).exists(finalFilePath)) {
// Do nothing, the application completed during processing, the
final event log file
// will be processed by next around.
} else {
@@ -844,7 +1033,7 @@ private[history] class FsHistoryProvider(conf: SparkConf,
clock: Clock)
((!appCompleted && fastInProgressParsing) || reparseChunkSize > 0)
val bus = new ReplayListenerBus()
- val listener = new AppListingListener(reader, clock, shouldHalt)
+ val listener = new AppListingListener(reader, clock, shouldHalt, this)
bus.addListener(listener)
logInfo(log"Parsing ${MDC(PATH, logPath)} for listing data...")
@@ -872,7 +1061,8 @@ private[history] class FsHistoryProvider(conf: SparkConf,
clock: Clock)
val lookForEndEvent = shouldHalt && (appCompleted ||
!fastInProgressParsing)
if (lookForEndEvent && listener.applicationInfo.isDefined) {
val lastFile = logFiles.last
- Utils.tryWithResource(EventLogFileReader.openEventLog(lastFile.getPath,
fs)) { in =>
+ Utils.tryWithResource(EventLogFileReader.openEventLog(lastFile.getPath,
+ fsForPath(lastFile.getPath))) { in =>
val target = lastFile.getLen - reparseChunkSize
if (target > 0) {
logInfo(log"Looking for end event; skipping ${MDC(NUM_BYTES,
target)} bytes" +
@@ -916,7 +1106,8 @@ private[history] class FsHistoryProvider(conf: SparkConf,
clock: Clock)
try {
// Fetch the entry first to avoid an RPC when it's already removed.
listing.read(classOf[LogInfo], inProgressLog)
- if (!SparkHadoopUtil.isFile(fs, new Path(inProgressLog))) {
+ if (!SparkHadoopUtil.isFile(fsForPath(new Path(inProgressLog)),
+ new Path(inProgressLog))) {
listing.synchronized {
listing.delete(classOf[LogInfo], inProgressLog)
}
@@ -955,7 +1146,8 @@ private[history] class FsHistoryProvider(conf: SparkConf,
clock: Clock)
if (info.lastEvaluatedForCompaction.isEmpty ||
info.lastEvaluatedForCompaction.get < lastIndex) {
// haven't tried compaction for this index, do compaction
- fileCompactor.compact(reader.listEventLogFiles)
+ val compactor = fileCompactors(logDirForPath(reader.rootPath))
+ compactor.compact(reader.listEventLogFiles)
listing.write(info.copy(lastEvaluatedForCompaction =
Some(lastIndex)))
}
} catch {
@@ -1000,7 +1192,8 @@ private[history] class FsHistoryProvider(conf: SparkConf,
clock: Clock)
if (log.lastProcessed <= maxTime && log.appId.isEmpty) {
logInfo(log"Deleting invalid / corrupt event log ${MDC(PATH,
log.logPath)}")
- deleteLog(fs, new Path(log.logPath))
+ val logPath = new Path(log.logPath)
+ deleteLog(fsForPath(logPath), logPath)
listing.delete(classOf[LogInfo], log.logPath)
}
@@ -1042,7 +1235,8 @@ private[history] class FsHistoryProvider(conf: SparkConf,
clock: Clock)
stale.filterNot(isProcessing).foreach { log =>
if (log.appId.isEmpty) {
logInfo(log"Deleting invalid / corrupt event log ${MDC(PATH,
log.logPath)}")
- deleteLog(fs, new Path(log.logPath))
+ val logPath = new Path(log.logPath)
+ deleteLog(fsForPath(logPath), logPath)
listing.delete(classOf[LogInfo], log.logPath)
}
}
@@ -1083,10 +1277,9 @@ private[history] class FsHistoryProvider(conf:
SparkConf, clock: Clock)
var countDeleted = 0
toDelete.foreach { attempt =>
logInfo(log"Deleting expired event log for ${MDC(PATH,
attempt.logPath)}")
- val logPath = new Path(logDir, attempt.logPath)
- listing.delete(classOf[LogInfo], logPath.toString())
+ val (logFs, logPath) = resolveLogPath(attempt.logPath,
attempt.logSourceFullPath)
cleanAppData(app.id, attempt.info.attemptId, logPath.toString())
- if (deleteLog(fs, logPath)) {
+ if (deleteLog(logFs, logPath)) {
countDeleted += 1
}
}
@@ -1194,7 +1387,8 @@ private[history] class FsHistoryProvider(conf: SparkConf,
clock: Clock)
var continueReplay = true
logFiles.foreach { file =>
if (continueReplay) {
- Utils.tryWithResource(EventLogFileReader.openEventLog(file.getPath,
fs)) { in =>
+ Utils.tryWithResource(EventLogFileReader.openEventLog(file.getPath,
+ fsForPath(file.getPath))) { in =>
continueReplay = replayBus.replay(in, file.getPath.toString,
maybeTruncated = maybeTruncated, eventsFilter = eventsFilter)
}
@@ -1208,11 +1402,9 @@ private[history] class FsHistoryProvider(conf:
SparkConf, clock: Clock)
* Note that DistributedFileSystem is a `@LimitedPrivate` class, which for
all practical reasons
* makes it more public than not.
*/
- private[history] def isFsInSafeMode(): Boolean = fs match {
- case dfs: DistributedFileSystem =>
- isFsInSafeMode(dfs)
- case _ =>
- false
+ private[history] def isFsInSafeMode(): Boolean = logDirFs.values.exists {
+ case dfs: DistributedFileSystem => isFsInSafeMode(dfs)
+ case _ => false
}
private[history] def isFsInSafeMode(dfs: DistributedFileSystem): Boolean = {
@@ -1226,7 +1418,7 @@ private[history] class FsHistoryProvider(conf: SparkConf,
clock: Clock)
*/
override def toString: String = {
val count = listing.count(classOf[ApplicationInfoWrapper])
- s"""|FsHistoryProvider{logdir=$logDir,
+ s"""|FsHistoryProvider{logdirs=${logDirs.mkString(",")},
| storedir=$storePath,
| last scan time=$lastScanTime
| application count=$count}""".stripMargin
@@ -1314,7 +1506,8 @@ private[history] class FsHistoryProvider(conf: SparkConf,
clock: Clock)
metadata: AppStatusStoreMetadata): KVStore = {
var retried = false
var hybridStore: HybridStore = null
- val reader = EventLogFileReader(fs, new Path(logDir, attempt.logPath),
+ val (logFs, logPath) = resolveLogPath(attempt.logPath,
attempt.logSourceFullPath)
+ val reader = EventLogFileReader(logFs, logPath,
attempt.lastIndex)
// Use InMemoryStore to rebuild app store
@@ -1389,7 +1582,8 @@ private[history] class FsHistoryProvider(conf: SparkConf,
clock: Clock)
var retried = false
var newStorePath: File = null
while (newStorePath == null) {
- val reader = EventLogFileReader(fs, new Path(logDir, attempt.logPath),
+ val (logFs, logPath) = resolveLogPath(attempt.logPath,
attempt.logSourceFullPath)
+ val reader = EventLogFileReader(logFs, logPath,
attempt.lastIndex)
val isCompressed = reader.compressionCodec.isDefined
logInfo(log"Leasing disk manager space for app" +
@@ -1424,7 +1618,8 @@ private[history] class FsHistoryProvider(conf: SparkConf,
clock: Clock)
while (store == null) {
try {
val s = new InMemoryStore()
- val reader = EventLogFileReader(fs, new Path(logDir, attempt.logPath),
+ val (logFs, logPath) = resolveLogPath(attempt.logPath,
attempt.logSourceFullPath)
+ val reader = EventLogFileReader(logFs, logPath,
attempt.lastIndex)
rebuildAppStore(s, reader, attempt.info.lastUpdated.getTime())
store = s
@@ -1555,7 +1750,9 @@ private[history] class AttemptInfoWrapper(
val adminAcls: Option[String],
val viewAcls: Option[String],
val adminAclsGroups: Option[String],
- val viewAclsGroups: Option[String])
+ val viewAclsGroups: Option[String],
+ val logSourceName: String,
+ val logSourceFullPath: String)
private[history] class ApplicationInfoWrapper(
val info: ApplicationInfo,
@@ -1577,11 +1774,12 @@ private[history] class ApplicationInfoWrapper(
private[history] class AppListingListener(
reader: EventLogFileReader,
clock: Clock,
- haltEnabled: Boolean) extends SparkListener {
+ haltEnabled: Boolean,
+ provider: FsHistoryProvider) extends SparkListener {
private val app = new MutableApplicationInfo()
private val attempt = new MutableAttemptInfo(reader.rootPath.getName(),
- reader.fileSizeForLastIndex, reader.lastIndex)
+ reader.rootPath.toString(), reader.fileSizeForLastIndex, reader.lastIndex)
private var gotEnvUpdate = false
private var halted = false
@@ -1661,7 +1859,8 @@ private[history] class AppListingListener(
}
- private class MutableAttemptInfo(logPath: String, fileSize: Long, lastIndex:
Option[Long]) {
+ private class MutableAttemptInfo(logPath: String, fullLogPath: String,
+ fileSize: Long, lastIndex: Option[Long]) {
var attemptId: Option[String] = None
var startTime = new Date(-1)
var endTime = new Date(-1)
@@ -1677,6 +1876,7 @@ private[history] class AppListingListener(
var viewAclsGroups: Option[String] = None
def toView(): AttemptInfoWrapper = {
+ val (logSourceName, logSourceFullPath) = provider.getLogDirInfo(fullLogPath)
val apiInfo = ApplicationAttemptInfo(
attemptId,
startTime,
@@ -1685,7 +1885,9 @@ private[history] class AppListingListener(
duration,
sparkUser,
completed,
- appSparkVersion)
+ appSparkVersion,
+ Some(logSourceName),
+ Some(logSourceFullPath))
new AttemptInfoWrapper(
apiInfo,
logPath,
@@ -1694,7 +1896,9 @@ private[history] class AppListingListener(
adminAcls,
viewAcls,
adminAclsGroups,
- viewAclsGroups)
+ viewAclsGroups,
+ logSourceName,
+ logSourceFullPath)
}
}
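(The `resolveLogPath` and `getLogDirInfo` helpers referenced in the hunks above are defined in a part of `FsHistoryProvider` elided from this excerpt. As a rough, hypothetical sketch only — not the actual Scala implementation — the positional name-resolution rule described in the PR summary (names map to directories by position, empty entries fall back to the full path, duplicates rejected at startup) behaves like this:)

```python
def resolve_display_names(log_dirs, names_conf=None):
    """Map each log directory to a display name by position.

    Empty name entries fall back to the directory's full path, and
    duplicate display names are rejected, mirroring the startup check.
    This is an illustrative sketch, not Spark's actual code.
    """
    names = names_conf.split(",") if names_conf else []
    mapping = {}
    for i, d in enumerate(log_dirs):
        name = names[i].strip() if i < len(names) else ""
        mapping[d] = name if name else d
    if len(set(mapping.values())) != len(mapping):
        raise ValueError("duplicate display names for log directories")
    return mapping
```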
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
index f0131ba0081f..ec918e10c0ec 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
@@ -39,7 +39,23 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
val summary =
<div class="container-fluid">
<ul class="list-unstyled">
- {providerConfig.map { case (k, v) => <li><strong>{k}:</strong> {v}</li> }}
+ {providerConfig.map { case (k, v) =>
+ if (k == "Event log directory" && v.contains(",")) {
+ val dirs = v.split(",").map(_.trim)
+ <li>
+ <strong>{k}:</strong> {dirs.length} directories
+ <a class="ms-1" data-bs-toggle="collapse" href="#logDirList" role="button"
+ aria-expanded="false" aria-controls="logDirList">
+ (show)
+ </a>
+ <ul class="collapse mt-1" id="logDirList">
+ {dirs.map(d => <li>{d}</li>)}
+ </ul>
+ </li>
+ } else {
+ <li><strong>{k}:</strong> {v}</li>
+ }
+ }}
</ul>
{
if (eventLogsUnderProcessCount > 0) {
diff --git a/core/src/main/scala/org/apache/spark/internal/config/History.scala b/core/src/main/scala/org/apache/spark/internal/config/History.scala
index d44d3c32dfef..e12f0b8eeaad 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/History.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/History.scala
@@ -28,10 +28,21 @@ private[spark] object History {
val HISTORY_LOG_DIR = ConfigBuilder("spark.history.fs.logDirectory")
.version("1.1.0")
- .doc("Directory where app logs are stored")
+ .doc("Directory where app logs are stored. Multiple directories can be specified " +
+ "as a comma-separated list to monitor event logs from multiple paths.")
.stringConf
+ .checkValue(v => v.split(",").map(_.trim).exists(_.nonEmpty),
+ "must specify at least one non-empty directory")
.createWithDefault(DEFAULT_LOG_DIR)
+ val HISTORY_LOG_DIR_NAMES = ConfigBuilder("spark.history.fs.logDirectory.names")
+ .version("4.2.0")
+ .doc("Optional comma-separated list of display names for the log directories specified " +
+ "in spark.history.fs.logDirectory. Names correspond to directories by position. " +
+ "If not set, the full path is shown in the UI.")
+ .stringConf
+ .createOptional
+
val SAFEMODE_CHECK_INTERVAL_S =
ConfigBuilder("spark.history.fs.safemodeCheck.interval")
.version("1.6.0")
.doc("Interval between HDFS safemode checks for the event log directory")
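(For reference, a minimal `spark-defaults.conf` fragment using the two settings changed above, with illustrative paths taken from the PR description:)

```
spark.history.fs.logDirectory        hdfs:///logs/prod,s3a://bucket/logs/staging
spark.history.fs.logDirectory.names  Production,Staging
```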
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
index 6ae1dce57f31..f3fb916f0820 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -53,7 +53,9 @@ case class ApplicationAttemptInfo private[spark](
duration: Long,
sparkUser: String,
completed: Boolean = false,
- appSparkVersion: String) {
+ appSparkVersion: String,
+ logSourceName: Option[String] = None,
+ logSourceFullPath: Option[String] = None) {
def getStartTimeEpoch: Long = startTime.getTime
diff --git a/core/src/test/resources/HistoryServerExpectations/application_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/application_list_json_expectation.json
index 5bda5747c187..ac6e5c63c0b6 100644
--- a/core/src/test/resources/HistoryServerExpectations/application_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/application_list_json_expectation.json
@@ -1,6 +1,8 @@
[ {
"attempts" : [ {
"appSparkVersion" : "4.2.0-SNAPSHOT",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 2006,
"endTime" : "2025-12-27T14:15:12.221GMT",
@@ -16,6 +18,8 @@
}, {
"attempts" : [ {
"appSparkVersion" : "3.3.0-SNAPSHOT",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 86045,
"endTime" : "2022-01-13T02:05:36.564GMT",
@@ -31,6 +35,8 @@
}, {
"attempts" : [ {
"appSparkVersion" : "3.1.1.119",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 274875,
"endTime" : "2021-08-10T23:29:30.208GMT",
@@ -46,6 +52,8 @@
}, {
"attempts" : [ {
"appSparkVersion" : "3.1.0-SNAPSHOT",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 363996,
"endTime" : "2020-07-07T03:17:04.231GMT",
@@ -61,6 +69,8 @@
}, {
"attempts" : [ {
"appSparkVersion" : "3.0.0-SNAPSHOT",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 139764,
"endTime" : "2020-01-11T17:46:42.615GMT",
@@ -76,6 +86,8 @@
}, {
"attempts" : [ {
"appSparkVersion" : "3.0.0-SNAPSHOT",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 18794,
"endTime" : "2019-07-02T21:02:35.974GMT",
@@ -91,6 +103,8 @@
}, {
"attempts" : [ {
"appSparkVersion" : "3.0.0-SNAPSHOT",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 62168,
"endTime" : "2019-04-08T20:40:46.454GMT",
@@ -106,6 +120,8 @@
}, {
"attempts" : [ {
"appSparkVersion" : "2.3.0-SNAPSHOT",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 472819,
"endTime" : "2018-01-18T18:38:27.938GMT",
@@ -121,6 +137,8 @@
}, {
"attempts" : [ {
"appSparkVersion" : "2.3.0-SNAPSHOT",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 535234,
"endTime" : "2018-01-09T10:24:37.606GMT",
@@ -136,6 +154,8 @@
}, {
"attempts" : [ {
"appSparkVersion" : "2.1.0-SNAPSHOT",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 10671,
"endTime" : "2016-11-16T22:33:40.587GMT",
@@ -151,6 +171,8 @@
}, {
"attempts" : [ {
"appSparkVersion" : "2.1.0-SNAPSHOT",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 101795,
"endTime" : "2016-11-15T23:22:18.874GMT",
@@ -166,6 +188,8 @@
}, {
"attempts" : [ {
"appSparkVersion" : "1.4.0-SNAPSHOT",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 10505,
"endTime" : "2015-05-06T13:03:11.398GMT",
@@ -182,6 +206,8 @@
"attempts" : [ {
"appSparkVersion" : "1.4.0-SNAPSHOT",
"attemptId" : "2",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 57,
"endTime" : "2015-05-06T13:03:00.950GMT",
@@ -194,6 +220,8 @@
}, {
"appSparkVersion" : "1.4.0-SNAPSHOT",
"attemptId" : "1",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 10,
"endTime" : "2015-05-06T13:03:00.890GMT",
@@ -210,6 +238,8 @@
"attempts" : [ {
"appSparkVersion" : "",
"attemptId" : "2",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 34935,
"endTime" : "2015-03-17T23:12:25.177GMT",
@@ -222,6 +252,8 @@
}, {
"appSparkVersion" : "",
"attemptId" : "1",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 34935,
"endTime" : "2015-03-16T19:25:45.177GMT",
@@ -237,6 +269,8 @@
}, {
"attempts" : [ {
"appSparkVersion" : "",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 8635,
"endTime" : "2015-02-28T00:02:46.912GMT",
@@ -252,6 +286,8 @@
}, {
"attempts" : [ {
"appSparkVersion" : "",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 9011,
"endTime" : "2015-02-03T16:43:08.731GMT",
@@ -267,6 +303,8 @@
}, {
"attempts" : [ {
"appSparkVersion" : "",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 8635,
"endTime" : "2015-02-03T16:42:46.912GMT",
diff --git a/core/src/test/resources/HistoryServerExpectations/completed_app_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/completed_app_list_json_expectation.json
index 5bda5747c187..ac6e5c63c0b6 100644
--- a/core/src/test/resources/HistoryServerExpectations/completed_app_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/completed_app_list_json_expectation.json
@@ -1,6 +1,8 @@
[ {
"attempts" : [ {
"appSparkVersion" : "4.2.0-SNAPSHOT",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 2006,
"endTime" : "2025-12-27T14:15:12.221GMT",
@@ -16,6 +18,8 @@
}, {
"attempts" : [ {
"appSparkVersion" : "3.3.0-SNAPSHOT",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 86045,
"endTime" : "2022-01-13T02:05:36.564GMT",
@@ -31,6 +35,8 @@
}, {
"attempts" : [ {
"appSparkVersion" : "3.1.1.119",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 274875,
"endTime" : "2021-08-10T23:29:30.208GMT",
@@ -46,6 +52,8 @@
}, {
"attempts" : [ {
"appSparkVersion" : "3.1.0-SNAPSHOT",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 363996,
"endTime" : "2020-07-07T03:17:04.231GMT",
@@ -61,6 +69,8 @@
}, {
"attempts" : [ {
"appSparkVersion" : "3.0.0-SNAPSHOT",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 139764,
"endTime" : "2020-01-11T17:46:42.615GMT",
@@ -76,6 +86,8 @@
}, {
"attempts" : [ {
"appSparkVersion" : "3.0.0-SNAPSHOT",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 18794,
"endTime" : "2019-07-02T21:02:35.974GMT",
@@ -91,6 +103,8 @@
}, {
"attempts" : [ {
"appSparkVersion" : "3.0.0-SNAPSHOT",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 62168,
"endTime" : "2019-04-08T20:40:46.454GMT",
@@ -106,6 +120,8 @@
}, {
"attempts" : [ {
"appSparkVersion" : "2.3.0-SNAPSHOT",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 472819,
"endTime" : "2018-01-18T18:38:27.938GMT",
@@ -121,6 +137,8 @@
}, {
"attempts" : [ {
"appSparkVersion" : "2.3.0-SNAPSHOT",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 535234,
"endTime" : "2018-01-09T10:24:37.606GMT",
@@ -136,6 +154,8 @@
}, {
"attempts" : [ {
"appSparkVersion" : "2.1.0-SNAPSHOT",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 10671,
"endTime" : "2016-11-16T22:33:40.587GMT",
@@ -151,6 +171,8 @@
}, {
"attempts" : [ {
"appSparkVersion" : "2.1.0-SNAPSHOT",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 101795,
"endTime" : "2016-11-15T23:22:18.874GMT",
@@ -166,6 +188,8 @@
}, {
"attempts" : [ {
"appSparkVersion" : "1.4.0-SNAPSHOT",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 10505,
"endTime" : "2015-05-06T13:03:11.398GMT",
@@ -182,6 +206,8 @@
"attempts" : [ {
"appSparkVersion" : "1.4.0-SNAPSHOT",
"attemptId" : "2",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 57,
"endTime" : "2015-05-06T13:03:00.950GMT",
@@ -194,6 +220,8 @@
}, {
"appSparkVersion" : "1.4.0-SNAPSHOT",
"attemptId" : "1",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 10,
"endTime" : "2015-05-06T13:03:00.890GMT",
@@ -210,6 +238,8 @@
"attempts" : [ {
"appSparkVersion" : "",
"attemptId" : "2",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 34935,
"endTime" : "2015-03-17T23:12:25.177GMT",
@@ -222,6 +252,8 @@
}, {
"appSparkVersion" : "",
"attemptId" : "1",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 34935,
"endTime" : "2015-03-16T19:25:45.177GMT",
@@ -237,6 +269,8 @@
}, {
"attempts" : [ {
"appSparkVersion" : "",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 8635,
"endTime" : "2015-02-28T00:02:46.912GMT",
@@ -252,6 +286,8 @@
}, {
"attempts" : [ {
"appSparkVersion" : "",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 9011,
"endTime" : "2015-02-03T16:43:08.731GMT",
@@ -267,6 +303,8 @@
}, {
"attempts" : [ {
"appSparkVersion" : "",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 8635,
"endTime" : "2015-02-03T16:42:46.912GMT",
diff --git a/core/src/test/resources/HistoryServerExpectations/limit_app_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/limit_app_list_json_expectation.json
index 1f03d9f97395..06ef3a5ecc9f 100644
--- a/core/src/test/resources/HistoryServerExpectations/limit_app_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/limit_app_list_json_expectation.json
@@ -1,6 +1,8 @@
[ {
"attempts" : [ {
"appSparkVersion" : "4.2.0-SNAPSHOT",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 2006,
"endTime" : "2025-12-27T14:15:12.221GMT",
@@ -16,6 +18,8 @@
}, {
"attempts" : [ {
"appSparkVersion" : "3.3.0-SNAPSHOT",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 86045,
"endTime" : "2022-01-13T02:05:36.564GMT",
@@ -31,6 +35,8 @@
}, {
"attempts" : [ {
"appSparkVersion" : "3.1.1.119",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 274875,
"endTime" : "2021-08-10T23:29:30.208GMT",
diff --git a/core/src/test/resources/HistoryServerExpectations/maxDate2_app_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/maxDate2_app_list_json_expectation.json
index ce928dca9838..b496b60f114a 100644
--- a/core/src/test/resources/HistoryServerExpectations/maxDate2_app_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/maxDate2_app_list_json_expectation.json
@@ -1,6 +1,8 @@
[ {
"attempts" : [ {
"appSparkVersion" : "",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 8635,
"endTime" : "2015-02-03T16:42:46.912GMT",
diff --git a/core/src/test/resources/HistoryServerExpectations/maxDate_app_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/maxDate_app_list_json_expectation.json
index a747bc66844a..c9610d5feeb6 100644
--- a/core/src/test/resources/HistoryServerExpectations/maxDate_app_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/maxDate_app_list_json_expectation.json
@@ -1,6 +1,8 @@
[ {
"attempts" : [ {
"appSparkVersion" : "",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 9011,
"endTime" : "2015-02-03T16:43:08.731GMT",
@@ -16,6 +18,8 @@
}, {
"attempts" : [ {
"appSparkVersion" : "",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 8635,
"endTime" : "2015-02-03T16:42:46.912GMT",
diff --git a/core/src/test/resources/HistoryServerExpectations/maxEndDate_app_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/maxEndDate_app_list_json_expectation.json
index 5d65fff3ebad..5088134a3169 100644
--- a/core/src/test/resources/HistoryServerExpectations/maxEndDate_app_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/maxEndDate_app_list_json_expectation.json
@@ -2,6 +2,8 @@
"attempts" : [ {
"appSparkVersion" : "1.4.0-SNAPSHOT",
"attemptId" : "2",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 57,
"endTime" : "2015-05-06T13:03:00.950GMT",
@@ -14,6 +16,8 @@
}, {
"appSparkVersion" : "1.4.0-SNAPSHOT",
"attemptId" : "1",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 10,
"endTime" : "2015-05-06T13:03:00.890GMT",
@@ -30,6 +34,8 @@
"attempts" : [ {
"appSparkVersion" : "",
"attemptId" : "2",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 34935,
"endTime" : "2015-03-17T23:12:25.177GMT",
@@ -42,6 +48,8 @@
}, {
"appSparkVersion" : "",
"attemptId" : "1",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 34935,
"endTime" : "2015-03-16T19:25:45.177GMT",
@@ -57,6 +65,8 @@
}, {
"attempts" : [ {
"appSparkVersion" : "",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 8635,
"endTime" : "2015-02-28T00:02:46.912GMT",
@@ -72,6 +82,8 @@
}, {
"attempts" : [ {
"appSparkVersion" : "",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 9011,
"endTime" : "2015-02-03T16:43:08.731GMT",
@@ -87,6 +99,8 @@
}, {
"attempts" : [ {
"appSparkVersion" : "",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 8635,
"endTime" : "2015-02-03T16:42:46.912GMT",
diff --git a/core/src/test/resources/HistoryServerExpectations/minDate_and_maxEndDate_app_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/minDate_and_maxEndDate_app_list_json_expectation.json
index 8e09a800fae2..8b974582d717 100644
--- a/core/src/test/resources/HistoryServerExpectations/minDate_and_maxEndDate_app_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/minDate_and_maxEndDate_app_list_json_expectation.json
@@ -2,6 +2,8 @@
"attempts" : [ {
"appSparkVersion" : "1.4.0-SNAPSHOT",
"attemptId" : "2",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 57,
"endTime" : "2015-05-06T13:03:00.950GMT",
@@ -14,6 +16,8 @@
}, {
"appSparkVersion" : "1.4.0-SNAPSHOT",
"attemptId" : "1",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 10,
"endTime" : "2015-05-06T13:03:00.890GMT",
@@ -30,6 +34,8 @@
"attempts" : [ {
"appSparkVersion" : "",
"attemptId" : "2",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 34935,
"endTime" : "2015-03-17T23:12:25.177GMT",
@@ -42,6 +48,8 @@
}, {
"appSparkVersion" : "",
"attemptId" : "1",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 34935,
"endTime" : "2015-03-16T19:25:45.177GMT",
diff --git a/core/src/test/resources/HistoryServerExpectations/minDate_app_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/minDate_app_list_json_expectation.json
index 3e9c4b1d0f30..a1d2f9e9ef4d 100644
--- a/core/src/test/resources/HistoryServerExpectations/minDate_app_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/minDate_app_list_json_expectation.json
@@ -1,6 +1,8 @@
[ {
"attempts" : [ {
"appSparkVersion" : "4.2.0-SNAPSHOT",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 2006,
"endTime" : "2025-12-27T14:15:12.221GMT",
@@ -16,6 +18,8 @@
}, {
"attempts" : [ {
"appSparkVersion" : "3.3.0-SNAPSHOT",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 86045,
"endTime" : "2022-01-13T02:05:36.564GMT",
@@ -31,6 +35,8 @@
}, {
"attempts" : [ {
"appSparkVersion" : "3.1.1.119",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 274875,
"endTime" : "2021-08-10T23:29:30.208GMT",
@@ -46,6 +52,8 @@
}, {
"attempts" : [ {
"appSparkVersion" : "3.1.0-SNAPSHOT",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 363996,
"endTime" : "2020-07-07T03:17:04.231GMT",
@@ -61,6 +69,8 @@
}, {
"attempts" : [ {
"appSparkVersion" : "3.0.0-SNAPSHOT",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 139764,
"endTime" : "2020-01-11T17:46:42.615GMT",
@@ -76,6 +86,8 @@
}, {
"attempts" : [ {
"appSparkVersion" : "3.0.0-SNAPSHOT",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 18794,
"endTime" : "2019-07-02T21:02:35.974GMT",
@@ -91,6 +103,8 @@
}, {
"attempts" : [ {
"appSparkVersion" : "3.0.0-SNAPSHOT",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 62168,
"endTime" : "2019-04-08T20:40:46.454GMT",
@@ -106,6 +120,8 @@
}, {
"attempts" : [ {
"appSparkVersion" : "2.3.0-SNAPSHOT",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 472819,
"endTime" : "2018-01-18T18:38:27.938GMT",
@@ -121,6 +137,8 @@
}, {
"attempts" : [ {
"appSparkVersion" : "2.3.0-SNAPSHOT",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 535234,
"endTime" : "2018-01-09T10:24:37.606GMT",
@@ -136,6 +154,8 @@
}, {
"attempts" : [ {
"appSparkVersion" : "2.1.0-SNAPSHOT",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 10671,
"endTime" : "2016-11-16T22:33:40.587GMT",
@@ -151,6 +171,8 @@
}, {
"attempts" : [ {
"appSparkVersion" : "2.1.0-SNAPSHOT",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 101795,
"endTime" : "2016-11-15T23:22:18.874GMT",
@@ -166,6 +188,8 @@
}, {
"attempts" : [ {
"appSparkVersion" : "1.4.0-SNAPSHOT",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 10505,
"endTime" : "2015-05-06T13:03:11.398GMT",
@@ -182,6 +206,8 @@
"attempts" : [ {
"appSparkVersion" : "1.4.0-SNAPSHOT",
"attemptId" : "2",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 57,
"endTime" : "2015-05-06T13:03:00.950GMT",
@@ -194,6 +220,8 @@
}, {
"appSparkVersion" : "1.4.0-SNAPSHOT",
"attemptId" : "1",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 10,
"endTime" : "2015-05-06T13:03:00.890GMT",
@@ -210,6 +238,8 @@
"attempts" : [ {
"appSparkVersion" : "",
"attemptId" : "2",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 34935,
"endTime" : "2015-03-17T23:12:25.177GMT",
@@ -222,6 +252,8 @@
}, {
"appSparkVersion" : "",
"attemptId" : "1",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 34935,
"endTime" : "2015-03-16T19:25:45.177GMT",
@@ -237,6 +269,8 @@
}, {
"attempts" : [ {
"appSparkVersion" : "",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 8635,
"endTime" : "2015-02-28T00:02:46.912GMT",
diff --git a/core/src/test/resources/HistoryServerExpectations/minEndDate_and_maxEndDate_app_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/minEndDate_and_maxEndDate_app_list_json_expectation.json
index 8e09a800fae2..8b974582d717 100644
--- a/core/src/test/resources/HistoryServerExpectations/minEndDate_and_maxEndDate_app_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/minEndDate_and_maxEndDate_app_list_json_expectation.json
@@ -2,6 +2,8 @@
"attempts" : [ {
"appSparkVersion" : "1.4.0-SNAPSHOT",
"attemptId" : "2",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 57,
"endTime" : "2015-05-06T13:03:00.950GMT",
@@ -14,6 +16,8 @@
}, {
"appSparkVersion" : "1.4.0-SNAPSHOT",
"attemptId" : "1",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 10,
"endTime" : "2015-05-06T13:03:00.890GMT",
@@ -30,6 +34,8 @@
"attempts" : [ {
"appSparkVersion" : "",
"attemptId" : "2",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 34935,
"endTime" : "2015-03-17T23:12:25.177GMT",
@@ -42,6 +48,8 @@
}, {
"appSparkVersion" : "",
"attemptId" : "1",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 34935,
"endTime" : "2015-03-16T19:25:45.177GMT",
diff --git a/core/src/test/resources/HistoryServerExpectations/minEndDate_app_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/minEndDate_app_list_json_expectation.json
index 8c938661d7b2..413660c4b689 100644
--- a/core/src/test/resources/HistoryServerExpectations/minEndDate_app_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/minEndDate_app_list_json_expectation.json
@@ -1,6 +1,8 @@
[ {
"attempts" : [ {
"appSparkVersion" : "4.2.0-SNAPSHOT",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 2006,
"endTime" : "2025-12-27T14:15:12.221GMT",
@@ -16,6 +18,8 @@
}, {
"attempts" : [ {
"appSparkVersion" : "3.3.0-SNAPSHOT",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 86045,
"endTime" : "2022-01-13T02:05:36.564GMT",
@@ -31,6 +35,8 @@
}, {
"attempts" : [ {
"appSparkVersion" : "3.1.1.119",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 274875,
"endTime" : "2021-08-10T23:29:30.208GMT",
@@ -46,6 +52,8 @@
}, {
"attempts" : [ {
"appSparkVersion" : "3.1.0-SNAPSHOT",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 363996,
"endTime" : "2020-07-07T03:17:04.231GMT",
@@ -61,6 +69,8 @@
}, {
"attempts" : [ {
"appSparkVersion" : "3.0.0-SNAPSHOT",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 139764,
"endTime" : "2020-01-11T17:46:42.615GMT",
@@ -76,6 +86,8 @@
}, {
"attempts" : [ {
"appSparkVersion" : "3.0.0-SNAPSHOT",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 18794,
"endTime" : "2019-07-02T21:02:35.974GMT",
@@ -91,6 +103,8 @@
}, {
"attempts" : [ {
"appSparkVersion" : "3.0.0-SNAPSHOT",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 62168,
"endTime" : "2019-04-08T20:40:46.454GMT",
@@ -106,6 +120,8 @@
}, {
"attempts" : [ {
"appSparkVersion" : "2.3.0-SNAPSHOT",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 472819,
"endTime" : "2018-01-18T18:38:27.938GMT",
@@ -121,6 +137,8 @@
}, {
"attempts" : [ {
"appSparkVersion" : "2.3.0-SNAPSHOT",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 535234,
"endTime" : "2018-01-09T10:24:37.606GMT",
@@ -136,6 +154,8 @@
}, {
"attempts" : [ {
"appSparkVersion" : "2.1.0-SNAPSHOT",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 10671,
"endTime" : "2016-11-16T22:33:40.587GMT",
@@ -151,6 +171,8 @@
}, {
"attempts" : [ {
"appSparkVersion" : "2.1.0-SNAPSHOT",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 101795,
"endTime" : "2016-11-15T23:22:18.874GMT",
@@ -166,6 +188,8 @@
}, {
"attempts" : [ {
"appSparkVersion" : "1.4.0-SNAPSHOT",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 10505,
"endTime" : "2015-05-06T13:03:11.398GMT",
@@ -182,6 +206,8 @@
"attempts" : [ {
"appSparkVersion" : "1.4.0-SNAPSHOT",
"attemptId" : "2",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 57,
"endTime" : "2015-05-06T13:03:00.950GMT",
@@ -194,6 +220,8 @@
}, {
"appSparkVersion" : "1.4.0-SNAPSHOT",
"attemptId" : "1",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 10,
"endTime" : "2015-05-06T13:03:00.890GMT",
diff --git a/core/src/test/resources/HistoryServerExpectations/one_app_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/one_app_json_expectation.json
index 7e375af0ee9e..5b0ce588b563 100644
--- a/core/src/test/resources/HistoryServerExpectations/one_app_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/one_app_json_expectation.json
@@ -1,6 +1,8 @@
{
"attempts" : [ {
"appSparkVersion" : "",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 9011,
"endTime" : "2015-02-03T16:43:08.731GMT",
diff --git a/core/src/test/resources/HistoryServerExpectations/one_app_multi_attempt_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/one_app_multi_attempt_json_expectation.json
index 0e265f11af64..3b116f958dc1 100644
--- a/core/src/test/resources/HistoryServerExpectations/one_app_multi_attempt_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/one_app_multi_attempt_json_expectation.json
@@ -2,6 +2,8 @@
"attempts" : [ {
"appSparkVersion" : "",
"attemptId" : "2",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 34935,
"endTime" : "2015-03-17T23:12:25.177GMT",
@@ -14,6 +16,8 @@
}, {
"appSparkVersion" : "",
"attemptId" : "1",
+ "logSourceFullPath" : "",
+ "logSourceName" : "",
"completed" : true,
"duration" : 34935,
"endTime" : "2015-03-16T19:25:45.177GMT",
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 8b1b6a5bad38..a1d0b7dc4c05 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -156,9 +156,13 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P
completed: Boolean): ApplicationInfo = {
val duration = if (end > 0) end - start else 0
+ // Get log dir info from the provider
+ val logPath = new File(testDir, name).getAbsolutePath
+ val (logSourceName, logSourceFullPath) = provider.getLogDirInfo(logPath)
new ApplicationInfo(id, name, None, None, None, None,
List(ApplicationAttemptInfo(None, new Date(start),
- new Date(end), new Date(lastMod), duration, user, completed, SPARK_VERSION)))
+ new Date(end), new Date(lastMod), duration, user, completed, SPARK_VERSION,
+ Some(logSourceName), Some(logSourceFullPath))))
}
// For completed files, lastUpdated would be lastModified time.
@@ -1201,6 +1205,9 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P
!isReadable))
val mockedProvider = spy[FsHistoryProvider](provider)
when(mockedProvider.fs).thenReturn(mockedFs)
+ when(mockedProvider.logDirFs).thenReturn(provider.logDirFs.map {
+ case (dir, _) => dir -> mockedFs
+ })
updateAndCheck(mockedProvider) { list =>
list.size should be(1)
}
@@ -1383,11 +1390,11 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P
val appInfo = new ApplicationAttemptInfo(None, new Date(1), new Date(1), new Date(1),
10, "spark", false, "dummy")
val attemptInfoWithIndexAsNone = new AttemptInfoWrapper(appInfo, "dummyPath", 10, None,
- None, None, None, None)
+ None, None, None, None, "test-cluster", "/test/path")
assertSerDe(serializer, attemptInfoWithIndexAsNone)
val attemptInfoWithIndex = new AttemptInfoWrapper(appInfo, "dummyPath", 10, Some(1),
- None, None, None, None)
+ None, None, None, None, "test-cluster", "/test/path")
assertSerDe(serializer, attemptInfoWithIndex)
}
@@ -1894,6 +1901,253 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P
new ExecutorInfo(host, 1, executorLogUrlMap, executorAttributes)
}
+ test("SPARK-55793: applications from multiple log directories") {
+ val dir2 = Utils.createTempDir(namePrefix = "logDir2")
+ try {
+ val conf = createTestConf()
+ .set(HISTORY_LOG_DIR, s"${testDir.getAbsolutePath},${dir2.getAbsolutePath}")
+ val provider = new FsHistoryProvider(conf)
+
+ // Write app log in first directory
+ val log1 = newLogFile("app1", None, inProgress = false)
+ writeFile(log1, None,
+ SparkListenerApplicationStart("app1", Some("app1-id"), 1L, "test", None),
+ SparkListenerApplicationEnd(5L))
+
+ // Write app log in second directory
+ val logUri2 = SingleEventLogFileWriter.getLogPath(dir2.toURI, "app2", None, None)
+ val log2 = new File(new Path(logUri2).toUri.getPath)
+ writeFile(log2, None,
+ SparkListenerApplicationStart("app2", Some("app2-id"), 2L, "test", None),
+ SparkListenerApplicationEnd(6L))
+
+ updateAndCheck(provider) { list =>
+ list.size should be (2)
+ list.map(_.id).toSet should be (Set("app1-id", "app2-id"))
+
+ // Verify each app has the correct display name derived from its logDir
+ val app1 = list.find(_.id == "app1-id").get
+ val app2 = list.find(_.id == "app2-id").get
+ app1.attempts.head.logSourceName.get should be (testDir.getAbsolutePath)
+ app2.attempts.head.logSourceName.get should be (dir2.getAbsolutePath)
+ }
+
+ provider.stop()
+ } finally {
+ Utils.deleteRecursively(dir2)
+ }
+ }
+
+ test("SPARK-55793: error in one directory does not affect others") {
+ val dir2 = Utils.createTempDir(namePrefix = "logDir2")
+ try {
+ val conf = createTestConf()
+ .set(HISTORY_LOG_DIR,
+ s"${testDir.getAbsolutePath},/nonexistent/path,${dir2.getAbsolutePath}")
+ val provider = new FsHistoryProvider(conf)
+
+ val log1 = newLogFile("app1", None, inProgress = false)
+ writeFile(log1, None,
+ SparkListenerApplicationStart("app1", Some("app1-id"), 1L, "test", None),
+ SparkListenerApplicationEnd(5L))
+
+ val logUri2 = SingleEventLogFileWriter.getLogPath(dir2.toURI, "app2", None, None)
+ val log2 = new File(new Path(logUri2).toUri.getPath)
+ writeFile(log2, None,
+ SparkListenerApplicationStart("app2", Some("app2-id"), 2L, "test", None),
+ SparkListenerApplicationEnd(6L))
+
+ updateAndCheck(provider) { list =>
+ list.size should be (2)
+ }
+
+ provider.stop()
+ } finally {
+ Utils.deleteRecursively(dir2)
+ }
+ }
+
+ test("SPARK-55793: empty name at head falls back to full path") {
+ val dir2 = Utils.createTempDir(namePrefix = "logDir2")
+ val dir3 = Utils.createTempDir(namePrefix = "logDir3")
+ try {
+ val conf = createTestConf()
+ .set(HISTORY_LOG_DIR,
+ s"${testDir.getAbsolutePath},${dir2.getAbsolutePath},${dir3.getAbsolutePath}")
+ .set(HISTORY_LOG_DIR_NAMES, ",NameB,NameC")
+ val provider = new FsHistoryProvider(conf)
+
+ val log1 = newLogFile("app1", None, inProgress = false)
+ writeFile(log1, None,
+ SparkListenerApplicationStart("app1", Some("app1-id"), 1L, "test", None),
+ SparkListenerApplicationEnd(5L))
+
+ val logUri2 = SingleEventLogFileWriter.getLogPath(dir2.toURI, "app2", None, None)
+ val log2 = new File(new Path(logUri2).toUri.getPath)
+ writeFile(log2, None,
+ SparkListenerApplicationStart("app2", Some("app2-id"), 2L, "test", None),
+ SparkListenerApplicationEnd(6L))
+
+ val logUri3 = SingleEventLogFileWriter.getLogPath(dir3.toURI, "app3", None, None)
+ val log3 = new File(new Path(logUri3).toUri.getPath)
+ writeFile(log3, None,
+ SparkListenerApplicationStart("app3", Some("app3-id"), 3L, "test", None),
+ SparkListenerApplicationEnd(7L))
+
+ updateAndCheck(provider) { list =>
+ val app1 = list.find(_.id == "app1-id").get
+ val app2 = list.find(_.id == "app2-id").get
+ val app3 = list.find(_.id == "app3-id").get
+ app1.attempts.head.logSourceName.get should be (testDir.getAbsolutePath)
+ app2.attempts.head.logSourceName.get should be ("NameB")
+ app3.attempts.head.logSourceName.get should be ("NameC")
+ }
+
+ provider.stop()
+ } finally {
+ Utils.deleteRecursively(dir2)
+ Utils.deleteRecursively(dir3)
+ }
+ }
+
+ test("SPARK-55793: empty name at tail falls back to full path") {
+ val dir2 = Utils.createTempDir(namePrefix = "logDir2")
+ val dir3 = Utils.createTempDir(namePrefix = "logDir3")
+ try {
+ val conf = createTestConf()
+ .set(HISTORY_LOG_DIR,
+ s"${testDir.getAbsolutePath},${dir2.getAbsolutePath},${dir3.getAbsolutePath}")
+ .set(HISTORY_LOG_DIR_NAMES, "NameA,NameB,")
+ val provider = new FsHistoryProvider(conf)
+
+ val log1 = newLogFile("app1", None, inProgress = false)
+ writeFile(log1, None,
+ SparkListenerApplicationStart("app1", Some("app1-id"), 1L, "test", None),
+ SparkListenerApplicationEnd(5L))
+
+ val logUri2 = SingleEventLogFileWriter.getLogPath(dir2.toURI, "app2", None, None)
+ val log2 = new File(new Path(logUri2).toUri.getPath)
+ writeFile(log2, None,
+ SparkListenerApplicationStart("app2", Some("app2-id"), 2L, "test", None),
+ SparkListenerApplicationEnd(6L))
+
+ val logUri3 = SingleEventLogFileWriter.getLogPath(dir3.toURI, "app3", None, None)
+ val log3 = new File(new Path(logUri3).toUri.getPath)
+ writeFile(log3, None,
+ SparkListenerApplicationStart("app3", Some("app3-id"), 3L, "test", None),
+ SparkListenerApplicationEnd(7L))
+
+ updateAndCheck(provider) { list =>
+ val app1 = list.find(_.id == "app1-id").get
+ val app2 = list.find(_.id == "app2-id").get
+ val app3 = list.find(_.id == "app3-id").get
+ app1.attempts.head.logSourceName.get should be ("NameA")
+ app2.attempts.head.logSourceName.get should be ("NameB")
+ app3.attempts.head.logSourceName.get should be (dir3.getAbsolutePath)
+ }
+
+ provider.stop()
+ } finally {
+ Utils.deleteRecursively(dir2)
+ Utils.deleteRecursively(dir3)
+ }
+ }
+
+ test("SPARK-55793: empty name in middle falls back to full path") {
+ val dir2 = Utils.createTempDir(namePrefix = "logDir2")
+ val dir3 = Utils.createTempDir(namePrefix = "logDir3")
+ try {
+ val conf = createTestConf()
+ .set(HISTORY_LOG_DIR,
+ s"${testDir.getAbsolutePath},${dir2.getAbsolutePath},${dir3.getAbsolutePath}")
+ .set(HISTORY_LOG_DIR_NAMES, "NameA,,NameC")
+ val provider = new FsHistoryProvider(conf)
+
+ val log1 = newLogFile("app1", None, inProgress = false)
+ writeFile(log1, None,
+ SparkListenerApplicationStart("app1", Some("app1-id"), 1L, "test", None),
+ SparkListenerApplicationEnd(5L))
+
+ val logUri2 = SingleEventLogFileWriter.getLogPath(dir2.toURI, "app2", None, None)
+ val log2 = new File(new Path(logUri2).toUri.getPath)
+ writeFile(log2, None,
+ SparkListenerApplicationStart("app2", Some("app2-id"), 2L, "test", None),
+ SparkListenerApplicationEnd(6L))
+
+ val logUri3 = SingleEventLogFileWriter.getLogPath(dir3.toURI, "app3", None, None)
+ val log3 = new File(new Path(logUri3).toUri.getPath)
+ writeFile(log3, None,
+ SparkListenerApplicationStart("app3", Some("app3-id"), 3L, "test", None),
+ SparkListenerApplicationEnd(7L))
+
+ updateAndCheck(provider) { list =>
+ val app1 = list.find(_.id == "app1-id").get
+ val app2 = list.find(_.id == "app2-id").get
+ val app3 = list.find(_.id == "app3-id").get
+ app1.attempts.head.logSourceName.get should be ("NameA")
+ app2.attempts.head.logSourceName.get should be (dir2.getAbsolutePath)
+ app3.attempts.head.logSourceName.get should be ("NameC")
+ }
+
+ provider.stop()
+ } finally {
+ Utils.deleteRecursively(dir2)
+ Utils.deleteRecursively(dir3)
+ }
+ }
+
+ test("SPARK-55793: duplicate display names should fail") {
+ val dir2 = Utils.createTempDir(namePrefix = "logDir2")
+ try {
+ val conf = createTestConf()
+ .set(HISTORY_LOG_DIR, s"${testDir.getAbsolutePath},${dir2.getAbsolutePath}")
+ .set(HISTORY_LOG_DIR_NAMES, "SameName,SameName")
+ val e = intercept[IllegalArgumentException] {
+ new FsHistoryProvider(conf)
+ }
+ assert(e.getMessage.contains("Duplicate display names"))
+ } finally {
+ Utils.deleteRecursively(dir2)
+ }
+ }
+
+ test("SPARK-55793: same log file name across dirs resolves correctly with logSourceFullPath") {
+ val dir2 = Utils.createTempDir(namePrefix = "logDir2")
+ try {
+ val conf = createTestConf().set(HISTORY_LOG_DIR,
+ s"${testDir.getAbsolutePath},${dir2.getAbsolutePath}")
+ val provider = new FsHistoryProvider(conf)
+
+ val collidingLogName = "shared-event-log"
+ val log1 = new File(testDir, collidingLogName)
+ writeFile(log1, None,
+ SparkListenerApplicationStart("app1", Some("app1-id"), 1L, "test", None),
+ SparkListenerApplicationEnd(5L))
+ val log2 = new File(dir2, collidingLogName)
+ writeFile(log2, None,
+ SparkListenerApplicationStart("app2", Some("app2-id"), 2L, "test", None),
+ SparkListenerApplicationEnd(6L))
+
+ updateAndCheck(provider) { list =>
+ list.size should be(2)
+ list.map(_.id).toSet should be(Set("app1-id", "app2-id"))
+ }
+
+ val attempt = provider.getAttempt("app2-id", None)
+ attempt.logSourceFullPath should endWith(dir2.getAbsolutePath)
+
+ val resolveLogPathMethod =
+ PrivateMethod[(FileSystem, Path)](Symbol("resolveLogPath"))
+ val (_, resolvedPath) = provider invokePrivate
+ resolveLogPathMethod(attempt.logPath, attempt.logSourceFullPath)
+ resolvedPath.toUri.getPath should be(log2.getAbsolutePath)
+
+ provider.stop()
+ } finally {
+ Utils.deleteRecursively(dir2)
+ }
+ }
+
private class SafeModeTestProvider(conf: SparkConf, clock: Clock)
extends FsHistoryProvider(conf, clock) {
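The positional name-resolution behavior these tests pin down (names paired with directories by position, empty entries falling back to the full path, duplicate display names rejected at startup) can be sketched standalone. `LogDirNamesSketch` and `resolve` are illustrative names for this email only, not the actual `FsHistoryProvider` internals:

```scala
// Illustrative sketch only: pair each configured log directory with a display
// name by position. Empty name entries fall back to the directory's full path;
// duplicate display names are rejected, mirroring the tests above.
object LogDirNamesSketch {
  def resolve(logDirs: String, namesOpt: Option[String]): Seq[(String, String)] = {
    val dirs = logDirs.split(",").map(_.trim).filter(_.nonEmpty).toSeq
    // split with limit -1 keeps trailing empty entries such as "NameA,NameB,"
    val names = namesOpt.map(_.split(",", -1).map(_.trim).toSeq).getOrElse(Seq.empty)
    val resolved = dirs.zipWithIndex.map { case (dir, i) =>
      val name = if (i < names.length && names(i).nonEmpty) names(i) else dir
      dir -> name
    }
    val displayNames = resolved.map(_._2)
    require(displayNames.distinct.size == displayNames.size,
      s"Duplicate display names: ${displayNames.mkString(",")}")
    resolved
  }
}
```

With `"NameA,,NameC"` the middle directory keeps its full path as the display name, matching the "empty name in middle" test above.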
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
index cf024b8afb4e..13432b6ed9fc 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
@@ -232,7 +232,7 @@ abstract class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with
val goldenFile =
new File(expRoot, HistoryServerSuite.sanitizePath(name) + "_expectation.json")
- val jsonAst = parse(clearLastUpdated(jsonOpt.get))
+ val jsonAst = parse(clearDynamicFields(jsonOpt.get))
if (regenerateGoldenFiles) {
Utils.tryWithResource(new FileWriter(goldenFile)) { out =>
@@ -255,9 +255,11 @@ abstract class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with
// the REST API returns the last modified time of EVENT LOG file for this field.
// It is not applicable to hard-code this dynamic field in a static expected file,
// so here we skip checking the lastUpdated field's value (setting it as "").
- private def clearLastUpdated(json: String): String = {
- if (json.indexOf("lastUpdated") >= 0) {
- val subStrings = json.split(",")
+ // Similarly, logSourceName and logSourceFullPath contain environment-specific absolute paths.
+ private def clearDynamicFields(json: String): String = {
+ var result = json
+ if (result.indexOf("lastUpdated") >= 0) {
+ val subStrings = result.split(",")
for (i <- subStrings.indices) {
if (subStrings(i).indexOf("lastUpdatedEpoch") >= 0) {
subStrings(i) = subStrings(i).replaceAll("(\\d+)", "0")
@@ -266,10 +268,14 @@ abstract class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with
subStrings(i) = regex.replaceAllIn(subStrings(i), "\"lastUpdated\" : \"\"")
}
}
- subStrings.mkString(",")
- } else {
- json
- }
+ result = subStrings.mkString(",")
+ }
+ // logSourceName and logSourceFullPath contain environment-specific absolute paths
+ result = "\"logSourceName\"\\s*:\\s*\"[^\"]*\"".r
+ .replaceAllIn(result, "\"logSourceName\" : \"\"")
+ result = "\"logSourceFullPath\"\\s*:\\s*\"[^\"]*\"".r
+ .replaceAllIn(result, "\"logSourceFullPath\" : \"\"")
+ result
}
test("download all logs for app with multiple attempts") {
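The masking idea in `clearDynamicFields` above can be exercised on its own; this is a simplified sketch reusing the same regexes (`maskDynamicFields` is a hypothetical helper name for this email, not the suite's exact code):

```scala
// Sketch of the regex-based masking added above: environment-specific JSON
// fields are blanked so REST API responses can be compared against static
// golden expectation files.
def maskDynamicFields(json: String): String = {
  var result = json
  // Blank logSourceName values, e.g. "logSourceName" : "/tmp/..." -> ""
  result = "\"logSourceName\"\\s*:\\s*\"[^\"]*\"".r
    .replaceAllIn(result, "\"logSourceName\" : \"\"")
  // Blank logSourceFullPath values the same way
  result = "\"logSourceFullPath\"\\s*:\\s*\"[^\"]*\"".r
    .replaceAllIn(result, "\"logSourceFullPath\" : \"\"")
  result
}
```

Non-dynamic fields pass through untouched, so the rest of the golden-file comparison is unaffected.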
diff --git a/docs/monitoring.md b/docs/monitoring.md
index e75f83110d19..bd9a8b134dc3 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -170,9 +170,29 @@ Security options for the Spark History Server are covered more detail in the
logs to load. This can be a local <code>file://</code> path,
an HDFS path <code>hdfs://namenode/shared/spark-logs</code>
or that of an alternative filesystem supported by the Hadoop APIs.
+ Multiple directories can be specified as a comma-separated list
+ (e.g., <code>hdfs:///logs/prod,s3a://bucket/logs/staging</code>).
+ Directories can be on the same or different filesystems.
+ The directories should be disjoint (not nested within each other).
+ If event log files with the same name exist in different directories,
+ each file is indexed separately based on its source directory.
+ When multiple directories are configured, all existing
+ <code>spark.history.fs.*</code> settings apply globally across all directories
+ (there are no per-directory configurations).
</td>
<td>1.1.0</td>
</tr>
+ <tr>
+ <td>spark.history.fs.logDirectory.names</td>
+ <td>(none)</td>
+ <td>
+ Optional comma-separated list of display names for the log directories specified
+ in <code>spark.history.fs.logDirectory</code>. Names correspond to directories by position.
+ If not set, the full path is shown in the UI. Empty entries fall back to the full path.
+ Duplicate display names are rejected at startup.
+ </td>
+ <td>4.2.0</td>
+ </tr>
<tr>
<td>spark.history.fs.update.interval</td>
<td>10s</td>
@@ -182,6 +202,8 @@ Security options for the Spark History Server are covered more detail in the
at the expense of more server load re-reading updated applications.
As soon as an update has completed, listings of the completed and
incomplete applications
will reflect the changes.
+ When multiple log directories are configured, one scan cycle covers all directories
+ sequentially.
</td>
<td>1.4.0</td>
</tr>
@@ -255,6 +277,7 @@ Security options for the Spark History Server are covered more detail in the
They are also deleted if the number of files is more than
<code>spark.history.fs.cleaner.maxNum</code>, Spark tries to clean up the completed attempts
from the applications based on the order of their oldest attempt time.
+ When multiple log directories are configured, one cleaner cycle covers all directories.
</td>
<td>1.4.0</td>
</tr>
@@ -263,6 +286,8 @@ Security options for the Spark History Server are covered more detail in the
<td>7d</td>
<td>
When <code>spark.history.fs.cleaner.enabled=true</code>, job history files older than this will be deleted when the filesystem history cleaner runs.
+ When multiple log directories are configured, this age threshold applies to files across
+ all directories.
</td>
<td>1.4.0</td>
</tr>
@@ -274,6 +299,9 @@ Security options for the Spark History Server are covered more detail in the
Spark tries to clean up the completed attempt logs to maintain the log directory under this limit.
This should be smaller than the underlying file system limit like `dfs.namenode.fs-limits.max-directory-items` in HDFS.
+ When multiple log directories are configured, this limit applies to the total number of
+ files across all directories. The oldest completed attempts are deleted first regardless
+ of which directory they belong to.
</td>
<td>3.0.0</td>
</tr>
@@ -326,6 +354,8 @@ Security options for the Spark History Server are covered more detail in the
<td>25% of available cores</td>
<td>
Number of threads that will be used by history server to process event logs.
+ When multiple log directories are configured, the thread pool is shared across
+ all directories.
</td>
<td>2.0.0</td>
</tr>
@@ -334,6 +364,8 @@ Security options for the Spark History Server are covered more detail in the
<td>25% of available cores</td>
<td>
Number of threads that will be used by history server to compact event logs.
+ When multiple log directories are configured, the thread pool is shared across
+ all directories.
</td>
<td>4.1.0</td>
</tr>
@@ -398,6 +430,8 @@ Security options for the Spark History Server are covered more detail in the
The maximum number of event log files which will be retained as non-compacted. By default,
all event log files will be retained. The lowest value is 1 for technical reason.<br/>
Please read the section of "Applying compaction of old event log files" for more details.
+ When multiple log directories are configured, this setting applies independently to each
+ directory.
</td>
<td>3.0.0</td>
</tr>
@@ -444,6 +478,8 @@ Security options for the Spark History Server are covered more detail in the
This controls each scan process to be completed within a reasonable time, and such
prevent the initial scan from running too long and blocking new eventlog files to
be scanned in time in large environments.
+ When multiple log directories are configured, this batch size applies independently
+ to each directory's scan.
</td>
<td>3.4.0</td>
</tr>
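Putting the two settings documented above together, a History Server `spark-defaults.conf` might look like the following sketch, reusing the example paths and display names from the commit message:

```
spark.history.fs.logDirectory        hdfs:///logs/prod,s3a://bucket/logs/staging
spark.history.fs.logDirectory.names  Production,Staging
```

Here the UI would show "Production" and "Staging" instead of the raw paths; omitting the second line falls back to displaying the full paths.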
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 9add6542841e..c6c012600c3d 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -42,7 +42,11 @@ object MimaExcludes {
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.ProxyRedirectHandler#ResponseWrapper.sendRedirect"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ui.ProxyRedirectHandler#ResponseWrapper.this"),
// [SPARK-55228][SQL] Implement Dataset.zipWithIndex in Scala API
- ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.Dataset.zipWithIndex")
+ ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.Dataset.zipWithIndex"),
+ // [SPARK-55793][CORE] Add multiple log directories support to SHS
+ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.ApplicationAttemptInfo.apply"),
+ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.ApplicationAttemptInfo.copy"),
+ ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.status.api.v1.ApplicationAttemptInfo$")
)
// Exclude rules for 4.1.x from 4.0.0
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]