[ 
https://issues.apache.org/jira/browse/BEAM-14267?focusedWorklogId=756678&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-756678
 ]

ASF GitHub Bot logged work on BEAM-14267:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 13/Apr/22 20:33
            Start Date: 13/Apr/22 20:33
    Worklog Time Spent: 10m 
      Work Description: Abacn commented on code in PR #17305:
URL: https://github.com/apache/beam/pull/17305#discussion_r849875918


##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsMatchIT.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.storage;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import com.google.common.io.ByteStreams;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.PipelineResult.State;
+import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
+import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.Watch;
+import org.apache.beam.sdk.util.MimeTypes;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.joda.time.Duration;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class GcsMatchIT {
+  /** Integration test for TextIO.MatchAll watching for file updates in gcs filesystem */
+  @Test
+  public void testGcsMatchContinuously() throws InterruptedException {
+    TestPipelineOptions options =
+        TestPipeline.testingPipelineOptions().as(TestPipelineOptions.class);
+    assertNotNull(options.getTempRoot());
+    options.setTempLocation(options.getTempRoot() + "/testGcsMatchContinuouslyTest");
+    GcsOptions gcsOptions = options.as(GcsOptions.class);
+    String dstFolderName =
+        gcsOptions.getGcpTempLocation()
+            + String.format(
+                "/GcsMatchIT-%tF-%<tH-%<tM-%<tS-%<tL.testGcsMatchContinuously.copy/", new Date());
+    final GcsPath watchPath = GcsPath.fromUri(dstFolderName);
+
+    Pipeline p = Pipeline.create(options);
+
+    PCollection<Metadata> matchAllUpdatedMetadata =
+        p.apply("create for matchAll updated files", Create.of(watchPath.resolve("*").toString()))
+            .apply(
+                "matchAll updated",
+                FileIO.matchAll()
+                    .continuously(
+                        Duration.millis(250),
+                        Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(3)),
+                        true));
+
+    // Copy the files to the "watch" directory;
+    Thread writer =
+        new Thread(
+            () -> {
+              try {
+                Thread.sleep(1000);

Review Comment:
   This test basically duplicates 
`org.apache.beam.sdk.io.testMatchWatchForNewFiles`, and I am aware that 
adding a sleep makes the test time-consuming. I am also curious whether there 
is a better approach.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 756678)
    Time Spent: 2h 10m  (was: 2h)

> Update watchForNewFiles to allow reading already read files with a new 
> timestamp
> --------------------------------------------------------------------------------
>
>                 Key: BEAM-14267
>                 URL: https://issues.apache.org/jira/browse/BEAM-14267
>             Project: Beam
>          Issue Type: New Feature
>          Components: io-java-files
>            Reporter: Yi Hu
>            Assignee: Yi Hu
>            Priority: P2
>          Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
> In TextIO and AvroIO, we have a configuration option called watchForNewFiles, 
> and in FileIO.MatchConfiguration, we have an option called watchInterval. 
> Right now, these match any files according to the filtering criteria, and 
> then periodically check for new files. A file is determined to be new if it 
> has a different filename than a file that has already been read.
> We want to add an option to choose to consider a file new if it has a 
> different timestamp from an existing file, even if the file itself has the 
> same name.
> See the following design doc for more detail:
> [https://docs.google.com/document/d/1xnacyLGNh6rbPGgTAh5D1gZVR8rHUBsMMRV3YkvlL08/edit?usp=sharing&resourcekey=0-be0uF-DdmwAz6Vg4Li9FNw]
>  
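
To make the two notions of "new" in the description concrete, here is a minimal, self-contained sketch. It is not Beam's implementation: `NewFileDetector`, `FileInfo`, and the `matchUpdatedFiles` flag name are hypothetical, chosen only for illustration. It assumes each poll returns a listing of file names with last-modified timestamps, and that "new" means "not seen before under the chosen key".

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical illustration of name-based vs. timestamp-aware deduplication. */
final class NewFileDetector {

  /** Minimal stand-in for file metadata: a name plus a last-modified timestamp. */
  static final class FileInfo {
    final String name;
    final long lastModifiedMillis;

    FileInfo(String name, long lastModifiedMillis) {
      this.name = name;
      this.lastModifiedMillis = lastModifiedMillis;
    }
  }

  // Assumed flag: when true, a changed timestamp makes a known file count as new again.
  private final boolean matchUpdatedFiles;
  private final Set<String> seenKeys = new HashSet<>();

  NewFileDetector(boolean matchUpdatedFiles) {
    this.matchUpdatedFiles = matchUpdatedFiles;
  }

  /** Returns the files from one poll that count as new under the configured rule. */
  List<FileInfo> poll(List<FileInfo> listing) {
    List<FileInfo> fresh = new ArrayList<>();
    for (FileInfo file : listing) {
      // Current behavior keys on the name only; the proposed option also keys on the timestamp.
      String key = matchUpdatedFiles ? file.name + "@" + file.lastModifiedMillis : file.name;
      if (seenKeys.add(key)) {
        fresh.add(file);
      }
    }
    return fresh;
  }
}
```

With the flag off, a second poll that sees `a.txt` with a newer timestamp reports nothing, which matches the filename-only behavior described above. With the flag on, the same poll reports `a.txt` again, and a third poll with an unchanged timestamp reports nothing.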



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
