Hi Vijay,

IMO, the semantics of the source is not changeless. It can contain
integrate with third-party systems and consume events. However, it can also
contain more business logic about your data pre-process after consuming
events.

Maybe it needs some customization. WDYT?

Best,
Vino

Vijay Balakrishnan <bvija...@gmail.com> 于2019年11月26日周二 上午6:45写道:

> Hi,
> Need to pre-process data(transform incoming data to a different format)
> before it hits the Source I have defined. How can I do that ?
>
> I tried to use a .map on the DataStream but that is too late as the data
> has already hit the Source I defined.
> FlinkKinesisConsumer<Map<String, Object>> kinesisConsumer =
> getMonitoringFlinkKinesisConsumer(local, localKinesis, kinesisTopicRead,
> region, getRecsMax, getRecsIntervalMs, connectionTimeout, maxConnections,
> socketTimeout);
> DataStreamSource<Map<String, Object>> monitoringDataStreamSource =
> env.addSource(kinesisConsumer);
>
> DataStream<Map<String, Object>> kinesisStream1 = kinesisStream.map(new
> TransformFunction(...));//too late here
>
> TIA,
>

Reply via email to