Hi, Xiaochuan,

Please refer to the document here: https://samza.apache.org/learn/tutorials/0.13/samza-async-user-guide.html

As stated, by default, when task.max.concurrency=1, in-order processing within a task is guaranteed, whether you implement StreamTask or AsyncStreamTask. The engineering blog that you referred to at the beginning describes the AsyncStreamTask case, where the completion of pending process() calls may need special treatment to ensure in-order completion (note that in-order invocation of process() calls within a task is always guaranteed).

For your use case, I would suggest that you:
- use StreamTask
- configure x threads in the container thread pool (job.container.thread.pool.size)
- set task.max.concurrency = 1

What you get from the above configuration is:
- in-order process() invocation and completion within a task
- a maximum of x tasks executing in parallel in a container

If that fits your performance requirement, it would be the simplest configuration for your use case. Otherwise, let us know if you want to explore options to maintain in-order process() completion with task.max.concurrency > 1.

Best!

-Yi

On Fri, Aug 25, 2017 at 5:02 PM, XiaoChuan Yu <xiaochuan...@kik.com> wrote:

> Hi Jagadish,
>
> This is a rather late reply, but I don't think I understand the effect of
> changing the job.container.thread.pool.size config in the synchronous case
> very well.
> Suppose I have an input topic partitioned by memberID processed by a
> synchronous job.
> Does increasing the container thread pool size still guarantee that
> messages with the same key will be processed serially?
> For example, if I had 2 messages, msg1 followed by msg2, with the same key,
> is it guaranteed that msg1 will finish processing before msg2 starts
> processing?
>
> My actual situation is that we currently have a job that has maxed out the
> allowed number of containers but is still not processing messages fast
> enough.
> We want to speed up processing but we would like to avoid increasing the
> number of partitions of the input topic.
> The job is IO bound and has the requirement that messages with the same key
> are processed serially.
>
> Thanks,
> Xiaochuan Yu
>
> On Thu, Aug 10, 2017 at 11:28 PM Jagadish Venkatraman <
> jagadish1...@gmail.com> wrote:
>
> > Hi XiaoChuan,
> >
> > Are you setting task.max.concurrency > 1, which allows multiple messages
> > in flight? (The "keyed executor pool" is only meaningful in that
> > scenario.)
> >
> > Also, have you tried increasing your job.container.thread.pool.size config
> > and setting it to the number of tasks in the container? Given that your
> > input topic is already partitioned by memberID, it'll probably be simpler
> > to try this first, benchmark your QPS and see if it meets your performance
> > goals. I'd tune these config knobs first and confirm that you need the
> > "keyed executor thread pool". You may find that it introduces more
> > complexity.
> >
> > Please let us know if you have further questions. We are happy to further
> > help you tune your job for maximum performance.
> >
> > On Wed, Aug 9, 2017 at 4:03 PM, xinyu liu <xinyuliu...@gmail.com> wrote:
> >
> > > Hi, XiaoChuan,
> > >
> > > For your questions:
> > >
> > > 1. By "keyed single thread executor pool", it means something like a map
> > > from a key to a single thread executor, like Map<String, Executor> where
> > > each Executor is created with Executors.newSingleThreadExecutor()
> > > <https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html#newSingleThreadExecutor()>.
> > > This means that work for a particular key is always executed in a
> > > designated thread, which guarantees the ordering for that key.
> > >
> > > 2. For your use case, you can create the above keyed executors by using
> > > some hash of the user id as the key. For example:
> > >
> > > Map<Integer, Executor> keyedExecutors = new HashMap<>();
> > >
> > > in processAsync():
> > > String memberId = ....
> > > int hash = memberId.hashCode(); // you can reduce the hash size by %
> > > Executor executor = keyedExecutors.get(hash);
> > > if (executor == null) {
> > >   executor = Executors.newSingleThreadExecutor();
> > >   keyedExecutors.put(hash, executor);
> > > }
> > >
> > > executor.execute(() -> process your message here);
> > > ...
> > >
> > > So messages for the same user will always be executed in a single
> > > thread, which ensures the ordering. Does this make sense to you?
> > >
> > > Thanks,
> > > Xinyu
> > >
> > > On Wed, Aug 9, 2017 at 10:07 AM, XiaoChuan Yu <xiaochuan...@kik.com>
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > I have a few questions regarding the order of processing when using
> > > > processAsync.
> > > >
> > > > The LinkedIn article here
> > > > <https://engineering.linkedin.com/blog/2017/01/asynchronous-processing-and-multithreading-in-apache-samza--part>
> > > > mentions the following:
> > > > "For parallelism within a task, Samza guarantees processAsync will be
> > > > invoked in order for a task. The processing or completion, however, can
> > > > go out of order. With this guarantee, users can implement sub-task-level
> > > > data pipelining with customized ordering and parallelism. For example,
> > > > users can use a keyed single thread executor pool to have in-order
> > > > processing per key while processing messages with different keys in
> > > > parallel."
> > > >
> > > > 1. What exactly is meant by a "keyed single thread executor pool"? Are
> > > > there any code examples available on what this looks like?
> > > > 2. I need to process a stream keyed on user IDs in parallel using
> > > > processAsync but would like each user's events to be processed in order.
> > > > Does this then require the custom ordering logic mentioned in the article?
> > > >
> > > > Thanks,
> > > > Xiaochuan Yu
> >
> > --
> > Jagadish V,
> > Graduate Student,
> > Department of Computer Science,
> > Stanford University
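
For readers looking for a self-contained version of the "keyed single thread executor pool" idea discussed in this thread, here is a minimal sketch. It is not code from Samza or from the replies above: the class name KeyedExecutorPool and its methods (bucketFor, submit, shutdownAndAwait) are hypothetical names chosen for illustration. It also assumes a fixed number of buckets chosen up front (the "reduce the hash size by %" idea), so the number of threads stays bounded instead of growing with the number of distinct keys.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper for illustration only; not part of the Samza API. */
final class KeyedExecutorPool {
    // One single-thread executor per bucket; each executor runs its work in submission order.
    private final List<ExecutorService> executors = new ArrayList<>();

    KeyedExecutorPool(int buckets) {
        if (buckets <= 0) {
            throw new IllegalArgumentException("buckets must be positive");
        }
        for (int i = 0; i < buckets; i++) {
            executors.add(Executors.newSingleThreadExecutor());
        }
    }

    /** Maps a key to a bucket; floorMod keeps the index non-negative for negative hash codes. */
    int bucketFor(String key) {
        return Math.floorMod(key.hashCode(), executors.size());
    }

    /** Work for the same key always lands on the same thread, so it runs in submission order. */
    void submit(String key, Runnable work) {
        executors.get(bucketFor(key)).execute(work);
    }

    /** Stops accepting work and waits for queued work; returns false on timeout or interrupt. */
    boolean shutdownAndAwait(long timeout, TimeUnit unit) {
        for (ExecutorService e : executors) {
            e.shutdown();
        }
        boolean allDone = true;
        try {
            for (ExecutorService e : executors) {
                allDone &= e.awaitTermination(timeout, unit);
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        }
        return allDone;
    }
}
```

In an AsyncStreamTask, one would presumably call something like pool.submit(memberId, work) from processAsync(), with the submitted work signalling the task callback when it finishes. Different keys may share a bucket, which costs some parallelism but never breaks per-key ordering.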