Hi Preston,

I just noticed I forgot to cc the user mailing list on my first reply… I have a few thoughts about the design you are describing.
> In the meantime I have a nasty hack in place that has unblocked me for now
> in getting the target file off the LocalRecoverable/HadoopFsRecoverable:
>
>     InProgressFileWriter.PendingFileRecoverable recoverable =
>         committable.getPendingFile();
>     RecoverableWriter.CommitRecoverable commitRecoverable =
>         ((OutputStreamBasedPartFileWriter.OutputStreamBasedPendingFileRecoverable) recoverable)
>             .getCommitRecoverable();
>
>     Method m = commitRecoverable.getClass().getMethod("targetFile");
>     m.setAccessible(true);
>     File targetFile = (File) m.invoke(commitRecoverable);
>
> I think a good place to start would be to introduce getTargetFile and
> getTempFile methods on the CommitRecoverable interface, though I haven't
> fully studied the impact of this approach on other implementations of that
> interface.

I might be missing context here, or knowledge of how Azure Data Lake works, but why do you need access to the target and/or temp file locations? Your scenario sounds very similar to any other distributed file system.

> A note on implementing our part-file scoped Encoder: The current Encoder
> pattern in 1.14 assumes that the same encoder will work for all files, for
> all time. We had to make numerous small changes to the File Sink to break
> this pattern and allow for an Encoder instance per part file. My current
> solution uses a custom BucketID object with both Path and EventType
> properties. In our BucketWriter.openNew method we can use the
> BucketId.EventType to look up the Protobuf descriptor we need, create a new
> Encoder and pass it to our RowWisePartWriter. We had to reimplement/absorb
> a significant amount of the File Sink code to accomplish this, as the File
> Sink implementation assumes a String for BucketID and there are many
> roadblocks put in place to prevent extending FileSink functionality.

This is an interesting point. I guess we did not think about such a use case when developing the sink. Maybe we can approach the problem differently: I am thinking about adding a context to the `Encoder#encode` method through which metadata (new bucket, filename, bucket name) is accessible. Does this help in your case?
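To make the idea a bit more concrete, here is a rough sketch of what such a context-aware encoder could look like. Note this is only an illustration; the EncoderContext and ContextualEncoder names and methods are made up, and nothing like this exists in 1.14:

    import java.io.IOException;
    import java.io.OutputStream;
    import java.io.Serializable;

    // Hypothetical context handed to the encoder for every element,
    // exposing per-part-file metadata.
    public interface EncoderContext {
        boolean isNewBucket();  // true for the first element written to a fresh bucket
        String bucketId();      // id of the bucket the element belongs to
        String partFileName();  // name of the current in-progress part file
    }

    // Hypothetical variant of org.apache.flink.api.common.serialization.Encoder
    // that receives the context in addition to the element and the stream.
    public interface ContextualEncoder<IN> extends Serializable {
        void encode(IN element, OutputStream stream, EncoderContext context) throws IOException;
    }

With something along these lines, the EventType-specific Protobuf descriptor lookup could happen inside the encoder (e.g. keyed off bucketId()) instead of requiring a custom BucketWriter.openNew and a reimplementation of large parts of the sink.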
> A perfect example of these roadblocks is the FileWriterBucketFactory
> interface. It looks like we could provide our own implementation of this
> interface, but the return type of its methods (FileWriterBucket) has
> default (package protected) visibility, and so we can neither provide our
> own implementation nor sub-class the return types to add our own logic.
> Another example is the OutputStreamBasedPartFileWriter, which wraps a
> default (package protected) visibility abstract class
> (OutputStreamBasedBucketWriter). I ran into numerous issues like these.

In general, all classes annotated with @Internal are not meant to be used outside of Flink, but I agree that it sometimes becomes necessary. If more and more people need to reimplement big parts of the FileSink, we will have to incorporate that feedback and make it extensible.

> A note on implementing our Azure Data Explorer sink: Currently we're
> looking to add code in a custom Committer to do this. However, since I
> can't grok a way to make the file commit + ADX ingest command atomic, we
> need to (re)ingest all pending files, since there's no way to track what's
> already been committed+ingested. For now we're hoping to make ADX
> (re)ingestion idempotent using ingest-by tags, but we may have to use
> something else (maybe memcache) to track what's already been ingested if
> ingest-by tagging doesn't scale. A post-file-commit pipeline like you've
> mentioned would work for us provided that we could still get exactly-once
> guarantees on the operators of that post-file-commit pipeline. If there's
> a better way to solve this part of our use case in 1.14 I'd love to hear
> it :)

Unfortunately I have limited knowledge about ADX. I briefly looked it up, and it seems to have two ingestion modes, batch and streaming. Which of the two are you currently using? Usually the idempotency is guaranteed by the underlying RecoverableFsDataOutputStream; if the current implementations do not suffice, I can imagine adding one for Azure.

The post-file-commit pipeline might just be a normal DataStream where users can consume the committables after they have been committed, so the exactly-once guarantee would need to be implemented by the user.

Best,
Fabian
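PS: To illustrate that last point, here is a minimal sketch of what an idempotent post-commit (re)ingestion step based on your ingest-by tag idea might look like. Everything in it is hypothetical (consumeCommittedFilePaths and adxIngestIfNotExists are made-up helpers, and the post-file-commit stream does not exist in 1.14); it is only meant to show that the deduplication logic would live on your side:

    // Hypothetical: a stream of file paths that the FileSink has already
    // committed, exposed as a normal DataStream.
    DataStream<String> committedFiles = consumeCommittedFilePaths(env);

    committedFiles.map(path -> {
        // Derive a stable tag from the part-file path, so that re-processing
        // the same committable after a restart produces the same tag again.
        String ingestByTag = "ingest-by:" + path;
        // Hypothetical ADX client call: ingest the file only if no data with
        // this ingest-by tag exists yet, making the step idempotent.
        adxIngestIfNotExists(path, ingestByTag);
        return path;
    });

Since such a step would re-run after failures, the exactly-once guarantee would effectively come from "at-least-once plus idempotent ingestion", which matches the ingest-by approach you described.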