Hi XiaoChuan,

Are you setting task.max.concurrency > 1 to allow multiple messages in flight? (The "keyed executor pool" is only meaningful in that scenario.)
Also, have you tried increasing job.container.thread.pool.size and setting it to the number of tasks in the container? Given that your input topic is already partitioned by memberID, it will probably be simpler to try this first, benchmark your QPS, and see whether it meets your performance goals.

I'd tune these config knobs first and confirm that you actually need the "keyed executor thread pool". You may find that it introduces more complexity.

Please let us know if you have further questions. We are happy to help you tune your job for maximum performance.

On Wed, Aug 9, 2017 at 4:03 PM, xinyu liu <xinyuliu...@gmail.com> wrote:

> Hi, XiaoChuan,
>
> For your questions:
>
> 1. By "keyed single thread executor pool", it means something like a map
> from a key to a single thread executor, like Map<String, Executor> where
> each Executor is an Executors.newSingleThreadExecutor()
> <https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html#newSingleThreadExecutor()>.
> This means for a particular key, it will be executed in a designated
> thread, which guarantees the ordering of the key.
>
> 2. For your use case, you can create the above keyed executors by setting
> the key being some hash of the user id. For example:
>
> Map<Integer, Executor> keyedExecutors = new HashMap<>();
>
> in processAsync():
> String memberId = ....
> int hash = memberId.hashCode(); // you can reduce the hash size by %
> Executor executor = keyedExecutors.get(hash);
> if (executor == null) {
>   executor = Executors.newSingleThreadExecutor();
>   keyedExecutors.put(hash, executor);
> }
>
> executor.execute(() -> process your message here);
> ...
>
> So the same user will always be executed in a single thread, which ensures
> the ordering. Does this make sense to you?
>
> Thanks,
> Xinyu
>
> On Wed, Aug 9, 2017 at 10:07 AM, XiaoChuan Yu <xiaochuan...@kik.com> wrote:
>
> > Hi,
> >
> > I have a few questions regarding the order of processing when using
> > processAsync.
> >
> > From the LinkedIn article here
> > <https://engineering.linkedin.com/blog/2017/01/asynchronous-processing-and-multithreading-in-apache-samza--part>
> > it mentions the following:
> > "For parallelism within a task, Samza guarantees processAsync will be
> > invoked in order for a task. The processing or completion, however, can go
> > out of order. With this guarantee, users can implement sub-task-level data
> > pipelining with customized ordering and parallelism. For example, users can
> > use a keyed single thread executor pool to have in-order processing per key
> > while processing messages with different keys in parallel."
> >
> > 1. What exactly is meant by a "keyed single thread executor pool"? Are
> > there any code examples available on what this looks like?
> > 2. I need to process a stream keyed on user IDs in parallel using
> > processAsync but would like each user's event be processed in order. Does
> > this then require custom ordering logic mentioned in the article?
> >
> > Thanks,
> > Xiaochuan Yu

--
Jagadish V,
Graduate Student,
Department of Computer Science,
Stanford University
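
For reference, here is a minimal, self-contained sketch of the "keyed single thread executor pool" idea discussed in this thread. It is a variant of the quoted snippet, not the same code: instead of lazily creating one executor per distinct hash in a map, it pre-creates a fixed number of single-thread executors and picks one with a modulo of the key's hash (the "reduce the hash size by %" hint), so the number of threads stays bounded. The class name KeyedExecutorPool and its methods are hypothetical and not part of Samza.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper (not a Samza class): routes each key to one fixed single-thread executor. */
class KeyedExecutorPool {
    private final List<ExecutorService> executors = new ArrayList<>();

    KeyedExecutorPool(int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        for (int i = 0; i < size; i++) {
            executors.add(Executors.newSingleThreadExecutor());
        }
    }

    /**
     * The same key always maps to the same executor. Each executor has a single
     * thread and a FIFO queue, so tasks for one key run in submission order,
     * while different keys may run in parallel on different executors.
     */
    void submit(String key, Runnable task) {
        // floorMod keeps the index non-negative even when hashCode() is negative.
        int index = Math.floorMod(key.hashCode(), executors.size());
        executors.get(index).execute(task);
    }

    /** Stops accepting tasks and waits for queued ones; returns true if all finished in time. */
    boolean shutdownAndAwait(long timeoutSeconds) {
        for (ExecutorService executor : executors) {
            executor.shutdown();
        }
        try {
            for (ExecutorService executor : executors) {
                if (!executor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
                    return false;
                }
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    public static void main(String[] args) {
        KeyedExecutorPool pool = new KeyedExecutorPool(4);
        String[] memberIds = {"alice", "bob", "alice", "carol", "bob", "alice"};
        for (int i = 0; i < memberIds.length; i++) {
            final String memberId = memberIds[i];
            final int seq = i;
            // Events for the same memberId are printed in increasing seq order.
            pool.submit(memberId, () -> System.out.println(
                    memberId + " event " + seq + " on " + Thread.currentThread().getName()));
        }
        pool.shutdownAndAwait(5);
    }
}
```

Inside processAsync, the submitted task would presumably do the per-message work and then signal completion on the TaskCallback (callback.complete(), or callback.failure(...) on error). The per-key ordering relies on processAsync being invoked in order for a task, as the article quoted above states, and on task.max.concurrency > 1 so that more than one message can be in flight.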