Hi Till,

When I submit one big job, both the JM and the TM (sometimes just the JM)
crash with an OOM during initialization itself (i.e. before all operators
switch to RUNNING). The number of threads on the TM goes to almost 1000.

But when I submit multiple smaller jobs, the submission completes. However,
once data starts coming in (it's a live stream), the task manager's memory
usage grows and it eventually crashes.

The patterns I'm trying to match are simple (A followed by B, A followed by
B within X mins, etc.), but the number of patterns is large (for the reason
mentioned in my question 2 below).

Configuration: 1 JM and 1 TM

jobmanager.heap.mb: 512
taskmanager.heap.mb: 3596
taskmanager.numberOfTaskSlots: 5
parallelism.default: 1
jobmanager.rpc.port: 6123
state.backend: filesystem
taskmanager.debug.memory.startLogThread: true
taskmanager.debug.memory.logIntervalMs: 120000
akka.ask.timeout: 2 min
akka.client.timeout: 5 min
akka.framesize: 404857600b
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s

I'm submitting 5 jobs, and each job has ~80 operators.

With the above configuration, the job submissions succeed, but the TM
eventually maxes out its heap usage.

But, as mentioned earlier, when I change the number of slots to 1 and
submit 1 job with 300+ operators, the job submission fails with OOM.

3 questions here:

1. Is it possible to chain multiple CEP operators into a single task, so
that the number of threads is reduced? I ask because when I submit one big
job, the OOM always occurs while the JVM is trying to create a new thread.
The chaining hooks I'm aware of are sketched below.
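
This is my (possibly wrong) understanding of the chaining/slot-sharing
hooks in the DataStream API; a sketch only, and I'm not sure how they
interact with the keyed operators CEP creates internally (input,
MyAlertSelectFunction and EnrichAlert are placeholders):

DataStream<Alert> alerts =
        CEP.pattern(input.keyBy(Event::getSourceId), aFollowedByB)
           .select(new MyAlertSelectFunction());

alerts.map(new EnrichAlert())
      .startNewChain()              // start a fresh chain at this operator
      .slotSharingGroup("cep");     // co-locate these tasks in one slot group

// env.disableOperatorChaining() is the global switch in the other direction.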

2. Instead of using a KeyedStream, I'm creating a separate stream per key
(using a filter operator) and then applying all M patterns to each of those
streams. So essentially it results in M (number of patterns) x N (number of
keys) CEP operators/tasks; a sketch of this topology is below. The reason
behind this is that I need different watermarks per key (a key represents a
physical source, and the source times can differ, which would otherwise
result in events getting dropped), and I believe generating watermarks per
key is not supported yet. Is this understanding correct? Do you have any
ideas/recommendations to address this use case?
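
Roughly, the jobs are built like this (a hypothetical sketch; keys,
patterns, PerSourceWatermarkAssigner and alertSink stand in for my actual
code):

for (final String key : keys) {                            // N sources
    DataStream<Event> perKey = input
            .filter(e -> key.equals(e.getSourceId()))
            .assignTimestampsAndWatermarks(new PerSourceWatermarkAssigner());
    for (Pattern<Event, ?> pattern : patterns) {           // M patterns
        CEP.pattern(perKey, pattern)
           .select(new MyAlertSelectFunction())
           .addSink(alertSink);
    }
}
// => M x N CEP operators/tasks in the job graph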

3. How can we benchmark the resources required by the JM? Is it OK to
assume that the amount of memory required by the JM grows linearly with the
total number of operators deployed?
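
For question 3, the only data point I can collect so far is the JM heap
under load via plain JVM GC logging (assuming env.java.opts.jobmanager is
the right knob for passing JVM options to the JM):

env.java.opts.jobmanager: "-Xloggc:/tmp/jm-gc.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps"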

Thanks,
Shailesh


On Mon, Feb 19, 2018 at 10:18 PM, Till Rohrmann <trohrm...@apache.org>
wrote:

> Hi Shailesh,
>
> my question would be: where do you see the OOM happening? Does it happen on
> the JM or the TM?
>
> The memory requirements for each operator strongly depend on the operator,
> and it is hard to give a general formula for that. It mostly depends on the
> user function. Flink itself should not need too much extra memory for the
> framework-specific code.
>
> CEP, however, can easily add a couple of hundred megabytes to your memory
> requirements. This depends strongly on the pattern you're matching and
> which state backend you're using.
>
> Concerning your question about one big job vs. multiple jobs, I could see
> that this helps if not all jobs are executed at the same time. Especially if
> you only have a single TM with a limited number of slots, I think that you
> effectively queue up jobs. That should reduce the required amount of
> resources for each individual job.
>
> Cheers,
> Till
>
> On Mon, Feb 19, 2018 at 11:35 AM, Shailesh Jain <
> shailesh.j...@stellapps.com> wrote:
>
>> Actually, there are too many hyperparameters to experiment with, which is
>> why I'm trying to understand if there is any particular way in which a
>> cluster could be benchmarked.
>>
>> Another strange behaviour I am observing: delaying the operator creation
>> (by distributing the operators across jobs, and submitting multiple jobs
>> to the same cluster instead of one) is letting me create more operators.
>> Any ideas on why that is happening?
>>
>> Shailesh
>>
>>
>> On Sun, Feb 18, 2018 at 11:16 PM, Pawel Bartoszek <
>> pawelbartosze...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> You could definitely try to find a formula for the heap size, but isn't it
>>> easier to just try out different memory settings and see which works best
>>> for you?
>>>
>>> Thanks,
>>> Pawel
>>>
>>> On 17 Feb 2018 at 12:26, "Shailesh Jain" <shailesh.j...@stellapps.com>
>>> wrote:
>>>
>>> Oops, hit send by mistake.
>>>
>>> In the configuration section, it is mentioned that for "many operators"
>>> heap size should be increased.
>>>
>>> "JVM heap size (in megabytes) for the JobManager. You may have to
>>> increase the heap size for the JobManager if you are running very large
>>> applications (with many operators), or if you are keeping a long history of
>>> them."
>>>
>>> Is there any recommendation on the heap space required when there are
>>> around 200 CEP operators, and close to 80 Filter operators?
>>>
>>> Any other leads on calculating the expected heap space allocation to
>>> start the job would be really helpful.
>>>
>>> Thanks,
>>> Shailesh
>>>
>>>
>>>
>>> On Sat, Feb 17, 2018 at 5:53 PM, Shailesh Jain <
>>> shailesh.j...@stellapps.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have a Flink job with almost 300 operators, and every time I try to
>>>> submit the job, the cluster crashes with an OutOfMemory exception.
>>>>
>>>> I have 1 job manager and 1 task manager with 2 GB heap space allocated
>>>> to both.
>>>>
>>>> In the configuration section of the documentation
>>>>
>>>>
>>>>
>>>>
>>>
>>>
>>
>
