Thanks Leonard, we are working towards the 1.12 upgrade and should be
able to try upsert-kafka after that.

> Your first workaround should have worked, but it looks like an exception 
> was thrown in the type conversion phase. Could you share your table schema 
> and a query that can reproduce the issue?

I was able to get past this but ran into another issue which is
detailed further down.  My table schema is:
Table "T0"
- column "C0" (int64) [rowtime attribute]
- column "C1" (string)

Query:
select INTERVAL '1' HOUR as E0, * from T0
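
(For context, roughly how we issue this query; `tableEnv` and the
registration of "T0" by our custom table source are assumptions about
code not shown here.)

```
// Sketch of the surrounding code (not a verbatim excerpt): tableEnv is
// our StreamTableEnvironment, and "T0" is registered elsewhere by our
// custom table source with the rowtime attribute on C0.
Table table = tableEnv.sqlQuery("select INTERVAL '1' HOUR as E0, * from T0");
```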

In the original code that I posted, the failure happens at:
```
var rowType = table.getSchema().toRowType();
```

I got past this by bridging duration types to long/int before
converting to TypeInformation using
`TypeConversions.fromDataTypeToLegacyInfo`:

```
      // Inside a loop over the schema fields (index i); fieldTypes is
      // our own TypeInformation[] array.  Interval types are bridged to
      // Long/Integer so the legacy conversion below succeeds.
      var dataType = schema.getFieldDataTypes()[i];
      var typeRoot = dataType.getLogicalType().getTypeRoot();
      if (typeRoot.equals(LogicalTypeRoot.INTERVAL_DAY_TIME)) {
        dataType = dataType.bridgedTo(Long.class);
      } else if (typeRoot.equals(LogicalTypeRoot.INTERVAL_YEAR_MONTH)) {
        dataType = dataType.bridgedTo(Integer.class);
      }
      fieldTypes[i] = TypeConversions.fromDataTypeToLegacyInfo(dataType);
```
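
With the interval columns bridged, the per-field conversion goes
through.  Roughly, we then assemble the bridged field types like this
(a sketch; `fieldTypes` is our own array filled in the loop above, and
`toRetractStream` just stands in for whichever Table-to-DataStream
conversion we call):

```
// Sketch (assumed surrounding code, not a verbatim excerpt): build a
// named Row TypeInformation from the bridged field types and hand it
// to the Table -> DataStream conversion.
// Types is org.apache.flink.api.common.typeinfo.Types, Row is
// org.apache.flink.types.Row.
TypeInformation<Row> rowTypeInfo =
    Types.ROW_NAMED(schema.getFieldNames(), fieldTypes);
DataStream<Tuple2<Boolean, Row>> stream =
    tableEnv.toRetractStream(table, rowTypeInfo);
```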

After getting past this, the next problem I ran into is as follows.
As I noted above, I'm converting from the Table API to the DataStream
API, and we are now seeing the following error at runtime:

```
java.lang.RuntimeException: class java.sql.Timestamp cannot be cast to
class java.lang.Long (java.sql.Timestamp is in module java.sql of
loader 'platform'; java.lang.Long is in module java.base of loader
'bootstrap')
```

This error comes from the rowtime attribute column, which our custom
table source produces as `java.sql.Timestamp` objects.  That worked
fine while we were operating entirely at the Table API level.

Our guess is that after conversion to the DataStream API, the rowtime
attribute column uses `TimeIndicatorTypeInfo`, which is serialized as
a long; hence the error casting `Timestamp` to `Long`.
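
A quick way to check this guess (a hypothetical snippet reusing the
`schema` variable from the workaround above, not something from our
production code):

```
// Hypothetical check: print the legacy TypeInformation of the rowtime
// column ("C0" is field 0 in our schema).  We expect a rowtime
// TimeIndicatorTypeInfo, whose serializer works on epoch-millis longs
// rather than java.sql.Timestamp objects, which would explain the
// ClassCastException above.
var rowtimeInfo =
    TypeConversions.fromDataTypeToLegacyInfo(schema.getFieldDataTypes()[0]);
System.out.println(rowtimeInfo);
```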

In our case, we would like to keep using the unmodified table source
(which produces Timestamp objects for the rowtime attribute column),
so this approach also seems to have hit a dead end.  We now plan to
try the upsert-kafka sink after the 1.12 upgrade.
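
For reference, this is roughly the shape of the sink we expect to
declare once we are on 1.12, based on the upsert-kafka docs Leonard
linked ([1] below); the table name, columns, topic, brokers and
formats here are placeholders:

```
// Sketch of a possible upsert-kafka sink declaration (placeholders
// only; see the upsert-kafka docs for the full option list).
tableEnv.executeSql(
    "CREATE TABLE kafka_sink ("
        + "  k STRING,"
        + "  v BIGINT,"
        + "  PRIMARY KEY (k) NOT ENFORCED"
        + ") WITH ("
        + "  'connector' = 'upsert-kafka',"
        + "  'topic' = 'output-topic',"
        + "  'properties.bootstrap.servers' = 'localhost:9092',"
        + "  'key.format' = 'json',"
        + "  'value.format' = 'json'"
        + ")");
```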

Thanks,
Abhishek

On Fri, Jan 15, 2021 at 3:50 AM Leonard Xu <xbjt...@gmail.com> wrote:
>
> Hi, Rai
>
> What are my options to still write to Kafka?  I don't mind introducing
> another boolean/etc field in the Kafka output records containing the
> row kind or similar info.
>
>
> The recommended way is to use the `upsert-kafka`[1] connector, with which 
> you can write insert/update/retract messages to a
> compacted kafka topic and read insert/update/retract messages from this topic 
> as well. It's a new feature since 1.12;
> before 1.12 there is no option to control or write such boolean/etc fields, 
> because the boolean flag (rowkind) is not exposed to users.
>
>
> The first workaround that I tried is to convert the table to
> ```
> TableException: Unsupported conversion from data type 'INTERVAL
> SECOND(3) NOT NULL' (conversion class: java.time.Duration) to type
> information. Only data types that originated from type information
> fully support a reverse conversion.
> ```
>
>
> Your first workaround should have worked, but it looks like an exception 
> was thrown in the type conversion phase. Could you share your table schema 
> and a query that can reproduce the issue?
>
> Best,
> Leonard
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/upsert-kafka.html
