[ https://issues.apache.org/jira/browse/FLINK-19332?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Robert Metzger updated FLINK-19332:
-----------------------------------
    Fix Version/s:     (was: 1.8.2)

> Special characters issue using Kinesis Data Analytics for Apache Flink
> ----------------------------------------------------------------------
>
>                 Key: FLINK-19332
>                 URL: https://issues.apache.org/jira/browse/FLINK-19332
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kinesis
>    Affects Versions: 1.8.2
>            Reporter: Zekun Yu
>            Priority: Critical
>
> Hi there,
>
> I am encountering a special-character issue while using Kinesis Data Analytics for Apache Flink (KDA).
>
> Our KDA application processes data and writes its output to a Kinesis stream. A Lambda function subscribes to that Kinesis stream and reads records from it.
> The library used in the KDA application is "org.apache.flink.streaming.connectors.kinesis".
>
> The KDA application emits only a single record per key to the Kinesis sink using "collector.collect()" (details below).
> Most of the time, the record received by the Lambda looks perfectly fine.
> However, occasionally, when two records are sent to the Kinesis sink via "collector.collect()" at the same time, *those two records are somehow combined, and the record received by the Lambda function contains special characters*.
>
> Below are some technical details:
>
> The KDA application does not use any "timeWindow()", but it does key the stream with "keyBy()":
>
> ).returns(MatchedDataForAlarm.class)
>     .keyBy(MatchedDataForAlarm::getStateKey)
>     .connect(ruleBroadcastStream)
>     .process(new MetricProcess())
>     .addSink(kinesis);
>
> "MetricProcess" extends "KeyedBroadcastProcessFunction" and overrides the "processElement" method, which emits output via "collector.collect()":
>
> @Override
> public void processElement(MatchedDataForAlarm input, ReadOnlyContext ctx,
>         Collector<MatchedDataForAlarm> collector) throws Exception {
>
> We have our own AEMMatchedDataForAlarmSchemaSerialization, which implements KinesisSerializationSchema<MatchedDataForAlarm>. The serialization simply converts a "MatchedDataForAlarm" object to a String using Gson and then wraps it in a ByteBuffer:
>
> @Override
> public ByteBuffer serialize(MatchedDataForAlarm matchedDataForAlarm) {
>     Gson gson = new Gson();
>     String result = gson.toJson(matchedDataForAlarm);
>     log.info("Alarm record sent to Kinesis stream: {}", result);
>     return ByteBuffer.wrap(result.getBytes());
> }
>
> *Here is the record shown in the Lambda logs when the two records are combined somewhere (in most cases they arrive as two separate records):*
>
> ????
> 0??
> { "inAlarmState": false }
> ??
> { "inAlarmState": false}
> e????E????o?N9x
>
> I am not sure whether this is a serialization issue or some default behavior of the Kinesis sink library. It might also be a common mistake on my side that I am not aware of.
> Could anyone help with this problem? I would really appreciate it.
>
> Thanks,
> Zekun


--
This message was sent by Atlassian Jira
(v8.3.4#803005)
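For reference, the sink-side pieces described in the report could be wired together roughly as follows. This is a minimal, self-contained sketch against the Flink 1.8 flink-connector-kinesis API, not the reporter's actual code: the POJO fields, the AWS region, and the stream name are illustrative placeholders.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;

import com.google.gson.Gson;

public class AlarmKinesisSinkSketch {

    // Stand-in for the reporter's POJO; the fields are illustrative only.
    public static class MatchedDataForAlarm {
        public String stateKey;
        public boolean inAlarmState;

        public String getStateKey() {
            return stateKey;
        }
    }

    // Serialization schema as described in the report: Gson to JSON, then ByteBuffer.
    public static class AEMMatchedDataForAlarmSchemaSerialization
            implements KinesisSerializationSchema<MatchedDataForAlarm> {

        private static final long serialVersionUID = 1L;

        @Override
        public ByteBuffer serialize(MatchedDataForAlarm matchedDataForAlarm) {
            Gson gson = new Gson();
            String result = gson.toJson(matchedDataForAlarm);
            // Use an explicit charset so the produced bytes are deterministic.
            return ByteBuffer.wrap(result.getBytes(StandardCharsets.UTF_8));
        }

        @Override
        public String getTargetStream(MatchedDataForAlarm element) {
            // Returning null routes the record to the producer's default stream.
            return null;
        }
    }

    // Builds the "kinesis" sink that the report's ".addSink(kinesis)" refers to.
    public static FlinkKinesisProducer<MatchedDataForAlarm> buildKinesisSink() {
        Properties producerConfig = new Properties();
        producerConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1"); // placeholder region

        FlinkKinesisProducer<MatchedDataForAlarm> kinesis = new FlinkKinesisProducer<>(
                new AEMMatchedDataForAlarmSchemaSerialization(), producerConfig);
        kinesis.setDefaultStream("alarm-output-stream"); // placeholder stream name
        kinesis.setDefaultPartition("0");
        return kinesis;
    }
}

With this sketch, the pipeline from the report would end with .addSink(AlarmKinesisSinkSketch.buildKinesisSink()); the producer is otherwise left at its default settings, which matches what the report shows.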