[ https://issues.apache.org/jira/browse/FLINK-9962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16596276#comment-16596276 ]

ASF GitHub Bot commented on FLINK-9962:
---------------------------------------

asfgit closed pull request #6492: [FLINK-9962] [FS connector] allow users to 
specify TimeZone in DateTimeBucketer
URL: https://github.com/apache/flink/pull/6492

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/docs/dev/connectors/filesystem_sink.md b/docs/dev/connectors/filesystem_sink.md
index af1349d6665..79ed08e9d41 100644
--- a/docs/dev/connectors/filesystem_sink.md
+++ b/docs/dev/connectors/filesystem_sink.md
@@ -70,7 +70,8 @@ stored. The sink can be further configured by specifying a custom bucketer, writ
 
 By default the bucketing sink will split by the current system time when elements arrive and will
 use the datetime pattern `"yyyy-MM-dd--HH"` to name the buckets. This pattern is passed to
-`SimpleDateFormat` with the current system time to form a bucket path. A new bucket will be created
+`DateTimeFormatter` with the current system time and JVM's default timezone to form a bucket path.
+Users can also specify a timezone for the bucketer to format bucket path. A new bucket will be created
 whenever a new date is encountered. For example, if you have a pattern that contains minutes as the
 finest granularity you will get a new bucket every minute. Each bucket is itself a directory that
 contains several part files: each parallel instance of the sink will create its own part file and
@@ -105,7 +106,7 @@ Example:
 DataStream<Tuple2<IntWritable,Text>> input = ...;
 
 BucketingSink<String> sink = new BucketingSink<String>("/base/path");
-sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HHmm"));
+sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HHmm", ZoneId.of("America/Los_Angeles")));
 sink.setWriter(new SequenceFileWriter<IntWritable, Text>());
 sink.setBatchSize(1024 * 1024 * 400); // this is 400 MB,
 sink.setBatchRolloverInterval(20 * 60 * 1000); // this is 20 mins
@@ -119,7 +120,7 @@ input.addSink(sink);
 val input: DataStream[Tuple2[IntWritable, Text]] = ...
 
 val sink = new BucketingSink[String]("/base/path")
-sink.setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd--HHmm"))
+sink.setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd--HHmm", ZoneId.of("America/Los_Angeles")))
 sink.setWriter(new SequenceFileWriter[IntWritable, Text]())
 sink.setBatchSize(1024 * 1024 * 400) // this is 400 MB,
 sink.setBatchRolloverInterval(20 * 60 * 1000); // this is 20 mins
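The documentation change boils down to formatting an instant with a pattern and a zone. As a minimal sketch using only java.time (not Flink's classes; the class BucketZoneDemo and its helper bucketFor are hypothetical), the same epoch millisecond lands in different hourly buckets depending on the zone. The timestamp is the one used by the new tests.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class BucketZoneDemo {

    // Hypothetical helper: mirrors how a date/time bucketer turns "now" into a bucket name.
    public static String bucketFor(long epochMillis, String pattern, ZoneId zone) {
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern).withZone(zone);
        return formatter.format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        long millis = 1533363082011L; // 2018-08-04T06:11:22.011Z
        String pattern = "yyyy-MM-dd--HH";

        System.out.println(bucketFor(millis, pattern, ZoneId.of("America/Los_Angeles"))); // 2018-08-03--23
        System.out.println(bucketFor(millis, pattern, ZoneId.of("UTC")));                 // 2018-08-04--06
        // Result depends on the JVM's default zone, which the constructors without a ZoneId fall back to.
        System.out.println(bucketFor(millis, pattern, ZoneId.systemDefault()));
    }
}
```

Because the formatter carries the zone, the caller never converts the instant to a local date itself; only the zone argument changes which bucket is chosen.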
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/DateTimeBucketer.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/DateTimeBucketer.java
index b7035fe8ab4..d549eda3062 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/DateTimeBucketer.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/DateTimeBucketer.java
@@ -19,13 +19,15 @@
 package org.apache.flink.streaming.connectors.fs.bucketing;
 
 import org.apache.flink.streaming.connectors.fs.Clock;
+import org.apache.flink.util.Preconditions;
 
 import org.apache.hadoop.fs.Path;
 
 import java.io.IOException;
 import java.io.ObjectInputStream;
-import java.text.SimpleDateFormat;
-import java.util.Date;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
 
 /**
  * A {@link Bucketer} that assigns to buckets based on current system time.
@@ -38,8 +40,8 @@
  * is determined based on the current system time and the user provided format string.
  *
  *
- * <p>{@link SimpleDateFormat} is used to derive a date string from the current system time and
- * the date format string. The default format string is {@code "yyyy-MM-dd--HH"} so the rolling
+ * <p>{@link DateTimeFormatter} is used to derive a date string from the current system time and
+ * the date format string with a timezone. The default format string is {@code "yyyy-MM-dd--HH"} so the rolling
  * files will have a granularity of hours.
  *
  *
@@ -61,44 +63,67 @@
        private static final String DEFAULT_FORMAT_STRING = "yyyy-MM-dd--HH";
 
        private final String formatString;
+       private final ZoneId zoneId;
 
-       private transient SimpleDateFormat dateFormatter;
+       private transient DateTimeFormatter dateTimeFormatter;
 
        /**
-        * Creates a new {@code DateTimeBucketer} with format string {@code "yyyy-MM-dd--HH"}.
+        * Creates a new {@code DateTimeBucketer} with format string {@code "yyyy-MM-dd--HH"} using JVM's default timezone.
         */
        public DateTimeBucketer() {
                this(DEFAULT_FORMAT_STRING);
        }
 
        /**
-        * Creates a new {@code DateTimeBucketer} with the given date/time format string.
+        * Creates a new {@code DateTimeBucketer} with the given date/time format string using JVM's default timezone.
         *
-        * @param formatString The format string that will be given to {@code SimpleDateFormat} to determine
+        * @param formatString The format string that will be given to {@code DateTimeFormatter} to determine
         *                     the bucket path.
         */
        public DateTimeBucketer(String formatString) {
-               this.formatString = formatString;
+               this(formatString, ZoneId.systemDefault());
+       }
+
+       /**
+        * Creates a new {@code DateTimeBucketer} with format string {@code "yyyy-MM-dd--HH"} using the given timezone.
+        *
+        * @param zoneId The timezone used to format {@code DateTimeFormatter} for bucket path.
+        */
+       public DateTimeBucketer(ZoneId zoneId) {
+               this(DEFAULT_FORMAT_STRING, zoneId);
+       }
+
+       /**
+        * Creates a new {@code DateTimeBucketer} with the given date/time format string using the given timezone.
+        *
+        * @param formatString The format string that will be given to {@code DateTimeFormatter} to determine
+        *                     the bucket path.
+        * @param zoneId The timezone used to format {@code DateTimeFormatter} for bucket path.
+        */
+       public DateTimeBucketer(String formatString, ZoneId zoneId) {
+               this.formatString = Preconditions.checkNotNull(formatString);
+               this.zoneId = Preconditions.checkNotNull(zoneId);
 
-               this.dateFormatter = new SimpleDateFormat(formatString);
+               this.dateTimeFormatter = DateTimeFormatter.ofPattern(this.formatString).withZone(zoneId);
        }
 
        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
                in.defaultReadObject();
 
-               this.dateFormatter = new SimpleDateFormat(formatString);
+               this.dateTimeFormatter = DateTimeFormatter.ofPattern(formatString).withZone(zoneId);
        }
 
        @Override
        public Path getBucketPath(Clock clock, Path basePath, T element) {
-               String newDateTimeString = dateFormatter.format(new Date(clock.currentTimeMillis()));
+               String newDateTimeString = dateTimeFormatter.format(Instant.ofEpochMilli(clock.currentTimeMillis()));
                return new Path(basePath + "/" + newDateTimeString);
        }
 
        @Override
        public String toString() {
                return "DateTimeBucketer{" +
-                               "formatString='" + formatString + '\'' +
-                               '}';
+                       "formatString='" + formatString + '\'' +
+                       ", zoneId=" + zoneId +
+                       '}';
        }
 }
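DateTimeBucketer keeps only formatString and zoneId as serialized state and rebuilds its transient DateTimeFormatter in readObject; DateTimeFormatter does not implement Serializable, while ZoneId does. A minimal, Flink-free sketch of that pattern (TransientFormatterDemo and ZonedPathFormatter are hypothetical names) round-trips an instance through Java serialization and checks that the rebuilt formatter still honours the zone:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class TransientFormatterDemo {

    // Hypothetical stand-in for a serializable bucketer: only the pattern and zone are written out.
    public static final class ZonedPathFormatter implements Serializable {
        private static final long serialVersionUID = 1L;

        private final String formatString;
        private final ZoneId zoneId;
        // Not serializable, so it is marked transient and rebuilt after deserialization.
        private transient DateTimeFormatter formatter;

        public ZonedPathFormatter(String formatString, ZoneId zoneId) {
            this.formatString = formatString;
            this.zoneId = zoneId;
            this.formatter = DateTimeFormatter.ofPattern(formatString).withZone(zoneId);
        }

        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
            in.defaultReadObject(); // restores formatString and zoneId
            this.formatter = DateTimeFormatter.ofPattern(formatString).withZone(zoneId);
        }

        public String format(long epochMillis) {
            return formatter.format(Instant.ofEpochMilli(epochMillis));
        }
    }

    // Serializes and deserializes an instance in memory, as a task deployment would over the wire.
    public static ZonedPathFormatter roundTrip(ZonedPathFormatter original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (ZonedPathFormatter) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        ZonedPathFormatter copy = roundTrip(new ZonedPathFormatter("yyyy-MM-dd--HH", ZoneId.of("America/Los_Angeles")));
        System.out.println(copy.format(1533363082011L)); // 2018-08-03--23
    }
}
```

Without the readObject hook the deserialized copy would hold a null formatter; DateTimeBucketAssigner, which has no such hook, instead builds its formatter lazily on the first getBucketId call.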
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/DateTimeBucketerTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/DateTimeBucketerTest.java
new file mode 100644
index 00000000000..3ce9f496c7f
--- /dev/null
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/DateTimeBucketerTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.fs.bucketing;
+
+import org.apache.flink.streaming.connectors.fs.Clock;
+
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+
+import java.time.ZoneId;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link DateTimeBucketer}.
+ */
+public class DateTimeBucketerTest {
+       private static final long TEST_TIME_IN_MILLIS = 1533363082011L;
+       private static final Path TEST_PATH = new Path("test");
+
+       private static final Clock mockedClock = new MockedClock();
+
+       @Test
+       public void testGetBucketPathWithSpecifiedTimezone() {
+               DateTimeBucketer bucketer = new DateTimeBucketer(ZoneId.of("America/Los_Angeles"));
+
+               assertEquals(new Path("test/2018-08-03--23"), bucketer.getBucketPath(mockedClock, TEST_PATH, null));
+       }
+
+       @Test
+       public void testGetBucketPathWithSpecifiedFormatString() {
+               DateTimeBucketer bucketer = new DateTimeBucketer("yyyy-MM-dd-HH", ZoneId.of("America/Los_Angeles"));
+
+               assertEquals(new Path("test/2018-08-03-23"), bucketer.getBucketPath(mockedClock, TEST_PATH, null));
+       }
+
+       private static class MockedClock implements Clock {
+
+               @Override
+               public long currentTimeMillis() {
+                       return TEST_TIME_IN_MILLIS;
+               }
+       }
+}
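The new tests stay deterministic by injecting a clock that always returns TEST_TIME_IN_MILLIS instead of reading the wall clock. The same idea as a self-contained sketch: MillisClock is a hypothetical single-method stand-in for Flink's Clock (assumed here to need only currentTimeMillis(), as the test's MockedClock suggests), and bucketPath is a hypothetical helper that mimics getBucketPath by appending the formatted time to a base path string.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class FixedClockDemo {

    // Hypothetical stand-in for Flink's Clock: a single method returning epoch millis.
    @FunctionalInterface
    public interface MillisClock {
        long currentTimeMillis();
    }

    // Hypothetical helper mimicking getBucketPath: base path + "/" + formatted clock time.
    public static String bucketPath(MillisClock clock, String basePath, String pattern, ZoneId zone) {
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern).withZone(zone);
        return basePath + "/" + formatter.format(Instant.ofEpochMilli(clock.currentTimeMillis()));
    }

    public static void main(String[] args) {
        // A fixed clock makes the expected bucket independent of when the test runs.
        MillisClock fixed = () -> 1533363082011L;
        System.out.println(bucketPath(fixed, "test", "yyyy-MM-dd--HH", ZoneId.of("America/Los_Angeles"))); // test/2018-08-03--23

        // A real clock, by contrast, yields a bucket that changes every hour.
        System.out.println(bucketPath(System::currentTimeMillis, "test", "yyyy-MM-dd--HH", ZoneId.of("UTC")));
    }
}
```

Pinning both the clock and the zone is what makes the expected strings in the tests valid on any build machine, regardless of its local timezone.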
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/DateTimeBucketAssigner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/DateTimeBucketAssigner.java
index 5d30f39fa2e..32e3b2a864b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/DateTimeBucketAssigner.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/DateTimeBucketAssigner.java
@@ -21,9 +21,11 @@
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
+import org.apache.flink.util.Preconditions;
 
-import java.text.SimpleDateFormat;
-import java.util.Date;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
 
 /**
  * A {@link BucketAssigner} that assigns to buckets based on current system time.
@@ -37,14 +39,14 @@
  * user provided format string.
  *
  *
- * <p>{@link SimpleDateFormat} is used to derive a date string from the current system time and
+ * <p>{@link DateTimeFormatter} is used to derive a date string from the current system time and
  * the date format string. The default format string is {@code "yyyy-MM-dd--HH"} so the rolling
  * files will have a granularity of hours.
  *
  * <p>Example:
  *
  * <pre>{@code
- *     Bucketer buck = new DateTimeBucketer("yyyy-MM-dd--HH");
+ *     BucketAssigner bucketAssigner = new DateTimeBucketAssigner("yyyy-MM-dd--HH");
  * }</pre>
  *
  * <p>This will create for example the following bucket path:
@@ -59,32 +61,55 @@
        private static final String DEFAULT_FORMAT_STRING = "yyyy-MM-dd--HH";
 
        private final String formatString;
+       private final ZoneId zoneId;
 
-       private transient SimpleDateFormat dateFormatter;
+       private transient DateTimeFormatter dateTimeFormatter;
 
        /**
-        * Creates a new {@code DateTimeBucketer} with format string {@code "yyyy-MM-dd--HH"}.
+        * Creates a new {@code DateTimeBucketAssigner} with format string {@code "yyyy-MM-dd--HH"}.
         */
        public DateTimeBucketAssigner() {
                this(DEFAULT_FORMAT_STRING);
        }
 
        /**
-        * Creates a new {@code DateTimeBucketer} with the given date/time format string.
+        * Creates a new {@code DateTimeBucketAssigner} with the given date/time format string.
         *
         * @param formatString The format string that will be given to {@code SimpleDateFormat} to determine
-        *                     the bucket path.
+        *                     the bucket id.
         */
        public DateTimeBucketAssigner(String formatString) {
-               this.formatString = formatString;
+               this(formatString, ZoneId.systemDefault());
+       }
+
+       /**
+        * Creates a new {@code DateTimeBucketAssigner} with format string {@code "yyyy-MM-dd--HH"} using the given timezone.
+        *
+        * @param zoneId The timezone used to format {@code DateTimeFormatter} for bucket id.
+        */
+       public DateTimeBucketAssigner(ZoneId zoneId) {
+               this(DEFAULT_FORMAT_STRING, zoneId);
+       }
+
+       /**
+        * Creates a new {@code DateTimeBucketAssigner} with the given date/time format string using the given timezone.
+        *
+        * @param formatString The format string that will be given to {@code DateTimeFormatter} to determine
+        *                     the bucket path.
+        * @param zoneId The timezone used to format {@code DateTimeFormatter} for bucket id.
+        */
+       public DateTimeBucketAssigner(String formatString, ZoneId zoneId) {
+               this.formatString = Preconditions.checkNotNull(formatString);
+               this.zoneId = Preconditions.checkNotNull(zoneId);
        }
 
        @Override
        public String getBucketId(IN element, BucketAssigner.Context context) {
-               if (dateFormatter == null) {
-                       dateFormatter = new SimpleDateFormat(formatString);
+               if (dateTimeFormatter == null) {
+                       dateTimeFormatter = DateTimeFormatter.ofPattern(formatString).withZone(zoneId);
                }
-               return dateFormatter.format(new Date(context.currentProcessingTime()));
+
+               return dateTimeFormatter.format(Instant.ofEpochMilli(context.currentProcessingTime()));
        }
 
        @Override
@@ -94,6 +119,9 @@ public String getBucketId(IN element, BucketAssigner.Context context) {
 
        @Override
        public String toString() {
-               return "DateTimeBucketAssigner{formatString='" + formatString + '\'' + '}';
+               return "DateTimeBucketAssigner{" +
+                       "formatString='" + formatString + '\'' +
+                       ", zoneId=" + zoneId +
+                       '}';
        }
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/DateTimeBucketAssignerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/DateTimeBucketAssignerTest.java
new file mode 100644
index 00000000000..b4fac1837d3
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/DateTimeBucketAssignerTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners;
+
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
+
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.time.ZoneId;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link DateTimeBucketAssigner}.
+ */
+public class DateTimeBucketAssignerTest {
+       private static final long TEST_TIME_IN_MILLIS = 1533363082011L;
+
+       private static final MockedContext mockedContext = new MockedContext();
+
+       @Test
+       public void testGetBucketPathWithSpecifiedTimezone() {
+               DateTimeBucketAssigner bucketAssigner = new DateTimeBucketAssigner(ZoneId.of("America/Los_Angeles"));
+
+               assertEquals("2018-08-03--23", bucketAssigner.getBucketId(null, mockedContext));
+       }
+
+       @Test
+       public void testGetBucketPathWithSpecifiedFormatString() {
+               DateTimeBucketAssigner bucketAssigner = new DateTimeBucketAssigner("yyyy-MM-dd-HH", ZoneId.of("America/Los_Angeles"));
+
+               assertEquals("2018-08-03-23", bucketAssigner.getBucketId(null, mockedContext));
+       }
+
+       private static class MockedContext implements BucketAssigner.Context {
+               @Override
+               public long currentProcessingTime() {
+                       return TEST_TIME_IN_MILLIS;
+               }
+
+               @Override
+               public long currentWatermark() {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Nullable
+               @Override
+               public Long timestamp() {
+                       return null;
+               }
+       }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> allow users to specify TimeZone in DateTimeBucketer
> ---------------------------------------------------
>
>                 Key: FLINK-9962
>                 URL: https://issues.apache.org/jira/browse/FLINK-9962
>             Project: Flink
>          Issue Type: Improvement
>          Components: Streaming Connectors
>    Affects Versions: 1.5.1, 1.6.0
>            Reporter: Bowen Li
>            Assignee: Bowen Li
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.7.0
>
>
> Currently {{DateTimeBucketer}} will return a bucket path by using local 
> timezone. We should add a {{timezone}} constructor param to allow users to 
> specify a timezone.
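One practical consequence of choosing the zone, sketched with plain java.time rather than Flink code: in a zone with daylight saving time, an hourly pattern can map two different UTC hours to the same bucket when clocks fall back, whereas UTC never repeats an hour. In America/Los_Angeles the 2018 fall-back happened on 2018-11-04 at 02:00 local time (PDT to PST). The class DstBucketDemo and its bucket helper are hypothetical.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class DstBucketDemo {

    // Hypothetical helper: formats an instant with the default hourly bucket pattern in the given zone.
    public static String bucket(Instant instant, ZoneId zone) {
        return DateTimeFormatter.ofPattern("yyyy-MM-dd--HH").withZone(zone).format(instant);
    }

    public static void main(String[] args) {
        // Two instants one hour apart, straddling the 2018-11-04 fall-back in Los Angeles.
        Instant beforeFallBack = Instant.parse("2018-11-04T08:30:00Z"); // 01:30 PDT (UTC-7)
        Instant afterFallBack = Instant.parse("2018-11-04T09:30:00Z");  // 01:30 PST (UTC-8)

        ZoneId la = ZoneId.of("America/Los_Angeles");
        System.out.println(bucket(beforeFallBack, la)); // 2018-11-04--01
        System.out.println(bucket(afterFallBack, la));  // 2018-11-04--01 (same bucket again)

        ZoneId utc = ZoneId.of("UTC");
        System.out.println(bucket(beforeFallBack, utc)); // 2018-11-04--08
        System.out.println(bucket(afterFallBack, utc));  // 2018-11-04--09
    }
}
```

Whether a repeated local hour matters depends on the downstream consumer; passing ZoneId.of("UTC") to the new constructors is one way to avoid it.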



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
