Hi all,
I have a PyFlink job connected to a Kafka consumer and producer. I want to
work directly with the raw bytes from and to Kafka, because I want to
serialize/deserialize in my Python code rather than in the JVM.
Therefore, I can't use SimpleStringSchema for (de)serialization (the
messages aren't strings anyway). I've tried to create a schema backed by
Flink's TypeInformationSerializationSchema with Types.BYTE(); see the code
snippet below:
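from pyflink.common.serialization import DeserializationSchema, SerializationSchema
from pyflink.common.typeinfo import Types
from pyflink.java_gateway import get_gateway
# get_j_env_configuration is a PyFlink-internal helper; its module under
# pyflink.util depends on the Flink version in use.


class ByteSerializer(SerializationSchema, DeserializationSchema):
    def __init__(self, execution_environment):
        gate_way = get_gateway()
        # Wrap Flink's Java TypeInformationSerializationSchema for the BYTE type.
        j_byte_string_schema = gate_way.jvm.org.apache.flink.api.common.serialization.TypeInformationSerializationSchema(
            Types.BYTE().get_java_type_info(),
            get_j_env_configuration(execution_environment),
        )
        SerializationSchema.__init__(self, j_serialization_schema=j_byte_string_schema)
        DeserializationSchema.__init__(self, j_deserialization_schema=j_byte_string_schema)

The ByteSerializer is used like this: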
return FlinkKafkaConsumer(
    ["client_request", "internal"],
    ByteSerializer(self.env._j_stream_execution_environment),
    {
        "bootstrap.servers": "localhost:9092",
        "auto.offset.reset": "latest",
        "group.id": str(uuid.uuid4()),
    },
)
However, this does not work. The error appears to be thrown in the JVM,
which makes it a bit hard to read from my Python stack trace, but I think
it boils down to this part:
answer = 'xsorg.apache.flink.api.python.shaded.py4j.Py4JException: Constructor org.apache.flink.api.common.serialization.TypeIn...haded.py4j.GatewayConnection.run(GatewayConnection.java:238)\\n\tat java.base/java.lang.Thread.run(Thread.java:834)\\n'
gateway_client = <py4j.java_gateway.GatewayClient object at 0x140c43550>
target_id = None
name = 'org.apache.flink.api.common.serialization.TypeInformationSerializationSchema'
    def get_return_value(answer, gateway_client, target_id=None, name=None):
        """Converts an answer received from the Java gateway into a Python object.

        For example, string representation of integers are converted to Python
        integer, string representation of objects are converted to JavaObject
        instances, etc.

        :param answer: the string returned by the Java gateway
        :param gateway_client: the gateway client used to communicate with the Java
            Gateway. Only necessary if the answer is a reference (e.g., object,
            list, map)
        :param target_id: the name of the object from which the answer comes from
            (e.g., *object1* in `object1.hello()`). Optional.
        :param name: the name of the member from which the answer comes from
            (e.g., *hello* in `object1.hello()`). Optional.
        """
        if is_error(answer)[0]:
            if len(answer) > 1:
                type = answer[1]
                value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
                if answer[1] == REFERENCE_TYPE:
                    raise Py4JJavaError(
                        "An error occurred while calling {0}{1}{2}.\n".
                        format(target_id, ".", name), value)
                else:
>                   raise Py4JError(
                        "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
                        format(target_id, ".", name, value))
E   py4j.protocol.Py4JError: An error occurred while calling None.org.apache.flink.api.common.serialization.TypeInformationSerializationSchema. Trace:
E   org.apache.flink.api.python.shaded.py4j.Py4JException: Constructor org.apache.flink.api.common.serialization.TypeInformationSerializationSchema([class org.apache.flink.api.common.typeinfo.IntegerTypeInfo, class org.apache.flink.configuration.Configuration]) does not exist
E       at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:179)
E       at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:196)
E       at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:237)
E       at org.apache.flink.api.python.shaded.py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
E       at org.apache.flink.api.python.shaded.py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
E       at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
E       at java.base/java.lang.Thread.run(Thread.java:834)
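A note on reading the trace: Py4J reports that no constructor takes (IntegerTypeInfo, Configuration). In the Flink Java API, TypeInformationSerializationSchema is constructed from a TypeInformation plus either an ExecutionConfig or a TypeSerializer, not a Configuration. Types.BYTE() also describes a single byte rather than a byte[]. Even with a byte-array type, this schema would not hand Python the raw Kafka payload, because it emits whatever Flink's TypeSerializer writes for that type. The pure-Python sketch below illustrates that framing under a stated assumption: that Flink's byte-array serializer writes a 4-byte big-endian length followed by the bytes. The function names are hypothetical helpers, not PyFlink API.

```python
import struct

# Assumption: mirrors how Flink's byte[] serializer frames a record
# (4-byte big-endian signed length, then the payload). Illustration only;
# these helpers are hypothetical and not part of PyFlink.


def flink_frame_byte_array(payload: bytes) -> bytes:
    """Return the length-prefixed bytes a byte[] schema would put on the wire."""
    return struct.pack(">i", len(payload)) + payload


def unframe_byte_array(record: bytes) -> bytes:
    """Strip the 4-byte length prefix and return the original payload."""
    if len(record) < 4:
        raise ValueError("record is too short to contain a length prefix")
    (length,) = struct.unpack_from(">i", record, 0)
    payload = record[4:4 + length]
    if len(payload) != length:
        raise ValueError("record is shorter than its length prefix")
    return payload


if __name__ == "__main__":
    raw = b"\x00\x01hello"
    framed = flink_frame_byte_array(raw)
    print(framed)  # b'\x00\x00\x00\x07\x00\x01hello'
    assert unframe_byte_array(framed) == raw
```

If this assumption holds, a producer writing plain bytes to Kafka and a consumer using such a schema would disagree on the first four bytes of every message.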
I hope you can help me out!
Thanks in advance,
Wouter