Hi Dhurandar,

    As I understand it, what you need is to be notified when a file has been 
written successfully (committed) to the S3 file system. However, there is 
currently no public listener API for this, and there is an issue tracking it [1].

    With the current version, one possible approach is to use reflection to 
access some internal state of StreamingFileSink to get the committed files. 
Overall, you would need to implement a customized StreamingFileSink and override 
its notifyCheckpointComplete method, which is where the new S3 files get 
committed and become visible:

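import java.lang.reflect.Field;
import java.util.*;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

// Relies on private Flink 1.11 field names: they are not a public API and may change.
class CustomizedStreamingFileSink<IN> extends StreamingFileSink<IN> {
    CustomizedStreamingFileSink(BucketsBuilder<IN, ?, ? extends BucketsBuilder<IN, ?, ?>> builder, long bucketCheckInterval) {
        super(builder, bucketCheckInterval);
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        // 1. Use reflection to collect the files this call will commit: StreamingFileSink -> helper
        //    (StreamingFileSinkHelper, 1.11 only) -> Buckets -> activeBuckets -> (each Bucket)
        //    pendingFileRecoverablesPerCheckpoint.headMap(checkpointId, true) [2]
        Object buckets = readField(readField(this, "helper"), "buckets"); // 1.10: readField(this, "buckets")
        List<String> objectNames = new ArrayList<>();
        for (Object bucket : ((Map<?, ?>) readField(buckets, "activeBuckets")).values()) {
            @SuppressWarnings("unchecked")
            NavigableMap<Long, List<?>> pending = (NavigableMap<Long, List<?>>) readField(bucket, "pendingFileRecoverablesPerCheckpoint");
            for (List<?> files : pending.headMap(checkpointId, true).values()) {
                for (Object file : files) {
                    // 1.11: PendingFileRecoverable -> CommitRecoverable (must be an S3Recoverable) -> objectName (on 1.10 the entry is already the CommitRecoverable)
                    objectNames.add((String) readField(readField(file, "commitRecoverable"), "objectName"));
                }
            }
        }
        super.notifyCheckpointComplete(checkpointId); // 2. Commit the files normally.
        // 3. The objects in objectNames are now visible: write their metadata here.
    }
    // Reads a private, possibly inherited, field by name.
    private static Object readField(Object target, String name) throws ReflectiveOperationException {
        for (Class<?> c = target.getClass(); c != null; c = c.getSuperclass()) {
            try {
                Field field = c.getDeclaredField(name);
                field.setAccessible(true);
                return field.get(target);
            } catch (NoSuchFieldException e) { /* keep looking in the superclass */ }
        }
        throw new NoSuchFieldException(name);
    }
}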
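To make the headMap(checkpointId, true) step from [2] concrete, here is a small stand-alone illustration that uses a plain TreeMap in place of a Bucket's pendingFileRecoverablesPerCheckpoint. The object names are made up, and the map holds plain strings instead of Flink's recoverables, so it only shows which entries a notification for a given checkpoint covers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

class PendingFilesDemo {

    // Returns every object name pending for a checkpoint <= checkpointId,
    // i.e. the files a Bucket would commit when notified of checkpointId.
    static List<String> toCommit(NavigableMap<Long, List<String>> pendingPerCheckpoint, long checkpointId) {
        List<String> result = new ArrayList<>();
        // Inclusive head map: all checkpoints up to and including checkpointId, in ascending order.
        for (List<String> names : pendingPerCheckpoint.headMap(checkpointId, true).values()) {
            result.addAll(names);
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical S3 object names, keyed by the checkpoint they became pending in.
        NavigableMap<Long, List<String>> pending = new TreeMap<>();
        pending.put(3L, List.of("out/part-0-3"));
        pending.put(4L, List.of("out/part-0-4", "out/part-1-4"));
        pending.put(5L, List.of("out/part-0-5"));
        System.out.println(toCommit(pending, 4L)); // prints [out/part-0-3, out/part-0-4, out/part-1-4]
    }
}
```

Because the range is inclusive and reaches back to earlier checkpoints, a notification for checkpoint 4 also picks up files still pending from checkpoint 3, for example if the notification for checkpoint 3 was skipped.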

A single file may be committed more than once (for example, when the job 
recovers from a failure), so the metadata-writing step must be idempotent, i.e. 
able to handle repeated writes.
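One way to make that step idempotent is to check the object's current metadata before rewriting it. The sketch below hides S3 behind a hypothetical ObjectMetadataStore interface (my own name, not a Flink or AWS SDK type) with an in-memory implementation. Against real S3, user metadata cannot be edited in place, so replaceMetadata would typically copy the object onto itself with the new metadata.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical abstraction over S3 object metadata (not a real SDK interface).
interface ObjectMetadataStore {
    Map<String, String> getMetadata(String key);
    void replaceMetadata(String key, Map<String, String> metadata);
}

// In-memory stand-in used to try the logic without S3.
class InMemoryMetadataStore implements ObjectMetadataStore {
    final Map<String, Map<String, String>> objects = new HashMap<>();
    int writes = 0; // counts rewrites so the idempotence is observable

    public Map<String, String> getMetadata(String key) {
        return objects.getOrDefault(key, Map.of());
    }

    public void replaceMetadata(String key, Map<String, String> metadata) {
        objects.put(key, new HashMap<>(metadata));
        writes++;
    }
}

class IdempotentMetadataWriter {
    // Adds the wanted entries unless they are already present, so calling this again
    // for a file that is committed a second time does not rewrite the object.
    // Returns true if a write happened.
    static boolean ensureMetadata(ObjectMetadataStore store, String key, Map<String, String> wanted) {
        Map<String, String> current = store.getMetadata(key);
        if (current.entrySet().containsAll(wanted.entrySet())) {
            return false; // already tagged: nothing to do
        }
        Map<String, String> merged = new HashMap<>(current); // keep existing metadata
        merged.putAll(wanted);
        store.replaceMetadata(key, merged);
        return true;
    }
}
```

The check-then-write is not atomic, but two concurrent duplicate writes would store the same merged metadata, so a repeated commit notification does no harm.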

Another possible approach is to use a separate source operator that 
periodically scans the S3 file system to detect newly added files and modify 
their metadata. Flink has a built-in source function, 
ContinuousFileMonitoringFunction [3], that does similar monitoring work, and I 
think it could be modified or reused to scan the files.
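The core of such a scanner could look like the sketch below. ListedObject and NewFileScanner are hypothetical names of mine, and producing the listing from S3 is left out. Similar in spirit to ContinuousFileMonitoringFunction, as far as I know, it remembers the largest modification time seen so far and reports only objects newer than that.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical listing entry; in practice it would come from listing the S3 prefix.
final class ListedObject {
    final String key;
    final long lastModifiedMillis;

    ListedObject(String key, long lastModifiedMillis) {
        this.key = key;
        this.lastModifiedMillis = lastModifiedMillis;
    }
}

class NewFileScanner {
    private long maxSeenModificationTime = Long.MIN_VALUE;

    // Returns the objects in this listing that are newer than everything seen in
    // earlier scans, then advances the watermark for the next scan.
    List<ListedObject> scan(List<ListedObject> listing) {
        long watermark = maxSeenModificationTime; // fixed for the whole scan
        List<ListedObject> fresh = new ArrayList<>();
        for (ListedObject o : listing) {
            if (o.lastModifiedMillis > watermark) {
                fresh.add(o);
                maxSeenModificationTime = Math.max(maxSeenModificationTime, o.lastModifiedMillis);
            }
        }
        return fresh;
    }
}
```

One caveat: rewriting an object's metadata on S3 by copying it onto itself gives it a new last-modified time, so a scanner like this will see the tagged objects again on its next pass. The metadata step should therefore skip objects that already carry the metadata.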

Best,
  Yun 


[1] https://issues.apache.org/jira/browse/FLINK-17900
[2] https://github.com/apache/flink/blob/a5527e3b2ff4abea2ff8fa05cb755561549be06a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L268
[3] https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java


------------------------------------------------------------------
Sender:dhurandar S<dhurandarg...@gmail.com>
Date:2020/06/20 03:19:38
Recipient:user<user@flink.apache.org>; Flink Dev<d...@flink.apache.org>
Subject:adding s3 object metadata while using StreamFileSink

We are creating files in S3 and we want to update the S3 object metadata with 
some security-related information for governance purposes.

Right now Apache Flink completely abstracts away how and when an S3 object gets 
created in the system.

Is there a way to pass the S3 object metadata so that it is applied to the 
created object?

If not, how can we know deterministically when Apache Flink has created an S3 
file? Once it is created in S3, we can write Java code to add that metadata 
information.

-- 
Thank you and regards,
Dhurandar

