Hi all,

Does anyone know if it's possible to specify a Java map function at some intermediate point in a PyFlink job? In this case
SimpleCountMeasurementsPerUUID is a Flink Java MapFunction. The reason we want to do this is that performance in PyFlink seems quite poor. For example:

import logging
import os
import sys
import zlib

import Measurements_pb2

from pyflink.common import Types
from pyflink.common.serialization import KafkaRecordSerializationSchemaBuilder, SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode, MapFunction, RuntimeContext, \
    CheckpointingMode
from pyflink.datastream.connectors import RMQConnectionConfig, RMQSource, KafkaSink

from functions.common import KeyByUUID
from functions.file_lister import auto_load_python_files
from customisations.serialisation import ZlibDeserializationSchema


class ZlibDecompressor(MapFunction):
    def map(self, value):
        decomp = zlib.decompress(value[1])
        return value[0], decomp


class MeasurementSnapshotCountMapFunction(MapFunction):
    def map(self, value):
        pb_body = Measurements_pb2.MeasurementSnapshot()
        pb_body.ParseFromString(value)
        meas_count = len(pb_body.measurements)

        if meas_count > 0:
            first_measurement = pb_body.measurements[0]
            point_uuid = first_measurement.point_uuid.value
            timestamp = first_measurement.time
            return timestamp, point_uuid, meas_count

        return None


def word_count():
    env = StreamExecutionEnvironment.get_execution_environment()
    jarpath = f"file://{os.getcwd()}/../src-java/switchdin-flink-serialization/target/serialization-1.0-SNAPSHOT.jar"
    env.add_jars(jarpath)
    auto_load_python_files(env)
    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)

    # keep everything on one task while testing
    env.set_parallelism(1)
    env.enable_checkpointing(1000, CheckpointingMode.AT_LEAST_ONCE)

    connection_config = RMQConnectionConfig.Builder() \
        .set_host("rabbitmq") \
        .set_port(5672) \
        .set_virtual_host("/") \
        .set_user_name("guest") \
        .set_password("guest") \
        .set_connection_timeout(60) \
        .set_prefetch_count(5000) \
        .build()

    deserialization_schema = ZlibDeserializationSchema()

    stream = env.add_source(RMQSource(
        connection_config,
        "flink-test",
        False,
        deserialization_schema,
    )).set_parallelism(1)

    # count measurements per UUID, handing the keyed stream to the Java operator
    dstream = stream.map(MeasurementSnapshotCountMapFunction()).uid("DecompressRMQData") \
        .key_by(KeyByUUID(), key_type=Types.STRING()) \
        .jMap("org.switchdin.operators.SimpleCountMeasurementsPerUUID")  # Hypothetical

    kafka_serialisation_schema = KafkaRecordSerializationSchemaBuilder() \
        .set_value_serialization_schema(SimpleStringSchema()) \
        .set_topic("flink-test-kafka") \
        .build()

    dstream.sink_to(
        KafkaSink.builder()
            .set_record_serializer(kafka_serialisation_schema)
            .set_bootstrap_servers("kafka:9092")
            .build()
    )

    # submit for execution
    env.execute()


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    word_count()
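As far as I know PyFlink doesn't expose a jMap like that today, but one workaround that I believe works is to reach through the Py4J gateway: instantiate the Java MapFunction from the jar registered with env.add_jars(), call map() on the underlying Java DataStream, and wrap the result back into a Python DataStream. This is a minimal, untested sketch, not a supported API: apply_java_map is my own helper name, _j_data_stream is a PyFlink internal that may change between releases, and the declared output_type is an assumption about your protobuf fields. Declaring the type matters because the Java function only sees sensible input if both runtimes agree on the serialized form of the records crossing the boundary:

from pyflink.common import Types
from pyflink.datastream.data_stream import DataStream
from pyflink.java_gateway import get_gateway


def apply_java_map(py_stream, j_map_function):
    # Call map() on the wrapped Java DataStream, then re-wrap the result so
    # the rest of the pipeline can stay in Python. _j_data_stream is a
    # PyFlink internal, not a public API.
    return DataStream(py_stream._j_data_stream.map(j_map_function))


# Inside word_count(), after env.add_jars(...) has put the class on the classpath:
j_count_fn = get_gateway().jvm.org.switchdin.operators.SimpleCountMeasurementsPerUUID()

# Declare the output type of the Python map so the records handed to the Java
# operator are Flink tuples rather than pickled Python objects (types assumed:
# time is a long, point_uuid a string, the count an int).
keyed = stream.map(MeasurementSnapshotCountMapFunction(),
                   output_type=Types.TUPLE([Types.LONG(), Types.STRING(), Types.INT()])) \
    .uid("DecompressRMQData") \
    .key_by(KeyByUUID(), key_type=Types.STRING())

dstream = apply_java_map(keyed, j_count_fn)

If you'd rather stay on public APIs, the Table API bridge might be cleaner where the logic fits a ScalarFunction: convert with StreamTableEnvironment.from_data_stream, register the Java class via create_java_temporary_function, and convert back. Failing that, moving the whole hot path (source included) into a Java job sidesteps the Python/Java boundary entirely.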