[ https://issues.apache.org/jira/browse/FLINK-9962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16596276#comment-16596276 ]
ASF GitHub Bot commented on FLINK-9962: --------------------------------------- asfgit closed pull request #6492: [FLINK-9962] [FS connector] allow users to specify TimeZone in DateTimeBucketer URL: https://github.com/apache/flink/pull/6492 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/dev/connectors/filesystem_sink.md b/docs/dev/connectors/filesystem_sink.md index af1349d6665..79ed08e9d41 100644 --- a/docs/dev/connectors/filesystem_sink.md +++ b/docs/dev/connectors/filesystem_sink.md @@ -70,7 +70,8 @@ stored. The sink can be further configured by specifying a custom bucketer, writ By default the bucketing sink will split by the current system time when elements arrive and will use the datetime pattern `"yyyy-MM-dd--HH"` to name the buckets. This pattern is passed to -`SimpleDateFormat` with the current system time to form a bucket path. A new bucket will be created +`DateTimeFormatter` with the current system time and JVM's default timezone to form a bucket path. +Users can also specify a timezone for the bucketer to format bucket path. A new bucket will be created whenever a new date is encountered. For example, if you have a pattern that contains minutes as the finest granularity you will get a new bucket every minute. Each bucket is itself a directory that contains several part files: each parallel instance of the sink will create its own part file and @@ -105,7 +106,7 @@ Example: DataStream<Tuple2<IntWritable,Text>> input = ...; BucketingSink<String> sink = new BucketingSink<String>("/base/path"); -sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HHmm")); +sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HHmm", ZoneId.of("America/Los_Angeles"))); sink.setWriter(new SequenceFileWriter<IntWritable, Text>()); sink.setBatchSize(1024 * 1024 * 400); // this is 400 MB, sink.setBatchRolloverInterval(20 * 60 * 1000); // this is 20 mins @@ -119,7 +120,7 @@ input.addSink(sink); val input: DataStream[Tuple2[IntWritable, Text]] = ... val sink = new BucketingSink[String]("/base/path") -sink.setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd--HHmm")) +sink.setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd--HHmm", ZoneId.of("America/Los_Angeles"))) sink.setWriter(new SequenceFileWriter[IntWritable, Text]()) sink.setBatchSize(1024 * 1024 * 400) // this is 400 MB, sink.setBatchRolloverInterval(20 * 60 * 1000); // this is 20 mins diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/DateTimeBucketer.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/DateTimeBucketer.java index b7035fe8ab4..d549eda3062 100644 --- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/DateTimeBucketer.java +++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/DateTimeBucketer.java @@ -19,13 +19,15 @@ package org.apache.flink.streaming.connectors.fs.bucketing; import org.apache.flink.streaming.connectors.fs.Clock; +import org.apache.flink.util.Preconditions; import org.apache.hadoop.fs.Path; import java.io.IOException; import java.io.ObjectInputStream; -import java.text.SimpleDateFormat; -import java.util.Date; +import java.time.Instant; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; /** * A {@link Bucketer} that assigns to buckets based on current system time. @@ -38,8 +40,8 @@ * is determined based on the current system time and the user provided format string. * * - * <p>{@link SimpleDateFormat} is used to derive a date string from the current system time and - * the date format string. The default format string is {@code "yyyy-MM-dd--HH"} so the rolling + * <p>{@link DateTimeFormatter} is used to derive a date string from the current system time and + * the date format string with a timezone. The default format string is {@code "yyyy-MM-dd--HH"} so the rolling * files will have a granularity of hours. * * @@ -61,44 +63,67 @@ private static final String DEFAULT_FORMAT_STRING = "yyyy-MM-dd--HH"; private final String formatString; + private final ZoneId zoneId; - private transient SimpleDateFormat dateFormatter; + private transient DateTimeFormatter dateTimeFormatter; /** - * Creates a new {@code DateTimeBucketer} with format string {@code "yyyy-MM-dd--HH"}. + * Creates a new {@code DateTimeBucketer} with format string {@code "yyyy-MM-dd--HH"} using JVM's default timezone. */ public DateTimeBucketer() { this(DEFAULT_FORMAT_STRING); } /** - * Creates a new {@code DateTimeBucketer} with the given date/time format string. + * Creates a new {@code DateTimeBucketer} with the given date/time format string using JVM's default timezone. * - * @param formatString The format string that will be given to {@code SimpleDateFormat} to determine + * @param formatString The format string that will be given to {@code DateTimeFormatter} to determine * the bucket path. */ public DateTimeBucketer(String formatString) { - this.formatString = formatString; + this(formatString, ZoneId.systemDefault()); + } + + /** + * Creates a new {@code DateTimeBucketer} with format string {@code "yyyy-MM-dd--HH"} using the given timezone. + * + * @param zoneId The timezone used to format {@code DateTimeFormatter} for bucket path. + */ + public DateTimeBucketer(ZoneId zoneId) { + this(DEFAULT_FORMAT_STRING, zoneId); + } + + /** + * Creates a new {@code DateTimeBucketer} with the given date/time format string using the given timezone. + * + * @param formatString The format string that will be given to {@code DateTimeFormatter} to determine + * the bucket path. + * @param zoneId The timezone used to format {@code DateTimeFormatter} for bucket path. + */ + public DateTimeBucketer(String formatString, ZoneId zoneId) { + this.formatString = Preconditions.checkNotNull(formatString); + this.zoneId = Preconditions.checkNotNull(zoneId); - this.dateFormatter = new SimpleDateFormat(formatString); + this.dateTimeFormatter = DateTimeFormatter.ofPattern(this.formatString).withZone(zoneId); } private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { in.defaultReadObject(); - this.dateFormatter = new SimpleDateFormat(formatString); + this.dateTimeFormatter = DateTimeFormatter.ofPattern(formatString).withZone(zoneId); } @Override public Path getBucketPath(Clock clock, Path basePath, T element) { - String newDateTimeString = dateFormatter.format(new Date(clock.currentTimeMillis())); + String newDateTimeString = dateTimeFormatter.format(Instant.ofEpochMilli(clock.currentTimeMillis())); return new Path(basePath + "/" + newDateTimeString); } @Override public String toString() { return "DateTimeBucketer{" + - "formatString='" + formatString + '\'' + - '}'; + "formatString='" + formatString + '\'' + + ", zoneId=" + zoneId + + '}'; } } diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/DateTimeBucketerTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/DateTimeBucketerTest.java new file mode 100644 index 00000000000..3ce9f496c7f --- /dev/null +++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/DateTimeBucketerTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.fs.bucketing; + +import org.apache.flink.streaming.connectors.fs.Clock; + +import org.apache.hadoop.fs.Path; +import org.junit.Test; + +import java.time.ZoneId; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link DateTimeBucketer}. + */ +public class DateTimeBucketerTest { + private static final long TEST_TIME_IN_MILLIS = 1533363082011L; + private static final Path TEST_PATH = new Path("test"); + + private static final Clock mockedClock = new MockedClock(); + + @Test + public void testGetBucketPathWithSpecifiedTimezone() { + DateTimeBucketer bucketer = new DateTimeBucketer(ZoneId.of("America/Los_Angeles")); + + assertEquals(new Path("test/2018-08-03--23"), bucketer.getBucketPath(mockedClock, TEST_PATH, null)); + } + + @Test + public void testGetBucketPathWithSpecifiedFormatString() { + DateTimeBucketer bucketer = new DateTimeBucketer("yyyy-MM-dd-HH", ZoneId.of("America/Los_Angeles")); + + assertEquals(new Path("test/2018-08-03-23"), bucketer.getBucketPath(mockedClock, TEST_PATH, null)); + } + + private static class MockedClock implements Clock { + + @Override + public long currentTimeMillis() { + return TEST_TIME_IN_MILLIS; + } + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/DateTimeBucketAssigner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/DateTimeBucketAssigner.java index 5d30f39fa2e..32e3b2a864b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/DateTimeBucketAssigner.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/DateTimeBucketAssigner.java @@ -21,9 +21,11 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner; +import org.apache.flink.util.Preconditions; -import java.text.SimpleDateFormat; -import java.util.Date; +import java.time.Instant; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; /** * A {@link BucketAssigner} that assigns to buckets based on current system time. @@ -37,14 +39,14 @@ * user provided format string. * * - * <p>{@link SimpleDateFormat} is used to derive a date string from the current system time and + * <p>{@link DateTimeFormatter} is used to derive a date string from the current system time and * the date format string. The default format string is {@code "yyyy-MM-dd--HH"} so the rolling * files will have a granularity of hours. * * <p>Example: * * <pre>{@code - * Bucketer buck = new DateTimeBucketer("yyyy-MM-dd--HH"); + * BucketAssigner bucketAssigner = new DateTimeBucketAssigner("yyyy-MM-dd--HH"); * }</pre> * * <p>This will create for example the following bucket path: @@ -59,32 +61,55 @@ private static final String DEFAULT_FORMAT_STRING = "yyyy-MM-dd--HH"; private final String formatString; + private final ZoneId zoneId; - private transient SimpleDateFormat dateFormatter; + private transient DateTimeFormatter dateTimeFormatter; /** - * Creates a new {@code DateTimeBucketer} with format string {@code "yyyy-MM-dd--HH"}. + * Creates a new {@code DateTimeBucketAssigner} with format string {@code "yyyy-MM-dd--HH"}. */ public DateTimeBucketAssigner() { this(DEFAULT_FORMAT_STRING); } /** - * Creates a new {@code DateTimeBucketer} with the given date/time format string. + * Creates a new {@code DateTimeBucketAssigner} with the given date/time format string. * * @param formatString The format string that will be given to {@code SimpleDateFormat} to determine - * the bucket path. + * the bucket id. */ public DateTimeBucketAssigner(String formatString) { - this.formatString = formatString; + this(formatString, ZoneId.systemDefault()); + } + + /** + * Creates a new {@code DateTimeBucketAssigner} with format string {@code "yyyy-MM-dd--HH"} using the given timezone. + * + * @param zoneId The timezone used to format {@code DateTimeFormatter} for bucket id. + */ + public DateTimeBucketAssigner(ZoneId zoneId) { + this(DEFAULT_FORMAT_STRING, zoneId); + } + + /** + * Creates a new {@code DateTimeBucketAssigner} with the given date/time format string using the given timezone. + * + * @param formatString The format string that will be given to {@code DateTimeFormatter} to determine + * the bucket path. + * @param zoneId The timezone used to format {@code DateTimeFormatter} for bucket id. + */ + public DateTimeBucketAssigner(String formatString, ZoneId zoneId) { + this.formatString = Preconditions.checkNotNull(formatString); + this.zoneId = Preconditions.checkNotNull(zoneId); } @Override public String getBucketId(IN element, BucketAssigner.Context context) { - if (dateFormatter == null) { - dateFormatter = new SimpleDateFormat(formatString); + if (dateTimeFormatter == null) { + dateTimeFormatter = DateTimeFormatter.ofPattern(formatString).withZone(zoneId); } - return dateFormatter.format(new Date(context.currentProcessingTime())); + + return dateTimeFormatter.format(Instant.ofEpochMilli(context.currentProcessingTime())); } @Override @@ -94,6 +119,9 @@ public String getBucketId(IN element, BucketAssigner.Context context) { @Override public String toString() { - return "DateTimeBucketAssigner{formatString='" + formatString + '\'' + '}'; + return "DateTimeBucketAssigner{" + + "formatString='" + formatString + '\'' + + ", zoneId=" + zoneId + + '}'; } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/DateTimeBucketAssignerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/DateTimeBucketAssignerTest.java new file mode 100644 index 00000000000..b4fac1837d3 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/DateTimeBucketAssignerTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners; + +import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner; + +import org.junit.Test; + +import javax.annotation.Nullable; + +import java.time.ZoneId; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link DateTimeBucketAssigner}. + */ +public class DateTimeBucketAssignerTest { + private static final long TEST_TIME_IN_MILLIS = 1533363082011L; + + private static final MockedContext mockedContext = new MockedContext(); + + @Test + public void testGetBucketPathWithSpecifiedTimezone() { + DateTimeBucketAssigner bucketAssigner = new DateTimeBucketAssigner(ZoneId.of("America/Los_Angeles")); + + assertEquals("2018-08-03--23", bucketAssigner.getBucketId(null, mockedContext)); + } + + @Test + public void testGetBucketPathWithSpecifiedFormatString() { + DateTimeBucketAssigner bucketAssigner = new DateTimeBucketAssigner("yyyy-MM-dd-HH", ZoneId.of("America/Los_Angeles")); + + assertEquals("2018-08-03-23", bucketAssigner.getBucketId(null, mockedContext)); + } + + private static class MockedContext implements BucketAssigner.Context { + @Override + public long currentProcessingTime() { + return TEST_TIME_IN_MILLIS; + } + + @Override + public long currentWatermark() { + throw new UnsupportedOperationException(); + } + + @Nullable + @Override + public Long timestamp() { + return null; + } + } +} ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > allow users to specify TimeZone in DateTimeBucketer > --------------------------------------------------- > > Key: FLINK-9962 > URL: https://issues.apache.org/jira/browse/FLINK-9962 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors > Affects Versions: 1.5.1, 1.6.0 > Reporter: Bowen Li > Assignee: Bowen Li > Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Currently {{DateTimeBucketer}} will return a bucket path by using local > timezone. We should add a {{timezone}} constructor param to allow users to > specify a timezone. -- This message was sent by Atlassian JIRA (v7.6.3#76005)