Hi,
I resolved the issue by using a custom partitioner and setting the
RequestTimeout property.
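
For anyone finding this thread later: the RequestTimeout setting is not shown
in the snippet that follows, so here is a minimal sketch of how it could be
passed, assuming it goes into the same Properties object that is handed to the
FlinkKinesisProducer constructor. The class name, the method name and the
60000 ms value are placeholders for illustration, not the values actually used.

```java
import java.util.Properties;

public class ProducerTimeoutSketch {

    // Builds producer properties with a request timeout set.
    // "RequestTimeout" is a Kinesis Producer Library setting in milliseconds;
    // the 60000 used here is a placeholder, not the value from this thread.
    public static Properties producerConfig(String region) {
        Properties config = new Properties();
        config.setProperty("aws.region", region); // same key as AWSConfigConstants.AWS_REGION
        config.setProperty("RequestTimeout", "60000");
        return config;
    }

    public static void main(String[] args) {
        Properties config = producerConfig("us-east-1");
        System.out.println("RequestTimeout = " + config.getProperty("RequestTimeout"));
    }
}
```

The returned Properties would then take the place of kinesisProducerConfig in
the original setup code.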

kinesisProducer.setCustomPartitioner(new SerializableCustomPartitioner());

private static final class SerializableCustomPartitioner
        extends KinesisPartitioner<Map<String, Object>> {

    private static final long serialVersionUID = -5196071893997035695L;

    // Return a fresh random partition key for every record, so records hash
    // to different shards instead of all sharing one key.
    @Override
    public String getPartitionId(Map<String, Object> map) {
        return UUID.randomUUID().toString();
    }
}
}
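
As to why this helps: Kinesis picks the shard by taking the MD5 hash of the
partition key and finding the shard whose hash key range contains it. A fixed
key such as the "0" from setDefaultPartition("0") therefore always lands on
the same shard, while random UUID keys spread out. Below is a rough,
standalone sketch of that mapping. It does not call Kinesis or Flink, and it
assumes the shards split the 128-bit hash key space into equal ranges, which
may not hold for a stream that has been resharded. The class and method names
are made up for the example.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

public class ShardSpreadSketch {

    // Size of the 128-bit hash key space that the shards divide between them.
    private static final BigInteger HASH_SPACE = BigInteger.ONE.shiftLeft(128);

    // Index of the shard that would receive a partition key, assuming
    // shardCount shards that split the hash key space into equal ranges.
    public static int shardFor(String partitionKey, int shardCount) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            BigInteger hash = new BigInteger(1, digest); // unsigned 128-bit value
            return hash.multiply(BigInteger.valueOf(shardCount))
                    .divide(HASH_SPACE)
                    .intValue();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is not available", e);
        }
    }

    public static void main(String[] args) {
        int shardCount = 4; // illustrative shard count
        Set<Integer> fixedKeyShards = new HashSet<>();
        Set<Integer> randomKeyShards = new HashSet<>();
        for (int i = 0; i < 1000; i++) {
            // Same key every time, as with setDefaultPartition("0").
            fixedKeyShards.add(shardFor("0", shardCount));
            // New key per record, as with the custom partitioner.
            randomKeyShards.add(shardFor(UUID.randomUUID().toString(), shardCount));
        }
        System.out.println("shards hit with fixed key \"0\": " + fixedKeyShards.size());
        System.out.println("shards hit with random UUID keys: " + randomKeyShards.size());
    }
}
```

The fixed key can only ever hit one shard. One trade-off of random keys is
that records with the same measurement or tags no longer go to the same shard,
so per-key ordering is not preserved; if that matters, a key derived from the
record contents would be the alternative.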


On Thu, Jun 4, 2020 at 6:43 PM Vijay Balakrishnan <bvija...@gmail.com>
wrote:

> Hi,
> Looks like I am sending a Map<String,Object> to Kinesis and it is being
> sent to 1 partition only. *How can I make this distribute across multiple
> partitions/shards on the Kinesis Data stream with this Map<String, Object>*
> data ?
>
> *Sending to Kinesis*:
> DataStream<Map<String, Object>> influxToMapKinesisStream =
> enrichedMGStream.map(influxDBPoint -> {
>                                 return new
> MonitoringGroupingToInfluxDBPoint(agg,
> groupBySetArr).fromInfluxDBPoint(influxDBPoint);
>                             }).returns(new TypeHint<Map<String, Object>>()
> {
>                             }).setParallelism(dfltParallelism);
>
>                             FlinkKinesisProducer<Map<String, Object>>
> kinesisProducer = getMonitoringFlinkKinesisProducer(kinesisTopicWrite,
> region, local, localKinesis);
>
> influxToMapKinesisStream.addSink(kinesisProducer).setParallelism(dfltParallelism);
>
> *Map<String, Object> used to send to Kinesis:*
>
> Map<String, Object> mapObj = new HashMap<>();
> mapObj.put(Utils.EVENT_TIMESTAMP, influxDBPoint.getTimestamp());
> mapObj.put(Utils.MEASUREMENT, influxDBPoint.getMeasurement());
> mapObj.put(Utils.TAGS, influxDBPoint.getTags());
> mapObj.put(Utils.FIELDS, influxDBPoint.getFields());
>
> TIA,
>
> On Thu, Jun 4, 2020 at 5:35 PM Vijay Balakrishnan <bvija...@gmail.com>
> wrote:
>
>> Hi,
>> My FlinkKinesisProducer sends data to only 1 shard. Is it because I don't
>> have "AggregationEnabled" set to false ?
>>
>> flink_connector_kinesis_2.11 : flink version 1.9.1
>>
>> //Setup Kinesis Producer
>>         Properties kinesisProducerConfig = new Properties();
>>         kinesisProducerConfig.setProperty(AWSConfigConstants.AWS_REGION,
>> region);
>>
>> kinesisProducerConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER,
>> "AUTO");
>>         //kinesisProducerConfig.setProperty("AggregationEnabled",
>> "false");
>>
>>         FlinkKinesisProducer<Map<String, Object>> kinesisProducer = new
>> FlinkKinesisProducer<>(
>>                 new MonitoringMapKinesisSchema(localKinesis),
>> kinesisProducerConfig);
>>
>>         //TODO: kinesisProducer.setFailOnError(true);
>>         kinesisProducer.setDefaultStream(kinesisTopicWrite);
>>         kinesisProducer.setDefaultPartition("0");//TODO: why from start ?
>>         return kinesisProducer;
>>
>> TIA,
>>
>
