PyFlink StateTtlConfig and Thread Mode

2025-02-10 Thread Justin Ngai via user
Hello, I've been trying to set up StateTtl for my PyFlink application using thread mode. I was following the examples in the docs (needed to use Time instead of Duration) -- https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/state/#state-in-the-scala-datas
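As a reference point for the thread above, a minimal sketch of PyFlink's documented TTL builder, which takes `pyflink.common.Time` (consistent with the poster's Time-vs-Duration note); the state name, type, and 60-second TTL are invented for illustration:

```python
from pyflink.common import Time
from pyflink.common.typeinfo import Types
from pyflink.datastream.state import StateTtlConfig, ValueStateDescriptor

# TTL config: expire entries 60s after they are created or written,
# and never return already-expired values.
ttl_config = StateTtlConfig \
    .new_builder(Time.seconds(60)) \
    .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite) \
    .set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired) \
    .build()

# Attach the TTL to the descriptor before acquiring the state
# (typically inside open() of a KeyedProcessFunction).
descriptor = ValueStateDescriptor("last_seen", Types.STRING())
descriptor.enable_time_to_live(ttl_config)
```

This is a configuration fragment that needs a PyFlink runtime to do anything; it only shows the API surface the thread is about.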

Re: Using data classes in pyflink

2025-01-16 Thread Dian Fu
Hi, I'm trying to understand how to use python classes in flink DataStream pipelines. I'm using python 3.11 and flink 1.19. I've tried running a few simple programs and require some guidance.

Re: Using data classes in pyflink

2025-01-13 Thread Pavel Dmitriev
Hi, I'm trying to understand how to use python classes in flink DataStream pipelines. I'm using python 3.11 and flink 1.19. I've tried running a few simple programs and require some guidance. Here's the first example: from dataclasses import

Re: Using data classes in pyflink

2025-01-13 Thread Nikola Milutinovic
is that possible, you will need to investigate. Nix.

Using data classes in pyflink

2025-01-10 Thread Oleksii Sh
Hi, I'm trying to understand how to use python classes in flink DataStream pipelines. I'm using python 3.11 and flink 1.19. I've tried running a few simple programs and require some guidance. Here's the first example: from dataclasses import dataclass from pyflink.common import Configuration
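A workaround often used for this situation (a sketch, not the answer given in this thread): flatten the dataclass to a plain tuple at the operator boundary and rebuild it inside the function, so PyFlink only ever serializes types it has explicit type info for; `Sensor` and its fields are made up for illustration.

```python
from dataclasses import astuple, dataclass, fields


@dataclass
class Sensor:
    sensor_id: str
    temperature: float


def to_tuple(obj):
    # Flatten the dataclass into a plain tuple, which PyFlink can
    # serialize given an explicit Types.TUPLE(...) / Types.ROW(...) hint.
    return astuple(obj)


def from_tuple(cls, values):
    # Rebuild the dataclass on the way out of an operator.
    return cls(**{f.name: v for f, v in zip(fields(cls), values)})


s = Sensor("s1", 21.5)
assert from_tuple(Sensor, to_tuple(s)) == s
```

The conversion itself is pure Python; only the type-hint side (e.g. `output_type=Types.TUPLE([Types.STRING(), Types.DOUBLE()])`) touches PyFlink.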

Pyflink checkpoints on s3

2024-12-09 Thread Phil Stavridis
Hello, I am trying to configure my Flink (1.18.1) jobs to store checkpoints on s3 but I am getting the below error. Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to create checkpoint storage at checkpoint coordinator side. ... Caused by: org.apache.flink.core.fs.UnsupportedFile
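For context, this class of failure for `s3://` paths usually means the filesystem plugin jar (flink-s3-fs-hadoop or flink-s3-fs-presto) is missing from the cluster's plugins/ directory; once it is present, pointing checkpoints at the bucket is plain configuration. A hedged sketch with a made-up bucket name, not verified against the poster's setup:

```python
from pyflink.common import Configuration
from pyflink.datastream import StreamExecutionEnvironment

# The s3 scheme only resolves if the s3 filesystem plugin jar sits in
# plugins/ (not lib/) on every JobManager and TaskManager.
conf = Configuration()
conf.set_string("state.checkpoints.dir", "s3://my-bucket/checkpoints")  # hypothetical bucket

env = StreamExecutionEnvironment.get_execution_environment(conf)
env.enable_checkpointing(60_000)  # checkpoint every 60 seconds
```

This is a configuration fragment; credentials (`s3.access-key` / `s3.secret-key` or an IAM role) still have to come from the cluster configuration.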

Calling 3rd party JARs from Pyflink

2024-11-06 Thread Akos Barabas
Hello, I want to read data from Kafka topics which have Confluent-encoded Protobuf messages (not plain Protobuf, Confluent adds a "magic byte" and schema ID) with Python Datastream API. I have found that Confluent has a Java class[1] which implements org.apache.kafka.common.serialization.Deseriali
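The "magic byte" framing mentioned here is simple enough to strip on the Python side if wrapping the Java class proves awkward: one 0x00 magic byte, a 4-byte big-endian schema id, then (for Protobuf specifically) a message-indexes section. The sketch below assumes the common single-message case, where that section is a single zero byte; it is an illustration of the wire format, not the thread's eventual solution.

```python
import struct

MAGIC_BYTE = 0


def strip_confluent_header(payload: bytes):
    """Split a Confluent-framed message into (schema_id, raw_protobuf).

    Confluent's wire format prefixes the serialized record with one
    magic byte (0) and a 4-byte big-endian schema id. For Protobuf there
    is additionally a message-indexes section; this sketch assumes the
    common single-message case, encoded as one zero byte.
    """
    magic, schema_id = struct.unpack(">bI", payload[:5])
    if magic != MAGIC_BYTE:
        raise ValueError("not a Confluent-framed message")
    body = payload[5:]
    if body[:1] == b"\x00":  # message-indexes list [0]: first message in the schema
        body = body[1:]
    return schema_id, body
```

The returned bytes can then be handed to a generated Protobuf class's `ParseFromString`.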

Using KafkaSink in PyFlink with a key

2024-09-17 Thread Andrew Raftery
Hi, I'm trying to send data to a Kafka topic using PyFlink (DataStream API), while setting a key on the Kafka record. The key is a simple string, the value is a JSON string. What I have so far basically works, except the whole record is sent as both the key and the value. How do I specify t
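For reference, the record-serializer builder in PyFlink's KafkaSink does expose a key schema; a hedged sketch (topic and broker address are placeholders). Note that both the key schema and the value schema are handed the same stream element, which matches the symptom described: with `SimpleStringSchema` on both sides, the whole element becomes both key and value, so the element generally has to be shaped so the two schemas pick out different parts.

```python
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream.connectors.kafka import (
    KafkaRecordSerializationSchema,
    KafkaSink,
)

# Both schemas below receive the same stream element.
record_serializer = KafkaRecordSerializationSchema.builder() \
    .set_topic("output-topic") \
    .set_key_serialization_schema(SimpleStringSchema()) \
    .set_value_serialization_schema(SimpleStringSchema()) \
    .build()

sink = KafkaSink.builder() \
    .set_bootstrap_servers("broker:9092") \
    .set_record_serializer(record_serializer) \
    .build()
```

This is a configuration fragment showing the API surface in question, not a fix confirmed by the thread.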

Re: PyFlink on EMR on EKS

2024-09-03 Thread Ahmed Hamdy
…~[flink-dist-1.18.0-amzn-0.jar:1.18.0-amzn-0]
at org.apache.flink.client.python.PythonEnvUtils.addToPythonPath(PythonEnvUtils.java:291) ~[?:?]
at org.apache.flink.client.python.PythonEnvUtils.preparePythonEnvironment(PythonEnvUtils.java:226) ~[?:?]
at org.apache.flink.client.python.PythonEnvUtils.launchPy4jPythonClient(PythonEnvUtils.java:487) ~[?:?]
at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:92) ~[?:?]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist-1.18.0-amzn-0.jar:1.18.0-amzn-0]
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-1.18.0-amzn-0.jar:1.18.0-amzn-0]
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105) ~[flink-dist-1.18.0-amzn-0.jar:1.18.0-amzn-0]
at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.18.0-amzn-0.jar:1.18.0-amzn-0]
```
I tested the Flink job locally and it worked perfectly. I even tried to translate it to Java and run it using the AWS EMR_on_EKS_Flin container, and it also worked. So I do not understand why it does not work in PyFlink. Moreover, this error is related to the file system, but I never make any call to it. And even if I did, why does it work in Java and not in Python? Best regards, KY Alexandre

PyFlink on EMR on EKS

2024-09-02 Thread Alexandre KY
0-amzn-0] ``` I tested the Flink job locally and it worked perfectly. I even tried to translate it to Java and run it using the AWS EMR_on_EKS_Flin container, and it also worked. So I do not understand why it does not work in PyFlink. Moreover, this error is related to the file system, but

RE: Problem reading a CSV file with pyflink datastream in k8s with Flink operator

2024-06-18 Thread gwenael . lebarzic
Hello Rob. This workaround indeed works! Regards, Gwenael Le Barzic. From: Robert Young. Sent: Tuesday, 18 June 2024 03:54. To: LE BARZIC Gwenael DTSI/SI. Cc: user@flink.apache.org. Subject: Re: Problem reading a CSV file with pyflink datastream in k8

Re: Problem reading a CSV file with pyflink datastream in k8s with Flink operator

2024-06-17 Thread Robert Young

RE: Problem reading a CSV file with pyflink datastream in k8s with Flink operator

2024-06-17 Thread gwenael . lebarzic
From: LE BARZIC Gwenael DTSI/SI. Sent: Friday, 14 June 2024 22:02. To: user@flink.apache.org. Subject: Problem reading a CSV file with pyflink datastream in k8s with Flink operator. Hello everyone. I get the following error when t

Problem reading a CSV file with pyflink datastream in k8s with Flink operator

2024-06-14 Thread gwenael . lebarzic
Hello everyone. I get the following error when trying to read a CSV file with pyflink datastream in a k8s environment using the flink operator. ### File "/opt/myworkdir/myscript.py", line 30, in run_flink_job(myfile) File "/opt/myworkdir/myscript.py", line

Re: Which base image to use for pyflink on k8s with flink operator ?

2024-06-14 Thread Gunnar Morling
Hey, I ran into some issues with PyFlink on Kubernetes myself a while ago. I blogged about it here, perhaps it's useful: https://www.decodable.co/blog/getting-started-with-pyflink-on-kubernetes Best, --Gunnar. On Fri, 14 June 2024 at 20:58, Mate Czagany wrote: > Hi,

RE: Which base image to use for pyflink on k8s with flink operator ?

2024-06-14 Thread gwenael . lebarzic
To be sure about that, I can see this in the doc: # install PyFlink COPY apache-flink*.tar.gz / RUN pip3 install /apache-flink-libraries*.tar.gz && pip3 install /apache-flink*.tar.gz Is the result the same as this command below: RUN pip install --no-cache-dir -r requirements.t

RE: Which base image to use for pyflink on k8s with flink operator ?

2024-06-14 Thread gwenael . lebarzic

Re: Which base image to use for pyflink on k8s with flink operator ?

2024-06-14 Thread Mate Czagany
…Flink libraries, if I am not mistaken. Regards, Mate. On 14 June 2024 at 17:22, the original poster wrote: > Hello everyone. I'm encountering some strange difficulties with pyflink on Kuberne

Re: Which base image to use for pyflink on k8s with flink operator ?

2024-06-14 Thread Mate Czagany
On 14 June 2024 at 17:22: > Hello everyone. I'm contacting you because I'm encountering some strange difficulties with pyflink on Kubernetes using the flink operator. So, first things first, I was wondering which base image I should use for my python ima

Which base image to use for pyflink on k8s with flink operator ?

2024-06-14 Thread gwenael . lebarzic
Hello everyone. I'm contacting you because I'm encountering some strange difficulties with pyflink on Kubernetes using the flink operator. So, first things first, I was wondering which base image I should use for my python image that I will then deploy on my Kubernetes cluster? Can I use

Re: Memory table in pyflink

2024-06-06 Thread Xuyang
filesystem connector as a temporary table to work around? -- Best! Xuyang At 2024-06-07 03:26:27, "Phil Stavridis" wrote: Hello, I am trying to create an in-memory table in PyFlink to use as a staging table after ingesting some data from Kafka but it doesn’t work as expected

Memory table in pyflink

2024-06-06 Thread Phil Stavridis
Hello, I am trying to create an in-memory table in PyFlink to use as a staging table after ingesting some data from Kafka but it doesn’t work as expected. I have used the print connector which prints the results but I need to use a similar connector that stores staging results. I have tried
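The usual substitute for an "in-memory staging table" in PyFlink is a temporary view over an intermediate query: downstream SQL can read it by name, and nothing is materialized outside the job. A sketch under assumed names (the `datagen` table and its columns stand in for the poster's Kafka source):

```python
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Stand-in for the Kafka source table.
t_env.execute_sql("""
    CREATE TEMPORARY TABLE events (
        id STRING,
        amount DOUBLE
    ) WITH ('connector' = 'datagen', 'rows-per-second' = '1')
""")

# Register the intermediate result as a named view; later statements
# can SELECT from `staging` as if it were a table.
staged = t_env.sql_query("SELECT id, amount * 2 AS doubled FROM events")
t_env.create_temporary_view("staging", staged)
```

This is a configuration/DDL fragment; whether it covers the poster's exact staging needs depends on details the snippet truncates.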

Re: SSL Kafka PyFlink

2024-05-17 Thread Evgeniy Lyutikov via user
To: user@flink.apache.org. Subject: Re: SSL Kafka PyFlink. Hi Phil, the Kafka SSL configuration keys may not be correct. You can refer to the Kafka documentation[1] for the client SSL configurations. [1] https://kafka.apache.org/documentation/#security_configclients Best, Zhongqiang Gong

Re: SSL Kafka PyFlink

2024-05-16 Thread gongzhongqiang
> I have a PyFlink job that needs to read from a Kafka topic, and the communication with the Kafka broker requires SSL. I have connected to the Kafka cluster with something like this using just Python: from confluent_kafka import Consumer, KafkaException, KafkaErro

SSL Kafka PyFlink

2024-05-16 Thread Phil Stavridis
Hi, I have a PyFlink job that needs to read from a Kafka topic and the communication with the Kafka broker requires SSL. I have connected to the Kafka cluster with something like this using just Python. from confluent_kafka import Consumer, KafkaException, KafkaError def get_config
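The likely mismatch (echoed by the reply about incorrect SSL keys) is that the Flink Kafka connector hands its properties to the Java Kafka client, which expects JKS/PKCS12 store properties rather than the PEM-oriented keys `confluent_kafka` (librdkafka) uses. A sketch of the mapping, with placeholder paths:

```python
def to_flink_kafka_ssl_props(bootstrap_servers, truststore, keystore, password):
    """Map a confluent_kafka-style SSL setup onto the plain Kafka
    client property names the Flink Kafka connector passes through.

    The Java client wants JKS/PKCS12 stores, not the ssl.ca.location /
    ssl.certificate.location PEM keys that librdkafka understands.
    All paths and the shared password here are placeholders.
    """
    return {
        "bootstrap.servers": bootstrap_servers,
        "security.protocol": "SSL",
        "ssl.truststore.location": truststore,  # CA chain, as a truststore
        "ssl.truststore.password": password,
        "ssl.keystore.location": keystore,      # client cert + private key
        "ssl.keystore.password": password,
    }
```

The resulting dict is what you would pass as the `properties` of the PyFlink Kafka source.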

PyFlink Performance

2024-04-24 Thread David Jost
Hi, I am currently evaluating PyFlink in comparison to Java and ran various tests, mainly comparing identical pipelines with a focus on throughput. It seems to me that PyFlink generally fares worse and reaches its throughput limit at a point where Java still has resources

Re: Pyflink w Nessie and Iceberg in S3 Jars

2024-04-16 Thread Robert Prat
Hi Péter, Thanks for pointing this out! I was aware of the difference in version between pyflink and some of the JAR dependencies. I was starting out with PyFlink 1.16 and I had some errors when creating the Dockerfile that seemed to be fixed when upgrading the version to 1.18. Thus the

Re: Pyflink w Nessie and Iceberg in S3 Jars

2024-04-16 Thread Péter Váry
Is it intentional that you are using iceberg-flink-runtime-1.16-1.3.1.jar with 1.18.0 PyFlink? This might cause issues later. I would try to synchronize the Flink versions across all the dependencies. On Tue, Apr 16, 2024, 11:23 Robert Prat wrote: > I finally managed to make it w

Re: Pyflink Performance and Benchmark

2024-04-16 Thread Chase Zhang
On Mon, Apr 15, 2024 at 16:17 Niklas Wilcke wrote: > Hi Flink Community, I wanted to reach out to you to get some input about Pyflink performance. Are there any resources available about Pyflink benchmarks and maybe a comparison with the Java API? I wasn't ab

Re: Pyflink w Nessie and Iceberg in S3 Jars

2024-04-16 Thread Robert Prat
tils python3-pip python3-requests python3-software-properties python-is-python3 I'm not sure I need all the Jars in the dockerfile but as they say if it ain't broke, don't fix it. From: Robert Prat Sent: Friday, April 12, 2024 3:45 PM To: user@flink

Re: [EXTERNAL]Re: Pyflink Performance and Benchmark

2024-04-15 Thread Niklas Wilcke
…You might be interested in this blog: https://flink.apache.org/2022/05/06/exploring-the-thread-mode-in-pyflink/, which benchmarked a common JSON-processing scenario with UDFs in Java, Python under thread mode, and Python under process mode.

Re: Pyflink Performance and Benchmark

2024-04-15 Thread Zhanghao Chen
When it comes down to the actual runtime, what really matters is the plan optimization and the operator implementation & shuffling. You might be interested in this blog: https://flink.apache.org/2022/05/06/exploring-the-thread-mode-in-pyflink/, which benchmarked a common JSON-processing scenario with UDFs in Java, Python under thread mode, and Python under process mode.

Pyflink Performance and Benchmark

2024-04-15 Thread Niklas Wilcke
Hi Flink Community, I wanted to reach out to you to get some input about Pyflink performance. Are there any resources available about Pyflink benchmarks and maybe a comparison with the Java API? I wasn't able to find something valuable, but maybe I missed something? I am aware

Pyflink w Nessie and Iceberg in S3 Jars

2024-04-12 Thread Robert Prat
Hi there, For several days I have been trying to find the right configuration for my pipeline, which roughly consists of the following schema: RabbitMQ->PyFlink->Nessie/Iceberg/S3. For what I am going to explain, I have tried both locally and through the official Flink docker images.

Re: join two streams with pyflink

2024-04-02 Thread Biao Geng
Hi Thierry, Your case is not very complex, and I believe all programming-language (e.g. Java, Python, SQL) interfaces of Flink can do it. When using pyflink, you can use the pyflink datastream/table/SQL API. Here are some examples of using the pyflink table api: https://nightlies.apache.org/flink/flink

join two streams with pyflink

2024-04-02 Thread Fokou Toukam, Thierry
Hi, I have 2 streams, as seen in this example (6> {"tripId": "275118740", "timestamp": "2024-04-02T06:20:00Z", "stopSequence": 13, "stopId": "61261", "bearing": 0.0, "speed": 0.0, "vehicleId": "39006", "routeId": "747"} 1> {"visibility": 1, "weather_conditions": "clear sky", "timestamp": "20

Row to tuple conversion in PyFlink when switching to 'thread' execution mode

2024-03-29 Thread Wouter Zorgdrager
Dear readers, I'm running into some unexpected behaviour in PyFlink when switching execution mode from process to thread. In thread mode, my `Row` gets converted to a tuple whenever I use a UDF in a map operation. By this conversion to tuples, we lose critical information such as column

Re: TTL in pyflink does not seem to work

2024-03-11 Thread Ivan Petrarka
Thanks! We’ve created an issue for that: https://issues.apache.org/jira/browse/FLINK-34625 Yep, planning to use timers as a workaround for now. On Mar 10, 2024 at 02:59 +0400, David Anderson wrote: > My guess is that this only fails when pyflink is used with the heap state backend, in

Re: TTL in pyflink does not seem to work

2024-03-09 Thread David Anderson
My guess is that this only fails when pyflink is used with the heap state backend, in which case one possible workaround is to use the RocksDB state backend instead. Another workaround would be to rely on timers in the process function, and clear the state yourself. David On Fri, Mar 8, 2024 at

Re: TTL in pyflink does not seem to work

2024-03-08 Thread lorenzo.affetti.ververica.com via user
in Java code, it prints `State: Null`, `State: Null`, as I was expecting, unlike the pyflink code. On Mar 7, 2024 at 15:59 +0400, Ivan Petrarka wrote: > Hi! I’ve created a basic pyflink pipeline with ttl and it does not seem to work. I have reproduced the exact sam

Re: TTL in pyflink does not seem to work

2024-03-07 Thread Ivan Petrarka
Note that in Java code, it prints `State: Null`, `State: Null`, as I was expecting, unlike the pyflink code. On Mar 7, 2024 at 15:59 +0400, Ivan Petrarka wrote: > Hi! I’ve created a basic pyflink pipeline with ttl and it does not seem to work. I have reproduced the exact same code in Ja

TTL in pyflink does not seem to work

2024-03-07 Thread Ivan Petrarka
Hi! I’ve created a basic pyflink pipeline with ttl and it does not seem to work. I have reproduced the exact same code in Java and it works! Is this a pyflink bug? If so - how can I report it? If not - what can I try to do? Flink: 1.18.0 image: flink:1.18.0-scala_2.12-java11 Code to reproduce
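Independent of whether this turns out to be a bug, the "clear it yourself" workaround suggested in the replies (timers in a process function) boils down to storing a write timestamp next to the value and treating stale entries as expired. The state-handling core of that idea in plain Python; PyFlink's keyed state and timers are simulated here with an attribute and an injectable clock:

```python
import time


class ManualTtlState:
    """Sketch of manual TTL bookkeeping: keep (value, written_at) and
    lazily clear the cell on read once the TTL has elapsed, similar in
    effect to OnCreateAndWrite TTL with NeverReturnExpired visibility.
    Inside a real KeyedProcessFunction this would live in a ValueState
    and be cleared by a registered timer instead."""

    def __init__(self, ttl_seconds, clock=time.monotonic):
        self.ttl = ttl_seconds
        self.clock = clock
        self._cell = None  # (value, written_at) or None

    def update(self, value):
        self._cell = (value, self.clock())

    def value(self):
        if self._cell is None:
            return None
        v, written = self._cell
        if self.clock() - written >= self.ttl:
            self._cell = None  # expired: clear on read
            return None
        return v
```

The injectable clock is only there to make the expiry logic easy to exercise deterministically.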

Re: [PyFlink] Collect multiple elements in CoProcessFunction

2023-11-22 Thread Alexander Fedulov
Hi David, Thanks for the confirmation. Let's fix the docs: https://github.com/apache/flink/pull/23776 Thanks, Alex. On Sun, 19 Nov 2023 at 01:55, David Anderson wrote: > Hi, Alex! Yes, in PyFlink the various flatmap and process functions are implemented as generator f

Re: [PyFlink] Collect multiple elements in CoProcessFunction

2023-11-18 Thread David Anderson
Hi, Alex! Yes, in PyFlink the various flatmap and process functions are implemented as generator functions, so they use yield to emit results. David On Tue, Nov 7, 2023 at 1:16 PM Alexander Fedulov < alexander.fedu...@gmail.com> wrote: > Java ProcessFunction API defines a clear way t

[PyFlink] Collect multiple elements in CoProcessFunction

2023-11-07 Thread Alexander Fedulov
Java ProcessFunction API defines a clear way to collect data via the Collector object. PyFlink documentation also refers to the Collector [1] , but it is not being passed to the function and is also nowhere to be found in the pyflink source code. How can multiple elements be collected? Is "

Why is Apache Beam a required dependency of PyFlink (and can it be removed)?

2023-10-31 Thread Deepyaman Datta
Hi, I'm trying to understand where the Apache Beam dependency comes from; it's not just a regular dependency of PyFlink, but a build system dependency. Searching through the code, it seems like Beam is only used by PyFlink, and not by non-Python Flink. In my (limited) understanding, it

Re: PyFlink MapState with Types.ROW() throws exception

2023-10-05 Thread Elkhan Dadashov
…type elements. Wanted to check if anyone else faced the same issue while trying to use MapState in PyFlink with complex types. Here is the code: from pyflink.common import Time from pyflink.common.typeinfo import Types from pyflink.datastream im

Python unit test cases approach for PyFlink 1.17.1

2023-10-05 Thread Perez
…cases for the Pyflink API. Therefore, I tried the standard Python unittest library and Mockito, but I am facing some JAR issues. This is what I am talking about: https://stackoverflow.com/questions/77234674/pyflink-unit-tests-unable-to-find-the-jar-file-passed-within-the-tests Also, can somebody

PyFlink MapState with Types.ROW() throws exception

2023-10-04 Thread Elkhan Dadashov
Hi Flinkers, I'm trying to use MapState, where the value will be a list of type elements. Wanted to check if anyone else faced the same issue while trying to use MapState in PyFlink with complex types. Here is the code: from pyflink.common import Time from pyflink.common.typeinfo import

Re: Pyflink unittest cases

2023-10-04 Thread Perez
…of Kafka and Flink's low-level APIs like map and process functions, and since I am entirely new to this, I couldn't get the exact approach to test the Kafka connector and low-level APIs. So can anyone share any working links/references to g

Re: Pyflink unittest cases

2023-10-02 Thread joshua perez
…new to this, I couldn't get the exact approach to test the Kafka connector and low-level APIs. So can anyone share any working links/references to get started with, at least. We would be working with Pyflink for now since the use case is very limited. I would really appreciate your time. J.

Pyflink unittest cases

2023-09-30 Thread joshua perez
share any working links/references to get started with atleast. We would be working with Pyflink for now since the use case is very limited. I would really appreciate your time. J.

pyflink aggfunction in tvf question

2023-09-12 Thread faronzz
hi flink community~ I came across a problem I didn't understand: I can't use a pyflink aggfunction properly in a window TVF. The following work fine: a Java aggfunction, Flink's built-in aggfunctions, and a regular window (not a window TVF). I want to know if this is a bug or if I'm using i

Custom source function in PyFlink

2023-08-28 Thread Őrhidi Mátyás
Hey folks, I'm looking for an example for creating a custom source in PyFlink. The one that I found in the tests is a wrapper around a java class: def test_add_custom_source(self): custom_source = SourceFunction( "org.apache.flink.python.util.MyCustomSourceFunction") ds = self

Re: PyFlink SQL from Kafka to Iceberg issues

2023-07-17 Thread Martijn Visser
…also decided to scrap pyflink and go for the sql-client instead to keep things simpler for now. This is the Dockerfile I am using for both the *jobmanager* and the *sql-client*: FROM flink:1.16.2-scala_2.12-java11 RUN APACHE_HADOOP_URL=https://archive.apac

Re: PyFlink SQL from Kafka to Iceberg issues

2023-07-09 Thread Dániel Pálma
Thanks for the tips Martijn! I've fixed the library versions to 1.16 everywhere and also decided to scrap pyflink and go for the sql-client instead to keep things simpler for now. This is the Dockerfile I am using for both the *jobmanager* and the *sql-client* FROM flink:1.16.2-scala

Re: PyFlink SQL from Kafka to Iceberg issues

2023-06-29 Thread Martijn Visser
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/plugins/ Best regards, Martijn. On Sat, Jun 24, 2023 at 1:22 PM Dániel Pálma wrote: > Hey folks, nice to meet ya'll! I'm trying to get the following stack up and running locally:

PyFlink SQL from Kafka to Iceberg issues

2023-06-24 Thread Dániel Pálma
Hey folks, Nice to meet ya'll! I'm trying to get the following stack up and running locally: - Kafka as source - pyFlink SQL - Iceberg on top of MinIO The goal is to have a pyflink script that reads data from a Kafka topic, does some transformations, and dumps it into an iceberg tabl

Re: PyFlink Error JAR files

2023-06-08 Thread Leo
kafka_consumer = FlinkKafkaConsumer(
    topics=topic,  # Kafka topic
    deserialization_schema=deserialization_schema,
    properties=consumer_props,  # Consumer properties
)
kafka_consumer.set_start_from_earliest()
# Add the Kafka consumer as a source to the execution environment
stream = env.add_source(kafka_consumer)
# Defin

PyFlink Error JAR files

2023-06-07 Thread Kadiyala, Ruthvik via user
Consumer properties ) kafka_consumer.set_start_from_earliest() # Add the Kafka consumer as a source to the execution environment stream = env.add_source(kafka_consumer) # Define your data processing logic here # For example, you can print the stream to the console stream.print() # Execute

Re: Dynamic Windowing with Pyflink

2023-05-17 Thread Dian Fu
Hi Nawaz, regarding "My concern is, as Flink does not support dynamic windows, is this approach going against Flink architecture?": per my understanding, the session window can be seen as a kind of dynamic window. Besides, Flink also supports user-defined windows, with which users should also be able to imp

Dynamic Windowing with Pyflink

2023-05-16 Thread Nawaz Nayeem via user
Hey, I’ve been trying to emulate the behavior of a dynamic window, as Flink does not support dynamic window sizes. My operator inherits from KeyedProcessFunction, and I’m only using KeyedStates to manipulate the window_size. I’m clearing the KeyedStates when my bucket(window) is complete, to reset
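The pattern described (keyed state holds the in-progress window, the size can change on the fly, and state is cleared when the bucket completes) can be reduced to a small pure-Python sketch; keyed state is simulated with plain attributes, and the class name and count-based completion rule are invented for illustration:

```python
class DynamicBucket:
    """Count-based stand-in for the KeyedProcessFunction pattern:
    accumulate elements in 'state', emit and clear when the bucket is
    complete, and allow the window size to change between elements."""

    def __init__(self, initial_size):
        self.size = initial_size  # may be changed dynamically
        self.items = []           # stands in for keyed ListState

    def add(self, item):
        self.items.append(item)
        if len(self.items) >= self.size:
            out, self.items = self.items, []  # emit + clear "state"
            return out
        return None  # bucket not complete yet


b = DynamicBucket(2)
assert b.add(1) is None
assert b.add(2) == [1, 2]
b.size = 3  # window size changed at runtime
assert b.add(3) is None
```

In a real PyFlink job, `items` and `size` would both live in keyed state so the logic survives failover.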

Re: Issues using PyFlink

2023-05-14 Thread Dian Fu
Hi Jill, I suspect that the PyFlink isn't installed in the Python environment which is used to run the example. Could you share the complete command you used to execute the example: `./flink-1.17.0/bin/flink run -pyclientexec venv/bin/python3 --python flink-1.17.0/examples/python/ datas

Issues using PyFlink

2023-05-11 Thread Jill Cardamon
d_count.py", line 89, in word_count ds = ds.flat_map(split) \ File "/Users/jill/flink-1.17.0/opt/python/pyflink.zip/pyflink/datastream/data_stream.py", line 354, in flat_map File "/Users/jill/flink-1.17.0/opt/python/pyflink.zip/pyflink/datastream/data_stream.py",

Re: Pyflink Side Output Question and/or suggested documentation change

2023-02-13 Thread Andrew Otto
Thank you! On Mon, Feb 13, 2023 at 5:55 AM Dian Fu wrote: > Thanks Andrew, I think this is valid advice. I will update the documentation~ Regards, Dian. On Fri, Feb 10, 2023 at 10:08 PM Andrew Otto wrote: >> Question about side outputs

Re: Pyflink Side Output Question and/or suggested documentation change

2023-02-13 Thread Dian Fu
Thanks Andrew, I think this is valid advice. I will update the documentation~ Regards, Dian. On Fri, Feb 10, 2023 at 10:08 PM Andrew Otto wrote: > Question about side outputs and OutputTags in pyflink. The docs <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/d

Pyflink Side Output Question and/or suggested documentation change

2023-02-10 Thread Andrew Otto
Question about side outputs and OutputTags in pyflink. The docs <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/side_output/> say we are supposed to yield output_tag, value Docs then say: > For retrieving the side output stream you use getSideOutput(OutputTa
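The `yield output_tag, value` convention the docs describe can be mimicked in plain Python to see how the two output channels stay distinguishable. `OutputTag` below is a stand-in (the real `pyflink.datastream.OutputTag` also carries type info), and the watermark-based routing rule is invented for the example:

```python
class OutputTag:  # stand-in for pyflink.datastream.OutputTag
    def __init__(self, tag_id):
        self.tag_id = tag_id


late = OutputTag("late")


def route(element, watermark):
    if element < watermark:
        yield late, element  # side output: tag first, then value
    else:
        yield element        # main output: bare value


def collect(elements, watermark):
    # Demultiplex the way get_side_output conceptually does: a 2-tuple
    # whose first item is an OutputTag goes to the side channel.
    main, side = [], []
    for out in (o for e in elements for o in route(e, watermark)):
        if isinstance(out, tuple) and isinstance(out[0], OutputTag):
            side.append(out[1])
        else:
            main.append(out)
    return main, side


assert collect([5, 1, 7], watermark=3) == ([5, 7], [1])
```

In real PyFlink the demultiplexing is done by the runtime, and you retrieve the tagged stream with `ds.get_side_output(late)`.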

Re: Using pyflink from flink distribution

2023-01-31 Thread Andrew Otto
TaskManagers to run pyflink without having pyflink installed themselves? Somehow I'd guess this wouldn't work tho; I'd assume TaskManagers would also need some python transitive dependencies, e.g. google protobuf. It has some historical reasons. In the f

Re: Using pyflink from flink distribution

2023-01-30 Thread Dian Fu
> What is the reason for including opt/python/{pyflink.zip,cloudpickle.zip,py4j.zip} in the base distribution then? Oh, a guess: to make it easier for TaskManagers to run pyflink without having pyflink installed themselves? Somehow I'd guess this wouldn't work tho; I'd assu

Re: Using pyflink from flink distribution

2023-01-30 Thread Andrew Otto
Thanks Dian! >> Is using pyflink from the flink distribution tarball (without pip) not a supported way to use pyflink? > You are right. What is the reason for including opt/python/{pyflink.zip,cloudpickle.zip,py4j.zip} in the base distribution then? Oh, a guess: to make i

Re: Using pyflink from flink distribution

2023-01-28 Thread Dian Fu
at e.g. /usr/local/lib/python3.7/dist-packages/pyflink/lib! So, by following those instructions, flink is effectively installed twice into the docker image. Yes, your understanding is correct. The base image `flink:1.15.2` doesn't include PyFlink and so you need to build a custom image i

Re: Using pyflink from flink distribution

2023-01-26 Thread Andrew Otto
Ah, oops and my original email had a typo: > Some python dependencies are not included in the flink distribution tarballs: cloudpickle, py4j and pyflink are in opt/python. Should read: > Some python dependencies ARE included in the flink distribution tarballs: cloudpickle, py4j and pyflink

Re: Using pyflink from flink distribution

2023-01-26 Thread Andrew Otto
On Tue, Jan 24, 2023 at 4:26 PM Andrew Otto wrote: > Hello, I'm having quite a bit of trouble running pyflink from the default flink distribution tarballs. I'd expect the python examples to work as long as python is installed, and we've got

Re: PyFlink job in kubernetes operator

2023-01-25 Thread Evgeniy Lyutikov
Sent: 25 January 2023, 21:03:40. To: Evgeniy Lyutikov. Cc: user@flink.apache.org. Subject: Re: PyFlink job in kubernetes operator. Did you check the Python example? https://github.com/apache/flink-kubernetes-operator/tree/main/examples/flink-python-example

Re: PyFlink job in kubernetes operator

2023-01-25 Thread Gyula Fóra
Did you check the Python example? https://github.com/apache/flink-kubernetes-operator/tree/main/examples/flink-python-example Gyula On Wed, Jan 25, 2023 at 2:54 PM Evgeniy Lyutikov wrote: > Hello > > Is there a way to run PyFlink jobs in k8s with flink kubernetes operator? > And i

PyFlink job in kubernetes operator

2023-01-25 Thread Evgeniy Lyutikov
Hello, Is there a way to run PyFlink jobs in k8s with the flink kubernetes operator? And if not, is it planned to add such functionality?

Using pyflink from flink distribution

2023-01-24 Thread Andrew Otto
Hello, I'm having quite a bit of trouble running pyflink from the default flink distribution tarballs. I'd expect the python examples to work as long as python is installed, and we've got the distribution. Some python dependencies are not included in the flink distribution tarbal

PyFlink raises KeyError at on_evict method of CachedMapState

2023-01-11 Thread Ru Zhang
Hello, I am a user of pyflink. The pseudo code of my pipeline is as follows: ``` if not map_state.contains(data["key"]): processing(data) map_state.put(data["key"], 0) ``` However, it raises a KeyError: File "/opt/venv/lib64/python3.7/site-packages/pyflink/fn_execution/state_im

Re: Upgrading pyflink to 1.16.0

2023-01-10 Thread Gunnar Morling
Hey Ramana, I can't help you with that particular script, but perhaps this commit is helpful to you which I did for upgrading the PyFlink Playground project to 1.16: https://github.com/apache/flink-playgrounds/commit/4aa9a341bbf49e51809bc9cfcf0e946b2accd8ac In particular see the chang

Upgrading pyflink to 1.16.0

2023-01-09 Thread Ramana
All - I am trying to upgrade my pyflink installation from 1.15.2 to 1.16.0. Could someone tell me if that's possible via the 'setup-pyflink-virtual-env.sh 1.16.0' command? I don't want to overwrite any of my configuration files, so what's the clean way of upgrading to 1.16

Re: Unable to write checkpoint metadata to s3 using pyflink (1.16.0)

2022-11-23 Thread Martijn Visser
instructions in the documentation. Best regards, Martijn On Wed, Nov 23, 2022 at 1:53 PM Mujahid Niaz wrote: > Hi team, > Hope Everyone is doing good, > > We have an issue regarding writing checkpoints metadata to S3 using > pyflink datastream api. we are using Apache-Flink==1.1

Unable to write checkpoint metadata to s3 using pyflink (1.16.0)

2022-11-23 Thread Mujahid Niaz
Hi team, Hope Everyone is doing good, We have an issue regarding writing checkpoints metadata to S3 using pyflink datastream api. we are using Apache-Flink==1.16.0. We are able to sink our Stream into s3 but when it comes to writing checkpoint data. we are getting the following error. We tried a

Re: How to use connectors in PyFlink 1.15.0 when not defined in Python API?

2022-10-23 Thread Levan Huyen
Hi all, I'm trying to follow the code in 1.16 SNAPSHOT to have a Kinesis sink in PyFlink 1.15, to write the output of a KeyedCoProcessFunction to Kinesis. 1. If I use ".set_serialization_schema(SimpleStringSchema())", then I got the error message: java.lang.ClassCastException: cla

Re: Cannot run pyflink example using Flink CLI

2022-10-21 Thread Levan Huyen
Great, thanks! Kind regards, Levan Huyen. On Fri, 21 Oct 2022 at 00:53, Biao Geng wrote: > You are right. It contains the python package `pyflink` and some dependencies like py4j and cloudpickle but does not contain all relevant dependencies (e.g. `google.protobuf` as the err

Re: Cannot run pyflink example using Flink CLI

2022-10-20 Thread Biao Geng
You are right. It contains the python package `pyflink` and some dependencies like py4j and cloudpickle but does not contain all relevant dependencies (e.g. `google.protobuf`, as the error log shows, which I also reproduced on my own machine). Best, Biao Geng. Levan Huyen wrote on Thu, 20 Oct 2022 at 19:53:

Re: Cannot run pyflink example using Flink CLI

2022-10-20 Thread Levan Huyen
Thanks Biao. May I ask one more question: does the binary package on Apache site (e.g: https://archive.apache.org/dist/flink/flink-1.15.2) contain the python package `pyflink` and its dependencies? I guess the answer is no. Thanks and regards, Levan Huyen On Thu, 20 Oct 2022 at 18:13, Biao Geng

Re: Cannot run pyflink example using Flink CLI

2022-10-20 Thread Biao Geng
Hi Levan, Great to hear that your issue is resolved! For the follow-up question, I am not quite familiar with AWS EMR's configuration for flink, but based on the error you attached, it looks like pyflink may not ship some 'Google' dependencies in the Flink binary zip file and

Re: Cannot run pyflink example using Flink CLI

2022-10-19 Thread Levan Huyen
, the packages are in the current user's location (`~/.local/...) which Flink did not look at. BTW, is there any way to use the pyflink shipped with the Flink binary zip file that I downloaded from Apache's site? On EMR, such package is included, and I feel it's awkward to have to

Re: Cannot run pyflink example using Flink CLI

2022-10-19 Thread Biao Geng
not launch the flink cluster. You can do `start-cluster.sh` first to launch a standalone flink cluster and then try the `flink run ...` command. For your setup4, the reason why it works well is that it will use the default mini cluster to run the pyflink job. So even if you haven't started a s

Cannot run pyflink example using Flink CLI

2022-10-19 Thread Levan Huyen
Hi, I'm new to PyFlink, and I couldn't run a basic example that shipped with Flink. This is the command I tried: ./bin/flink run -py examples/python/datastream/word_count.py Here below are the results I got with different setups: 1. On AWS EMR 6.8.0 (Flink 1.15.1): *Error: No mo
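Pulling together the advice given later in this thread (install the Python dependencies for the interpreter Flink will use, start a cluster before submitting), a command sketch follows; the version and paths are illustrative.

```shell
# Sketch of the sequence suggested in the thread; version/paths illustrative.
# 1. Install pyflink (and its deps, e.g. protobuf) for the Python that
#    Flink will actually invoke:
python -m pip install apache-flink==1.15.2

# 2. Start a standalone cluster first, otherwise `flink run` has no
#    cluster to submit to:
./bin/start-cluster.sh

# 3. Submit the example, pinning both the client-side and worker-side
#    interpreters if multiple Pythons are installed:
./bin/flink run -py examples/python/datastream/word_count.py \
    -pyclientexec "$(which python)" -pyexec "$(which python)"
```

The `-pyexec`/`-pyclientexec` flags matter on machines like EMR where packages were installed into `~/.local` for one interpreter but Flink launches another.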

Re: Unable to run pyflink job - NetUtils getAvailablePort Error

2022-09-20 Thread Dian Fu
/dd1fddb13b2d08ade580e5b3ec6b8e910974308d On Wed, Sep 7, 2022 at 1:37 PM Ramana wrote: > Hi Xingbo > > I have double checked on this, both the flink and pyflink versions that i > have are 1.14.4 on Jobmanager and task manager. > However, I still get this error. > > Thanks > Ramana > >

AWS Kinesis Analytics and Pyflink OutputTag

2022-09-11 Thread Matt Fysh
Hi there, I am looking at using Flink to implement a fan-out of events, but discovered a couple of issues attempting to implement it on AWS Kinesis Analytics: - the version of Flink is pinned to 1.13.2 - there is currently no version of Pyflink that supports OutputTag Does anyone know if it is

Re: Unable to run pyflink job - NetUtils getAvailablePort Error

2022-09-06 Thread Ramana
Hi Xingbo I have double checked on this, both the flink and pyflink versions that i have are 1.14.4 on Jobmanager and task manager. However, I still get this error. Thanks Ramana On Tue, Sep 6, 2022, 14:23 Xingbo Huang wrote: > Hi Raman, > > This problem comes from the inconsistenc

Re: Unable to run pyflink job - NetUtils getAvailablePort Error

2022-09-06 Thread Xingbo Huang
Hi Raman, This problem comes from the inconsistency between your flink version and pyflink version. Best, Xingbo Ramana wrote on Tue, 6 Sep 2022 at 15:08: > Hello there, > > I have a pyflink setup of 1 : JobManager - 1 : Task Manager. > > Trying to run a pyflink job and no matter what

Unable to run pyflink job - NetUtils getAvailablePort Error

2022-09-06 Thread Ramana
Hello there, I have a pyflink setup of 1 : JobManager - 1 : Task Manager. Trying to run a pyflink job and no matter what i do, i get the following error message. - The program finished with the following exception: org.apache.flink.client.program.ProgramInvocationException: The
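As the reply in this thread points out, errors like this usually come from a mismatch between the client-side `pyflink` package and the cluster's Flink version. A quick check sketch (commands run against your own environment, so output will vary):

```shell
# Sketch: compare the client-side pyflink version with the cluster's
# Flink version; these must match.
pip show apache-flink | grep Version     # client-side pyflink package
./bin/flink --version                    # Flink distribution on the cluster
```

Run the `pip show` command with the same interpreter the job is submitted from, since a second Python installation with a stale `apache-flink` is an easy way to get this mismatch.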

Re: Pyflink :: Conversion from DataStream to TableAPI

2022-08-17 Thread yu'an huang
Thank you for Dian's explanation. I thought pyflink supported non-keyed streams because I saw "If key_by(...) is not called, your stream is not keyed." in the document lol. Sorry for the confusion to Ramana. On Thu, 18 Aug 2022 at 9:36 AM, Dian Fu wrote: > Hey Ramana, >

Re: Pyflink :: Conversion from DataStream to TableAPI

2022-08-17 Thread Dian Fu
n_parallel() ``` Regards, Dian On Wed, Aug 17, 2022 at 11:13 AM Ramana wrote: > Hi Yuan - Thanks for your response. Wondering if the window api supports > non-keyed streams? > > On Wed, Aug 17, 2022, 06:43 yu'an huang wrote: > >> Hi, >> >> >> Pyflink

Re: Pyflink :: Conversion from DataStream to TableAPI

2022-08-16 Thread Ramana
Hi Yuan - Thanks for your response. Wondering if the window api supports non-keyed streams? On Wed, Aug 17, 2022, 06:43 yu'an huang wrote: > Hi, > > > Pyflink should support window api. You can read this document. > > https://nightlies.apache.org/flink/flink-docs-relea

Re: Pyflink :: Conversion from DataStream to TableAPI

2022-08-16 Thread yu'an huang
Hi, Pyflink should support window api. You can read this document. https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/python/datastream/operators/windows/ Hope this helps. Best, Yuan On Tue, 16 Aug 2022 at 3:11 PM, Ramana wrote: > Hi All - > > Trying to ac

Pyflink :: Conversion from DataStream to TableAPI

2022-08-16 Thread Ramana
Hi All - Trying to achieve the following - 1. Ingest the data from RMQ 2. Decompress the data read from RMQ 3. Window it for 5 mins and process the data 4. Sink the processed data. Was able to achieve steps 1 and 2, however realized that Pyflink *DataStream *doesn't have window su
