[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16585692#comment-16585692
 ] 

ASF GitHub Bot commented on FLINK-8500:
---------------------------------------

FredTing commented on a change in pull request #6105: [FLINK-8500] Get the 
timestamp of the Kafka message from kafka consumer
URL: https://github.com/apache/flink/pull/6105#discussion_r211197298
 
 

 ##########
 File path: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
 ##########
 @@ -45,6 +45,22 @@
         */
        T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException;
 
+       /**
+        * Deserializes the byte message.
+        *
+        * @param messageKey the key as a byte array (null if no key has been set).
+        * @param message The message, as a byte array (null if the message was empty or deleted).
+        * @param partition The partition the message has originated from.
+        * @param offset the offset of the message in the original source (for example the Kafka offset).
+        * @param timestamp the timestamp of the consumer record
+        * @param timestampType The timestamp type, could be NO_TIMESTAMP, CREATE_TIME or INGEST_TIME.
+        *
+        * @return The deserialized message as an object (null if the message cannot be deserialized).
+        */
+       default T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset, long timestamp, TimestampType timestampType) throws IOException {
 
 Review comment:
   @tzulitai Yes, this would also be an issue with our long-term solution, and 
we should look at that carefully. For instance, when messages are encrypted on 
Kafka, we need to decrypt them. How the messages are encrypted is determined by 
the producer (out of our control), but we should be able to decrypt these 
messages. When some Kafka meta-information is used for encryption, we need it 
during decryption too.
   
   > I'm not sure how this would work. Could you elaborate a bit more on this?
   
   When a custom implementation of the `KeyedDeserializationSchema` is created, 
at least one of the `deserialize` methods must be implemented (why else create 
a custom implementation?). When both `deserialize` methods have a default 
implementation, the compiler no longer complains if you forget to implement 
one. If the default is an empty method, you won't get complaints at runtime 
either, but you won't receive any records. By throwing an exception with a 
clear message that you forgot to override one of the `deserialize` methods, 
you get an error message at runtime.
   When either one of the `deserialize` methods is overridden, the default that 
throws the exception is no longer called. 
   Since the existing `deserialize` method is currently abstract, all current 
implementations already override it.
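   
   A minimal sketch of the idea, not the actual code in this pull request. It assumes the new seven-argument default delegates to the old five-argument method and that the old method's default throws. The names `SketchKeyedDeserializationSchema`, `SketchTimestampType`, and the three example schemas are hypothetical and exist only so the example compiles on its own.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the Kafka timestamp type, so the sketch is self-contained.
enum SketchTimestampType { NO_TIMESTAMP, CREATE_TIME, INGEST_TIME }

// Hypothetical interface illustrating the proposal; not the real KeyedDeserializationSchema.
interface SketchKeyedDeserializationSchema<T> {

    // Old variant: the default fails loudly instead of silently returning nothing.
    default T deserialize(byte[] messageKey, byte[] message, String topic,
                          int partition, long offset) throws IOException {
        throw new UnsupportedOperationException(
            "Override at least one of the deserialize methods.");
    }

    // New variant: the default ignores the timestamp and delegates to the old variant.
    default T deserialize(byte[] messageKey, byte[] message, String topic,
                          int partition, long offset, long timestamp,
                          SketchTimestampType timestampType) throws IOException {
        return deserialize(messageKey, message, topic, partition, offset);
    }
}

// Existing-style implementation: overrides only the old method and keeps working.
class ValueOnlySchema implements SketchKeyedDeserializationSchema<String> {
    @Override
    public String deserialize(byte[] messageKey, byte[] message, String topic,
                              int partition, long offset) {
        return new String(message, StandardCharsets.UTF_8);
    }
}

// New-style implementation: overrides only the timestamp-aware method.
class TimestampedSchema implements SketchKeyedDeserializationSchema<String> {
    @Override
    public String deserialize(byte[] messageKey, byte[] message, String topic,
                              int partition, long offset, long timestamp,
                              SketchTimestampType timestampType) {
        return timestamp + ":" + new String(message, StandardCharsets.UTF_8);
    }
}

// Overrides nothing: compiles, but fails at runtime with a clear message.
class ForgotToOverrideSchema implements SketchKeyedDeserializationSchema<String> {
}

class DeserializeDefaultsDemo {
    public static void main(String[] args) throws IOException {
        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
        // The caller (the fetcher, in this sketch) always uses the seven-argument variant.
        System.out.println(new ValueOnlySchema().deserialize(
            null, payload, "topic", 0, 7L, 42L, SketchTimestampType.CREATE_TIME));
        System.out.println(new TimestampedSchema().deserialize(
            null, payload, "topic", 0, 7L, 42L, SketchTimestampType.CREATE_TIME));
        try {
            new ForgotToOverrideSchema().deserialize(
                null, payload, "topic", 0, 7L, 42L, SketchTimestampType.CREATE_TIME);
        } catch (UnsupportedOperationException e) {
            System.out.println("Runtime error: " + e.getMessage());
        }
    }
}
```

   In this sketch the caller always invokes the seven-argument variant, so an implementation that overrides either method never reaches the throwing default, while one that overrides neither fails on the first record.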

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-8500
>                 URL: https://issues.apache.org/jira/browse/FLINK-8500
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>    Affects Versions: 1.4.0
>            Reporter: yanxiaobin
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.7.0
>
>         Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The deserialize method of KeyedDeserializationSchema needs a 'kafka message 
> timestamp' parameter (from ConsumerRecord). This is useful in some business 
> scenarios.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
