Hi all,

This is the bi-weekly Apache Spark digest from the Databricks OSS team.
For each API/configuration/behavior change, an [API] tag is added in the
title.

Hi all,

This is the bi-weekly Apache Spark digest from the Databricks OSS team.
For each API/configuration/behavior change, an [API] tag is added in the
title.


CORE[API][8.0][SPARK-29150][CORE] Update RDD API for Stage level scheduling
to be public (+29, -25)>This PR makes the access level of the RDD api for
stage level scheduling public.[API][7.1][SPARK-8981][CORE] Add MDC support
in Executor (+27, -1)>This PR added MDC(Mapped Diagnostic Context) support
for task threads. By default, each log line printed by the same task thread
will include the same unique task name. Besides, user can also add the
custom content to logs by configuring the log4j pattern. For example,
application IDs/names. This is important when the clusters is shared by
different users/applications.Before:scala> testDf.collect()...20/04/28
16:41:58 WARN MemoryStore: Failed to reserve initial memory threshold of
1024.0 KB for computing block broadcast_1 in memory.20/04/28 16:41:58 WARN
MemoryStore: Not enough space to cache broadcast_1 in memory! (computed
384.0 B so far)20/04/28 16:41:58 WARN MemoryStore: Failed to reserve initial
memory threshold of 1024.0 KB for computing block broadcast_0 in
memory.20/04/28 16:41:58 WARN MemoryStore: Not enough space to cache
broadcast_0 in memory! (computed 384.0 B so far)20/04/28 16:41:58 WARN
RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not
spill but return 0.20/04/28 16:41:58 WARN RowBasedKeyValueBatch: Calling
spill() on RowBasedKeyValueBatch. Will not spill but return 0.20/04/28
16:41:58 WARN RowBasedKeyValueBatch: Failed to allocate page (1048576
bytes).20/04/28 16:41:58 WARN RowBasedKeyValueBatch: Failed to allocate page
(1048576 bytes).20/04/28 16:41:58 ERROR Executor: Exception in task 1.0 in
stage 0.0 (TID 1)org.apache.spark.memory.SparkOutOfMemoryError: Unable to
acquire 262144 bytes of memory, got 22200...After(please note the end of
each line):scala> testDf.collect()...20/04/28 16:40:59 WARN MemoryStore:
Failed to reserve initial memory threshold of 1024.0 KB for computing block
broadcast_1 in memory [task 1.0 in stage 0.0].20/04/28 16:40:59 WARN
MemoryStore: Not enough space to cache broadcast_1 in memory! (computed
384.0 B so far) [task 1.0 in stage 0.0]20/04/28 16:40:59 WARN MemoryStore:
Failed to reserve initial memory threshold of 1024.0 KB for computing block
broadcast_0 in memory. [task 1.0 in stage 0.0]20/04/28 16:40:59 WARN
MemoryStore: Not enough space to cache broadcast_0 in memory! (computed
384.0 B so far) [task 1.0 in stage 0.0] 20/04/28 16:40:59 WARN
RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not
spill but return 0. [task 0.0 in stage 0.0]20/04/28 16:40:59 WARN
RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not
spill but return 0. [task 1.0 in stage 0.0]20/04/28 16:40:59 WARN
RowBasedKeyValueBatch: Failed to allocate page (1048576 bytes). [task 0.0 in
stage 0.0]20/04/28 16:40:59 WARN RowBasedKeyValueBatch: Failed to allocate
page (1048576 bytes). [task 1.0 in stage 0.0] 20/04/28 16:41:00 ERROR
Executor: Exception in task 0.0 in stage 0.0 (TID
0)org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 262144
bytes of memory, got 22200 [task 0.0 in stage
0.0]...SQL[API][7.0][SPARK-31750][SQL] Eliminate UpCast if child's dataType
is DecimalType (+52, -8)>Eliminate the UpCast that are implicitly added by
Spark, if its child data type is already the decimal type. Otherwise, for
cases like:sql("select cast(1 as decimal(38, 0)) as d")
.write.mode("overwrite")
.parquet(f.getAbsolutePath)spark.read.parquet(f.getAbsolutePath).as[BigDecimal]could
fail as follow:[info] org.apache.spark.sql.AnalysisException: Cannot up cast
`d` from decimal(38,0) to decimal(38,18).[info] The type path of the target
object is:[info] - root class: "scala.math.BigDecimal"[info] You can either
add an explicit cast to the input data or choose a higher precision type of
the field in the target object;[info] at
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveUpCast$$fail(Analyzer.scala:3060)[info]
at
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$$anonfun$apply$33$$anonfun$applyOrElse$174.applyOrElse(Analyzer.scala:3087)[info]
at
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$$anonfun$apply$33$$anonfun$applyOrElse$174.applyOrElse(Analyzer.scala:3071)[info]
at
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:309)[info]
at
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)[info]
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:309)[info]
at
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$3(TreeNode.scala:314)[API][7.0][SPARK-31755][SQL]
Allow missing year/hour when parsing date/timestamp string (+370, -92)>In
order to keep backward compatibility with Spark 2.4, this PR allows missing
year and hour fields when parsing date/timestamp string and uses the
year 1970 and the hour 0 as the default values.In Spark 3.0,Before:scala>
sql("select to_timestamp('16',
'dd')").show+--------------------+|to_timestamp(16,
dd)|+--------------------+| null|+--------------------+scala> sql("select
to_date('16', 'dd')").show+---------------+|to_date(16,
dd)|+---------------+| null|+---------------+scala> sql("select
to_timestamp('2019 40', 'yyyy
mm')").show+------------------------------+|to_timestamp(2019 40, yyyy
mm)|+------------------------------+| 2019-01-01
00:00:00|+------------------------------+scala> sql("select
to_timestamp('2019 10:10:10', 'yyyy
hh:mm:ss')").show+------------------------------------------+|to_timestamp(2019
10:10:10, yyyy hh:mm:ss)|+------------------------------------------+|
2019-01-01 00:00:00|+------------------------------------------+After:scala>
sql("select to_timestamp('16',
'dd')").show+------------------------+|to_timestamp('16',
'dd')|+------------------------+| 1970-01-16
00:00:00|+------------------------+scala> sql("select to_date('16',
'dd')").show+-------------------+|to_date('16', 'dd')|+-------------------+|
1970-01-16|+-------------------+scala> sql("select to_timestamp('2019 40',
'yyyy mm')").show+----------------------------------+|to_timestamp('2019
40', 'yyyy mm')|+----------------------------------+| 2019-01-01
00:40:00|+----------------------------------+scala> sql("select
to_timestamp('2019 10:10:10', 'yyyy
hh:mm:ss')").show+----------------------------------------------+|to_timestamp('2019
10:10:10', 'yyyy
hh:mm:ss')|+----------------------------------------------+| 2019-01-01
10:10:10|+----------------------------------------------+[7.0][SPARK-31762][SQL]
Fix perf regression of date/timestamp formatting in toHiveString (+138,
-52)>This PR avoids both unnecessary overhead of converting Java date-time
types to micros/days before formatting and unnecessary conversion from input
micros/days to Java types for the formatters.[API][7.0][SPARK-31771][SQL]
Disable Narrow TextStyle for datetime pattern 'G/M/L/E/u/Q/q' (+1441,
-50)>In Spark 3.0, 5 continuous pattern characters with 'G/M/L/E/u/Q/q' is
Narrow-Text Style while it's Full-Text Style in Spark 2.4. With Narrow-Text
Style, Spark will only output the leading single letter of the value,
e.g. December would be D. This PR disables Narrow-Text Style for these
pattern characters in order to avoid the silent data change.After this PR,
queries with DateTime operations using DateTime patterns, e.g. G/M/L/E/u,
will fail if the pattern length is 5. But for other patterns, e,g. 'k', 'm',
they can still accept a certain number of letters.As a result, using
DateTime patterns like "GGGGG", "MMMMM", "LLLLL", "EEEEE", "uuuuu", "aa",
"aaa", which are not supported by the new parser but the legacy parser, will
hit an SparkUpgradeException. To bypass the exception, users can switch to
the legacy parser or change to the new DateTime patterns. However, using
DateTime patterns like "QQQQQ", "qqqqq", which are not supported by both the
new parser and the legacy parser, will hit an IllegalArgumentException. This
exception will be swallowed by Spark and the value null will be
returned.[API][7.0][SPARK-31808][SQL] Makes struct function's output name
and class name pretty (+40, -16)>This PR corrects struct's alias name and
class name in ExpressionInfo.Before:scala> sql("DESC FUNCTION
struct").show(false)+------------------------------------------------------------------------------------+|function_desc
|+------------------------------------------------------------------------------------+|Function:
struct ||Class: org.apache.spark.sql.catalyst.expressions.NamedStruct
||Usage: struct(col1, col2, col3, ...) - Creates a struct with the given
field
values.|+------------------------------------------------------------------------------------+scala>
sql("SELECT struct(1,
2)").show(false)+------------------------------+|named_struct(col1, 1, col2,
2)|+------------------------------+|[1, 2]
|+------------------------------+After:scala> sql("DESC FUNCTION
struct").show(false)+------------------------------------------------------------------------------------+|function_desc
|+------------------------------------------------------------------------------------+|Function:
struct ||Class: org.apache.spark.sql.catalyst.expressions.CreateNamedStruct
||Usage: struct(col1, col2, col3, ...) - Creates a struct with the given
field
values.|+------------------------------------------------------------------------------------+scala>
sql("SELECT struct(1, 2)").show(false)+------------+|struct(1,
2)|+------------+|[1, 2] |+------------+[API][7.0][SPARK-31818][SQL] Fix
pushing down filters with java.time.Instant values in ORC (+77,
-48)>When spark.sql.datetime.java8API.enabled=true, pushing down filters
with java.time.Instant to ORC datasource can fail
by:java.lang.IllegalArgumentException: Wrong value class java.time.Instant
for TIMESTAMP.EQUALS leaf at
org.apache.hadoop.hive.ql.io.sarg.SearchArgumentImpl$PredicateLeafImpl.checkLiteralType(SearchArgumentImpl.java:192)
at
org.apache.hadoop.hive.ql.io.sarg.SearchArgumentImpl$PredicateLeafImpl.<init>(SearchArgumentImpl.java:75)This
PR fixes the error by converting java.time.Instant to java.sql.Timestamp in
the filters.[API][8.0][SPARK-30352][SQL] DataSourceV2: Add CURRENT_CATALOG
function (+43, -5)>This PR adds the built-in SQL
function, CURRENT_CATALOG for DataSourceV2 only.[API][8.0][SPARK-31673][SQL]
QueryExection.debug.toFile() to take an additional explain mode param (+90,
-34)>Previously, QueryExecution.debug.toFile always uses the Extended mode
to dump the query plan information. This PR allows users to specify the
desired explain mode.[API][8.0][SPARK-31710][SQL] Adds TIMESTAMP_SECONDS,
TIMESTAMP_MILLIS and TIMESTAMP_MICROS functions (+237, -2)>This PR adds the
three built-in SQL functions
[TIMESTAMP_SECONDS, TIMESTAMP_MILLIS, TIMESTAMP_MICROS] in order to provide
the convenient ways to create timestamps by interpreting the values as the
number of seconds, milliseconds and microseconds since 1970-01-01 00:00:00
UTC respectively.For example,sql("select TIMESTAMP_SECONDS(t.a) as timestamp
from values(1230219000),(-1230219000) as
t(a)").show(false)+-------------------------+|timestamp
|+-------------------------+|2008-12-25 23:30:00 ||1931-01-07 16:30:00
|+-------------------------+sql("select TIMESTAMP_MILLIS(t.a) as timestamp
from values(1230219000123),(-1230219000123) as
t(a)").show(false)+-------------------------------+|timestamp
|+-------------------------------+|2008-12-25 23:30:00.123 ||1931-01-07
16:29:59.877 |+-------------------------------+sql("select
TIMESTAMP_MICROS(t.a) as timestamp from
values(1230219000123123),(-1230219000123123) as
t(a)").show(false)+------------------------------------+|timestamp
|+------------------------------------+|2008-12-25 23:30:00.123123
||1931-01-07 16:29:59.876877
|+------------------------------------+[API][7.0][SPARK-31761][SQL] Cast
integer to Long to avoid IntegerOverflow for IntegralDivide operator (+57,
-12)>This PR casts Byte/Short/Integer to Long for the left and right
children of IntegralDivide to avoid overflow.For cases like:// the df is
constructed from : (-2147483648, -1) --> (_c0, _c1)val res =
df.selectExpr("_c0 div _c1")res.collectBefore:res1:
Array[org.apache.spark.sql.Row] = Array([-2147483648])After:res1:
Array[org.apache.spark.sql.Row] = Array([2147483648])[8.0][SPARK-31793][SQL]
Reduce the memory usage in file scan location metadata (+61, -4)>Currently,
the operator of Data Source Scan stores all the paths in its metadata. The
metadata is kept when a SparkPlan is converted
into SparkPlanInfo. SparkPlanInfo can be used to construct the Spark plan
graph in UI. However, the paths can be still very large (e.g., when many
partitions still remain after partition pruning), while UI pages only can
show up to 100 bytes for the location metadata. Thus, this PR reduces the
number of paths stored in metadata to reduce memory
usage.[API][6.7][SPARK-31854][SQL] Invoke in MapElementsExec should not
propagate null (+12, -2)>This PR fixes a NullPointerException caused
by Dataset.map when whole-stage codegen is enabled by
setting propagateNull to false when
initializing Invoke.[7.0][SPARK-31827][SQL] fail datetime parsing/formatting
if detect the Java 8 bug of stand-alone form (+19, -4)>This PR detects the
usage of the LLL/qqq datetime pattern string under JDK8 and throws an
exception with a clear error message, to avoid hitting a JDK8
bug: https://bugs.openjdk.java.net/browse/JDK-8114833.[7.0][SPARK-31867][SQL]
Disable year type datetime patterns which are longer than 10 (+99, -10)>This
PR fails usage of datetime pattern in the form of y..y and Y..Y with lengths
greater than 10 to avoid hitting a JDK bug.[7.0][SPARK-31874][SQL]
Use FastDateFormat as the legacy fractional formatter (+6, -1)>This
PR:replaces SimpleDateFormat by FastDateFormat as the legacy formatter
of FractionTimestampFormatter, to utilize the internal cache
of FastDateFormat, and avoid parsing the default pattern yyyy-MM-dd
HH:mm:ss.optimizes LegacyFastTimestampFormatter for java.sql.Timestamp w/o
fractional part, to avoid conversions to microseconds for patterns without
the fractional part.[7.0][SPARK-31885][SQL] Fix filter push down for old
millis timestamps to Parquet (+25, -22)>This PR fixes conversions
of java.sql.Timestamp to milliseconds in ParquetFilter by using existing
functions
from DateTimeUtils fromJavaTimestamp() and 
microsToMillis().[7.0][API][SPARK-31888][SQL]
Support java.time.Instant in Parquet filter pushdown (+83, -70)>This PR
enables push down of filters with java.time.Instant attributes
by:modifying ParquetFilters.valueCanMakeFilterOn() to accept filters
with java.time.Instant attributes.adding ParquetFilters.timestampToMicros() to
support both types java.sql.Timestamp and java.time.Instant in conversions
to microseconds.reusing timestampToMicros in constructing of Parquet
filters.[8.0][API][SPARK-28067][SQL] Fix incorrect results for decimal
aggregate sum by returning null on decimal overflow (+211, -25)>This PR
fixes wrong results in sum aggregate function with decimals in case of
overflow by adding an extra flag field in the sum agg
function.[8.0][SPARK-28481][SQL] More expressions should extend
NullIntolerant (+180, -103)>This PR makes more expressions
extend NullIntolerant, and as a result can avoid skew join if the join
column has many null values and improve query
performance.[7.0][SPARK-31859][SPARK-31861][SPARK-31863] Fix Thriftserver
session timezone issues (+209, -185)>This PR fixes:SPARK-31861 "Thriftserver
collecting timestamp not using spark.sql.session.timeZone" by converting the
Timestamp values to String earlier, in SparkExecuteStatementOperation,
using HiveResults.toHiveString().SPARK-31859 "Thriftserver not working with
spark.sql.datetime.java8API.enabled=true" by
using HiveResults.toHiveString().[7.0][SPARK-31354] SparkContext should only
register one SparkSession ApplicationEnd listener (+41, -11)>This PR makes
sure that getOrCreate only registers Spark listener once. For
example,SparkSession.builder().master("local").getOrCreate()SparkSession.clearActiveSession()SparkSession.clearDefaultSession()SparkSession.builder().master("local").getOrCreate()SparkSession.clearActiveSession()SparkSession.clearDefaultSession()Before
this PR, there are two listeners registered
at 
ListenerBus:[org.apache.spark.status.AppStatusListener5f610071,org.apache.spark.HeartbeatReceiverd400c17,org.apache.spark.sql.SparkSession$$anon$125849aeb,
<- first listenerorg.apache.spark.sql.SparkSession$$anon$1fadb9a0] <- second
listenerAfter this PR, there's only one listener registered
at 
ListenerBus:[org.apache.spark.status.AppStatusListener5f610071,org.apache.spark.HeartbeatReceiverd400c17,org.apache.spark.sql.SparkSession$$anon$125849aeb]
<- only one listenerML[API][8.0][SPARK-31734][ML][PYSPARK] Add weight
support in ClusteringEvaluator (+167, -67)>This PR adds setWeightCol method
to ClusteringEvaluator as BinaryClassificationEvaluator, RegressionEvaluator, 
MulticlassClassificationEvaluator do.[8.0][SPARK-31803][ML]
Make sure instance weight is not negative (+32, -16)>This PR adds checks to
make sure instance weight is not negative in the algorithms that support
instance weight.[8.0][SPARK-31840][ML] Add instance weight support in
LogisticRegressionSummary (+134, -43)>Add instance weight support in
LogisticRegressionSummary to match its capabilities with those of
LogisticRegression, MulticlassClassificationEvaluator and
BinaryClassificationEvaluator.[API][8.0][SPARK-31768][ML] Add getMetrics in
Evaluators (+905, -591)>Currently, Evaluator.evaluate can only access to one
metrics. This PR adds getMetrics method in all the Evaluators to allow users
to get multiple metrics.For example: val trainer = new LinearRegression val
model = trainer.fit(dataset) val predictions = model.transform(dataset) val
evaluator = new RegressionEvaluator() val metrics =
evaluator.getMetrics(predictions) val rmse = metrics.rootMeanSquaredError
val r2 = metrics.r2 val mae = metrics.meanAbsoluteError val variance =
metrics.explainedVariancePYTHON[7.0][SPARK-31788][CORE][DSTREAM][PYTHON]
Recover the support of union for different types of RDD and DStreams (+39,
-5)>This PR manually specifies the class for the input array being used
in (SparkContext|StreamingContext).union. It fixes a regression introduced
from SPARK-25737.[7.0][SPARK-31849][PYTHON][SQL] Make PySpark SQL exceptions
more Pythonic (+61, -17)>This PR makes PySpark exception more Pythonic by
hiding JVM stacktrace by default, when the JVM exceptions are the analyzer's
exceptions, AnalysisException, ParseException, StreamingQueryException,
QueryExecutionException, IllegalArgumentException, and PythonException
[thrown by Python UDFs]. It can be enabled by turning
on spark.sql.pyspark.jvmStacktrace.enabled 
configuration.[8.0][SPARK-25351][SQL][PYTHON]
Handle Pandas category type when converting from Python with Arrow (+52,
-0)>This PR adds support for Pandas category type while converting from
python with Arrow enabled. The category column will be converted to whatever
type the category elements are as is the case with Arrow
disabled.[API][8.0][SPARK-31763][PYSPARK] Add inputFiles method in PySpark
DataFrame Class (+32, -0)>This PR adds inputFiles() method to
PySpark DataFrame to enable PySpark users to list all files constituting
a DataFrame.SS[API][7.0][SPARK-31706][SS] Add back the support of streaming
update mode (+54, -20)>In Spark 2.4, all built-in v2 streaming sinks support
all streaming output modes, and v2 sinks are enabled by
default(see SPARK-22911). To keep the backward compatibility, we added back
the support of streaming UPDATE mode that was dropped in the unreleased 3.0
branch.[API][7.0][SPARK-31792][SS][DOCS] Introduce the structured streaming
UI in the Web UI doc (+28, -0)>This PR adds the structured streaming UI
introduction in the Web UI doc.[8.0][SPARK-30915][SS]
CompactibleFileStreamLog: Avoid reading the metadata log file when finding
the latest batch ID (+102, -10)>When the compacted metadata log file becomes
huge, writing outputs for the compact + 1 batch can be slow due to
unnecessarily reading the compacted metadata log file. To get rid of the
unnecessary reading, this PR adds getLatestBatchId() method
in CompactibleFileStreamLog in the complement of getLatest(). The method
doesn't read the content of the latest batch metadata log file, and can be
applied to both FileStreamSource and FileStreamSink to avoid unnecessary
latency when reading the log file.UI[API][8.0][SPARK-29303][WEB UI] Add UI
support for stage level scheduling (+657, -103)>This PR adds the UI support
for stage level scheduling and ResourceProfiles:Add ResourceProfile Id to
Stage pageAdd ResourceProfile Id to Executor pageAdd a section with
ResourceProfile Id to Environment pageAlso, the rest API for the environment
page is updated to include the ResourceProfile information.       
[7.0][SPARK-31882][WEBUI] DAG-viz is not rendered correctly with pagination
(+31, -6)>This PR fixes a DAG-viz bug, in which rendering fails with
pagination when DAG-viz fetches link urls for each stage of a job from the
stage table.[API][8.0][SPARK-31642] Add Pagination Support for Structured
Streaming Page (+209, -91)>This PR adds the pagination support for the
structured streaming page in order to:Help users to analyze structured
streaming queries in a much better wayImprove the consistent of Spark Web UI
across various pagesPrevent potential OOMOTHER[7.0][SPARK-31786][K8S][BUILD]
Upgrade kubernetes-client to 4.9.2 (+14,
-11)>[API][8.0][SPARK-31759][DEPLOY] Support configurable max number of
rotate logs for spark daemons (+11, -3)>
[8.0][SPARK-31214][BUILD] Upgrade Janino to 3.1.2 (+14, -12)>This PR
upgrades Janino to
3.1.2: 
http://janino-compiler.github.io/janino/changelog.html[8.0][SPARK-31858][BUILD]
Upgrade commons-io to 2.5 in Hadoop 3.2 profile (+2, -2)>This PR
upgrades commons-io from 2.4 to 2.5 for Apache Spark
3.1[8.0][SPARK-31876][BUILD] Upgrade to Zstd 1.4.5 (+4, -4)>This PR aims to
upgrade to Zstd 1.4.5: https://github.com/facebook/zstd/releases/tag/v1.4.5




--
Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/

---------------------------------------------------------------------
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org

Reply via email to