Hi Jingsong,

thanks for your answer. Even if the source implements
SupportsProjectionPushDown, #applyProjections will never be called with
projections for metadata columns. For example, I have the following test:

@Test
def test(): Unit = {
  val tableId = TestValuesTableFactory.registerData(Seq())

  tEnv.createTemporaryTable("T", TableDescriptor.forConnector("values")
    .schema(Schema.newBuilder()
      .column("f0", DataTypes.INT())
      .columnByMetadata("m1", DataTypes.STRING())
      .columnByMetadata("m2", DataTypes.STRING())
      .build())
    .option("data-id", tableId)
    .option("bounded", "true")
    .option("readable-metadata", "m1:STRING,m2:STRING")
    .build())

  tEnv.sqlQuery("SELECT f0, m1 FROM T").execute().collect().toList
}

Regardless of whether I select only f0 or f0 + m1, #applyReadableMetadata
is always called with m1 + m2, and #applyProjections only ever sees f0. So
as far as I can tell, the source has no way of knowing which metadata
columns are actually needed (under the projection), it always has to
produce metadata for all metadata columns declared in the table's schema.

In PushProjectIntoTableSourceScanRule I also haven't yet found anything
that would suggest that metadata are first projected and only then pushed
to the source. I think the correct behavior should be to call
#applyReadableMetadata only after they have been considered in the
projection.


Best
Ingo


On Mon, Aug 23, 2021 at 5:05 AM Jingsong Li <jingsongl...@gmail.com> wrote:

> Hi,
>
> I remember the projection only works with SupportsProjectionPushDown.
>
> You can take a look at
> `PushProjectIntoTableSourceScanRuleTest.testNestProjectWithMetadata`.
>
> Will applyReadableMetadata again in the PushProjectIntoTableSourceScanRule.
>
> But there may be bug in
> PushProjectIntoTableSourceScanRule.applyPhysicalAndMetadataPushDown:
>
> if (!usedMetadataNames.isEmpty()) {
>     sourceAbilitySpecs.add(new ReadingMetadataSpec(usedMetadataNames,
> newProducedType));
> }
>
> If there is no meta column left, we should apply again, We should tell
> the source that there is no meta column left after projection.
>
> Best,
> Jingsong
>
> On Fri, Aug 20, 2021 at 7:56 PM Ingo Bürk <i...@ververica.com> wrote:
> >
> > Hi everyone,
> >
> > according to the SupportsReadableMetadata interface, the planner is
> > supposed to project required metadata columns prior to applying them:
> >
> > > The planner will select required metadata columns (i.e. perform
> > projection push down) and will call applyReadableMetadata(List, DataType)
> > with a list of metadata keys.
> >
> > However, from my experiments it seems that this is not true: regardless
> of
> > what columns I select from a table, #applyReadableMetadata always seems
> to
> > be called with all metadata declared in the schema of the table. Metadata
> > columns are also excluded from
> SupportsProjectionPushDown#applyProjection,
> > so the source cannot perform the projection either.
> >
> > This is in Flink 1.13.2. Am I misreading the docs here or is this not
> > working as intended?
> >
> >
> > Best
> > Ingo
>
>
>
> --
> Best, Jingsong Lee
>

Reply via email to