The Firehose sink implements Sink V2, which was introduced in Flink 1.15, but in Flink 1.13 the method inputStream#sinkTo(xxx) only accepts Sink V1. That is why the compiler reports the incompatible types error.
If you still want to use Firehose in Flink 1.13, I guess you would need to either implement a SinkV2Adapter that translates Sink V2 into Sink V1 (similar in spirit to SinkV1Adapter in Flink 1.15), or rewrite some of the Firehose connector code to migrate it to Sink V1.

Best regards,
Yuxia

From: "Zain Haider Nemati" <zain.hai...@retailo.co>
To: "Martijn Visser" <martijnvis...@apache.org>
Cc: "yu'an huang" <h.yuan...@gmail.com>, "User" <user@flink.apache.org>
Sent: Thursday, May 12, 2022, 3:36:46 PM
Subject: Re: Incompatible data types while using firehose sink

Hi,
Appreciate your response. My Flink version is 1.13. Is there any other way to sink data to Kinesis without having to update to 1.15?

On Thu, May 12, 2022 at 12:25 PM Martijn Visser <martijnvis...@apache.org> wrote:

I'm guessing this must be Flink 1.15 since Firehose was added in that version :)

On Thu, 12 May 2022 at 08:41, yu'an huang <h.yuan...@gmail.com> wrote:

Hi,
Your code is working fine on my computer. What Flink version are you using?

On 12 May 2022, at 3:39 AM, Zain Haider Nemati <zain.hai...@retailo.co> wrote:

Hi Folks,
Getting this error when sinking data to a Firehose sink, would really appreciate some help!

    DataStream<String> inputStream = env.addSource(
        new FlinkKafkaConsumer<>("xxx", new SimpleStringSchema(), properties));

    Properties sinkProperties = new Properties();
    sinkProperties.put(AWSConfigConstants.AWS_REGION, "xxx");
    sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "xxx");
    sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "xxx");

    KinesisFirehoseSink<String> kdfSink = KinesisFirehoseSink.<String>builder()
        .setFirehoseClientProperties(sinkProperties)
        .setSerializationSchema(new SimpleStringSchema())
        .setDeliveryStreamName("xxx")
        .setMaxBatchSize(350)
        .build();

    inputStream.sinkTo(kdfSink);

    incompatible types: org.apache.flink.connector.firehose.sink.KinesisFirehoseSink<java.lang.String> cannot be converted to org.apache.flink.api.connector.sink.Sink<java.lang.String,?,?,?>
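
To make the adapter idea from the reply at the top of this thread more concrete, here is a minimal, self-contained sketch of the pattern in plain Java. It does not use Flink at all: `SinkV1`, `SinkV2`, their writer interfaces, `V2ToV1Adapter`, `BufferingV2Sink` and this `sinkTo` are all hypothetical, simplified stand-ins invented for illustration. Flink's real sink interfaces carry considerably more (writer state and committers, for example), so a real adapter would need more work than this.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SinkAdapterSketch {

    // Hypothetical stand-in for the older sink contract (NOT Flink's real interface).
    public interface SinkV1<T> {
        WriterV1<T> createWriter();
    }

    public interface WriterV1<T> {
        void write(T element);
        void close();
    }

    // Hypothetical stand-in for the newer sink contract (NOT Flink's real interface).
    public interface SinkV2<T> {
        WriterV2<T> createWriter();
    }

    public interface WriterV2<T> {
        void write(T element);
        void flush(boolean endOfInput);
        void close();
    }

    // Adapter: wraps a V2 sink so it can be passed where a V1 sink is required.
    public static final class V2ToV1Adapter<T> implements SinkV1<T> {
        private final SinkV2<T> delegate;

        public V2ToV1Adapter(SinkV2<T> delegate) {
            this.delegate = delegate;
        }

        @Override
        public WriterV1<T> createWriter() {
            final WriterV2<T> inner = delegate.createWriter();
            return new WriterV1<T>() {
                @Override
                public void write(T element) {
                    inner.write(element);
                }

                @Override
                public void close() {
                    // V1 has no flush in this toy model, so flush once before closing.
                    inner.flush(true);
                    inner.close();
                }
            };
        }
    }

    // Toy V2 sink: buffers elements and moves them to `delivered` on flush.
    public static final class BufferingV2Sink<T> implements SinkV2<T> {
        public final List<T> delivered = new ArrayList<>();

        @Override
        public WriterV2<T> createWriter() {
            final List<T> buffer = new ArrayList<>();
            return new WriterV2<T>() {
                @Override
                public void write(T element) {
                    buffer.add(element);
                }

                @Override
                public void flush(boolean endOfInput) {
                    delivered.addAll(buffer);
                    buffer.clear();
                }

                @Override
                public void close() {
                    // Nothing to release in this toy sink.
                }
            };
        }
    }

    // Plays the role of an API that only accepts the older sink type.
    public static <T> void sinkTo(List<T> input, SinkV1<T> sink) {
        WriterV1<T> writer = sink.createWriter();
        for (T element : input) {
            writer.write(element);
        }
        writer.close();
    }

    public static void main(String[] args) {
        BufferingV2Sink<String> v2 = new BufferingV2Sink<>();
        // sinkTo(Arrays.asList("a", "b"), v2);  // would not compile: a SinkV2 is not a SinkV1
        sinkTo(Arrays.asList("a", "b"), new V2ToV1Adapter<>(v2));
        System.out.println(v2.delivered); // prints [a, b]
    }
}
```

The commented-out call in `main` fails for the same kind of reason as the error quoted above: the two sink types are unrelated, so the compiler cannot convert one to the other, and only an explicit wrapper bridges them.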