Hi Till,

it actually seems to be two issues: one is a bug, and one would be an improvement (it never worked before). I'll split the tickets later. I'll raise the bug to Critical since I want to fix it for 1.14, but it was also broken in 1.13 already, so it shouldn't be a blocker.
Best
Ingo

On Mon, Aug 23, 2021 at 11:43 AM Till Rohrmann <trohrm...@apache.org> wrote:

> Does this also affect Flink 1.14.0? If yes, do we want to fix this issue
> for the upcoming release? If yes, then please make this issue a blocker or
> at least critical.
>
> Cheers,
> Till
>
> On Mon, Aug 23, 2021 at 8:39 AM Ingo Bürk <i...@ververica.com> wrote:
>
> > Thanks Timo for the confirmation. I've also raised FLINK-23911[1] for
> > this.
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-23911
> >
> > Best
> > Ingo
> >
> > On Mon, Aug 23, 2021 at 8:34 AM Timo Walther <twal...@apache.org> wrote:
> >
> > > Hi everyone,
> > >
> > > this sounds definitely like a bug to me. Computing metadata might be
> > > very expensive, and a connector might expose a long list of metadata
> > > keys. It was therefore intended to project the metadata if possible. I'm
> > > pretty sure that this worked before (at least when implementing
> > > SupportsProjectionPushDown). Maybe a bug was introduced when adding the
> > > Spec support.
> > >
> > > Regards,
> > > Timo
> > >
> > > On 23.08.21 08:24, Ingo Bürk wrote:
> > > > Hi Jingsong,
> > > >
> > > > thanks for your answer. Even if the source implements
> > > > SupportsProjectionPushDown, #applyProjections will never be called
> > > > with projections for metadata columns.
> > > > For example, I have the following test:
> > > >
> > > > @Test
> > > > def test(): Unit = {
> > > >   val tableId = TestValuesTableFactory.registerData(Seq())
> > > >
> > > >   tEnv.createTemporaryTable("T", TableDescriptor.forConnector("values")
> > > >     .schema(Schema.newBuilder()
> > > >       .column("f0", DataTypes.INT())
> > > >       .columnByMetadata("m1", DataTypes.STRING())
> > > >       .columnByMetadata("m2", DataTypes.STRING())
> > > >       .build())
> > > >     .option("data-id", tableId)
> > > >     .option("bounded", "true")
> > > >     .option("readable-metadata", "m1:STRING,m2:STRING")
> > > >     .build())
> > > >
> > > >   tEnv.sqlQuery("SELECT f0, m1 FROM T").execute().collect().toList
> > > > }
> > > >
> > > > Regardless of whether I select only f0 or f0 + m1, #applyReadableMetadata
> > > > is always called with m1 + m2, and #applyProjections only ever sees f0.
> > > > So as far as I can tell, the source has no way of knowing which metadata
> > > > columns are actually needed (under the projection); it always has to
> > > > produce metadata for all metadata columns declared in the table's
> > > > schema.
> > > >
> > > > In PushProjectIntoTableSourceScanRule I also haven't yet found anything
> > > > that would suggest that metadata are first projected and only then
> > > > pushed to the source. I think the correct behavior should be to call
> > > > #applyReadableMetadata only after they have been considered in the
> > > > projection.
> > > >
> > > > Best
> > > > Ingo
> > > >
> > > > On Mon, Aug 23, 2021 at 5:05 AM Jingsong Li <jingsongl...@gmail.com>
> > > > wrote:
> > > >
> > > >> Hi,
> > > >>
> > > >> I remember the projection only works with SupportsProjectionPushDown.
> > > >>
> > > >> You can take a look at
> > > >> `PushProjectIntoTableSourceScanRuleTest.testNestProjectWithMetadata`.
> > > >>
> > > >> applyReadableMetadata will be applied again in
> > > >> PushProjectIntoTableSourceScanRule.
> > > >> > > > >> But there may be bug in > > > >> PushProjectIntoTableSourceScanRule.applyPhysicalAndMetadataPushDown: > > > >> > > > >> if (!usedMetadataNames.isEmpty()) { > > > >> sourceAbilitySpecs.add(new > ReadingMetadataSpec(usedMetadataNames, > > > >> newProducedType)); > > > >> } > > > >> > > > >> If there is no meta column left, we should apply again, We should > tell > > > >> the source that there is no meta column left after projection. > > > >> > > > >> Best, > > > >> Jingsong > > > >> > > > >> On Fri, Aug 20, 2021 at 7:56 PM Ingo Bürk <i...@ververica.com> > wrote: > > > >>> > > > >>> Hi everyone, > > > >>> > > > >>> according to the SupportsReadableMetadata interface, the planner is > > > >>> supposed to project required metadata columns prior to applying > them: > > > >>> > > > >>>> The planner will select required metadata columns (i.e. perform > > > >>> projection push down) and will call applyReadableMetadata(List, > > > DataType) > > > >>> with a list of metadata keys. > > > >>> > > > >>> However, from my experiments it seems that this is not true: > > regardless > > > >> of > > > >>> what columns I select from a table, #applyReadableMetadata always > > seems > > > >> to > > > >>> be called with all metadata declared in the schema of the table. > > > Metadata > > > >>> columns are also excluded from > > > >> SupportsProjectionPushDown#applyProjection, > > > >>> so the source cannot perform the projection either. > > > >>> > > > >>> This is in Flink 1.13.2. Am I misreading the docs here or is this > not > > > >>> working as intended? > > > >>> > > > >>> > > > >>> Best > > > >>> Ingo > > > >> > > > >> > > > >> > > > >> -- > > > >> Best, Jingsong Lee > > > >> > > > > > > > > > > > > >