[ 
https://issues.apache.org/jira/browse/FLINK-15220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Paul Lin updated FLINK-15220:
-----------------------------
    Description: 
KafkaTableSource supports all startup modes in DataStream API except 
`startFromTimestamp`, but `startFromTimestamp` is a common and valid use case 
in Table/SQL API as well.

 

The proposed changes are as follows:
h3. Table Descriptor

A new method should be added to Kafka table descriptor:

```

new Kafka().startFromTimestamp(long millisFromEpoch)

```

The parameter would be in milliseconds since the epoch, to stay aligned with 
FlinkKafkaConsumerBase#setStartFromTimestamp(long startupOffsetsTimestamp).

Kafka 0.8/0.9, which do not support timestamps, will likely be deprecated, 
so we can assume by default that users run a Kafka version that supports 
timestamps, and throw an exception during the property validation phase if 
they try to use the timestamp startup mode with one of those Kafka versions. 
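
A minimal sketch of how such a validation check might look, for illustration only. The class name, the use of IllegalArgumentException instead of Flink's own validation exception type, and the 'connector.version' key with the values '0.8' and '0.9' are assumptions and are not part of this proposal; 'connector.startup-mode' is the key proposed in the YAML & DDL section.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper for illustration; not an existing Flink class. */
final class TimestampStartupValidationSketch {

    // Assumed property keys; 'connector.version' is not defined in this issue.
    static final String VERSION_KEY = "connector.version";
    static final String STARTUP_MODE_KEY = "connector.startup-mode";

    // Kafka versions assumed to lack timestamp-based offset lookup.
    private static final Set<String> VERSIONS_WITHOUT_TIMESTAMPS =
            new HashSet<>(Arrays.asList("0.8", "0.9"));

    /** Rejects the timestamp startup mode for Kafka versions without timestamps. */
    static void validate(Map<String, String> props) {
        boolean timestampMode = "timestamp".equals(props.get(STARTUP_MODE_KEY));
        String version = props.get(VERSION_KEY);
        if (timestampMode && VERSIONS_WITHOUT_TIMESTAMPS.contains(version)) {
            throw new IllegalArgumentException(
                    "Startup mode 'timestamp' is not supported for Kafka version " + version);
        }
    }
}
```

The check only fires when both conditions hold, so other startup modes on Kafka 0.8/0.9 would remain unaffected.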
h3. YAML & DDL

YAML and DDL use string-based properties to describe tables, and the proposed 
keys are as follows:

```

'connector.startup-mode' = 'timestamp',

'connector.startup-timestamp-millis' = '1576145410000',

'connector.startup-timestamp' = '2019-12-12 10:11:23.123'

```

The timestamp must be given either as milliseconds since the epoch or in the 
form "yyyy-MM-dd HH:mm:ss[.SSS]". If both are provided, a validation exception 
would be thrown.
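
The rule above could be sketched roughly as follows. This is an illustration under stated assumptions, not the actual implementation: the class and method names are hypothetical, IllegalArgumentException stands in for Flink's validation exception, and the date-time string is interpreted as UTC because the proposal does not specify a time zone.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Map;

/** Hypothetical helper for illustration; not an existing Flink class. */
final class StartupTimestampSketch {

    static final String MILLIS_KEY = "connector.startup-timestamp-millis";
    static final String DATETIME_KEY = "connector.startup-timestamp";

    // The optional section [.SSS] makes the millisecond fraction optional.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss[.SSS]");

    /** Returns the startup timestamp in milliseconds since the epoch. */
    static long resolveStartupTimestamp(Map<String, String> props) {
        String millis = props.get(MILLIS_KEY);
        String dateTime = props.get(DATETIME_KEY);
        if (millis != null && dateTime != null) {
            throw new IllegalArgumentException(
                    "Only one of '" + MILLIS_KEY + "' and '" + DATETIME_KEY + "' may be set");
        }
        if (millis != null) {
            return Long.parseLong(millis);
        }
        if (dateTime != null) {
            // Assumption: the string is interpreted as UTC.
            return LocalDateTime.parse(dateTime, FORMAT)
                    .toInstant(ZoneOffset.UTC)
                    .toEpochMilli();
        }
        throw new IllegalArgumentException(
                "Startup mode 'timestamp' requires a startup timestamp");
    }
}
```

Interpreting the string in a fixed zone keeps the result independent of the machine that parses the table definition; whether UTC or a session time zone is the right choice is left open here.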

  was:
KafkaTableSource supports all startup modes in DataStream API except 
`startFromTimestamp`, but `startFromTimestamp` is a common and valid use case 
in Table/SQL API as well.

 

The proposed changes are as follows:
h3. Table Descriptor

A new method should be added to Kafka table descriptor:

```

new Kafka().startFromTimestamp(long millisFromEpoch)

```

The parameter would be in milliseconds since the epoch, to stay aligned with 
FlinkKafkaConsumerBase#setStartFromTimestamp(long startupOffsetsTimestamp).

Kafka 0.8/0.9, which do not support timestamps, will likely be deprecated, 
so we can assume by default that users run a Kafka version that supports 
timestamps, and throw an exception during the property validation phase if 
they try to use the timestamp startup mode with one of those Kafka versions. 
h3. YAML & DDL

YAML and DDL use string-based properties to describe tables, and the proposed 
keys are as follows:

```

'connector.startup-mode' = 'timestamp',

'connector.startup-timestamp-millis' = '1576145410000',

'connector.startup-timestamp' = '2019-12-12 10:11:23.123'

```

The timestamp must be given either as milliseconds since the epoch or in the 
form "yyyy-MM-dd HH:mm:ss[.SSS]". If both are provided, the milliseconds value 
would be used.


> Add startFromTimestamp in KafkaTableSource
> ------------------------------------------
>
>                 Key: FLINK-15220
>                 URL: https://issues.apache.org/jira/browse/FLINK-15220
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka
>    Affects Versions: 1.10.0
>            Reporter: Paul Lin
>            Assignee: Paul Lin
>            Priority: Major
>
> KafkaTableSource supports all startup modes in DataStream API except 
> `startFromTimestamp`, but `startFromTimestamp` is a common and valid use case 
> in Table/SQL API as well.
>  
> The proposed changes are as follows:
> h3. Table Descriptor
> A new method should be added to Kafka table descriptor:
> ```
> new Kafka().startFromTimestamp(long millisFromEpoch)
> ```
> The parameter would be in milliseconds since the epoch, to stay aligned with 
> FlinkKafkaConsumerBase#setStartFromTimestamp(long startupOffsetsTimestamp).
> Kafka 0.8/0.9, which do not support timestamps, will likely be deprecated, 
> so we can assume by default that users run a Kafka version that supports 
> timestamps, and throw an exception during the property validation phase if 
> they try to use the timestamp startup mode with one of those Kafka versions. 
> h3. YAML & DDL
> YAML and DDL use string-based properties to describe tables, and the proposed 
> keys are as follows:
> ```
> 'connector.startup-mode' = 'timestamp',
> 'connector.startup-timestamp-millis' = '1576145410000',
> 'connector.startup-timestamp' = '2019-12-12 10:11:23.123'
> ```
> The timestamp must be given either as milliseconds since the epoch or in the 
> form "yyyy-MM-dd HH:mm:ss[.SSS]". If both are provided, a validation exception 
> would be thrown.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
