turcsanyip edited a comment on pull request #4822:
URL: https://github.com/apache/nifi/pull/4822#issuecomment-808562542


   @ChrisSamo632 Thanks for picking up this task and implementing this 
non-trivial processor!
   
   I have not reviewed the code in detail (yet), but I spotted some issues
regarding thread handling:
   
   The `onTrigger()` method should not block its thread. In NiFi, all
processors' `onTrigger()` methods are executed by a shared thread pool, so if
some processors hold on to execution threads, other processors may starve.
   I think the same pattern as in `ConsumeAzureEventHub` could be used here:
start the Worker on the first execution of `onTrigger()` and simply yield on
subsequent calls.
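
   A minimal, framework-free sketch of that pattern in plain Java, for
illustration only: the class `StartOnceThenYield` and its methods are
hypothetical, the `yield` callback stands in for NiFi's `ProcessContext.yield()`,
and `workerBody` stands in for a long-running KCL Worker. The idea is that the
Worker gets its own dedicated thread, while every `onTrigger()` call returns
immediately.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical sketch of "start the Worker on the first onTrigger(), then yield".
 * Not NiFi API: the yield callback stands in for ProcessContext.yield().
 */
public class StartOnceThenYield {

    private final Runnable workerBody;
    private ExecutorService workerExecutor; // dedicated thread, NOT the shared onTrigger pool
    private int startCount = 0;

    public StartOnceThenYield(Runnable workerBody) {
        this.workerBody = workerBody;
    }

    /** Called repeatedly from the shared pool; never blocks. */
    public synchronized void onTrigger(Runnable yield) {
        if (workerExecutor == null) {
            // First call: hand the long-running Worker over to its own thread.
            workerExecutor = Executors.newSingleThreadExecutor();
            workerExecutor.submit(workerBody);
            startCount++;
        }
        // Every call, including the first, signals a yield and returns right away.
        yield.run();
    }

    /** Interrupts the Worker thread; returns true if it terminated within the timeout. */
    public synchronized boolean onStopped(long timeoutMillis) {
        if (workerExecutor == null) {
            return true;
        }
        workerExecutor.shutdownNow(); // interrupts the running Worker
        try {
            return workerExecutor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            workerExecutor = null;
        }
    }

    public synchronized int getStartCount() {
        return startCount;
    }

    public static void main(String[] args) {
        int[] yields = {0};
        StartOnceThenYield processor = new StartOnceThenYield(() -> {
            // A real Worker would consume from Kinesis until shut down; this one just waits.
            try {
                Thread.sleep(Long.MAX_VALUE);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        for (int i = 0; i < 3; i++) {
            processor.onTrigger(() -> yields[0]++);
        }
        System.out.println("starts=" + processor.getStartCount() + " yields=" + yields[0]);
        System.out.println("stopped=" + processor.onStopped(5_000));
    }
}
```

Repeated `onTrigger()` calls start the Worker only once, and `onStopped()`
interrupts the dedicated thread and waits for it to finish.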
   
   I believe one Worker per processor is enough, and there is no need to
maintain a pool of Workers. A single Worker can run multiple threads to execute
the Kinesis RecordProcessors: as far as I can see, by default the Worker spins
up a thread for each assigned shard (the RecordProcessor-xxxx threads), so one
Worker already provides parallel processing. This keeps the code simpler and
reduces runtime overhead, because each Worker has its own "maintenance" threads
(e.g. LeaseRenewer-xxxx, LeaseCoordinator-xxxx).
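
   To illustrate that threading model, here is a hypothetical sketch (not KCL
code) of a single Worker that gives each assigned shard its own
`RecordProcessor-<n>` thread. The `processShards` helper and the shard IDs are
made up for the example; the point is that one Worker object is enough to get
per-shard parallelism.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/**
 * Hypothetical model (not KCL code) of one Worker that runs each assigned
 * shard on its own "RecordProcessor-<n>" thread.
 */
public class SingleWorkerSketch {

    /** Runs shardTask once per shard, each on a separate thread; returns the thread names used. */
    public static Set<String> processShards(List<String> shardIds, Consumer<String> shardTask) {
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        List<Thread> recordProcessors = new ArrayList<>();
        int n = 0;
        for (String shardId : shardIds) {
            // One thread per shard, named like the threads seen in a running Worker.
            Thread t = new Thread(() -> {
                threadNames.add(Thread.currentThread().getName());
                shardTask.accept(shardId);
            }, "RecordProcessor-" + (++n));
            recordProcessors.add(t);
            t.start();
        }
        for (Thread t : recordProcessors) {
            try {
                t.join(); // the sketch waits for all shards; a real Worker keeps running
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for shards", e);
            }
        }
        return threadNames;
    }

    public static void main(String[] args) {
        // Example shard IDs in the usual Kinesis format (values are made up).
        List<String> shards = List.of("shardId-000000000000", "shardId-000000000001",
                "shardId-000000000002");
        Set<String> names = processShards(shards,
                shard -> System.out.println(shard + " handled by " + Thread.currentThread().getName()));
        System.out.println(names.size() + " RecordProcessor threads in one Worker");
    }
}
```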
   
   The processor cannot stop cleanly due to a bug in KCL. Interestingly, it was
just reported to AWS by someone else:
https://github.com/awslabs/amazon-kinesis-client/issues/796
   When the processor shuts down the Worker, the Worker leaves behind a running
thread, so stopping and starting the processor repeatedly would leak threads.
Furthermore, the "zombie" thread(s) prevent NiFi from shutting down properly:
   `2021-03-26 12:12:52,715 WARN [main] org.apache.nifi.bootstrap.Command NiFi 
has not finished shutting down after 20 seconds. Killing process.`
   For this reason, I think we need to downgrade KCL to version 1.13.3 until
the bug is fixed.
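
   Whatever KCL version ends up being used, a small diagnostic like the one
below could help verify in a test that stopping the processor leaves no threads
behind. It is a hypothetical helper built only on `Thread.getAllStackTraces()`,
and the `ZombieWorker-` name prefix is made up. It reports only non-daemon
threads, since those are the ones that keep a JVM from exiting on its own.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;

/** Hypothetical diagnostic: lists live non-daemon threads whose names start with a prefix. */
public class LeakedThreadCheck {

    public static List<String> liveNonDaemonThreads(String namePrefix) {
        return Thread.getAllStackTraces().keySet().stream()
                .filter(Thread::isAlive)
                .filter(t -> !t.isDaemon()) // daemon threads do not block JVM exit
                .map(Thread::getName)
                .filter(name -> name.startsWith(namePrefix))
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch release = new CountDownLatch(1);
        // Simulates a thread that a Worker failed to stop (the name is made up).
        Thread zombie = new Thread(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "ZombieWorker-1");
        zombie.setDaemon(false);
        zombie.start();
        System.out.println("after shutdown: " + liveNonDaemonThreads("ZombieWorker-"));
        release.countDown();
        zombie.join();
        System.out.println("after release: " + liveNonDaemonThreads("ZombieWorker-"));
    }
}
```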
   
   And one more idea: the way the processor receives data via KCL is quite
similar to other Consume*** processors (like the already mentioned
`ConsumeAzureEventHub` or `ConsumeAMQP`), so I would consider naming the
processor `ConsumeKinesisStream` instead of `Get~`. (In fact, Kinesis also has
a Get-like/polling API, and another processor based on that API may be
implemented later.)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
