Thank you, Alexander, This is really helpful.

Can the input be Flink SQL? Idea is to provide the capability to take SQL
as the input and create new streams on-demand for the given SQL.

So users of the system provide "SQL" in the configuration files and
henceforth they can start listening to a topic or start reading data from
Can we load Flink SQL at runtime ??


On Thu, Apr 30, 2020 at 2:02 AM Alexander Fedulov <>

> This too, should be possible.
> Flink uses `StreamingFileSink` to transfer data to S3 [1
> <>].
> You can pass it your custom bucket assigner [2
> <>
> ]:
> public T withBucketAssigner(final BucketAssigner<IN, BucketID> assigner) {
> which, similarly to the `KeyedSerializationSchema`, returns a destination
> for each input element:
> BucketID getBucketId(IN element, BucketAssigner.Context context);
> [1]
> [2]
> --
> Alexander Fedulov | Solutions Architect
> +49 1514 6265796
> <>
> Follow us @VervericaData
> --
> Join Flink Forward <> - The Apache Flink
> Conference
> Stream Processing | Event Driven | Real Time
> --
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Tony) Cheng
> On Thu, Apr 30, 2020 at 5:21 AM dhurandar S <>
> wrote:
>> Thank you Alexander for the response. This is very helpful.
>> Can i apply the same pattern to S3 as well , as in read from Kafka or
>> Kinesis and write multiple files in S3 or multiple topics in Kinesis ?
>> regards,
>> Rahul
>> On Wed, Apr 29, 2020 at 2:32 PM Alexander Fedulov <
>>> wrote:
>>> Hi Dhurandar,
>>> it is not supported out of the box, however, I think it is possible by
>>> doing the following:
>>> 1) Create a wrapper type, containing the original message and a topic
>>> destination where it is supposed to be sent. You can enrich the messages
>>> with it in accordance to the configuration you've mentioned.
>>> 2) Extend `KeyedSerializationSchema` and make its `getTargetTopic`
>>> return the desired topic
>>> 3) Initialize `FlinkKafkaProducer011` with this custom
>>> `KeyedSerializationSchema`
>>> Please mind that `KeyedSerializationSchema` and is marked as deprecated
>>> and is supposed to be substituted by the new `KafkaSerializationSchema`,
>>> which would require a slight modification, but, from what I can tell, it
>>> will still be possible to achieve such dynamic events dispatching.
>>> Best regards,
>>> Alexander Fedulov

Thank you and regards,

Reply via email to