Hi Mats and Jakob,

+1 to what Till said about the non-deterministic behavior. Also, I suggest you look only at Pinot's offline segment creation from Flink.
Pinot provides a built-in lambda architecture, with both real-time and offline segments per table (architecture diagram <https://docs.pinot.apache.org/basics/concepts#pinot-components>). The real-time server ingests streams (e.g. Kafka) directly and buffers the events in memory, so that queries can be served from the buffer within seconds of the events being produced to the stream. If we use Flink to buffer and flush to Pinot upon reaching a threshold, it defeats the purpose of real-time serving and data freshness. So it makes more sense to use Flink only for the offline segment creation in Pinot.

In a typical production environment, Pinot offline segments are generated periodically (e.g. daily) by a scheduled job. If we limit the scope to this, then it's easier to solve the determinism issue that Till mentioned: the entire job can be relaunched and overwrite the previous batch of offline segments. Also, there are existing conventions for segment names, and I suggest you use the SegmentNameGenerator <https://github.com/apache/incubator-pinot/blob/master/pinot-core/src/main/java/org/apache/pinot/core/segment/name/SimpleSegmentNameGenerator.java> to create the segments. In particular, it's important that the segment name contains the time range (usually the start/end of the day), so that you can overwrite the entire batch of segments by identifying the time range in the offline table.

Lastly, a design doc will be helpful and I'm happy to contribute/review.

Best,
Yupeng

On Mon, Jan 25, 2021 at 9:37 AM Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Mats and Jakob,
>
> In the general case, I don't think that elements from upstream Flink tasks always arrive at the same subtask of the sink. One problem is that user computations can be non-deterministic. Moreover, a rebalance operation can distribute the events of a task A among several downstream tasks B_1, B_2. In this case, records are distributed in a round-robin fashion. Therefore, the distribution depends on the arrival order at task A:
>
> event_2, event_1 => A => event_1 => B_1
>                       => event_2 => B_2
>
> event_1, event_2 => A => event_2 => B_1
>                       => event_1 => B_2
>
> Since the order in which events from multiple producers arrive at the consumer is not deterministic (e.g. due to network delays), you might see different distributions.
>
> However, I am not sure whether this is really a problem if every Pinot sink makes sure that all segments which have been written after the last checkpoint you have recovered from are deleted. You might just end up with a different job result which is a member of the space of valid results.
>
> One problem I see with eagerly writing segments to Pinot is that downstream systems might already start consuming the results even though they might still change because of a recovery. The way Flink solves this problem is to only publish results once a checkpoint has been completed.
>
> Cheers,
> Till
>
> On Mon, Jan 25, 2021 at 4:30 PM Poerschke, Mats <mats.poersc...@student.hpi.uni-potsdam.de> wrote:
>
> > Hi all,
> >
> > We want to give you a short update on the Pinot Sink since we started implementing a PoC. As described earlier, we aim to use batch-uploading of segments to Pinot in combination with caching elements in the Flink sink.
> >
> > Our current implementation works like this:
> >
> > Besides the Pinot controller URI and the target table's name, the sink allows users to define the maximum number of rows per segment.
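> >
> > For illustration, the configuration could look roughly like the following fragment. The builder and its setters (PinotSinkBuilder, setPinotControllerUri, setMaxRowsPerSegment) are placeholder names for this sketch, not a final API:
> >
> >     // Hypothetical builder-style configuration of the Pinot sink (all names illustrative).
> >     PinotSink<Row> pinotSink = new PinotSinkBuilder<Row>()
> >             .setPinotControllerUri("http://pinot-controller:9000") // Pinot controller endpoint
> >             .setTableName("myTable")                               // target Pinot table
> >             .setMaxRowsPerSegment(500_000)                         // flush threshold per segment
> >             .build();
> >
> >     // The sink would then be attached through the unified Sink API (FLIP-143),
> >     // e.g. dataStream.sinkTo(pinotSink);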
> >
> > The PinotWriter is responsible for collecting elements, building segments, and uploading them to Pinot. It therefore retrieves the Schema and TableConfig via the Pinot Controller API using the provided tableName. Whenever the specified maximum number of rows is reached, it starts the segment creation on disk. This process is handled by the Pinot admin tool. A segment ID is structured as follows:
> >
> > <table-name>-<subtask-id>-<incremental-counter>
> >
> > Finally, the PinotWriter pushes the created segment to the Pinot Controller, which will then distribute it onto the Pinot servers.
> >
> > The PinotCommitter only checkpoints the segment ID of the segment that was last written. It is possible that multiple segments were uploaded to Pinot between two checkpoints.
> >
> > As for future plans, we want to prevent high memory pressure when collecting elements in the PinotWriter by directly writing elements to disk. The main question at this point is whether we can assume access to a temporary directory on disk.
> >
> > For checkpointing and failure recovery, we also thought of an approach, though we have not tried it yet. Upon recovery from a checkpoint, the latest segment ID that is stored in the checkpoint can be accessed by the PinotSink. The PinotWriter then compares the incremental counter value of the checkpointed segment ID with the segments that already exist in Pinot for the same table and subtask ID. If segments with a higher counter value in their IDs are discovered, they are deleted to avoid duplicates. After that, processing can continue as described above. We think that this mode of recovery assumes that elements from upstream Flink tasks always arrive at the same subtask of the sink. Is this fair?
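> >
> > In code, this recovery step would look roughly like the following sketch. PinotControllerClient and its two methods are placeholders for the corresponding controller REST calls (list segments, delete segment), not an existing client API:
> >
> >     import java.util.List;
> >
> >     // Placeholder abstraction over the Pinot controller REST API (not a real client).
> >     interface PinotControllerClient {
> >         List<String> listSegmentNames(String tableName);
> >         void deleteSegment(String tableName, String segmentId);
> >     }
> >
> >     // Simplified recovery-by-counter logic; segment IDs follow
> >     // <table-name>-<subtask-id>-<incremental-counter>.
> >     class SegmentRecovery {
> >         static void cleanUp(String checkpointedSegmentId, String tableName, int subtaskId,
> >                             PinotControllerClient controller) {
> >             long checkpointedCounter = counterOf(checkpointedSegmentId);
> >             String subtaskPrefix = tableName + "-" + subtaskId + "-";
> >             for (String segmentId : controller.listSegmentNames(tableName)) {
> >                 // Only consider segments written by this subtask.
> >                 if (!segmentId.startsWith(subtaskPrefix)) {
> >                     continue;
> >                 }
> >                 // Segments pushed after the restored checkpoint are deleted to avoid duplicates.
> >                 if (counterOf(segmentId) > checkpointedCounter) {
> >                     controller.deleteSegment(tableName, segmentId);
> >                 }
> >             }
> >         }
> >
> >         private static long counterOf(String segmentId) {
> >             // The incremental counter is the part after the last '-'.
> >             return Long.parseLong(segmentId.substring(segmentId.lastIndexOf('-') + 1));
> >         }
> >     }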
> >
> > Best regards,
> > Jakob and Mats
> >
> > On 6. Jan 2021, at 18:22, Yupeng Fu <yup...@uber.com.INVALID> wrote:
> >
> > Hi Mats,
> >
> > Glad to see this interest! We at Uber are also working on a Pinot sink (for BATCH execution), with some help from the Pinot community on abstracting Pinot interfaces for segment writes and catalog retrieval. Perhaps we can collaborate on this proposal/POC.
> >
> > Cheers,
> >
> > Yupeng
> >
> > On Wed, Jan 6, 2021 at 6:12 AM Aljoscha Krettek <aljos...@apache.org> wrote:
> >
> > That's good to hear. I wasn't sure, because the explanation focused a lot on checkpoints and their details, while with the new Sink interface implementers don't need to be concerned with those. And in fact, when the Sink is used in BATCH execution mode there will be no checkpoints.
> >
> > Other than that, the implementation sketch makes sense to me. I think to make further assessments you will probably have to work on a proof-of-concept.
> >
> > Best,
> > Aljoscha
> >
> > On 2021/01/06 11:18, Poerschke, Mats wrote:
> > Yes, we will use the latest sink interface.
> >
> > Best,
> > Mats
> >
> > On 6. Jan 2021, at 11:05, Aljoscha Krettek <aljos...@apache.org> wrote:
> >
> > It's great to see interest in this. Were you planning to use the new Sink interface that we recently introduced? [1]
> >
> > Best,
> > Aljoscha
> >
> > [1] https://s.apache.org/FLIP-143
> >
> > On 2021/01/05 12:21, Poerschke, Mats wrote:
> > Hi all,
> >
> > we want to contribute a sink connector for Apache Pinot. The following briefly describes the planned control flow. Please feel free to comment on any of its aspects.
> >
> > Background
> > Apache Pinot is a large-scale real-time data ingestion engine that works on data segments internally. The controller exposes an external API which allows posting new segments via a REST call. Each posted segment must contain an ID (called the segment name).
> >
> > Control Flow
> > The Flink sink will collect data tuples locally. When creating a checkpoint, all those tuples are grouped into one segment, which is then pushed to the Pinot controller. We will assign each pushed segment a unique incrementing identifier. After receiving a success response from the Pinot controller, the latest segment name is persisted within the Flink checkpoint.
> > In case we have to recover from a failure, the latest successfully pushed segment name can be reconstructed from the Flink checkpoint. At this point the system might be in an inconsistent state: the Pinot controller might have already stored a newer segment (whose name was, due to the failure, not persisted in the Flink checkpoint). This inconsistency is resolved with the next successful checkpoint creation. The segment pushed then will be assigned the same segment name as the inconsistent segment. Thus, Pinot replaces the old segment with the new one, which prevents introducing duplicate entries.
> >
> > Best regards,
> > Mats Pörschke
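> >
> > To make the intended control flow concrete, here is a rough sketch of the checkpoint-time push. The class, the SegmentUploader helper, and all names are purely illustrative, not an actual implementation:
> >
> >     import java.util.ArrayList;
> >     import java.util.List;
> >
> >     import org.apache.flink.types.Row;
> >
> >     // Placeholder for the REST call that posts a segment to the Pinot controller.
> >     interface SegmentUploader {
> >         void upload(String segmentName, List<Row> rows);
> >     }
> >
> >     // Rough sketch: tuples are cached locally and pushed as one segment per checkpoint.
> >     class CheckpointedSegmentBuffer {
> >         private final List<Row> buffered = new ArrayList<>();
> >         private long segmentCounter; // re-derived from the checkpointed segment name on recovery
> >
> >         void collect(Row tuple) {
> >             buffered.add(tuple);
> >         }
> >
> >         // Called when Flink takes a checkpoint. Returns the segment name that is
> >         // persisted in the checkpoint once the controller acknowledges the push.
> >         String onCheckpoint(String tableName, SegmentUploader uploader) {
> >             // Deterministic name: after a failure the counter is restored from the last
> >             // completed checkpoint, so the next push reuses the name of a segment written
> >             // after that checkpoint and Pinot replaces it instead of duplicating data.
> >             String segmentName = tableName + "-" + segmentCounter;
> >             uploader.upload(segmentName, new ArrayList<>(buffered));
> >             buffered.clear();
> >             segmentCounter++; // incremented only after a successful push
> >             return segmentName;
> >         }
> >     }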