Hi Vishal,

> We have a use case where multiple topics are streamed to hdfs and we
> would want to create buckets based on ingestion time

If I understand correctly, you want to create buckets based on event time. Maybe you can use a window [1]. For example, a tumbling window of 5 minutes groups rows into 5-minute intervals. You can then get the window start time (TUMBLE_START(time_attr, interval)) and end time (TUMBLE_END(time_attr, interval)) when emitting the output.
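To make the bucketing concrete, here is a minimal plain-Java sketch of the arithmetic behind a tumbling window: each timestamp is rounded down to a multiple of the window size. This is not Flink API. The class and method names (TumblingBucket, windowStart, windowEnd) are made up for illustration, and I am assuming epoch-millisecond timestamps with windows aligned to the epoch (no offset).

```java
import java.time.Duration;

// Hypothetical helper, not part of Flink: illustrates how a tumbling
// window maps a timestamp to its bucket boundaries.
public class TumblingBucket {

    // Inclusive start of the window that contains timestampMillis.
    // Assumes windows are aligned to the epoch (offset 0).
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    // Exclusive end of the window that contains timestampMillis.
    static long windowEnd(long timestampMillis, long sizeMillis) {
        return windowStart(timestampMillis, sizeMillis) + sizeMillis;
    }

    public static void main(String[] args) {
        long size = Duration.ofMinutes(5).toMillis(); // 300000 ms
        long ts = Duration.ofMinutes(7).toMillis();   // 420000 ms, falls in [5 min, 10 min)
        System.out.println(windowStart(ts, size) + " .. " + windowEnd(ts, size));
        // prints "300000 .. 600000"
    }
}
```

All rows whose timestamp falls in the same [start, end) range land in the same bucket, which is roughly what TUMBLE_START and TUMBLE_END report for a group window.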
Best, Hequn

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#group-windows

On Wed, Aug 1, 2018 at 8:21 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

> Any feedback?
>
> On Tue, Jul 31, 2018, 10:20 AM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>
>> In fact it may be available elsewhere too (for example ProcessFunction etc.), but we have no need to create one. It is just a data relay (kafka to hdfs) and any intermediate processing should be avoided if possible, IMHO.
>>
>> On Tue, Jul 31, 2018 at 9:10 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>
>>> We have a use case where multiple topics are streamed to hdfs and we would want to create buckets based on ingestion time (the time the events were pushed to kafka). Our producers to kafka will set that as the event time.
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/kafka.html#using-kafka-timestamps-and-flink-event-time-in-kafka-010
>>>
>>> suggests that the "previousElementTimestamp" will provide that timestamp, provided the "EventTime" characteristic is set. It also provides the element. In our case the element will expose a setIngestionTime(long time) method. Is the element in this method
>>>
>>> public long extractTimestamp(Long element, long previousElementTimestamp)
>>>
>>> passed by reference, and can it be safely (losslessly) mutated for downstream operators?
>>>
>>> That said, there is another place where that record timestamp is available.
>>>
>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java#L141
>>>
>>> Is it possible to change the signature of the
>>>
>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java#L46
>>>
>>> to add record timestamp as the last argument?
>>>
>>> Regards,
>>>
>>> Vishal