Hi Beam Community,

I have a splittable `DoFn` that reads messages from a stream and outputs the
results downstream. Simplified, the code looks like this:
@DoFn.ProcessElement
public DoFn.ProcessContinuation processElement(
        @DoFn.Element SourceDescriptor sourceDescriptor,
        RestrictionTracker<OffsetRange, Long> tracker,
        WatermarkEstimator watermarkEstimator,
        DoFn.OutputReceiver<Record> receiver) throws Exception {
    while (true) {
        List<Message> messages = getMessageFromStream();
        if (messages.isEmpty()) {
            // Nothing to read right now: checkpoint and let the runner resume later.
            return DoFn.ProcessContinuation.resume();
        }
        for (Message message : messages) {
            // The tracker covers an OffsetRange, so it claims a Long offset,
            // not the message itself (getOffset() is an assumed accessor).
            if (!tracker.tryClaim(message.getOffset())) {
                return DoFn.ProcessContinuation.stop();
            }
            Record record = new Record(message);
            receiver.outputWithTimestamp(record, message.getTimestamp());
        }
    }
}
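For readers less familiar with restriction trackers, the claim-then-output contract of the loop above can be sketched in plain Java, without Beam. `ClaimLoopSketch` and `SimpleOffsetTracker` are hypothetical names; the tracker only assumes the behaviour of Beam's `OffsetRangeTracker` (claims must be strictly increasing and not below the start, and a claim at or past the end of the half-open range returns false). The sketch deliberately says nothing about bundling, so it does not model when a runner actually delivers outputs downstream.

```java
import java.util.ArrayList;
import java.util.List;

public class ClaimLoopSketch {

    // Hypothetical stand-in for Beam's OffsetRangeTracker: it only models
    // tryClaim over a half-open range [from, to).
    static final class SimpleOffsetTracker {
        private final long from;
        private final long to;
        private Long lastAttempted = null; // null until the first claim attempt

        SimpleOffsetTracker(long from, long to) {
            this.from = from;
            this.to = to;
        }

        // Returns true if the offset was claimed; false means the caller must stop.
        boolean tryClaim(long offset) {
            if (lastAttempted != null && offset <= lastAttempted) {
                throw new IllegalArgumentException("offsets must be claimed in increasing order");
            }
            if (offset < from) {
                throw new IllegalArgumentException("offset is before the start of the range");
            }
            lastAttempted = offset;
            return offset < to;
        }
    }

    // Emits every offset that was claimed, stopping at the first failed claim.
    static List<Long> process(SimpleOffsetTracker tracker, List<Long> offsets) {
        List<Long> emitted = new ArrayList<>();
        for (long offset : offsets) {
            if (!tracker.tryClaim(offset)) {
                break; // plays the role of returning ProcessContinuation.stop()
            }
            emitted.add(offset); // plays the role of receiver.outputWithTimestamp(...)
        }
        return emitted;
    }

    public static void main(String[] args) {
        SimpleOffsetTracker tracker = new SimpleOffsetTracker(0, 3);
        System.out.println(process(tracker, List.of(0L, 1L, 2L, 3L, 4L)));
    }
}
```

Running `main` prints `[0, 1, 2]`: offset 3 falls outside `[0, 3)`, so the first failed claim ends the loop, just as `ProcessContinuation.stop()` ends the `DoFn` call.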

I expected each output to appear downstream immediately, but the results are
grouped into batches (of 4 or 5 outputs) before being emitted downstream. Is this
batch size configurable in the `DoFn` or in the runner?

Thanks for any answer,
Yu