Hi,

I am trying to stream data into a BigQuery table using Beam. As the retry 
policy for failed rows I am using InsertRetryPolicy.retryTransientErrors() 
(see below). From reading the code, I expected that rows which fail with the 
reason "invalid" would not be retried. However, what I observe is that these 
rows are retried over and over again.
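
To make that expectation concrete, here is a minimal standalone sketch of the behaviour I expected. It does not use Beam: the class name ExpectedRetryPolicy, the method shouldRetry, and every entry in the list of persistent reasons other than "invalid" are assumptions for illustration only, not the actual implementation.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical stand-in for the expected policy; this is not Beam's code. */
final class ExpectedRetryPolicy {
    // Assumed set of error reasons treated as persistent (not worth retrying).
    // Only "invalid" is taken from the error shown in this mail.
    private static final Set<String> PERSISTENT_REASONS =
            new HashSet<>(Arrays.asList("invalid", "invalidQuery", "notImplemented"));

    private ExpectedRetryPolicy() {}

    /** Returns true only if none of the row's error reasons is persistent. */
    static boolean shouldRetry(List<String> errorReasons) {
        for (String reason : errorReasons) {
            if (PERSISTENT_REASONS.contains(reason)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // A row rejected as "invalid" should be handed to the failed-inserts output.
        System.out.println(shouldRetry(Arrays.asList("invalid")));
        // Any other reason is treated as transient in this sketch.
        System.out.println(shouldRetry(Arrays.asList("backendError")));
    }
}
```

Under this model, a row whose error reason is "invalid" would go straight to the failed-inserts output instead of being retried.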

Are my assumptions wrong? Am I doing something wrong, or is this a bug?

Any help would be appreciated.

Thanks,

Carsten


Code Snippet:
p.apply(BigQueryIO.writeTableRows()
        .to(new DatePartitionedTableSpecifier(tableReference, "tracking data"))
        .withSchema(schema)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
        .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()))
    // write all failed inserts to a DMQ
    .getFailedInserts()
    .apply(MapElements.via(new SimpleFunction<TableRow, PubsubMessage>() {
        @Override
        public PubsubMessage apply(final TableRow _row) {
            try {
                // serialize the failed row as JSON bytes, without attributes
                return new PubsubMessage(
                        JacksonFactory.getDefaultInstance().toByteArray(_row),
                        Collections.<String, String>emptyMap());
            } catch (IOException e) {
                throw new RuntimeException("failed to write to DMQ", e);
            }
        }
    }))
    .apply(PubsubIO.writeMessages().to("projects/gameduell-bits-bigquery-poc/topics/dmq"));

Error:
(1a04bdb0d43aca9c): java.lang.RuntimeException: com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request
{
  "code" : 400,
  "errors" : [ {
    "domain" : "global",
    "message" : "The destination table's partition rum$20170925 is outside the allowed bounds. You can only stream to partitions within 31 days in the past and 16 days in the future relative to the current date.",
    "reason" : "invalid"
  } ],
  "message" : "The destination table's partition rum$20170925 is outside the allowed bounds. You can only stream to partitions within 31 days in the past and 16 days in the future relative to the current date.",
  "status" : "INVALID_ARGUMENT"
}
        org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:774)
        org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:809)
        org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.flushRows(StreamingWriteFn.java:126)
        org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.finishBundle(StreamingWriteFn.java:96)
Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request
{
  "code" : 400,
  "errors" : [ {
    "domain" : "global",
    "message" : "The destination table's partition rum$20170925 is outside the allowed bounds. You can only stream to partitions within 31 days in the past and 16 days in the future relative to the current date.",
    "reason" : "invalid"
  } ],
  "message" : "The destination table's partition rum$20170925 is outside the allowed bounds. You can only stream to partitions within 31 days in the past and 16 days in the future relative to the current date.",
  "status" : "INVALID_ARGUMENT"
}
        com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:146)
        com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:113)
        com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:40)
        com.google.api.client.googleapis.services.AbstractGoogleClientRequest$1.interceptResponse(AbstractGoogleClientRequest.java:321)
        com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1065)
        com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:419)
        com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
        com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
        org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl$1.call(BigQueryServicesImpl.java:720)
        org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl$1.call(BigQueryServicesImpl.java:712)
        java.util.concurrent.FutureTask.run(FutureTask.java:266)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
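
The bounds named in the error message (31 days in the past, 16 days in the future) can be checked before a row is written. The following is only a sketch under assumptions: the class StreamingPartitionWindow and its method are hypothetical names, the partition suffix is assumed to be in yyyyMMdd form as in rum$20170925, and treating both bounds as inclusive is a guess, since the message does not say.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;

/** Hypothetical helper; the limits are copied from the error message above. */
final class StreamingPartitionWindow {
    static final long MAX_DAYS_IN_PAST = 31;
    static final long MAX_DAYS_IN_FUTURE = 16;

    private StreamingPartitionWindow() {}

    /**
     * Returns true if the partition (yyyyMMdd, e.g. "20170925") lies within
     * the allowed streaming window around the given current date.
     * Both bounds are treated as inclusive, which is an assumption.
     */
    static boolean isStreamable(String partitionSuffix, LocalDate today) {
        LocalDate partition =
                LocalDate.parse(partitionSuffix, DateTimeFormatter.BASIC_ISO_DATE);
        // Negative when the partition is in the past, positive when in the future.
        long daysFromToday = ChronoUnit.DAYS.between(today, partition);
        return daysFromToday >= -MAX_DAYS_IN_PAST
                && daysFromToday <= MAX_DAYS_IN_FUTURE;
    }

    public static void main(String[] args) {
        // The current date at the time of the failure is not known from the
        // trace; the date used here is purely hypothetical.
        LocalDate hypotheticalToday = LocalDate.of(2017, 11, 15);
        System.out.println(isStreamable("20170925", hypotheticalToday));
    }
}
```

A check like this could be used to send out-of-window rows to the DMQ directly instead of passing them to the BigQuery write.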

