Hello. I would like to ask for help with resolving a dependency issue for an imported module.
I have the directory structure below and am trying to import the Frames class from frames.py into main.py.

=========================================
quality-validation/
    bin/
        setup.py
        main.py
        modules/
            frames.py
            <TRIMMED>
=========================================

However, when I run the pipeline, I get the error below on the TaskManager.

=========================================
<TRIMMED>
  File "apache_beam/runners/common.py", line 942, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 143, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 593, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 594, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 799, in apache_beam.runners.common.DoFnRunner.receive
  File "apache_beam/runners/common.py", line 805, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 872, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "/usr/local/lib/python3.5/site-packages/future/utils/__init__.py", line 419, in raise_with_traceback
    raise exc.with_traceback(traceback)
  File "apache_beam/runners/common.py", line 803, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 465, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 918, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "/home/admin/quality-validation/bin/main.py", line 44, in process
ImportError: No module named 'frames' [while running 'combined:flat/ParDo(FlattenTagFilesFn)']
=========================================

I import the modules at global scope and also at the top of the process function.
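Both import sites can be seen in main.py below. For reference, the setup.py under bin/ is a plain setuptools file along these lines (a sketch only; the package names here are an assumption, since I have not pasted the actual file):

=========================================
# bin/setup.py (sketch; 'modules' / 'modules.vendor' package names are assumptions)
import setuptools

setuptools.setup(
    name='quality-validation',
    version='0.0.1',
    packages=['modules', 'modules.vendor'],
)
=========================================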
=========================================
[main.py]

#
# Standard library imports
#
import logging
import pprint
import sys

sys.path.append("{0}/modules".format(sys.path[0]))
sys.path.append("{0}/modules/vendor".format(sys.path[0]))

#
# Related third party imports
#
import apache_beam as beam

#
# Local application/library specific imports
#
import utils
from pipeline_wrapper import pipelineWrapper
from tag_counts import TagCounts
from tags import Tags

<TRIMMED>

class FlattenTagFilesFn(beam.DoFn):
    def __init__(self, s3Bucket, s3Creds, maxKeys=1000):
        beam.DoFn.__init__(self)
        self.s3Bucket = s3Bucket
        self.s3Creds = s3Creds
        self.maxKeys = maxKeys

    def process(self, elem):
        import yaml
        from frames import Frames

        if not hasattr(self, 's3Client'):
            import boto3
            self.s3Client = boto3.client('s3',
                                         aws_access_key_id=self.s3Creds[0],
                                         aws_secret_access_key=self.s3Creds[1])

        (key, info) = elem

        preFrm = {}
        resp1 = self.s3Client.get_object(Bucket=self.s3Bucket, Key=info['pre'][0][0])
        yaml1 = yaml.load(resp1['Body'])
        for elem in yaml1['body']:
            preFrm[elem['frame_tag']['frame_no']] = elem

        postFrm = {}
        resp2 = self.s3Client.get_object(Bucket=self.s3Bucket, Key=info['post'][0][0])
        yaml2 = yaml.load(resp2['Body'])
        for elem in yaml2['body']:
            postFrm[elem['frame_tag']['frame_no']] = elem

        commonFrmNums = set(list(preFrm.keys())).intersection(list(postFrm.keys()))

        for f in commonFrmNums:
            frames = Frames(
                self.s3Bucket,
                info['pre'][0][0],             # Pre S3Key
                info['post'][0][0],            # Post S3Key
                yaml1['head']['operator_id'],  # Pre OperatorId
                yaml2['head']['operator_id'],  # Post OperatorId
                preFrm[f],                     # Pre Frame Line
                postFrm[f],                    # Post Frame Line
                info['pre'][0][1],             # Pre Last Modified Time
                info['post'][0][1])            # Post Last Modified Time
            yield (frames)

        tagCounts = TagCounts(
            self.s3Bucket,
            yaml1,                 # Pre Yaml
            yaml2,                 # Post Yaml
            info['pre'][0][0],     # Pre S3Key
            info['post'][0][0],    # Post S3Key
            info['pre'][0][1],     # Pre Last Modified Time
            info['post'][0][1])    # Post Last Modified Time
        yield beam.pvalue.TaggedOutput('counts', tagCounts)
=========================================

My pipeline options are below. I tried with and without "setup_file", but it made no difference.

=========================================
options = PipelineOptions([
    "--runner=PortableRunner",
    "--environment_config={0}".format(self.__docker_registry),
    "--environment_type=DOCKER",
    "--experiments=beam_fn_api",
    "--job_endpoint=localhost:8099"
])
options.view_as(SetupOptions).save_main_session = True
options.view_as(SetupOptions).setup_file = '/home/admin/quality-validation/bin/setup.py'
=========================================

Is it possible to resolve a dependency on an external module inside a ParDo when running on Apache Flink?

Thanks,
Yu Watanabe

--
Yu Watanabe
Weekend freelancer who loves the challenge of building data platforms
yu.w.ten...@gmail.com
LinkedIn: https://www.linkedin.com/in/yuwatanabe1
Twitter: https://twitter.com/yuwtennis
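P.S. One workaround I am considering (untested) is to build modules/ into a source distribution and stage it explicitly with --extra_package, roughly as below. The dist/ path and tarball name are assumptions on my side, not something I have run yet:

=========================================
# Untested sketch: stage the local package to the workers via --extra_package.
# The tarball path/version below are assumptions.
options = PipelineOptions([
    "--runner=PortableRunner",
    "--environment_config={0}".format(self.__docker_registry),
    "--environment_type=DOCKER",
    "--experiments=beam_fn_api",
    "--job_endpoint=localhost:8099",
    "--extra_package=/home/admin/quality-validation/bin/dist/modules-0.0.1.tar.gz"
])
=========================================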