Hello,

I've been running into a wall for the past few days trying to get my PyFlink 
cluster running in a Docker container on my local machine.

Whenever my code reaches `env.execute()`, I see the following error (full 
stack trace at the bottom of this email):

```
java.net.MalformedURLException: no protocol: ['file:/opt/flink/opt/flink-python-1.19.2.jar']
```

It appears that the `file://` prefix is being shortened to `file:/`. I'm not 
setting that anywhere in my code, which leads me to believe some underlying 
behavior is producing this malformed file path.
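
One way to sanity-check this hypothesis outside Flink is to parse the candidate strings and see which ones yield a URL scheme. The sketch below is only an approximation: it uses Python's `urllib.parse` as a stand-in for `java.net.URL` (the two parsers are not identical), and the variable and function names are hypothetical. The strings themselves are copied from the error message and from the `add_jars()` call.

```python
from urllib.parse import urlparse

# Strings taken from the error message and from the f-strings in add_jars().
single_slash = "file:/opt/flink/opt/flink-python-1.19.2.jar"
triple_slash = "file:///opt/flink/opt/flink-python-1.19.2.jar"
as_reported = "['file:/opt/flink/opt/flink-python-1.19.2.jar']"


def scheme_of(candidate: str) -> str:
    """Return the scheme urllib detects, or an empty string if there is none."""
    return urlparse(candidate).scheme


for label, candidate in [
    ("single slash", single_slash),
    ("triple slash", triple_slash),
    ("as reported", as_reported),
]:
    print(f"{label:>12}: scheme={scheme_of(candidate)!r}")
```

In this check, both the single-slash and triple-slash forms parse with the scheme `file`, while the string exactly as it appears in the error (including the surrounding `['` and `']`) has no scheme. That may indicate the brackets and quotes matter more than the number of slashes, although this has not been confirmed against the Java parser.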

If this error is familiar, I would appreciate any advice on how to fix it. 
I've included snippets of my current implementation, my Dockerfile, and the 
full stack trace below. I've tried both versions 1.19.2 and 1.20.1.

Thank you for any assistance you may be able to provide.


CODE SNIPPET:
```
import json

from pyflink.common import Types, WatermarkStrategy
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.base import DeliveryGuarantee
from pyflink.datastream.connectors.kafka import (
    KafkaOffsetsInitializer,
    KafkaRecordSerializationSchema,
    KafkaSink,
    KafkaSource,
)

# JAR_DIR, BROKERS, USERNAME, PASSWORD, SOURCE_TOPIC, and DESTINATION_TOPIC
# are defined elsewhere and omitted from this snippet.


def parse_record(record: str) -> dict:
    return json.loads(record)


def initialize_environment() -> StreamExecutionEnvironment:
    """Initialize the Flink environment"""
    env = StreamExecutionEnvironment.get_execution_environment()

    flink_kafka_connector_path: str = str(
        (JAR_DIR / "flink-connector-kafka-3.3.0-1.19.jar").resolve()
    )
    flink_python_path: str = str(
        (JAR_DIR.parent / "opt" / "flink-python-1.19.2.jar").resolve()
    )

    print(f"flink_kafka_connector_path: {flink_kafka_connector_path}")
    print(f"flink_python_path: {flink_python_path}")

    env.add_jars(
        f"file://{flink_kafka_connector_path}",
        f"file://{flink_python_path}",
    )
    return env


def main() -> None:
    env: StreamExecutionEnvironment = initialize_environment()

    kafka_properties = {
        "bootstrap.servers": BROKERS,
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "PLAIN",
        "sasl.jaas.config": (
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
            f"username='{USERNAME}' password='{PASSWORD}';"
        ),
    }

    kafka_source: KafkaSource = (
        KafkaSource.builder()
        .set_topics(SOURCE_TOPIC)
        .set_properties(kafka_properties)
        .set_starting_offsets(KafkaOffsetsInitializer.earliest())
        .set_value_only_deserializer(SimpleStringSchema())
        .build()
    )

    kafka_sink: KafkaSink = (
        KafkaSink.builder()
        .set_bootstrap_servers(BROKERS)
        .set_record_serializer(
            KafkaRecordSerializationSchema.builder()
            .set_topic(DESTINATION_TOPIC)
            .set_value_serialization_schema(SimpleStringSchema())
            .build()
        )
        .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build()
    )

    data_stream = env.from_source(
        kafka_source,
        WatermarkStrategy.no_watermarks(),
        "Kafka Source",
    )
    transformed_data = data_stream.map(parse_record, output_type=Types.STRING())
    transformed_data.sink_to(kafka_sink)

    print("START EXECUTING!")

    env.execute()


if __name__ == "__main__":
    main()
```
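
As an aside on the URL construction in `initialize_environment()`: instead of prepending `file://` by hand, the jar URIs could be derived with `pathlib`. This is a minimal sketch, assuming a POSIX filesystem (as in the Docker container) and absolute jar paths. The helper name `jar_uris` is hypothetical, and whether this changes the behavior of `env.execute()` is untested.

```python
from pathlib import Path


def jar_uris(*jar_paths: str) -> list[str]:
    """Convert absolute filesystem paths to file:// URIs.

    Path.as_uri() raises ValueError for relative paths, so a bad path
    fails here rather than later inside the JVM.
    """
    return [Path(jar_path).as_uri() for jar_path in jar_paths]


uris = jar_uris(
    "/opt/flink/lib/flink-connector-kafka-3.3.0-1.19.jar",
    "/opt/flink/opt/flink-python-1.19.2.jar",
)
for uri in uris:
    print(uri)
```

The resulting strings could then be passed to the existing call as `env.add_jars(*uris)`.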



DOCKERFILE:

```
##############################################################################
# Stage 1 - Build uber jar using Maven
##############################################################################

# Set arguments for image versions
ARG JAVA_VERSION=11
ARG FLINK_VERSION=1.19
ARG SCALA_VERSION=scala_2.12

# Source build image
FROM maven:3.9-amazoncorretto-${JAVA_VERSION} AS build

# Build jars
WORKDIR /build
COPY pom.xml .
RUN mvn dependency:copy-dependencies -DoutputDirectory=/build/jars


##############################################################################
# Stage 2 - Create the final image
##############################################################################

# Source image
FROM flink:${FLINK_VERSION}-${SCALA_VERSION}-java${JAVA_VERSION}

# Install Python
RUN apt-get update && apt-get install -y python3.11 python3-pip

# Copy the built jars from the Maven stage
COPY --from=build /build/jars /opt/flink/lib

# Create symlink so that "python" points to "python3.11"
RUN ln -s /usr/bin/python3.11 /usr/bin/python

# Set the working directory
WORKDIR /opt/flink/src

# Copy the Python project files
COPY pyproject.toml .
COPY README.md .
COPY dataproducts /opt/flink/src/dataproducts

# Go get a cup of coffee, this will take a while ☕
RUN python3.11 -m pip install . \
    --no-cache-dir
```

STACK TRACE:
```
root@9e14f49ae422:/opt/flink/src/dataproducts# flink run -py main.py
WARNING: Unknown module: jdk.compiler specified to --add-exports
WARNING: Unknown module: jdk.compiler specified to --add-exports
WARNING: Unknown module: jdk.compiler specified to --add-exports
WARNING: Unknown module: jdk.compiler specified to --add-exports
WARNING: Unknown module: jdk.compiler specified to --add-exports
flink_kafka_connector_path: /opt/flink/lib/flink-connector-kafka-3.3.0-1.19.jar
flink_python_path: /opt/flink/opt/flink-python-1.19.2.jar
START EXECUTING!
Traceback (most recent call last):
  File "/opt/flink/src/dataproducts/main.py", line 97, in <module>
    main()
  File "/opt/flink/src/dataproducts/main.py", line 93, in main
    env.execute()
  File "/opt/flink/opt/python/pyflink.zip/pyflink/datastream/stream_execution_environment.py", line 813, in execute
  File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/opt/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 146, in deco
  File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o0.execute.
: java.net.MalformedURLException: no protocol: ['file:/opt/flink/opt/flink-python-1.19.2.jar']
        at java.base/java.net.URL.<init>(Unknown Source)
        at java.base/java.net.URL.<init>(Unknown Source)
        at java.base/java.net.URL.<init>(Unknown Source)
        at org.apache.flink.configuration.ConfigUtils.decodeListFromConfig(ConfigUtils.java:133)
        at org.apache.flink.client.cli.ExecutionConfigAccessor.getJars(ExecutionConfigAccessor.java:77)
        at org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:77)
        at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.execute(AbstractSessionClusterExecutor.java:72)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2440)
        at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:188)
        at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:117)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.base/java.lang.reflect.Method.invoke(Unknown Source)
        at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
        at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
        at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
        at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.base/java.lang.Thread.run(Unknown Source)

org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1
        at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:134)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.base/java.lang.reflect.Method.invoke(Unknown Source)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:108)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:1026)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247)
        at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1270)
        at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$10(CliFrontend.java:1367)
        at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
        at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1367)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1335)
Caused by: java.lang.RuntimeException: Python process exits with code: 1
        at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:124)
        ... 14 more
```
