BucketID is a type parameter (a generic type variable), so conceptually you can use any type as long as you can provide a serializer for it (BucketAssigner#getSerializer). The documentation is wrong in this instance.
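To illustrate the type-parameter point, here is a minimal sketch of an assigner whose BucketID is an Integer (the UTC hour of day) rather than a String. It is an illustration under stated assumptions, not Flink code: so that it compiles without Flink on the classpath, it declares local stand-ins mirroring the method signatures of Flink's BucketAssigner and SimpleVersionedSerializer. In a real job you would implement the org.apache.flink interfaces instead. MyEvent, HourOfDayBucketAssigner and IntegerBucketIdSerializer are hypothetical names. As I understand it, the serializer is what lets the sink store bucket IDs in checkpointed state and read them back on recovery. That is also where getSerializer comes into play even when the ID is a String.

```java
import java.io.IOException;
import java.nio.ByteBuffer;

// Local stand-in mirroring org.apache.flink.core.io.SimpleVersionedSerializer (assumption: same method shapes).
interface SimpleVersionedSerializer<E> {
    int getVersion();
    byte[] serialize(E obj) throws IOException;
    E deserialize(int version, byte[] serialized) throws IOException;
}

// Local stand-in mirroring Flink's BucketAssigner<IN, BucketID>; BucketID is a free type parameter.
interface BucketAssigner<IN, BucketID> {
    BucketID getBucketId(IN element, Context context);
    SimpleVersionedSerializer<BucketID> getSerializer();

    interface Context {
        long currentProcessingTime();
        long currentWatermark();
        Long timestamp();
    }
}

// Hypothetical event type carrying its own epoch-millis timestamp.
final class MyEvent {
    final long timestampMillis;
    MyEvent(long timestampMillis) { this.timestampMillis = timestampMillis; }
}

// Buckets events by UTC hour of day, using Integer (not String) as the bucket ID type.
final class HourOfDayBucketAssigner implements BucketAssigner<MyEvent, Integer> {
    private static final long MILLIS_PER_HOUR = 60L * 60L * 1000L;

    @Override
    public Integer getBucketId(MyEvent element, Context context) {
        // floorDiv/floorMod keep the result in 0..23 even for negative timestamps.
        long hoursSinceEpoch = Math.floorDiv(element.timestampMillis, MILLIS_PER_HOUR);
        return (int) Math.floorMod(hoursSinceEpoch, 24L);
    }

    @Override
    public SimpleVersionedSerializer<Integer> getSerializer() {
        return IntegerBucketIdSerializer.INSTANCE;
    }
}

// Writes an Integer bucket ID as 4 big-endian bytes; version 1 is the only known format.
final class IntegerBucketIdSerializer implements SimpleVersionedSerializer<Integer> {
    static final IntegerBucketIdSerializer INSTANCE = new IntegerBucketIdSerializer();

    @Override
    public int getVersion() { return 1; }

    @Override
    public byte[] serialize(Integer bucketId) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(bucketId).array();
    }

    @Override
    public Integer deserialize(int version, byte[] serialized) throws IOException {
        if (version != 1 || serialized.length != Integer.BYTES) {
            throw new IOException("Unrecognized bucket ID format, version " + version);
        }
        return ByteBuffer.wrap(serialized).getInt();
    }
}
```

As far as I know, the sink names each bucket's sub-directory after the ID's toString(), so a custom ID type should have a readable, stable toString() (for Integer that is simply "13", "1", and so on).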

The convenience Flink APIs (StreamingFileSink#forRowFormat/StreamingFileSink#forBulkFormat) default to String bucket IDs, but you can change the type by setting both an assigner and a rolling policy via "withBucketAssignerAndPolicy". You should be able to use "DefaultRollingPolicy.create().build()" as the default policy.
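A sketch of that wiring, assuming the Flink 1.7 RowFormatBuilder API and Flink on the classpath. SinkFactory and buildSink are hypothetical names, and SimpleStringEncoder is used only to keep the example short (it writes each element's toString() as one line). As I read the 1.7 builder, withBucketAssigner keeps the builder's existing BucketID type (String after forRowFormat), which is why switching to another ID type goes through withBucketAssignerAndPolicy.

```java
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

final class SinkFactory {
    // Builds a row-format sink whose bucket ID type ID comes from the given assigner
    // instead of the builder's String default; rolling uses DefaultRollingPolicy's defaults.
    static <IN, ID> StreamingFileSink<IN> buildSink(String basePath, BucketAssigner<IN, ID> assigner) {
        return StreamingFileSink
                .forRowFormat(new Path(basePath), new SimpleStringEncoder<IN>("UTF-8"))
                .withBucketAssignerAndPolicy(assigner, DefaultRollingPolicy.create().build())
                .build();
    }
}
```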

On 02/04/2019 20:18, Jeff Crane wrote:
According to my IDE (JetBrains), I get an error with the getBucketId(IN, Context) signature requiring a return type of String (Flink 1.7 libs), so I still don't think BucketID is a variable type.

I still don't understand the role of this method:

public SimpleVersionedSerializer<String> getSerializer() {
    return SimpleVersionedStringSerializer.INSTANCE;
}

Where does that come into play, if getBucketId returns a String anyway?




On Monday, April 1, 2019, 11:44:14 AM PDT, Jeff Crane <jeffcra...@yahoo.com> wrote:


I have had trouble understanding the BucketAssigner documentation
<https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssigner.html>, which lists:

BucketID getBucketId(IN element, BucketAssigner.Context context)
SimpleVersionedSerializer<BucketID> getSerializer()

First of all, I don't understand what type "BucketID" means. I assume it's the return type of getBucketId, which doesn't make sense. The description says getBucketId returns "A string representing the identifier of the bucket". So BucketID is not a type; it's always a string?
Based on the docs, I implemented it like this, but it doesn't write anything!
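import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.joda.time.DateTime; // assuming Joda-Time's DateTime

public final class CustomBucketAssigner implements BucketAssigner<MyEvent, String> {

    // Builds a bucket ID such as "2019_4_1_11_44" from the current watermark.
    @Override
    public String getBucketId(final MyEvent element, final Context context) {
        DateTime dateTimeL = new DateTime(context.currentWatermark());

        return String.join("_",
                String.valueOf(dateTimeL.getYear()),
                String.valueOf(dateTimeL.getMonthOfYear()),
                String.valueOf(dateTimeL.getDayOfMonth()),
                String.valueOf(dateTimeL.getHourOfDay()),
                String.valueOf(dateTimeL.getMinuteOfHour())
        );
    }

    // I assume <String> because BucketID is always a String?
    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}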

Can someone explain, in plainer English, what a BucketAssigner is supposed to do? I don't think the docs are clear, and I'm lost.

