Hi Marco,

For the remaining issues, 

1. For the aggregation, the 500GB of files are not required to be fit into 
memory.
Rough speaking for the keyed().window().reduce(), the input records would be 
first
sort according to the key (time_series.name) via external sorts, which only 
consumes
a fix amount of memory. Then for all the records of each key, flink would use a 
special 
single key statebackend to hold the intermediate result for this key, whose 
memory
consumption usually could be ignored. 
2. The first operator (namely the source) would read the files direclty and 
emit to the
following operators directly, thus there should be no intermediate result 
before the first
operator. 
3. I wonder now flink does not support using S3 to store the intermediate 
result, since it
relies on local I/O mechanisms like mmap or local file read/write, and S3 seems 
not
support. EBS seems to be ok.
4. The heartbeat timeout happens normally due to akka thread get blocked or 
network issue. 
To check if thread get blocked, you may first check the GC log to see if there 
are long full gc, 
if not, then check if the JM or TM akka thread get blocked via thread dump. If 
it seems to be 
the network issues, the job could configure heartbeat.timeout [1] to increase 
the timeout.

Best,
Yun

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#heartbeat-timeout



 ------------------Original Mail ------------------
Sender:Marco Villalobos <mvillalo...@kineteque.com>
Send Date:Wed May 19 14:03:48 2021
Recipients:user <user@flink.apache.org>
Subject:Questions Flink DataStream in BATCH execution mode scalability advice

Questions Flink DataStream in BATCH execution mode scalability advice.

Here is the problem that I am trying to solve.

Input is an S3 bucket directory with about 500 GB of data across many files. 
The instance that I am running on only has 50GB of EBS storage. The nature of 
this data is time series data. Imagine name, value, timestamp.

I must average the time_series.value by time_series.name on a tumbling window 
of 15 minutes. Upon aggregation, the time_series.timestamp gets rounded up a 
quarter.  I key by tag name and 15-minute interval.

After aggregation, I must forward fill the missing quarters for each 
time_series.name. Currently, this forward fill operator is keyed only by 
time_series.name. Does this mean that in batch mode, all of the time series 
with the same time_series.name within the 500 gb of files must fit in memory?

The results are saved in a rdbms.

If this job somehow reads all 500 GB before it sends it to the first operator, 
where is the data store?

Now considering that the EMR node only has 50GB of ebs (that's disk storage), 
is there a means to configure Flink to store its intermediate results within S3?

When the job failed, I saw this exception in the log: "Recovery is suppressed 
by NoRestartBackoffTimeStrategy." Is there a way to configure this to recover?

My job keeps on failing for the same reason, it says, "The heartbeat of 
TaskManager with id container_xxx timed out." Is there a way to configure it 
not to timeout?

I would appreciate any advice on how I should save these problems. Thank you.


Reply via email to