Hi, Resolved the issue by using a Custom Partitioner and setting RequestTimeout properties.
kinesisProducer.setCustomPartitioner(new SerializableCustomPartitioner()); private static final class SerializableCustomPartitioner extends KinesisPartitioner<Map<String,Object>> { private static final long serialVersionUID = -5196071893997035695L; @Override public String getPartitionId(Map<String,Object> map) { StringBuilder stringBuilder = new StringBuilder(); UUID uuid = UUID.randomUUID(); stringBuilder.append(uuid); return stringBuilder.toString(); } } On Thu, Jun 4, 2020 at 6:43 PM Vijay Balakrishnan <bvija...@gmail.com> wrote: > Hi, > Looks like I am sending a Map<String,Object> to Kinesis and it is being > sent to 1 partition only. *How can I make this distribute across multiple > partitions/shards on the Kinesis Data stream with this Map<String, Object>* > data ? > > *Sending to Kinesis*: > DataStream<Map<String, Object>> influxToMapKinesisStream = > enrichedMGStream.map(influxDBPoint -> { > return new > MonitoringGroupingToInfluxDBPoint(agg, > groupBySetArr).fromInfluxDBPoint(influxDBPoint); > }).returns(new TypeHint<Map<String, Object>>() > { > }).setParallelism(dfltParallelism); > > FlinkKinesisProducer<Map<String, Object>> > kinesisProducer = getMonitoringFlinkKinesisProducer(kinesisTopicWrite, > region, local, localKinesis); > > influxToMapKinesisStream.addSink(kinesisProducer).setParallelism(dfltParallelism); > > *Map<String, Object> used to send to Kinesis:* > > Map<String, Object> mapObj = new HashMap<>(); > mapObj.put(Utils.EVENT_TIMESTAMP, influxDBPoint.getTimestamp()); > mapObj.put(Utils.MEASUREMENT, influxDBPoint.getMeasurement()); > mapObj.put(Utils.TAGS, influxDBPoint.getTags()); > mapObj.put(Utils.FIELDS, influxDBPoint.getFields()); > > TIA, > > On Thu, Jun 4, 2020 at 5:35 PM Vijay Balakrishnan <bvija...@gmail.com> > wrote: > >> Hi, >> My FlinkKinesisProducer sends data to only 1 shard. Is it because I don't >> have "AggregationEnabled" set to false ? >> >> flink_connector_kinesis_2.11 : flink version 1.9.1 >> >> //Setup Kinesis Producer >> Properties kinesisProducerConfig = new Properties(); >> kinesisProducerConfig.setProperty(AWSConfigConstants.AWS_REGION, >> region); >> >> kinesisProducerConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, >> "AUTO"); >> //kinesisProducerConfig.setProperty("AggregationEnabled", >> "false"); >> >> FlinkKinesisProducer<Map<String, Object>> kinesisProducer = new >> FlinkKinesisProducer<>( >> new MonitoringMapKinesisSchema(localKinesis), >> kinesisProducerConfig); >> >> //TODO: kinesisProducer.setFailOnError(true); >> kinesisProducer.setDefaultStream(kinesisTopicWrite); >> kinesisProducer.setDefaultPartition("0");//TODO: why from start ? >> return kinesisProducer; >> >> TIA, >> >