Hello
I've been trying to set up StateTtl for my PyFlink application using thread
mode. I was following the examples in the docs (needed to use Time instead
of Duration) --
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/state/#state-in-the-scala-datas
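A minimal sketch of the kind of TTL setup in question (state name and TTL value are placeholders):
```
from pyflink.common import Time
from pyflink.common.typeinfo import Types
from pyflink.datastream.state import StateTtlConfig, ValueStateDescriptor

# PyFlink's StateTtlConfig takes Time (not Java's Duration)
ttl_config = StateTtlConfig \
    .new_builder(Time.minutes(10)) \
    .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite) \
    .set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired) \
    .build()

descriptor = ValueStateDescriptor("my_state", Types.STRING())
descriptor.enable_time_to_live(ttl_config)
```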
is that possible, you will
need to investigate.
Nix.
From: Oleksii Sh
Date: Friday, January 10, 2025 at 11:26 PM
To: user@flink.apache.org
Subject: Using data classes in pyflink
Hi,
I'm trying to understand how to use python classes in flink DataStream
pipelines. I'm using python 3.11 and flink 1.19.
I've tried running a few simple programs and require some guidance.
Here's the first example:
from dataclasses import dataclass
from pyflink.common import Configuration
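A minimal sketch of the kind of pipeline in question (class and field names are hypothetical; without an explicit output_type, PyFlink falls back to pickle serialization for arbitrary Python objects):
```
from dataclasses import dataclass
from pyflink.datastream import StreamExecutionEnvironment

@dataclass
class Event:
    user: str
    count: int

env = StreamExecutionEnvironment.get_execution_environment()
# dataclass instances are pickled by default when no type info is declared
ds = env.from_collection([Event("a", 1), Event("b", 2)])
ds.map(lambda e: Event(e.user, e.count + 1)).print()
env.execute("dataclass_demo")
```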
Hello,
I am trying to configure my Flink (1.18.1) jobs to store checkpoints on s3 but
I am getting the below error.
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to create
checkpoint storage at checkpoint coordinator side.
...
Caused by: org.apache.flink.core.fs.UnsupportedFile
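A sketch of the setup this error usually points at: the S3 filesystem ships in opt/ and must be enabled as a plugin under plugins/, not lib/ (bucket name and jar version are placeholders):
```
# flink-conf.yaml (bucket is a placeholder)
state.checkpoints.dir: s3://my-bucket/checkpoints

# enable the S3 filesystem plugin, e.g. when building the image:
# mkdir -p $FLINK_HOME/plugins/s3-fs-hadoop
# cp $FLINK_HOME/opt/flink-s3-fs-hadoop-1.18.1.jar $FLINK_HOME/plugins/s3-fs-hadoop/
```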
Hello,
I want to read data from Kafka topics which have Confluent-encoded Protobuf
messages (not plain Protobuf, Confluent adds a "magic byte" and schema ID)
with Python Datastream API. I have found that Confluent has a Java class[1]
which implements org.apache.kafka.common.serialization.Deseriali
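One pragmatic route, since there is no built-in Confluent Protobuf deserializer on the Python side, is to consume the raw bytes and strip the Confluent framing in a map function before parsing with the generated Protobuf class (a sketch; assumes the schema's first message type):
```
import struct

def strip_confluent_framing(raw: bytes) -> bytes:
    # Confluent framing: magic byte 0x00, 4-byte big-endian schema ID,
    # then (for Protobuf only) a message-index list; for the first message
    # type in the schema this list is the single byte 0x00.
    if raw[0] != 0:
        raise ValueError("not Confluent-framed")
    schema_id = struct.unpack(">I", raw[1:5])[0]  # available if you need registry lookups
    return raw[6:]  # payload parseable via MyMessage.FromString(...)
```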
Hi,
I'm trying to send data to a Kafka topic using PyFlink (DataStream API),
while setting a key on the Kafka record. The key is a simple string, the
value is a JSON string. What I have so far basically works, except the
whole record is sent as both the key and the value. How do I specify t
0-amzn-0.jar:1.18.0-amzn-0]
> at
> org.apache.flink.client.python.PythonEnvUtils.addToPythonPath(PythonEnvUtils.java:291)
> ~[?:?]
> at
> org.apache.flink.client.python.PythonEnvUtils.preparePythonEnvironment(PythonEnvUtils.java:226)
> ~[?:?]
> at
> org.apache.flink.client.python.PythonEnvUtils.launchPy4jPythonClient(PythonEnvUtils.java:487)
> ~[?:?]
> at
> org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:92)
> ~[?:?]
> at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method) ~[?:?]
> at
> jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> ~[?:?]
> at
> jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> ~[?:?]
> at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
> ~[flink-dist-1.18.0-amzn-0.jar:1.18.0-amzn-0]
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
> ~[flink-dist-1.18.0-amzn-0.jar:1.18.0-amzn-0]
> at
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105)
> ~[flink-dist-1.18.0-amzn-0.jar:1.18.0-amzn-0]
> at
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301)
> ~[flink-dist-1.18.0-amzn-0.jar:1.18.0-amzn-0]
>
> ```
>
> I tested the Flink job locally and it worked perfectly. I even tried to
> translate it into Java and run it using the AWS EMR_on_EKS_Flink container, and it
> also worked. So I do not understand why it does not work in PyFlink.
> Moreover, this error is related to the file system, but I never make any
> call to it. And even if I did, why does it work in Java and not in Python?
>
>
> Best regards,
>
> KY Alexandre
>
Hello Rob.
This workaround works indeed!
Regards,
Gwenael Le Barzic
From: Robert Young
Sent: Tuesday, June 18, 2024 03:54
To: LE BARZIC Gwenael DTSI/SI
Cc: user@flink.apache.org
Subject: Re: Problem reading a CSV file with pyflink datastream in k8
From: LE BARZIC Gwenael DTSI/SI
Sent: Friday, June 14, 2024 22:02
To: user@flink.apache.org
Subject: Problem reading a CSV file with pyflink datastream in k8s with Flink
operator
Hello everyone.
I get the following error when trying to read a CSV file with pyflink
datastream in a k8s environment using the flink operator.
###
File "/opt/myworkdir/myscript.py", line 30, in
run_flink_job(myfile)
File "/opt/myworkdir/myscript.py", line
Hey,
I ran into some issues with PyFlink on Kubernetes myself a while ago.
Blogged about it here, perhaps it's useful:
https://www.decodable.co/blog/getting-started-with-pyflink-on-kubernetes
Best,
--Gunnar
On Fri., June 14, 2024 at 20:58, Mate Czagany wrote:
> Hi,
>
>
To be sure about that, I can see this in the doc:
# install PyFlink
COPY apache-flink*.tar.gz /
RUN pip3 install /apache-flink-libraries*.tar.gz && pip3 install
/apache-flink*.tar.gz
Is the result the same as this command below:
RUN pip install --no-cache-dir -r requirements.t
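For comparison, a sketch of the requirements-file variant (assuming requirements.txt pins apache-flink to the matching release); the end result is equivalent as long as the pinned version matches the tarballs:
```
FROM flink:1.17.2

# requirements.txt pins e.g. apache-flink==1.17.2 (placeholder version)
COPY requirements.txt /
RUN pip install --no-cache-dir -r /requirements.txt
```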
From: Mate Czagany
Sent: Friday, June 14, 2024 18:30
To: LE BARZIC Gwenael DTSI/SI
Cc: user@flink.apache.org
Subject: Re: Which base image to use for pyflink on k8s with flink operator?
ll Flink
> libraries if I am not mistaken.
>
> Regards,
> Mate
>
> wrote (on June 14, 2024, Fri, 17:22):
Hello everyone.
I contact you because I'm encountering some strange difficulties with pyflink
on Kubernetes using the flink operator.
So, first things first, I was wondering which base image I should use for my
python image that I will then deploy on my Kubernetes cluster?
Can I use
filesystem connector as a temporary table to work around?
--
Best!
Xuyang
At 2024-06-07 03:26:27, "Phil Stavridis" wrote:
Hello,
I am trying to create an in-memory table in PyFlink to use as a staging table
after ingesting some data from Kafka but it doesn’t work as expected.
I have used the print connector which prints the results but I need to use a
similar connector that stores staging results. I have tried
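Following the filesystem-connector suggestion above, a sketch of a filesystem-backed staging table (schema, path, and format are placeholders):
```
t_env.execute_sql("""
    CREATE TEMPORARY TABLE staging (
        id STRING,
        cnt BIGINT
    ) WITH (
        'connector' = 'filesystem',
        'path' = 'file:///tmp/staging',
        'format' = 'json'
    )
""")
```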
To: user@flink.apache.org
Subject: Re: SSL Kafka PyFlink
Hi Phil,
The Kafka SSL configuration keys may not be correct. You can refer to the Kafka
documentation[1] for the client-side SSL configurations.
[1] https://kafka.apache.org/documentation/#security_configclients
Best,
Zhongqiang Gong
Phil Stavridis mailto:phi
I have a PyFlink job that needs to read from a Kafka topic and the
> communication with the Kafka broker requires SSL.
> I have connected to the Kafka cluster with something like this using just
> Python.
>
> from confluent_kafka import Consumer, KafkaException, KafkaErro
Hi,
I have a PyFlink job that needs to read from a Kafka topic and the
communication with the Kafka broker requires SSL.
I have connected to the Kafka cluster with something like this using just
Python.
from confluent_kafka import Consumer, KafkaException, KafkaError
def get_config
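For the PyFlink side, the same client SSL keys from the Kafka docs go into the connector properties (a sketch; servers, paths, and passwords are placeholders):
```
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream.connectors.kafka import KafkaSource

props = {
    'security.protocol': 'SSL',
    'ssl.truststore.location': '/path/to/truststore.jks',
    'ssl.truststore.password': 'changeit',
    'ssl.keystore.location': '/path/to/keystore.jks',
    'ssl.keystore.password': 'changeit',
}
source = KafkaSource.builder() \
    .set_bootstrap_servers('broker:9093') \
    .set_topics('my-topic') \
    .set_properties(props) \
    .set_value_only_deserializer(SimpleStringSchema()) \
    .build()
```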
Hi,
I am currently evaluating PyFlink in comparison to Java and did various
tests, mainly comparing identical pipelines with a focus on throughput.
To me it seems that PyFlink generally fares worse and reaches
its limits in throughput at a point where Java still has resources
Hi Péter,
Thanks for pointing this out!
I was aware of the difference in version between pyflink and some of the JAR
dependencies. I was starting out with PyFlink 1.16 and I had some errors when
creating the Dockerfile that seemed to be fixed when upgrading the version to
1.18. Thus the
Is it intentional, that you are using iceberg-flink-runtime-1.16-1.3.1.jar
with 1.18.0 PyFlink? This might cause issues later. I would try to
synchronize the Flink versions throughout all the dependencies.
On Tue, Apr 16, 2024, 11:23 Robert Prat wrote:
> I finally managed to make it w
On Mon, Apr 15, 2024 at 16:17 Niklas Wilcke wrote:
> Hi Flink Community,
> I wanted to reach out to you to get some input about Pyflink performance.
> Are there any resources available about Pyflink benchmarks and maybe a
> comparison with the Java API? I wasn't ab
tils
python3-pip
python3-requests
python3-software-properties
python-is-python3
I'm not sure I need all the Jars in the dockerfile but as they say if it ain't
broke, don't fix it.
From: Robert Prat
Sent: Friday, April 12, 2024 3:45 PM
To: user@flink
When it comes down to the actual runtime, what really matters is the plan
optimization and the operator implementation & shuffling. You might be
interested in this blog:
https://flink.apache.org/2022/05/06/exploring-the-thread-mode-in-pyflink/,
which did a benchmark on the latter with the common JSON processing scenario
with UDFs in Java/Python under thread mode/Python under process mode.
Hi Flink Community,
I wanted to reach out to you to get some input about Pyflink performance. Are
there any resources available about Pyflink benchmarks and maybe a comparison
with the Java API? I wasn't able to find something valuable, but maybe I missed
something?
I am aware
Hi there,
For several days I have been trying to find the right configuration for my
pipeline, which roughly consists of the following schema:
RabbitMQ->PyFlink->Nessie/Iceberg/S3.
For what I am going to explain I have tried both locally and through the
official Flink docker images.
Hi Thierry,
Your case is not very complex, and I believe all programming-language
interfaces of flink (e.g. Java, Python, SQL) can do that.
When using pyflink, you can use the pyflink datastream/table/SQL API.
Here are some examples of using the pyflink table api:
https://nightlies.apache.org/flink/flink
Hi,
I have 2 streams as seen in this example (6> {"tripId": "275118740",
"timestamp": "2024-04-02T06:20:00Z", "stopSequence": 13, "stopId": "61261",
"bearing": 0.0, "speed": 0.0, "vehicleId": "39006", "routeId": "747"}
1> {"visibility": 1, "weather_conditions": "clear sky", "timestamp":
"20
Dear readers,
I'm running into some unexpected behaviour in PyFlink when switching
execution mode from process to thread. In thread mode, my `Row` gets
converted to a tuple whenever I use a UDF in a map operation. By this
conversion to tuples, we lose critical information such as column
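One thing sometimes suggested is to declare the result type explicitly so that field names survive the UDF boundary (a sketch with hypothetical columns):
```
from pyflink.common.typeinfo import Types

ds.map(my_udf, output_type=Types.ROW_NAMED(
    ['id', 'name'], [Types.INT(), Types.STRING()]))
```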
Thanks! We’ve created an issue for that:
https://issues.apache.org/jira/browse/FLINK-34625
Yeap, planning to use timers as workaround for now
On Mar 10, 2024 at 02:59 +0400, David Anderson , wrote:
> My guess is that this only fails when pyflink is used with the heap state
> backend, in
My guess is that this only fails when pyflink is used with the heap state
backend, in which case one possible workaround is to use the RocksDB state
backend instead. Another workaround would be to rely on timers in the
process function, and clear the state yourself.
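A sketch of the timer-based workaround (state type and TTL value are placeholders):
```
from pyflink.common import Types
from pyflink.datastream import KeyedProcessFunction
from pyflink.datastream.state import ValueStateDescriptor

class TtlByTimer(KeyedProcessFunction):
    def open(self, ctx):
        self.state = ctx.get_state(ValueStateDescriptor("v", Types.STRING()))

    def process_element(self, value, ctx):
        self.state.update(value)
        # clear the state ourselves after 60s of processing time
        ctx.timer_service().register_processing_time_timer(
            ctx.timer_service().current_processing_time() + 60 * 1000)

    def on_timer(self, timestamp, ctx):
        self.state.clear()
```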
David
On Fri, Mar 8, 2024 at
Note that in Java code, it prints `State: Null`, `State: Null`, as I was
expecting, unlike the pyflink code
On Mar 7, 2024 at 15:59 +0400, Ivan Petrarka , wrote:
> Hi! I’ve created a basic pyflink pipeline with ttl and it does not seem to
> work. I have reproduced the exact same code in Ja
Hi! I’ve created a basic pyflink pipeline with ttl and it does not seem to
work. I have reproduced the exact same code in Java and it works!
Is this a pyflink bug? If so - how can I report it? If not - what can I try to
do?
Flink: 1.18.0
image: flink:1.18.0-scala_2.12-java11
Code to reproduce
Hi David,
Thanks for the confirmation. Let's fix the docs:
https://github.com/apache/flink/pull/23776
Thanks,
Alex
On Sun, 19 Nov 2023 at 01:55, David Anderson wrote:
> Hi, Alex!
>
> Yes, in PyFlink the various flatmap and process functions are implemented
> as generator f
Hi, Alex!
Yes, in PyFlink the various flatmap and process functions are implemented
as generator functions, so they use yield to emit results.
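For example, a minimal sketch:
```
from pyflink.common.typeinfo import Types

def split(line):
    # generator-style flat_map: yield plays the role of Collector.collect
    for word in line.split():
        yield word, 1

ds.flat_map(split, output_type=Types.TUPLE([Types.STRING(), Types.INT()]))
```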
David
On Tue, Nov 7, 2023 at 1:16 PM Alexander Fedulov <
alexander.fedu...@gmail.com> wrote:
> Java ProcessFunction API defines a clear way t
Java ProcessFunction API defines a clear way to collect data via the
Collector object.
PyFlink documentation also refers to the Collector [1] , but it is not
being passed to the function and is also nowhere to be found in the pyflink
source code.
How can multiple elements be collected? Is "
Hi,
I'm trying to understand where the Apache Beam dependency comes from; it's
not just a regular dependency of PyFlink, but a build system dependency.
Searching through the code, it seems like Beam is only used by PyFlink, and
not by non-Python Flink. In my (limited) understanding, it
cases for
Pyflink API.
Therefore, I tried my own approach with the standard Python unittest library and Mockito. But,
I am facing some JAR issues.
This is what I am talking about
https://stackoverflow.com/questions/77234674/pyflink-unit-tests-unable-to-find-the-jar-file-passed-within-the-tests
Also, can somebody
Hi Flinkers,
I'm trying to use MapState, where the value will be a list of type elements.
Wanted to check if anyone else faced the same issue while trying to use
MapState in PyFlink with complex types.
Here is the code:
from pyflink.common import Time
from pyflink.common.typeinfo import
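A sketch of a MapState descriptor whose values are lists (key and element types are placeholders):
```
from pyflink.common.typeinfo import Types
from pyflink.datastream.state import MapStateDescriptor

descriptor = MapStateDescriptor(
    "my_map_state",
    Types.STRING(),              # key type
    Types.LIST(Types.STRING()))  # value type: a list of elements
```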
Kafka and Flink's low level APIs like map and process functions, and since I
am entirely new to this stuff, I couldn't get the exact approach/way to
test the Kafka connector and low level APIs.
So can anyone share any working links/references to get started with,
at least. We would be working with Pyflink for now since the use case is
very limited.
I would really appreciate your time.
J.
Hi Flink community~
I came across a problem I don't understand: I can't use a pyflink
aggfunction properly in a window TVF. The following are available:
java aggfunction
flink system aggfunction
window (not window tvf)
I want to know if this is a bug or if I'm using i
Hey folks,
I'm looking for an example for creating a custom source in PyFlink. The one
that I found in the tests is a wrapper around a java class:
def test_add_custom_source(self):
    custom_source = SourceFunction(
        "org.apache.flink.python.util.MyCustomSourceFunction")
    ds = self
Thanks for the tips Martijn!
I've fixed the library versions to 1.16 everywhere and also decided to
scrap pyflink and go for the sql-client instead to keep things simpler for
now.
This is the Dockerfile I am using for both the *jobmanager* and the
*sql-client*
FROM flink:1.16.2-scala
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/plugins/
Best regards,
Martijn
On Sat, Jun 24, 2023 at 1:22 PM Dániel Pálma wrote:
> Hey folks,
>
> Nice to meet ya'll!
>
> I'm trying to get the following stack up and running locally:
>
Hey folks,
Nice to meet ya'll!
I'm trying to get the following stack up and running locally:
- Kafka as source
- pyFlink SQL
- Iceberg on top of MinIO
The goal is to have a pyflink script that reads data from a Kafka topic,
does some transformations, and dumps it into an iceberg tabl
kafka_consumer = FlinkKafkaConsumer(
    topics=topic,  # Kafka topic
    deserialization_schema=deserialization_schema,
    properties=consumer_props,  # Consumer properties
)
kafka_consumer.set_start_from_earliest()
# Add the Kafka consumer as a source to the execution environment
stream = env.add_source(kafka_consumer)
# Define your data processing logic here
# For example, you can print the stream to the console
stream.print()
# Execute
Hi Nawaz,
>> My concern is, as Flink does not support dynamic windows, is this
approach going against Flink Architecture.
Per my understanding, the session window could be seen as a kind of dynamic
window. Besides, Flink also supports user-defined windows, with which users
should also be able to imp
Hey, I’ve been trying to emulate the behavior of a dynamic window, as Flink
does not support dynamic window sizes. My operator inherits from
KeyedProcessFunction, and I’m only using KeyedStates to manipulate the
window_size. I’m clearing the KeyedStates when my bucket(window) is
complete, to reset
Hi Jill,
I suspect that the PyFlink isn't installed in the Python environment which
is used to run the example. Could you share the complete command you used
to execute the example: `./flink-1.17.0/bin/flink run -pyclientexec
venv/bin/python3 --python flink-1.17.0/examples/python/
datas
d_count.py",
line 89, in word_count
ds = ds.flat_map(split) \
File
"/Users/jill/flink-1.17.0/opt/python/pyflink.zip/pyflink/datastream/data_stream.py",
line 354, in flat_map
File
"/Users/jill/flink-1.17.0/opt/python/pyflink.zip/pyflink/datastream/data_stream.py",
Thank you!
On Mon, Feb 13, 2023 at 5:55 AM Dian Fu wrote:
> Thanks Andrew, I think this is a valid advice. I will update the
> documentation~
>
> Regards,
> Dian
>
>
> On Fri, Feb 10, 2023 at 10:08 PM Andrew Otto wrote:
>
>> Question about side outputs
Thanks Andrew, I think this is a valid advice. I will update the
documentation~
Regards,
Dian
On Fri, Feb 10, 2023 at 10:08 PM Andrew Otto wrote:
> Question about side outputs and OutputTags in pyflink. The docs
> <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/d
Question about side outputs and OutputTags in pyflink. The docs
<https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/side_output/>
say we are supposed to
yield output_tag, value
Docs then say:
> For retrieving the side output stream you use getSideOutput(OutputTa
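Putting the two together, a minimal sketch (tag name and types are placeholders):
```
from pyflink.common.typeinfo import Types
from pyflink.datastream import OutputTag, ProcessFunction

output_tag = OutputTag("side", Types.STRING())

class MyProcess(ProcessFunction):
    def process_element(self, value, ctx):
        yield value                    # main output
        yield output_tag, str(value)   # side output

main = ds.process(MyProcess(), output_type=Types.INT())
side = main.get_side_output(output_tag)
```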
TaskManagers to run
> pyflink without having pyflink installed themselves? Somehow I'd guess
> this wouldn't work tho; I'd assume TaskManagers would also need some python
> transitive dependencies, e.g. google protobuf.
>
> It has some historical reasons. In the f
Thanks Dian!
> >> Is using pyflink from the flink distribution tarball (without pip) not
a supported way to use pyflink?
> You are right.
What is the reason for including
opt/python/{pyflink.zip,cloudpickle.zip,py4j.zip} in the base distribution
then? Oh, a guess: to make i
at e.g.
/usr/local/lib/python3.7/dist-packages/pyflink/lib!
So, by following those instructions, flink is effectively installed twice
into the docker image.
Yes, your understanding is correct. The base image `flink:1.15.2` doesn't
include PyFlink and so you need to build a custom image i
Ah, oops and my original email had a typo:
> Some python dependencies are not included in the flink distribution
tarballs: cloudpickle, py4j and pyflink are in opt/python.
Should read:
> Some python dependencies ARE included in the flink distribution tarballs:
cloudpickle, py4j and pyflink
Did you check the Python example?
https://github.com/apache/flink-kubernetes-operator/tree/main/examples/flink-python-example
Gyula
On Wed, Jan 25, 2023 at 2:54 PM Evgeniy Lyutikov
wrote:
> Hello
>
> Is there a way to run PyFlink jobs in k8s with flink kubernetes operator?
> And i
Hello
Is there a way to run PyFlink jobs in k8s with flink kubernetes operator?
And if not, is it planned to add such functionality?
"This message contains confidential information/commercial secret. If you are
not the intended addressee of this message yo
Hello,
I'm having quite a bit of trouble running pyflink from the default flink
distribution tarballs. I'd expect the python examples to work as long as
python is installed, and we've got the distribution. Some python
dependencies are not included in the flink distribution tarbal
Hello,
I am a user of pyflink. The pseudo code of my pipeline is as follows:
```
if not map_state.contains(data["key"]):
    processing(data)
    map_state.put(data["key"], 0)
```
However, it raises a KeyError:
File
"/opt/venv/lib64/python3.7/site-packages/pyflink/fn_execution/state_im
Hey Ramana,
I can't help you with that particular script, but perhaps this commit,
which I did for upgrading the PyFlink Playground project to 1.16,
is helpful to you:
https://github.com/apache/flink-playgrounds/commit/4aa9a341bbf49e51809bc9cfcf0e946b2accd8ac
In particular see the chang
All - I am trying to upgrade my pyflink installation from 1.15.2 to 1.16.0.
Could someone tell me if that's possible via the
'setup-pyflink-virtual-env.sh 1.16.0' command? I don't want to overwrite
any of my configuration files, so what's the clean way of upgrading to
1.16
instructions in
the documentation.
Best regards,
Martijn
On Wed, Nov 23, 2022 at 1:53 PM Mujahid Niaz
wrote:
Hi team,
Hope Everyone is doing good,
We have an issue regarding writing checkpoint metadata to S3 using the pyflink
datastream api. We are using Apache-Flink==1.16.0. We are able to sink our
stream into S3, but when it comes to writing checkpoint data, we are getting
the following error.
We tried a
Hi all,
I'm trying to follow the code in 1.16 SNAPSHOT to have a Kinesis sink in
PyFlink 1.15, to write the output of a KeyedCoProcessFunction to Kinesis.
1. If I use ".set_serialization_schema(SimpleStringSchema())", then I got
the error message:
java.lang.ClassCastException: cla
Great, thanks!
Kind regards,
Levan Huyen
On Fri, 21 Oct 2022 at 00:53, Biao Geng wrote:
> You are right.
> It contains the python package `pyflink` and some dependencies like py4j
> and cloudpickle but does not contain all relevant dependencies(e.g.
> `google.protobuf` as the err
You are right.
It contains the python package `pyflink` and some dependencies like py4j
and cloudpickle but does not contain all relevant dependencies(e.g.
`google.protobuf` as the error log shows, which I also reproduce in my own
machine).
Best,
Biao Geng
Levan Huyen wrote on Thu, Oct 20, 2022 at 19:53:
Thanks Biao.
May I ask one more question: does the binary package on Apache site (e.g:
https://archive.apache.org/dist/flink/flink-1.15.2) contain the python
package `pyflink` and its dependencies? I guess the answer is no.
Thanks and regards,
Levan Huyen
On Thu, 20 Oct 2022 at 18:13, Biao Geng
Hi Levan,
Great to hear that your issue is resolved!
For the follow-up question, I am not quite familiar with AWS EMR's
configuration for flink, but from the error you attached, it looks like
pyflink may not ship some 'Google' dependencies in the Flink binary
zip file and
, the packages are in the current user's location (`~/.local/...)
which Flink did not look at.
BTW, is there any way to use the pyflink shipped with the Flink binary zip
file that I downloaded from Apache's site? On EMR, such package is
included, and I feel it's awkward to have to
not launch
the flink cluster. You can do `start-cluster.sh` first to launch a
standalone flink cluster and then try the `flink run ...` command.
For your setup4, the reason why it works well is that it will use the
default mini cluster to run the pyflink job. So even you haven't started a
s
Hi,
I'm new to PyFlink, and I couldn't run a basic example that shipped with
Flink.
This is the command I tried:
./bin/flink run -py examples/python/datastream/word_count.py
Here below are the results I got with different setups:
1. On AWS EMR 6.8.0 (Flink 1.15.1):
*Error: No mo
/dd1fddb13b2d08ade580e5b3ec6b8e910974308d
On Wed, Sep 7, 2022 at 1:37 PM Ramana wrote:
Hi there,
I am looking at using Flink to implement a fan-out of events, but
discovered a couple of issues attempting to implement on AWS Kinesis
Analytics:
- the version of Flink is pinned to 1.13.2
- there is currently no version of Pyflink that supports OutputTag
Does anyone know if it is
Hi Xingbo
I have double checked on this; both the flink and pyflink versions that I
have are 1.14.4 on the Jobmanager and Task Manager.
However, I still get this error.
Thanks
Ramana
On Tue, Sep 6, 2022, 14:23 Xingbo Huang wrote:
> Hi Raman,
>
> This problem comes from the inconsistenc
Hi Raman,
This problem comes from an inconsistency between your flink version and
pyflink version.
Best,
Xingbo
Ramana wrote on Tue, Sep 6, 2022 at 15:08:
> Hello there,
>
> I have a pyflink setup of 1 : JobManager - 1 : Task Manager.
>
> Trying to run a pyflink job and no matter what
Hello there,
I have a pyflink setup of 1 : JobManager - 1 : Task Manager.
Trying to run a pyflink job and no matter what i do, i get the following
error message.
-
The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: The
Thank you for Dian's explanation. I thought pyflink supported non-keyed
streams because I saw
"If key_by(...) is not called, your stream is not keyed." in the document
lol. Sorry for the confusion to Ramana.
On Thu, 18 Aug 2022 at 9:36 AM, Dian Fu wrote:
> Hey Ramana,
>
>
n_parallel()
```
Regards,
Dian
On Wed, Aug 17, 2022 at 11:13 AM Ramana wrote:
> Hi Yuan - Thanks for your response. Wondering if the window api supports
> non-keyed streams?
>
> On Wed, Aug 17, 2022, 06:43 yu'an huang wrote:
>
>> Hi,
>>
>>
>> Pyflink
Hi Yuan - Thanks for your response. Wondering if the window api supports
non-keyed streams?
On Wed, Aug 17, 2022, 06:43 yu'an huang wrote:
> Hi,
>
>
> Pyflink should support window api. You can read this document.
>
> https://nightlies.apache.org/flink/flink-docs-relea
Hi,
Pyflink should support window api. You can read this document.
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/python/datastream/operators/windows/
Hope this helps.
Best,
Yuan
On Tue, 16 Aug 2022 at 3:11 PM, Ramana wrote:
> Hi All -
>
> Trying to ac
Hi All -
Trying to achieve the following -
1. Ingest the data from RMQ
2. Decompress the data read from RMQ
3. Window it for 5 mins and process the data
4. Sink the processed data.
Was able to achieve steps 1 and 2, however realized that Pyflink *DataStream
*doesn't have window su
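For step 3, a sketch of a 5-minute tumbling window over a keyed stream (key selector and reduce logic are placeholders; requires a PyFlink version with DataStream window support):
```
from pyflink.common.time import Time
from pyflink.datastream.window import TumblingProcessingTimeWindows

windowed = decompressed.key_by(lambda e: e[0]) \
    .window(TumblingProcessingTimeWindows.of(Time.minutes(5))) \
    .reduce(lambda a, b: b)  # placeholder: keep the latest element per window
```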