Hi Till,

it actually seems to be two issues, one is a bug and one would be an
improvement (it never worked before). I'll split the tickets later. I'll
raise the bug one to Critical since I want to fix it for 1.14, but it was
also broken in 1.13 already, so it shouldn't be a blocker.


Best
Ingo

On Mon, Aug 23, 2021 at 11:43 AM Till Rohrmann <trohrm...@apache.org> wrote:

> Does this also affect Flink 1.14.0? If yes, do we want to fix this issue
> for the upcoming release? If yes, then please make this issue a blocker or
> at least critical.
>
> Cheers,
> Till
>
> On Mon, Aug 23, 2021 at 8:39 AM Ingo Bürk <i...@ververica.com> wrote:
>
> > Thanks Timo for the confirmation. I've also raised FLINK-23911[1] for
> this.
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-23911
> >
> >
> > Best
> > Ingo
> >
> > On Mon, Aug 23, 2021 at 8:34 AM Timo Walther <twal...@apache.org> wrote:
> >
> > > Hi everyone,
> > >
> > > this sounds definitely like a bug to me. Computing metadata might be
> > > very expensive and a connector might expose a long list of metadata
> > > keys. It was therefore intended to project the metadata if possible.
> I'm
> > > pretty sure that this worked before (at least when implementing
> > > SupportsProjectionPushDown). Maybe a bug was introduced when adding the
> > > Spec support.
> > >
> > > Regards,
> > > Timo
> > >
> > >
> > > On 23.08.21 08:24, Ingo Bürk wrote:
> > > > Hi Jingsong,
> > > >
> > > > thanks for your answer. Even if the source implements
> > > > SupportsProjectionPushDown, #applyProjections will never be called
> with
> > > > projections for metadata columns. For example, I have the following
> > test:
> > > >
> > > > @Test
> > > > def test(): Unit = {
> > > >    val tableId = TestValuesTableFactory.registerData(Seq())
> > > >
> > > >    tEnv.createTemporaryTable("T",
> > TableDescriptor.forConnector("values")
> > > >      .schema(Schema.newBuilder()
> > > >        .column("f0", DataTypes.INT())
> > > >        .columnByMetadata("m1", DataTypes.STRING())
> > > >        .columnByMetadata("m2", DataTypes.STRING())
> > > >        .build())
> > > >      .option("data-id", tableId)
> > > >      .option("bounded", "true")
> > > >      .option("readable-metadata", "m1:STRING,m2:STRING")
> > > >      .build())
> > > >
> > > >    tEnv.sqlQuery("SELECT f0, m1 FROM T").execute().collect().toList
> > > > }
> > > >
> > > > Regardless of whether I select only f0 or f0 + m1,
> > #applyReadableMetadata
> > > > is always called with m1 + m2, and #applyProjections only ever sees
> f0.
> > > So
> > > > as far as I can tell, the source has no way of knowing which metadata
> > > > columns are actually needed (under the projection), it always has to
> > > > produce metadata for all metadata columns declared in the table's
> > schema.
> > > >
> > > > In PushProjectIntoTableSourceScanRule I also haven't yet found
> anything
> > > > that would suggest that metadata are first projected and only then
> > pushed
> > > > to the source. I think the correct behavior should be to call
> > > > #applyReadableMetadata only after they have been considered in the
> > > > projection.
> > > >
> > > >
> > > > Best
> > > > Ingo
> > > >
> > > >
> > > > On Mon, Aug 23, 2021 at 5:05 AM Jingsong Li <jingsongl...@gmail.com>
> > > wrote:
> > > >
> > > >> Hi,
> > > >>
> > > >> I remember the projection only works with
> SupportsProjectionPushDown.
> > > >>
> > > >> You can take a look at
> > > >>
> `PushProjectIntoTableSourceScanRuleTest.testNestProjectWithMetadata`.
> > > >>
> > > >> Will applyReadableMetadata again in the
> > > PushProjectIntoTableSourceScanRule.
> > > >>
> > > >> But there may be bug in
> > > >> PushProjectIntoTableSourceScanRule.applyPhysicalAndMetadataPushDown:
> > > >>
> > > >> if (!usedMetadataNames.isEmpty()) {
> > > >>      sourceAbilitySpecs.add(new
> ReadingMetadataSpec(usedMetadataNames,
> > > >> newProducedType));
> > > >> }
> > > >>
> > > >> If there is no meta column left, we should apply again, We should
> tell
> > > >> the source that there is no meta column left after projection.
> > > >>
> > > >> Best,
> > > >> Jingsong
> > > >>
> > > >> On Fri, Aug 20, 2021 at 7:56 PM Ingo Bürk <i...@ververica.com>
> wrote:
> > > >>>
> > > >>> Hi everyone,
> > > >>>
> > > >>> according to the SupportsReadableMetadata interface, the planner is
> > > >>> supposed to project required metadata columns prior to applying
> them:
> > > >>>
> > > >>>> The planner will select required metadata columns (i.e. perform
> > > >>> projection push down) and will call applyReadableMetadata(List,
> > > DataType)
> > > >>> with a list of metadata keys.
> > > >>>
> > > >>> However, from my experiments it seems that this is not true:
> > regardless
> > > >> of
> > > >>> what columns I select from a table, #applyReadableMetadata always
> > seems
> > > >> to
> > > >>> be called with all metadata declared in the schema of the table.
> > > Metadata
> > > >>> columns are also excluded from
> > > >> SupportsProjectionPushDown#applyProjection,
> > > >>> so the source cannot perform the projection either.
> > > >>>
> > > >>> This is in Flink 1.13.2. Am I misreading the docs here or is this
> not
> > > >>> working as intended?
> > > >>>
> > > >>>
> > > >>> Best
> > > >>> Ingo
> > > >>
> > > >>
> > > >>
> > > >> --
> > > >> Best, Jingsong Lee
> > > >>
> > > >
> > >
> > >
> >
>

Reply via email to