Thanks for your thoughts here, Fabian. I've responded inline, but I also
want to clarify the reason I need the file paths on commit.
The FileSink works as expected in Azure Data Lake with the ABFS connector,
but I want to perform an additional step by telling Azure Data Explorer to
ingest the committed files, and I need their paths to do so. This is why
I've implemented the hack below to reflectively get access to the
underlying File, which I can then use to craft my ingestion command to
Azure Data Explorer.
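
For illustration, here is a minimal, self-contained sketch of that kind of
reflective lookup. It is not the exact code I'm running:
`TargetFileReflection` and the `FakeRecoverable` stand-in are hypothetical
names, and the sketch returns the result as a String instead of casting to
`File`, on the assumption that different recoverable implementations may
return different path types.

```java
import java.lang.reflect.Method;
import java.net.URI;

/** Hypothetical helper: reads a no-arg "targetFile" accessor via reflection. */
final class TargetFileReflection {

    private TargetFileReflection() {}

    /**
     * Walks the class hierarchy looking for a no-arg method named "targetFile".
     * getDeclaredMethod also finds non-public methods, unlike getMethod.
     */
    static String targetFileOf(Object commitRecoverable) {
        for (Class<?> c = commitRecoverable.getClass(); c != null; c = c.getSuperclass()) {
            try {
                Method m = c.getDeclaredMethod("targetFile");
                m.setAccessible(true);
                // Avoid casting to a concrete type; just render whatever comes back.
                return String.valueOf(m.invoke(commitRecoverable));
            } catch (NoSuchMethodException e) {
                // Not declared on this class; try the superclass next.
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Could not invoke targetFile()", e);
            }
        }
        throw new IllegalArgumentException(
                "No targetFile() method on " + commitRecoverable.getClass().getName());
    }
}

/** Stand-in for a recoverable; the real Flink classes are not used here. */
final class FakeRecoverable {
    private final URI target =
            URI.create("abfs://container@account.dfs.core.windows.net/out/part-0");

    private URI targetFile() {
        return target;
    }
}
```

Calling `TargetFileReflection.targetFileOf(new FakeRecoverable())` yields the
path as a String, which is all I need to build the ingestion request.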

On Tue, Oct 12, 2021 at 2:15 AM Fabian Paul <fabianp...@ververica.com>
wrote:

> Hi Preston,
>
> I just noticed I forgot to cc to the user mailing list on my first reply
> …. I have a few thoughts about the design you are describing.
>
>
> In the meantime I have a nasty hack in place that has unblocked me for now
> in getting the target file off the LocalRecoverable/HadoopFsRecoverable:
>
> InProgressFileWriter.PendingFileRecoverable recoverable =
>     committable.getPendingFile();
>
> RecoverableWriter.CommitRecoverable commitRecoverable =
>     ((OutputStreamBasedPartFileWriter.OutputStreamBasedPendingFileRecoverable)
>         recoverable).getCommitRecoverable();
>
> Method m = commitRecoverable.getClass().getMethod("targetFile");
> m.setAccessible(true);
> File targetFile = (File) m.invoke(commitRecoverable);
>
> I think a good place to start would be to introduce getTargetFile, and
> getTempFile methods on the CommitRecoverable interface, though I haven't
> fully studied the impact of this approach on other implementations of that
> interface.
>
>
> I might be missing context here, or lack knowledge of how Azure Data Lake
> works, but why do you need access to the target and/or temp file locations?
> Your scenario sounds very similar to any other distributed file system.
>

For my case I need to know the final path to the finished files so I can
issue an ingest command to Azure Data Explorer for each file once they're
committed. When using Azure Data Lake for storage I can instruct Azure Data
Explorer to ingest a file from a path in blob storage, but I need to know
what the path is. Alternatively we may be able to leverage something like
Event Grid which can send a signal whenever a new file lands in a
particular path in Azure Data Lake, but there are benefits to having tight
control over issuing the ingest commands.


>
>
> A note on implementing our part-file scoped Encoder: The current Encoder
> pattern in 1.14 assumes that the same encoder will work for all files, for
> all time. We had to make numerous small changes to the File Sink to break
> this pattern, and allow for an Encoder instance per part file. My current
> solution uses a custom BucketID object with both Path, and EventType
> properties. In our BucketWriter.openNew method we can use the
> BucketId.EventType to look up the Protobuf descriptor we need, create a new
> Encoder and pass it to our RowWisePartWriter. We had to reimplement/absorb
> a significant amount of the File Sink code to accomplish this as the File
> Sink implementation assumes a String for BucketID and there are many
> roadblocks put in place to prevent extending FileSink functionality.
>
>
> This is an interesting point. I guess we did not think about such a use
> case when developing the sink. Maybe we can approach the problem differently.
> I am thinking about adding a context to the `Encoder#encode` method where
> metadata (new bucket, filename, bucketname) is accessible. Does this help
> in your case?
>

Yes, this could have saved me a great deal of hassle if there were
additional context provided to the encoder about the lifecycle, and
BucketID of the underlying part file. It would still be a bit of a complex
Encoder as, for my case, each bucket needs to be encoded differently, and
state updated when files roll.
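
To make sure we're talking about the same thing, here is a rough sketch of
how I picture such a context-aware encoder. None of these types exist in
Flink 1.14: `PartFileContext`, `ContextAwareEncoder`, and `PerBucketEncoder`
are hypothetical names, and plain strings stand in for our Protobuf
messages and descriptors.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical metadata handed to the encoder for each element. */
final class PartFileContext {
    final String bucketId;
    final String fileName;
    final boolean newFile; // true for the first element written to a part file

    PartFileContext(String bucketId, String fileName, boolean newFile) {
        this.bucketId = bucketId;
        this.fileName = fileName;
        this.newFile = newFile;
    }
}

/** Hypothetical variant of an encoder whose encode call also receives part-file metadata. */
interface ContextAwareEncoder<IN> {
    void encode(IN element, PartFileContext context, OutputStream stream) throws IOException;
}

/** Chooses a formatting function per bucket and writes a header when a new file starts. */
final class PerBucketEncoder implements ContextAwareEncoder<String> {
    private final Map<String, Function<String, String>> formatters;

    PerBucketEncoder(Map<String, Function<String, String>> formatters) {
        this.formatters = formatters;
    }

    @Override
    public void encode(String element, PartFileContext context, OutputStream stream)
            throws IOException {
        Function<String, String> formatter = formatters.get(context.bucketId);
        if (formatter == null) {
            throw new IOException("No encoder registered for bucket " + context.bucketId);
        }
        if (context.newFile) {
            // State that depends on file rolls can be reset here.
            stream.write(("# " + context.fileName + "\n").getBytes(StandardCharsets.UTF_8));
        }
        stream.write((formatter.apply(element) + "\n").getBytes(StandardCharsets.UTF_8));
    }
}
```

With something like this, the per-bucket lookup I currently do in
BucketWriter.openNew could move into the encoder itself.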


> A perfect example of these roadblocks is the FileWriterBucketFactory
> interface. It looks like we could provide our own implementation of this
> interface, but the return type of its methods (FileWriterBucket) has
> default (package protected) visibility and so we can neither provide our
> own implementation, nor sub-class the return types to add our own logic.
> Another example is the OutputStreamBasedPartFileWriter which wraps a
> default (package protected) visibility abstract class
> (OutputStreamBasedBucketWriter). I ran into numerous issues like these.
>
>
> In general, all classes annotated with @Internal are not meant to be used
> outside of Flink, but I agree sometimes it becomes necessary. Although if
> more and more people need to reimplement big parts of the FileSink, we have
> to incorporate that feedback and make it extensible.
>

In my case the only solution is to reimplement all that great
functionality, which will make upgrading to future versions of Flink harder.


>
> A note on implementing our Azure Data Explorer sink: Currently we're
> looking to add code in a custom Committer to do this. However, since I
> can't grok a way to make the file commit + ADX ingest command atomic we
> need to (re)ingest all pending files since there's no way to track what's
> already been committed+ingested. For now we're hoping to make ADX
> (re)ingestion idempotent using ingest-by tags, but we may have to use
> something else (maybe memcache) to track what's already been ingested if
> ingest-by tagging doesn't scale. A post-file-commit pipeline like you've
> mentioned would work for us provided that we could still get exactly-once
> guarantees on the operators of that post-file-commit pipeline. If there's a
> better way to solve this part of our use case in 1.14 I'd love to hear it :)
>
>
> Unfortunately I have limited knowledge about ADX. I briefly looked it up
> and it seems to have two modes, batch and streaming ingestion. Which of
> the two are you currently using?
>

For our use case we must use batched, queued ingestion. This decouples our
sink from the ADX engine: a queue provides elasticity between our sink
(producing files to ingest) and ADX's ingestion of those files. Streaming
ingestion has some limitations (more complex schema migrations, lack of
support for continuous export, among others) that disqualify it for our
case.


>
> Usually the idempotency is guaranteed by the underlying
> RecoverableFsDataOutputStream. If the current implementations do not
> suffice, I can imagine adding one for Azure.
>

For our use case we're planning to leverage `ingest-by` tags on ingestion
to get idempotency in ADX at the file level. `ingest-by` tags inform ADX to
only ingest each file once. Therefore, if we were to restore and re-ingest
all pending->committed files, ADX would no-op on subsequent ingestion
of the same file. `ingest-by` tags do add some overhead (they disable
ADX ingestion batching), but we're planning to write large enough
files in most cases that they wouldn't be batched together anyway.
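
As a rough sketch of what I have in mind, the tag can be derived
deterministically from the committed file path so that a replayed commit
produces the same tag. `IngestByTags` and its methods are hypothetical
names, and hashing the path with SHA-256 is just one way to get a stable,
fixed-length value. The `tags` and `ingestIfNotExists` keys follow my
reading of the ADX ingestion properties; how they are passed to the
ingestion client is left out here.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper that derives per-file ingest-by tags. */
final class IngestByTags {

    private IngestByTags() {}

    /** Returns the SHA-256 of the path as lowercase hex, so the same path always gives the same tag. */
    static String tagFor(String committedFilePath) {
        try {
            byte[] digest = MessageDigest.getInstance("SHA-256")
                    .digest(committedFilePath.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 is not available", e);
        }
    }

    /** Builds the two properties that tag the data and skip it if the tag already exists. */
    static Map<String, String> ingestionProperties(String committedFilePath) {
        String tag = tagFor(committedFilePath);
        Map<String, String> props = new LinkedHashMap<>();
        props.put("tags", "[\"ingest-by:" + tag + "\"]");
        props.put("ingestIfNotExists", "[\"" + tag + "\"]");
        return props;
    }
}
```

Because the tag depends only on the path, re-issuing the ingestion for the
same committed file after a restore carries the same `ingest-by` value.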


>
> The post-file-commit pipeline might be just a normal dataStream where
> users can consume the committables after they have been committed. So the
> exactly-once guarantee needs to be implemented by the user.
>

I think I understand. We'd still need an idempotent sink for the stream of
committables, as they could be replayed on checkpoint restore.

> Best,
> Fabian
>
