Hello,
This can have several causes, such as back-pressure, large
snapshots, or bugs.
Could you please share:
- the stats of the previous (successful) checkpoints
- back-pressure metrics for the sources
- the Flink version you use
Regards,
Roman
On Thu, Mar 11, 2021 at 7:03 AM Alexey
Hey Sonam,
> I'm wondering whether it may be helpful to have a min and max parallelism,
> and the actual parallelism be determined by the scaling policy mentioned
> next?
Yes, that's certainly possible.
Thanks a lot for your input on the design of a scaling policy. Your input
is very valuable fo
Essentially, does this code leak state?

private static class SessionIdProcessWindowFunction
        extends ProcessWindowFunction<..., KeyedSessionWithSessionID<
                KEY, VALUE>, KEY, TimeWindow> {
    private static final long serialVersionUID = 1L;
    private final static ValueStateDescriptor sessionId = new
            ValueState
Hello folks,
The suggestion is to use windowState() for per-key, per-window state and to
clear the state explicitly. It also seems that getRuntimeContext().getState()
returns globally scoped state that is shared among windows with the same key.
What I desire, of course, is to have state scope
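For illustration, a minimal sketch (class and state names are made up) of
per-key, per-window state via windowState(), cleared explicitly in clear():

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class PerWindowCount
        extends ProcessWindowFunction<String, String, String, TimeWindow> {

    private static final ValueStateDescriptor<Long> COUNT =
            new ValueStateDescriptor<>("perWindowCount", Long.class);

    @Override
    public void process(String key, Context ctx, Iterable<String> elements,
                        Collector<String> out) throws Exception {
        // windowState() is scoped to this key *and* this window;
        // globalState() would be shared by all windows of the same key.
        ValueState<Long> count = ctx.windowState().getState(COUNT);
        long n = count.value() == null ? 0L : count.value();
        for (String ignored : elements) {
            n++;
        }
        count.update(n);
        out.collect(key + " -> " + n);
    }

    @Override
    public void clear(Context ctx) throws Exception {
        // Called when the window is purged: clear per-window state explicitly,
        // otherwise it lingers (the suspected leak discussed above).
        ctx.windowState().getState(COUNT).clear();
    }
}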
Hi,
We have multiple jobs that need to be deployed to a Flink cluster. Parallelism
varies per job and depends on the type of work being done, and so do the
memory requirements. All jobs currently use the same state backend. Since the
workloads handled by each job are different, the scaling p
Let's close this issue. Guys, please answer my questions. I am using Flink
1.8.1.
Thanks
Sri
On Wed, 10 Mar 2021 at 13:25, sri hari kali charan Tummala <
kali.tumm...@gmail.com> wrote:
> Also, I don't see ConfigConstants.ENV_FLINK_PLUGINS_DIR; I only see
> ConfigConstants.ENV_FLINK_LIB_DIR. Will this
Makes total sense. Thanks, I'll check it out.
On Wed, Mar 10, 2021 at 1:28 PM Till Rohrmann wrote:
> Hi Vishal,
>
> There is no specific reason why Flink does not have a Nomad HA
> implementation other than it has not been done yet. As long as Nomad
> supports leader election, service recovery a
Thanks Arvid,
If too many jobs run in the same task manager JVM, will that cause excessive
metaspace memory usage?
Thanks,
Lei
On Thu, Mar 11, 2021 at 11:54 PM Arvid Heise wrote:
> Hi Lei,
>
> each application has its own classloader as such each static constant
> exists multiple times (1
On Thu, Mar 11, 2021 at 02:14:32PM +0100, Arvid Heise wrote:
> Hi ChangZhuo,
>
> Did you upgrade to Flink 1.12.2 and change the settings at the same time? If so,
> could you maybe reset the settings to the old values on Flink 1.12.2 and
> check if the job still gets stuck? Especially, turning off unali
Hi Arvid,
Thanks for responding. I did check the configuration tab of the job
manager and the setting cluster.evenly-spread-out-slots: true is
there. However I'm still observing unevenness in the distribution of
source tasks. Perhaps this additional information could shed light.
Version: 1.12.1
I too think this would be a useful capability for the job manager to be able
to send metrics easily. Sometimes additional compute responsibilities are
placed in the job manager and having a convenient way to add telemetry data
into a metrics stream would be very useful.
Hey Till,
You are right.
I'm new to Flink and was looking for a Java way to deploy an application
cluster. I first tried the standalone approach and then changed to native
(although the official documents state that application mode is more suitable
for production, they only show the CLI way).
I
Hi Robert,
Thanks for getting back to me. We are currently assessing Flink Standalone on
Kubernetes and Native Flink on Kubernetes and haven't yet decided on which
model we intend to use. We want to ensure that whichever model we choose, we'll
be able to get the benefits of the new features add
Hi folks,
Using the State Processor API, can I:
1. get the full state of a Flink application with the RocksDB backend in cluster
mode (as I understand it, that is checkpoints or savepoints)?
2. update it?
3. get this state from other flink-application (other jar)?
4. query it with sql (Table API & SQL) to get data I need?
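For later readers, a rough sketch of what reading keyed state looks like with
the State Processor API as of Flink 1.12 (path, uid and state name below are
placeholders, and the reader function must match the state the job declares):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class ReadState {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 1. load an existing savepoint (or retained checkpoint) of the application
        ExistingSavepoint sp = Savepoint.load(
                env, "hdfs:///savepoints/savepoint-xyz",
                new RocksDBStateBackend("hdfs:///checkpoints"));
        // 3. read state written by the operator with uid "my-uid", from any jar
        DataSet<Long> counts = sp.readKeyedState("my-uid", new Reader());
        counts.print();
    }

    static class Reader extends KeyedStateReaderFunction<String, Long> {
        private ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            count = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("count", Long.class));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<Long> out)
                throws Exception {
            out.collect(count.value());
        }
    }
}

For 2., the same package offers bootstrap transformations to write modified
state back as a new savepoint; for 4., the resulting DataSet should be
convertible to a Table and queried with the (batch) Table API/SQL.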
Hi Arvid,
assuming that I have A0, B0, C0 parquet files with different schemas and a
common field *ID*, I want to write them to A1, B2, C3 files respectively. My
problem is that in my code I do not want to know the full schema; I just
filter using the ID field and write the unfiltered lines to the
What the Flink client does for you when starting an application mode
cluster in native K8s mode is to generate the K8s job specification and to
submit it to the K8s cluster. Hence, you can also do it yourself if you
prefer to manage the K8s resources directly. Writing your own client should
also wo
Hi Yang,
The upgrade procedure uses POST /jobs/:jobid/savepoints with cancel-job=true,
but perhaps I hit FLINK-21028. This leads to the question: if the normal
take-savepoint-and-cancel-job via the API fails, what steps should be taken
outside Flink to be able to resume from a savepoint with the new job version?
Is del
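For reference, a sketch of the call in question against the monitoring REST
API (host, job id, request id and target directory are placeholders):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class TriggerSavepoint {
    public static void main(String[] args) throws Exception {
        String base = "http://jobmanager:8081";
        String jobId = "<jobid>";
        HttpClient client = HttpClient.newHttpClient();

        // Trigger savepoint-and-cancel; the response carries a request-id.
        HttpRequest trigger = HttpRequest.newBuilder()
                .uri(URI.create(base + "/jobs/" + jobId + "/savepoints"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(
                        "{\"target-directory\": \"hdfs:///savepoints\", \"cancel-job\": true}"))
                .build();
        System.out.println(
                client.send(trigger, HttpResponse.BodyHandlers.ofString()).body());

        // Poll until the operation is COMPLETED; the savepoint path is in the result.
        String requestId = "<request-id from the response above>";
        HttpRequest poll = HttpRequest.newBuilder()
                .uri(URI.create(base + "/jobs/" + jobId + "/savepoints/" + requestId))
                .build();
        System.out.println(
                client.send(poll, HttpResponse.BodyHandlers.ofString()).body());
    }
}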
Hi everyone,
It seems I'm having either the same problem, or a problem similar to the one
mentioned here:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Problem-when-restoring-from-savepoint-with-missing-state-amp-POJO-modification-td39945.html
I have a POJO class that is u
Hi Lei,
each application has its own classloader, so each static constant
exists multiple times (one per job). There should thus be no interference. You
could verify it by logging the value of the constant and seeing it for yourself.
Best,
Arvid
On Thu, Mar 11, 2021 at 7:11 AM Lei Wang wrote:
> Cons
Hi Abdullah,
without specific logs, it's hard to diagnose what went wrong. Could you
check your taskmanager logs for any errors and attach them? In the Flink
UI, you can also browse the latest exceptions and look at the checkpoint
history. That may give you (and us) additional insights.
On Thu,
Hi Alexis,
could you open a new thread and post your exception? It sounds as if it
should work, but it's not for some reason.
Did you double check that the PojoSerializer is used?
On Wed, Mar 10, 2021 at 10:27 PM sardaesp <
alexis.sarda-espin...@microfocus.com> wrote:
> I'm having the same issu
Hi Jin,
as Till already answered on the ticket: in general, there is no guarantee
that things work across different versions. Everything that builds on
public APIs is guaranteed to be forward compatible. However, in this case,
you want things to be backward compatible, which is impossible to b
A couple of use cases: I have a metric representing the job version; currently
it is bound to a task, but I want to bind it to the job manager. Another
example: I have dump-on-OOM configured, and on start I want to check the
contents of the directory with dumps and, if it is not empty, increase a
restarted-due-to-OOM
Hi Arvid,
Thank you for your reply.
I am getting the input using:
DataStream<String> data = env.socketTextStream("localhost", 9090);
It shows an error: The main method caused an error:
org.apache.flink.client.program.ProgramInvocationException: Job failed
Is there any particular rule to get inpu
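One thing to check (an assumption about the setup here): socketTextStream only
connects to an already-listening socket, so something like "nc -lk 9090" must
be running before the job starts. A minimal working shape:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SocketJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // start "nc -lk 9090" first, otherwise the connect fails and the job dies
        DataStream<String> data = env.socketTextStream("localhost", 9090);
        data.print();
        env.execute("socket-example");
    }
}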
Thanks for the suggestion, Arvid.
Currently my job is using producer.kafka.request.timeout.ms=9
I will try to increase to 12.
Best regards
Rainie
On Thu, Mar 11, 2021 at 3:58 AM Arvid Heise wrote:
> Hi Rainie,
>
> This looks like the record batching in Kafka producer timed out. At this
Hey Till,
Thank you for responding.
I've already read the links you sent, but they are not enough; they don't
provide a good solution for production.
Standalone Kubernetes is not a good approach for production for 3 main
reasons (in my opinion):
* TMs are defined as a Deployment, which means
I missed this in the documentation:
A KeyedProcessFunction is always a RichFunction. Therefore, access to the
RuntimeContext is always available and setup and teardown methods can be
implemented. See
RichFunction.open(org.apache.flink.configuration.Configuration) and
RichFunction.close().
https://ci.apach
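In code form, a sketch (class and state names made up) of what that enables in
a KeyedProcessFunction:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class CountPerKey extends KeyedProcessFunction<String, String, String> {
    private transient ValueState<Long> seen;

    @Override
    public void open(Configuration parameters) {
        // setup: runs once per parallel instance, before any element arrives
        seen = getRuntimeContext().getState(
                new ValueStateDescriptor<>("seen", Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out)
            throws Exception {
        Long n = seen.value();
        seen.update(n == null ? 1L : n + 1);
        out.collect(value);
    }

    @Override
    public void close() {
        // teardown: release external resources here if any were opened
    }
}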
Hi Abdullah,
You don't need to implement checkpointed functions for checkpointing to
work - but you may lose state if you manage it manually.
If you have enabled checkpointing, you should see it with any application
that is running. Make sure that the checkpointing interval is small enough
so tha
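For completeness, enabling checkpointing is a one-liner on the environment (the
interval below is just an example value):

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// checkpoint every 10s; pick an interval your job can reliably complete within
env.enableCheckpointing(10_000L, CheckpointingMode.EXACTLY_ONCE);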
Hi Jan,
10 MB sounds very tight. How much memory are you giving your JVM? Are you
loading big data structures in your user-defined functions?
You can read about Flink's memory here [1]. You may need to lower memory
fractions or set .max if you provide only little RAM to your JVM.
I'm also pullin
Hey Dan,
I think the logic should be correct. Mind that in processElement we
are using *relative*Upper/LowerBound, which are the inverted global bounds:
relativeUpperBound = upperBound for left and -lowerBound for right
relativeLowerBound = lowerBound for left and -upperBound for right
Therefore
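A concrete example with made-up bounds, for an interval join declared as
left.intervalJoin(right).between(Time.seconds(-5), Time.seconds(10)):

// left side:  relativeLowerBound = -5,  relativeUpperBound = 10
//   a left element at time t matches right elements in [t - 5, t + 10]
// right side: relativeLowerBound = -10, relativeUpperBound = 5
//   a right element at time t matches left elements in [t - 10, t + 5]
// Both express the same condition: left.ts - 5 <= right.ts <= left.ts + 10.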
Hey Sonam,
I'm very happy to hear that you are interested in reactive mode. Your
understanding of the limitations for 1.13 is correct. Note that you can
deploy standalone Flink on Kubernetes [1]. I'm actually currently preparing
a demo for this [2].
We are certainly aware that support for active
Hi Aeden,
the option that you mentioned should have actually caused your desired
behavior. Can you double-check that it's set for the job (you can look at
the config in the Flink UI to be 100% sure).
Another option is to simply give all task managers 2 slots. In that way,
the scheduler can only e
Hi Alexey,
could you describe what you want to achieve? Most metrics are bound to a
specific task (available in RuntimeContext). You can also access them in
custom operators and state backends.
Then you have some metrics bound to taskmanager and even java processes,
but I don't see an easy way to
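As a sketch of the task-bound route (metric name made up), any rich function or
custom operator can register metrics through the RuntimeContext:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

public class CountingMap extends RichMapFunction<String, String> {
    private transient Counter processed;

    @Override
    public void open(Configuration parameters) {
        // registered under the task's metric scope, picked up by the reporters
        processed = getRuntimeContext().getMetricGroup().counter("processedRecords");
    }

    @Override
    public String map(String value) {
        processed.inc();
        return value;
    }
}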
Hi ChangZhuo,
Did you upgrade to Flink 1.12.2 and change the settings at the same time? If so,
could you maybe reset the settings to the old values on Flink 1.12.2 and
check if the job still gets stuck? Especially, turning off unaligned
checkpoints (UC) should clarify if it's a general issue in Flink 1
Hello,
I'm learning State Processor API:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
There is an example on this page with StatefulFunctionWithTime extends
KeyedProcessFunction. And here we can see the method open() we need to
implement to initialize state. But
Hi Maciek,
Thanks for reaching out. Only through these interactions do we learn how
important certain features are to users.
Queryable State has some limitations and makes the whole system rather
fragile. Most users that try it out are disappointed that there is actually
no SQL support. If we could
Hi Hemant,
Yes, this looks like an issue with different library versions. You probably
have 3 solutions:
* use the netty version of Flink
* shade your netty into your jar with relocations
* ditch jasync and just use jdbc with a custom thread pool (little overhead)
On Wed, Mar 10, 2021 at 2:40 PM
Hi Avi,
I'm not entirely sure I understand the question. Let's say you have source
A, B, C all with different schema but all have an id. You could use the
ParquetMapInputFormat that provides a map of the records and just use a
map-lookup.
However, I'm not sure how you want to write these records
Hi Rainie,
This looks like the record batching in Kafka producer timed out. At this
point, the respective records are lost forever. You probably want to tweak
your Kafka settings [1].
Usually, Flink should fail and restart at this point and recover without
data loss. However, if the transactions
Hi Suxi,
to expand a bit on Yun's answer: it depends on what kind of algorithm
you have. If you want to apply your C++ function to each record, then
you can go Yun's way and use a RichMapFunction to load your library and
invoke it for each record in map. If you need more records, then you
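A sketch of that first pattern (library and method names are hypothetical):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class NativeScorer extends RichMapFunction<Long, Long> {

    // hypothetical entry point implemented by the C++ library via JNI
    private static native long score(long value);

    @Override
    public void open(Configuration parameters) {
        // load the shared library once per task; libmyalgo.so must be on the
        // java.library.path of every task manager
        System.loadLibrary("myalgo");
    }

    @Override
    public Long map(Long value) {
        return score(value);
    }
}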
The documentation for ValueStateDescriptor documents the name parameter as -
"name - The (unique) name for the state."
What is the scope of the uniqueness? Unique within a RichFunction instance?
Unique within a job? Unique within a session cluster?
I ask because I have several jobs that use a K
Hi Alexey,
From your attached logs, it seems that the leader-related ConfigMap is
reused. Then the Flink application is recovered instead of submitting a
new one. This is the root cause: it is trying to recover from a wrong
savepoint, which is specified in your last submission.
> So how to fix th
What I mean is you could create a K8s Deployment using the Flink image, just
like the following. After that, you could use "kubectl exec -it {pod_name} bash"
to tunnel in and submit the Flink Python application to the existing session
cluster.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: fli
Hi,
In short, [1] determines whether the job will trigger checkpoints, and [2]
determines which operators take action when checkpoints are triggered.
If you use ExampleCountSource, flink-streaming-java should be a dependency in
pom.xml, and classes such as ListState, ListStateDescriptor,
FunctionInitial
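For context, the source in question looks roughly like this (a sketch modeled
on the ExampleCountSource from the CheckpointedFunction javadocs; details may
differ):

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class ExampleCountSource implements SourceFunction<Long>, CheckpointedFunction {
    private long count = 0L;
    private volatile boolean isRunning = true;
    private transient ListState<Long> checkpointedCount;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (isRunning && count < 1000) {
            // emissions and checkpoints are serialized via the checkpoint lock
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(count);
                count++;
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedCount = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("count", Long.class));
        if (context.isRestored()) {
            for (Long c : checkpointedCount.get()) {
                count = c;
            }
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedCount.clear();
        checkpointedCount.add(count);
    }
}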
Hi,
I'm using Flink 1.11.3 and running a batch job. In the jobmanager log I see
that all operators switched from RUNNING to FINISHED. Then there is a timeout
of the jobmanager, and after some pause the overall status switches from
RUNNING to FINISHED.
Why is there a big gap in betwee
Hi,
Yes, as Danny said, it is very hard work...
A suggestion: you can cherry-pick some bugfixes from the new Calcite version
onto your own internal Calcite branch, if you just want to fix some bugs.
Best,
Jingsong
On Thu, Mar 11, 2021 at 2:28 PM Danny Chan wrote:
> Hi Sheng ~
>
> It is a