This should be possible too. Flink uses `StreamingFileSink` to write data to S3 [1]. You can pass it your own custom bucket assigner [2]:

    public T withBucketAssigner(final BucketAssigner<IN, BucketID> assigner)

which, similarly to the `KeyedSerializationSchema`, returns a destination for each input element:

    BucketID getBucketId(IN element, BucketAssigner.Context context);

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html#streaming-file-sink
[2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html#bucket-assignment

--
Alexander Fedulov | Solutions Architect

On Thu, Apr 30, 2020 at 5:21 AM dhurandar S <dhurandarg...@gmail.com> wrote:

> Thank you Alexander for the response. This is very helpful.
> Can i apply the same pattern to S3 as well , as in read from Kafka or
> Kinesis and write multiple files in S3 or multiple topics in Kinesis ?
>
> regards,
> Rahul
>
> On Wed, Apr 29, 2020 at 2:32 PM Alexander Fedulov <alexan...@ververica.com>
> wrote:
>
>> Hi Dhurandar,
>>
>> it is not supported out of the box, however, I think it is possible by
>> doing the following:
>> 1) Create a wrapper type, containing the original message and a topic
>> destination where it is supposed to be sent. You can enrich the messages
>> with it in accordance to the configuration you've mentioned.
>> 2) Extend `KeyedSerializationSchema` and make its `getTargetTopic` return
>> the desired topic
>> 3) Initialize `FlinkKafkaProducer011` with this custom
>> `KeyedSerializationSchema`
>> Please mind that `KeyedSerializationSchema` is marked as deprecated
>> and is supposed to be substituted by the new `KafkaSerializationSchema`,
>> which would require a slight modification, but, from what I can tell, it
>> will still be possible to achieve such dynamic events dispatching.
>>
>> Best regards,
>> Alexander Fedulov
>>
>>>
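
To make the wrapper-type idea from the quoted steps more concrete, here is a minimal sketch in plain Java, deliberately written without Flink on the classpath. The names `RoutedMessage`, `DestinationRouter`, `enrich` and `destinationOf` are hypothetical and are not part of any Flink API. The assumption is that the body of `destinationOf` is what you would place inside your `getTargetTopic` implementation for Kafka, or inside `getBucketId` of a custom `BucketAssigner` for S3.

```java
import java.util.Map;

/** Hypothetical wrapper: the original payload plus the destination it should go to. */
final class RoutedMessage {
    final String payload;
    final String destination; // a topic name for Kafka, a bucket id (sub-directory) for S3

    RoutedMessage(String payload, String destination) {
        this.payload = payload;
        this.destination = destination;
    }
}

/** Hypothetical enrichment step: resolves a destination from a configuration map. */
final class DestinationRouter {
    private final Map<String, String> destinationsByType;
    private final String fallback;

    DestinationRouter(Map<String, String> destinationsByType, String fallback) {
        this.destinationsByType = destinationsByType;
        this.fallback = fallback;
    }

    /** Wraps the payload together with the destination configured for its event type. */
    RoutedMessage enrich(String eventType, String payload) {
        String destination = destinationsByType.getOrDefault(eventType, fallback);
        return new RoutedMessage(payload, destination);
    }

    /**
     * Assumed to be the only logic the sink-side hook needs: return the
     * destination that was attached to the element upstream.
     */
    static String destinationOf(RoutedMessage element) {
        return element.destination;
    }
}
```

With this split, the routing decision is made once in a map step upstream, and the sink-side hook only reads it back. A real job would additionally need the wrapper and the router to be serializable by Flink, which this sketch does not address.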