Any feedbaxk?

On Tue, Jul 31, 2018, 10:20 AM Vishal Santoshi <vishal.santo...@gmail.com>
wrote:

> In fact it may be available else where too ( for example ProcessFunction
> etc ) but do we have no need to create one, it is just a data relay ( kafka
> to hdfs ) and any intermediate processing should be avoided if possible
> IMHO.
>
> On Tue, Jul 31, 2018 at 9:10 AM, Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> We have a use case where multiple topics are streamed to hdfsand we would
>> want to created buckets based on ingestion time ( the time the event were
>> pushed to kafka ). Our producers to kafka will set that the event time
>>
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/kafka.html#using-kafka-timestamps-and-flink-event-time-in-kafka-010
>>
>> suggests that the the "previousElementTimeStamp" will provide that
>> timestamp provided "EventTime" characteristic is set. It also provides for
>> the element. In out case the element will expose setIngestionTIme(long
>> time) method. Is the element in this method
>>
>> public long extractTimestamp(Long element, long previousElementTimestamp)
>>
>>  passed by reference and can it be safely ( loss lessly ) mutated for
>> downstream operators ?
>>
>>
>> That said there is another place where that record time stamp is
>> available.
>>
>>
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java#L141
>>
>> Is it possible to change the signature of the
>>
>>
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java#L46
>>
>> to add record timestamp as the last argument ?
>>
>> Regards,
>>
>> Vishal
>>
>>
>>
>>
>>
>>
>>
>

Reply via email to