Hello,
I am trying a simple word count pipeline in a streaming environment using
TestStream (Python SDK). While it works with the DirectRunner, it fails on
the FlinkRunner with the following error. It looks like a type casting
issue.
Traceback (most recent call last):
File
"/home/jaehyeon/projects/general-demos/beam-pipelines/chapter1/first_streaming_pipeline.py",
line 78, in <module>
run()
File
"/home/jaehyeon/projects/general-demos/beam-pipelines/chapter1/first_streaming_pipeline.py",
line 74, in run
p.run().wait_until_finish()
File
"/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/portability/portable_runner.py",
line 576, in wait_until_finish
raise self._runtime_exception
RuntimeError: Pipeline
BeamApp-jaehyeon-0314203421-dfc96365_ba750d30-ff27-439d-a6ad-ce835f88fdf7
failed in state FAILED: java.lang.ClassCastException: class
java.lang.String cannot be cast to class [B (java.lang.String and [B are in
module java.base of loader 'bootstrap')
Could you please advise me on how to fix this? The pipeline code is shown below.
Cheers,
Jaehyeon
import os
import datetime
import argparse
import logging
import re
import apache_beam as beam
from apache_beam.coders import coders
from apache_beam.transforms import window
from apache_beam.transforms.trigger import AfterWatermark, AccumulationMode
from apache_beam.testing.test_stream import TestStream
from apache_beam.transforms.window import TimestampedValue
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
def read_file(filename: str, inputpath: str):
    """Return every line of *filename* found under *inputpath*.

    Trailing newline characters are preserved, exactly as the file stores
    them.
    """
    path = os.path.join(inputpath, filename)
    with open(path, "r") as handle:
        # Iterating a text file yields its lines; materialize them all.
        return list(handle)
def tokenize(element: str):
    """Split *element* into word tokens.

    A token is a maximal run of ASCII letters and apostrophes, so
    contractions such as "it's" survive as a single word.
    """
    word_pattern = r"[A-Za-z\']+"
    return re.findall(word_pattern, element)
def run():
    """Build and run a streaming word-count pipeline fed by a TestStream.

    Command-line arguments:
        --inputs: folder name where event records are saved (only used if
            the commented read_file() path below is re-enabled).
        --runner: Apache Beam runner to execute with (default DirectRunner).

    The pipeline emits (word, count) pairs via print() once the test
    stream's watermark advances to infinity.
    """
    parser = argparse.ArgumentParser(description="Beam pipeline arguments")
    parser.add_argument(
        "--inputs",
        default="inputs",
        help="Specify folder name that event records are saved",
    )
    parser.add_argument(
        "--runner", default="DirectRunner", help="Specify Apache Beam Runner"
    )
    opts = parser.parse_args()
    # PARENT_DIR = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))

    options = PipelineOptions()
    options.view_as(StandardOptions).runner = opts.runner

    # FIX: a comma was missing between the two paragraph literals, so
    # Python's implicit string-literal concatenation fused them into a
    # single element; `lines` now really holds two elements.
    lines = [
        "Lorem ipsum dolor sit amet, consectetuer adipiscing elit. Vestibulum erat nulla, ullamcorper nec, rutrum non, nonummy ac, erat. Nulla non lectus sed nisl molestie malesuada. Cras elementum. Integer in sapien. Mauris elementum mauris vitae tortor. Aliquam ante. Cum sociis natoque penatibus et magnis dis parturient montes, nascetur ridiculus mus. In enim a arcu imperdiet malesuada. Neque porro quisquam est, qui dolorem ipsum quia dolor sit amet, consectetur, adipisci velit, sed quia non numquam eius modi tempora incidunt ut labore et dolore magnam aliquam quaerat voluptatem. Aliquam erat volutpat. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum. In enim a arcu imperdiet malesuada. Class aptent taciti sociosqu ad litora torquent per conubia nostra, per inceptos hymenaeos.",
        "Duis pulvinar. Integer pellentesque quam vel velit. Sed convallis magna eu sem. Phasellus rhoncus. Aliquam erat volutpat. Quisque porta. Maecenas fermentum, sem in pharetra pellentesque, velit turpis volutpat ante, in pharetra metus odio a lectus. Fusce suscipit libero eget elit. Curabitur vitae diam non enim vestibulum interdum. Nam quis nulla. Etiam dui sem, fermentum vitae, sagittis id, malesuada in, quam. Aliquam ornare wisi eu metus. Aenean vel massa quis mauris vehicula lacinia. Nam libero tempore, cum soluta nobis est eligendi optio cumque nihil impedit quo minus id quod maxime placeat facere possimus, omnis voluptas assumenda est, omnis dolor repellendus.",
    ]
    # lines = read_file("lorem.txt", os.path.join(PARENT_DIR, "inputs"))

    # FIX: Beam timestamps are expressed in SECONDS since the epoch; the
    # original multiplied by 1000 (milliseconds), which pushed the event
    # timestamps far into the future.
    now = int(datetime.datetime.now().timestamp())

    # NOTE(review): the Flink ClassCastException (java.lang.String -> [B)
    # points at how the portable TestStream payloads are encoded; TestStream
    # support outside the DirectRunner is limited — confirm your Flink
    # runner version supports TestStream before relying on this.
    test_stream = (
        TestStream(coder=coders.StrUtf8Coder())
        .add_elements([TimestampedValue(line, now + 1000) for line in lines])
        .advance_watermark_to_infinity()
    )

    p = beam.Pipeline(options=options)
    (
        p
        | "Read stream" >> test_stream
        | "Extract words" >> beam.FlatMap(tokenize)
        | "Windowing"
        >> beam.WindowInto(
            window.GlobalWindows(),
            trigger=AfterWatermark(),
            accumulation_mode=AccumulationMode.DISCARDING,
        )
        | "Count per word" >> beam.combiners.Count.PerElement()
        | beam.Map(print)
    )

    logging.getLogger().setLevel(logging.INFO)
    logging.info("Building pipeline ...")
    p.run().wait_until_finish()
# Script entry point: build and run the pipeline when executed directly.
if __name__ == "__main__":
    run()