Regarding the first issue you are facing:

In BeamSQL, we tried to solve a similar requirement.

BeamSQL supports reading JSON-formatted messages from Pubsub, writing to
BigQuery, and writing messages that fail to parse to another Pubsub topic.
BeamSQL uses a pre-processing transform to parse the JSON payload, like what
you have done. In that transform, BeamSQL tags problematic messages, see
PubsubMessageToRow.java#L86
<https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubMessageToRow.java#L86>,
and after that transform it adds another transform that sends the tagged
messages to a separate topic, see PubsubIOJsonTable.java#L148
<https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubIOJsonTable.java#L148>.
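
If you want the same pattern outside BeamSQL, a multi-output ParDo in your
own pipeline can do it. Below is a rough sketch, not BeamSQL's actual code:
the subscription and topic names are placeholders, and it parses into
TableRow with TableRowJsonCoder for illustration (BeamSQL itself produces
Row), so adjust the parsing step to whatever you use today.

import java.io.ByteArrayInputStream;

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

public class DeadLetterExample {
  // One tag for successfully parsed rows, one for the original failed messages.
  private static final TupleTag<TableRow> PARSED = new TupleTag<TableRow>() {};
  private static final TupleTag<PubsubMessage> FAILED = new TupleTag<PubsubMessage>() {};

  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();

    PCollection<PubsubMessage> messages =
        pipeline.apply(
            PubsubIO.readMessages()
                .fromSubscription("projects/my-project/subscriptions/my-subscription"));

    // Parse the JSON payload; anything that fails to parse goes to FAILED,
    // keeping the original PubsubMessage intact.
    PCollectionTuple parsed =
        messages.apply(
            ParDo.of(
                    new DoFn<PubsubMessage, TableRow>() {
                      @ProcessElement
                      public void processElement(ProcessContext c) {
                        try {
                          TableRow row =
                              TableRowJsonCoder.of()
                                  .decode(new ByteArrayInputStream(c.element().getPayload()));
                          c.output(row);
                        } catch (Exception e) {
                          c.output(FAILED, c.element());
                        }
                      }
                    })
                .withOutputTags(PARSED, TupleTagList.of(FAILED)));

    // Send unparseable messages to a separate "dead letter" topic.
    parsed
        .get(FAILED)
        .apply(PubsubIO.writeMessages().to("projects/my-project/topics/parse-errors"));

    // parsed.get(PARSED) can then feed BigQueryIO.writeTableRows().
    pipeline.run();
  }
}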


-Rui

On Fri, Nov 16, 2018 at 1:58 PM Jeff Klukas <jklu...@mozilla.com> wrote:

> I'm trying to write a robust pipeline that takes input from PubSub and
> writes to BigQuery. For every PubsubMessage that is not successfully
> written to BigQuery, I'd like to get the original PubsubMessage back and be
> able to write to an error output collection. I'm not sure this is quite
> possible, though.
>
> The first issue is that BigQueryIO's withFormatFunction doesn't seem to
> provide any error handling. If my formatFunction raises an exception, it
> will bubble up to a PipelineExecutionException and kill the job. I'd like
> to be able to catch the exception instead and send the original payload to
> an error output collection. To get around this, we're using a
> pre-processing transform to parse our JSON payload into a TableRow as a
> separate step, then calling BigQueryIO.writeTableRows (which is documented
> as something to avoid).
>
> Similarly, I'd like to be able to recover the original message if a
> failure occurs after formatFunction and BigQuery rejects the insert.
> WriteResult.getFailedInserts() initially seemed to do this, but it looks to
> always return TableRow rather than the original pre-formatFunction message.
>
> Also, I found that failed inserts by default raise an exception that stops
> processing. I found that I had to set
> InsertRetryPolicy.retryTransientErrors() in order to avoid failed inserts
> bubbling up to PipelineExecutionException.
>
> Are there details I'm missing of the existing API that would allow me to
> do the kind of error handling I'm talking about here?
> Is setting a non-default InsertRetryPolicy required for getFailedInserts,
> or is that a bug?
> Do others see a need for changes to the BigQueryIO.Write API to enable
> better error handling?
>
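
For the second part of the question (recovering failures after the BigQuery
write), a minimal sketch of the getFailedInserts() wiring described above
could look like the helper below; the table spec and schema are placeholders
meant to live in your own pipeline class. As noted above, the failed inserts
come back as TableRows rather than the original PubsubMessages, and a
non-default InsertRetryPolicy is needed for them to show up in
getFailedInserts() instead of surfacing as exceptions.

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Collections;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.values.PCollection;

/** Writes parsed rows and returns the rows that BigQuery rejected. */
static PCollection<TableRow> writeWithFailedInserts(PCollection<TableRow> rows) {
  // Placeholder schema and table; substitute your own.
  TableSchema schema =
      new TableSchema()
          .setFields(
              Collections.singletonList(
                  new TableFieldSchema().setName("payload").setType("STRING")));

  WriteResult result =
      rows.apply(
          BigQueryIO.writeTableRows()
              .to("my-project:my_dataset.my_table")
              .withSchema(schema)
              .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
              .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
              // Without a non-default retry policy, failed inserts surface as
              // exceptions rather than appearing in getFailedInserts().
              .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));

  // These come back as TableRows, not the original PubsubMessages.
  return result.getFailedInserts();
}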
