Hi,
Looks like I am sending a Map<String,Object> to Kinesis and it is being
sent to 1 partition only. *How can I make this distribute across multiple
partitions/shards on the Kinesis Data stream with this Map<String, Object>*
data ?

*Sending to Kinesis*:
DataStream<Map<String, Object>> influxToMapKinesisStream =
enrichedMGStream.map(influxDBPoint -> {
                                return new
MonitoringGroupingToInfluxDBPoint(agg,
groupBySetArr).fromInfluxDBPoint(influxDBPoint);
                            }).returns(new TypeHint<Map<String, Object>>() {
                            }).setParallelism(dfltParallelism);

                            FlinkKinesisProducer<Map<String, Object>>
kinesisProducer = getMonitoringFlinkKinesisProducer(kinesisTopicWrite,
region, local, localKinesis);

influxToMapKinesisStream.addSink(kinesisProducer).setParallelism(dfltParallelism);

*Map<String, Object> used to send to Kinesis:*

Map<String, Object> mapObj = new HashMap<>();
mapObj.put(Utils.EVENT_TIMESTAMP, influxDBPoint.getTimestamp());
mapObj.put(Utils.MEASUREMENT, influxDBPoint.getMeasurement());
mapObj.put(Utils.TAGS, influxDBPoint.getTags());
mapObj.put(Utils.FIELDS, influxDBPoint.getFields());

TIA,

On Thu, Jun 4, 2020 at 5:35 PM Vijay Balakrishnan <bvija...@gmail.com>
wrote:

> Hi,
> My FlinkKinesisProducer sends data to only 1 shard. Is it because I don't
> have "AggregationEnabled" set to false ?
>
> flink_connector_kinesis_2.11 : flink version 1.9.1
>
> //Setup Kinesis Producer
>         Properties kinesisProducerConfig = new Properties();
>         kinesisProducerConfig.setProperty(AWSConfigConstants.AWS_REGION,
> region);
>
> kinesisProducerConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER,
> "AUTO");
>         //kinesisProducerConfig.setProperty("AggregationEnabled", "false");
>
>         FlinkKinesisProducer<Map<String, Object>> kinesisProducer = new
> FlinkKinesisProducer<>(
>                 new MonitoringMapKinesisSchema(localKinesis),
> kinesisProducerConfig);
>
>         //TODO: kinesisProducer.setFailOnError(true);
>         kinesisProducer.setDefaultStream(kinesisTopicWrite);
>         kinesisProducer.setDefaultPartition("0");//TODO: why from start ?
>         return kinesisProducer;
>
> TIA,
>

Reply via email to