Hi,
I am currently trying to stream data into a BigQuery table using Beam. As the retry policy for failed rows I am using InsertRetryPolicy.retryTransientErrors() (see below). From reading the code, my expectation was that rows whose insert failed with the reason “invalid” would not be retried. However, what I observe is that these rows are retried over and over again.
Are my assumptions wrong? Am I doing something wrong, or is this a bug?
Any help would be appreciated.
Thanks,
Carsten
Code Snippet:
p.apply(BigQueryIO.writeTableRows()
        .to(new DatePartitionedTableSpecifier(tableReference, "tracking data"))
        .withSchema(schema)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
        .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()))
    // write all failed inserts to a DMQ
    .getFailedInserts()
    .apply(MapElements.via(new SimpleFunction<TableRow, PubsubMessage>() {
        @Override
        public PubsubMessage apply(final TableRow _row) {
            try {
                // serialize the failed row to JSON bytes; no message attributes
                return new PubsubMessage(
                    JacksonFactory.getDefaultInstance().toByteArray(_row),
                    Collections.<String, String>emptyMap());
            } catch (IOException e) {
                throw new RuntimeException("failed to write to DMQ", e);
            }
        }
    }))
    .apply(PubsubIO.writeMessages().to("projects/gameduell-bits-bigquery-poc/topics/dmq"));
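To make the expectation about retryTransientErrors() concrete, here is a minimal sketch of the behaviour described above: a row is retried only if none of its error reasons is persistent. The class TransientRetrySketch and its method are illustrative stand-ins, not Beam's API, and the reasons other than "invalid" in the set are assumptions.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustrative stand-in only; this is not Beam's InsertRetryPolicy class.
class TransientRetrySketch {
    // Reasons treated as persistent (never retried). "invalid" comes from the
    // post; the other two entries are assumptions for this sketch.
    static final Set<String> PERSISTENT_REASONS =
        new HashSet<>(Arrays.asList("invalid", "invalidQuery", "notImplemented"));

    // Returns true only if none of the reported reasons is persistent.
    static boolean shouldRetry(List<String> errorReasons) {
        for (String reason : errorReasons) {
            if (PERSISTENT_REASONS.contains(reason)) {
                return false;
            }
        }
        return true;
    }
}
```

Under this sketch a row rejected with "invalid" would go to the failed-inserts output instead of being retried. It only models per-row error reasons; the trace that follows shows the exception being thrown out of insertAll itself, which may be a different code path.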
Error:
(1a04bdb0d43aca9c): java.lang.RuntimeException: com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request
{
  "code" : 400,
  "errors" : [ {
    "domain" : "global",
    "message" : "The destination table's partition rum$20170925 is outside the allowed bounds. You can only stream to partitions within 31 days in the past and 16 days in the future relative to the current date.",
    "reason" : "invalid"
  } ],
  "message" : "The destination table's partition rum$20170925 is outside the allowed bounds. You can only stream to partitions within 31 days in the past and 16 days in the future relative to the current date.",
  "status" : "INVALID_ARGUMENT"
}
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:774)
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:809)
org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.flushRows(StreamingWriteFn.java:126)
org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.finishBundle(StreamingWriteFn.java:96)
Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request
{
  "code" : 400,
  "errors" : [ {
    "domain" : "global",
    "message" : "The destination table's partition rum$20170925 is outside the allowed bounds. You can only stream to partitions within 31 days in the past and 16 days in the future relative to the current date.",
    "reason" : "invalid"
  } ],
  "message" : "The destination table's partition rum$20170925 is outside the allowed bounds. You can only stream to partitions within 31 days in the past and 16 days in the future relative to the current date.",
  "status" : "INVALID_ARGUMENT"
}
com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:146)
com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:113)
com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:40)
com.google.api.client.googleapis.services.AbstractGoogleClientRequest$1.interceptResponse(AbstractGoogleClientRequest.java:321)
com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1065)
com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:419)
com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl$1.call(BigQueryServicesImpl.java:720)
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl$1.call(BigQueryServicesImpl.java:712)
java.util.concurrent.FutureTask.run(FutureTask.java:266)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
[The same exception and stack trace are logged five more times.]
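The bounds quoted in the error message (31 days in the past, 16 days in the future) could be checked before a row is streamed. The following is a minimal sketch of such a check, not part of the original pipeline: the class PartitionBoundsSketch and its methods are hypothetical, the bounds are assumed to be inclusive, and "today" is passed in by the caller because the post does not say which time zone BigQuery uses for "the current date".

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;

// Hypothetical helper; names and inclusive bounds are assumptions.
class PartitionBoundsSketch {
    static final int MAX_DAYS_PAST = 31;   // from the error message
    static final int MAX_DAYS_FUTURE = 16; // from the error message

    // Parses the date suffix of a decorator such as "rum$20170925".
    // Throws DateTimeParseException if the suffix is not a yyyyMMdd date.
    static LocalDate partitionDate(String tableWithDecorator) {
        String suffix = tableWithDecorator.substring(tableWithDecorator.indexOf('$') + 1);
        return LocalDate.parse(suffix, DateTimeFormatter.BASIC_ISO_DATE);
    }

    // True if the partition lies within the streaming window around "today".
    static boolean isStreamable(String tableWithDecorator, LocalDate today) {
        // Negative offset: partition is in the past; positive: in the future.
        long offset = ChronoUnit.DAYS.between(today, partitionDate(tableWithDecorator));
        return offset >= -MAX_DAYS_PAST && offset <= MAX_DAYS_FUTURE;
    }
}
```

Rows for which isStreamable returns false could then be routed to the DMQ directly instead of being sent to BigQuery.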
