+dev <d...@beam.apache.org>

Hi Yu,

Which runner are you using for your pipeline? It would also help if you could share your pipeline code.

On Mon, Jan 25, 2021 at 10:19 PM <yu.b.zh...@oracle.com> wrote:

> Hi Beam Community,
>
> I have a splittable `DoFn` that reads messages from a stream and outputs
> the results downstream. The pseudocode looks like this:
>
> @DoFn.ProcessElement
> public DoFn.ProcessContinuation processElement(
>     @DoFn.Element SourceDescriptor sourceDescriptor,
>     RestrictionTracker<OffsetRange, Long> tracker,
>     WatermarkEstimator watermarkEstimator,
>     DoFn.OutputReceiver<Record> receiver) throws Exception {
>   while (true) {
>     messages = getMessageFromStream();
>     if (messages.isEmpty()) {
>       return DoFn.ProcessContinuation.resume();
>     }
>     for (message : messages) {
>       if (!tracker.tryClaim(message)) {
>         return DoFn.ProcessContinuation.stop();
>       }
>       record = Record(message);
>       receiver.outputWithTimestamp(record, message.getTimestamp());
>     }
>   }
> }
>
> I expected to see the output downstream immediately, but the results
> are grouped into batches (of 4 or 5 outputs) before being emitted
> downstream. Is this batch size configurable in the `DoFn` or the runner?
>
> Thanks for any answer,
> Yu