The discussion on dropping the ES5 connector was not conclusive when we
discussed it in February 2020. We wanted to revisit it for the 1.12 release.
From Maven Central, we have the following download numbers:
ES2: 500 downloads
ES5: 10500 downloads (the es5_2.10:1.3.1 had 8000 downloads last month)
Ok, I see your problem. And yes, keeping a map of metrics should work.
Just to double-check: I assume there's an upper bound on your map keys
(table names)?
Because if not, an infinitely increasing in-memory map that is not managed
by Flink's state might become problematic.
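For reference, a minimal sketch of that pattern (the class and metric names are
illustrative, not from your code):

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class TableMeterSink extends RichSinkFunction<ObjectNode> {

    // One meter per table name; stays bounded as long as the set of tables is.
    private transient Map<String, Meter> meters;

    @Override
    public void open(Configuration parameters) {
        meters = new HashMap<>();
    }

    @Override
    public void invoke(ObjectNode node, Context context) {
        String tableName = node.get("metadata").get("topic").asText();
        // Register lazily on first sight of each table name, then reuse.
        Meter meter = meters.computeIfAbsent(tableName, name ->
            getRuntimeContext().getMetricGroup().meter(name, new MeterView(60)));
        meter.markEvent();
    }
}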
Thank you~
Xintong Song
Hi Yun,
Thanks for the info. These materials help a lot.
Best Regards
Peter Huang
On Thu, Jul 2, 2020 at 11:36 PM Yun Tang wrote:
> Hi Peter
>
> This is a general problem and you could refer to RocksDB's tuning
> guides[1][2], you could also refer to Flink built-in PredefinedOptions.java
> [3].
It seems there's no direct solution.
Perhaps I can implement this by initializing a HashMap with all
the possible values of tableName in the `open` method and getting the
corresponding Meter according to tableName in the `invoke` method.
Thanks,
Lei
wangl...@geekplus.com.cn
Hi Peter
This is a general problem and you could refer to RocksDB's tuning guides[1][2];
you could also refer to Flink's built-in PredefinedOptions.java [3].
Generally speaking, increase the write buffer size to reduce write amplification,
and increase the parallelism of the keyed operator to share the pressure.
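For example, you could start from one of the predefined profiles and only
override the write buffer size (a sketch; the checkpoint path and values are
just illustrative, and env is assumed to be your StreamExecutionEnvironment):

import java.util.Collection;
import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:///checkpoints", true);
backend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);
backend.setRocksDBOptions(new RocksDBOptionsFactory() {
    @Override
    public DBOptions createDBOptions(DBOptions currentOptions,
                                     Collection<AutoCloseable> handlesToClose) {
        return currentOptions;
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions,
                                                   Collection<AutoCloseable> handlesToClose) {
        // Larger write buffers mean fewer flushes and less write amplification.
        return currentOptions.setWriteBufferSize(128 * 1024 * 1024);
    }
});
env.setStateBackend(backend);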
Hi Yun Tang,
I don't enable checkpointing, so when my job restarts, how does Flink clean up
the history state?
My pod gets killed only after the job restarts again and again; in this
case, I have to rebuild the Flink cluster.
a511955993
a511955...@163.com
On 07/03/2020 14:
Hi Xintong,
Yes, initializing the metric in the `open` method works, but it doesn't solve
my problem.
I want to initialize the metric with a name that is extracted from the record
content, and I can only do that in the `invoke` method.
Actually, my scenario is as follows: the record is a MySQL binlog
Hi Felipe,
I noticed my previous mail has a typo: RocksDB is executed in the task main
thread, which does not take the role of responding to heartbeats. Sorry for the
previous typo; the key point I want to clarify is that RocksDB should have
nothing to do with the heartbeat problem.
Best
Yun Tang
Hi
If your job does not need checkpoints, why would you still restore your job from
checkpoints?
Actually, I did not totally understand what you want. Are you afraid that the
state restored from the last checkpoint would not be cleared? Since the event timer
is also stored in the checkpoint, after you re
Hi
My job runs on Flink 1.10.1 with event time. Container memory usage rises by
2G after one restart, and the pod is killed by the OS after several restarts.
I find that history data is cleared when new data arrives, calling the function
onEventTime() to clearAllState. But my job does not need checkpointing.
Hi,
I have a stateful Flink job with 500k QPS. The job basically counts the
message number on a combination key with a 10-minute tumbling window. If I
use the memory state backend, the job can run without lag but periodically
fails due to OOM. If I turn on the RocksDB state backend, it will have a high
Ka
But would it be possible to somehow use AvroSerializer for now?
Best,
Georg
On Thu, Jul 2, 2020 at 23:44, Georg Heiler <georg.kf.hei...@gmail.com> wrote:
> What is the suggested workaround for now?
>
>
> Thanks!
>
> Aljoscha Krettek wrote on Thu, Jul 2, 2020 at 20:55:
>
>> Hi Georg,
Hi Lei,
I think you should initialize the metric in the `open` method. Then you can
save the initialized metric as a class field, and update it in the `invoke`
method for each record.
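For example (a minimal sketch; the metric name and record type are just
illustrative):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class MySink extends RichSinkFunction<String> {

    private transient Meter meter;

    @Override
    public void open(Configuration parameters) {
        // Register the metric once, when the task starts.
        meter = getRuntimeContext().getMetricGroup()
            .meter("recordsPerSecond", new MeterView(60));
    }

    @Override
    public void invoke(String value, Context context) {
        // Update it for each record.
        meter.markEvent();
    }
}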
Thank you~
Xintong Song
On Fri, Jul 3, 2020 at 11:50 AM wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn>
The configuration option you're looking for is `akka.ask.timeout`.
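For example, in flink-conf.yaml (the value below is only an illustration):

akka.ask.timeout: 60 s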
However, I'm not sure increasing this configuration would help in your
case. The error message shows that there is a timeout on a local message.
It is weird that a local message does not get replied to within 10 sec. I would
suggest to lo
Hi, Thanks for your help.
The checkpoint configuration is
checkpoint.intervalMS=30
checkpoint.timeoutMS=30
The error call stack is from the JM's log, and it happens on every checkpoint.
Currently, I don't have a successful checkpoint yet.
Khachatryan Roman wrote on Fri, Jul 3, 2020 at 3:50 AM:
> Hi,
>
> Thanks for the details.
I agree with Roman's suggestion for increasing heap size.
It seems that the heap grows faster than it is freed. Thus, eventually a full GC
is triggered, taking more than 50s and causing the timeout. However, even
the full GC frees only 2GB of space out of the 28GB max size. That probably
suggests that the
In one Flink operator, I want to initialize multiple Flink metrics according to
the message content, as in the code below:
public void invoke(ObjectNode node, Context context) throws Exception {
String tableName = node.get("metadata").get("topic").asText();
Meter meter = getRuntimeContext().g
What is the suggested workaround for now?
Thanks!
Aljoscha Krettek wrote on Thu, Jul 2, 2020 at 20:55:
> Hi Georg,
>
> unfortunately, it seems I only fixed the issue for AvroSerializer and
> not for AvroDeserializationSchema. I created a new issue (which is a
> clone of the old one) to track this [1].
I am trying to read a Parquet file into a DataStream and then register that
stream as a temporary table. This file was created by Spark 2.4 in HDFS on AWS
EMR. I am using Flink version 1.10.0 with EMR 5.30.
I am getting the following error:
Caused by: org.apache.flink.streaming.runtime.tasks.Asy
Hi,
I am running my Flink jobs on EMR, and I didn't include any
log4j.properties as part of my JAR; I am using slf4j (and included the
dependent jars in the uber jar I created) for logging in my app.
When I run it, everything works fine except that I cannot find my
application logs anywhere.
Hi:
I am using Flink 1.10 on AWS EMR cluster.
We are getting AskTimeoutExceptions, which are causing the Flink jobs to die.
Caused by: akka.pattern.AskTimeoutException: Ask timed out on
[Actor[akka://flink/user/resourcemanager#-1602864959]] after [1 ms].
Message of type [org.apache.flink.run
Hi,
Thanks for the details.
However, I was not able to reproduce the issue. I used parallelism level
4, the filesystem backend, and tried different timings for
checkpointing, windowing and the source.
Do you encounter this problem deterministically? Is it always the 1st
checkpoint?
What checkpointing interval
Hi Sateesh,
Note that there are three classes of log files when running Flink on EMR:
1. The output from the main class.
Since I typically run the job by sshing onto the master and using the CLI from
there, I have control over where that output goes. E.g.
nohup bin/flink run -m yarn-cluster -
Hi Georg,
unfortunately, it seems I only fixed the issue for AvroSerializer and
not for AvroDeserializationSchema. I created a new issue (which is a
clone of the old one) to track this [1]. The fix should be very simple
since it's the same issue.
Best,
Aljoscha
[1] https://issues.apache.org
Hi,
based on Gordon's excellent advice on how to handle JSON messages with
remote functions
(https://www.mail-archive.com/user@flink.apache.org/msg34385.html) I was
able to:
1) Deserialize JSON Messages from a Kafka Stream
2) Route the message to an embedded StatefulFunction
3) Serialize th
Hi Linlin,
There may be some historical confusion in terminology.
We often refer to 'JobManager' as a component which manages a single job.
Names of all related classes usually contain 'JobManager'.
At the same time, we can refer to it as a master process in Flink's
cluster, potentially running multipl
Thank you very much for your analysis.
When I said there was no memory leak, I meant that from the specific
TaskManager I monitored in real time using JProfiler.
Unfortunately, this problem occurs in only 1 of the TaskManagers, and you
cannot anticipate which. So when you pick a TM to profile at random
Hi Manas,
You need to define the schema. You can refer to the following example (the
field below is just an illustration; adapt it to your topic):
t_env.connect(
    Kafka()
        .version('0.11')
        .topic(INPUT_TOPIC)
        .property("bootstrap.servers", PROD_KAFKA)
        .property("zookeeper.connect", "localhost:2181")
        .start_from_latest()
) \
    .with_format(Json()) \
    .with_schema(
        Schema()
            .field('message', DataTypes.STRING())
    ) \
    .create_temporary_table('mySource')
Thanks, Ori
From the log, it looks like there IS a memory leak.
At 10:12:53 there was the last "successful" GC, when 13GB were freed in
0.4653809 secs:
[Eden: 17336.0M(17336.0M)->0.0B(2544.0M) Survivors: 40960.0K->2176.0M Heap:
23280.3M(28960.0M)->10047.0M(28960.0M)]
Then the heap grew from 10G to 2
Hi,
I'm trying to get a simple consumer/producer running using the following
code, adapted from the provided links:
from pyflink.dataset import ExecutionEnvironment
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import TableConfig, BatchTableEnvironment,
DataTypes, S
Hi, this is our production code, so I have to modify it a little bit, such
as variable names and function names. I think the 3 classes I provide here
are enough.
I try to join two streams, but I don't want to use the default join
function, because I want to send the joined log immediately and remove it
from the state.
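The core pattern looks roughly like this (a simplified sketch with renamed
types; LogA, LogB and Joined are placeholders, not the real classes):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

public class EagerJoin extends KeyedCoProcessFunction<String, LogA, LogB, Joined> {

    private transient ValueState<LogA> pendingA;

    @Override
    public void open(Configuration parameters) {
        pendingA = getRuntimeContext().getState(
            new ValueStateDescriptor<>("pending-a", LogA.class));
    }

    @Override
    public void processElement1(LogA a, Context ctx, Collector<Joined> out) throws Exception {
        // Buffer the left side until the matching right side arrives.
        pendingA.update(a);
    }

    @Override
    public void processElement2(LogB b, Context ctx, Collector<Joined> out) throws Exception {
        LogA a = pendingA.value();
        if (a != null) {
            // Send the joined log immediately and drop it from state.
            out.collect(new Joined(a, b));
            pendingA.clear();
        }
    }
}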
Thanks for the clarification.
Can you also share the code of other parts, particularly MyFunction?
Regards,
Roman
On Thu, Jul 2, 2020 at 12:49 PM Si-li Liu wrote:
> Rocksdb backend has the same problem
>
> Khachatryan Roman wrote on Thu, Jul 2, 2020 at 6:11 PM:
>
>> Thanks for reporting this.
>>
>> Looks like the window namespace was replaced by VoidNamespace in the state entry.
The RocksDB backend has the same problem.
Khachatryan Roman wrote on Thu, Jul 2, 2020 at 6:11 PM:
> Thanks for reporting this.
>
> Looks like the window namespace was replaced by VoidNamespace in the state
> entry.
> I've created https://issues.apache.org/jira/browse/FLINK-18464 to further
> investigate it.
>
> Regards
Thanks for reporting this.
Looks like the window namespace was replaced by VoidNamespace in the state
entry.
I've created https://issues.apache.org/jira/browse/FLINK-18464 to further
investigate it.
Regards,
Roman
On Thu, Jul 2, 2020 at 10:52 AM Si-li Liu wrote:
> I'm using flink 1.9 on Mesos and
Hi
I need to set up a dockerized *session cluster* using Flink *1.8.2* for
development and troubleshooting. We are bound to 1.8.2 as we are deploying
to AWS Kinesis Data Analytics for Flink.
I am using an image based on the semi-official flink:1.8-scala_2.11
I need to add to my dockerized cluster
I'm using Flink 1.9 on Mesos, and I'm trying to use my own trigger and evictor.
The state is stored in memory.

input.setParallelism(processParallelism)
  .assignTimestampsAndWatermarks(new UETimeAssigner)
  .keyBy(_.key)
  .window(TumblingEventTimeWindows.of(Time.minutes(20)))
Hi Doinesh,
I think the problem you meet is quite common.
But with the current Flink architecture, operators must be determined at
compile time (when you submit your job). This is by design IIUC.
Suppose the operators were changeable; then Flink would need to go through the
compile-optimize-sch