Hello.

I would like to ask for help with resolving a dependency issue for an
imported module.

I have the directory structure below and am trying to import the Frames
class from frames.py into main.py.
=========================================
quality-validation/
    bin/
        setup.py
        main.py
        modules/
            frames.py
            <TRIMMED>
=========================================
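
For context, frames.py just defines the Frames class that I construct inside
the DoFn further down. A trimmed-down skeleton of what I mean (illustrative
only; the real class carries more logic) is:
=========================================
[modules/frames.py]
# Simplified, illustrative skeleton -- the real class holds more logic.
class Frames(object):
    def __init__(self, s3Bucket,
                 preS3Key, postS3Key,              # Pre / Post S3 keys
                 preOperatorId, postOperatorId,    # Pre / Post operator ids
                 preFrame, postFrame,              # Pre / Post frame lines
                 preLastModified, postLastModified):
        self.s3Bucket = s3Bucket
        self.preS3Key = preS3Key
        self.postS3Key = postS3Key
        self.preOperatorId = preOperatorId
        self.postOperatorId = postOperatorId
        self.preFrame = preFrame
        self.postFrame = postFrame
        self.preLastModified = preLastModified
        self.postLastModified = postLastModified
=========================================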

However, when I run the pipeline, I get the error below on the TaskManager.
=========================================
<TRIMMED>
  File "apache_beam/runners/common.py", line 942, in
apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 143, in
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 593, in
apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 594, in
apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 799, in
apache_beam.runners.common.DoFnRunner.receive
  File "apache_beam/runners/common.py", line 805, in
apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 872, in
apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "/usr/local/lib/python3.5/site-packages/future/utils/__init__.py",
line 419, in raise_with_traceback
    raise exc.with_traceback(traceback)
  File "apache_beam/runners/common.py", line 803, in
apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 465, in
apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 918, in
apache_beam.runners.common._OutputProcessor.process_outputs
  File "/home/admin/quality-validation/bin/main.py", line 44, in process
ImportError: No module named 'frames' [while running
'combined:flat/ParDo(FlattenTagFilesFn)']
=========================================

I import the modules both in the global context and at the top of the
process function.
=========================================
[main.py]
#
# Standard library imports
#
import logging
import pprint
import sys
sys.path.append("{0}/modules".format(sys.path[0]))
sys.path.append("{0}/modules/vendor".format(sys.path[0]))

#
# Related third party imports
#
import apache_beam as beam

#
# Local application/library specific imports
#
import utils
from pipeline_wrapper import pipelineWrapper
from tag_counts import TagCounts
from tags import Tags

<TRIMMED>

class FlattenTagFilesFn(beam.DoFn):
    def __init__(self, s3Bucket, s3Creds, maxKeys=1000):
        beam.DoFn.__init__(self)

        self.s3Bucket = s3Bucket
        self.s3Creds  = s3Creds
        self.maxKeys  = maxKeys

    def process(self, elem):
        import yaml
        from frames import Frames

        if not hasattr(self, 's3Client'):
            import boto3
            self.s3Client = boto3.client('s3',
                                aws_access_key_id=self.s3Creds[0],
                                aws_secret_access_key=self.s3Creds[1])

        (key, info) = elem

        preFrm = {}
        resp1 = self.s3Client.get_object(Bucket=self.s3Bucket, Key=info['pre'][0][0])
        yaml1 = yaml.load(resp1['Body'])

        for elem in yaml1['body']:
            preFrm[ elem['frame_tag']['frame_no'] ] = elem

        postFrm = {}
        resp2 = self.s3Client.get_object(Bucket=self.s3Bucket, Key=info['post'][0][0])
        yaml2 = yaml.load(resp2['Body'])

        for elem in yaml2['body']:
            postFrm[ elem['frame_tag']['frame_no'] ] = elem

        commonFrmNums = set(list(preFrm.keys())).intersection(list(postFrm.keys()))

        for f in commonFrmNums:
            frames = Frames(
                          self.s3Bucket,
                          info['pre'][0][0],            # Pre S3Key
                          info['post'][0][0],           # Post S3Key
                          yaml1['head']['operator_id'], # Pre OperatorId
                          yaml2['head']['operator_id'], # Post OperatorId
                          preFrm[f],                    # Pre Frame Line
                          postFrm[f],                   # Post Frame Line
                          info['pre'][0][1],            # Pre Last Modified Time
                          info['post'][0][1])           # Post Last Modified Time

            yield (frames)

        tagCounts = TagCounts(
                         self.s3Bucket,
                         yaml1,               # Pre Yaml
                         yaml2,               # Post Yaml
                         info['pre'][0][0],   # Pre S3Key
                         info['post'][0][0],  # Post S3Key
                         info['pre'][0][1],   # Pre Last Modified Time
                         info['post'][0][1] ) # Post Last Modified Time

        yield beam.pvalue.TaggedOutput('counts', tagCounts)
=========================================
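
One alternative I am considering, in case the sys.path manipulation is the
problem, is to make modules a proper package (add modules/__init__.py) and
import by package path instead. This is just an idea, not what I run today:
=========================================
# Hypothetical variant: with modules/__init__.py in place, drop the
# sys.path.append calls and import via the package path instead.
from modules.frames import Frames
from modules.tag_counts import TagCounts
from modules.tags import Tags
=========================================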

My pipeline options are below. I tried with and without "setup_file", but it
made no difference.
=========================================
        options = PipelineOptions([
                      "--runner=PortableRunner",

"--environment_config={0}".format(self.__docker_registry),
                      "--environment_type=DOCKER",
                      "--experiments=beam_fn_api",
                      "--job_endpoint=localhost:8099"
                  ])
        options.view_as(SetupOptions).save_main_session = True
        options.view_as(SetupOptions).setup_file = '/home/admin/quality-validation/bin/setup.py'
=========================================
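
For reference, what I have in mind for the setup.py referenced by setup_file
is essentially a minimal setuptools file along these lines (the name and
version here are illustrative, not my exact file):
=========================================
[bin/setup.py]
# Minimal, illustrative sketch of a setuptools file for packaging modules/.
import setuptools

setuptools.setup(
    name='quality-validation',             # illustrative name
    version='0.0.1',                       # illustrative version
    packages=setuptools.find_packages(),   # picks up modules/ if it has __init__.py
)
=========================================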

Is it possible to resolve a dependency on an external module inside a ParDo
when running on Apache Flink?

Thanks,
Yu Watanabe

-- 
Yu Watanabe
Weekend Freelancer who loves to challenge building data platform
yu.w.ten...@gmail.com
LinkedIn: https://www.linkedin.com/in/yuwatanabe1
Twitter: https://twitter.com/yuwtennis
