Backported to 1.19 and 1.20 so we're fine here.
@Ferenc, thanks for the backports👍

BR,
G


On Wed, Apr 2, 2025 at 8:08 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
wrote:

> Yeah, exactly. Go ahead and ping me, we'll put it in...
>
> On Wed, Apr 2, 2025 at 7:17 PM Ferenc Csaky <ferenc.cs...@pm.me.invalid>
> wrote:
>
>> AFAIK the standard YAML config option was introduced in
>> Flink 1.19; it just coexisted with the old solution to keep
>> backward compatibility in the 1.x line.
>>
>> If we backport it to 1.20, it's not much overhead to move it
>> over to 1.19 as well, I guess.
>>
>> WDYT?
>>
>>
>>
>> On Wednesday, April 2nd, 2025 at 17:23, Gabor Somogyi <
>> gabor.g.somo...@gmail.com> wrote:
>>
>> >
>> >
>> > I've double-checked it, and the mentioned fix must be adapted for at
>> > least 1.20, guarded by the is_standard_yaml condition.
>> >
>> >
>> > On Wed, Apr 2, 2025 at 5:07 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>> >
>> > > But as a general rule, if the issue exists on the 1.x line, then at
>> > > least 1.20 must have this fix.
>> > > Let me check that and act accordingly...
>> > >
>> > > BR,
>> > > G
>> > >
>> > > On Wed, Apr 2, 2025 at 5:00 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>> > >
>> > > > Yeah, a new version of the Kafka connector is needed to use 2.x.
>> > > >
>> > > > On Wed, Apr 2, 2025 at 4:56 PM Gulan, Jacob <jacob.gu...@duke-energy.com> wrote:
>> > > >
>> > > > > I'm able to use the 2.x line, but I see that the
>> > > > > "flink-connector-kafka" latest version is still on 3.4.0-1.20
>> > > > > with no 2.0 tag yet. Will I need to wait for the 2.x tag to be
>> > > > > released?
>> > > > >
>> > > > > ________________________________
>> > > > > From: Gabor Somogyi <gabor.g.somo...@gmail.com>
>> > > > > Sent: Wednesday, April 2, 2025 10:13 AM
>> > > > > To: dev@flink.apache.org
>> > > > > Subject: [EXTERNAL] Re: Executing PyFlink Cluster
>> > > > >
>> > > > >
>> > > > > I think I've already fixed this here [1], but only on the 2.x
>> > > > > line. Up until now I thought that only 2.x was using YAML-based
>> > > > > configs...
>> > > > >
>> > > > > [1] https://github.com/apache/flink/pull/26327
>> > > > >
>> > > > > BR,
>> > > > > G
>> > > > >
>> > > > > On Wed, Apr 2, 2025 at 4:08 PM Andreas Bube <ab...@toogoodtogo.com.invalid> wrote:
>> > > > >
>> > > > > > There's an open issue that looks very similar:
>> > > > > > https://issues.apache.org/jira/browse/FLINK-36457. No idea how
>> > > > > > to work around this.
>> > > > > >
>> > > > > > On Wed, 2 Apr 2025 at 15:13, Gulan, Jacob <jacob.gu...@duke-energy.com> wrote:
>> > > > > >
>> > > > > > > Hello,
>> > > > > > >
>> > > > > > > I've been running into a wall for the past few days trying to
>> > > > > > > get my PyFlink cluster running in a Docker container on my
>> > > > > > > local machine.
>> > > > > > >
>> > > > > > > However, whenever I get to the point in my code where I need
>> > > > > > > to run env.execute(), I see the following error (full stack
>> > > > > > > trace at the bottom of this email):
>> > > > > > >
>> > > > > > > `java.net.MalformedURLException: no protocol:
>> > > > > > > ['file:/opt/flink/opt/flink-python-1.19.2.jar']`
>> > > > > > >
>> > > > > > > It appears that the `file://` prefix is being shortened to
>> > > > > > > `file:` and the whole value is being wrapped in brackets and
>> > > > > > > quotes, but I'm not setting this anywhere in my code, which
>> > > > > > > leads me to believe there's underlying behavior creating this
>> > > > > > > malformed file path.
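>> > > > > > >
>> > > > > > > To illustrate (a minimal sketch in plain Python, not Flink
>> > > > > > > code): the jar list appears to come back as the stringified
>> > > > > > > list, and since a URL scheme must start with a letter, the
>> > > > > > > leading bracket leaves no recognizable protocol:
>> > > > > > >
>> > > > > > > ```
>> > > > > > > from urllib.parse import urlparse
>> > > > > > >
>> > > > > > > # The string taken verbatim from the error above.
>> > > > > > > raw = "['file:/opt/flink/opt/flink-python-1.19.2.jar']"
>> > > > > > >
>> > > > > > > # "['file" is not a valid scheme, so no scheme is parsed at
>> > > > > > > # all -- the Java URL parser reports this as "no protocol".
>> > > > > > > print(repr(urlparse(raw).scheme))  # ''
>> > > > > > > ```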
>> > > > > > >
>> > > > > > > If this error is familiar, I would appreciate any advice on
>> > > > > > > how to remediate the issue. I've included code snippets of my
>> > > > > > > current implementation below. I've tried using both versions
>> > > > > > > 1.19.2 and 1.20.1.
>> > > > > > >
>> > > > > > > Thank you for any assistance you may be able to provide.
>> > > > > > >
>> > > > > > > CODE SNIPPET:
>> > > > > > > ```
>> > > > > > > import json
>> > > > > > >
>> > > > > > > from pyflink.common.serialization import SimpleStringSchema
>> > > > > > > from pyflink.common.typeinfo import Types
>> > > > > > > from pyflink.common.watermark_strategy import WatermarkStrategy
>> > > > > > > from pyflink.datastream import StreamExecutionEnvironment
>> > > > > > > from pyflink.datastream.connectors.base import DeliveryGuarantee
>> > > > > > > from pyflink.datastream.connectors.kafka import (
>> > > > > > >     KafkaOffsetsInitializer,
>> > > > > > >     KafkaRecordSerializationSchema,
>> > > > > > >     KafkaSink,
>> > > > > > >     KafkaSource,
>> > > > > > > )
>> > > > > > >
>> > > > > > > # JAR_DIR, BROKERS, USERNAME, PASSWORD, SOURCE_TOPIC and
>> > > > > > > # DESTINATION_TOPIC are defined elsewhere in the project.
>> > > > > > >
>> > > > > > > def parse_record(record: str) -> str:
>> > > > > > >     # Round-trip through json so bad records fail fast; return
>> > > > > > >     # a string to match the Types.STRING() output type below.
>> > > > > > >     return json.dumps(json.loads(record))
>> > > > > > >
>> > > > > > > def initialize_environment() -> StreamExecutionEnvironment:
>> > > > > > >     """Initialize the Flink environment"""
>> > > > > > >     env = StreamExecutionEnvironment.get_execution_environment()
>> > > > > > >
>> > > > > > >     flink_kafka_connector_path: str = str(
>> > > > > > >         (JAR_DIR / "flink-connector-kafka-3.3.0-1.19.jar").resolve()
>> > > > > > >     )
>> > > > > > >     flink_python_path: str = str(
>> > > > > > >         (JAR_DIR.parent / "opt" / "flink-python-1.19.2.jar").resolve()
>> > > > > > >     )
>> > > > > > >
>> > > > > > >     print(f"flink_kafka_connector_path: {flink_kafka_connector_path}")
>> > > > > > >     print(f"flink_python_path: {flink_python_path}")
>> > > > > > >
>> > > > > > >     env.add_jars(
>> > > > > > >         f"file://{flink_kafka_connector_path}",
>> > > > > > >         f"file://{flink_python_path}",
>> > > > > > >     )
>> > > > > > >     return env
>> > > > > > >
>> > > > > > > def main() -> None:
>> > > > > > >     env: StreamExecutionEnvironment = initialize_environment()
>> > > > > > >
>> > > > > > >     kafka_properties = {
>> > > > > > >         "bootstrap.servers": BROKERS,
>> > > > > > >         "security.protocol": "SASL_SSL",
>> > > > > > >         "sasl.mechanism": "PLAIN",
>> > > > > > >         "sasl.jaas.config": (
>> > > > > > >             "org.apache.kafka.common.security.plain.PlainLoginModule "
>> > > > > > >             f"required username='{USERNAME}' password='{PASSWORD}';"
>> > > > > > >         ),
>> > > > > > >     }
>> > > > > > >
>> > > > > > >     kafka_source: KafkaSource = (
>> > > > > > >         KafkaSource.builder()
>> > > > > > >         .set_topics(SOURCE_TOPIC)
>> > > > > > >         .set_properties(kafka_properties)
>> > > > > > >         .set_starting_offsets(KafkaOffsetsInitializer.earliest())
>> > > > > > >         .set_value_only_deserializer(SimpleStringSchema())
>> > > > > > >         .build()
>> > > > > > >     )
>> > > > > > >
>> > > > > > >     kafka_sink: KafkaSink = (
>> > > > > > >         KafkaSink.builder()
>> > > > > > >         .set_bootstrap_servers(BROKERS)
>> > > > > > >         .set_record_serializer(
>> > > > > > >             KafkaRecordSerializationSchema.builder()
>> > > > > > >             .set_topic(DESTINATION_TOPIC)
>> > > > > > >             .set_value_serialization_schema(SimpleStringSchema())
>> > > > > > >             .build()
>> > > > > > >         )
>> > > > > > >         .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE)
>> > > > > > >         .build()
>> > > > > > >     )
>> > > > > > >
>> > > > > > >     data_stream = env.from_source(
>> > > > > > >         kafka_source,
>> > > > > > >         WatermarkStrategy.no_watermarks(),
>> > > > > > >         "Kafka Source",
>> > > > > > >     )
>> > > > > > >     transformed_data = data_stream.map(
>> > > > > > >         parse_record, output_type=Types.STRING()
>> > > > > > >     )
>> > > > > > >     transformed_data.sink_to(kafka_sink)
>> > > > > > >
>> > > > > > >     print("START EXECUTING!")
>> > > > > > >
>> > > > > > >     env.execute()
>> > > > > > >
>> > > > > > > if __name__ == "__main__":
>> > > > > > >     main()
>> > > > > > > ```
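>> > > > > > >
>> > > > > > > As a side note, here is a minimal sketch of building the jar
>> > > > > > > URLs with pathlib instead of hand-rolled f-strings (jar_url is
>> > > > > > > a hypothetical helper; this only guards against malformed
>> > > > > > > "file:" prefixes on my side and would not work around the
>> > > > > > > bracketed-list issue itself):
>> > > > > > >
>> > > > > > > ```
>> > > > > > > from pathlib import Path
>> > > > > > >
>> > > > > > > def jar_url(path: Path) -> str:
>> > > > > > >     # Path.as_uri() requires an absolute path and always
>> > > > > > >     # yields a well-formed "file:///..." URL.
>> > > > > > >     return path.resolve().as_uri()
>> > > > > > >
>> > > > > > > # Usage:
>> > > > > > > # env.add_jars(jar_url(JAR_DIR / "flink-connector-kafka-3.3.0-1.19.jar"))
>> > > > > > > ```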
>> > > > > > >
>> > > > > > > DOCKERFILE:
>> > > > > > >
>> > > > > > > ```
>> > > > > > > ##############################################################################
>> > > > > > > # Stage 1 - Build uber jar using Maven
>> > > > > > > ##############################################################################
>> > > > > > >
>> > > > > > > # Set arguments for image versions
>> > > > > > > ARG JAVA_VERSION=11
>> > > > > > > ARG FLINK_VERSION=1.19
>> > > > > > > ARG SCALA_VERSION=scala_2.12
>> > > > > > >
>> > > > > > > # Source build image
>> > > > > > > FROM maven:3.9-amazoncorretto-${JAVA_VERSION} AS build
>> > > > > > >
>> > > > > > > # Build jars
>> > > > > > > WORKDIR /build
>> > > > > > > COPY pom.xml .
>> > > > > > > RUN mvn dependency:copy-dependencies -DoutputDirectory=/build/jars
>> > > > > > >
>> > > > > > > ##############################################################################
>> > > > > > > # Stage 2 - Create the final image
>> > > > > > > ##############################################################################
>> > > > > > >
>> > > > > > > # Source image
>> > > > > > > FROM flink:${FLINK_VERSION}-${SCALA_VERSION}-java${JAVA_VERSION}
>> > > > > > >
>> > > > > > > # Install Python
>> > > > > > > RUN apt-get update && apt-get install -y python3.11 python3-pip
>> > > > > > >
>> > > > > > > # Copy the built jars from the Maven stage
>> > > > > > > COPY --from=build /build/jars /opt/flink/lib
>> > > > > > >
>> > > > > > > # Create symlink so that "python" points to "python3.11"
>> > > > > > > RUN ln -s /usr/bin/python3.11 /usr/bin/python
>> > > > > > >
>> > > > > > > # Set the working directory
>> > > > > > > WORKDIR /opt/flink/src
>> > > > > > >
>> > > > > > > # Copy the Python project files
>> > > > > > > COPY pyproject.toml .
>> > > > > > > COPY README.md .
>> > > > > > > COPY dataproducts /opt/flink/src/dataproducts
>> > > > > > >
>> > > > > > > # Go get a cup of coffee, this will take a while ☕
>> > > > > > > RUN python3.11 -m pip install . --no-cache-dir
>> > > > > > > ```
>> > > > > > >
>> > > > > > > STACK TRACE:
>> > > > > > > ```
>> > > > > > > root@9e14f49ae422:/opt/flink/src/dataproducts# flink run -py main.py
>> > > > > > > WARNING: Unknown module: jdk.compiler specified to --add-exports
>> > > > > > > WARNING: Unknown module: jdk.compiler specified to --add-exports
>> > > > > > > WARNING: Unknown module: jdk.compiler specified to --add-exports
>> > > > > > > WARNING: Unknown module: jdk.compiler specified to --add-exports
>> > > > > > > WARNING: Unknown module: jdk.compiler specified to --add-exports
>> > > > > > > flink_kafka_connector_path: /opt/flink/lib/flink-connector-kafka-3.3.0-1.19.jar
>> > > > > > > flink_python_path: /opt/flink/opt/flink-python-1.19.2.jar
>> > > > > > > START EXECUTING!
>> > > > > > > Traceback (most recent call last):
>> > > > > > >   File "/opt/flink/src/dataproducts/main.py", line 97, in <module>
>> > > > > > >     main()
>> > > > > > >   File "/opt/flink/src/dataproducts/main.py", line 93, in main
>> > > > > > >     env.execute()
>> > > > > > >   File "/opt/flink/opt/python/pyflink.zip/pyflink/datastream/stream_execution_environment.py", line 813, in execute
>> > > > > > >   File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
>> > > > > > >   File "/opt/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 146, in deco
>> > > > > > >   File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
>> > > > > > > py4j.protocol.Py4JJavaError: An error occurred while calling o0.execute.
>> > > > > > > : java.net.MalformedURLException: no protocol: ['file:/opt/flink/opt/flink-python-1.19.2.jar']
>> > > > > > >     at java.base/java.net.URL.<init>(Unknown Source)
>> > > > > > >     at java.base/java.net.URL.<init>(Unknown Source)
>> > > > > > >     at java.base/java.net.URL.<init>(Unknown Source)
>> > > > > > >     at org.apache.flink.configuration.ConfigUtils.decodeListFromConfig(ConfigUtils.java:133)
>> > > > > > >     at org.apache.flink.client.cli.ExecutionConfigAccessor.getJars(ExecutionConfigAccessor.java:77)
>> > > > > > >     at org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:77)
>> > > > > > >     at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.execute(AbstractSessionClusterExecutor.java:72)
>> > > > > > >     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2440)
>> > > > > > >     at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:188)
>> > > > > > >     at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:117)
>> > > > > > >     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> > > > > > >     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
>> > > > > > >     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
>> > > > > > >     at java.base/java.lang.reflect.Method.invoke(Unknown Source)
>> > > > > > >     at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>> > > > > > >     at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
>> > > > > > >     at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>> > > > > > >     at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>> > > > > > >     at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>> > > > > > >     at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>> > > > > > >     at java.base/java.lang.Thread.run(Unknown Source)
>> > > > > > >
>> > > > > > > org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1
>> > > > > > >     at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:134)
>> > > > > > >     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> > > > > > >     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
>> > > > > > >     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
>> > > > > > >     at java.base/java.lang.reflect.Method.invoke(Unknown Source)
>> > > > > > >     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
>> > > > > > >     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
>> > > > > > >     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:108)
>> > > > > > >     at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:1026)
>> > > > > > >     at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247)
>> > > > > > >     at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1270)
>> > > > > > >     at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$10(CliFrontend.java:1367)
>> > > > > > >     at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
>> > > > > > >     at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1367)
>> > > > > > >     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1335)
>> > > > > > > Caused by: java.lang.RuntimeException: Python process exits with code: 1
>> > > > > > >     at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:124)
>> > > > > > >     ... 14 more
>> > > > > > > ```
>>
>
