Hi all,

I'm trying to follow the connector code in the 1.16 SNAPSHOT to build a Kinesis sink in PyFlink 1.15, to write the output of a KeyedCoProcessFunction to Kinesis. I've hit two problems:
1. If I use ".set_serialization_schema(SimpleStringSchema())", then I got the error message: java.lang.ClassCastException: class [B cannot be cast to class java.lang.String ([B and java.lang.String are in module java.base of loader 'bootstrap') at org.apache.flink.api.common.serialization.SimpleStringSchema.serialize(SimpleStringSchema.java:36) 2. I then tried to write my own implementation of the `SerializationSchema`: public class BytesSerDeSchema implements DeserializationSchema<byte[]>, SerializationSchema<byte[]> Then the code ran, but I got 16 bytes added to the beginning of every event before they were sent to Kinesis (debugging showed that the `element` variable in this method of the class BytesSerDeSchema: public byte[] serialize(byte[] element) already has those bytes padded). I guess those padded bytes are a side product of Python pickle. What I have to do now is to remove those 16 bytes in that `serialize` method. Could you please suggest a proper solution? Thanks a lot. Regards, Huyen On Mon, 27 Jun 2022 at 11:42, Dian Fu <dian0511...@gmail.com> wrote: > Hi John, > > Kinesis and most of the other connectors will be supported in 1.16, see > [1] for more details about kinesis. > > For versions prior to 1.16, you could try just as Andrew suggested or > refer to the implementations which are already available in the master as > examples. > > Regards, > Dian > > [1] > https://github.com/apache/flink/blob/master/flink-python/pyflink/datastream/connectors/kinesis.py > > On Fri, Jun 24, 2022 at 9:20 PM Andrew Otto <o...@wikimedia.org> wrote: > >> I've had success using the Java in pyflink via pyflink.java_gateway. >> Something like: >> >> from pyflink.java_gateway import get_gateway >> jvm = get_gateway() >> >> # then perhaps something like: >> FlinkKinesisConsumer = jvm. >> org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer >> >> There also seems to be a nice java_utils.py >> <https://github.com/apache/flink/blob/release-1.15.0/flink-python/pyflink/util/java_utils.py> >> with helpers that may uh, help. >> >> Not sure if this will work, you might need to use the python env's a java >> StreamTableEnvironment to do it? Here's an example >> <https://github.com/apache/flink/blob/release-1.15.0/flink-python/pyflink/datastream/stream_execution_environment.py#L922-L937> >> of how the python StreamTableEnvironment calls out to the Java one. >> >> BTW: I'm not an authority nor I have I really tried this, so take this >> advice with a grain of salt! :) >> >> Good luck! >> >> >> >> >> >> >> On Fri, Jun 24, 2022 at 9:06 AM John Tipper <john_tip...@hotmail.com> >> wrote: >> >>> Hi all, >>> >>> There are a number of connectors which do not appear to be in the Python >>> API v1.15.0, e.g. Kinesis. I can see that it's possible to use these >>> connectors by using the Table API: >>> >>> CREATE TABLE my_table (...) >>> WITH ('connector' = 'kinesis' ...) >>> >>> >>> I guess if you wanted the stream as a DataStream you'd I guess you'd >>> create the Table and then convert into a DataStream? >>> >>> Is there a way of directly instantiating these connectors in PyFlink >>> without needed to use SQL like this (and without having to wait until >>> v1.16)? e.g. the Java API looks like this: >>> >>> StreamExecutionEnvironment env = >>> StreamExecutionEnvironment.getExecutionEnvironment(); >>> DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>( >>> "kinesis_stream_name", new SimpleStringSchema(), consumerConfig)); >>> >>> >>> Many thanks, >>> >>> John >>> >>
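
For the Table API route John describes, a minimal PyFlink 1.15 sketch might look like the following. The table schema, stream name, region, and format are placeholders, and it assumes the Kinesis SQL connector jar is on the classpath:

    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import StreamTableEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)

    # Declare a Kinesis source table; the WITH options are illustrative.
    t_env.execute_sql("""
        CREATE TABLE my_table (
            `payload` STRING
        ) WITH (
            'connector' = 'kinesis',
            'stream' = 'kinesis_stream_name',
            'aws.region' = 'us-east-1',
            'format' = 'raw'
        )
    """)

    # Convert the Table into a DataStream to continue in the DataStream API.
    ds = t_env.to_data_stream(t_env.from_path("my_table"))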
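
Fleshing out Andrew's java_gateway idea a little: the sketch below wires the Java FlinkKinesisConsumer into a Python DataStream. It leans on PyFlink internals (the _j_* attributes and the DataStream wrapper constructor), so treat it as an experiment rather than a supported API, and note that the flink-connector-kinesis jar still has to be on the classpath (e.g. via env.add_jars):

    from pyflink.common.serialization import SimpleStringSchema
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.data_stream import DataStream
    from pyflink.java_gateway import get_gateway

    env = StreamExecutionEnvironment.get_execution_environment()
    jvm = get_gateway().jvm

    # The consumer configuration goes into a java.util.Properties instance.
    props = jvm.java.util.Properties()
    props.setProperty("aws.region", "us-east-1")

    # Build the Java consumer, unwrapping the Java schema hidden behind the
    # Python SimpleStringSchema wrapper (a private attribute, hence fragile).
    j_consumer = jvm.org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer(
        "kinesis_stream_name",
        SimpleStringSchema()._j_deserialization_schema,
        props)

    # Add the source on the wrapped Java environment, then re-wrap the
    # resulting Java DataStream as a Python one.
    j_ds = env._j_stream_execution_environment.addSource(j_consumer)
    kinesis = DataStream(j_ds)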
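
As for the 16 extra bytes in the original question: they look like pickle framing, which PyFlink adds when an operator's output type is left undeclared and records are pickled on their way to the Java operator chain. If that is the cause, declaring the output type on the process call should hand clean records to the sink's serialization schema and make the strip-16-bytes workaround unnecessary. A hedged sketch, with connected_stream, the key selectors, and MyKeyedCoProcessFunction standing in for the pipeline from the original mail:

    from pyflink.common import Types

    # Declaring output_type makes PyFlink use a typed serializer instead of
    # pickle when handing records across the Python/Java boundary.
    result = connected_stream \
        .key_by(lambda a: a[0], lambda b: b[0]) \
        .process(MyKeyedCoProcessFunction(), output_type=Types.STRING())

    # With string records, set_serialization_schema(SimpleStringSchema())
    # should no longer raise the ClassCastException. For raw bytes, declare
    # output_type=Types.PRIMITIVE_ARRAY(Types.BYTE()) and keep a byte[] schema.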