turcsanyip edited a comment on pull request #4822: URL: https://github.com/apache/nifi/pull/4822#issuecomment-808562542
@ChrisSamo632 Thanks for picking up this task and implementing this non-trivial processor! I have not reviewed the code in detail yet, but I spotted some issues regarding thread handling:

The `onTrigger()` method should not block its thread. In NiFi, a shared thread pool executes all processors' `onTrigger()` methods. If some processors hold up execution threads, other processors can be starved. I think a pattern similar to the one in `ConsumeAzureEventHub` could be used: start up the Worker at the first execution of `onTrigger()` and then just yield in the subsequent calls.

I believe one Worker per processor should be enough and it is not necessary to maintain a pool of workers. A single Worker can run multiple threads for executing the Kinesis RecordProcessors. As far as I saw, the Worker spins up a thread for each assigned shard by default (RecordProcessor-xxxx threads), so a single Worker provides parallel processing too. The code is simpler this way and there is less overhead at runtime (each Worker has its own "maintenance" threads like LeaseRenewer-xxxx, LeaseCoordinator-xxxx).

The processor cannot stop cleanly due to a bug in KCL. Interestingly, it has just been reported to AWS by someone else: https://github.com/awslabs/amazon-kinesis-client/issues/796

When the processor shuts down the Worker, the Worker leaves a thread running behind. Stopping and starting the processor multiple times would leak threads. Furthermore, the "zombie" thread(s) prevent NiFi from shutting down properly:

`2021-03-26 12:12:52,715 WARN [main] org.apache.nifi.bootstrap.Command NiFi has not finished shutting down after 20 seconds. Killing process.`

For this reason, I think we need to downgrade the KCL version to 1.13.3 until the bug is fixed.
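To illustrate the "start the Worker on the first `onTrigger()`, then just yield" idea from the thread-handling remarks above, here is a minimal plain-Java sketch. It deliberately avoids the NiFi and KCL APIs: `StartOnceSketch`, `FakeWorker`, `getStartCount()` and `stop()` are hypothetical stand-ins, and this is not the code from `ConsumeAzureEventHub` or from this PR.

```java
import java.util.concurrent.CountDownLatch;

/**
 * Plain-Java sketch of "start the worker once, then yield".
 * All names here are hypothetical stand-ins, not NiFi or KCL API.
 */
class StartOnceSketch {

    /** Stand-in for a long-running consumer such as the KCL Worker. */
    static final class FakeWorker implements Runnable {
        private final CountDownLatch shutdown = new CountDownLatch(1);

        @Override
        public void run() {
            try {
                // Blocks on its own thread until shutdown is requested.
                shutdown.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        void requestShutdown() {
            shutdown.countDown();
        }
    }

    private FakeWorker worker;
    private Thread workerThread;
    private int startCount;

    /**
     * Never blocks. Returns true if this call started the worker,
     * false if the worker was already running and the call only "yielded".
     */
    public synchronized boolean onTrigger() {
        if (workerThread == null) {
            worker = new FakeWorker();
            workerThread = new Thread(worker, "worker-sketch");
            workerThread.setDaemon(true);
            workerThread.start();
            startCount++;
            return true;
        }
        // Assumption: a real processor would yield here and return immediately.
        return false;
    }

    /** Requests shutdown and waits briefly; returns true if the thread ended. */
    public synchronized boolean stop() {
        if (workerThread == null) {
            return true;
        }
        worker.requestShutdown();
        boolean terminated;
        try {
            workerThread.join(2000);
            terminated = !workerThread.isAlive();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            terminated = false;
        }
        worker = null;
        workerThread = null;
        return terminated;
    }

    public synchronized int getStartCount() {
        return startCount;
    }
}
```

In an actual processor, the first branch would presumably build and start the single KCL Worker on its own thread, and the stop logic would live in the processor's stop lifecycle method. The boolean returned by `stop()` is only there to make a leftover thread visible, which is the symptom described for the KCL bug.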
And an idea: the way the processor receives data via KCL is quite similar to other Consume*** processors (like the already mentioned `ConsumeAzureEventHub` or `ConsumeAMQP`), so I would consider naming the processor `ConsumeKinesisStream` instead of `Get~` (there is actually a Get-like/polling API for Kinesis too, and another processor may be implemented later using that API).
