I had a similar need recently and ended up using
KafkaDeserializationSchemaWrapper to wrap a given
DeserializationSchema. The resulting
KafkaDeserializationSchema[Wrapper] can be passed directly to the
`FlinkKafkaConsumer` constructor.
```
class BoundingDeserializationSchema
extends KafkaDese
Thanks Yun
On Thu, Jan 14, 2021 at 1:58 PM Yun Gao wrote:
> Hi Sagar,
>
> I rechecked and found that the new kafka source is not formally publish
> yet, and a stable method I think may be try adding the FlinkKafkaConsumer
> as a BOUNDED source first. Sorry for the inconvient.
>
> Best,
> Yu
Hi Sagar,
I rechecked and found that the new kafka source is not formally publish yet,
and a stable method I think may be try adding the FlinkKafkaConsumer as a
BOUNDED source first. Sorry for the inconvient.
Best,
Yun
--
Send
Hi Sagar,
I think the problem is that the legacy source implemented by extending
SourceFunction are all defined as CONTINOUS_UNBOUNDED when use env.addSource().
Although there is hacky way to add the legacy sources as BOUNDED source [1], I
think you may first have a try of new version of