Hi folks,

I’ve recently been struggling with the following problem.

Data is quirky. It can contain Unicode, it can be poorly escaped, it can be
truncated, and so on. Sometimes that’s due to bugs in the processing code,
sometimes in the data-producing code, sometimes both.

So, when my data pipeline fails somewhere, I’d like to dump the
problematic piece of data somewhere for later analysis. Suppose we have a
pipeline:

def third_char_is_an_a(word):
    if word[2] == 'a':
        return [word]
    return []

output = (p
        | 'read' >> ReadFromText('gs://dataflow-samples/shakespeare/kinglear.txt')
        | 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
        | 'find_words_where_third_char_is_an_a' >> beam.FlatMap(third_char_is_an_a))

output | 'write' >> WriteToText('gs://my-bucket/output')

This pipeline will fail, because I’m an idiot and English has some
words shorter than 3 characters. What I’d like, however, is to be able to
record these failures easily. For example, I could just rewrite the whole
pipeline:

def third_char_is_an_a(word):
    try:
        if word[2] == 'a':
            return []
        return []
    except Exception:
        return [word]

output = (p
        | 'read' >> ReadFromText('gs://dataflow-samples/shakespeare/kinglear.txt')
        | 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
        | 'find_words_where_third_char_is_an_a' >> beam.FlatMap(third_char_is_an_a))

output | 'write' >> WriteToText('gs://my-bucket/failed_words')

If I also wanted to keep the successful results, I’d normally need to write
something more complicated:

def third_char_is_an_a(word):
    try:
        if word[2] == 'a':
            return [(1, word)]
        return []
    except Exception:
        return [(0, word)]

output = (p
        | 'read' >> ReadFromText('gs://dataflow-samples/shakespeare/kinglear.txt')
        | 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
        | 'find_words_where_third_char_is_an_a' >> beam.FlatMap(third_char_is_an_a)
        | 'partition' >> beam.Partition(lambda x, _: x[0], 2))

successes = output[1] | 'extract_successes' >> beam.Map(lambda x: x[1])
failures = output[0] | 'extract_failures' >> beam.Map(lambda x: x[1])

successes | 'write_successes' >> WriteToText('gs://my-bucket/output')
failures | 'write_failures' >> WriteToText('gs://my-bucket/failed_words')

Would it be possible to instead make a generic PTransform, named something
like DoOrRecordBadData, that allows doing something like this?

def third_char_is_an_a(word):
    if word[2] == 'a':
        return [word]
    return []

failure_sink = WriteToText('gs://my-bucket/failed_words')

output = (p
        | 'read' >> ReadFromText('gs://dataflow-samples/shakespeare/kinglear.txt')
        | 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
        | 'find_words_where_third_char_is_an_a' >>
            DoOrRecordBadData(beam.FlatMap(third_char_is_an_a),
                              failure_sink=failure_sink))

output | 'write' >> WriteToText('gs://my-bucket/output')

I’ve been trying to think of a way to implement that for an arbitrary
PTransform, but in vain. It’s easy enough to implement for a DoFn, though,
so maybe that’s what I should do for starters?
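For what it’s worth, here’s a plain-Python sketch of the logic such a DoFn
wrapper would implement (the helper name do_or_record is made up): apply the
function to each element, and divert any element that raises into a failures
list. In real Beam code this would presumably map to a ParDo with a tagged
side output (beam.pvalue.TaggedOutput plus .with_outputs()), where the main
output flows downstream and the failure output goes to the failure sink.

```python
def do_or_record(fn, elements):
    """Apply fn to each element; divert elements that raise an exception.

    Hypothetical helper, for illustration only -- this is the
    "dead letter" logic the DoFn wrapper would encapsulate.
    """
    successes, failures = [], []
    for element in elements:
        try:
            successes.extend(fn(element))
        except Exception:
            # Record the offending element instead of crashing the pipeline.
            failures.append(element)
    return successes, failures


def third_char_is_an_a(word):
    if word[2] == 'a':
        return [word]
    return []


# 'char' passes the check, 'is' raises IndexError, 'the' fails the check.
successes, failures = do_or_record(third_char_is_an_a, ['char', 'is', 'the'])
```

Here successes would be ['char'] and failures ['is'], while 'the' is simply
filtered out as before.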

Also, this raises a second question. Can we somehow report failures
back to an upstream step? Say, instead of recording the actual word that
failed, I’d rather record the original input data. For example:

def third_char_is_an_a(word):
    if word[2] == 'a':
        return [word]
    return []

output = (p
        | 'read' >> ReadFromText('gs://dataflow-samples/shakespeare/kinglear.txt')
        | 'reporter' >> ReportDownstreamFailures(WriteToText('gs://my-bucket/failed_words'))
        | 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
        | 'find_words_where_third_char_is_an_a' >> beam.FlatMap(third_char_is_an_a))

output | 'write' >> WriteToText('gs://my-bucket/output')

My guess is that the second option isn’t possible, but I’m still learning
Beam, so I may be wrong about that. I think either option (but especially
the second one) would be super useful down the road for stream processing,
giving pipelines some sort of dumping ground for problematic items (which
human beings can then look into) while the pipeline as a whole keeps
running.

Any thoughts would be very appreciated.
-- 
Best regards,
Dmitry Demeshchuk.

Reply via email to