Hi Dian, all,
I have resolved this for now by writing my own custom serializer, which
simply passes the bytes through unchanged. See the code below:
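import java.io.IOException;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

/** Pass-through schema: hands Kafka record bytes to and from Flink unchanged. */
public class KafkaBytesSerializer
        implements SerializationSchema<byte[]>, DeserializationSchema<byte[]> {

    @Override
    public byte[] deserialize(byte[] bytes) throws IOException {
        return bytes;
    }

    @Override
    public boolean isEndOfStream(byte[] bytes) {
        // A Kafka topic is unbounded, so no record ever ends the stream.
        return false;
    }

    @Override
    public byte[] serialize(byte[] bytes) {
        return bytes;
    }

    @Override
    public TypeInformation<byte[]> getProducedType() {
        return TypeInformation.of(byte[].class);
    }
}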
This code is packaged in a jar and uploaded through env.add_jars. That
works like a charm!
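
For anyone who finds this thread because of the NegativeArraySizeException
quoted below, here is a likely explanation (my reading, not confirmed by
anyone in this thread). The stack trace goes through
BytePrimitiveArraySerializer, which expects Flink's own binary layout for a
byte array: a 4-byte big-endian length followed by the payload. A raw Kafka
value has no such prefix. A pickle written with protocol 2 or later starts
with the byte 0x80, so its first four bytes read as a negative length. The
sketch below uses only the Python standard library to show the arithmetic;
the helper name and the sample object are made up for illustration.

```python
import pickle
import struct


def read_length_prefix(payload: bytes) -> int:
    """Interpret the first four bytes as a signed big-endian int,
    the way a length-prefixed byte-array reader would."""
    return struct.unpack(">i", payload[:4])[0]


# Hypothetical message. Any object pickled with protocol >= 2 starts with
# the PROTO opcode 0x80, which sets the sign bit of the "length".
message = pickle.dumps({"request_id": 1}, protocol=4)
print(message[:1], read_length_prefix(message))

# The bytes 80 04 95 2d (PROTO 4, FRAME opcode, low byte of the frame
# length) decode to exactly the size reported in the exception.
print(read_length_prefix(bytes.fromhex("8004952d")))  # -2147183315
```

A pass-through schema like the one above never looks for that prefix, which
would explain why it works where TypeInformationSerializationSchema did not.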
Thanks for the help!
Wouter
On Fri, 4 Jun 2021 at 14:40, Wouter Zorgdrager <[email protected]>
wrote:
> Hi Dian, all,
>
> Thanks for your suggestion. Unfortunately, it does not seem to work. I get
> the following exception:
>
> Caused by: java.lang.NegativeArraySizeException: -2147183315
>     at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:81)
>     at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:31)
>     at org.apache.flink.api.common.serialization.TypeInformationSerializationSchema.deserialize(TypeInformationSerializationSchema.java:92)
>     at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
>     at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:58)
>     at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
>     at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
>
> To be more precise, the messages in my Kafka topic are pickled Python
> objects. Maybe that is the reason for the exception. I also tried using
> Types.PICKLED_BYTE_ARRAY().get_java_type_info(),
> but I think it uses the same serializer, because I get the same exception.
>
> Any suggestions? Thanks for your help!
>
> Regards,
> Wouter
>
> On Fri, 4 Jun 2021 at 08:24, Dian Fu <[email protected]> wrote:
>
>> Hi Wouter,
>>
>> E org.apache.flink.api.python.shaded.py4j.Py4JException:
>> Constructor
>> org.apache.flink.api.common.serialization.TypeInformationSerializationSchema([class
>> org.apache.flink.api.common.typeinfo.IntegerTypeInfo, class
>> org.apache.flink.configuration.Configuration]) does not exist
>>
>>
>> As the exception indicates, the constructor doesn't exist.
>>
>>
>>
>> Could you try with the following:
>>
>> ```
>> j_type_info = Types.PRIMITIVE_ARRAY(Types.BYTE()).get_java_type_info()
>> j_type_serializer = j_type_info.createSerializer(
>>     gate_way.jvm.org.apache.flink.api.common.ExecutionConfig())
>>
>> j_byte_string_schema = gate_way.jvm.org.apache.flink.api.common.serialization.TypeInformationSerializationSchema(
>>     j_type_info, j_type_serializer)
>>
>> ```
>>
>> Regards,
>> Dian
>>
>> On 3 Jun 2021, at 8:51 PM, Wouter Zorgdrager <[email protected]> wrote:
>>
>> Hi all,
>>
>> I have a PyFlink job connected to a KafkaConsumer and Producer. I want to
>> work directly with the bytes going to and from Kafka, because I want to
>> serialize/deserialize in my Python code rather than in the JVM environment.
>> Therefore, I can't use the SimpleStringSchema for (de)serialization (the
>> messages aren't strings anyway). I've tried to create a
>> TypeInformationSerializationSchema with Types.BYTE(); see the code snippet below:
>>
>> class ByteSerializer(SerializationSchema, DeserializationSchema):
>>     def __init__(self, execution_environment):
>>         gate_way = get_gateway()
>>
>>         j_byte_string_schema = gate_way.jvm.org.apache.flink.api.common.serialization.TypeInformationSerializationSchema(
>>             Types.BYTE().get_java_type_info(),
>>             get_j_env_configuration(execution_environment),
>>         )
>>         SerializationSchema.__init__(
>>             self, j_serialization_schema=j_byte_string_schema
>>         )
>>         DeserializationSchema.__init__(
>>             self, j_deserialization_schema=j_byte_string_schema
>>         )
>>
>> The ByteSerializer is used like this:
>>
>>
>> return FlinkKafkaConsumer(
>>     ["client_request", "internal"],
>>     ByteSerializer(self.env._j_stream_execution_environment),
>>     {
>>         "bootstrap.servers": "localhost:9092",
>>         "auto.offset.reset": "latest",
>>         "group.id": str(uuid.uuid4()),
>>     },
>> )
>>
>> However, this does not seem to work. I think the error is thrown in the JVM
>> environment, which makes it a bit hard to parse in my Python stack trace,
>> but I think it boils down to this part of the stack trace:
>>
>>
>> answer = 'xsorg.apache.flink.api.python.shaded.py4j.Py4JException:
>> Constructor
>> org.apache.flink.api.common.serialization.TypeIn...haded.py4j.GatewayConnection.run(GatewayConnection.java:238)\\n\tat
>> java.base/java.lang.Thread.run(Thread.java:834)\\n'
>> gateway_client = <py4j.java_gateway.GatewayClient object at 0x140c43550>
>> target_id = None
>> name =
>> 'org.apache.flink.api.common.serialization.TypeInformationSerializationSchema'
>>
>> def get_return_value(answer, gateway_client, target_id=None, name=None):
>> """Converts an answer received from the Java gateway into a Python
>> object.
>>
>> For example, string representation of integers are converted to
>> Python
>> integer, string representation of objects are converted to JavaObject
>> instances, etc.
>>
>> :param answer: the string returned by the Java gateway
>> :param gateway_client: the gateway client used to communicate with
>> the Java
>> Gateway. Only necessary if the answer is a reference (e.g.,
>> object,
>> list, map)
>> :param target_id: the name of the object from which the answer comes
>> from
>> (e.g., *object1* in `object1.hello()`). Optional.
>> :param name: the name of the member from which the answer comes from
>> (e.g., *hello* in `object1.hello()`). Optional.
>> """
>> if is_error(answer)[0]:
>> if len(answer) > 1:
>> type = answer[1]
>> value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
>> if answer[1] == REFERENCE_TYPE:
>> raise Py4JJavaError(
>> "An error occurred while calling {0}{1}{2}.\n".
>> format(target_id, ".", name), value)
>> else:
>> > raise Py4JError(
>> "An error occurred while calling {0}{1}{2}.
>> Trace:\n{3}\n".
>> format(target_id, ".", name, value))
>> E py4j.protocol.Py4JError: An error occurred while calling
>> None.org.apache.flink.api.common.serialization.TypeInformationSerializationSchema.
>> Trace:
>> E org.apache.flink.api.python.shaded.py4j.Py4JException:
>> Constructor
>> org.apache.flink.api.common.serialization.TypeInformationSerializationSchema([class
>> org.apache.flink.api.common.typeinfo.IntegerTypeInfo, class
>> org.apache.flink.configuration.Configuration]) does not exist
>> E   at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:179)
>> E   at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:196)
>> E   at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:237)
>> E   at org.apache.flink.api.python.shaded.py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
>> E   at org.apache.flink.api.python.shaded.py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
>> E   at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>> E   at java.base/java.lang.Thread.run(Thread.java:834)
>>
>> I hope you can help me out!
>>
>>
>> Thanks in advance,
>>
>> Wouter
>>
>>
>>