Thanks Leonard, we are working towards 1.12 upgrade and should be able to try upsert-kafka after that.
> Your first workaround should have been worked, but looks like an exception > was thrown in Type conversion phase, could you share you table schema and > query that can reproduce the issue. I was able to get past this but ran into another issue which is detailed further down. My table schema is: Table "T0" - column "C0" (int64) [rowtime attribute] - column "C1" (string) Query: select INTERVAL '1' HOUR as E0, * from T0 In the original code that I posted, the failure happens at: ``` var rowType = table.getSchema().toRowType(); ``` I got past this by bridging duration types to long/int before converting to TypeInformation using `TypeConversions.fromDataTypeToLegacyInfo`: ``` var dataType = schema.getFieldDataTypes()[i]; var typeRoot = dataType.getLogicalType().getTypeRoot(); if (typeRoot.equals(LogicalTypeRoot.INTERVAL_DAY_TIME)) { dataType = dataType.bridgedTo(Long.class); } if (typeRoot.equals(LogicalTypeRoot.INTERVAL_YEAR_MONTH)) { dataType = dataType.bridgedTo(Integer.class); } ``` After getting past this, the next problem I've now run into is as follows. Like I noted above, I'm converting from Table API to DataStream API. We are now seeing the following error at runtime: ``` java.lang.RuntimeException: class java.sql.Timestamp cannot be cast to class java.lang.Long (java.sql.Timestamp is in module java.sql of loader 'platform'; java.lang.Long is in module java.base of loader 'bootstrap') ``` This error comes from the rowtime attribute column which our custom table source generates as `java.sql.Timestamp` objects. That works out OK when we were applying entirely at table API level. Our guess is that after conversion to DataStream API, the rowtime attribute column uses `TimeIndicatorTypeInfo` which is serialized as long. Hence the error converting from `Timestamp` to `Long`. In our case, we would like to continue using the unmodified table source (generating Timestamp type for the rowtime attribute column), hence this approach also seems to have hit a dead end. We are now planning to try out the upsert-kafka sink following 1.12 upgrade. Thanks, Abhishek On Fri, Jan 15, 2021 at 3:50 AM Leonard Xu <xbjt...@gmail.com> wrote: > > Hi, Rai > > What are my options to still write to Kafka? I don't mind introducing > another boolean/etc field in the Kafka output records containing the > row kind or similar info. > > > The recommended way is use `upset-kafka`[1] connector which you can write > insert/update/retract message to a > compacted kafka topic and read insert/update/retract messages from this topic > as well. It’s a new feature since 1.12, > there’s no options to control write boolean/etc fields before 1.12 version, > because the boolean flag(rowkind) is not exposed to users. > > > The first workaround that I tried is to convert the table to > ``` > TableException: Unsupported conversion from data type 'INTERVAL > SECOND(3) NOT NULL' (conversion class: java.time.Duration) to type > information. Only data types that originated from type information > fully support a reverse conversion. > ``` > > > Your first workaround should have been worked, but looks like an exception > was thrown in Type conversion phase, could you share you table schema and > query that can reproduce the issue. > > Best, > Leonard > [1] > https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/upsert-kafka.html