Re: PyFlink :: Bootstrap UDF function

2020-10-14 Thread Sharipov, Rinat
Hi Dian! Thx a lot for your reply, it's very helpful for us. Thu, 15 Oct 2020 at 04:30, Dian Fu : > Hi Rinat, > > It's called in a single-threaded fashion and so there is no need for the > synchronization. > > Besides, there is a pair of open/close methods in the ScalarFunction and > you could

Re: Processing single events for minimum latency

2020-10-14 Thread Piotr Nowojski
No problem :) Piotrek Thu, 15 Oct 2020 at 08:18 Pankaj Chand wrote: > Thank you for the quick and informative reply, Piotrek! > > On Thu, Oct 15, 2020 at 2:09 AM Piotr Nowojski > wrote: > >> Hi Pankaj, >> >> Yes, you can trigger a window for each element, take a look at the Window >> Trig

Re: Broadcasting control messages to a sink

2020-10-14 Thread Piotr Nowojski
Hi Julian, I think the problem is that BroadcastProcessFunction and SinkFunction will be executed by separate operators, so they won't be able to share state. If you cannot split your logic into two, I think you will have to work around this problem differently. 1. Rely on operator chaining and

Re: Processing single events for minimum latency

2020-10-14 Thread Pankaj Chand
Thank you for the quick and informative reply, Piotrek! On Thu, Oct 15, 2020 at 2:09 AM Piotr Nowojski wrote: > Hi Pankaj, > > Yes, you can trigger a window for each element, take a look at the Window > Triggers [1]. > > Flink is always processing all records immediately. The only things that >

Re: Processing single events for minimum latency

2020-10-14 Thread Piotr Nowojski
Hi Pankaj, Yes, you can trigger a window for each element, take a look at the Window Triggers [1]. Flink is always processing all records immediately. The only things that can delay processing elements are: - buffering elements on the operator's state (vide WindowOperator) - buffer-timeout (but t
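
For illustration, firing a keyed window for every arriving element boils down to attaching a count trigger of one. A minimal Java sketch, assuming a hypothetical `words` stream of `(word, count)` tuples:

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;

// Fire the window for every element instead of waiting for the window to close.
DataStream<Tuple2<String, Integer>> counts = words
    .keyBy(value -> value.f0)
    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
    .trigger(CountTrigger.of(1))  // emit an updated sum per arriving element
    .sum(1);
```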

Re: Large state RocksDb backend increases app start time

2020-10-14 Thread Yun Tang
Hi Arpith If you use a savepoint to restore RocksDB state, the actual phase is to insert the original binary key-value pairs into an empty RocksDB instance, which can be slow if the state is large. There have been several discussions about optimizing this phase [1] [2]. If you want to work around this issu

Re: Setting JDBC connector options using JdbcCatalog

2020-10-14 Thread Leonard Xu
Hi, Dylan The table in JdbcCatalog only contains basic options, so it's expected that a table from the JdbcCatalog does not carry some options. Flink provides the SQL hints feature to specify or override table options [1], you can have a try. Best, Leonard [1] https://ci.apache.org/projects/flink/flink-docs-
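
For reference, a dynamic table options hint looks roughly like this (a sketch; the catalog table `mydb.users` and the option value are made up, and note that in Flink 1.11 the feature must be enabled first):

```java
// Dynamic table options are disabled by default in Flink 1.11.
tableEnv.getConfig().getConfiguration()
    .setBoolean("table.dynamic-table-options.enabled", true);

// Override a connector option of a catalog table at query time.
tableEnv.executeSql(
    "SELECT id, name FROM mydb.users /*+ OPTIONS('scan.fetch-size'='100') */");
```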

Setting JDBC connector options using JdbcCatalog

2020-10-14 Thread Dylan Forciea
I was experimenting with the JdbcCatalog, and I see that the options match some of the SQL WITH options. I looked at the source code, and even see that it directly references those options from JdbcDynamicTableFactory. However, I didn’t see any obvious way to set scan.fetch-size or any way to ge

Re: PyFlink :: Bootstrap UDF function

2020-10-14 Thread Dian Fu
Hi Rinat, It's called in a single-threaded fashion and so there is no need for the synchronization. Besides, there is a pair of open/close methods in the ScalarFunction and you could also override them and perform the initialization work in the open method. Regards, Dian > On Oct 15, 2020, at 3:19 AM, Sh
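
The same open/close lifecycle exists on the Java ScalarFunction; a minimal sketch, where `MyModel` and its load/predict methods are hypothetical placeholders for the one-time bootstrap work:

```java
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

public class PredictUdf extends ScalarFunction {
    // Initialized once per parallel instance, not per invocation.
    private transient MyModel model;

    @Override
    public void open(FunctionContext context) throws Exception {
        model = MyModel.load("/path/to/model");  // one-time initialization
    }

    public String eval(String features) {
        return model.predict(features);  // eval is invoked single-threaded
    }

    @Override
    public void close() throws Exception {
        model = null;  // release resources
    }
}
```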

Re: Processing single events for minimum latency

2020-10-14 Thread Pankaj Chand
Hi Piotrek, Thank you for replying! I want to process each record as soon as it is ingested (or reaches an operator) without waiting for a window for records to arrive. However, by not using windows, I am not sure if each record gets emitted immediately upon processing. > You still can use window

Re: Safer handling of Scala immutable collections

2020-10-14 Thread Rex Fenley
(Or small correction; a Row with a column of Array of Longs, but still) On Wed, Oct 14, 2020 at 4:46 PM Rex Fenley wrote: > I believe I found the issue: > new RowTypeInfo(Types.PRIMITIVE_ARRAY(Types.LONG())) > vs > new RowTypeInfo(createTypeInformation[Array[Long]]) > I didn't quite understand a

Re: Safer handling of Scala immutable collections

2020-10-14 Thread Rex Fenley
I believe I found the issue: new RowTypeInfo(Types.PRIMITIVE_ARRAY(Types.LONG())) vs new RowTypeInfo(createTypeInformation[Array[Long]]) I didn't quite understand at the time whose type information I was meant to supply; now I do. However, I think my question still stands. Is there a way for this
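
For reference, the working variant maps to the Java type utilities like this (a sketch; the single-column row layout is hypothetical). `[J` in the ClassCastException is JVM notation for `long[]`, which is exactly what `PRIMITIVE_ARRAY` describes:

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;

// Type information for a Row with one column holding a primitive long[] ("[J").
// A Scala immutable collection cannot be cast to this, hence the original error.
RowTypeInfo rowType = new RowTypeInfo(Types.PRIMITIVE_ARRAY(Types.LONG));
```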

Re: Safer handling of Scala immutable collections

2020-10-14 Thread Jeff Zhang
Could you share your code to reproduce it? Rex Fenley wrote on Thu, Oct 15, 2020 at 5:54 AM: > Hello, > > I've been playing with UDFs using the Scala API and have repeatedly run > into issues such as this: > ``` > flink-taskmanager_1| java.lang.ClassCastException: > scala.collection.immutable.Set$EmptySe

Re: Safer handling of Scala immutable collections

2020-10-14 Thread Rex Fenley
While trying to use a mutable.Set and later .asJava I receive the following flink-jobmanager_1 | Caused by: java.lang.ClassCastException: scala.collection.convert.Wrappers$MutableSetWrapper cannot be cast to [J flink-jobmanager_1 | at org.apache.flink.table.data.util.DataFormatConverters$Pr

Safer handling of Scala immutable collections

2020-10-14 Thread Rex Fenley
Hello, I've been playing with UDFs using the Scala API and have repeatedly run into issues such as this: ``` flink-taskmanager_1| java.lang.ClassCastException: scala.collection.immutable.Set$EmptySet$ cannot be cast to [J ``` Is there something that can be done on Flink's end, either to catch

PyFlink :: Bootstrap UDF function

2020-10-14 Thread Sharipov, Rinat
Hi mates! I keep moving in my research of the new features of PyFlink and I'm really excited about that functionality. My main goal is to understand how to integrate our ML registry, powered by MLflow, with PyFlink jobs and what restrictions we have. I need to bootstrap the UDF function on its start

Re: [QUERY] Multiple elastic search sinks for a single flink pipeline

2020-10-14 Thread Chesnay Schepler
Is the number of sinks fixed? If so, then you can just take the output of your map function and apply multiple filters, writing the output of each filter into a sink. You could also use a process function with side-outputs, and apply a sink to each output. On 10/14/2020 6:05 PM, Vignesh Ram
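
A sketch of the side-output variant (the routing predicate, the `input` stream, and the two ES sink instances are hypothetical; each sink can then get its own parallelism):

```java
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// Tag for records that should go to the second sink.
final OutputTag<String> sink2Tag = new OutputTag<String>("sink2") {};

SingleOutputStreamOperator<String> mainStream = input
    .process(new ProcessFunction<String, String>() {
        @Override
        public void processElement(String value, Context ctx, Collector<String> out) {
            if (value.contains("type-a")) {   // hypothetical routing rule
                out.collect(value);           // main output -> ES sink 1
            } else {
                ctx.output(sink2Tag, value);  // side output -> ES sink 2
            }
        }
    });

mainStream.addSink(esSink1).setParallelism(2);
mainStream.getSideOutput(sink2Tag).addSink(esSink2).setParallelism(4);
```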

Re: Broadcasting control messages to a sink

2020-10-14 Thread Jaffe, Julian
Thanks for the suggestion Piotr! The problem is that the sink needs to have access to the schema (so that it can write the schema only once per file instead of once per record) and thus needs to know when the schema has been updated. In this proposed architecture, I think the sink would still need to ch

Runtime Dependency Issues Upgrading to Flink 1.11.2 from 1.9.2

2020-10-14 Thread Hailu, Andreas
Hi team! We're trying to upgrade our applications from 1.9.2 to 1.11.2. After re-compiling and updating our runtime dependencies to use 1.11.2, we see this LinkageError: Caused by: java.lang.LinkageError: ClassCastException: attempting to cast jar:file:/local/data/scratch/hailua_p2epdlsuat/flink

StatsD metric name prefix change for task manager after upgrading to Flink 1.11

2020-10-14 Thread Allen Wang
Hello, We noticed that after upgrading to Flink 1.11, the StatsD metric prefix is changed from the hostname to the IP address of the task manager. The Flink job runs in a k8s cluster. Here is an example of a metric reported to StatsD in Flink 1.10: flink-ingest-cx-home-page-feed-flink-task-manager-7f

Large state RocksDb backend increases app start time

2020-10-14 Thread Arpith P
Hi, I'm currently storing around 70GB of data in map state backed by the RocksDB backend. Once I restore the application from a savepoint, it currently takes more than 4 minutes to start processing events. How can I speed this up, or is there any other recommended approach? I'm using the followin

Re: Upgrade to Flink 1.11 in EMR 5.31 Command line interface

2020-10-14 Thread Piotr Nowojski
I'm glad to hear that :) Best regards, Piotrek Wed, 14 Oct 2020 at 18:28 Vijayendra Yadav wrote: > Thank you Piotrek. I moved the *flink-s3-fs-hadoop* library to the plugins > directory. Now it's good. > > > On Wed, Oct 14, 2020 at 6:23 AM Piotr Nowojski > wrote: > >> Hi, >> >> Are you sure you are loading

Re: Upgrade to Flink 1.11 in EMR 5.31 Command line interface

2020-10-14 Thread Vijayendra Yadav
Thank you Piotrek. I moved the *flink-s3-fs-hadoop* library to the plugins directory. Now it's good. On Wed, Oct 14, 2020 at 6:23 AM Piotr Nowojski wrote: > Hi, > > Are you sure you are loading the filesystems correctly? Are you using the > plugin mechanism? [1] Since Flink 1.10 plugins can only be loaded in thi

Re: what are the datasets used in the Flink SQL document?

2020-10-14 Thread 大森林
Many thanks for your replies. I mean: where is the "FRANCE revenue" example in the following document? https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html Thanks for your help~ -- Original Message -- From:

[QUERY] Multiple elastic search sinks for a single flink pipeline

2020-10-14 Thread Vignesh Ramesh
My requirement is to send the data to a different ES sink (based on the data). Ex: If the data contains particular info, send it to sink1, else send it to sink2, etc. (basically, send it dynamically to any one sink based on the data). I also want to set parallelism separately for ES sink1, ES sink2, ES

Re: Dynamic file name prefix - StreamingFileSink

2020-10-14 Thread Piotr Nowojski
Great! Please let us know if it solves the issue or not. Best, Piotrek Wed, 14 Oct 2020 at 17:46 Vijayendra Yadav wrote: > Hi Piotrek, > > That is correct, I was still on 1.10; I am upgrading to 1.11. > > Regards, > Vijay > > On Wed, Oct 14, 2020 at 6:12 AM Piotr Nowojski > wrote: > >> Hi Y

Re: Dynamic file name prefix - StreamingFileSink

2020-10-14 Thread Vijayendra Yadav
Hi Piotrek, That is correct, I was still on 1.10; I am upgrading to 1.11. Regards, Vijay On Wed, Oct 14, 2020 at 6:12 AM Piotr Nowojski wrote: > Hi Yadav, > > What Flink version are you using? `getPartPrefix` and `getPartSuffix` > methods were not public before 1.10.1/1.11.0, which might be caus

Re: what are the datasets used in the Flink SQL document?

2020-10-14 Thread Piotr Nowojski
Hi, Can you link the document you have in mind? The documentation [1]? I don't think so. There are working examples, located in the binary distribution under the `examples/table/` directory. Their code is available in the repository [2]. Best regards, Piotrek [1] https://ci.apache.org/proje

Re: what are the datasets used in the Flink SQL document?

2020-10-14 Thread 大森林
Sorry that I did not make it clear. I mean: is there a dataset that can be downloaded to satisfy all the examples in the document? Thanks for your help -- Original Message -- From:

Re: what are the datasets used in the Flink SQL document?

2020-10-14 Thread Piotr Nowojski
Hi, It depends on how you defined `orders` in your example. For example here [1] > Table orders = tEnv.from("Orders"); // schema (a, b, c, rowtime) `orders` is obtained from the environment, from a table registered under the name "Orders". You would need to first register such a table, or register a
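
A sketch of registering such a table so that `tEnv.from("Orders")` resolves; the schema is invented to match the query in question, and the built-in datagen connector stands in for a real source:

```java
import org.apache.flink.table.api.Table;

tEnv.executeSql(
    "CREATE TABLE Orders (" +
    "  cID BIGINT," +
    "  cName STRING," +
    "  revenue DOUBLE," +
    "  cCountry STRING" +
    ") WITH (" +
    "  'connector' = 'datagen'" +   // generates random rows for testing
    ")");

Table orders = tEnv.from("Orders");
```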

Re: Processing single events for minimum latency

2020-10-14 Thread Piotr Nowojski
Hi Pankaj, I'm not entirely sure if I understand your question. If you want to minimize latency, you should avoid using windows or any other operators that buffer data for long periods of time. You still can use windowing, but you might want to emit an updated value of the window per every p
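
Related to operators that buffer data: the network buffer timeout is the other latency knob mentioned elsewhere in this thread. A minimal sketch; a timeout of zero flushes after every record, at some throughput cost:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Flush network buffers after every record instead of waiting for them to fill.
env.setBufferTimeout(0);
```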

Re: Broadcasting control messages to a sink

2020-10-14 Thread Piotr Nowojski
Hi Julian, Have you seen Broadcast State [1]? I have never used it personally, but it sounds like something you want. Maybe your job should look like: 1. read raw messages from Kafka, without using the schema 2. read schema changes and broadcast them to 3. and 5. 3. deserialize Kafka records in B
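
A skeleton of this pattern (the state descriptor name, the stream types, and the `deserialize` helper are placeholders):

```java
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

// Broadcast state holding the latest schema under a well-known key.
MapStateDescriptor<String, String> schemaDesc = new MapStateDescriptor<>(
    "schema", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

BroadcastStream<String> schemaUpdates = schemaStream.broadcast(schemaDesc);

DataStream<String> parsed = rawRecords
    .connect(schemaUpdates)
    .process(new BroadcastProcessFunction<byte[], String, String>() {
        @Override
        public void processElement(byte[] record, ReadOnlyContext ctx,
                                   Collector<String> out) throws Exception {
            String schema = ctx.getBroadcastState(schemaDesc).get("current");
            out.collect(deserialize(record, schema));  // hypothetical helper
        }

        @Override
        public void processBroadcastElement(String newSchema, Context ctx,
                                            Collector<String> out) throws Exception {
            ctx.getBroadcastState(schemaDesc).put("current", newSchema);
        }
    });
```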

Re: Upgrade to Flink 1.11 in EMR 5.31 Command line interface

2020-10-14 Thread Piotr Nowojski
Hi, Are you sure you are loading the filesystems correctly? Are you using the plugin mechanism? [1] Since Flink 1.10 plugins can only be loaded in this way [2], while there were some changes to plug some holes in Flink 1.11 [3]. Best, Piotrek [1] https://ci.apache.org/projects/flink/flink-docs-s

what are the datasets used in the Flink SQL document?

2020-10-14 Thread 大森林
Could anyone tell me what are the datasets used in the Flink SQL document? For SQL like: val revenue = orders .filter($"cCountry" === "FRANCE") .groupBy($"cID", $"cName") .select($"cID", $"cName", $"revenue".sum as "revSum") Thanks for your help

Re: Dynamic file name prefix - StreamingFileSink

2020-10-14 Thread Piotr Nowojski
Hi Yadav, What Flink version are you using? `getPartPrefix` and `getPartSuffix` methods were not public before 1.10.1/1.11.0, which might be causing this problem for you. Other than that, if you are already using Flink 1.10.1 (or newer), maybe please double-check which class you are extending? The e
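
For reference, since 1.10.1/1.11 the part-file prefix and suffix can also be set without subclassing, through OutputFileConfig (a sketch; the path and the prefix/suffix values are arbitrary):

```java
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

// Part files are named <prefix>-<subtask>-<counter><suffix>.
OutputFileConfig fileConfig = OutputFileConfig.builder()
    .withPartPrefix("events")
    .withPartSuffix(".json")
    .build();

StreamingFileSink<String> sink = StreamingFileSink
    .forRowFormat(new Path("s3://bucket/output"), new SimpleStringEncoder<String>())
    .withOutputFileConfig(fileConfig)
    .build();
```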

Processing single events for minimum latency

2020-10-14 Thread Pankaj Chand
Hi all, What is the recommended way to make a Flink job that processes each event individually as soon as it arrives, without waiting for a window, in order to minimize latency in the entire DAG of operators? For example, here is some sample WordCount code (without windows), followed by some know

Re: NPE when checkpointing

2020-10-14 Thread Piotr Nowojski
No worries, thanks for the update! It's good to hear that it worked for you. Best regards, Piotrek Tue, 13 Oct 2020 at 22:43 Binh Nguyen Van wrote: > Hi, > > Sorry for the late reply. It took me quite a while to change the JDK > version to reproduce the issue. I confirmed that if I upgrade

Broadcasting control messages to a sink

2020-10-14 Thread Jaffe, Julian
Hey all, I’m building a Flink app that pulls in messages from a Kafka topic and writes them out to disk using a custom bucketed sink. Each message needs to be parsed using a schema that is also needed when writing in the sink. This schema is read from a remote file on a distributed file system

Re: Required context properties mismatch in connecting the flink with mysql database

2020-10-14 Thread Dawid Wysakowicz
Hi, I think the problem is that you are using BatchTableEnvironment, which is deprecated and does not support newer features such as the FLIP-95 sources/sinks. I am sorry it is not more prominent in the documentation. I am not too familiar with the Python API, and I am not sure if a unified Table
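
A sketch of creating the unified TableEnvironment that does support the newer sources/sinks (Java API shown; the Python API offers an analogous create call):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Unified TableEnvironment with the Blink planner in batch mode --
// the entry point that supports FLIP-95 style sources and sinks.
EnvironmentSettings settings = EnvironmentSettings.newInstance()
    .useBlinkPlanner()
    .inBatchMode()
    .build();
TableEnvironment tEnv = TableEnvironment.create(settings);
```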