Hi Vishal,

> We have a use case where multiple topics are streamed to HDFS and we
> would like to create buckets based on ingestion time
If I understand correctly, you want to create buckets based on event time.
Maybe you can use windows [1]. For example, a tumbling window of 5 minutes
groups rows into 5-minute intervals, and you can get the window start
time (TUMBLE_START(time_attr, interval)) and end time (TUMBLE_END(time_attr,
interval)) when outputting data.
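To make the window arithmetic concrete, here is a minimal plain-Java sketch (no Flink dependency) of how a tumbling window maps a timestamp to a [start, end) interval. It assumes epoch-aligned windows with no offset, which is taken here to match TUMBLE's default alignment; `TumblingWindowMath`, `windowStart`, and `windowEnd` are hypothetical names, not Flink APIs. The two values play the roles of TUMBLE_START and TUMBLE_END for a given row.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper illustrating tumbling-window arithmetic; not a Flink API.
final class TumblingWindowMath {
    private TumblingWindowMath() {}

    /** Start (inclusive) of the epoch-aligned window containing timestampMillis. */
    static long windowStart(long timestampMillis, long sizeMillis) {
        // floorMod keeps the result correct for timestamps before the epoch as well.
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    /** End (exclusive) of the same window; assumed to be what TUMBLE_END reports. */
    static long windowEnd(long timestampMillis, long sizeMillis) {
        return windowStart(timestampMillis, sizeMillis) + sizeMillis;
    }

    public static void main(String[] args) {
        long size = Duration.ofMinutes(5).toMillis();
        long ts = Instant.parse("2018-08-01T20:21:37Z").toEpochMilli();
        System.out.println(Instant.ofEpochMilli(windowStart(ts, size))); // 2018-08-01T20:20:00Z
        System.out.println(Instant.ofEpochMilli(windowEnd(ts, size)));   // 2018-08-01T20:25:00Z
    }
}
```

Every row whose timestamp falls in [start, end) yields the same start value, so that start can also serve directly as a bucket key.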

Best, Hequn

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#group-windows

On Wed, Aug 1, 2018 at 8:21 PM, Vishal Santoshi <vishal.santo...@gmail.com>
wrote:

> Any feedback?
>
> On Tue, Jul 31, 2018, 10:20 AM Vishal Santoshi <vishal.santo...@gmail.com>
> wrote:
>
>> In fact, it may be available elsewhere too (for example, in a
>> ProcessFunction), but we have no need to create one: it is just a data
>> relay (Kafka to HDFS), and any intermediate processing should be avoided
>> if possible, IMHO.
>>
>> On Tue, Jul 31, 2018 at 9:10 AM, Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> We have a use case where multiple topics are streamed to HDFS and we
>>> would like to create buckets based on ingestion time (the time the events
>>> were pushed to Kafka). Our Kafka producers will set that as the event time.
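Bucketing by the time an event reached Kafka boils down to formatting that timestamp into a directory name. A minimal sketch, assuming hourly buckets formatted in UTC and a hypothetical <base>/<topic>/yyyy-MM-dd--HH layout; `IngestionTimeBuckets`, `bucketPath`, and the paths are illustrative names only, not Flink APIs.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: maps a Kafka record timestamp (epoch millis) to an
// hourly HDFS bucket directory. Not part of any Flink API.
final class IngestionTimeBuckets {
    // Hourly buckets, formatted in UTC so the result does not depend on the host time zone.
    private static final DateTimeFormatter HOURLY =
            DateTimeFormatter.ofPattern("yyyy-MM-dd--HH").withZone(ZoneOffset.UTC);

    private IngestionTimeBuckets() {}

    static String bucketPath(String basePath, String topic, long kafkaTimestampMillis) {
        return basePath + "/" + topic + "/" + HOURLY.format(Instant.ofEpochMilli(kafkaTimestampMillis));
    }

    public static void main(String[] args) {
        long ts = Instant.parse("2018-07-31T09:10:00Z").toEpochMilli();
        System.out.println(bucketPath("/data/raw", "clicks", ts)); // /data/raw/clicks/2018-07-31--09
    }
}
```

Because the bucket depends only on the producer-side timestamp, a late or replayed record still lands in the hour it was originally written to Kafka.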
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/kafka.html#using-kafka-timestamps-and-flink-event-time-in-kafka-010
>>>
>>> suggests that the "previousElementTimestamp" will provide that
>>> timestamp, provided the "EventTime" characteristic is set. It also provides
>>> the element. In our case the element will expose a setIngestionTime(long
>>> time) method. Is the element in this method
>>>
>>> public long extractTimestamp(Long element, long previousElementTimestamp)
>>>
>>> passed by reference, and can it be safely (losslessly) mutated for
>>> downstream operators?
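A plain-Java sketch of the pattern being asked about, assuming a hypothetical mutable `TrackedEvent` type with the `setIngestionTime(long)` setter (with `Long` as in the quoted signature there is nothing to mutate, since `Long` is immutable). `IngestionTimeStamper` only mirrors the shape of `extractTimestamp`; it is not the Flink interface.

```java
// Hypothetical event type exposing the setter mentioned in the thread.
final class TrackedEvent {
    private final String payload;
    private long ingestionTime;

    TrackedEvent(String payload) { this.payload = payload; }

    String getPayload() { return payload; }
    long getIngestionTime() { return ingestionTime; }
    void setIngestionTime(long time) { this.ingestionTime = time; }
}

// Plain-Java stand-in that mirrors extractTimestamp(element, previousElementTimestamp);
// hypothetical, not the Flink timestamp-assigner interface.
final class IngestionTimeStamper {
    long extractTimestamp(TrackedEvent element, long previousElementTimestamp) {
        // Assumption: previousElementTimestamp carries the Kafka record timestamp.
        element.setIngestionTime(previousElementTimestamp);
        return previousElementTimestamp;
    }
}

final class StamperDemo {
    public static void main(String[] args) {
        TrackedEvent e = new TrackedEvent("click");
        long ts = new IngestionTimeStamper().extractTimestamp(e, 1533042600000L);
        // Java passes the object reference by value, so the caller sees the mutation.
        System.out.println(e.getIngestionTime() == ts); // true
    }
}
```

This shows only Java semantics: the method receives a copy of the reference and mutates the same object the caller holds. Whether that mutation is reliably visible to downstream operators in a running Flink job (operator chaining, object reuse, serialization between tasks) is exactly the open question, and the sketch does not model any of it.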
>>>
>>>
>>> That said, there is another place where the record timestamp is
>>> available:
>>>
>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java#L141
>>>
>>> Is it possible to change the signature of the
>>>
>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java#L46
>>>
>>> to add the record timestamp as the last argument?
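For illustration, the proposed change amounts to something like the hypothetical interface below: the existing keyed parameters (assumed here to be messageKey, message, topic, partition, offset) plus the Kafka record timestamp as the last argument. `TimestampedKeyedDeserializer`, `StampedRecord`, and `StampedStringDeserializer` are made-up names; none of them exist in Flink.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Hypothetical interface: the keyed deserialize parameters plus the Kafka
// record timestamp as the last argument. Not part of Flink.
interface TimestampedKeyedDeserializer<T> {
    T deserialize(byte[] messageKey, byte[] message, String topic, int partition,
                  long offset, long timestamp) throws IOException;
}

// Hypothetical output type: payload plus the time it was written to Kafka.
final class StampedRecord {
    final String topic;
    final String value;
    final long kafkaTimestamp;

    StampedRecord(String topic, String value, long kafkaTimestamp) {
        this.topic = topic;
        this.value = value;
        this.kafkaTimestamp = kafkaTimestamp;
    }
}

// Example implementation: decodes the value as UTF-8 and keeps the timestamp,
// so no separate timestamp extractor is needed to recover it.
final class StampedStringDeserializer implements TimestampedKeyedDeserializer<StampedRecord> {
    @Override
    public StampedRecord deserialize(byte[] messageKey, byte[] message, String topic,
                                     int partition, long offset, long timestamp) {
        String value = message == null ? null : new String(message, StandardCharsets.UTF_8);
        return new StampedRecord(topic, value, timestamp);
    }
}

final class DeserializerDemo {
    public static void main(String[] args) {
        StampedRecord r = new StampedStringDeserializer().deserialize(
                null, "hello".getBytes(StandardCharsets.UTF_8), "clicks", 0, 42L, 1533042600000L);
        System.out.println(r.topic + " " + r.value + " " + r.kafkaTimestamp); // clicks hello 1533042600000
    }
}
```

With the timestamp delivered at deserialization time, each record could be stamped inside the source itself, which fits the goal of keeping the Kafka-to-HDFS relay free of intermediate processing.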
>>>
>>> Regards,
>>>
>>> Vishal
