Hi Vijay,

IMO, the semantics of a source are not fixed. A source can integrate with third-party systems and consume events, but it can also contain business logic that pre-processes your data right after the events are consumed.
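A rough sketch of what I mean, in plain Java with no Flink dependency. All names here (`RecordDeserializer`, `TransformingDeserializer`, `decode`, `upperCaseKeys`) are made up for illustration and are not Flink APIs. The assumption is that your consumer accepts a deserialization hook, so the transform can be wrapped around the decoding step and run before a record ever leaves the source:

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for the deserialization hook a connector calls
// for every raw record it reads. Not a Flink interface.
interface RecordDeserializer<T> {
    T deserialize(byte[] rawRecord);
}

// Wraps an existing deserializer and applies a transform to each decoded
// record, so downstream operators only ever see the transformed format.
final class TransformingDeserializer<I, O> implements RecordDeserializer<O> {
    private final RecordDeserializer<I> inner;
    private final Function<I, O> transform;

    TransformingDeserializer(RecordDeserializer<I> inner, Function<I, O> transform) {
        this.inner = inner;
        this.transform = transform;
    }

    @Override
    public O deserialize(byte[] rawRecord) {
        return transform.apply(inner.deserialize(rawRecord));
    }
}

final class PreProcessInSourceSketch {
    // Toy decoder (assumption): parses "k1=v1,k2=v2" into a map.
    static Map<String, Object> decode(byte[] raw) {
        Map<String, Object> out = new HashMap<>();
        String text = new String(raw, StandardCharsets.UTF_8);
        for (String pair : text.split(",")) {
            String[] kv = pair.split("=", 2);
            if (kv.length == 2) {
                out.put(kv[0].trim(), kv[1].trim());
            }
        }
        return out;
    }

    // Toy transform (assumption): converts every key to upper case.
    static Map<String, Object> upperCaseKeys(Map<String, Object> in) {
        Map<String, Object> out = new HashMap<>();
        for (Map.Entry<String, Object> e : in.entrySet()) {
            out.put(e.getKey().toUpperCase(), e.getValue());
        }
        return out;
    }

    public static void main(String[] args) {
        RecordDeserializer<Map<String, Object>> deserializer =
            new TransformingDeserializer<Map<String, Object>, Map<String, Object>>(
                PreProcessInSourceSketch::decode,
                PreProcessInSourceSketch::upperCaseKeys);

        byte[] raw = "host=a1,cpu=0.93".getBytes(StandardCharsets.UTF_8);
        System.out.println(deserializer.deserialize(raw));
    }
}
```

In your job, the equivalent would be to put the logic of your `TransformFunction` into whatever deserialization schema `getMonitoringFlinkKinesisConsumer` passes to the consumer. I have not seen that helper, so treat this as a direction to try rather than a drop-in fix.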
Maybe it needs some customization. WDYT?

Best,
Vino

On Tue, Nov 26, 2019 at 6:45 AM, Vijay Balakrishnan <bvija...@gmail.com> wrote:

> Hi,
> Need to pre-process data (transform incoming data to a different format)
> before it hits the Source I have defined. How can I do that?
>
> I tried to use a .map on the DataStream but that is too late as the data
> has already hit the Source I defined.
>
> FlinkKinesisConsumer<Map<String, Object>> kinesisConsumer =
>     getMonitoringFlinkKinesisConsumer(local, localKinesis, kinesisTopicRead,
>         region, getRecsMax, getRecsIntervalMs, connectionTimeout, maxConnections,
>         socketTimeout);
> DataStreamSource<Map<String, Object>> monitoringDataStreamSource =
>     env.addSource(kinesisConsumer);
>
> DataStream<Map<String, Object>> kinesisStream1 =
>     kinesisStream.map(new TransformFunction(...)); // too late here
>
> TIA,