This, too, should be possible.
Flink uses `StreamingFileSink` to transfer data to S3 [1].
You can pass it your custom bucket assigner [2]:

public T withBucketAssigner(final BucketAssigner<IN, BucketID> assigner)

which, similarly to the `KeyedSerializationSchema`, returns a destination
for each input element:

BucketID getBucketId(IN element, BucketAssigner.Context context);
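
To illustrate the idea, here is a minimal, self-contained sketch rather than actual Flink code. The names `RoutedEvent`, `DestinationAssigner` and `DestinationBucketAssigner` are hypothetical, and `DestinationAssigner` is only a simplified stand-in for Flink's `BucketAssigner`: it mirrors the `getBucketId` method above but omits the `Context` argument and the bucket ID serializer that the real interface requires.

```java
// Hypothetical wrapper type: the original message plus the destination
// (for example an S3 sub-directory) it should be written to.
final class RoutedEvent {
    final String destination;
    final String payload;

    RoutedEvent(String destination, String payload) {
        this.destination = destination;
        this.payload = payload;
    }
}

// Simplified stand-in (an assumption, not a Flink class) for BucketAssigner.
// The real interface also receives a BucketAssigner.Context and must
// provide a serializer for the bucket ID.
interface DestinationAssigner<IN, BucketID> {
    BucketID getBucketId(IN element);
}

// Routes every element to the bucket named by its destination field.
final class DestinationBucketAssigner
        implements DestinationAssigner<RoutedEvent, String> {

    @Override
    public String getBucketId(RoutedEvent element) {
        // Fall back to a catch-all bucket when no destination was set.
        return element.destination == null ? "unrouted" : element.destination;
    }
}
```

With the real API, the same `getBucketId` logic would live in a class implementing `BucketAssigner<RoutedEvent, String>`, which you then hand to `withBucketAssigner`. The sink writes each element under the sub-directory of the base path that corresponds to the returned bucket ID.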

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html#streaming-file-sink
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html#bucket-assignment

--

Alexander Fedulov | Solutions Architect



On Thu, Apr 30, 2020 at 5:21 AM dhurandar S <dhurandarg...@gmail.com> wrote:

> Thank you, Alexander, for the response. This is very helpful.
> Can I apply the same pattern to S3 as well, that is, read from Kafka or
> Kinesis and write multiple files to S3 or multiple topics in Kinesis?
>
> regards,
> Rahul
>
> On Wed, Apr 29, 2020 at 2:32 PM Alexander Fedulov <alexan...@ververica.com>
> wrote:
>
>> Hi Dhurandar,
>>
>> This is not supported out of the box. However, I think it is possible by
>> doing the following:
>> 1) Create a wrapper type containing the original message and the
>> destination topic it is supposed to be sent to. You can enrich the messages
>> with it in accordance with the configuration you've mentioned.
>> 2) Implement `KeyedSerializationSchema` and make its `getTargetTopic` return
>> the desired topic.
>> 3) Initialize `FlinkKafkaProducer011` with this custom
>> `KeyedSerializationSchema`.
>> Please note that `KeyedSerializationSchema` is marked as deprecated
>> and is supposed to be replaced by the new `KafkaSerializationSchema`,
>> which would require a slight modification, but, from what I can tell, it
>> will still be possible to achieve such dynamic event dispatching.
>>
>> Best regards,
>> Alexander Fedulov
>>