[ 
https://issues.apache.org/jira/browse/SPARK-56975?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

You Zhou updated SPARK-56975:
-----------------------------
    Description: 
{{DataStreamReader.table()}} accepts a user-specified schema without complaint 
and then silently ignores it. Catalog tables declare their own schema, and 
{{TableCatalog.loadTable(Identifier)}} has no parameter to receive one, so a 
{{.schema(...)}} call before {{.table(...)}} is always a misconfiguration:

{code:scala}
spark.readStream
  .schema(new StructType().add("a", IntegerType))
  .table("some_table")   // no error; the schema has no effect
{code}

Users can write {{readStream.schema(s).table(name)}}, see a working query, and 
reasonably assume {{s}} took effect -- when in fact {{s}} is dropped and the 
catalog schema is used. The rest of {{DataStreamReader}} already surfaces this 
kind of misconfiguration: {{.load()}} throws {{_LEGACY_ERROR_TEMP_2242}} for 
providers without {{supportsExternalMetadata()}}, and {{.changes()}} calls 
{{assertNoSpecifiedSchema("changes")}}. {{.table()}} is the odd one out.

This fix makes {{.table()}} reject a user-specified schema in *both* 
{{DataStreamReader}} implementations:

* *Classic* ({{sql/core/.../classic/DataStreamReader.scala}}).
* *Spark Connect* ({{sql/connect/common/.../connect/DataStreamReader.scala}}), 
where the same bug exists -- {{schema()}} populates the read builder but 
{{table()}} forwards only options and drops the schema. This keeps classic and 
Connect consistent.

The rejection is gated behind a new config 
{{spark.sql.streaming.disallowUserSpecifiedSchemaInTable.enabled}} (default 
{{true}}) and raises {{STREAMING_USER_SPECIFIED_SCHEMA_NOT_ALLOWED_IN_TABLE}}, 
whose message names the config so users can learn how to opt out from the 
error message itself. Setting it to {{false}} restores the previous 
silent-ignore behavior, letting affected queries start without code changes.
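
As a rough sketch of the two ways out of the error, assuming a {{spark-shell}} session where {{spark}} is predefined, a placeholder catalog table named {{some_table}}, and that the new config can be set at session level (the description does not state its scope):

{code:scala}
import org.apache.spark.sql.types.{IntegerType, StructType}

// Preferred fix: drop .schema(...). The stream uses the catalog schema
// either way, so the call carried no information.
val fixed = spark.readStream.table("some_table")

// Temporary opt-out: restore the previous silent-ignore behavior.
// Assumption: the flag is settable per session.
spark.conf.set(
  "spark.sql.streaming.disallowUserSpecifiedSchemaInTable.enabled", "false")

val legacy = spark.readStream
  .schema(new StructType().add("a", IntegerType))
  .table("some_table")   // schema is still ignored, but no error is raised
{code}

The first form is the durable one; the opt-out only hides the misconfiguration so that existing queries keep starting.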

On Spark Connect, enforcement runs on the client, because the server builds 
the plan from the proto and never calls {{table()}}. The client therefore 
reads the config from the server via {{sparkSession.conf.get(...)}} before 
deciding whether to throw. This keeps a single, server-controlled source of 
truth for the flag and avoids breakage for existing Connect workloads.
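
The gating described above can be modeled without Spark as a small, self-contained sketch. Only the config key and the error name come from this description. {{TableSchemaGuard}}, {{assertNoUserSchema}}, the message wording, and the use of {{IllegalArgumentException}} are hypothetical stand-ins, not the actual implementation. The {{getConf}} parameter stands in for a lookup such as {{sparkSession.conf.get(...)}}.

{code:scala}
object TableSchemaGuard {
  // Config key and error name are taken from the description above.
  val ConfKey =
    "spark.sql.streaming.disallowUserSpecifiedSchemaInTable.enabled"
  val ErrorClass = "STREAMING_USER_SPECIFIED_SCHEMA_NOT_ALLOWED_IN_TABLE"

  /**
   * Hypothetical helper: throws when a schema was specified and the flag is
   * enabled. The schema is modeled as an optional DDL string for simplicity.
   */
  def assertNoUserSchema(
      userSpecifiedSchema: Option[String],
      getConf: String => String): Unit = {
    val rejectEnabled = getConf(ConfKey).toBoolean
    if (rejectEnabled && userSpecifiedSchema.isDefined) {
      // Stand-in exception type and message; the message names the config
      // so the reader of the error knows how to opt out.
      throw new IllegalArgumentException(
        s"[$ErrorClass] table() does not accept a user-specified schema; " +
          s"set $ConfKey to false to restore the previous behavior.")
    }
  }
}

// A Map[String, String] can serve as the String => String lookup.
val flagOn = Map(TableSchemaGuard.ConfKey -> "true")
val flagOff = Map(TableSchemaGuard.ConfKey -> "false")

TableSchemaGuard.assertNoUserSchema(None, flagOn)            // no schema: passes
TableSchemaGuard.assertNoUserSchema(Some("a INT"), flagOff)  // opted out: passes
// TableSchemaGuard.assertNoUserSchema(Some("a INT"), flagOn) would throw
{code}

Passing the lookup in as a function mirrors the point made above: the same check works whether the value comes from a local session or from the server.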

  was:
{{DataStreamReader.table()}} accepts a user-specified schema without complaint 
and then silently ignores it:

{{spark.readStream}}
{{.schema(new StructType().add("a", IntegerType))}}
{{.table("some_table") // no error; the schema has no effect}}

User-specified schema is not a meaningful input to {{.table()}} — catalog 
tables declare their own schema, and {{TableCatalog.loadTable(Identifier)}} has 
no parameter to receive a user schema, so even if Spark wanted to forward one 
it couldn't. The user's {{.schema(...)}} call is therefore always a 
misconfiguration.

The rest of {{DataStreamReader}} already surfaces this kind of misconfiguration 
as a clear error:
 - {{.load()}} goes through {{DataSourceV2Utils.getTableFromProvider}}, 
which throws {{_LEGACY_ERROR_TEMP_2242}} ("{{<provider>}} source does not 
support user-specified schema") when the provider does not implement 
{{supportsExternalMetadata()}}.
 - {{.changes()}} explicitly calls {{assertNoSpecifiedSchema("changes")}} and 
throws {{_LEGACY_ERROR_TEMP_1189}} ("User specified schema not supported with 
{{changes}}.").

{{.table()}} is the odd one out: same invalid configuration, no error. Users 
can write {{readStream.schema(s).table(name)}}, see a working query, and 
reasonably assume {{s}} had an effect -- when in fact the resulting stream uses 
the catalog schema and {{s}} was dropped. Surfacing this as a clear error 
aligns {{.table()}} with the existing behavior of {{.load()}} and 
{{.changes()}}.


> DataStreamReader.table() should reject user-specified schema instead of 
> silently ignoring it
> --------------------------------------------------------------------------------------------
>
>                 Key: SPARK-56975
>                 URL: https://issues.apache.org/jira/browse/SPARK-56975
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>    Affects Versions: 4.2.0
>            Reporter: You Zhou
>            Priority: Trivial
>              Labels: pull-request-available


