Re: Checkpoint fail due to timeout

2021-03-11 Thread Roman Khachatryan
Hello, This can be caused by several reasons such as back-pressure, large snapshots or bugs. Could you please share: - the stats of the previous (successful) checkpoints - back-pressure metrics for sources - which Flink version do you use? Regards, Roman On Thu, Mar 11, 2021 at 7:03 AM Alexey

Re: Question about Reactive mode support

2021-03-11 Thread Robert Metzger
Hey Sonam, I'm wondering whether it may be helpful to have a min and max parallelism, > and the actual parallelism be determined by the scaling policy mentioned > next? Yes, that's certainly possible. Thanks a lot for your input on the design of a scaling policy. Your input is very valuable fo

Re: clear() in a ProcessWindowFunction

2021-03-11 Thread Vishal Santoshi
Essentially, Does this code leak state private static class SessionIdProcessWindowFunction extends ProcessWindowFunction, KeyedSessionWithSessionID< KEY, VALUE>, KEY, TimeWindow> { private static final long serialVersionUID = 1L; private final static ValueStateDescriptor sessionId = new ValueState

clear() in a ProcessWindowFunction

2021-03-11 Thread Vishal Santoshi
Hello folks, The suggestion is to use windowState() for per-key, per-window state and to clear the state explicitly. Also it seems that getRuntimeContext().getState() will return a globalWindow() where state is shared among windows with the same key. I desire of course to have state scope

Guidelines for setting task slots when running multiple jobs in a Flink cluster

2021-03-11 Thread Sush Bankapura
Hi, We have multiple jobs that need to be deployed to a Flink cluster. Parallelism varies across jobs and depends on the type of work being done, as do the memory requirements. All jobs currently use the same state backend. Since the workloads handled by each job are different, the scaling p

Re: Flink Read S3 Intellij IDEA Error

2021-03-11 Thread sri hari kali charan Tummala
Let's close this issue, guys; please answer my questions. I am using Flink 1.8.1. Thanks Sri On Wed, 10 Mar 2021 at 13:25, sri hari kali charan Tummala < kali.tumm...@gmail.com> wrote: > Also I don't see ConfigConstants.ENV_FLINK_PLUGINS_DIR I only see > ConfigConstants.ENV_FLINK_LIB_DIR will this

Re: Flink and Nomad ( from Hashicorp)

2021-03-11 Thread Vishal Santoshi
Makes total sense, Thanks, I'll check it out. On Wed, Mar 10, 2021 at 1:28 PM Till Rohrmann wrote: > Hi Vishal, > > There is no specific reason why Flink does not have a Nomad HA > implementation other than it has not been done yet. As long as Nomad > supports leader election, service recovery a

Re: How to do code isolation if multiple jobs run on the same taskmanager process?

2021-03-11 Thread Lei Wang
Thanks Arvid, If too many jobs run in the same task manager JVM, will they use too much metaspace memory? Thanks, Lei On Thu, Mar 11, 2021 at 11:54 PM Arvid Heise wrote: > Hi Lei, > > each application has its own classloader as such each static constant > exists multiple times (1

Re: How to debug checkpoint/savepoint stuck in Flink 1.12.2

2021-03-11 Thread 陳昌倬
On Thu, Mar 11, 2021 at 02:14:32PM +0100, Arvid Heise wrote: > Hi ChangZhuo, > > Did you upgrade to Flink 1.12.2 and change the settings at the time? If so, > could you maybe reset the settings to the old values on Flink 1.12.2 and > check if the job still gets stuck? Especially, turning off unali

Re: Evenly Spreading Out Source Tasks

2021-03-11 Thread Aeden Jameson
Hi Arvid, Thanks for responding. I did check the configuration tab of the job manager and the setting cluster.evenly-spread-out-slots: true is there. However I'm still observing unevenness in the distribution of source tasks. Perhaps this additional information could shed light. Version: 1.12.1

Re: User metrics outside tasks

2021-03-11 Thread Bob Tiernay
I too think it would be useful for the job manager to be able to send metrics easily. Sometimes additional compute responsibilities are placed in the job manager, and having a convenient way to add telemetry data into a metrics stream would be very useful.

Re: Application cluster - Best Practice

2021-03-11 Thread Tamir Sagi
Hey Till, You are right. I'm new to Flink, and I was looking for a Java way to deploy an application cluster. I first tried the standalone approach and changed to native (although the official documents specify that application mode is more suitable for production, they show only the CLI way). I

Re: Question about Reactive mode support

2021-03-11 Thread Sonam Mandal
Hi Robert, Thanks for getting back to me. We are currently assessing Flink Standalone on Kubernetes and Native Flink on Kubernetes and haven't yet decided on which model we intend to use. We want to ensure that whichever model we choose, we'll be able to get the benefits of the new features add

Questions with State Processor Api

2021-03-11 Thread Maminspapin
Hi, folks. Using the State Processor API, can I: 1. get the full state of a Flink application with the RocksDB backend in cluster mode (as I understand it, from checkpoints or savepoints)? 2. update it? 3. get this state from another Flink application (other jar)? 4. query it with SQL (Table API & SQL) to get the data I need?

Questions with State Processor Api

2021-03-11 Thread Maminspapin
Hi, folks. Using the State Processor API, can I: 1. get the full state of a Flink application with the RocksDB backend in cluster mode (as I understand it, from checkpoints or savepoints)? 2. update it? 3. get this state from another Flink application (other jar)? 4. query it with SQL (Table API & SQL) to get the data I need?

Re: Filtering lines in parquet

2021-03-11 Thread Avi Levi
Hi Arvid, assuming that I have A0,B0,C0 parquet files with different schemas and a common field *ID*, I want to write them to A1,B2,C3 files respectively. My problem is that I do not want my code to know the full schema; I only want to filter on the ID field and write the unfiltered lines to the

Re: Application cluster - Best Practice

2021-03-11 Thread Till Rohrmann
What the Flink client does for you when starting an application mode cluster in native K8s mode is to generate the K8s job specification and to submit it to the K8s cluster. Hence, you can also do it yourself if you prefer to manage the K8s resources directly. Writing your own client should also wo

Re: Kubernetes HA - attempting to restore from wrong (non-existing) savepoint

2021-03-11 Thread Alexey Trenikhun
Hi Yang, The upgrade procedure uses POST /jobs/:jobid/savepoints with cancel-job=true, but perhaps I hit FLINK-21028. This leads to a question: if the normal take-savepoint-and-cancel-job call via the API fails, what steps should be taken outside Flink to be able to resume from a savepoint with the new job version? Is del

[Schema Evolution] Cannot restore from savepoint after deleting field from POJO

2021-03-11 Thread Alexis Sarda-Espinosa
Hi everyone, It seems I'm having either the same problem, or a problem similar to the one mentioned here: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Problem-when-restoring-from-savepoint-with-missing-state-amp-POJO-modification-td39945.html I have a POJO class that is u

Re: How to do code isolation if multiple jobs run on the same taskmanager process?

2021-03-11 Thread Arvid Heise
Hi Lei, each application has its own classloader; as such, each static constant exists multiple times (one per job). So there should be no interference. You could verify it by logging the value of the constant and seeing it for yourself. Best, Arvid On Thu, Mar 11, 2021 at 7:11 AM Lei Wang wrote: > Cons
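
The classloader point above can be illustrated with a small plain-Java sketch. This is not Flink's user-code classloader, only a stand-in for the idea: Holder and IsolatingLoader are hypothetical names, and the sketch assumes the compiled class file of Holder is available as a classpath resource.

```java
import java.io.IOException;
import java.io.InputStream;

public class ClassLoaderIsolationDemo {

    /** Hypothetical stand-in for a user class with a static field. */
    public static class Holder {
        public static int value = 0;
    }

    /** Defines Holder itself (child-first); every other class is delegated to the parent. */
    static class IsolatingLoader extends ClassLoader {
        IsolatingLoader(ClassLoader parent) {
            super(parent);
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            if (!name.equals(Holder.class.getName())) {
                return super.loadClass(name, resolve);
            }
            synchronized (getClassLoadingLock(name)) {
                Class<?> c = findLoadedClass(name);
                if (c == null) {
                    // Assumption: the .class file can be read as a resource from the parent loader.
                    String path = name.replace('.', '/') + ".class";
                    try (InputStream in = getParent().getResourceAsStream(path)) {
                        if (in == null) {
                            throw new ClassNotFoundException(name);
                        }
                        byte[] bytes = in.readAllBytes();
                        c = defineClass(name, bytes, 0, bytes.length);
                    } catch (IOException e) {
                        throw new ClassNotFoundException(name, e);
                    }
                }
                if (resolve) {
                    resolveClass(c);
                }
                return c;
            }
        }
    }

    /** Returns the static value seen through loader A, loader B, and the application loader. */
    static int[] run() {
        try {
            ClassLoader app = ClassLoaderIsolationDemo.class.getClassLoader();
            Class<?> a = new IsolatingLoader(app).loadClass(Holder.class.getName());
            Class<?> b = new IsolatingLoader(app).loadClass(Holder.class.getName());
            // Each loader defined its own copy of Holder, so each copy has its own static field.
            a.getField("value").setInt(null, 1);
            b.getField("value").setInt(null, 2);
            return new int[] {
                a.getField("value").getInt(null),
                b.getField("value").getInt(null),
                Holder.value
            };
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("loader A: " + r[0] + ", loader B: " + r[1] + ", app loader: " + r[2]);
    }
}
```

Writing through one loader's copy of the class leaves the other copies untouched, which is the same reason a static constant in one job should not interfere with the same constant in another job.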

Re: Checkpointing Completed and then failed

2021-03-11 Thread Arvid Heise
Hi Abdullah, without specific logs, it's hard to diagnose what went wrong. Could you check in your taskmanager logs if any error occurred and add it? In Flink UI, you can also browse the latest exceptions and look at the checkpoint history. That may give you (and us) additional insights. On Thu,

Re: Problem when restoring from savepoint with missing state & POJO modification

2021-03-11 Thread Arvid Heise
Hi Alexis, could you open a new thread and post your exception? It sounds as if it should work, but it's not for some reason. Did you double check that the PojoSerializer is used? On Wed, Mar 10, 2021 at 10:27 PM sardaesp < alexis.sarda-espin...@microfocus.com> wrote: > I'm having the same issu

Re: mixing java libraries between 1.12.x and 1.11.x

2021-03-11 Thread Arvid Heise
Hi Jin, as Till already answered on the ticket: in general, there is no guarantee that stuff works in between different versions. Everything that builds on public APIs is guaranteed to be forward compatible. However, in this case, you want things to be backward-compatible, which is impossible to b

Re: User metrics outside tasks

2021-03-11 Thread Alexey Trenikhun
A couple of use cases: I have a metric representing the job version; currently it is bound to a task, but I want to bind it to the job manager. Another example: I have a dump on OOM exception configured, and on start I want to check the content of the directory with dumps and, if it is not empty, increase restarted-due-to-OOM

Re: Is there any complete code available for checkpointing

2021-03-11 Thread Abdullah bin Omar
Hi Arvid, Thank you for your reply. I get input using DataStream data = env.socketTextStream("localhost", 9090); It shows an error: The main method caused an error: org.apache.flink.client.program.ProgramInvocationException: Job failed Is there any particular rule to get inpu

Re: Flink application has slightly data loss using Processing Time

2021-03-11 Thread Rainie Li
Thanks for the suggestion, Arvid. Currently my job is using producer.kafka.request.timeout.ms=9; I will try to increase it to 12. Best regards Rainie On Thu, Mar 11, 2021 at 3:58 AM Arvid Heise wrote: > Hi Rainie, > > This looks like the record batching in Kafka producer timed out. At this

Re: Application cluster - Best Practice

2021-03-11 Thread Tamir Sagi
Hey Till, Thank you for responding. I've already read the links you sent, but they are not enough; they don't provide a good solution for production. Standalone Kubernetes is not a good approach for production for 3 main reasons (in my opinion): * TMs are defined as a deployment, which means

Re: KeyedProcessFunction

2021-03-11 Thread Maminspapin
I missed this in the documentation: A KeyedProcessFunction is always a RichFunction. Therefore, access to the RuntimeContext is always available and setup and teardown methods can be implemented. See RichFunction.open(org.apache.flink.configuration.Configuration) and RichFunction.close(). https://ci.apach

Re: Is there any complete code available for checkpointing

2021-03-11 Thread Arvid Heise
Hi Abdullah, You don't need to implement checkpointed functions for checkpointing to work - but you may lose state if you manage it manually. If you have enabled checkpointing, you should see it with any application that is running. Make sure that the checkpointing interval is small enough so tha

Re: Flink Non-Heap Memory Configuration

2021-03-11 Thread Arvid Heise
Hi Jan, 10 MB sounds very tight. How much memory are you giving your JVM? Are you loading big data structures in your user-defined functions? You can read about Flink's memory here [1]. You may need to lower memory fractions or set .max if you provide only little RAM to your JVM. I'm also pullin

Re: Gradually increasing checkpoint size

2021-03-11 Thread Dawid Wysakowicz
Hey Dan, I think the logic should be correct. Mind that in processElement we are using *relative*Upper/LowerBound, which are the inverted global bounds: relativeUpperBound = upperBound for left and -lowerBound for right; relativeLowerBound = lowerBound for left and -upperBound for right. Therefore
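
The bound inversion described above can be sketched in plain Java. This is not the Flink operator itself; the class name, method names, and timestamps are hypothetical and only illustrate why negating and swapping the bounds for the right side gives the same match decision from either side.

```java
public class RelativeBoundsSketch {

    /** Returns {relativeLowerBound, relativeUpperBound} for the given side of the join. */
    static long[] relativeBounds(long lowerBound, long upperBound, boolean isLeft) {
        return isLeft
                ? new long[] {lowerBound, upperBound}
                : new long[] {-upperBound, -lowerBound};
    }

    /** True if otherTs lies within [ownTs + relativeLower, ownTs + relativeUpper]. */
    static boolean matches(long ownTs, long otherTs, long[] relative) {
        return otherTs >= ownTs + relative[0] && otherTs <= ownTs + relative[1];
    }

    public static void main(String[] args) {
        long lowerBound = -2;
        long upperBound = 5; // illustrative values only
        long[] left = relativeBounds(lowerBound, upperBound, true);
        long[] right = relativeBounds(lowerBound, upperBound, false);

        long leftTs = 10;
        long rightTs = 14;
        // The same pair is checked once from the left element's view and once from the right's.
        System.out.println(matches(leftTs, rightTs, left));
        System.out.println(matches(rightTs, leftTs, right));
    }
}
```

Both checks agree for any pair of timestamps, since rightTs in [leftTs + lowerBound, leftTs + upperBound] is equivalent to leftTs in [rightTs - upperBound, rightTs - lowerBound].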

Re: Question about Reactive mode support

2021-03-11 Thread Robert Metzger
Hey Sonam, I'm very happy to hear that you are interested in reactive mode. Your understanding of the limitations for 1.13 is correct. Note that you can deploy standalone Flink on Kubernetes [1]. I'm actually currently preparing a demo for this [2]. We are certainly aware that support for active

Re: Evenly Spreading Out Source Tasks

2021-03-11 Thread Arvid Heise
Hi Aeden, the option that you mentioned should have actually caused your desired behavior. Can you double-check that it's set for the job (you can look at the config in the Flink UI to be 100% sure). Another option is to simply give all task managers 2 slots. In that way, the scheduler can only e

Re: User metrics outside tasks

2021-03-11 Thread Arvid Heise
Hi Alexey, could you describe what you want to achieve? Most metrics are bound to a specific task (available in RuntimeContext). You can also access them in custom operators and state backends. Then you have some metrics bound to taskmanager and even java processes, but I don't see an easy way to

Re: How to debug checkpoint/savepoint stuck in Flink 1.12.2

2021-03-11 Thread Arvid Heise
Hi ChangZhuo, Did you upgrade to Flink 1.12.2 and change the settings at the time? If so, could you maybe reset the settings to the old values on Flink 1.12.2 and check if the job still gets stuck? Especially, turning off unaligned checkpoints (UC) should clarify if it's a general issue in Flink 1

KeyedProcessFunction

2021-03-11 Thread Maminspapin
Hello, I'm learning the State Processor API: https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html There is an example on this page with StatefulFunctionWithTime extending KeyedProcessFunction. There we can see the open() method, which we need to implement to initialize state. But

Re: Future of QueryableState

2021-03-11 Thread Arvid Heise
Hi Maciek, Thanks for reaching out. Only through these interactions, we know how important certain features are to users. Queryable State has some limitations and makes the whole system rather fragile. Most users that try it out are disappointed that there is actually no SQL support. If we could

Re: io.netty.channel.epoll.EpollEventLoopGroup cannot be cast to io.netty.channel.MultithreadEventLoopGroup

2021-03-11 Thread Arvid Heise
Hi Hemant, Yes, this looks like an issue with different library versions. You probably have 3 solutions: * use the netty version of Flink * shade your netty into your jar with relocations * ditch jasync and just use jdbc with a custom thread pool (little overhead) On Wed, Mar 10, 2021 at 2:40 PM

Re: Filtering lines in parquet

2021-03-11 Thread Arvid Heise
Hi Avi, I'm not entirely sure I understand the question. Let's say you have source A, B, C all with different schema but all have an id. You could use the ParquetMapInputFormat that provides a map of the records and just use a map-lookup. However, I'm not sure how you want to write these records

Re: Flink application has slightly data loss using Processing Time

2021-03-11 Thread Arvid Heise
Hi Rainie, This looks like the record batching in Kafka producer timed out. At this point, the respective records are lost forever. You probably want to tweak your Kafka settings [1]. Usually, Flink should fail and restart at this point and recover without data loss. However, if the transactions

Re: How do I call an algorithm written in C++ in Flink?

2021-03-11 Thread Arvid Heise
Hi Suxi, to expand a bit on the answer of Yun: it depends on which kind of algorithm you have. If you want to apply your C++ function to each record, then you can go Yun's way and use a RichMapFunction to load your library and invoke it for each record in map. If you need more records, then you

uniqueness of name when constructing a StateDescriptor

2021-03-11 Thread Colletta, Edward
The documentation for ValueStateDescriptor documents the name parameter as - "name - The (unique) name for the state." What is the scope of the uniqueness? Unique within a RichFunction instance? Unique within a job? Unique within a session cluster? I ask because I have several jobs that use a K

Re: Kubernetes HA - attempting to restore from wrong (non-existing) savepoint

2021-03-11 Thread Yang Wang
Hi Alexey, From your attached logs, it seems that the leader related config map is reused. Then the Flink application is recovered instead of submitting a new one. This is the root cause of it trying to recover from a wrong savepoint, which is specified in your last submission. > So how to fix th

Re: Error Starting PyFlink in Kubernetes Session Cluster "Could Not Get Rest Endpoint"

2021-03-11 Thread Yang Wang
What I mean is you could create a K8s deployment using the Flink image as follows. After that, you could use "kubectl exec -it {pod_name} bash" to tunnel in and submit the Flink python application to the existing session cluster. apiVersion: apps/v1 kind: Deployment metadata: name: fli

Re:Re: Trigger and completed Checkpointing do not appeared

2021-03-11 Thread Smile@LETTers
Hi, In short, [1] means whether the job will trigger checkpoints, and [2] means which operators will take action when checkpoints are triggered. If you use ExampleCountSource, flink-streaming-java should be a dependency in pom.xml and classes such as ListState, ListStateDescriptor, FunctionInitial

Jobmanager time out / long running batch job

2021-03-11 Thread Jan Oelschlegel
Hi, I'm using Flink 1.11.3 and running a batch job. In the log of the jobmanager I see that all operators switched from running to finished. And then there is a timeout of the jobmanager. And after some pause the overall status is switched from running to finished. Why is there a big gap in betwee

Re: Upgrade calcite version

2021-03-11 Thread Jingsong Li
Hi, Yes, as Danny said, it is very hard work... A suggestion is that you can cherry-pick some bug fixes from the new Calcite version to your own internal Calcite branch, if you just want to fix some bugs. Best, Jingsong On Thu, Mar 11, 2021 at 2:28 PM Danny Chan wrote: > Hi Sheng ~ > > It is a