I guess an even more ideal approach, which also seems more doable, would be
something like this:

def third_char_is_an_a(word):
    if word[2] == 'a':
        return [word]
    return []

output = (p
        | 'read' >> ReadFromText('gs://dataflow-samples/shakespeare/kinglear.txt')
        | 'recorder' >> CaptureDownstreamFailures('my_failure_key')
        | 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
        | 'find_words_where_third_char_is_an_a' >> beam.FlatMap(third_char_is_an_a))

output | 'failures' >> Failures('my_failure_key') | 'write_failures' >> WriteToText('gs://my-bucket/failures')
output | 'write' >> WriteToText('gs://my-bucket/output')
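To make the routing concrete, here is a minimal plain-Python model of the
try/except dead-lettering such a transform would need (the helper name
tag_failures is mine, not a Beam API — inside Beam, ParDo's side-output
mechanism would carry the two streams instead of two lists):

```python
def tag_failures(fn, elements):
    """Apply fn to each element; route elements that raise to a failure list."""
    successes, failures = [], []
    for element in elements:
        try:
            successes.extend(fn(element))
        except Exception:
            failures.append(element)
    return successes, failures

def third_char_is_an_a(word):
    if word[2] == 'a':
        return [word]
    return []

# Short words raise IndexError and land in failures instead of crashing.
successes, failures = tag_failures(third_char_is_an_a, ['la', 'slap', 'ox'])
```

The point is that the wrapper, not the user function, owns the try/except, so
the user code stays as naive as in the examples above.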


On Thu, Jun 22, 2017 at 5:34 PM, Dmitry Demeshchuk <[email protected]>
wrote:

> Hi folks,
>
> I’ve been recently struggling with the following problem.
>
> Data is quirky. It can have Unicode issues, poor escaping, truncation, and
> so on. Sometimes that’s due to problems in the processing code, sometimes
> in the data-producing code, sometimes both.
>
> That said, if my data pipeline fails somewhere, I’d like to dump the
> problematic piece of data somewhere for later analysis. Suppose we have a
> pipeline:
>
> def third_char_is_an_a(word):
>     if word[2] == 'a':
>         return [word]
>     return []
>
> output = (p
>         | 'read' >> ReadFromText('gs://dataflow-samples/shakespeare/kinglear.txt')
>         | 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
>         | 'find_words_where_third_char_is_an_a' >> beam.FlatMap(third_char_is_an_a))
>
> output | 'write' >> WriteToText('gs://my-bucket/output')
>
> This pipeline will fail because, being an idiot, I forgot that English has
> some words shorter than 3 characters. What I’d like, however, is to be able
> to easily record these failures. For example, I can just rewrite the whole
> pipeline:
>
> def third_char_is_an_a(word):
>     try:
>         if word[2] == 'a':
>             return []
>         return []
>     except Exception:
>         return [word]
>
> output = (p
>         | 'read' >> ReadFromText('gs://dataflow-samples/shakespeare/kinglear.txt')
>         | 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
>         | 'find_words_where_third_char_is_an_a' >> beam.FlatMap(third_char_is_an_a))
>
> output | 'write' >> WriteToText('gs://my-bucket/failed_words')
>
> If I wanted to still keep the succeeded results, I’d normally need to
> write something more complicated:
>
> def third_char_is_an_a(word):
>     try:
>         if word[2] == 'a':
>             return [(1, word)]
>         return []
>     except Exception:
>         return [(0, word)]
>
> output = (p
>         | 'read' >> ReadFromText('gs://dataflow-samples/shakespeare/kinglear.txt')
>         | 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
>         | 'find_words_where_third_char_is_an_a' >> beam.FlatMap(third_char_is_an_a)
>         | 'partition' >> beam.Partition(lambda x, num_partitions: x[0], 2))
>
> successes = output[1] | 'extract_successes' >> beam.Map(lambda x: x[1])
> failures = output[0] | 'extract_failures' >> beam.Map(lambda x: x[1])
>
> successes | 'write_successes' >> WriteToText('gs://my-bucket/output')
> failures | 'write_failures' >> WriteToText('gs://my-bucket/failed_words')
>
> Would it be possible to instead make a generic PTransform, named something
> like DoOrRecordBadData, that allows doing something like this?
>
> def third_char_is_an_a(word):
>     if word[2] == 'a':
>         return [word]
>     return []
>
> failure_sink = WriteToText('gs://my-bucket/failed_words')
>
> output = (p
>         | 'read' >> ReadFromText('gs://dataflow-samples/shakespeare/kinglear.txt')
>         | 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
>         | 'find_words_where_third_char_is_an_a' >>
>               DoOrRecordBadData(beam.FlatMap(third_char_is_an_a), failure_sink=failure_sink))
>
> output | 'write' >> WriteToText('gs://my-bucket/output')
>
> I’ve been trying to think of a way to implement that for any arbitrary
> PTransform, but in vain. It’s easy enough to implement that for a DoFn,
> and maybe that’s what I should do for starters?
>
> Also, this raises a second question. Can we somehow report the failures
> back to the upstream step? Say, instead of recording the actual word that
> failed, I’d rather record the initial data. For example:
>
> def third_char_is_an_a(word):
>     if word[2] == 'a':
>         return [word]
>     return []
>
> output = (p
>         | 'read' >> ReadFromText('gs://dataflow-samples/shakespeare/kinglear.txt')
>         | 'reporter' >> ReportDownstreamFailures(WriteToText('gs://my-bucket/failed_words'))
>         | 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
>         | 'find_words_where_third_char_is_an_a' >> beam.FlatMap(third_char_is_an_a))
>
> output | 'write' >> WriteToText('gs://my-bucket/output')
>
> My guess is that the second option isn’t possible, but I’m still learning
> Beam, so I may be wrong on that. I think either option (but especially the
> second one) would be super useful down the road for stream processing of
> data, so that pipelines can have some sort of a dumping ground for
> problematic items (which can be then looked into by human beings), while
> overall the pipeline is still running.
>
> Any thoughts would be very appreciated.
>
> --
> Best regards,
> Dmitry Demeshchuk.
>



-- 
Best regards,
Dmitry Demeshchuk.
