Hi all,

This is the bi-weekly Apache Spark digest from the Databricks OSS team.
For each API/configuration/behavior change, an [API] tag is added to the
title.

CORE
[3.0][SPARK-31923][CORE] Ignore internal accumulators that use unrecognized
types rather than crashing (+63, -5)
<https://github.com/apache/spark/commit/b333ed0c4a5733a9c36ad79de1d4c13c6cf3c5d4>

A user may name their accumulators with the internal.metrics. prefix, so
that Spark treats them as internal accumulators and hides them from the UI.
We should make JsonProtocol.accumValueToJson more robust and let it ignore
internal accumulators that use unrecognized types.
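
As a rough illustration of the more lenient behavior (a Python sketch, not
Spark's Scala implementation), the function below skips internal accumulator
values of unrecognized types instead of raising. The function name mirrors
the method named above; the set of recognized types (integers and lists) is
an assumption made for this example.

```python
INTERNAL_PREFIX = "internal.metrics."  # prefix mentioned in the entry above


def accum_value_to_json(name, value):
    """Return a JSON-friendly value, or None when the value is ignored."""
    if name is not None and name.startswith(INTERNAL_PREFIX):
        # The recognized types here are an assumption for illustration only.
        if isinstance(value, int) and not isinstance(value, bool):
            return value
        if isinstance(value, list):
            return [str(item) for item in value]
        # Unrecognized type on an internal-looking name: ignore, do not crash.
        return None
    # Accumulators without the internal prefix fall back to their string form.
    return str(value)
```

With this sketch, a user accumulator named internal.metrics.custom that
holds a float is simply dropped from the output rather than causing an error.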

[API][3.1][SPARK-31486][CORE] spark.submit.waitAppCompletion flag to control
spark-submit exit in Standalone Cluster Mode (+88, -26)
<https://github.com/apache/spark/commit/6befb2d8bdc5743d0333f4839cf301af165582ce>

This PR implements an application wait mechanism that allows spark-submit to
wait until the application finishes in Standalone mode. This will delay the
exit of spark-submit JVM until the job is completed. This implementation
will keep monitoring the application until it is either finished, failed,
or killed. This will be controlled via the following conf:

   - spark.standalone.submit.waitAppCompletion (Default: false)

     In standalone cluster mode, controls whether the client waits to exit
     until the application completes. If set to true, the client process will
     stay alive polling the driver's status. Otherwise, the client process will
     exit after submission.
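
The wait mechanism described above can be sketched as a simple polling loop.
This is a minimal Python illustration, not the actual spark-submit client
code: poll_status is a hypothetical callable that returns the driver state
as a string, and the state names and polling interval are assumptions.

```python
import time

# Terminal states taken from the description above (finished, failed, killed).
TERMINAL_STATES = {"FINISHED", "FAILED", "KILLED"}


def wait_for_completion(poll_status, wait_app_completion=False,
                        interval_s=1.0, sleep=time.sleep):
    """Poll until a terminal state; return at once if waiting is disabled."""
    if not wait_app_completion:
        # Default behavior: the client exits right after submission.
        return None
    while True:
        state = poll_status()  # hypothetical driver status lookup
        if state in TERMINAL_STATES:
            return state
        sleep(interval_s)
```

Passing sleep as a parameter keeps the loop easy to exercise without real
delays.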

SQL
[3.0][SPARK-31220][SQL] repartition obeys initialPartitionNum when
adaptiveExecutionEnabled (+27, -12)
<https://github.com/apache/spark/commit/1d1eacde9d1b6fb75a20e4b909d221e70ad737db>

AQE and non-AQE use different configs to set the initial shuffle partition
number. This PR fixes repartition/DISTRIBUTE BY so that it also uses the
AQE config spark.sql.adaptive.coalescePartitions.initialPartitionNum to set
the initial shuffle partition number if AQE is enabled.
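
A simplified view of the selection logic, as a Python sketch over a plain
dict of settings. The function name is hypothetical, the fallback value of
200 is the documented default of spark.sql.shuffle.partitions, and any other
switches Spark may consult are deliberately ignored here.

```python
def initial_shuffle_partitions(conf):
    """Pick the initial shuffle partition number from a dict of settings."""
    aqe_enabled = conf.get("spark.sql.adaptive.enabled", "false") == "true"
    aqe_initial = conf.get(
        "spark.sql.adaptive.coalescePartitions.initialPartitionNum")
    if aqe_enabled and aqe_initial is not None:
        # With AQE on, repartition/DISTRIBUTE BY use the AQE-specific value.
        return int(aqe_initial)
    # Otherwise fall back to the regular shuffle partition setting.
    return int(conf.get("spark.sql.shuffle.partitions", "200"))
```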

[3.0][SPARK-31867][SQL][FOLLOWUP] Check result differences for datetime
formatting (+51, -8)
<https://github.com/apache/spark/commit/fc6af9d900ec6f6a1cbe8f987857a69e6ef600d1>

Spark should throw SparkUpgradeException when it gets a DateTimeException
during datetime formatting under the EXCEPTION legacy time parser policy.

[API][3.0][SPARK-31879][SPARK-31892][SQL] Disable week-based pattern letters
in datetime parsing/formatting
(+1421, -171)
<https://github.com/apache/spark/commit/9d5b5d0a5849ac329bbae26d9884d8843d8a8571>
(+102, -48)
<https://github.com/apache/spark/commit/afe95bd9ad7a07c49deecf05f0a1000bb8f80caa>

Week-based pattern letters have very weird behaviors during datetime
parsing in Spark 2.4, and it's very hard to simulate the legacy behaviors
with the new API. For formatting, the new API makes the start-of-week
localized, and it's not possible to keep the legacy behaviors. Since the
week-based fields are rarely used, we disable week-based pattern letters in
both parsing and formatting.

[3.0][SPARK-31896][SQL] Handle am-pm timestamp parsing when hour is missing
(+39, -3)
<https://github.com/apache/spark/commit/afcc14c6d27f9e0bd113e0d86b64dc6fa4eed551>

When the hour is missing during datetime parsing, this PR sets the hour
field to 0 if the AMPM_OF_DAY field is AM and to 12 if it is PM, to keep the
behavior the same as Spark 2.4.

[API][3.1][SPARK-31830][SQL] Consistent error handling for datetime
formatting and parsing functions (+126, -580)
<https://github.com/apache/spark/commit/6a424b93e5bdb79b1f1310cf48bd034397779e14>

When parsing/formatting datetime values, it's better to fail fast if the
pattern string is invalid, instead of returning null for each input record.
Formatting functions such as date_format already do this; this PR applies
the fail-fast behavior to the parsing functions from_unixtime,
unix_timestamp, to_unix_timestamp, to_timestamp and to_date.

[3.1][SPARK-31910][SQL] Enable Java 8 time API in Thrift server (+23, -0)
<https://github.com/apache/spark/commit/2c9988eaf31b7ebd97f2c2904ed7ee531eff0d20>

This PR enables Java 8 time API in thriftserver, so that we use the session
timezone more consistently.

[2.4][SPARK-31935][SQL] Hadoop file system config should be effective in
data source options (+52, -7)
<https://github.com/apache/spark/commit/f3771c6b47d0b3aef10b86586289a1f675c7cfe2>

This PR fixes a bug where Hadoop configs passed as read/write options were
not respected in data source V1.

[API][2.4][SPARK-31968][SQL] Duplicate partition columns check when writing
data (+12, -1)
<https://github.com/apache/spark/commit/a4ea599b1b9b8ebaae0100b54e6ac1d7576c6d8c>

Add a check for duplicate partition columns when writing to built-in file
sources. After the change, when the DataFrame has duplicate partition
columns, users get an AnalysisException when writing it. Previously, the
write would succeed, but reading the files with duplicate columns would
fail.
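
The kind of check described above could look like the following Python
sketch. It is not the Spark implementation: the function name is
hypothetical, ValueError stands in for AnalysisException, and the
case_sensitive switch is an assumption about how column names are compared.

```python
from collections import Counter


def check_partition_columns(partition_columns, case_sensitive=False):
    """Raise if the same partition column is listed more than once."""
    # Normalize names first so that "year" and "Year" collide when the
    # comparison is case-insensitive (an assumption for this sketch).
    keys = [c if case_sensitive else c.lower() for c in partition_columns]
    duplicates = sorted(k for k, n in Counter(keys).items() if n > 1)
    if duplicates:
        # Fail at write time instead of producing files that cannot be read.
        raise ValueError(
            "Found duplicate partition column(s): " + ", ".join(duplicates))
```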

[API][3.0][SPARK-26905][SQL] Add TYPE in the ANSI non-reserved list (+2, -0)
<https://github.com/apache/spark/commit/e14029b18df10db5094f8abf8b9874dbc9186b4e>

Add TYPE to the ANSI non-reserved list to follow the ANSI SQL standard. The
change impacts the behavior only when ANSI mode is on
(spark.sql.ansi.enabled=true).

[API][3.0][SPARK-26905][SQL] Follow the SQL:2016 reserved keywords (+429, -5)
<https://github.com/apache/spark/commit/3698a14204dd861ea3ee3c14aa923123b52caba1>

Move the keywords ANTI, SEMI, and MINUS from reserved to non-reserved to
comply with the ANSI SQL standard. The change impacts the behavior only when
ANSI mode is on (spark.sql.ansi.enabled=true).

[API][3.0][SPARK-31939][SQL][TEST-JAVA11] Fix Parsing day of year when year
field pattern is missing (+465, -3)
<https://github.com/apache/spark/commit/22dda6e18e91c6db6fa8ff9fafaafe09a79db4ea>

When a datetime pattern does not contain a year field (e.g., 'yyyy') but
contains the day-of-year field (e.g., 'DD'), Spark should still be able to
respect the datetime pattern and parse the values.

Before the change:

spark-sql> select to_timestamp('31', 'DD');
1970-01-01 00:00:00
spark-sql> select to_timestamp('31 30', 'DD dd');
1970-01-30 00:00:00

After the change:

spark-sql> select to_timestamp('31', 'DD');
1970-01-31 00:00:00
spark-sql> select to_timestamp('31 30', 'DD dd');
NULL
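
The "after" results above can be reproduced with a small Python sketch of the
resolution rule. This is only an illustration, not Spark's parser: the
function name is hypothetical, the default year of 1970 is taken from the
outputs above, and returning None stands in for NULL.

```python
from datetime import date, timedelta


def resolve_date(day_of_year=None, day_of_month=None, year=1970, month=1):
    """Resolve parsed fields to a date; None signals conflicting fields."""
    if day_of_year is not None:
        # Day-of-year counts from January 1 of the (default) year.
        resolved = date(year, 1, 1) + timedelta(days=day_of_year - 1)
        if day_of_month is not None and resolved.day != day_of_month:
            # 'DD' and 'dd' disagree, so the input cannot be parsed.
            return None
        return resolved
    return date(year, month, day_of_month if day_of_month is not None else 1)
```

Here resolve_date(day_of_year=31) yields 1970-01-31, while
resolve_date(day_of_year=31, day_of_month=30) yields None because the two
fields conflict.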

[3.0][SPARK-31956][SQL] Do not fail if there is no ambiguous self join
(+7, -2)
<https://github.com/apache/spark/commit/c40051932290db3a63f80324900a116019b1e589>

df("col").as("name") is not a column reference anymore, and should not have
the special column metadata that is used to identify the root attribute
(e.g., Dataset ID and col position). This PR fixes the corresponding
regression that could cause a DataFrame operation to fail even when there is
no ambiguous self-join. For example:

val joined = df.join(spark.range(1)).select($"a")
joined.select(joined("a").alias("x"), sum(joined("a")).over(w))

[3.0][SPARK-31958][SQL] normalize special floating numbers in subquery
(+18, -4)
<https://github.com/apache/spark/commit/6fb9c80da129d0b43f9ff5b8be6ce8bad992a4ed>

The PR fixes a bug where special floating-point numbers in non-correlated
subquery expressions were not normalized. The subquery expressions are now
handled by OptimizeSubqueries.

[API][3.1][SPARK-21117][SQL] Built-in SQL Function Support - WIDTH_BUCKET
(+431, -30)
<https://github.com/apache/spark/commit/b1adc3deee00058cba669534aee156dc7af243dc>

Add a built-in SQL function WIDTH_BUCKET that returns the bucket number to
which value would be assigned in an equiwidth histogram with num_bucket
buckets, in the range min_value to max_value. Examples:

> SELECT WIDTH_BUCKET(5.3, 0.2, 10.6, 5);
3
> SELECT WIDTH_BUCKET(-2.1, 1.3, 3.4, 3);
0
> SELECT WIDTH_BUCKET(8.1, 0.0, 5.7, 4);
5
> SELECT WIDTH_BUCKET(-0.9, 5.2, 0.5, 2);
3
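
To make the bucket arithmetic concrete, here is a pure-Python sketch that
reproduces the four results above. It is an illustration of the equiwidth
rule, not Spark's code; returning None for a non-positive bucket count or an
empty range is an assumption of this sketch.

```python
import math


def width_bucket(value, min_value, max_value, num_bucket):
    """Bucket number of value in an equiwidth histogram over the range."""
    if num_bucket <= 0 or min_value == max_value:
        return None  # assumption: invalid arguments give no bucket
    if min_value < max_value:
        if value < min_value:
            return 0  # underflow bucket
        if value >= max_value:
            return num_bucket + 1  # overflow bucket
        width = max_value - min_value
        return math.floor(num_bucket * (value - min_value) / width) + 1
    # Descending range (min_value > max_value): buckets count downwards.
    if value > min_value:
        return 0
    if value <= max_value:
        return num_bucket + 1
    width = min_value - max_value
    return math.floor(num_bucket * (min_value - value) / width) + 1
```

For the first example, 5 * (5.3 - 0.2) / (10.6 - 0.2) is about 2.45, which
floors to 2 and gives bucket 3.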

[3.1][SPARK-27217][SQL] Nested column aliasing for more operators which can
prune nested column (+190, -10)
<https://github.com/apache/spark/commit/43063e2db2bf7469f985f1954d8615b95cf5c578>

Support nested column pruning from an Aggregate or Expand operator.

[3.1][SPARK-27633][SQL] Remove redundant aliases in NestedColumnAliasing
(+43, -1)
<https://github.com/apache/spark/commit/8282bbf12d4e174986a649023ce3984aae7d7755>

Avoid generating redundant aliases if the parent nested field is aliased in
the NestedColumnAliasing rule. This slightly improves the performance.

[3.1][SPARK-31736][SQL] Nested column aliasing for
RepartitionByExpression/Join (+197, -16)
<https://github.com/apache/spark/commit/ff89b1114319e783eb4f4187bf2583e5e21c64e4>

Support nested column pruning from a RepartitionByExpression or Join
operator.

ML
[3.1][SPARK-31925][ML] Summary.totalIterations greater than maxIters
(+43, -12)
<https://github.com/apache/spark/commit/f83cb3cbb3ce3f22fd122bce620917bfd0699ce7>

The PR fixes a correctness issue in LogisticRegression and
LinearRegression where the actual number of training iterations was one
larger than the specified maxIter.

[3.1][SPARK-31944] Add instance weight support in LinearRegressionSummary
(+56, -24)
<https://github.com/apache/spark/commit/89c98a4c7068734e322d335cb7c9f22379ff00e8>

The PR adds instance weight support in LinearRegressionSummary. Instance
weight is already supported by LinearRegression and RegressionMetrics.

SS
[3.0][SPARK-31593][SS] Remove unnecessary streaming query progress update
(+58, -7)
<https://github.com/apache/spark/commit/1e40bccf447dccad9d31bccc75d21b8fca77ba52>

The PR fixes a bug that sets incorrect metrics in Structured Streaming. We
should make a progress update every 10 seconds when a stream doesn't have
any new data upstream. Without the fix, we zero out the input information
but not the output information when making the progress update.

[3.0][SPARK-31990][SS] Use toSet.toSeq in Dataset.dropDuplicates (+3, -1)
<https://github.com/apache/spark/commit/7f7b4dd5199e7c185aedf51fccc400c7072bed05>

The PR proposes to preserve the input order of colNames for groupCols in
Dataset.dropDuplicates, because the streaming state store depends on the
groupCols order.

[3.1][SPARK-24634][SS] Add a new metric regarding number of inputs later
than watermark plus allowed delay (+94, -29)
<https://github.com/apache/spark/commit/84815d05503460d58b85be52421d5923474aa08b>

Add a new metric, numLateInputs, to count the number of inputs that are
later than the watermark ('inputs' are relative to operators). The new
metric is available both on the Spark UI (SQL tab, query execution details
page) and through the Streaming Query Listener.

PYTHON
[API][3.0][SPARK-31895][PYTHON][SQL] Support DataFrame.explain(extended: str)
case to be consistent with Scala side (+24, -11)
<https://github.com/apache/spark/commit/e1d52011401c1989f26b230eb8c82adc63e147e7>

Improves DataFrame.explain in PySpark, so that it takes the explain mode
string as well, which is consistent with the Scala API.

[3.0][SPARK-31915][SQL][PYTHON] Resolve the grouping column properly per
the case sensitivity in grouped and cogrouped pandas UDFs (+37, -8)
<https://github.com/apache/spark/commit/00d06cad564d5e3e5f78a687776d02fe0695a861>

The PR proposes to resolve grouping attributes separately first so it can
be properly referred to when FlatMapGroupsInPandas and
FlatMapCoGroupsInPandas are resolved without ambiguity. Example:

from pyspark.sql.functions import *

df = spark.createDataFrame([[1, 1]], ["column", "Score"])

# The grouping key 'COLUMN' differs from the column name only by case.
@pandas_udf("column integer, Score float", PandasUDFType.GROUPED_MAP)
def my_pandas_udf(pdf):
    return pdf.assign(Score=0.5)

df.groupby('COLUMN').apply(my_pandas_udf).show()

df1 = spark.createDataFrame([(1, 1)], ("column", "value"))
df2 = spark.createDataFrame([(1, 1)], ("column", "value"))

df1.groupby("COLUMN").cogroup(
    df2.groupby("COLUMN")
).applyInPandas(lambda r, l: r + l, df1.schema).show()

Before:

pyspark.sql.utils.AnalysisException: Reference 'COLUMN' is ambiguous,
could be: COLUMN, COLUMN.;

pyspark.sql.utils.AnalysisException: cannot resolve '`COLUMN`' given
input columns: [COLUMN, COLUMN, value, value];;
'FlatMapCoGroupsInPandas ['COLUMN], ['COLUMN], <lambda>(column#9L,
value#10L, column#13L, value#14L), [column#22L, value#23L]
:- Project [COLUMN#9L, column#9L, value#10L]
:  +- LogicalRDD [column#9L, value#10L], false
+- Project [COLUMN#13L, column#13L, value#14L]
   +- LogicalRDD [column#13L, value#14L], false

After:

+------+-----+
|column|Score|
+------+-----+
|     1|  0.5|
+------+-----+

+------+-----+
|column|value|
+------+-----+
|     2|    2|
+------+-----+

[3.1][SPARK-31945][SQL][PYSPARK] Enable cache for the same Python function
(+25, -4)
<https://github.com/apache/spark/commit/032d17933b4009ed8a9d70585434ccdbf4d1d7df>

This PR proposes to make PythonFunction hold Seq[Byte] instead of
Array[Byte]. After the change, two functions can be compared by the values
of their bytes. With the proposed change, the cache manager will detect the
same function and use the cache for it if it exists.

[3.1][SPARK-31964][PYTHON] Use Pandas is_categorical on Arrow category type
conversion (+2, -5)
<https://github.com/apache/spark/commit/b7ef5294f17d54e7d90e36a4be02e8bd67200144>

When using PyArrow to convert a Pandas categorical column, use
is_categorical instead of trying to import CategoricalDtype, because the
former is a more stable API.

UI
[3.0][SPARK-31903][SQL][PYSPARK][R] Fix toPandas with Arrow enabled to show
metrics in Query UI (+4, -4)
<https://github.com/apache/spark/commit/632b5bce23c94d25712b43be83252b34ebfd3e72>

In Dataset.collectAsArrowToR and Dataset.collectAsArrowToPython, since the
code block for serveToStream runs in a separate thread, withAction finishes
as soon as it starts the thread. As a result, it doesn't collect the
metrics of the actual action, and the Query UI shows the plan graph without
metrics. This PR fixes the issue.

The affected functions are:

   - collect() in SparkR
   - DataFrame.toPandas() in PySpark

[3.0][SPARK-31886][WEBUI] Fix the wrong coloring of nodes in DAG-viz
(+33, -3)
<https://github.com/apache/spark/commit/8ed93c9355bc2af6fe456d88aa693c8db69d0bbf>

In the Job Page and Stage Page, nodes which are associated with "barrier
mode" in the DAG-viz will be colored pale green. But, with some types of
jobs, nodes which are not associated with the mode will also be colored.
This PR fixes it.

[3.1][SPARK-29431][WEBUI] Improve Web UI / Sql tab visualization with cached
dataframes (+46, -0)
<https://github.com/apache/spark/commit/e4db3b5b1742b4bdfa32937273e5d07a76cde79b>

Display the query plan of cached DataFrames as well in the web UI.

[2.4][SPARK-31967][UI] Downgrade to vis.js 4.21.0 to fix Jobs UI loading
time regression (+49, -86)
<https://github.com/apache/spark/commit/f535004e14b197ceb1f2108a67b033c052d65bcb>

Fix a serious performance issue in the web UI by downgrading
vis-timeline-graph2d to 4.21.0.

[3.1][SPARK-30119][WEBUI] Support pagination for streaming tab (+259, -178)
<https://github.com/apache/spark/commit/9b098f1eb91a5e9f488d573bfeea3f6bfd9b95b3>

The PR adds pagination support for the streaming tab.

[3.1][SPARK-31642][FOLLOWUP] Fix Sorting for duration column and make Status
column sortable (+7, -6)
<https://github.com/apache/spark/commit/f5f6eee3045e90e02fc7e999f616b5a021d7c724>

The PR improves the pagination support in the streaming tab by fixing the
incorrect sorting of the Duration column and making the Status column
sortable.
