[
https://issues.apache.org/jira/browse/SPARK-56975?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
You Zhou updated SPARK-56975:
-----------------------------
Description:
{{DataStreamReader.table()}} accepts a user-specified schema without complaint
and then silently ignores it. Catalog tables declare their own schema, and
{{TableCatalog.loadTable(Identifier)}} has no parameter to receive one, so a
{{.schema(...)}} call before {{.table(...)}} is always a misconfiguration:
{code:scala}
spark.readStream
.schema(new StructType().add("a", IntegerType))
.table("some_table") // no error; the schema has no effect
{code}
Users can write {{readStream.schema(s).table(name)}}, see a working query, and
reasonably assume {{s}} took effect -- when in fact {{s}} is dropped and the
catalog schema is used. The rest of {{DataStreamReader}} already surfaces this
kind of misconfiguration: {{.load()}} throws {{_LEGACY_ERROR_TEMP_2242}} for
providers without {{supportsExternalMetadata()}}, and {{.changes()}} calls
{{assertNoSpecifiedSchema("changes")}}. {{.table()}} is the odd one out.
This fix makes {{.table()}} reject a user-specified schema in *both*
{{DataStreamReader}} implementations:
* *Classic* ({{sql/core/.../classic/DataStreamReader.scala}}).
* *Spark Connect* ({{sql/connect/common/.../connect/DataStreamReader.scala}}),
where the same bug exists -- {{schema()}} populates the read builder but
{{table()}} forwards only options and drops the schema. This keeps classic and
Connect consistent.
The rejection is gated behind a new config
{{spark.sql.streaming.disallowUserSpecifiedSchemaInTable.enabled}} (default
{{true}}) and raises {{STREAMING_USER_SPECIFIED_SCHEMA_NOT_ALLOWED_IN_TABLE}},
whose message names the config so users can opt out from the error itself.
Setting it to {{false}} restores the previous silent-ignore behavior, letting
affected queries start without code changes.
On Spark Connect, enforcement runs on the client (the server builds the plan
from the proto and never calls {{table()}}), so the client reads the config
from the server via {{sparkSession.conf.get(...)}} before deciding whether to
throw -- keeping a single, server-controlled source of truth for the flag and
avoiding breakage for existing Connect workloads.
was:
{{DataStreamReader.table()}} accepts a user-specified schema without complaint
and then silently ignores it:
{{spark.readStream}}
{{.schema(new StructType().add("a", IntegerType))}}
{{.table("some_table") // no error; the schema has no effect}}
User-specified schema is not a meaningful input to {{.table()}} — catalog
tables declare their own schema, and {{TableCatalog.loadTable(Identifier)}} has
no parameter to receive a user schema, so even if Spark wanted to forward one
it couldn't. The user's {{.schema(...)}} call is therefore always a
misconfiguration.
The rest of {{DataStreamReader}} already surfaces this kind of misconfiguration
as a clear error:
- {{.load()}} goes through {{{}DataSourceV2Utils.getTableFromProvider{}}},
which throws {{_LEGACY_ERROR_TEMP_2242}} ("{{{}<provider>{}}} source does not
support user-specified schema") when the provider does not implement
{{{}supportsExternalMetadata(){}}}.
- {{.changes()}} explicitly calls {{assertNoSpecifiedSchema("changes")}} and
throws {{_LEGACY_ERROR_TEMP_1189}} ("User specified schema not supported with
{{{}changes{}}}.").
{{.table()}} is the odd one out: same invalid configuration, no error. Users
can write {{{}readStream.schema(s).table(name){}}}, see a working query, and
reasonably assume {{s}} had an effect — when in fact the resulting stream uses
the catalog schema and {{s}} was dropped. Surfacing this as a clear error
aligns {{.table()}} with the existing behavior of {{.load()}} and
{{.changes().}}
> DataStreamReader.table() should reject user-specified schema instead of
> silently ignoring it
> --------------------------------------------------------------------------------------------
>
> Key: SPARK-56975
> URL: https://issues.apache.org/jira/browse/SPARK-56975
> Project: Spark
> Issue Type: Improvement
> Components: Structured Streaming
> Affects Versions: 4.2.0
> Reporter: You Zhou
> Priority: Trivial
> Labels: pull-request-available
>
> {{DataStreamReader.table()}} accepts a user-specified schema without
> complaint and then silently ignores it. Catalog tables declare their own
> schema, and {{TableCatalog.loadTable(Identifier)}} has no parameter to
> receive one, so a {{.schema(...)}} call before {{.table(...)}} is always a
> misconfiguration:
> {code:scala}
> spark.readStream
> .schema(new StructType().add("a", IntegerType))
> .table("some_table") // no error; the schema has no effect
> {code}
> Users can write {{readStream.schema(s).table(name)}}, see a working query,
> and reasonably assume {{s}} took effect -- when in fact {{s}} is dropped and
> the catalog schema is used. The rest of {{DataStreamReader}} already surfaces
> this kind of misconfiguration: {{.load()}} throws {{_LEGACY_ERROR_TEMP_2242}}
> for providers without {{supportsExternalMetadata()}}, and {{.changes()}}
> calls {{assertNoSpecifiedSchema("changes")}}. {{.table()}} is the odd one out.
> This fix makes {{.table()}} reject a user-specified schema in *both*
> {{DataStreamReader}} implementations:
> * *Classic* ({{sql/core/.../classic/DataStreamReader.scala}}).
> * *Spark Connect*
> ({{sql/connect/common/.../connect/DataStreamReader.scala}}), where the same
> bug exists -- {{schema()}} populates the read builder but {{table()}}
> forwards only options and drops the schema. This keeps classic and Connect
> consistent.
> The rejection is gated behind a new config
> {{spark.sql.streaming.disallowUserSpecifiedSchemaInTable.enabled}} (default
> {{true}}) and raises
> {{STREAMING_USER_SPECIFIED_SCHEMA_NOT_ALLOWED_IN_TABLE}}, whose message names
> the config so users can opt out from the error itself. Setting it to
> {{false}} restores the previous silent-ignore behavior, letting affected
> queries start without code changes.
> On Spark Connect, enforcement runs on the client (the server builds the plan
> from the proto and never calls {{table()}}), so the client reads the config
> from the server via {{sparkSession.conf.get(...)}} before deciding whether to
> throw -- keeping a single, server-controlled source of truth for the flag and
> avoiding breakage for existing Connect workloads.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]