Hello.
I would like to ask for help with resolving a dependency issue for an
imported module.
I have the directory structure shown below, and I am trying to import the
Frames class from frames.py into main.py.
=========================================
quality-validation/
bin/setup.py
main.py
modules/
frames.py
<TRIMMED>
=========================================
However, when I run the pipeline, I get the error below on the TaskManager.
=========================================
<TRIMMED>
File "apache_beam/runners/common.py", line 942, in
apache_beam.runners.common._OutputProcessor.process_outputs
File "apache_beam/runners/worker/operations.py", line 143, in
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 593, in
apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 594, in
apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 799, in
apache_beam.runners.common.DoFnRunner.receive
File "apache_beam/runners/common.py", line 805, in
apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 872, in
apache_beam.runners.common.DoFnRunner._reraise_augmented
File "/usr/local/lib/python3.5/site-packages/future/utils/__init__.py",
line 419, in raise_with_traceback
raise exc.with_traceback(traceback)
File "apache_beam/runners/common.py", line 803, in
apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 465, in
apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 918, in
apache_beam.runners.common._OutputProcessor.process_outputs
File "/home/admin/quality-validation/bin/main.py", line 44, in process
ImportError: No module named 'frames' [while running
'combined:flat/ParDo(FlattenTagFilesFn)']
=========================================
I import the modules in the global context and also at the top of the
process function.
=========================================
[main.py]
#
# Standard library imports
#
import logging
import pprint
import sys
sys.path.append("{0}/modules".format(sys.path[0]))
sys.path.append("{0}/modules/vendor".format(sys.path[0]))
#
# Related third party imports
#
import apache_beam as beam
#
# Local application/library specific imports
#
import utils
from pipeline_wrapper import pipelineWrapper
from tag_counts import TagCounts
from tags import Tags
<TRIMMED>
class FlattenTagFilesFn(beam.DoFn):
    """Pair up "pre" and "post" tag files from S3 and flatten them per frame.

    For every input element the DoFn downloads the pre and post YAML tag
    files, emits one ``Frames`` object on the main output for each frame
    number present in both files, and then emits a single ``TagCounts``
    object on the ``'counts'`` tagged output.
    """

    def __init__(self, s3Bucket, s3Creds, maxKeys=1000):
        """Store the S3 settings used later by ``process``.

        Args:
            s3Bucket: Name of the S3 bucket the tag files are read from.
            s3Creds: Indexable pair; item 0 is passed to boto3 as the AWS
                access key id and item 1 as the secret access key.
            maxKeys: Stored on the instance; not read anywhere in this class.
        """
        beam.DoFn.__init__(self)
        self.s3Bucket = s3Bucket
        self.s3Creds = s3Creds
        self.maxKeys = maxKeys

    def _load_tag_file(self, s3Key):
        """Download one YAML tag file and index its body by frame number.

        Args:
            s3Key: Key of the object inside ``self.s3Bucket``.

        Returns:
            A ``(document, framesByNo)`` tuple: the parsed YAML document and
            a dict mapping ``frame_tag.frame_no`` to the body entry holding
            it. When a frame number repeats, the last entry wins.
        """
        import yaml

        resp = self.s3Client.get_object(Bucket=self.s3Bucket, Key=s3Key)
        # NOTE(review): yaml.load on downloaded content can construct
        # arbitrary Python objects; consider yaml.safe_load if the tag files
        # are plain data. Left unchanged so parsing behaves as before.
        document = yaml.load(resp['Body'])
        framesByNo = {
            entry['frame_tag']['frame_no']: entry
            for entry in document['body']
        }
        return document, framesByNo

    def process(self, elem):
        """Emit per-frame ``Frames`` objects and one ``TagCounts`` object.

        Args:
            elem: A ``(key, info)`` pair. ``info['pre'][0]`` and
                ``info['post'][0]`` are indexed as
                ``(s3Key, lastModifiedTime)``.

        Yields:
            One ``Frames`` per frame number common to the pre and post files
            (main output), followed by a ``TaggedOutput('counts', ...)``
            wrapping a ``TagCounts`` built from both documents.
        """
        # NOTE(review): this import runs on the worker, so the `frames`
        # module has to be importable there (e.g. shipped as a package via
        # setup.py); sys.path changes made in the launcher script are
        # presumably not applied on the worker -- confirm.
        from frames import Frames

        # Create the boto3 client lazily, on first use, instead of in
        # __init__.
        if not hasattr(self, 's3Client'):
            import boto3
            self.s3Client = boto3.client(
                's3',
                aws_access_key_id=self.s3Creds[0],
                aws_secret_access_key=self.s3Creds[1])

        _, info = elem

        preKey = info['pre'][0][0]
        yaml1, preFrm = self._load_tag_file(preKey)

        postKey = info['post'][0][0]
        yaml2, postFrm = self._load_tag_file(postKey)

        preModified = info['pre'][0][1]
        postModified = info['post'][0][1]

        # Only frame numbers present in both files can be paired.
        commonFrmNums = set(preFrm).intersection(postFrm)

        for f in commonFrmNums:
            frames = Frames(
                self.s3Bucket,
                preKey,                        # Pre S3Key
                postKey,                       # Post S3Key
                yaml1['head']['operator_id'],  # Pre OperatorId
                yaml2['head']['operator_id'],  # Post OperatorId
                preFrm[f],                     # Pre Frame Line
                postFrm[f],                    # Post Frame Line
                preModified,                   # Pre Last Modified Time
                postModified)                  # Post Last Modified Time
            yield frames

        tagCounts = TagCounts(
            self.s3Bucket,
            yaml1,          # Pre Yaml
            yaml2,          # Post Yaml
            preKey,         # Pre S3Key
            postKey,        # Post S3Key
            preModified,    # Pre Last Modified Time
            postModified)   # Post Last Modified Time
        yield beam.pvalue.TaggedOutput('counts', tagCounts)
=========================================
My pipeline options are below. I tried both with and without "setup_file",
but it made no difference.
=========================================
# Submit through the portable runner to the job endpoint at localhost:8099,
# with the SDK worker environment taken from the configured Docker registry.
options = PipelineOptions([
    "--runner=PortableRunner",
    "--environment_config={0}".format(self.__docker_registry),
    "--environment_type=DOCKER",
    "--experiments=beam_fn_api",
    "--job_endpoint=localhost:8099",
])
setup_options = options.view_as(SetupOptions)
setup_options.save_main_session = True
# The assignment must stay on one logical line; the path is the project's
# setup.py.
setup_options.setup_file = '/home/admin/quality-validation/bin/setup.py'
=========================================
Is it possible to resolve a dependency on an external module inside a ParDo
when running on Apache Flink?
Thanks,
Yu Watanabe
--
Yu Watanabe
Weekend Freelancer who loves to challenge building data platform
[email protected]
[image: LinkedIn icon] <https://www.linkedin.com/in/yuwatanabe1> [image:
Twitter icon] <https://twitter.com/yuwtennis>