I am using Samza 1.0, yes. The stack trace is:
19:24:49.326 [Samza StreamProcessor Container Thread-0] ERROR org.apache.samza.processor.StreamProcessor - Container: org.apache.samza.container.SamzaContainer@3e923d9e failed with an exception. Stopping the stream processor: c13057a8-42c5-4b68-9f73-29138fc6eb89. Original exception:
org.apache.samza.SamzaException: org.apache.samza.SamzaException: Callback failed for task Partition 0, ssp SystemStreamPartition [kafka, audit-events, 0], offset 682227.
    at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:150) ~[audit-app.jar:?]
    at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:829) [audit-app.jar:?]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_181]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_181]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_181]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_181]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
Caused by: org.apache.samza.SamzaException: Callback failed for task Partition 0, ssp SystemStreamPartition [kafka, audit-events, 0], offset 682227.
    at org.apache.samza.task.TaskCallbackImpl.failure(TaskCallbackImpl.java:89) ~[audit-app.jar:?]
    at org.apache.samza.task.AsyncStreamTaskAdapter.process(AsyncStreamTaskAdapter.java:75) ~[audit-app.jar:?]
    at org.apache.samza.task.AsyncStreamTaskAdapter.processAsync(AsyncStreamTaskAdapter.java:63) ~[audit-app.jar:?]
    at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:208) ~[audit-app.jar:?]
    at org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54) ~[audit-app.jar:?]
    at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:206) ~[audit-app.jar:?]
    at org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.process(AsyncRunLoop.java:464) ~[audit-app.jar:?]
    at org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.run(AsyncRunLoop.java:406) ~[audit-app.jar:?]
    at org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.access$300(AsyncRunLoop.java:340) ~[audit-app.jar:?]
    at org.apache.samza.task.AsyncRunLoop.runTasks(AsyncRunLoop.java:231) ~[audit-app.jar:?]
    at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:165) ~[audit-app.jar:?]
    ... 6 more
19:24:49.355 [Samza StreamProcessor Container Thread-0] WARN org.apache.samza.container.SamzaContainer - Shutdown is no-op since the container is already in state: FAILED
I have left out the root exception thrown by my operator, but hopefully
this is sufficient. After this, the app continues to run and the status
remains "RUNNING".
Prateek Maheshwari <prateek...@gmail.com> writes:
Hi Tom,
This sounds like a bug. ApplicationRunner should return the correct status
when the processor has shut down. We fixed a similar standalone bug
recently; are you already using Samza 1.0?
If this is reproducible / happens again, a thread dump + logs would also be
very helpful for debugging and verifying if the issue is already fixed.
Thanks,
Prateek
On Fri, Mar 22, 2019 at 7:23 AM Tom Davis <t...@recursivedream.com> wrote:
Prateek Maheshwari <prateek...@gmail.com> writes:
> Hi Tom,
>
> This would depend on what your k8s container orchestration logic looks
> like. For example, in YARN, 'status' returns 'not running' after 'start'
> until all the containers requested from the AM are 'running'. We also
> leverage YARN to restart containers/job automatically on failures (within
> some bounds). Additionally, we set up a monitoring alert that goes off if
> the number of running containers stays lower than the number of expected
> containers for extended periods of time (~ 5 minutes).
>
> Are you saying that you noticed that the LocalApplicationRunner status
> returns 'running' even if its stream processor / SamzaContainer has stopped
> processing?
>
Yeah, this is what I mean. We have a health check on the overall
ApplicationStatus, but the containers entering a failed state doesn't
result in a shutdown of the runner itself. An example from last
night: Kafka became unavailable at some point and Samza failed to write
checkpoints for a while, ultimately leading to container failures. The
last log line is:
o.a.s.c.SamzaContainer - Shutdown is no-op since the container is already in state: FAILED
This doesn't cause the Pod to be killed, though, so we just silently
stop processing events. How do you determine the number of expected
containers? Or are you speaking of containers in terms of YARN and not
Samza processors?
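
One workaround I'm considering, assuming I'm reading the 1.0 API right, is to register a ProcessorLifecycleListener that kills the JVM when the processor fails, so Kubernetes restarts the Pod. This is only a rough sketch; I haven't verified the exact factory signature or package names:

import org.apache.samza.application.StreamApplication;
import org.apache.samza.application.StreamApplicationDescriptor;
import org.apache.samza.runtime.ProcessorLifecycleListener;

public class AuditApp implements StreamApplication {
  @Override
  public void describe(StreamApplicationDescriptor appDescriptor) {
    appDescriptor.withProcessorLifecycleListenerFactory(
        (processorContext, config) -> new ProcessorLifecycleListener() {
          @Override public void beforeStart() {}
          @Override public void afterStart() {}
          @Override public void afterStop() {}

          @Override
          public void afterFailure(Throwable t) {
            // Take the whole JVM down so the Pod is restarted, instead of
            // lingering as a "RUNNING" app whose container is FAILED.
            System.exit(1);
          }
        });
    // ... input/output descriptors and the rest of the topology go here ...
  }
}
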
>
> - Prateek
>
> On Fri, Mar 15, 2019 at 7:26 AM Tom Davis <t...@recursivedream.com> wrote:
>
>> I'm using the LocalApplicationRunner and had added a liveness check
>> around the `status` method. The app is running in Kubernetes so, in
>> theory, it could be restarted if exceptions happened during processing.
>> However, it seems that "container failure" is divorced from "app
>> failure" because the app continues to run even after all the task
>> containers have shut down. Is there a better way to check for
>> application health? Is there a way to shut down the application if all
>> containers have failed? Should I simply ensure exceptions never escape
>> operators? Thanks!
>>
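
To make the last question above concrete: by "ensure exceptions never escape
operators" I mean wrapping the per-message logic roughly like this (low-level
task shown; the class and handle() are placeholders for our real code):

import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AuditEventsTask implements StreamTask {
  private static final Logger LOG = LoggerFactory.getLogger(AuditEventsTask.class);

  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
      TaskCoordinator coordinator) {
    try {
      handle(envelope, collector);   // placeholder for the real per-message logic
    } catch (Exception e) {
      // Swallow per-message failures so they never reach the run loop and fail
      // the container; alternatively, request a coordinated shutdown so the
      // runner actually stops instead of leaving a FAILED container behind:
      // coordinator.shutdown(TaskCoordinator.RequestScope.ALL_TASKS_IN_CONTAINER);
      LOG.error("Dropping message at offset {} on {}", envelope.getOffset(),
          envelope.getSystemStreamPartition(), e);
    }
  }

  private void handle(IncomingMessageEnvelope envelope, MessageCollector collector) {
    // real processing goes here
  }
}
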