Hi all,

This is the bi-weekly Apache Spark digest from the Databricks OSS team. For each API/configuration/behavior change, an [API] tag is added in the title.

CORE

[API][8.0][SPARK-29150][CORE] Update RDD API for Stage level scheduling to be public (+29, -25)
This PR makes the RDD API for stage level scheduling public.
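To give a feel for what the now-public API looks like, here is a minimal Scala sketch. The resource amounts are arbitrary, sc is the active SparkContext, and at the time of writing stage-level scheduling requires dynamic allocation on a supported cluster manager (e.g. YARN or Kubernetes):

  import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}

  // Describe the executors and tasks that stages computing this RDD should run with.
  val execReqs = new ExecutorResourceRequests().cores(4).memory("6g")
  val taskReqs = new TaskResourceRequests().cpus(2)
  val profile = new ResourceProfileBuilder().require(execReqs).require(taskReqs).build()

  // Attach the profile to an RDD; stages computing this RDD use the requested resources.
  val rdd = sc.parallelize(1 to 100, numSlices = 4).withResources(profile)
  rdd.map(_ * 2).collect()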
[API][7.1][SPARK-8981][CORE] Add MDC support in Executor (+27, -1)
This PR added MDC (Mapped Diagnostic Context) support for task threads. By default, each log line printed by the same task thread will include the same unique task name. Users can also add custom content to the logs, for example application IDs/names, by configuring the log4j pattern. This is important when a cluster is shared by different users/applications.

Before:

  scala> testDf.collect()
  ...
  20/04/28 16:41:58 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KB for computing block broadcast_1 in memory.
  20/04/28 16:41:58 WARN MemoryStore: Not enough space to cache broadcast_1 in memory! (computed 384.0 B so far)
  20/04/28 16:41:58 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KB for computing block broadcast_0 in memory.
  20/04/28 16:41:58 WARN MemoryStore: Not enough space to cache broadcast_0 in memory! (computed 384.0 B so far)
  20/04/28 16:41:58 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
  20/04/28 16:41:58 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
  20/04/28 16:41:58 WARN RowBasedKeyValueBatch: Failed to allocate page (1048576 bytes).
  20/04/28 16:41:58 WARN RowBasedKeyValueBatch: Failed to allocate page (1048576 bytes).
  20/04/28 16:41:58 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1)
  org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 262144 bytes of memory, got 22200
  ...

After (please note the end of each line):

  scala> testDf.collect()
  ...
  20/04/28 16:40:59 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KB for computing block broadcast_1 in memory [task 1.0 in stage 0.0].
  20/04/28 16:40:59 WARN MemoryStore: Not enough space to cache broadcast_1 in memory! (computed 384.0 B so far) [task 1.0 in stage 0.0]
  20/04/28 16:40:59 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KB for computing block broadcast_0 in memory. [task 1.0 in stage 0.0]
  20/04/28 16:40:59 WARN MemoryStore: Not enough space to cache broadcast_0 in memory! (computed 384.0 B so far) [task 1.0 in stage 0.0]
  20/04/28 16:40:59 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0. [task 0.0 in stage 0.0]
  20/04/28 16:40:59 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0. [task 1.0 in stage 0.0]
  20/04/28 16:40:59 WARN RowBasedKeyValueBatch: Failed to allocate page (1048576 bytes). [task 0.0 in stage 0.0]
  20/04/28 16:40:59 WARN RowBasedKeyValueBatch: Failed to allocate page (1048576 bytes). [task 1.0 in stage 0.0]
  20/04/28 16:41:00 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
  org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 262144 bytes of memory, got 22200 [task 0.0 in stage 0.0]
  ...

SQL

[API][7.0][SPARK-31750][SQL] Eliminate UpCast if child's dataType is DecimalType (+52, -8)
Eliminate the UpCast that is implicitly added by Spark if its child data type is already a decimal type. Otherwise, for cases like:

  sql("select cast(1 as decimal(38, 0)) as d")
    .write.mode("overwrite")
    .parquet(f.getAbsolutePath)
  spark.read.parquet(f.getAbsolutePath).as[BigDecimal]

the query could fail as follows:

  [info] org.apache.spark.sql.AnalysisException: Cannot up cast `d` from decimal(38,0) to decimal(38,18).
  [info] The type path of the target object is:
  [info] - root class: "scala.math.BigDecimal"
  [info] You can either add an explicit cast to the input data or choose a higher precision type of the field in the target object;
  [info] at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveUpCast$$fail(Analyzer.scala:3060)
  [info] at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$$anonfun$apply$33$$anonfun$applyOrElse$174.applyOrElse(Analyzer.scala:3087)
  [info] at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$$anonfun$apply$33$$anonfun$applyOrElse$174.applyOrElse(Analyzer.scala:3071)
  [info] at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:309)
  [info] at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
  [info] at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:309)
  [info] at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$3(TreeNode.scala:314)

[API][7.0][SPARK-31755][SQL] Allow missing year/hour when parsing date/timestamp string (+370, -92)
In order to keep backward compatibility with Spark 2.4, this PR allows missing year and hour fields when parsing date/timestamp strings, using the year 1970 and the hour 0 as the default values.

In Spark 3.0, before:

  scala> sql("select to_timestamp('16', 'dd')").show
  +--------------------+
  |to_timestamp(16, dd)|
  +--------------------+
  |                null|
  +--------------------+

  scala> sql("select to_date('16', 'dd')").show
  +---------------+
  |to_date(16, dd)|
  +---------------+
  |           null|
  +---------------+

  scala> sql("select to_timestamp('2019 40', 'yyyy mm')").show
  +------------------------------+
  |to_timestamp(2019 40, yyyy mm)|
  +------------------------------+
  |           2019-01-01 00:00:00|
  +------------------------------+

  scala> sql("select to_timestamp('2019 10:10:10', 'yyyy hh:mm:ss')").show
  +------------------------------------------+
  |to_timestamp(2019 10:10:10, yyyy hh:mm:ss)|
  +------------------------------------------+
  |                       2019-01-01 00:00:00|
  +------------------------------------------+

After:

  scala> sql("select to_timestamp('16', 'dd')").show
  +------------------------+
  |to_timestamp('16', 'dd')|
  +------------------------+
  |     1970-01-16 00:00:00|
  +------------------------+

  scala> sql("select to_date('16', 'dd')").show
  +-------------------+
  |to_date('16', 'dd')|
  +-------------------+
  |         1970-01-16|
  +-------------------+

  scala> sql("select to_timestamp('2019 40', 'yyyy mm')").show
  +----------------------------------+
  |to_timestamp('2019 40', 'yyyy mm')|
  +----------------------------------+
  |               2019-01-01 00:40:00|
  +----------------------------------+

  scala> sql("select to_timestamp('2019 10:10:10', 'yyyy hh:mm:ss')").show
  +----------------------------------------------+
  |to_timestamp('2019 10:10:10', 'yyyy hh:mm:ss')|
  +----------------------------------------------+
  |                           2019-01-01 10:10:10|
  +----------------------------------------------+

[7.0][SPARK-31762][SQL] Fix perf regression of date/timestamp formatting in toHiveString (+138, -52)
This PR avoids both the unnecessary overhead of converting Java date-time types to micros/days before formatting, and the unnecessary conversion from input micros/days to Java types for the formatters.

[API][7.0][SPARK-31771][SQL] Disable Narrow TextStyle for datetime pattern 'G/M/L/E/u/Q/q' (+1441, -50)
In Spark 3.0, five consecutive pattern characters of 'G/M/L/E/u/Q/q' mean Narrow-Text Style, while in Spark 2.4 they mean Full-Text Style. With Narrow-Text Style, Spark only outputs the leading single letter of the value, e.g. December would be D. This PR disables Narrow-Text Style for these pattern characters in order to avoid silent data changes. After this PR, queries with datetime operations using datetime patterns such as G/M/L/E/u will fail if the pattern length is 5, while other patterns, e.g. 'k', 'm', can still accept a certain number of letters. As a result, datetime patterns like "GGGGG", "MMMMM", "LLLLL", "EEEEE", "uuuuu", "aa", "aaa", which are supported by the legacy parser but not by the new parser, will hit a SparkUpgradeException. To bypass the exception, users can switch to the legacy parser or change to new datetime patterns. However, datetime patterns like "QQQQQ" and "qqqqq", which are supported by neither the new parser nor the legacy parser, will hit an IllegalArgumentException; this exception is swallowed by Spark and the value null is returned.
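As a quick illustration of the legacy-parser escape hatch mentioned above, here is a minimal sketch; the pattern and expression are arbitrary examples:

  // Fall back to the Spark 2.4 (SimpleDateFormat/FastDateFormat based) behavior, so that
  // patterns rejected by the new formatter, such as "MMMMM", keep working.
  spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
  spark.sql("SELECT date_format(current_timestamp(), 'MMMMM')").show()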
[API][7.0][SPARK-31808][SQL] Makes struct function's output name and class name pretty (+40, -16)
This PR corrects struct's alias name and class name in ExpressionInfo.

Before:

  scala> sql("DESC FUNCTION struct").show(false)
  +------------------------------------------------------------------------------------+
  |function_desc                                                                        |
  +------------------------------------------------------------------------------------+
  |Function: struct                                                                     |
  |Class: org.apache.spark.sql.catalyst.expressions.NamedStruct                         |
  |Usage: struct(col1, col2, col3, ...) - Creates a struct with the given field values. |
  +------------------------------------------------------------------------------------+

  scala> sql("SELECT struct(1, 2)").show(false)
  +------------------------------+
  |named_struct(col1, 1, col2, 2)|
  +------------------------------+
  |[1, 2]                        |
  +------------------------------+

After:

  scala> sql("DESC FUNCTION struct").show(false)
  +------------------------------------------------------------------------------------+
  |function_desc                                                                        |
  +------------------------------------------------------------------------------------+
  |Function: struct                                                                     |
  |Class: org.apache.spark.sql.catalyst.expressions.CreateNamedStruct                   |
  |Usage: struct(col1, col2, col3, ...) - Creates a struct with the given field values. |
  +------------------------------------------------------------------------------------+

  scala> sql("SELECT struct(1, 2)").show(false)
  +------------+
  |struct(1, 2)|
  +------------+
  |[1, 2]      |
  +------------+

[API][7.0][SPARK-31818][SQL] Fix pushing down filters with java.time.Instant values in ORC (+77, -48)
When spark.sql.datetime.java8API.enabled=true, pushing down filters with java.time.Instant to the ORC datasource can fail with:

  java.lang.IllegalArgumentException: Wrong value class java.time.Instant for TIMESTAMP.EQUALS leaf
    at org.apache.hadoop.hive.ql.io.sarg.SearchArgumentImpl$PredicateLeafImpl.checkLiteralType(SearchArgumentImpl.java:192)
    at org.apache.hadoop.hive.ql.io.sarg.SearchArgumentImpl$PredicateLeafImpl.<init>(SearchArgumentImpl.java:75)

This PR fixes the error by converting java.time.Instant to java.sql.Timestamp in the filters.
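For context, a hedged sketch of the kind of query that exercises this code path. The path and column name are hypothetical; it assumes an existing ORC dataset with a timestamp column named ts:

  import java.time.Instant
  import org.apache.spark.sql.functions.col

  // With the Java 8 time API enabled, typed timestamp filter values arrive as java.time.Instant.
  spark.conf.set("spark.sql.datetime.java8API.enabled", "true")

  val events = spark.read.orc("/tmp/events")   // hypothetical ORC data with a timestamp column `ts`
  events.filter(col("ts") === Instant.parse("2020-05-27T00:00:00Z")).show()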
[API][8.0][SPARK-30352][SQL] DataSourceV2: Add CURRENT_CATALOG function (+43, -5)
This PR adds the built-in SQL function CURRENT_CATALOG, for DataSourceV2 only.
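A quick usage sketch, assuming the function is invoked like other no-argument SQL functions and that the default session catalog keeps its usual name spark_catalog:

  // Returns the name of the catalog the session is currently using.
  spark.sql("SELECT current_catalog()").show()
  // Expected to show spark_catalog unless another DSv2 catalog has been made current.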
[API][8.0][SPARK-31673][SQL] QueryExecution.debug.toFile() to take an additional explain mode param (+90, -34)
Previously, QueryExecution.debug.toFile always used the Extended mode to dump the query plan information. This PR allows users to specify the desired explain mode.

[API][8.0][SPARK-31710][SQL] Adds TIMESTAMP_SECONDS, TIMESTAMP_MILLIS and TIMESTAMP_MICROS functions (+237, -2)
This PR adds the three built-in SQL functions [TIMESTAMP_SECONDS, TIMESTAMP_MILLIS, TIMESTAMP_MICROS] to provide convenient ways to create timestamps by interpreting the values as the number of seconds, milliseconds and microseconds since 1970-01-01 00:00:00 UTC, respectively. For example:

  sql("select TIMESTAMP_SECONDS(t.a) as timestamp from values(1230219000),(-1230219000) as t(a)").show(false)
  +-------------------+
  |timestamp          |
  +-------------------+
  |2008-12-25 23:30:00|
  |1931-01-07 16:30:00|
  +-------------------+

  sql("select TIMESTAMP_MILLIS(t.a) as timestamp from values(1230219000123),(-1230219000123) as t(a)").show(false)
  +-----------------------+
  |timestamp              |
  +-----------------------+
  |2008-12-25 23:30:00.123|
  |1931-01-07 16:29:59.877|
  +-----------------------+

  sql("select TIMESTAMP_MICROS(t.a) as timestamp from values(1230219000123123),(-1230219000123123) as t(a)").show(false)
  +--------------------------+
  |timestamp                 |
  +--------------------------+
  |2008-12-25 23:30:00.123123|
  |1931-01-07 16:29:59.876877|
  +--------------------------+

[API][7.0][SPARK-31761][SQL] Cast integer to Long to avoid IntegerOverflow for IntegralDivide operator (+57, -12)
This PR casts Byte/Short/Integer to Long for the left and right children of IntegralDivide to avoid overflow. For cases like:

  // the df is constructed from: (-2147483648, -1) --> (_c0, _c1)
  val res = df.selectExpr("_c0 div _c1")
  res.collect

Before:

  res1: Array[org.apache.spark.sql.Row] = Array([-2147483648])

After:

  res1: Array[org.apache.spark.sql.Row] = Array([2147483648])

[8.0][SPARK-31793][SQL] Reduce the memory usage in file scan location metadata (+61, -4)
Currently, the Data Source Scan operator stores all the paths in its metadata. The metadata is kept when a SparkPlan is converted into SparkPlanInfo, which can be used to construct the Spark plan graph in the UI. However, the paths can still be very large (e.g., when many partitions remain after partition pruning), while UI pages can only show up to 100 bytes of the location metadata. Thus, this PR reduces the number of paths stored in the metadata to reduce memory usage.

[API][6.7][SPARK-31854][SQL] Invoke in MapElementsExec should not propagate null (+12, -2)
This PR fixes a NullPointerException thrown by Dataset.map when whole-stage codegen is enabled, by setting propagateNull to false when initializing Invoke.

[7.0][SPARK-31827][SQL] Fail datetime parsing/formatting if detect the Java 8 bug of stand-alone form (+19, -4)
This PR detects the usage of the LLL/qqq datetime pattern strings under JDK 8 and throws an exception with a clear error message, to avoid hitting a JDK 8 bug: https://bugs.openjdk.java.net/browse/JDK-8114833.

[7.0][SPARK-31867][SQL] Disable year type datetime patterns which are longer than 10 (+99, -10)
This PR fails the usage of datetime patterns in the form of y..y and Y..Y with lengths greater than 10, to avoid hitting a JDK bug.

[7.0][SPARK-31874][SQL] Use FastDateFormat as the legacy fractional formatter (+6, -1)
This PR:
- replaces SimpleDateFormat with FastDateFormat as the legacy formatter of FractionTimestampFormatter, to utilize the internal cache of FastDateFormat and avoid parsing the default pattern yyyy-MM-dd HH:mm:ss;
- optimizes LegacyFastTimestampFormatter for java.sql.Timestamp w/o fractional part, to avoid conversions to microseconds for patterns without the fractional part.

[7.0][SPARK-31885][SQL] Fix filter push down for old millis timestamps to Parquet (+25, -22)
This PR fixes conversions of java.sql.Timestamp to milliseconds in ParquetFilter by using the existing DateTimeUtils functions fromJavaTimestamp() and microsToMillis().

[7.0][API][SPARK-31888][SQL] Support java.time.Instant in Parquet filter pushdown (+83, -70)
This PR enables push down of filters with java.time.Instant attributes by:
- modifying ParquetFilters.valueCanMakeFilterOn() to accept filters with java.time.Instant attributes;
- adding ParquetFilters.timestampToMicros() to support both java.sql.Timestamp and java.time.Instant in conversions to microseconds;
- reusing timestampToMicros in constructing Parquet filters.

[8.0][API][SPARK-28067][SQL] Fix incorrect results for decimal aggregate sum by returning null on decimal overflow (+211, -25)
This PR fixes wrong results in the sum aggregate function with decimals in case of overflow, by adding an extra flag field in the sum agg function.
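To make the overflow behavior concrete, here is a small hypothetical repro; the values are arbitrary and assume the default spark.sql.ansi.enabled=false:

  // Twelve copies of a decimal(38,0) value near the type's maximum: the exact sum does not
  // fit in the result type, so sum() now returns null instead of a silently wrong value.
  val big = "99999999999999999999999999999999999999"
  val df = spark.range(12).selectExpr(s"cast('$big' as decimal(38, 0)) as d")
  df.selectExpr("sum(d)").show()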
[8.0][SPARK-28481][SQL] More expressions should extend NullIntolerant (+180, -103)
This PR makes more expressions extend NullIntolerant; as a result, Spark can avoid skew joins when the join column has many null values and improve query performance.

[7.0][SPARK-31859][SPARK-31861][SPARK-31863] Fix Thriftserver session timezone issues (+209, -185)
This PR fixes:
- SPARK-31861 "Thriftserver collecting timestamp not using spark.sql.session.timeZone", by converting the Timestamp values to String earlier, in SparkExecuteStatementOperation, using HiveResults.toHiveString();
- SPARK-31859 "Thriftserver not working with spark.sql.datetime.java8API.enabled=true", also by using HiveResults.toHiveString().

[7.0][SPARK-31354] SparkContext should only register one SparkSession ApplicationEnd listener (+41, -11)
This PR makes sure that getOrCreate only registers the Spark listener once. For example:

  SparkSession.builder().master("local").getOrCreate()
  SparkSession.clearActiveSession()
  SparkSession.clearDefaultSession()

  SparkSession.builder().master("local").getOrCreate()
  SparkSession.clearActiveSession()
  SparkSession.clearDefaultSession()

Before this PR, there are two listeners registered at the ListenerBus:

  [org.apache.spark.status.AppStatusListener5f610071,
   org.apache.spark.HeartbeatReceiverd400c17,
   org.apache.spark.sql.SparkSession$$anon$125849aeb,  <- first listener
   org.apache.spark.sql.SparkSession$$anon$1fadb9a0]   <- second listener

After this PR, there is only one listener registered at the ListenerBus:

  [org.apache.spark.status.AppStatusListener5f610071,
   org.apache.spark.HeartbeatReceiverd400c17,
   org.apache.spark.sql.SparkSession$$anon$125849aeb]  <- only one listener

ML

[API][8.0][SPARK-31734][ML][PYSPARK] Add weight support in ClusteringEvaluator (+167, -67)
This PR adds the setWeightCol method to ClusteringEvaluator, as BinaryClassificationEvaluator, RegressionEvaluator, and MulticlassClassificationEvaluator already do.
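A minimal Scala sketch of the new weight support; the toy data, weights and column names are made up for illustration:

  import org.apache.spark.ml.clustering.KMeans
  import org.apache.spark.ml.evaluation.ClusteringEvaluator
  import org.apache.spark.ml.linalg.Vectors

  // Tiny toy dataset: (features, weight); the weight column is what this PR adds support for.
  val data = spark.createDataFrame(Seq(
    (Vectors.dense(0.0, 0.0), 1.0),
    (Vectors.dense(0.1, 0.1), 2.0),
    (Vectors.dense(9.0, 9.0), 1.0),
    (Vectors.dense(9.1, 9.1), 0.5)
  )).toDF("features", "weight")

  val predictions = new KMeans().setK(2).setSeed(1L).fit(data).transform(data)

  // Silhouette score where each row contributes proportionally to its weight.
  val silhouette = new ClusteringEvaluator()
    .setFeaturesCol("features")
    .setPredictionCol("prediction")
    .setWeightCol("weight")   // new in this PR
    .evaluate(predictions)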
[8.0][SPARK-31803][ML] Make sure instance weight is not negative (+32, -16)
This PR adds checks to make sure the instance weight is not negative in the algorithms that support instance weights.

[8.0][SPARK-31840][ML] Add instance weight support in LogisticRegressionSummary (+134, -43)
This PR adds instance weight support in LogisticRegressionSummary to match its capabilities with those of LogisticRegression, MulticlassClassificationEvaluator and BinaryClassificationEvaluator.

[API][8.0][SPARK-31768][ML] Add getMetrics in Evaluators (+905, -591)
Currently, Evaluator.evaluate can only access one metric. This PR adds a getMetrics method to all the Evaluators to allow users to get multiple metrics. For example:

  val trainer = new LinearRegression
  val model = trainer.fit(dataset)
  val predictions = model.transform(dataset)

  val evaluator = new RegressionEvaluator()
  val metrics = evaluator.getMetrics(predictions)
  val rmse = metrics.rootMeanSquaredError
  val r2 = metrics.r2
  val mae = metrics.meanAbsoluteError
  val variance = metrics.explainedVariance

PYTHON

[7.0][SPARK-31788][CORE][DSTREAM][PYTHON] Recover the support of union for different types of RDD and DStreams (+39, -5)
This PR manually specifies the class for the input array used in (SparkContext|StreamingContext).union. It fixes a regression introduced by SPARK-25737.

[7.0][SPARK-31849][PYTHON][SQL] Make PySpark SQL exceptions more Pythonic (+61, -17)
This PR makes PySpark exceptions more Pythonic by hiding the JVM stacktrace by default when the JVM exceptions are Spark's own user-facing ones: AnalysisException, ParseException, StreamingQueryException, QueryExecutionException, IllegalArgumentException, and PythonException (thrown by Python UDFs). The JVM stacktrace can be shown again by turning on the spark.sql.pyspark.jvmStacktrace.enabled configuration.

[8.0][SPARK-25351][SQL][PYTHON] Handle Pandas category type when converting from Python with Arrow (+52, -0)
This PR adds support for the Pandas category type when converting from Python with Arrow enabled. The category column will be converted to whatever type the category elements are, as is the case with Arrow disabled.

[API][8.0][SPARK-31763][PYSPARK] Add inputFiles method in PySpark DataFrame Class (+32, -0)
This PR adds the inputFiles() method to the PySpark DataFrame to enable PySpark users to list all the files constituting a DataFrame.

SS

[API][7.0][SPARK-31706][SS] Add back the support of streaming update mode (+54, -20)
In Spark 2.4, all built-in v2 streaming sinks support all streaming output modes, and v2 sinks are enabled by default (see SPARK-22911). To keep backward compatibility, this PR adds back the support of the streaming UPDATE mode that was dropped in the unreleased 3.0 branch.

[API][7.0][SPARK-31792][SS][DOCS] Introduce the structured streaming UI in the Web UI doc (+28, -0)
This PR adds an introduction to the Structured Streaming UI to the Web UI documentation.

[8.0][SPARK-30915][SS] CompactibleFileStreamLog: Avoid reading the metadata log file when finding the latest batch ID (+102, -10)
When the compacted metadata log file becomes huge, writing outputs for the compact + 1 batch can be slow due to unnecessarily reading the compacted metadata log file. To get rid of this unnecessary reading, this PR adds a getLatestBatchId() method to CompactibleFileStreamLog as a complement to getLatest(). The new method doesn't read the content of the latest batch metadata log file, and can be applied to both FileStreamSource and FileStreamSink to avoid unnecessary latency when reading the log file.

UI

[API][8.0][SPARK-29303][WEB UI] Add UI support for stage level scheduling (+657, -103)
This PR adds UI support for stage level scheduling and ResourceProfiles:
- adds the ResourceProfile Id to the Stage page;
- adds the ResourceProfile Id to the Executor page;
- adds a section with the ResourceProfile Id to the Environment page.
Also, the REST API for the Environment page is updated to include the ResourceProfile information.

[7.0][SPARK-31882][WEBUI] DAG-viz is not rendered correctly with pagination (+31, -6)
This PR fixes a DAG-viz bug in which rendering fails with pagination when DAG-viz fetches link URLs for each stage of a job from the stage table.

[API][8.0][SPARK-31642] Add Pagination Support for Structured Streaming Page (+209, -91)
This PR adds pagination support for the Structured Streaming page in order to:
- help users analyze structured streaming queries in a much better way;
- improve the consistency of the Spark Web UI across various pages;
- prevent potential OOM.

OTHER

[7.0][SPARK-31786][K8S][BUILD] Upgrade kubernetes-client to 4.9.2 (+14, -11)

[API][8.0][SPARK-31759][DEPLOY] Support configurable max number of rotate logs for spark daemons (+11, -3)

[8.0][SPARK-31214][BUILD] Upgrade Janino to 3.1.2 (+14, -12)
This PR upgrades Janino to 3.1.2: http://janino-compiler.github.io/janino/changelog.html

[8.0][SPARK-31858][BUILD] Upgrade commons-io to 2.5 in Hadoop 3.2 profile (+2, -2)
This PR upgrades commons-io from 2.4 to 2.5 for Apache Spark 3.1.

[8.0][SPARK-31876][BUILD] Upgrade to Zstd 1.4.5 (+4, -4)
This PR aims to upgrade to Zstd 1.4.5: https://github.com/facebook/zstd/releases/tag/v1.4.5