Thanks a lot! Awesome that 1.6 will have the timestamp of the element....

On Tue, Aug 7, 2018, 4:19 AM Aljoscha Krettek <aljos...@apache.org> wrote:

> Hi Vishal,
>
> to answer the original question: it should not be assumed that mutations of
> the element will be reflected downstream. For your situation this means
> that you have to use a ProcessFunction to put the timestamp of a record
> into the record itself.
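The pattern described here can be sketched as follows; `Event` and its `setIngestionTime` setter are hypothetical stand-ins for the actual record type, and in real Flink code this logic would live in `processElement` of a `ProcessFunction`, reading the timestamp from `ctx.timestamp()`:

```java
// Sketch of copying the record timestamp into the element itself before it
// reaches the sink. Event is a hypothetical stand-in; the real implementation
// would extend org.apache.flink.streaming.api.functions.ProcessFunction and
// take the timestamp from Context.timestamp() inside processElement.
class Event {
    long ingestionTime;
    String payload;

    Event(String payload) { this.payload = payload; }

    void setIngestionTime(long t) { this.ingestionTime = t; }
}

class TimestampEmbedder {
    // In a ProcessFunction this would be called with ctx.timestamp() and the
    // result emitted via the Collector.
    static Event embed(Event value, long recordTimestamp) {
        value.setIngestionTime(recordTimestamp);
        return value;
    }
}
```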
>
> Also, Flink 1.6 will come with the next version of the BucketingSink
> called StreamingFileSink, where the Bucketer interface was updated to allow
> access to the element timestamp. The new interface is now called
> BucketAssigner.
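A minimal sketch of what the new interface enables: deriving a bucket id from the element timestamp (in Flink 1.6 the `BucketAssigner.Context` passed to `getBucketId` exposes `timestamp()`). The class name and path format below are assumptions, not the actual Flink API:

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Sketch of a bucket id computed from the element timestamp, as the
// StreamingFileSink's BucketAssigner allows. In real code this logic would sit
// inside BucketAssigner.getBucketId(element, context), reading
// context.timestamp(); the "yyyy-MM-dd--HH" layout is just an example.
class IngestionTimeBuckets {
    private static final DateTimeFormatter FMT =
        DateTimeFormatter.ofPattern("yyyy-MM-dd--HH").withZone(ZoneOffset.UTC);

    static String bucketId(long timestampMillis) {
        return FMT.format(Instant.ofEpochMilli(timestampMillis));
    }
}
```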
>
> Best,
> Aljoscha
>
> On 1. Aug 2018, at 16:36, Hequn Cheng <chenghe...@gmail.com> wrote:
>
> Hi Vishal,
>
> > We have a use case where multiple topics are streamed to hdfs and we
> would want to created buckets based on ingestion time
> If I understand correctly, you want to create buckets based on event time.
> Maybe you can use a window[1]. For example, a tumbling window of 5 minutes
> groups rows into 5-minute intervals, and you can get the window start time
> (TUMBLE_START(time_attr, interval)) and end time (TUMBLE_END(time_attr,
> interval)) when emitting data.
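For illustration, a sketch of such a SQL query assembled as a string; the table and time-attribute names passed in below (`events`, `ingest_time` in the test) are placeholders, not names from the original thread:

```java
// Builds a Flink SQL query using a 5-minute tumbling window, exposing the
// window bounds via TUMBLE_START / TUMBLE_END as suggested above. The
// aggregate (COUNT(*)) is only an example.
class TumbleQuery {
    static String query(String table, String timeAttr) {
        return "SELECT TUMBLE_START(" + timeAttr + ", INTERVAL '5' MINUTE) AS w_start, "
             + "TUMBLE_END(" + timeAttr + ", INTERVAL '5' MINUTE) AS w_end, "
             + "COUNT(*) AS cnt "
             + "FROM " + table + " "
             + "GROUP BY TUMBLE(" + timeAttr + ", INTERVAL '5' MINUTE)";
    }
}
```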
>
> Best, Hequn
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#group-windows
>
> On Wed, Aug 1, 2018 at 8:21 PM, Vishal Santoshi <vishal.santo...@gmail.com
> > wrote:
>
>> Any feedback?
>>
>> On Tue, Jul 31, 2018, 10:20 AM Vishal Santoshi <vishal.santo...@gmail.com>
>> wrote:
>>
>>> In fact it may be available elsewhere too (for example in a ProcessFunction),
>>> but we have no need to create one; this is just a data relay (Kafka
>>> to HDFS), and any intermediate processing should be avoided if possible,
>>> IMHO.
>>>
>>> On Tue, Jul 31, 2018 at 9:10 AM, Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
>>>> We have a use case where multiple topics are streamed to HDFS, and we
>>>> would want to create buckets based on ingestion time (the time the events
>>>> were pushed to Kafka). Our producers to Kafka will set that as the event time.
>>>>
>>>>
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/kafka.html#using-kafka-timestamps-and-flink-event-time-in-kafka-010
>>>>
>>>> suggests that "previousElementTimestamp" will provide that timestamp,
>>>> provided the "EventTime" characteristic is set. It also provides the
>>>> element. In our case the element will expose a setIngestionTime(long
>>>> time) method. Is the element in this method
>>>>
>>>> public long extractTimestamp(Long element, long previousElementTimestamp)
>>>>
>>>> passed by reference, and can it be safely (losslessly) mutated for
>>>> downstream operators?
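For reference, the signature in question, sketched as a plain class (the real method belongs to Flink's `TimestampAssigner` interface and its periodic/punctuated watermark subinterfaces); simply returning `previousElementTimestamp` forwards the Kafka record timestamp as the event time without touching the element:

```java
// Sketch of the extractTimestamp signature under discussion. With the Kafka
// 0.10 consumer and EventTime enabled, previousElementTimestamp carries the
// Kafka record timestamp. The element is the deserialized object passed by
// reference, but (per the answer in this thread) mutations made here should
// not be assumed to be visible downstream.
class KafkaTimestampExtractor {
    static long extractTimestamp(Object element, long previousElementTimestamp) {
        // Forward the Kafka record timestamp unchanged.
        return previousElementTimestamp;
    }
}
```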
>>>>
>>>>
>>>> That said there is another place where that record time stamp is
>>>> available.
>>>>
>>>>
>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java#L141
>>>>
>>>> Is it possible to change the signature of the
>>>>
>>>>
>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java#L46
>>>>
>>>> to add record timestamp as the last argument ?
>>>>
>>>> Regards,
>>>>
>>>> Vishal
>>>>
>>>>
>>>
>
>
