The Firehose sink implements the Sink V2 interface (the sink2 package), which 
was introduced in Flink 1.15. In Flink 1.13, however, inputStream#sinkTo(xxx) 
only accepts the older Sink V1 interface, which is why the types are incompatible. 

If you still want to use Firehose in Flink 1.13, I guess you would need to either 
implement a SinkV2Adapter that translates Sink V2 into Sink V1 (the reverse of 
what SinkV1Adapter does in Flink 1.15), or rewrite some of the Firehose 
connector code to migrate it to Sink V1. 
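
To illustrate the adapter idea only, here is a minimal, self-contained sketch. 
It does not use the real Flink classes: OldSink, OldWriter, NewSink, NewWriter, 
NewToOldSinkAdapter and BufferingSink are hypothetical, heavily simplified 
stand-ins for the V1 and V2 contracts. The real interfaces have more methods 
(context, state, committers), so a working adapter would need more than this. 

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SinkAdapterSketch {

    // Hypothetical stand-in for the V1-style contract that sinkTo() expects in 1.13.
    public interface OldSink<T> {
        OldWriter<T> createWriter();
    }

    public interface OldWriter<T> {
        void write(T element);
        List<T> prepareCommit(boolean flush); // V1-style: flushing happens at commit time
    }

    // Hypothetical stand-in for the V2-style contract the connector implements.
    public interface NewSink<T> {
        NewWriter<T> createWriter();
    }

    public interface NewWriter<T> {
        void write(T element);
        void flush(boolean endOfInput); // V2-style: explicit flush, no committables
    }

    // The adapter: exposes a V2-style sink through the V1-style interface.
    public static final class NewToOldSinkAdapter<T> implements OldSink<T> {
        private final NewSink<T> delegate;

        public NewToOldSinkAdapter(NewSink<T> delegate) {
            this.delegate = delegate;
        }

        @Override
        public OldWriter<T> createWriter() {
            final NewWriter<T> writer = delegate.createWriter();
            return new OldWriter<T>() {
                @Override
                public void write(T element) {
                    writer.write(element);
                }

                @Override
                public List<T> prepareCommit(boolean flush) {
                    writer.flush(flush);      // map the V1 commit hook onto the V2 flush
                    return new ArrayList<>(); // this sketch has nothing to commit
                }
            };
        }
    }

    // Toy V2-style sink: buffers records and "delivers" them on flush.
    public static final class BufferingSink implements NewSink<String> {
        public final List<String> delivered = new ArrayList<>();

        @Override
        public NewWriter<String> createWriter() {
            final List<String> buffer = new ArrayList<>();
            return new NewWriter<String>() {
                @Override
                public void write(String element) {
                    buffer.add(element);
                }

                @Override
                public void flush(boolean endOfInput) {
                    delivered.addAll(buffer);
                    buffer.clear();
                }
            };
        }
    }

    // Plays the role of the framework code that only understands the V1-style interface.
    public static void feed(OldSink<String> sink, List<String> records) {
        OldWriter<String> writer = sink.createWriter();
        for (String record : records) {
            writer.write(record);
        }
        writer.prepareCommit(true);
    }

    public static void main(String[] args) {
        BufferingSink target = new BufferingSink();
        feed(new NewToOldSinkAdapter<>(target), Arrays.asList("a", "b", "c"));
        System.out.println(target.delivered);
    }
}
```

The point is only that the caller keeps talking to the old interface while the 
wrapped sink stays unchanged. Whether this is less work than porting the 
connector to Sink V1 depends on how much of the V2 surface the Firehose sink uses. 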

Best regards, 
Yuxia 


From: "Zain Haider Nemati" <zain.hai...@retailo.co> 
To: "Martijn Visser" <martijnvis...@apache.org> 
Cc: "yu'an huang" <h.yuan...@gmail.com>, "User" <user@flink.apache.org> 
Sent: Thursday, May 12, 2022, 3:36:46 PM 
Subject: Re: Incompatible data types while using firehose sink 

Hi, I appreciate your response. 
My Flink version is 1.13. 
Is there any other way to sink data to Kinesis without having to upgrade to 1.15? 

On Thu, May 12, 2022 at 12:25 PM Martijn Visser <martijnvis...@apache.org> wrote: 



I'm guessing this must be Flink 1.15 since Firehose was added in that version 
:) 

On Thu, 12 May 2022 at 08:41, yu'an huang <h.yuan...@gmail.com> wrote: 

BQ_BEGIN

Hi, 

Your code works fine on my machine. Which Flink version are you using? 





BQ_BEGIN

On 12 May 2022, at 3:39 AM, Zain Haider Nemati <zain.hai...@retailo.co> wrote: 

Hi folks, 
I'm getting this error when sinking data to a KinesisFirehoseSink and would 
really appreciate some help! 

DataStream<String> inputStream = env.addSource(
        new FlinkKafkaConsumer<>("xxx", new SimpleStringSchema(), properties)); 

Properties sinkProperties = new Properties(); 
sinkProperties.put(AWSConfigConstants.AWS_REGION, "xxx"); 
sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "xxx"); 
sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "xxx"); 
KinesisFirehoseSink<String> kdfSink = KinesisFirehoseSink.<String>builder() 
        .setFirehoseClientProperties(sinkProperties) 
        .setSerializationSchema(new SimpleStringSchema()) 
        .setDeliveryStreamName("xxx") 
        .setMaxBatchSize(350) 
        .build(); 



inputStream.sinkTo(kdfSink); 

incompatible types: 
org.apache.flink.connector.firehose.sink.KinesisFirehoseSink<java.lang.String> 
cannot be converted to 
org.apache.flink.api.connector.sink.Sink<java.lang.String,?,?,?> 





BQ_END


BQ_END

