I guess an even more ideal approach would be something like this, which
also seems more doable:
def third_char_is_an_a(word):
    if word[2] == 'a':
        return [word]
    return []

output = (p
          | 'read' >> ReadFromText('gs://dataflow-samples/shakespeare/kinglear.txt')
          | 'recorder' >> CaptureDownstreamFailures('my_failure_key')
          | 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
          | 'find_words_where_third_char_is_an_a' >> beam.FlatMap(third_char_is_an_a))

output | 'failures' >> Failures('my_failure_key') | WriteToText('gs://my-bucket/failures')
output | 'write' >> WriteToText('gs://my-bucket/output')
On Thu, Jun 22, 2017 at 5:34 PM, Dmitry Demeshchuk <[email protected]>
wrote:
> Hi folks,
>
> I’ve been recently struggling with the following problem.
>
> Data is quirky. It can contain unicode, have poor escaping, be
> truncated, and so on. Sometimes that's due to problems in the processing
> code, sometimes in the data-producing code, sometimes both.
>
> That said, if my data pipeline fails somewhere, I’d like to dump the
> problematic piece of data somewhere for later analysis. Suppose we have a
> pipeline:
>
> def third_char_is_an_a(word):
>     if word[2] == 'a':
>         return [word]
>     return []
>
> output = (p
>           | 'read' >> ReadFromText('gs://dataflow-samples/shakespeare/kinglear.txt')
>           | 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
>           | 'find_words_where_third_char_is_an_a' >> beam.FlatMap(third_char_is_an_a))
>
> output | 'write' >> WriteToText('gs://my-bucket/output')
>
> This pipeline will fail, because I'm an idiot and English has some
> words shorter than 3 characters. What I'd like, however, is to be able
> to easily record these failures. For example, I could just rewrite the
> whole pipeline:
>
> def third_char_is_an_a(word):
>     try:
>         if word[2] == 'a':
>             return []
>         return []
>     except Exception:
>         return [word]
>
> output = (p
>           | 'read' >> ReadFromText('gs://dataflow-samples/shakespeare/kinglear.txt')
>           | 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
>           | 'find_words_where_third_char_is_an_a' >> beam.FlatMap(third_char_is_an_a))
>
> output | 'write' >> WriteToText('gs://my-bucket/failed_words')
>
> If I still wanted to keep the successful results, I'd normally need to
> write something more complicated:
>
> def third_char_is_an_a(word):
>     try:
>         if word[2] == 'a':
>             return [(1, word)]
>         return []
>     except Exception:
>         return [(0, word)]
>
> output = (p
>           | 'read' >> ReadFromText('gs://dataflow-samples/shakespeare/kinglear.txt')
>           | 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
>           | 'find_words_where_third_char_is_an_a' >> beam.FlatMap(third_char_is_an_a)
>           | 'partition' >> beam.Partition(lambda x, n: x[0], 2))
>
> successes = output[1] | 'extract_successes' >> beam.Map(lambda x: x[1])
> failures = output[0] | 'extract_failures' >> beam.Map(lambda x: x[1])
>
> successes | 'write_successes' >> WriteToText('gs://my-bucket/output')
> failures | 'write_failures' >> WriteToText('gs://my-bucket/failed_words')
>
> Would it be possible to instead make a generic PTransform, named
> something like DoOrReportBadData, that allows doing something like this?
>
> def third_char_is_an_a(word):
>     if word[2] == 'a':
>         return [word]
>     return []
>
> failure_sink = WriteToText('gs://my-bucket/failed_words')
>
> output = (p
>           | 'read' >> ReadFromText('gs://dataflow-samples/shakespeare/kinglear.txt')
>           | 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
>           | 'find_words_where_third_char_is_an_a' >>
>               DoOrReportBadData(beam.FlatMap(third_char_is_an_a),
>                                 failure_sink=failure_sink))
>
> output | 'write' >> WriteToText('gs://my-bucket/output')
>
> I’ve been trying to think of a way to implement that for any arbitrary
> PTransform, but in vain. It’s easy enough to implement for a DoFn,
> though; maybe that’s what I should do for starters?
>
> Also, this raises a second question: can we somehow report failures
> back to an upstream step? Say, instead of recording the actual word that
> failed, I’d rather record the original input. For example:
>
> def third_char_is_an_a(word):
>     if word[2] == 'a':
>         return [word]
>     return []
>
> output = (p
>           | 'read' >> ReadFromText('gs://dataflow-samples/shakespeare/kinglear.txt')
>           | 'reporter' >>
>               ReportDownstreamFailures(WriteToText('gs://my-bucket/failed_words'))
>           | 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
>           | 'find_words_where_third_char_is_an_a' >> beam.FlatMap(third_char_is_an_a))
>
> output | 'write' >> WriteToText('gs://my-bucket/output')
>
> My guess is that the second option isn’t possible, but I’m still
> learning Beam, so I may be wrong. I think either option (especially the
> second one) would be super useful down the road for stream processing,
> giving pipelines a dumping ground for problematic items (which human
> beings can then look into) while the pipeline as a whole keeps running.
>
> Any thoughts would be very appreciated.
>
> --
> Best regards,
> Dmitry Demeshchuk.
>
--
Best regards,
Dmitry Demeshchuk.