Hi Susan,

-- "I have a custom SystemStreamPartitionGrouper" -- this is a very good try!

-- "However, it looks like messages from the Rules stream are going to only one task instance, and not to all of them like I hoped. I have them running in the same container. Is it because there is only one offset value for the container for the Rules stream?"

Your guess is right. Currently there is only one consumer per container. The consumer accepts the SystemStreamPartitions (SSPs) from all the task instances as a Set<SSP>, so if several tasks register the same Rules stream SSP, the set stores it only once. That is why, even if you assign the Rules stream to all the tasks, when they run in the same container the Rules stream ends up assigned to only one, arbitrary task instance in that container.

With your custom SystemStreamPartitionGrouper, one way of solving this is to bring up as many containers as you have tasks (by default, the task count is the largest partition count among your input streams). This will make your job work. The limitation is that it takes more resources.

Another way is to change the SystemConsumer. This requires a little more work. It may be a little easier if you do it on your application side; from Samza's perspective, it may require changing the SystemConsumer API. I opened SAMZA-676 <https://issues.apache.org/jira/browse/SAMZA-676> for your use case. You may want to look at the Consumer part of the design doc to get a brief idea of how to change the SystemConsumer.

The last option is to duplicate the same rule stream to as many partitions as your input stream has. This does not require any changes in the grouper or the consumer.

-- "get the rule when starting up StreamTasks and then localize it."

Here I mean that, if the rules do not change, you can read them in the init() method of the StreamTask. If the rules are in a DB, just read the DB; if the rules are in a stream, read the stream using a consumer and then close the consumer. Because it is a one-time task, a little cost is acceptable.

Thank you.
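To make the Set<SSP> point concrete, here is a minimal Java sketch. It does not use Samza's real classes: Ssp and SspSetDemo are hypothetical stand-ins, and the "kafka", "trx", and "rules" names are assumed for illustration. It only shows why five tasks that each register "Rules partition 0" leave a single entry for it in a shared set; it does not model how the container then routes messages.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Hypothetical, simplified stand-in for a SystemStreamPartition (NOT Samza's class).
// Two instances are equal when system, stream, and partition all match.
final class Ssp {
    final String system;
    final String stream;
    final int partition;

    Ssp(String system, String stream, int partition) {
        this.system = system;
        this.stream = stream;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof Ssp)) return false;
        Ssp other = (Ssp) o;
        return partition == other.partition
                && system.equals(other.system)
                && stream.equals(other.stream);
    }

    @Override
    public int hashCode() {
        return Objects.hash(system, stream, partition);
    }

    @Override
    public String toString() {
        return system + "." + stream + "#" + partition;
    }
}

class SspSetDemo {
    // Mimics every task in one container registering its SSPs into one shared set:
    // task i registers Trx partition i plus the single Rules partition 0.
    static Set<Ssp> collectContainerSsps(int trxPartitions) {
        Set<Ssp> registered = new HashSet<>();
        for (int task = 0; task < trxPartitions; task++) {
            registered.add(new Ssp("kafka", "trx", task));
            registered.add(new Ssp("kafka", "rules", 0)); // equal to earlier copies, so ignored
        }
        return registered;
    }

    public static void main(String[] args) {
        Set<Ssp> ssps = collectContainerSsps(5);
        System.out.println(ssps.size() + " distinct SSPs registered: " + ssps);
    }
}
```

With five tasks, ten registrations are attempted but the set ends up with six entries: five Trx partitions and one Rules partition. The other four Rules registrations leave no trace in the set, which matches the behaviour described above.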
Cheers,

Fang, Yan
yanfang...@gmail.com

On Tue, May 12, 2015 at 8:49 AM, Susan Luong <susan...@gmail.com> wrote:

> Hi Yan,
>
> We are looking for a solution for a similar problem.
>
> Currently, we have 2 input streams, one transactions stream (5 partitions)
> and one rules stream (one partition). I have a custom
> SystemStreamPartitionGrouper that assigns the SSPs like so:
>
> taskName0: Trx stream partition 0, Rules stream partition 0
> taskName1: Trx stream partition 1, Rules stream partition 0
> taskName2: Trx stream partition 2, Rules stream partition 0
> taskName3: Trx stream partition 3, Rules stream partition 0
> taskName4: Trx stream partition 4, Rules stream partition 0
>
> However, it looks like messages from the Rules stream are going to only one
> task instance, and not to all of them like I hoped. I have them running in
> the same container. Is it because there is only one offset value for the
> container for the Rules stream?
>
> Can you please expand on what you mean by "get the rule when starting
> up StreamTasks and then localize it."? Do you mean, loading messages into
> a changelog stream using a bootstrap job?
>
> Thanks in advance for your help!
>
> Susan
>
> On Tue, May 5, 2015 at 6:02 PM, Yan Fang <yanfang...@gmail.com> wrote:
>
> > If the rule does not change, we can get the rule when starting up
> > StreamTasks and then localize it.
> >
> > Cheers,
> >
> > Fang, Yan
> > yanfang...@gmail.com
> >
> > On Tue, May 5, 2015 at 2:41 PM, Yan Fang <yanfang...@gmail.com> wrote:
> >
> > > "If I understand it correctly the only viable solution at the moment
> > > is to create a new stream for the rules messages with as many
> > > partitions as the data stream and write each rules update message to
> > > all partitions of the new rules stream."
> > >
> > > If the data is constantly changing, yes, AFAIK, this is the only viable
> > > solution before we provide "shared store".
> > >
> > > Cheers,
> > >
> > > Fang, Yan
> > > yanfang...@gmail.com
> > >
> > > On Tue, May 5, 2015 at 12:34 PM, Ueli Gallizzi <ueli.galli...@gmail.com>
> > > wrote:
> > >
> > >> Hi Yan,
> > >>
> > >> Thanks for your quick response.
> > >>
> > >> After I read the discussion on SAMZA-353 I think the best solution for
> > >> my use case is a "shared state" store among StreamTasks described in
> > >> SAMZA-402. To give you some background I have a stream with rules
> > >> which are constantly changing and a data stream on which I apply the
> > >> rules. The rules set is very small if you compare it with the data
> > >> stream.
> > >>
> > >> If I understand it correctly the only viable solution at the moment is
> > >> to create a new stream for the rules messages with as many partitions
> > >> as the data stream and write each rules update message to all
> > >> partitions of the new rules stream.
> > >>
> > >> Cheers,
> > >> - ueli
> > >>
> > >> On Tue, May 5, 2015 at 12:06 PM, Yan Fang <yanfang...@gmail.com> wrote:
> > >>
> > >> > Hi Ueli,
> > >> >
> > >> > This feature currently is not supported by Samza. There was some
> > >> > discussions in the JIRA - SAMZA-353
> > >> > <https://issues.apache.org/jira/browse/SAMZA-353>.
> > >> >
> > >> > But there are some workaround for this, depends on what you want to
> > >> > achieve. If you can specify what your requirement is, we can help
> > >> > think of the solution.
> > >> >
> > >> > In another thread
> > >> > <http://mail-archives.apache.org/mod_mbox/samza-dev/201504.mbox/%3c552410e2.7000...@tivo.com%3E>,
> > >> > Tommy Becker has similar requirement and he maybe helpful as well.
> > >> >
> > >> > Cheers,
> > >> > Fang, Yan
> > >> > yanfang...@gmail.com
> > >> >
> > >> > On Tue, May 5, 2015 at 7:42 AM, Ueli Gallizzi <ueli.galli...@gmail.com>
> > >> > wrote:
> > >> >
> > >> > > Hi,
> > >> > >
> > >> > > Is it possible that multiple tasks read from the same input stream
> > >> > > partition?
> > >> > >
> > >> > > example:
> > >> > > task 0 stream A partition 0, stream B partition 0
> > >> > > task 1 stream A partition 1, stream B partition 0
> > >> > > task 2 stream A partition 3, stream B partition 0
> > >> > >
> > >> > > In this example all messages in stream B partition 0 would be
> > >> > > processed by all 3 tasks.
> > >> > >
> > >> > > Cheers,
> > >> > > - ueli