Hi all,

Does anyone know if it is possible to specify a Java map function at some
intermediate point in a PyFlink job? In this case,

SimpleCountMeasurementsPerUUID

is a Flink Java MapFunction. The reason we want to do this is that the
performance of pure-Python operators in PyFlink seems quite poor.
e.g.

import logging
import os
import sys
import zlib

import Measurements_pb2
from pyflink.common import Types
from pyflink.common.serialization import
KafkaRecordSerializationSchemaBuilder, SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment,
RuntimeExecutionMode, MapFunction, RuntimeContext, \
    CheckpointingMode
from pyflink.datastream.connectors import RMQConnectionConfig,
RMQSource, KafkaSink

from functions.common import KeyByUUID
from functions.file_lister import auto_load_python_files
from customisations.serialisation import ZlibDeserializationSchema


class ZlibDecompressor(MapFunction):
    """Inflate the zlib-compressed payload of a (key, bytes) pair.

    The key (element 0) passes through unchanged; only the payload
    (element 1) is decompressed.
    """

    def map(self, value):
        key, compressed = value
        return key, zlib.decompress(compressed)


class MeasurementSnapshotCountMapFunction(MapFunction):
    """Parse a serialized MeasurementSnapshot and summarize it.

    Emits a ``(timestamp, point_uuid, measurement_count)`` tuple built
    from the first measurement in the snapshot, or ``None`` when the
    snapshot contains no measurements.
    """

    def map(self, value):
        snapshot = Measurements_pb2.MeasurementSnapshot()
        snapshot.ParseFromString(value)

        if not snapshot.measurements:
            # Empty snapshot: nothing to count.
            return None

        head = snapshot.measurements[0]
        return head.time, head.point_uuid.value, len(snapshot.measurements)


def word_count():
    """Build and submit the streaming job: RabbitMQ -> parse/count -> Kafka.

    Sources protobuf-encoded messages from a RabbitMQ queue, maps them to
    per-UUID count tuples, and sinks the results to a Kafka topic. Runs
    until cancelled; raises whatever the Flink client raises on failure.
    """
    env = StreamExecutionEnvironment.get_execution_environment()
    # Jar containing the Java serializers/operators this job references.
    jarpath = (
        f"file://{os.getcwd()}/../src-java/switchdin-flink-serialization/"
        "target/serialization-1.0-SNAPSHOT.jar"
    )
    env.add_jars(jarpath)
    auto_load_python_files(env)
    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
    # write all the data to one file
    env.set_parallelism(1)
    env.enable_checkpointing(1000, CheckpointingMode.AT_LEAST_ONCE)

    connection_config = RMQConnectionConfig.Builder() \
        .set_host("rabbitmq") \
        .set_port(5672) \
        .set_virtual_host("/") \
        .set_user_name("guest") \
        .set_password("guest") \
        .set_connection_timeout(60) \
        .set_prefetch_count(5000) \
        .build()

    deserialization_schema = ZlibDeserializationSchema()

    stream = env.add_source(RMQSource(
        connection_config,
        "flink-test",
        False,  # usesCorrelationId -- no exactly-once dedup from RMQ
        deserialization_schema,
    )).set_parallelism(1)

    # NOTE(review): the uid "DecompressRMQData" names a decompression step,
    # but the operator parses/counts snapshots -- confirm the intended uid.
    # NOTE(review): `jMap` is hypothetical -- PyFlink has no public API for
    # inserting a Java MapFunction mid-pipeline; the supported routes are a
    # Table API Java UDF or implementing the operator on the Java side.
    dstream = stream.map(MeasurementSnapshotCountMapFunction()) \
        .uid("DecompressRMQData") \
        .key_by(KeyByUUID(), key_type=Types.STRING()) \
        .jMap("org.switchdin.operators.SimpleCountMeasurementsPerUUID")

    kafka_serialisation_schema = KafkaRecordSerializationSchemaBuilder() \
        .set_value_serialization_schema(SimpleStringSchema()) \
        .set_topic("flink-test-kafka") \
        .build()

    dstream.sink_to(
        KafkaSink.builder()
            .set_record_serializer(kafka_serialisation_schema)
            .set_bootstrap_servers("kafka:9092")
            .build()
    )

    # submit for execution
    env.execute()


if __name__ == '__main__':
    # Log to stdout so the Flink client surfaces job-side messages.
    logging.basicConfig(stream=sys.stdout, level=logging.INFO,
                        format="%(message)s")
    word_count()

-- 
This email and any attachments are proprietary and confidential and are 
intended solely for the use of the individual to whom it is addressed. Any 
views or opinions expressed are solely those of the author and do not 
necessarily reflect or represent those of SwitchDin Pty Ltd. If you have 
received this email in error, please let us know immediately by reply email 
and delete it from your system. You may not use, disseminate, distribute or 
copy this message nor disclose its contents to anyone. 
SwitchDin Pty Ltd 
(ABN 29 154893857) PO Box 1165, Newcastle NSW 2300 Australia

Reply via email to