Re: Failed to unit test PyFlink UDF

2021-03-22 Thread Yik San Chan
Hi Dian, However, users do want to unit test their UDFs, as supported in https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#testing-user-defined-functions Even though the examples are for Flink, I believe PyFlink should ideally be no different. What do you think? Bes

Re: Failed to unit test PyFlink UDF

2021-03-22 Thread Dian Fu
Hi Yik San, This field isn't expected to be exposed to users and so I'm not convinced that we should add such an interface/method in Flink. Regards, Dian On Tue, Mar 23, 2021 at 2:04 PM Yik San Chan wrote: > Hi Dian, > > The ._func method seems to be internal only. Maybe we can add some > publ

Re: OOM issues with Python Objects

2021-03-22 Thread Dian Fu
Hi Kevin, Is it possible to provide a simple example to reproduce this issue? PS: It will use pickle to perform the serialization/deserialization if you don't specify the type info. Regards, Dian On Mon, Mar 22, 2021 at 10:55 PM Arvid Heise wrote: > Hi Kevin, > > yes I understood that, but t

Re: Failed to unit test PyFlink UDF

2021-03-22 Thread Yik San Chan
Hi Dian, The ._func method seems to be internal only. Maybe we can add some public-facing method to make it more intuitive for use in unit tests? What do you think? Best, Yik San On Tue, Mar 23, 2021 at 2:02 PM Yik San Chan wrote: > Hi Dian, > > Thanks! It solves my problem. > > Best, > Yik San

Re: Failed to unit test PyFlink UDF

2021-03-22 Thread Yik San Chan
Hi Dian, Thanks! It solves my problem. Best, Yik San On Tue, Mar 23, 2021 at 1:29 PM Dian Fu wrote: > Hi Yik San, > > As the udf `add` is decorated with the `@udf` decorator, it is no longer a > simple Python function if you reference `add`. If you execute > `print(type(add(1, 1)))`, you will see t

Re: Checkpoint fail due to timeout

2021-03-22 Thread Alexey Trenikhun
I've changed KafkaFetcher (GKafkaFetcher) to enter/exit the synchronized block on each record. This inverted the behavior: now the Legacy Source thread waits for checkpointLock, while Source is requesting a memorySegment. "Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=199 WAITING on java.util.concurrent.Comp

Re: Failed to unit test PyFlink UDF

2021-03-22 Thread Dian Fu
Hi Yik San, As the udf `add` is decorated with the `@udf` decorator, it is no longer a simple Python function if you reference `add`. If you execute `print(type(add(1, 1)))`, you will see the output is something like "". You could try the following code: `assert add._func(1, 1) == 3` add._func returns
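Since `._func` is described later in this thread as internal, one alternative is to keep the logic in a plain function and wrap it for Flink separately. The following is only a sketch under that assumption: `add_impl` is a hypothetical name that does not appear in the thread, and the `try`/`except` is there so that the test still runs on a machine without PyFlink installed.

```python
def add_impl(i, j):
    """Plain Python logic; callable in a unit test without a Flink runtime."""
    return i + j


try:
    from pyflink.table import DataTypes
    from pyflink.table.udf import udf

    # Wrapped object for Table API / SQL jobs; it is no longer a plain function.
    add = udf(add_impl, result_type=DataTypes.BIGINT())
except ImportError:
    # PyFlink is not installed here; the plain function can still be tested.
    add = None


def test_add_impl():
    # Tests the logic directly, without touching the wrapper's internals.
    assert add_impl(1, 1) == 2
```

With this layout the job code registers `add`, while the unit test imports only `add_impl`, so it does not depend on any non-public attribute of the wrapper.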

Re: Flink on Minikube

2021-03-22 Thread Sandeep khanzode
Hi Arvid, I copy the JAR to the usrlib folder. This works in the Cloud EKS cluster. I wanted to set this up for my testing purposes. Below is the Dockerfile: FROM apache/flink:1.12.1-java11 RUN mv /opt/flink/opt/flink-queryable-state-runtime_2.12-1.12.1.jar /opt/flink/lib/flink-queryable-state-

QueryableStateClient getKVState

2021-03-22 Thread Sandeep khanzode
Hi, I have a stream that exposes the state for Queryable State. I am using the key as follows: public class MyKey { private Long first; private EnumType myType; private Long second; private TreeMap map; @Override public boolean equals(Object o) { if (this == o) r

Re: Editing job graph at runtime

2021-03-22 Thread Ejaskhan S
Thanks Arvid for the reply. Can you please elaborate a little bit on option 2, if possible? We are looking for a similar option. Currently we are proceeding with option 1. Thanks Jessy for the question On Mon, Mar 22, 2021, 7:27 PM Arvid Heise wrote: > Hi Jessy, > > Can I add a new sink int

Re: Editing job graph at runtime

2021-03-22 Thread Ejaskhan S
Thanks Arvid for the reply. Can you please elaborate a little bit on option 2, if possible? Thanks Jessy On Mon, Mar 22, 2021, 7:27 PM Arvid Heise wrote: > Hi Jessy, > > Can I add a new sink into the execution graph at runtime, for example: a >> new Kafka producer, without restarting the c

Failed to unit test PyFlink UDF

2021-03-22 Thread Yik San Chan
(This question is cross-posted on StackOverflow https://stackoverflow.com/questions/66756612/failed-to-unit-test-pyflink-udf ) I am using PyFlink and I want to unit test my UDF written in Python. To test the simple udf below: ```python # tasks/helloworld/udf.py from pyflink.table import DataType

Re: Parameter to config read frequency in Kafka SQL connector

2021-03-22 Thread eef hhj
Hi Dawid, Thank you for the reply. I could not find such a property in the Kafka connector's config, although it's easy to customize with the passed-in properties parameter. *-- Best wishes* *Kai* On Fri, Mar 19, 2021 at 5:21 PM Dawid Wysakowicz wrote: > Hi, > > Unfortunately I have no experience with thi

Re: Approaches to customize the parallelism in SQL generated operators

2021-03-22 Thread eef hhj
Hi David, Thank you for the response. We are facing a cold-start situation for our application. In the cold-start phase, it requires a lot of parallelism to keep the busiest operator from being overwhelmed, so that there is no backpressure and checkpointing works as normal. The problem is that such

Flink Streaming Counter

2021-03-22 Thread Vijayendra Yadav
Hi Team, Could you provide a sample of how to pass Flink DataStream source and sink results to increment a counter? I then want to display the counter in my local IDE, for #1 through #3. 1) DataStream messageStream = env.addSource(Kinesis Source); 2) DataStream outputStream = messag

Re: Flink application has slightly data loss using Processing Time

2021-03-22 Thread Rainie Li
I will try that. Thanks for your help, David. Best regards Rainie On Sat, Mar 20, 2021 at 9:46 AM David Anderson wrote: > You should increase the kafka transaction timeout -- > transaction.max.timeout.ms -- to something much larger than the default, > which I believe is 15 minutes. Suitable val

Re: Checkpoint fail due to timeout

2021-03-22 Thread Alexey Trenikhun
Would it help if the checkpoint lock were a fair lock? It looks strange: downstream produces output, so I assume at some moment buffers become available, but the lock can’t be acquired for 3+ hours. From: Roman Khachatryan Sent: Monday, March 22, 2021 1:36:35 AM To: ChangZhu

Re: Checkpoint fail due to timeout

2021-03-22 Thread Alexey Trenikhun
Great! However, I doubt that it will help in my case, since even unaligned checkpoints get “stuck”. Unlike with aligned checkpoints, after an unaligned checkpoint is triggered, Flink at some point becomes idle: Kubernetes metrics report very little CPU usage by the container, but unaligned ch

Evenly distribute task slots across task-manager

2021-03-22 Thread Vignesh Ramesh
Hello Everyone, Can someone help me with a solution? I have a Flink job (2 task managers) with a job parallelism of 64 and a task slot count of 64. I have set the parallelism of one of the operators to 16. The slots of this operator (parallelism 16) are not getting evenly distributed across the two task managers. It o

Re: OrcTableSource in flink 1.12

2021-03-22 Thread Nikola Hrusov
Hi Timo, I need to read ORC files and run a query on them as in the example above. Since the example given in docs is not recommended what should I use? I looked into the method you suggest - TableEnvironment#fromTableSource - it shows as Deprecated on the docs: https://ci.apache.org/projects/fli

Re: Capturing Statement Execution / Results within JdbcSink

2021-03-22 Thread Roman Khachatryan
Hey Rion, Regarding > Accessing Statement Execution / Results, There are no ways currently to get the update count from the database unfortunately. As for the > Batching Mechanisms (withBatchIntervalMs & withBatchSize), These parameters should have "OR" semantics: the database should be updated w
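To make the "OR" semantics concrete, here is a small illustrative buffer in plain Python. It is not the JdbcSink implementation: `BatchBuffer` and its parameters are hypothetical names chosen for this sketch, and it only checks the interval when a record arrives, whereas a real sink would also need a timer to flush while idle.

```python
import time


class BatchBuffer:
    """Illustrative buffer: flush when the size OR the interval threshold is hit."""

    def __init__(self, flush, batch_size, batch_interval_ms, clock=time.monotonic):
        self._flush = flush                      # callback that receives one batch
        self._batch_size = batch_size
        self._interval_s = batch_interval_ms / 1000.0
        self._clock = clock                      # injectable for testing
        self._records = []
        self._last_flush = clock()

    def add(self, record):
        self._records.append(record)
        size_hit = len(self._records) >= self._batch_size
        time_hit = self._clock() - self._last_flush >= self._interval_s
        if size_hit or time_hit:                 # "OR": either condition suffices
            self.flush()

    def flush(self):
        if self._records:
            self._flush(list(self._records))
            self._records.clear()
        self._last_flush = self._clock()


# Usage with a fake clock so the behavior is deterministic.
flushed = []
now = [0.0]
buf = BatchBuffer(flushed.append, batch_size=3, batch_interval_ms=1000,
                  clock=lambda: now[0])
for rec in ("a", "b", "c"):
    buf.add(rec)          # third record reaches batch_size and triggers a flush
now[0] = 2.0
buf.add("d")              # interval elapsed, so a single record is flushed
```

The injectable `clock` is a design choice for testability only; it lets the interval branch be exercised without sleeping.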

Re: OOM issues with Python Objects

2021-03-22 Thread Arvid Heise
Hi Kevin, yes I understood that, but then your Python class contains a Row field, where no mapping exists. I'm assuming PyFlink tries to do a deep conversion and fails to do so by ending in some infinite loop. On Mon, Mar 22, 2021 at 3:48 PM Kevin Lam wrote: > Thanks for the response Arvid! Poi

Re: Histogram

2021-03-22 Thread Arvid Heise
Hi Alexey, I'm assuming you talk about Prometheus Summaries? Then afaik there is no way to get the histogram, we currently do not expose raw values for consistency. However, you could write your own metric reporter (wrapping your existing reporter) and expose additionally the raw values. On Sat

Re: OrcTableSource in flink 1.12

2021-03-22 Thread Timo Walther
Hi Nikola, the OrcTableSource has not been updated to be used in a SQL DDL. You can define your own table factory [1] that translates properties into an object to create instances or use `org.apache.flink.table.api.TableEnvironment#fromTableSource`. I recommend the latter option. Please kee

Re: OOM issues with Python Objects

2021-03-22 Thread Kevin Lam
Thanks for the response Arvid! Point of clarification, *things do NOT OOM when I use the Row subclass*. Instead, the code that doesn't use the Row subclass is the code that OOMs (ie. the simple python class). On Mon, Mar 22, 2021 at 10:24 AM Arvid Heise wrote: > Hi Kevin, > > I suspect that th

Re: How to run a Flink jar on standalone from Java code?

2021-03-22 Thread Arvid Heise
This is the English user list. You can either repost your question to user...@flink.apache.org (new registration necessary) or translate your question here. Best, Arvid On Mon, Mar 22, 2021 at 3:00 AM 苏喜 张 <15138217...@163.com> wrote: > How can I send a request from Java code in my project > > to submit a jar to run on a single-node Flink? > > >

Re: DataDog and Flink

2021-03-22 Thread Arvid Heise
Hi Vishal, I have no experience in the Flink+DataDog setup but worked a bit with DataDog before. I'd agree that the timeout does not seem like a rate limit. It would also be odd that the other TMs with a similar rate still pass. So I'd suspect network issues. Can you log into the TM's machine and try

Re: Flink on Minikube

2021-03-22 Thread Arvid Heise
Hi Sandeep, The first error definitely indicates a classloading issue, which may also be the cause of the second error. Can you describe where you put your jar inside the docker image and which execution mode you are using? As a general rule, the jar is not supposed to go into flink/lib. Also

Re: OOM issues with Python Objects

2021-03-22 Thread Arvid Heise
Hi Kevin, I suspect that this is because Row is not supported as a Python field [1]; it's supposed to be a dict that is mapped to a Row by Flink. Maybe it runs in some infinite loop while trying to serialize and hence the OOM. Subclassing Row might be an undocumented feature. I'm also pulling in Di

Re: Saved State in FSstate Backend

2021-03-22 Thread Arvid Heise
All checkpoints are managed by the Checkpoint Coordinator and deleted once the job stops. You can use retained checkpoints [1] to keep them after job termination. You can also use stop-with-savepoint to create a savepoint [2]. [1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/ch

Re: Production Readiness of File Source

2021-03-22 Thread Arvid Heise
Hi Chirag, we are also happy to take contributions to add features like file deletion. Since the file source is already running well in our deployments, I'd also recommend taking it as a base for your custom implementation (getting a source right is not trivial). On Thu, Mar 18, 2021 at

Re: How does Flink SQL read Avro union?

2021-03-22 Thread Arvid Heise
Hi Vincent, I'm not well versed in Flink SQL, so I'm pulling in Jark. I have stopped using union records the way you do and instead only use nullable fields (technically also a union field but much easier to handle in all languages). So if you have a way to change the schema, maybe try it out: record

Re: Checkpoint fail due to timeout

2021-03-22 Thread Arvid Heise
Hi Alexey, rescaling from unaligned checkpoints will be supported with the upcoming 1.13 release (expected at the end of April). Best, Arvid On Wed, Mar 17, 2021 at 8:29 AM ChangZhuo Chen (陳昌倬) wrote: > On Wed, Mar 17, 2021 at 05:45:38AM +, Alexey Trenikhun wrote: > > In my opinion looks

Re: Editing job graph at runtime

2021-03-22 Thread Arvid Heise
Hi Jessy, Can I add a new sink into the execution graph at runtime, for example : a > new Kafka producer , without restarting the current application or using > option1 ? > No, there is no way to add a sink without restart currently. Could you elaborate why a restart is not an option for you? Y

Re: parquet protobuf output and aws athena support

2021-03-22 Thread Arvid Heise
Hi Jin, I have no experience with your combination. Did you check if you can read the file in a standalone java format? That may help to provide you some meaningful logs. On Mon, Mar 15, 2021 at 8:51 PM Jin Yi wrote: > using ParquetProtoWriters >

Re: Read the metadata files (got from savepoints)

2021-03-22 Thread Matthias Pohl
Hi Abdullah, you might also want to have a look at the State Processor API [1]. Best, Matthias [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html On Mon, Mar 22, 2021 at 6:28 AM Congxian Qiu wrote: > Hi >Maybe you can reach to this test[1] for refe

Re: Recommended way to split datastream in PyFlink

2021-03-22 Thread Dian Fu
Hi Sumeet, Yes, you are right. It supports mixed use of the PyFlink DataStream API and the PyFlink Table API. For now, it's recommended to use the PyFlink Table API for window operations until they are supported in the PyFlink DataStream API. Regards, Dian On Mon, Mar 22, 2021 at 4:34 PM Sumeet Malhotra wrote: > Ap

Re: Netty LocalTransportException: Sending the partition request to 'null' failed

2021-03-22 Thread Matthias Seiler
Thanks a bunch! I replaced 127.0.1.1 with the actual IP address and it works now :) On 3/15/21 3:22 PM, Robert Metzger wrote: > Hey Matthias, > > are you sure you can connect to 127.0.1.1, since everything between > 127.0.0.1 and 127.255.255.255 is bound to the loopback device?: > https://serverf
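The loopback point in the quoted reply can be checked with Python's standard `ipaddress` module. This is just a quick sanity check, not part of the original thread; it confirms that 127.0.1.1 falls inside the 127.0.0.0/8 loopback block, which is why other machines cannot reach a service advertised under that address.

```python
import ipaddress

loopback_net = ipaddress.ip_network("127.0.0.0/8")
addr = ipaddress.ip_address("127.0.1.1")

# Both checks express the same fact: the address belongs to the loopback range.
print(addr in loopback_net)
print(addr.is_loopback)
```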

Re: Re: Re: [DISCUSSION] Introduce a separated memory pool for the TM merge shuffle

2021-03-22 Thread Stephan Ewen
Hi Yingjie! Thanks for doing those experiments, the results look good. Let's go ahead with 32M then. Regarding the key, I am not strongly opinionated there. There are arguments for both keys, (1) making the key part of the network pool config as you did here or (2) making it part of the TM config

Re: Re: Re: [DISCUSSION] Introduce a separated memory pool for the TM merge shuffle

2021-03-22 Thread Till Rohrmann
Thanks for the update Yingjie. Then let's go with 32 MB I would say. Concerning the name of the configuration option I see Xintong's point. If the batch shuffle is subtracted from `taskmanager.memory.framework.off-heap.size` because it is part of the off-heap pool, then something like `taskmanager

Re: Checkpoint fail due to timeout

2021-03-22 Thread Roman Khachatryan
Thanks for sharing the thread dump. It shows that the source thread is indeed back-pressured (checkpoint lock is held by a thread which is trying to emit but unable to acquire any free buffers). The lock is per task, so there can be several locks per TM. @ChangZhuo Chen (陳昌倬) , in the thread you

Re: Recommended way to split datastream in PyFlink

2021-03-22 Thread Sumeet Malhotra
Apologies. I meant `StreamTableEnvironment.to_append_stream` in my last message. On Mon, Mar 22, 2021 at 2:03 PM Sumeet Malhotra wrote: > Thanks Dian. > > Another question I have is, since PyFlink Datastream API still doesn't > have native Window support, what's the recommended way to introduce

Re: Recommended way to split datastream in PyFlink

2021-03-22 Thread Sumeet Malhotra
Thanks Dian. Another question I have is, since PyFlink Datastream API still doesn't have native Window support, what's the recommended way to introduce windows? Use PyFlink Table API for windows in conjunction with the Datastream APIs? For example, read input records from Kafka into a table and th

Re: Python API + Unit Testing

2021-03-22 Thread Dian Fu
Hi Kevin, >> One approach I could see is using StreamingFileSinks, and validating the output files for a bounded stream. This is a good approach from my point of view. Actually, the end-to-end tests in Flink also take this kind of approach. This applies not only to Python jobs, but also for

Re: Recommended way to split datastream in PyFlink

2021-03-22 Thread Dian Fu
Hi Sumeet, PyFlink still doesn't support side outputs. >> Or do I have to replicate the input datastream and then apply record specific filters? I'm afraid so. Regards, Dian On Sun, Mar 21, 2021 at 5:20 PM Sumeet Malhotra wrote: > Hi, > > I have a use case where I need to process
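The "replicate and filter" workaround could be sketched roughly as below. This assumes only that the stream object has a `.filter(predicate)` method, as PyFlink's `DataStream` does; `split_by_kind`, `kind_of`, the record layout, and the list-backed `ListStream` stand-in are all hypothetical and exist only so the sketch can be tried without a Flink runtime.

```python
def split_by_kind(stream, kinds, kind_of):
    """Apply one filter per kind to the same input stream.

    Returns a dict mapping each kind to its filtered stream.
    """
    # k=k binds the current kind, so each lambda keeps its own value.
    return {k: stream.filter(lambda rec, k=k: kind_of(rec) == k) for k in kinds}


class ListStream:
    """List-backed stand-in for a DataStream, only for trying this locally."""

    def __init__(self, items):
        self.items = list(items)

    def filter(self, predicate):
        return ListStream(x for x in self.items if predicate(x))


# Hypothetical records: (kind, payload) tuples.
records = [("order", 1), ("click", 2), ("order", 3)]
branches = split_by_kind(ListStream(records), ["order", "click"], lambda r: r[0])
print(branches["order"].items)
print(branches["click"].items)
```

Note that every record passes through every filter, so the cost grows with the number of branches; that is the trade-off of this workaround compared with side outputs.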