Re: [PyFlink] Issue Deploying FlinkSessionJob with PVC and Python Script Access

2025-07-07 Thread Dian Fu
Hi Pachara, Could you check if this is a similar issue of https://issues.apache.org/jira/browse/FLINK-37266. If so, you could try to replace `-pyclientexec` with configuration option `python.client.executable` to see if it works. Regards, Dian On Mon, Jul 7, 2025 at 3:39 PM Nikola Milutinovic w
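A minimal sketch of the suggested change, assuming the job's arguments are available as a plain list of strings. The helper name `move_client_exec_to_config` is hypothetical; only the flag `-pyclientexec` and the option key `python.client.executable` come from the message above. How the resulting configuration entry is then applied to the deployment depends on the setup and is not shown here.

```python
from typing import Dict, List, Tuple


def move_client_exec_to_config(args: List[str]) -> Tuple[List[str], Dict[str, str]]:
    """Strip `-pyclientexec <path>` from args and return it as a config entry.

    Hypothetical helper: it only rearranges strings and does not call Flink.
    """
    remaining: List[str] = []
    config: Dict[str, str] = {}
    it = iter(args)
    for arg in it:
        if arg == "-pyclientexec":
            value = next(it, None)
            if value is None:
                raise ValueError("-pyclientexec requires a value")
            # Same setting, expressed as a configuration option instead of a flag.
            config["python.client.executable"] = value
        else:
            remaining.append(arg)
    return remaining, config


remaining_args, flink_config = move_client_exec_to_config(
    ["-pyclientexec", "/usr/bin/python3", "--python", "/opt/job.py"]
)
print(remaining_args)
print(flink_config)
```

The paths in the usage example are placeholders, not values from the original thread.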

Re: PyFink Application Mode Dependency Resolution

2025-04-08 Thread Dian Fu
hope this information helps clarify the situation. Looking forward to your > advice. > Best regards, > Shiquan > > > From: Dian Fu > Sent: Wednesday, April 9, 2025 1:31 AM > To: Joska H > Cc: user@flink.apache.org > Subject: Re: PyFink Application Mode Depend

Re: PyFink Application Mode Dependency Resolution

2025-04-08 Thread Dian Fu
Have you installed PyFlink in the used Python environment? Regards, Dian On Tue, Apr 1, 2025 at 2:43 PM Joska H wrote: > > When submitting PyFink jobs via Application Mode: > > ./bin/flink run-application -t yarn-application \ > -Djobmanager.memory.process.size=1024m \ > -Dtaskmanage

Re: Using data classes in pyflink

2025-01-16 Thread Dian Fu
Re problem 1: Could you share an example which could be run by others? It depends on `dataclass` which seems like a private class. Re problem 2: Seems like a classloader issue. Could you share how to reproduce it? Since you could run the first example in thread mode, it seems only happening in ce

Re: python udf with flinksql

2023-05-21 Thread Dian Fu
Hi Tom, The following statement is incorrect. ``` CREATE FUNCTION add AS 'custom_udf_2.MyUDF' LANGUAGE PYTHON; ``` You should define it as following: custom_udf_2.py ``` from pyflink.table.udf import ScalarFunction, udf from pyflink.table import DataTypes class MyUDF(ScalarFunction): def __in

Re: Dynamin Windowing in with Pyflink

2023-05-17 Thread Dian Fu
Hi Nawaz, >> My concern is, as Flink does not support dynamic windows, is this approach going against Flink Architecture. Per my understanding, the session window could be seen as a kind of dynamic window. Besides, Flink also supports user-defined window with which users should also be able to imp

Re: Issues using PyFlink

2023-05-14 Thread Dian Fu
Hi Jill, I suspect that the PyFlink isn't installed in the Python environment which is used to run the example. Could you share the complete command you used to execute the example: `./flink-1.17.0/bin/flink run -pyclientexec venv/bin/python3 --python flink-1.17.0/examples/python/ datastream/word_

Re: Python Datastream: CountTumblingWindowAssigner never purges?

2023-04-26 Thread Dian Fu
Filed ticket https://issues.apache.org/jira/browse/FLINK-31949 to track this issue. On Thu, Apr 27, 2023 at 11:14 AM Dian Fu wrote: > Hi Urs, > > I guess you are right. This seems like a bug which should be addressed. > > Regards, > Dian > > On Mon, Apr 24, 2023 at 5

Re: Python Datastream: CountTumblingWindowAssigner never purges?

2023-04-26 Thread Dian Fu
Hi Urs, I guess you are right. This seems like a bug which should be addressed. Regards, Dian On Mon, Apr 24, 2023 at 5:07 AM Urs Schönenberger < urs.schoenenber...@tngtech.com> wrote: > Hi all, > > In FLINK-26444, a couple of convenience window assigners were added to > the Python Datastream A

Re: Pyflink Side Output Question and/or suggested documentation change

2023-02-13 Thread Dian Fu
Thanks Andrew, I think this is valid advice. I will update the documentation. Regards, Dian On Fri, Feb 10, 2023 at 10:08 PM Andrew Otto wrote: > Question about side outputs and OutputTags in pyflink. The docs >

Re: Using pyflink from flink distribution

2023-01-30 Thread Dian Fu
> /usr/local/lib/python3.7/dist-packages/pyflink/lib manually after `pip > install apache-flink` > > Since we're building our own Docker image, I'm going the other way around: > just install pyflink, and symlink /opt/flink -> > /usr/lib/python3.7/dist-packages/pyflink. S

Re: Using pyflink from flink distribution

2023-01-28 Thread Dian Fu
Hi Andrew, >> By pip installing apache-flink, this docker image will have the flink distro installed at /opt/flink and FLINK_HOME set to /opt/flink . BUT ALSO flink lib jars will be installed at e.g. /usr/

Re: Unable to run pyflink job - NetUtils getAvailablePort Error

2022-09-20 Thread Dian Fu
Hi Ramana, The method appearing in the exception message was updated in Flink 1.15, see [1] for more details. So I believe there must be jars of version 1.15 in your environment and could you double check that? Regards, Dian [1] https://github.com/apache/flink/commit/dd1fddb13b2d08ade580e5b3ec6b

Re: Pyflink :: Conversion from DataStream to TableAPI

2022-08-17 Thread Dian Fu
Hey Ramana, Non-keyed window will be supported in the coming Flink 1.16. See https://issues.apache.org/jira/browse/FLINK-26480 for more details. In releases prior to 1.16, you could work around it as following: ``` data_stream = xxx data_stream.key_by(lambda x: 'key').xxx().force_non_parallel() `

Re: PyFlink SQL: force maximum use of slots

2022-07-24 Thread Dian Fu
k ensure > consistency of results irrespective of the parallelism used, or does it > just copy all events to all slots, in which case I don't understand how > parallelism assists? > > Many thanks, > > John > > -- > *From:* Dian Fu >

Re: PyFlink SQL: force maximum use of slots

2022-07-19 Thread Dian Fu
Hi John, All the operators in the same slot sharing group will be put in one slot. The slot sharing group is only configurable in DataStream API [1]. Usually you don't need to set the slot sharing group explicitly [2] and this is good to share the resource between the operators running in the same

Re: PyFlink and parallelism

2022-07-17 Thread Dian Fu
Hi John, You could try to use `filtered_stream._j_data_stream.getTransformation().setParallelism(int(table_env.get_config().get('my.custom.parallelism', 1)))`. Usually you don't need to do this, however, when converting Table to DataStream, it returns a Java DataStream object which has no setPara

Re: PyFlink: restoring from savepoint

2022-07-07 Thread Dian Fu
Hi John, Could you provide more information, e.g. the exact command submitting the job, the logs file, the PyFlink version, etc? Regards, Dian On Thu, Jul 7, 2022 at 7:53 PM John Tipper wrote: > Hi all, > > I have a PyFlink job running in Kubernetes. Savepoints and checkpoints are > being suc

Re: [CEP] State compatibility when a pattern is modified

2022-07-04 Thread Dian Fu
Hi Nicolas, The state isn't compatible. Besides, the partial matches will not be dropped, so the behavior is undefined. The original events are dropped after being evaluated, so when the pattern changes, there is no way to evaluate them against the new pattern. Regards, Dian On

Re: How to convert Table containing TIMESTAMP_LTZ into DataStream in PyFlink 1.15.0?

2022-06-26 Thread Dian Fu
Hi John, This seems like a bug and I have created a ticket https://issues.apache.org/jira/browse/FLINK-28253 to track it. For now, you could try replacing `to_data_stream` with `to_append_stream` to see if it works. Regards, Dian On Sat, Jun 25, 2022 at 4:07 AM John Tipper wrote: > Hi, > > I hav

Re: How to use connectors in PyFlink 1.15.0 when not defined in Python API?

2022-06-26 Thread Dian Fu
Hi John, Kinesis and most of the other connectors will be supported in 1.16, see [1] for more details about kinesis. For versions prior to 1.16, you could try just as Andrew suggested or refer to the implementations which are already available in the master as examples. Regards, Dian [1] https:

Re: How to use RockDb in pyflink(version 1.14.4)

2022-06-22 Thread Dian Fu
Hi Harshit, Yes, this should be supported. Have you tried the following doc [1] for Table API and [2] for DataStream API? Regards, Dian [1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/table/table_environment/#statebackend-checkpoint-and-restart-strategy [2] https:

Re: The configured managed memory fraction for Python worker process must be within (0, 1], was: %s

2022-06-17 Thread Dian Fu
a way to manually bypass that issue? > > J > > Sent from my iPhone > > On 17 Jun 2022, at 04:59, Dian Fu wrote: > >  > >> This error generally occurs in jobs where there are transfers between > Table and datastream. > AFAIK, this issue should have already bee

Re: The configured managed memory fraction for Python worker process must be within (0, 1], was: %s

2022-06-16 Thread Dian Fu
>> This error generally occurs in jobs where there are transfers between Table and datastream. AFAIK, this issue should have already been fixed, see https://issues.apache.org/jira/browse/FLINK-26920 and https://issues.apache.org/jira/browse/FLINK-23133 for more details. Regards, Dian On Fri, Jun

Re: Exception: class java.sql.Timestamp cannot be cast to class java.time.LocalDateTime | pyflink 1.15.0

2022-06-13 Thread Dian Fu
Hi Mark, Could you share an example which could reproduce this issue? Regards, Dian On Thu, Jun 9, 2022 at 9:22 PM Márk Bartos wrote: > Hi, > > I'd like to ask for help regarding the java exception: > Caused by: java.lang.ClassCastException: class java.sql.Timestamp cannot > be cast to class j

Re: How to implement unnest() as udtf using Python?

2022-06-13 Thread Dian Fu
I second Martijn, UNNEST should be supported. Besides, regarding the above exception, could you share an example which could reproduce this issue? Regards, Dian On Mon, Jun 13, 2022 at 8:21 PM Martijn Visser wrote: > Hi John, > > You're mentioning that Flink doesn't support UNNEST, but it does

Re: custom table source, how to support json?

2022-06-13 Thread Dian Fu
Hi Ivan, Is your question how to parse the JSON string in PyFlink? If so, maybe you could take a look at this [1]. Regards, Dian [1] https://stackoverflow.com/questions/71820015/how-to-reference-nested-json-within-pyflink-sql-when-json-schema-varies On Fri, Jun 10, 2022 at 7:40 PM ivan.ros...@a

Re: How to handle deletion of items using PyFlink SQL?

2022-06-08 Thread Dian Fu
Hi John, If you are using Table API & SQL, the framework is handling the RowKind and it's transparent for you. So usually you don't need to handle RowKind in Table API & SQL. Regards, Dian On Thu, Jun 9, 2022 at 6:56 AM John Tipper wrote: > Hi Xuyang, > > Thank you very much, I’ll experiment t

Re: FW: Issue Facing While Using EmbeddedRocksDbCheckpointing FlinkVersion(1.15.0)

2022-06-05 Thread Dian Fu
None of the attachments are logs of the TaskManager. The TaskManager log should be located in the directory `E:\pythonProject16\lib\site-packages\pyflink\log`. On Fri, Jun 3, 2022 at 8:41 PM harshit.varsh...@iktara.ai < harshit.varsh...@iktara.ai> wrote: > > > > > *From:* Shuiqiang Chen [mailto:acq

Re: How to parse protobuf data from kafka by pyflink

2022-06-01 Thread Dian Fu
It still doesn't support custom Deserializer developed with Python language. However, you could create a Java Deserializer and use it in PyFlink. Regards, Dian On Thu, Jun 2, 2022 at 10:25 AM laizhic...@hongkingsystem.cn < laizhic...@hongkingsystem.cn> wrote: > > Hi Everyone: >I used protobu

Re: Converting from table to stream, following Avro schema

2022-05-13 Thread Dian Fu
"uploads". That is, the > following is wrong: > Types.ROW_NAMED(["relatedPlaylists"], [Types.ROW_NAMED(["uploads", > "likes"], [Types.STRING(), Types.INT()])]) > > After making these changes, things indeed work out well - I am able to >

Re: Converting from table to stream, following Avro schema

2022-05-10 Thread Dian Fu
Hi Dhavan, The type of the `ds` appearing in `t_env.from_data_stream(ds)` should be known. Otherwise, it's impossible to infer the schema of the converted table; as a result, the `raw` type will be used, which makes the schema of the resulting table differ from what is expected. You could either declare the type of th

Re: FW: Rabbitmq Connection error with Flink version(1.15.0)

2022-05-09 Thread Dian Fu
Hi Harshit, You should use https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-rabbitmq/1.15.0/flink-sql-connector-rabbitmq-1.15.0.jar which is a fat jar containing all the dependencies. Regards, Dian On Mon, May 9, 2022 at 10:05 PM harshit.varsh...@iktara.ai < harshit.vars

Re: How should I call external HTTP services with PyFlink?

2022-05-05 Thread Dian Fu
lue)) > for result in results: > yield result > > > ds = ds.flat_map(MyFlatMapFunction()) > > > ds.print() > env.execute() > > On Thu, 5 May 2022 at 08:26, Dian Fu wrote: > >> Hi Dhavan, >> >> Asyncio operator is still not s

Re: Pyflink -> Redshift/S3/Firehose

2022-05-04 Thread Dian Fu
It uses connectors to send data to external storage. Note that the connector implementations are shared between the Java API and the Python API, so if you can find a Java connector, it can usually also be used in PyFlink. For firehose, it has provided a firehose sink connector in F

Re: How should I call external HTTP services with PyFlink?

2022-05-04 Thread Dian Fu
Hi Dhavan, Asyncio operator is still not supported in PyFlink. Regards, Dian On Tue, May 3, 2022 at 3:48 PM Dhavan Vaidya wrote: > Hey Francis! > > Thanks for the insights! I am thinking of using Java / Scala for this > scenario given your input. Introducing a new language to the team, however

Re: AvroRowDeserializationSchema

2022-04-28 Thread Dian Fu
from Mail <https://go.microsoft.com/fwlink/?LinkId=550986> for > Windows > > > > *From: *Dian Fu > *Sent: *Thursday, April 28, 2022 2:59 PM > *To: *lan tran > *Cc: *user@flink.apache.org > *Subject: *Re: AvroRowDeserializationSchema > > > > Yes, I t

Re: AvroRowDeserializationSchema

2022-04-28 Thread Dian Fu
> > > > Sent from Mail <https://go.microsoft.com/fwlink/?LinkId=550986> for > Windows > > > > *From: *Dian Fu > *Sent: *Tuesday, April 26, 2022 7:54 AM > *To: *lan tran > *Cc: *user@flink.apache.org > *Subject: *Re: AvroRowDeserializationSchema > >

Re: Kafka Sink Error in sending Row Type data in Flink-(version 1.14.4)

2022-04-27 Thread Dian Fu
Hi Harshit, I should have already replied to you in an earlier thread[1] for the same question. It seems that you have missed that. Please double check if that reply is helpful for you. Regards, Dian [1] https://lists.apache.org/thread/cm6r569spq67249dxw57q8lxh0mk3f7y On Wed, Apr 27, 2022 at 6

Re: Unit testing PyFlink SQL project

2022-04-25 Thread Dian Fu
Great to hear! Regards, Dian On Tue, Apr 26, 2022 at 4:11 AM John Tipper wrote: > Hi Dian, > > I've tried this and it works nicely, on both MacOS and Windows, thank you > very much indeed for your help. > > Kind regards, > > John > ------

Re: AvroRowDeserializationSchema

2022-04-25 Thread Dian Fu
ull flow. > > Best, > Quynh > > > > Sent from Mail <https://go.microsoft.com/fwlink/?LinkId=550986> for > Windows > > > > *From: *Dian Fu > *Sent: *Monday, April 25, 2022 7:46 PM > *To: *lan tran > *Cc: *user@flink.apache.org > *Subject: *Re: Avr

Re: AvroRowDeserializationSchema

2022-04-25 Thread Dian Fu
eAPI (SQL API), the uid is generated automatically which means we > cannot revert if the system is crashed. > > Best, > Quynh > > > > Sent from Mail <https://go.microsoft.com/fwlink/?LinkId=550986> for > Windows > > > > *From: *Dian Fu > *Sent: *Monda

Re: AvroRowDeserializationSchema

2022-04-24 Thread Dian Fu
ttps://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/datastream/intro_to_datastream_api/#create-using-table--sql-connectors On Mon, Apr 25, 2022 at 11:48 AM Dian Fu wrote: > Yes, we should support them. > > For now, if you want to use them, you could create ones in your own &

Re: AvroRowDeserializationSchema

2022-04-24 Thread Dian Fu
/?LinkId=550986> for > Windows > > > > *From: *Dian Fu > *Sent: *Friday, April 22, 2022 9:36 PM > *To: *lan tran > *Cc: *user@flink.apache.org > *Subject: *Re: AvroRowDeserializationSchema > > > > Hi Quynh, > > I have added an example on how to use A

Re: Unit testing PyFlink SQL project

2022-04-24 Thread Dian Fu
Regarding the problem `python setup.py install` vs `pip install apache-flink==1.14.4`, have created an issue https://issues.apache.org/jira/browse/FLINK-27373 to track it. On Mon, Apr 25, 2022 at 9:42 AM Dian Fu wrote: > Hi John, > > I'm also using MacOS. This is the steps I'

Re: Unit testing PyFlink SQL project

2022-04-24 Thread Dian Fu
ependencies could be > specified via command line argument '--jarfile' or the config option > 'pipeline.jars' > > > -- > > Ran 1 test in 0.401s > > > FAILED (errors=1) > >

Re: Unit testing PyFlink SQL project

2022-04-23 Thread Dian Fu
Dian Fu wrote: > Hi John, > > >> I don't know how to fix this. I've tried adding `flink-table-planner` > and `flink-table-planner-blink` dependencies with `test-jar` > to my dummy pom.xml, but it still fails. > What's the failure after doing this? The fli

Re: KAFKA SINK ERROR IN Flink-(version 1.14.4)

2022-04-23 Thread Dian Fu
Hi Harshit, Could you try to update the following line `ds = ds.map(lambda x: ','.join([str(value) for value in x]))` as following: `ds = ds.map(lambda x: ','.join([str(value) for value in x]), output_type=Types.STRING())` The reason is that if the output type is not specified, it will be seriali
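To see why `Types.STRING()` is a fitting `output_type` here, the lambda can be pulled out as a plain function and run without a cluster. This is only a sketch: the name `row_to_csv_line` and the sample record are illustrative and do not appear in the original thread.

```python
from typing import Iterable


def row_to_csv_line(row: Iterable[object]) -> str:
    """Join the fields of one record into a comma-separated string.

    Same logic as the lambda passed to ds.map(...) in the message above.
    """
    return ",".join(str(value) for value in row)


# Whatever the field types are, the result is always a plain str,
# which is what output_type=Types.STRING() declares to Flink.
line = row_to_csv_line((1, "sensor-a", 2.5))
print(type(line).__name__, line)
```

In the job itself the function would be passed as `ds.map(row_to_csv_line, output_type=Types.STRING())`, mirroring the line suggested above.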

Re: Unit testing PyFlink SQL project

2022-04-23 Thread Dian Fu
Hi John, >> I don't know how to fix this. I've tried adding `flink-table-planner` and `flink-table-planner-blink` dependencies with `test-jar` to my dummy pom.xml, but it still fails. What's the failure after doing this? The flink-table-planner*-tests.jar should be available in maven repository[1]

Re: AvroRowDeserializationSchema

2022-04-22 Thread Dian Fu
, Apr 22, 2022 at 7:24 PM Dian Fu wrote: > Hi Quynh, > > Could you show some sample code on how you use it? > > Regards, > Dian > > On Fri, Apr 22, 2022 at 1:42 PM lan tran wrote: > >> Wonder if this is a bug or not but if I use >> *AvroRowDeserializationSc

Re: AvroRowDeserializationSchema

2022-04-22 Thread Dian Fu
Hi Quynh, Could you show some sample code on how you use it? Regards, Dian On Fri, Apr 22, 2022 at 1:42 PM lan tran wrote: > Wonder if this is a bug or not but if I use > *AvroRowDeserializationSchema,* > > In PyFlink the error still occure ? > > py4j.protocol.Py4JError: An error occurred whil

Re: TumblingEventTimeWindows is not recognised by Flink-(version 1.14.4)

2022-04-19 Thread Dian Fu
Hi Harshit, I guess you are using the reference code of the master instead of Flink 1.14, which is the version you are using. TumblingEventTimeWindows is introduced in Flink 1.16, which is still not released. However, it could be seen as a utility class and so I think you could just copy it into y

Re: Interrupt collect() function when reading from kafka topic (in pyflink)

2022-04-14 Thread Dian Fu
Hi Marjan, The method `collect` is used to collect the content of a table. However, `insert_statement` is an `INSERT INTO` statement, so there is no table to collect from in your example. You could try the following code: ``` sql_statement = """ SELECT window_start, window_end, COUNT(

Re: FW: Pyflink Kafka consumer error (version 1.14.4)

2022-04-14 Thread Dian Fu
Hi Harshit, I think you could double check whether the version of flink-sql-connector-kafka.jar is also 1.14.4. Regards, Dian On Thu, Apr 14, 2022 at 7:51 PM harshit.varsh...@iktara.ai < harshit.varsh...@iktara.ai> wrote: > > > > > *From:* harshit.varsh...@iktara.ai [mailto:harshit.varsh...@ikt

Re: How to process events with different JSON schemas in single Kinesis stream using PyFlink?

2022-04-10 Thread Dian Fu
Hi John, 1) Regarding the Table API, you could declare the column `detail` as STRING and then parse it as JSON in a Python user-defined function as follows: ``` @udf(result_type=DataTypes.STRING()) def get_id(detail): detail_json = json.loads(detail) if 'build-id' in detail_json:
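The parsing step can be tried outside Flink as a plain function. The sketch below is not the original UDF, whose body is cut off above: the function name `extract_build_id` and the choice to return `None` when the key is missing are assumptions made for illustration. Only the `build-id` key and the use of `json.loads` come from the message.

```python
import json
from typing import Optional


def extract_build_id(detail: str) -> Optional[str]:
    """Parse the raw `detail` string and return its 'build-id', if any.

    Assumption: events with another schema simply yield None.
    """
    detail_json = json.loads(detail)
    if isinstance(detail_json, dict) and "build-id" in detail_json:
        return str(detail_json["build-id"])
    return None


# Two events with different schemas arriving on the same stream
# (sample payloads are made up for this example).
print(extract_build_id('{"build-id": "b-42", "status": "ok"}'))
print(extract_build_id('{"deployment-id": "d-7"}'))
```

Keeping the parsing logic in a plain function like this makes it easy to unit test before wrapping it with `@udf(result_type=DataTypes.STRING())` as shown in the message.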

Re: table api conditionally double quoting data in csv

2022-04-10 Thread Dian Fu
Hi Shameet, The reason should be that it adds quotes around string data according to the length by default. You could disable the quotes using option csv-disable-quote-character [1]. However, there are still no options to configure it to always add quotes around string data. If that's your require

Re: python table api

2022-04-06 Thread Dian Fu
You have not configured the tumbling window at all. Please refer to [1] for more details. Regards, Dian [1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/window-agg/#group-window-aggregation On Wed, Apr 6, 2022 at 10:46 PM ivan.ros...@agilent.com < ivan.ro

Re: Not able to connect with Elasticsearch using PyFlink and connector-jar

2022-04-06 Thread Dian Fu
> password nothing happens. It gives no error message. > > On Wed, Apr 6, 2022 at 11:54 AM Dian Fu wrote: > > > I think you should use [1] or [2] instead. See [3] for more details. > > > > PS: This question is more fit for the user mailing list. > > > > Re

Re: Not able to connect with Elasticsearch using PyFlink and connector-jar

2022-04-05 Thread Dian Fu
I think you should use [1] or [2] instead. See [3] for more details. PS: This question is more fit for the user mailing list. Regards, Dian [1] https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch6_2.11/1.14.4/flink-sql-connector-elasticsearch6_2.11-1.14.4.jar

Re: Python UDF Gauge Metrics not working & error log on Vectorized UDF

2022-03-24 Thread Dian Fu
Hi Jesry, Regarding the gauge metrics, I have verified that it's a bug introduced when bumping the Beam version. I have opened a ticket [1] to track it. Regarding the warning message, I think you could just ignore it. Per my understanding, it's saying that it's using a deprecated API of pyarrow.

Re: PyFlink : submission via rest

2022-03-06 Thread Dian Fu
The dedicated REST API is still not supported. However, you could try to use PythonDriver just like you said and just submit it like a Java Flink job. Regards, Dian On Sun, Mar 6, 2022 at 3:38 AM aryan m wrote: > Thanks Zhilong for taking a look! > > Primarily I am looking for ways to start it

Re: Pyflink1.13 or JavaFlink1.13 + Jpython + Python2.7, which way has better performance?

2022-03-06 Thread Dian Fu
Hi Vtygoss, >> As far as i know, the python APIs only provide a subset of about 2/3 of what's available in Java APIs; the performance of PyFlink is worse than JavaFlink and some features contributed after 1.10 are not implemented in PyFlink yet. There are two levels of API in Flink: Table API and

Re: Pyflink with pulsar

2022-02-22 Thread Dian Fu
Hi Ananth, It's already code freeze for 1.15 and you can refer to [1] for more details. Regards, Dian [1] https://www.mail-archive.com/dev@flink.apache.org/msg54262.html On Sun, Feb 20, 2022 at 1:51 AM Ananth Gundabattula < agundabatt...@darwinium.com> wrote: > Thanks a lot Wong. > > > > I was

Re: Change column names Pyflink Table/Datastream API

2022-02-16 Thread Dian Fu
Hi Francis, There should be multiple ways to achieve this. Do you mean that all these methods don't work for you? If so, could you show the sample code? Besides, another way you may try is `inputmetrics.alias("timestamp, device, name, value")`. Regards, Dian On Wed, Feb 16, 2022 at 8:14 AM Franc

Re: pyflink datastream job

2022-02-06 Thread Dian Fu
The following code snippet should just work: ``` from pyflink.datastream import StreamExecutionEnvironment env = StreamExecutionEnvironment.get_execution_environment() ``` It works both in local deployment and in flink clusters. You could refer to [1] on how to submit PyFlink jobs to a remote cl

Re: pyflink mixed with Java operators

2022-01-10 Thread Dian Fu
Hi, You could try the following method: ``` from pyflink.java_gateway import get_gateway jvm = get_gateway().jvm ds = ( DataStream(ds._j_data_stream.map(jvm.com.example.MyJavaMapFunction())) ) ``` Regards, Dian On Fri, Jan 7, 2022 at 1:00 PM Francis Conroy wrote: > Hi all, > > Does anyon

Re: Using venv in Pyflink

2021-12-23 Thread Dian Fu
ing the > process of pip install. > > I wonder if the installation affects the heartbeats between taskmanager > and jobmanager? > > Best, > Paul Lam > > 2021年12月23日 19:16,Dian Fu 写道: > > Hi Paul, > > Currently, you need to build venv in an environment where you wa

Re: Using venv in Pyflink

2021-12-23 Thread Dian Fu
Hi Paul, Currently, you need to build venv in an environment where you want to execute the PyFlink jobs. >> Also, I wonder if it’s possible for pyflink to optionally provide an automatically created venv for each pyflink job? Do you mean to create the venv during executing the job? If this is you

Re: PyFlink Perfomance

2021-12-21 Thread Dian Fu
Hi Francis, Could you share the benchmark code you use? Regards, Dian On Wed, Dec 22, 2021 at 11:31 AM Francis Conroy < francis.con...@switchdin.com> wrote: > I've just run an analysis using a similar example which involves a single > python flatmap operator and we're getting 100x less through

Re: Python Interop with Java defined org.apache.flink.api.common.functions.Function classes?

2021-12-14 Thread Dian Fu
Hi Kevin, You could try to use it as following: ``` from pyflink.java_gateway import get_gateway jvm = get_gateway().jvm ds = ( DataStream(ds._j_data_stream.map(jvm.com.example.MyJavaMapFunction())) ) ``` Regards, Dian On Wed, Dec 15, 2021 at 5:41 AM Kevin Lam wrote: > Hi all, > > We cur

Re: Java dependencies management in Pyflink

2021-12-14 Thread Dian Fu
Hi Paul, For connectors(including Kafka), it's recommended to use the fat jar which contains the dependencies. For example, for kafka, you could use https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.14.0/flink-sql-connector-kafka_2.11-1.14.0.jar Regards, Dian

Re: Pyflink 1.13.2 convert datastream into table BIG_INT type

2021-11-28 Thread Dian Fu
Hi Kamil, You need to use Types.LONG() for bigint type in Python DataStream API. See [1] for more details. Regards, Dian [1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/datastream/data_types/#supported-data-types On Sun, Nov 28, 2021 at 11:26 PM Kamil ty wrote:

Re: Flink CLI - pass command line arguments to a pyflink job

2021-11-24 Thread Dian Fu
> argument is accessed from the python code, and the > arguments work as expected. > > Best regards > Kamil > > On Tue, 23 Nov 2021 at 02:48, Dian Fu wrote: > >> Hi Kamil, >> >> It's documented at the end of the page: >> https://nightli

Re: How to add Python packages to PyFlink job running in Session Cluster

2021-11-22 Thread Dian Fu
Does the exception `ModuleNotFoundError: No module named 'dotenv' ` occur during job submission or during job running? The argument `--pyRequirements` only takes effect while the job is running; that is, it will download the dependencies specified via `--pyRequirements` and add them to the PYTHONPATH of the Python w

Re: Flink CLI - pass command line arguments to a pyflink job

2021-11-22 Thread Dian Fu
Hi Kamil, It's documented at the end of the page: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/cli/#submitting-pyflink-jobs . Regards, Dian On Tue, Nov 23, 2021 at 12:10 AM Matthias Pohl wrote: > Hi Kamil, > afaik, the parameter passing should work as normal by ju

Re: Deserialize generic kafka json message in pyflink. Single kafka topic, multiple message schemas (debezium).

2021-11-21 Thread Dian Fu
Hi Kamil, Actually FlinkKafkaConsumer expects a DeserializationSchema instead of JsonRowDeserialization and so I guess you could try SimpleStringSchema. Regards, Dian On Sat, Nov 20, 2021 at 5:55 AM Kamil ty wrote: > Hello all, > > I'm working on a pyflink job that's supposed to consume json m

Re: PyFlink Perfomance

2021-11-17 Thread Dian Fu
Hi, Is it possible to perform some benchmark for the first map (not the whole job)? Then you could get a basic understanding of whether the map implementation is a problem. Besides the map implementation, there is also some overhead introduced by the framework, e.g. the Java and Python process com

Re: Datastream processing on AWS with Python model

2021-11-10 Thread Dian Fu
Hi Des, Regarding kinesis datastream source: currently it still hasn't supported kinesis source natively in PyFlink DataStream API, however, you could use the Kinesis Table API & SQL connectors [1] and then convert the Table to DataStream [2] if you want to work with PyFlink DataStream API. Regar

Re: Getting mini-cluster logs when debugging pyflink from IDE

2021-11-09 Thread Dian Fu
this path? For example to write logs to a file > in the working dir? > > And is there a way to redirect logst from file to stdout? > > On 9 Nov 2021, at 09:00, Dian Fu wrote: > > Hi, > > The logs should appear in the log file of the TaskManger and you could > find it

Re: Pyflink PyPi build - scala 2.12 compatibility

2021-11-09 Thread Dian Fu
Hi Kamil, You are right that it comes with JAR packages of scala 2.11 in the PyFlink package as it has to select one version of JARs to bundle, either 2.11 or 2.12. Whether it works with scala 2.12 depends on how you submit your job. - If you execute the job locally, then it will use the JARs bund

Re: Getting mini-cluster logs when debugging pyflink from IDE

2021-11-08 Thread Dian Fu
Hi, The logs should appear in the log file of the TaskManager and you could find it under the directory $PYTHON_INSTALLATION_DIR/site-packages/pyflink/log/ Regards, Dian On Mon, Nov 8, 2021 at 10:53 PM Роман VVvKamper wrote: > Hello, > > I'm trying to debug flink and pyflink job from IDE using mini
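One way to resolve that directory for the interpreter actually in use is to ask Python where the `pyflink` package lives. This is a sketch under the assumption that the `log` folder sits directly inside the installed package, as the path above suggests; the function name `find_pyflink_log_dir` is made up for this example.

```python
import importlib.util
from pathlib import Path
from typing import Optional


def find_pyflink_log_dir() -> Optional[Path]:
    """Return <site-packages>/pyflink/log for the current interpreter.

    Returns None when pyflink is not installed in this environment.
    The directory itself may not exist until a job has been run.
    """
    spec = importlib.util.find_spec("pyflink")
    if spec is None or spec.origin is None:
        return None
    # spec.origin points at pyflink/__init__.py; the logs sit next to it.
    return Path(spec.origin).parent / "log"


print(find_pyflink_log_dir())
```

Running this with the same interpreter the IDE uses avoids guessing which Python installation `$PYTHON_INSTALLATION_DIR` refers to.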

Re: How to write unit test for stateful operators in Pyflink apps

2021-11-07 Thread Dian Fu
Hi Long, I agree with Fabian that currently you have to test it with a e2e job. There are still no such test harnesses for PyFlink jobs. Regards, Dian On Fri, Nov 5, 2021 at 5:22 PM Long Nguyễn wrote: > Thanks, Fabian. I'll check it out. > > Hope that Dian can also give me some advice. > > Bes

Re: Execute PyFlink jobs: flink-python module seems to be missing or not working

2021-10-28 Thread Dian Fu
rks fine. > > > > Regards, > > Christian > > > > > > *Von:* Dian Fu > *Gesendet:* Donnerstag, 28. Oktober 2021 10:21 > *An:* Schmid Christian > *Cc:* user@flink.apache.org > *Betreff:* Re: Execute PyFlink jobs: flink-python module seems to be > missin

Re: Execute PyFlink jobs: flink-python module seems to be missing or not working

2021-10-28 Thread Dian Fu
Hi Schmid, Just as you have seen that the jar package of flink-python is located in the directory $FLINK_HOME/opt. It seems that this jar doesn't exist in your environment. Could you double check that? Regards, Dian On Thu, Oct 28, 2021 at 3:35 PM Schmid Christian wrote: > Hi, > > > > When I e

Re: Application mode - Custom Flink docker image with Python user code

2021-10-26 Thread Dian Fu
Hi Sumeet, It still has not provided special support to handle the dependencies for the Application mode in PyFlink. This means that the dependencies could be handled the same as the other deployment modes. However, it is indeed correct that the dependencies could be handled differently in applica

Re: FlinkKafkaProducer deprecated in 1.14 but pyflink binding still present?

2021-10-26 Thread Dian Fu
Hi Francis, Yes, you are right. It's still not updated in PyFlink as KafkaSource/KafkaSink are still not supported in PyFlink. Hopefully we could add that support in 1.15 and then we could deprecate/remove the legacy interfaces. Regards, Dian On Tue, Oct 26, 2021 at 12:53 PM Francis Conroy < franc

Re: Using the flink CLI option --pyRequirements

2021-10-20 Thread Dian Fu
Hi Francis Conroy, Do you want to debug the PyFlink job submitted via `flink run`? There is documentation [1] on how to debug it via `PyCharm`. PS: It supports the loopback mode in PyFlink which is enabled in local deployment. That is, when you execute the PyFlink jobs locally, e.g. when executing

Re: pyflink keyed stream checkpoint error

2021-10-14 Thread Dian Fu
erCallImpl$ServerStreamListenerImpl.closed(ServerCallImpl.java:341) > at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal. > ServerImpl$JumpToApplicationThreadServerStreamListener$1Closed > .runInContext(ServerImpl.java:867) > at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable > .r

Re: PyFlink JDBC SQL Connector for SQL Server

2021-10-11 Thread Dian Fu
Hi, Currently only the derby, mysql, and postgresql dialects are supported. The dialect 'sqlserver' is still not supported. There is a ticket https://issues.apache.org/jira/browse/FLINK-14101 for this. Regards, Dian On Mon, Oct 11, 2021 at 9:43 PM Schmid Christian wrote: > Hi all > > > > According to the

Re: Pyflink data stream API to Table API conversion with multiple sinks.

2021-10-08 Thread Dian Fu
Hi Kamil, I guess `statement_set.execute()` should be enough. You could also check whether the job graph is as expected in one of the following ways: - Call `print(statement_set.explain())` - Check the Flink web UI to see the job graph of the running job For your problems, could you double check wh

Re: Pyflik job data stream to table conversion declareManagedMemory exception

2021-10-07 Thread Dian Fu
Hi Kamil, I have checked that this method exists in 1.12.3: https://github.com/apache/flink/blob/release-1.12.3/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java#L137 Could you double check whether the Flink version is 1.12.3 (not just the PyFlink version)? Regards, D

Re: pyflink keyed stream checkpoint error

2021-09-23 Thread Dian Fu
PS: there is more information about this configuration option at https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/python/python_config/#python-fn-execution-bundle-size > On Sep 24, 2021, at 10:07 AM, Dian Fu wrote: > > I agree with Roman that it seems that the Python process ha

Re: pyflink keyed stream checkpoint error

2021-09-23 Thread Dian Fu
I agree with Roman that the Python process seems to have crashed. Besides Roman's suggestions, I guess you could also try setting the bundle size to a smaller value via "python.fn-execution.bundle.size". Regards, Dian > On Sep 24, 2021, at 3:48 AM, Roman Khachatryan wrote: > > Hi, > > Is it
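
To illustrate why a smaller bundle size might help, here is a toy model of count-based bundling. It is a sketch under stated assumptions, not PyFlink's implementation: `BundleBuffer` and its attributes are hypothetical names, and the model only captures the idea that elements are buffered until the configured bundle size is reached and then handed over as one batch.

```python
class BundleBuffer:
    """Toy count-based bundler (illustrative only, not PyFlink code)."""

    def __init__(self, bundle_size):
        self.bundle_size = bundle_size
        self.pending = []   # elements buffered but not yet handed over
        self.flushed = []   # completed bundles, in order

    def add(self, element):
        self.pending.append(element)
        # Hand the bundle over as soon as it reaches the configured size.
        if len(self.pending) >= self.bundle_size:
            self.flush()

    def flush(self):
        if self.pending:
            self.flushed.append(self.pending)
            self.pending = []


# Feed the same 95 elements through a large and a small bundle size.
for size in (100, 10):
    buf = BundleBuffer(bundle_size=size)
    for i in range(95):
        buf.add(i)
    print(size, len(buf.flushed), len(buf.pending))
```

In this model, the smaller bundle size leaves far fewer elements waiting in the buffer at any moment, which is the effect the suggestion above is after.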

Re: Flink CEP in PyFlink

2021-09-07 Thread Dian Fu
Hi Kamil, CEP is still not supported in PyFlink. However, since PyFlink supports SQL, you could check whether CEP on SQL [1] meets your requirements. If so, you could use CEP on SQL in PyFlink. [1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/

Re: [Question] Basic Python examples.wordcount on local FlinkRunner

2021-09-02 Thread Dian Fu
This seems more like a Beam issue, although it uses the Flink runner. It would be helpful to also send it to the Beam user mailing list. Regarding the issue itself, could you check whether input.txt is accessible in the Docker container? Regards, Dian > On Sep 3, 2021, at 5:19 AM, Adam Pearce wrote: > > Hello all,

Re: PyFlink StreamingFileSink bulk-encoded format (Avro)

2021-08-17 Thread Dian Fu
Hi Kamil, AFAIK, the Avro format is still not supported by the Python StreamingFileSink in the Python DataStream API. However, I guess you could convert the DataStream to a Table [1] and then use all the connectors supported in Table & SQL. In this case, you could use the FileSystem connect

Re: PyFlink performance and deployment issues

2021-08-15 Thread Dian Fu
Hi Wouter, I suspect that it’s transferring the file venv.zip, so it may take some time. Does it stay stuck there forever? Could you share some log files? Regards, Dian > On Aug 14, 2021, at 4:47 PM, Wouter Zorgdrager wrote: > > Hi all, > > I'm still dealing with the PyFlink deployment issue as described bel

Re: ProcessFunctionTestHarnesses for testing Python functions

2021-08-15 Thread Dian Fu
Hi Bogdan, There is still no TestHarness for Python ProcessFunction. However, it seems a good idea to provide a TestHarness for Python functions such as ProcessFunction. I have created https://issues.apache.org/jira/browse/FLINK-23787 as a follow-up. Regards, Dian > On Aug 13, 2021, at 7:11 PM, Matthi

Re: Using event time with Python DataStreamAPI

2021-08-03 Thread Dian Fu
f process( > self, > key: str, > content: ProcessWindowFunction.Context, > elements: Iterable[str], > ) -> Iterable[Tuple[str, int]]: > yield (key, sum(1 for elem in elements)) > > def clear(self, context: ProcessWindowFun

Re: Using event time with Python DataStreamAPI

2021-08-02 Thread Dian Fu
Regarding "Kafka consumer doesn’t read any message": I’m wondering about this. Usually the processing logic should not affect the Kafka consumer. Did you conclude this because there is no output from the job? If so, I’m guessing that it’s because the window wasn’t triggered in the event-time case. Could
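
The point about event-time windows not being triggered can be illustrated with a toy model. This is a simplified sketch, not Flink's implementation: `TumblingEventTimeWindows` here is a hypothetical class, and it assumes that a window `[start, start + size)` fires once the watermark reaches the window end.

```python
from collections import defaultdict


class TumblingEventTimeWindows:
    """Toy event-time tumbling windows (illustrative only, not Flink code)."""

    def __init__(self, size_ms):
        self.size_ms = size_ms
        self.buffers = defaultdict(list)  # window start -> buffered elements

    def add(self, timestamp_ms, value):
        # Assign the element to the window that contains its timestamp.
        start = timestamp_ms - (timestamp_ms % self.size_ms)
        self.buffers[start].append(value)

    def advance_watermark(self, watermark_ms):
        """Return (window_start, elements) for each window whose end <= watermark."""
        fired = []
        for start in sorted(self.buffers):
            if start + self.size_ms <= watermark_ms:
                fired.append((start, self.buffers.pop(start)))
        return fired


windows = TumblingEventTimeWindows(size_ms=5000)
windows.add(1000, "a")
windows.add(4000, "b")
print(windows.advance_watermark(0))     # [] : the watermark has not advanced
print(windows.advance_watermark(5000))  # [(0, ['a', 'b'])]
```

In this model the records are consumed and buffered, yet nothing is emitted until the watermark advances, which would look like "no output" from the outside.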
