[ https://issues.apache.org/jira/browse/BEAM-14267?focusedWorklogId=756678&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-756678 ]
ASF GitHub Bot logged work on BEAM-14267: ----------------------------------------- Author: ASF GitHub Bot Created on: 13/Apr/22 20:33 Start Date: 13/Apr/22 20:33 Worklog Time Spent: 10m Work Description: Abacn commented on code in PR #17305: URL: https://github.com/apache/beam/pull/17305#discussion_r849875918 ########## sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsMatchIT.java: ########## @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.gcp.storage; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import com.google.common.io.ByteStreams; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.WritableByteChannel; +import java.util.Arrays; +import java.util.Date; +import java.util.List; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.PipelineResult.State; +import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; +import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath; +import org.apache.beam.sdk.io.FileIO; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.fs.MatchResult.Metadata; +import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.TestPipelineOptions; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.Watch; +import org.apache.beam.sdk.util.MimeTypes; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TypeDescriptors; +import org.joda.time.Duration; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class GcsMatchIT { + /** Integration test for TextIO.MatchAll watching for file updates in gcs filesystem */ + @Test + public void testGcsMatchContinuously() throws InterruptedException { + TestPipelineOptions options = + TestPipeline.testingPipelineOptions().as(TestPipelineOptions.class); + assertNotNull(options.getTempRoot()); + options.setTempLocation(options.getTempRoot() + "/testGcsMatchContinuouslyTest"); + GcsOptions gcsOptions = options.as(GcsOptions.class); + String dstFolderName = + 
gcsOptions.getGcpTempLocation() + + String.format( + "/GcsMatchIT-%tF-%<tH-%<tM-%<tS-%<tL.testGcsMatchContinuously.copy/", new Date()); + final GcsPath watchPath = GcsPath.fromUri(dstFolderName); + + Pipeline p = Pipeline.create(options); + + PCollection<Metadata> matchAllUpdatedMetadata = + p.apply("create for matchAll updated files", Create.of(watchPath.resolve("*").toString())) + .apply( + "matchAll updated", + FileIO.matchAll() + .continuously( + Duration.millis(250), + Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(3)), + true)); + + // Copy the files to the "watch" directory; + Thread writer = + new Thread( + () -> { + try { + Thread.sleep(1000); Review Comment: This test basically duplicates `org.apache.beam.sdk.io.testMatchWatchForNewFiles`, and I am aware that adding sleep makes the test time-consuming. I am also curious to see if there is a better idea. Issue Time Tracking ------------------- Worklog Id: (was: 756678) Time Spent: 2h 10m (was: 2h) > Update watchForNewFiles to allow reading already read files with a new > timestamp > -------------------------------------------------------------------------------- > > Key: BEAM-14267 > URL: https://issues.apache.org/jira/browse/BEAM-14267 > Project: Beam > Issue Type: New Feature > Components: io-java-files > Reporter: Yi Hu > Assignee: Yi Hu > Priority: P2 > Time Spent: 2h 10m > Remaining Estimate: 0h > > In TextIO and AvroIO, we have a configuration option called watchForNewFiles, > and in FileIO.MatchConfiguration, we have an option called watchInterval. > Right now, these match any files according to the filtering criteria, and > then periodically check for new files. A file is determined to be new if it > has a different filename than a file that has already been read. > We want to add an option to consider a file new if it has a > different timestamp from an existing file, even if the file itself has the > same name. 
> See the following design doc for more detail: > [https://docs.google.com/document/d/1xnacyLGNh6rbPGgTAh5D1gZVR8rHUBsMMRV3YkvlL08/edit?usp=sharing&resourcekey=0-be0uF-DdmwAz6Vg4Li9FNw] > -- This message was sent by Atlassian Jira (v8.20.1#820001)