Makes sense. Thanks.

On 13 Oct 2016 12:42 pm, "Michael Noll" <mich...@confluent.io> wrote:
> > But if they arrive out of order, I have to detect / process that myself
> > in the processor logic.
>
> Yes -- if your processing logic depends on the specific ordering of
> messages (which is the case for you), then you must manually implement this
> ordering-specific logic at the moment.
>
> Other use cases may not need to do that and "just work" even with
> out-of-order data. If, for example, you are counting objects or are
> computing the sum of numbers, then you do not need to do anything special.
>
>
> On Wed, Oct 12, 2016 at 10:22 PM, Ali Akhtar <ali.rac...@gmail.com> wrote:
>
> > Thanks Matthias.
> >
> > So, if I'm understanding this right, Kafka will not discard messages
> > which arrive out of order.
> >
> > What it will do is show messages in the order in which they arrive.
> >
> > But if they arrive out of order, I have to detect / process that myself
> > in the processor logic.
> >
> > Is that correct?
> >
> > Thanks.
> >
> > On Wed, Oct 12, 2016 at 11:37 PM, Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> > > Last question first: A KTable is basically an infinite window over the
> > > whole stream providing a single result (that gets updated when new
> > > data arrives). If you use windows, you cut the overall stream into
> > > finite subsets and get a result per window. Thus, I guess you do not
> > > need windows (if I understood your use case correctly).
> > >
> > > However, in the current state of the Kafka Streams DSL, you will not
> > > be able to use KTable (directly -- see suggestion to fix this below)
> > > because it does (currently) not allow you to access the timestamp of
> > > the current record (thus, you cannot know if a record is late or not).
> > > You will need to use the Processor API, which allows you to access the
> > > current record's timestamp via the Context object given in init().
> > >
> > > Your reasoning about partitions and Streams instances is correct.
> > > However, the following two are not:
> > >
> > > > - Because I'm using a KTable, the timestamp of the messages is
> > > > extracted, and I'm not shown the older bid because I've already
> > > > processed the later bid. The older bid is ignored.
> > >
> > > and
> > >
> > > > - Because of this, the replica already knows which timestamps it
> > > > has processed, and is able to ignore the older messages.
> > >
> > > Late-arriving records are not dropped but processed regularly. Thus,
> > > your KTable aggregate function will be called for the late-arriving
> > > record, too (but as described above, you currently have no way to know
> > > it is a late record).
> > >
> > >
> > > Last but not least, your last statement is a valid concern:
> > >
> > > > Also, what will happen if bid 2 arrived and got processed, and then
> > > > the particular replica crashed, and was restarted. The restarted
> > > > replica won't have any memory of which timestamps it has previously
> > > > processed.
> > > >
> > > > So if bid 2 got processed, replica crashed and restarted, and then
> > > > bid 1 arrived, what would happen in that case?
> > >
> > > In order to make this work, you would need to store the timestamp in
> > > your store next to the actual data. Thus, you can compare the timestamp
> > > of the latest result (safely stored in operator state) with the
> > > timestamp of the current record.
> > >
> > > Does this make sense?
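A minimal sketch of that approach with the Processor API as it looked at the time of this thread (0.10.x): the timestamp of the last accepted bid is kept in a key-value store next to the data and compared against the current record's timestamp. The class name, the "latest-bid-ts" store name, and the Double bid value are made up for illustration, and the store would need to be registered on the topology via addStateStore() and connected to this processor.

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

// Hypothetical processor: forwards a bid only if it is not older than the
// last bid already accepted for the same key.
public class LatestBidProcessor implements Processor<String, Double> {

    private ProcessorContext context;
    private KeyValueStore<String, Long> latestTimestamps;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
        // "latest-bid-ts" is an illustrative store name; the store itself is
        // defined and attached to the topology elsewhere.
        this.latestTimestamps =
            (KeyValueStore<String, Long>) context.getStateStore("latest-bid-ts");
    }

    @Override
    public void process(String bidId, Double bidValue) {
        long recordTs = context.timestamp();               // timestamp of the current record
        Long lastAcceptedTs = latestTimestamps.get(bidId);  // timestamp stored next to the data

        if (lastAcceptedTs == null || recordTs >= lastAcceptedTs) {
            latestTimestamps.put(bidId, recordTs);          // remember the newer timestamp
            context.forward(bidId, bidValue);               // accept: newest bid seen so far
        }
        // else: the record is older than what was already processed -> ignore it
    }

    @Override
    public void punctuate(long timestamp) { }               // not needed for this logic

    @Override
    public void close() { }
}

Since persistent state stores are backed by a changelog topic, the stored timestamps are restored after a crash and restart, which covers the restart scenario discussed above.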
> > > To fix your issue, you could add a .transformValues() before your
> > > KTable, which allows you to access the timestamp of a record. If you
> > > add this timestamp to your value and pass it to the KTable afterwards,
> > > you can access it and it also gets stored reliably.
> > >
> > > <bid_id : bid_value> => transformValues => <bid_id : {bid_value,
> > > timestamp}> => aggregate
> > >
> > > Hope this helps.
> > >
> > > -Matthias
> > >
> > >
> > > On 10/11/16 9:12 PM, Ali Akhtar wrote:
> > > > P.S., does my scenario require using windows, or can it be achieved
> > > > using just a KTable?
> > > >
> > > > On Wed, Oct 12, 2016 at 8:56 AM, Ali Akhtar <ali.rac...@gmail.com>
> > > > wrote:
> > > >
> > > >> Heya,
> > > >>
> > > >> Say I'm building a live auction site, with different products.
> > > >> Different users will bid on different products. And each time
> > > >> they do, I want to update the product's price, so it should
> > > >> always have the latest price in place.
> > > >>
> > > >> Example: Person 1 bids $3 on Product A, and Person 2 bids $5 on
> > > >> the same product 100 ms later.
> > > >>
> > > >> The second bid arrives first and the price is updated to $5. Then
> > > >> the first bid arrives. I want the price to not be updated in this
> > > >> case, as this bid is older than the one I've already processed.
> > > >>
> > > >> Here's my understanding of how I can achieve this with Kafka
> > > >> Streams -- is my understanding correct?
> > > >>
> > > >> - I have a topic for receiving bids. The topic has N partitions,
> > > >> and I have N replicas of my application, which hook up with Kafka
> > > >> Streams, up and running.
> > > >>
> > > >> - I assume each replica of my app will listen to a different
> > > >> partition of the topic.
> > > >>
> > > >> - A user makes a bid on product A.
> > > >>
> > > >> - This is pushed to the topic with the key bid_a.
> > > >>
> > > >> - Another user makes a bid. This is also pushed with the same key
> > > >> (bid_a).
> > > >>
> > > >> - The 2nd bid arrives first, and gets processed. Then the first
> > > >> (older) bid arrives.
> > > >>
> > > >> - Because I'm using a KTable, the timestamp of the messages is
> > > >> extracted, and I'm not shown the older bid because I've already
> > > >> processed the later bid. The older bid is ignored.
> > > >>
> > > >> - All bids on product A go to the same topic partition, and hence
> > > >> the same replica of my app, because they all have the key bid_a.
> > > >>
> > > >> - Because of this, the replica already knows which timestamps it
> > > >> has processed, and is able to ignore the older messages.
> > > >>
> > > >> Is the above understanding correct?
> > > >>
> > > >> Also, what will happen if bid 2 arrived and got processed, and
> > > >> then the particular replica crashed, and was restarted. The
> > > >> restarted replica won't have any memory of which timestamps it
> > > >> has previously processed.
> > > >>
> > > >> So if bid 2 got processed, replica crashed and restarted, and
> > > >> then bid 1 arrived, what would happen in that case?
> > > >>
> > > >> Thanks.
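For the <bid_id : bid_value> => transformValues => <bid_id : {bid_value, timestamp}> => aggregate pipeline suggested above, a rough DSL sketch could look like this (assuming a recent Kafka Streams version; the TimestampedBid class and the "bids" topic name are made up, and serde configuration is omitted for brevity):

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;

// Illustrative value class: the bid plus the timestamp of the record that carried it.
class TimestampedBid {
    final double value;
    final long timestamp;

    TimestampedBid(double value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }
}

public class LatestBidPipeline {

    public static KTable<String, TimestampedBid> build(StreamsBuilder builder) {
        KStream<String, Double> bids = builder.stream("bids");   // <bid_id : bid_value>

        // Step 1: transformValues -- attach the record timestamp to the value.
        ValueTransformerSupplier<Double, TimestampedBid> stamper =
            () -> new ValueTransformer<Double, TimestampedBid>() {
                private ProcessorContext context;

                @Override
                public void init(ProcessorContext context) {
                    this.context = context;
                }

                @Override
                public TimestampedBid transform(Double bidValue) {
                    // context.timestamp() is the timestamp of the record being processed
                    return new TimestampedBid(bidValue, context.timestamp());
                }

                @Override
                public void close() { }
            };

        KStream<String, TimestampedBid> stamped = bids.transformValues(stamper);

        // Step 2: aggregate -- per bid_id, keep whichever bid carries the newer
        // timestamp, so a late-arriving (older) bid never overwrites a newer one.
        return stamped
            .groupByKey()
            .reduce((current, incoming) ->
                incoming.timestamp >= current.timestamp ? incoming : current);
    }
}

Whether reduce() or aggregate() is used, the important part is that the comparison is made on the timestamp carried in the value, not on arrival order.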