Hi Jay and Guozhang,

In the long term, I think Kafka can evolve to replace general-purpose messaging systems. For example, if I were a customer currently using IBM MQ, could I switch to Kafka with only minor code changes?
http://www-01.ibm.com/support/knowledgecenter/?lang=en#!/SSFKSJ_7.1.0/com.ibm.mq.doc/fr16190_.htm

Regarding transactional messaging, the following MQ document gives a basic introduction and use cases:
http://www-01.ibm.com/support/knowledgecenter/#!/SSFKSJ_8.0.0/com.ibm.mq.sce.doc/q023310_.htm
To attract more enterprise customers to Kafka, I think Kafka needs to minimize function gaps and behavior differences. I know that adding more options and new functions might sacrifice performance, but it also means more business opportunities for Kafka. At the very least, I hope the differences can be clearly documented for potential users.

Regarding the design of “transactional messaging”, my use cases need four additional external Kafka APIs for controlling when a broker issues an fsync:

- DisableAutoUpdateRecoveryPoint(partitionID)
  The broker will stop asynchronously updating the recovery point and the metadata file "recovery-point-offset-checkpoint" for this partition.

- EnableAutoUpdateRecoveryPoint(partitionID)
  The broker will resume asynchronously updating the recovery point and the metadata file "recovery-point-offset-checkpoint" for this partition, if updating was disabled.

- GetCurrSyncPoint(partitionID)
  The broker will return the latest recovery point whose value has been fsynced to the metadata file.

- IssueFsync(producerId, generation, partitionID)
  The broker will pick a thread from the thread pool to flush and sync all the unflushed and unsynced segments, and then update the recovery point and the metadata file "recovery-point-offset-checkpoint". After completion, it will return an ACK message carrying the latest recovery point whose value has been fsynced to the metadata file.

Using these four APIs, I can easily do a flush (by sending CommitTransaction) and issue an fsync (IssueFsync) only when necessary. When my producers come back up after a stop, they can easily learn the current recovery point and continue their work.

Like Jay said, I have a performance concern about the currently documented transaction flow when transactions are short.

- Maybe Kafka can provide a lightweight protocol when a transaction touches only a single partition?

We do have monster transactions, which are normally caused by uncommitted batch jobs. However, those should be very rare.
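To make the proposal concrete, here is a minimal in-memory sketch in Java of the four calls above, for a single partition. Everything here is hypothetical: the method names come from the proposal, the partitionID/producerId/generation parameters are elided, and none of this is an existing Kafka API.

```java
// Hypothetical sketch of the proposed per-partition fsync controls.
// A real broker would fsync the segment files and the
// "recovery-point-offset-checkpoint" file where noted.
class RecoveryPointControl {
    private boolean autoUpdate = true;      // background checkpointing on/off
    private long syncedRecoveryPoint = 0L;  // last offset known to be fsynced
    private long logEndOffset = 0L;         // appended, possibly unflushed

    // DisableAutoUpdateRecoveryPoint(partitionID)
    void disableAutoUpdateRecoveryPoint() { autoUpdate = false; }

    // EnableAutoUpdateRecoveryPoint(partitionID)
    void enableAutoUpdateRecoveryPoint() { autoUpdate = true; }

    // GetCurrSyncPoint(partitionID): latest recovery point whose value
    // has been fsynced to the checkpoint file.
    long getCurrSyncPoint() { return syncedRecoveryPoint; }

    // IssueFsync(producerId, generation, partitionID): flush and sync all
    // unsynced segments, advance the recovery point, and return it
    // (the ACK payload in the proposal).
    long issueFsync() {
        syncedRecoveryPoint = logEndOffset; // stand-in for the real fsync
        return syncedRecoveryPoint;
    }

    // Stand-in for the broker's periodic checkpoint thread; a no-op
    // while auto-update is disabled.
    void backgroundCheckpoint() {
        if (autoUpdate) syncedRecoveryPoint = logEndOffset;
    }

    // Stand-in for normal appends to the partition log.
    void append(int messages) { logEndOffset += messages; }
}
```

In this model, a restarted producer would call getCurrSyncPoint to learn what survived, and issue fsyncs only at transaction commit boundaries instead of per message.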
Such monster transactions may occur only monthly, quarterly or yearly. Thank you very much!

Xiao Li

On Mar 11, 2015, at 9:06 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> Jay,
>
> I think what Xiao was proposing is allowing synchronous fsync, i.e. calling fsync after appending messages to the log, for applications that require disk data persistency. Maybe we can add another ack condition, in addition to "#.isr replicated", that requires the data to be persisted to disk before returning the request, and add the immediate fsync task to the delayed scheduler of the background thread if necessary. This of course will change the produce request protocol quite a lot.
>
> Guozhang
>
> On Wed, Mar 11, 2015 at 8:37 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
>
>> Xiao,
>>
>> Not sure about AIX or HP-UX. There are some people running on Windows, though we don't do real systemic testing against that. I would be surprised if z/OS worked; someone would have to try.
>>
>> The existing fsync policy already works at the batch level, and Kafka already does batching quite aggressively. If you set the fsync policy to force fsync, you get an acknowledgement that the fsync occurred.
>>
>> In any case, it has not been my experience that datacenter power outages are a common failure mode, whereas disk failures happen continually.
>>
>> I agree that the prototype transaction support would not be a good fit for trying to commit each message in a transaction.
>>
>> -Jay
>>
>> On Mon, Mar 9, 2015 at 10:56 PM, Xiao <lixiao1...@gmail.com> wrote:
>>
>>> Hi, Jay,
>>>
>>> Thank you!
>>>
>>> The Kafka documentation says "Kafka should run well on any unix system". I assume that includes the two major Unix versions, IBM AIX and HP-UX. Right?
>>>
>>> 1. Unfortunately, we aim at supporting all the platforms: Linux, Unix, Windows and especially z/OS. I know z/OS is not easy to support.
>>>
>>> 2. Fsync per message is very expensive, and fsync per batch will break the transaction atomicity.
>>> We are looking for transaction-level fsync, which is more efficient. Then our producers can easily combine multiple small transactions into a single bigger batch transaction. I hope the implementation of the ongoing Kafka feature "Transactional Messaging in Kafka" has already considered all these issues, although the following link does not mention it:
>>> https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka
>>>
>>> 3. After a quick reading of the design of "Transactional Messaging in Kafka", I doubt whether it can scale, especially when most transactions are very short (e.g., containing a single message).
>>> - In many use cases we do not need global transactions, which might be too expensive. Partition-specific transaction granularity might be fine.
>>> - To reduce the overhead of transaction-level fsync plus network or address-space roundtrips, Kafka might need two extra parameters for batching small transactions into a single one. The performance benefits of batching are huge, as shown by the MRI (multi-row insert) feature in Oracle and DB2 z/OS.
>>>
>>> I believe transactional messaging is a critical feature. The design document is not very clear. Do you have more materials or links about it?
>>>
>>> Thanks,
>>>
>>> Xiao Li
>>>
>>> On Mar 7, 2015, at 9:33 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
>>>
>>>> Xiao,
>>>>
>>>> FileChannel.force is fsync on unix.
>>>>
>>>> To force fsync on every message:
>>>> log.flush.interval.messages=1
>>>>
>>>> You are looking at the time-based fsync, which, naturally, as you say, is time-based.
>>>>
>>>> -Jay
>>>>
>>>> On Fri, Mar 6, 2015 at 11:35 PM, Xiao <lixiao1...@gmail.com> wrote:
>>>>
>>>>> Hi, Jay,
>>>>>
>>>>> Thank you for your answer.
>>>>>
>>>>> Sorry, I still do not understand your meaning.
>>>>>
>>>>> I guess the two parameters you mentioned are log.flush.interval and log.default.flush.interval.ms.
>>>>> However, these two parameters only control when Kafka issues a flush (i.e., calls FileChannel.force()).
>>>>>
>>>>> Fsync (fileOutputStream.getFD().sync()) is controlled by another parameter, log.default.flush.scheduler.interval.ms:
>>>>>
>>>>> scheduler.schedule("kafka-recovery-point-checkpoint",
>>>>>                    checkpointRecoveryPointOffsets,
>>>>>                    delay = InitialTaskDelayMs,
>>>>>                    period = flushCheckpointMs,
>>>>>                    TimeUnit.MILLISECONDS)
>>>>>
>>>>> This thread is only time-controlled. It does not check the number of messages.
>>>>>
>>>>> Thank you,
>>>>>
>>>>> Xiao Li
>>>>>
>>>>> On Mar 5, 2015, at 11:59 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
>>>>>
>>>>>> Hey Xiao,
>>>>>>
>>>>>> That's not quite right. Fsync is controlled by either a time-based criterion (flush every 30 seconds) or a number-of-messages criterion. So if you set the number of messages to 1, the flush is synchronous with the write, which I think is what you are looking for.
>>>>>>
>>>>>> -Jay
>
> --
> Guozhang
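[Editor's note: for readers following the flush discussion above, the broker-side flush knobs can be set in server.properties roughly as follows. Property names have changed across Kafka versions — the thread uses older names such as log.default.flush.interval.ms — so treat this as an illustrative sketch, not a definitive configuration.]

```properties
# Force a flush (FileChannel.force) after every message -- durable but slow.
log.flush.interval.messages=1
# Time-based flush fallback, in milliseconds.
log.flush.interval.ms=30000
# How often the recovery-point checkpoint file is rewritten, in milliseconds.
log.flush.offset.checkpoint.interval.ms=60000
```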