To make sure we are on the same page.

The end goal is to have the

CatalogTable#getTableSchema/TableSource#getTableSchema return a schema
that is compatible with TableSource#getProducedDataType.

If you want to use the new types, you should not implement the
TableSource#getReturnType. Moreover you should also not use any Flink
utilities that convert from TypeInformation to DataTypes as those
produce legacy types.

I am aware there is a lot of corner cases and we worked hard to improve
the situation with the new sources and sinks interfaces.

Below I add an example how you could pass different array types:

|StreamExecutionEnvironment exec =
StreamExecutionEnvironment.getExecutionEnvironment();||
||StreamTableEnvironment tEnv = StreamTableEnvironment.create(exec);||
||tEnv.registerTableSource(||
||    "T",||
||    new StreamTableSource<Row>() {||
||        @Override||
||        public TableSchema getTableSchema() {||
||            return TableSchema.builder()||
||                .field("f0",
DataTypes.ARRAY(DataTypes.BIGINT().notNull()))||
||                .field("f1", DataTypes.ARRAY(DataTypes.BIGINT()))||
||                .field("f2", DataTypes.ARRAY(DataTypes.STRING()))||
||                .build();||
||        }|||

|        @Override
        public DataStream<Row> getDataStream(StreamExecutionEnvironment
execEnv) {
            return execEnv.fromCollection(
                Arrays.asList(Row.of(new long[]{1}, new Long[]{new
Long(1)}, new String[]{"ABCDE"})),
                // this is necessary for STRING array, cause otherwise
DataStream produces a different
                // TypeInformation than the planner expects
                (TypeInformation<Row>)
TypeConversions.fromDataTypeToLegacyInfo(getProducedDataType())
            );
        }||
||
||        @Override||
||        public DataType getProducedDataType() {||
||            return DataTypes.ROW(||
||                DataTypes.FIELD(||
||                    "f0",||
||                   
DataTypes.ARRAY(DataTypes.BIGINT().notNull().bridgedTo(long.class))||
||                        .bridgedTo(long[].class)),||
||                DataTypes.FIELD(||
||                    "f1",||
||                    DataTypes.ARRAY(DataTypes.BIGINT())),||
||                DataTypes.FIELD(||
||                    "f2",||
||                    DataTypes.ARRAY(DataTypes.STRING()))||
||                );||
||        }||
||    });||
||
||Table table = tEnv.sqlQuery("SELECT f0, f1, f2 FROM T");||
||DataStream<Row> result = tEnv.toAppendStream(||
||    table,||
||    Types.ROW(||
||        Types.PRIMITIVE_ARRAY(Types.LONG),||
||        ObjectArrayTypeInfo.getInfoFor(Types.LONG),||
||        ObjectArrayTypeInfo.getInfoFor(Types.STRING)));||
||result.print();||
||env.execute();|

Hope this will help and that it will be much easier in Flink 1.11

Best,

Dawid

On 05/06/2020 13:33, Ramana Uppala wrote:
> Hi Dawid,
>
> We are using a custom connector that is very similar to Flink Kafka
> Connector and  instantiating TableSchema using a custom class which
> maps Avro types to Flink's DataTypes using TableSchema.Builder.
>
> For Array type, we have below mapping:
>
>  case ARRAY:
>                 return
> DataTypes.ARRAY(toFlinkType(schema.getElementType()));
>
>
> We are using Hive Catalog and creating tables
> using CatalogTableImpl with TableSchema.
>
> As you mentioned, if we create TableSchema with legacy types, our
> connectors works without any issues. But, we want to use the new Flink
> DataTypes API but having issues.
>
> Also, one more observation is if we use legacy types in TableSource
> creation, application not working using Blink Planner. We are getting
> the same error physical type not matching.
>
> Looking forward to the 1.11 changes.
>
>
> On Fri, Jun 5, 2020 at 3:34 AM Dawid Wysakowicz
> <dwysakow...@apache.org <mailto:dwysakow...@apache.org>> wrote:
>
>     Hi Ramana,
>
>     What connector do you use or how do you instantiate the TableSource?
>     Also which catalog do you use and how do you register your table
>     in that
>     catalog?
>
>     The problem is that conversion from TypeInformation to DataType
>     produces
>     legacy types (because they cannot be mapped exactyl 1-1 to the new
>     types).
>
>     If you can change the code of the TableSource you can return in the
>     TableSource#getProducedType the tableSchema.toRowDataType, where the
>     tableSchema is the schema coming from catalog. Or you can make
>     sure that
>     the catalog table produces the legacy type:
>
>     TableSchema.field("field", Types.OBJECT_ARRAY(Types.STRING));
>
>     In 1.11 we will introduce new sources and formats already working
>     entirely with the new type system
>     (AvroRowDataDeserializationSchema and
>     KafkaDynamicTable).
>
>     Hope this helps a bit.
>
>     Best,
>
>     Dawid
>
>     On 04/06/2020 13:43, Ramana Uppala wrote:
>     > Hi,
>     > Avro schema contains Array<String> type and we created
>     TableSchema out of the AvroSchema and created a table in catalog.
>     In the catalog, this specific filed type shown as
>     ARRAY<VARCHAR(2147483647)>. We are using
>     AvroRowDeserializationSchema with the connector and returnType of
>     TableSource showing Array<String> mapped to LEGACY('ARRAY',
>     'ANY<[Ljava.lang.String;, by AvroSchemaConverter
>     >
>     > when we are running the application, planner validating physical
>     types and logical types and we are getting below error.
>     >
>     > of table field 'XYZ' does not match with the physical type ROW<
>     >
>     > Any suggestions on how to resolve this ? is this a bug ?
>
> ------------------------------------------------------------------------
>
>
> The information contained in this e-mail is confidential and/or
> proprietary to Capital One and/or its affiliates and may only be used
> solely in performance of work or services for Capital One. The
> information transmitted herewith is intended only for use by the
> individual or entity to which it is addressed. If the reader of this
> message is not the intended recipient, you are hereby notified that
> any review, retransmission, dissemination, distribution, copying or
> other use of, or taking of any action in reliance upon this
> information is strictly prohibited. If you have received this
> communication in error, please contact the sender and delete the
> material from your computer.
>
>

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to