Apache Beam pipelines have two parts to them. There is the code that describes the pipeline shape and which transforms it contains (block 1 and the results.apply(...) call itself); this runs on your local machine when the pipeline is constructed and submitted. Then there is the code that implements those transforms (MapFn in your case), which is serialized and executed remotely on the workers. That is why block 1's JDBC query and log lines show up on your Mac while MapFn's output shows up in the Dataflow worker logs.
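If you want the Spanner query itself to run on the workers rather than at pipeline-construction time, the read has to live inside a transform. Here is a rough, untested sketch of that idea, assuming the beam-sdks-java-io-jdbc module is on your classpath and that the credentials path is one the Dataflow workers can actually read (the /my/mac/local/path from your example will only exist on your Mac):

import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.values.PCollection;

// Hypothetical replacement for block 1 + Create.of(list): the JDBC query now runs
// on the workers, because it is part of a transform instead of main().
PCollection<String> results =
    pipeline.apply(
        "ReadFromSpannerJdbc",
        JdbcIO.<String>read()
            .withDataSourceConfiguration(
                JdbcIO.DataSourceConfiguration.create(
                    "com.google.cloud.spanner.jdbc.JdbcDriver",
                    "jdbc:cloudspanner:/projects/projectId/instances/instanceId/databases/databaseId"
                        // placeholder: a key file path readable on the worker VMs
                        + "?credentials=/path/on/worker/credentials.json"))
            .withQuery("SELECT name from t2")
            // Maps each result row to a String; this runs remotely per row.
            .withRowMapper(resultSet -> resultSet.getString(1))
            .withCoder(StringUtf8Coder.of()));

Anything left in main() outside of a transform, like block 1, will always execute on whatever machine submits the job.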
I would take a look at the Beam programming guide to become more familiar with the different components of a pipeline and how code is serialized and sent to workers[1].

1: https://beam.apache.org/documentation/programming-guide/

On Mon, Jun 29, 2020 at 11:31 PM Sheng Yang <liff...@163.com> wrote:
> Thanks Austin and Luke for replying to my message.
>
> I did some experiments; these are my code snippets.
>
> Maven:
>
> <beam.version>2.21.0</beam.version>
>
> <dependency>
>   <groupId>com.google.cloud</groupId>
>   <artifactId>google-cloud-spanner-jdbc</artifactId>
>   <version>1.15.0</version>
> </dependency>
>
> <dependency>
>   <groupId>com.google.cloud</groupId>
>   <artifactId>google-cloud-spanner</artifactId>
>   <version>1.56.0</version>
> </dependency>
>
> Java code:
>
> public class SpannerJdbcToCsvText {
>
>   private static final Logger LOG = LoggerFactory.getLogger(SpannerJdbcToCsvText.class);
>
>   public interface SpannerToTextOptions
>       extends PipelineOptions,
>           SpannerReadOptions,
>           JavascriptTextTransformerOptions,
>           FilesystemWriteOptions {}
>
>   public static void main(String[] args) {
>     LOG.info("Starting pipeline setup");
>     PipelineOptionsFactory.register(SpannerToTextOptions.class);
>     SpannerToTextOptions options =
>         PipelineOptionsFactory.fromArgs(args).withValidation().as(SpannerToTextOptions.class);
>
>     FileSystems.setDefaultPipelineOptions(options);
>     Pipeline pipeline = Pipeline.create(options);
>
>     // ----- block 1 start ---------------------
>     // block 1 will print its logs on my local Mac desktop.
>     List<String> list = new ArrayList<>();
>     try {
>       String projectId = "projectId";
>       String instanceId = "instanceId";
>       String databaseId = "databaseId";
>       String credentialsFile = "/my/mac/local/path/credentials.json";
>
>       try (Connection connection =
>           DriverManager.getConnection(
>               String.format(
>                   "jdbc:cloudspanner:/projects/%s/instances/%s/databases/%s?credentials=%s",
>                   projectId, instanceId, databaseId, credentialsFile))) {
>         try (Statement statement = connection.createStatement()) {
>           try (ResultSet rs = statement.executeQuery("SELECT name from t2")) {
>             while (rs.next()) {
>               list.add(rs.getString(1));
>               LOG.info("print outside get value: " + rs.getString(1));
>             }
>           }
>         }
>       }
>     } catch (Exception e) {
>       LOG.error("", e);
>     }
>
>     PCollection<String> results =
>         pipeline
>             .apply(Create.of(list).withType(TypeDescriptor.of(String.class)))
>             .setCoder(StringUtf8Coder.of());
>     // ----- block 1 end ---------------------
>
>     // ----- block 2 start ---------------------
>     // block 2 will print its logs in the server logs.
>     results.apply("print value", ParDo.of(new MapFn()));
>     // ----- block 2 end ---------------------
>
>     pipeline.run();
>     LOG.info("Completed pipeline setup");
>   }
> }
>
> I ran the program like this:
>
> mvn compile exec:java \
>   -Dexec.mainClass=com.stubhub.de.dataflow.batch.SpannerJdbcToCsvText \
>   -Dexec.args="--runner=DataflowRunner \
>   --region=us-central1"
>
> These logs printed out in my local console:
>
> org.apache.beam.runners.dataflow.DataflowRunner - PipelineOptions.filesToStage was not specified.
> Defaulting to files from the classpath: will stage 351 files. Enable logging at DEBUG level to see
> which files will be staged.
> com.stubhub.de.dataflow.batch.SpannerJdbcToCsvText - print outside get value: myname
> com.stubhub.de.dataflow.batch.SpannerJdbcToCsvText - print outside get value: 2
> com.stubhub.de.dataflow.batch.SpannerJdbcToCsvText - print outside get value: 3
> com.stubhub.de.dataflow.batch.SpannerJdbcToCsvText - print outside get value: 4
> org.apache.beam.runners.dataflow.DataflowRunner - Executing pipeline on the Dataflow Service,
> which will have billing implications related to Google Compute Engine usage and other Google
> Cloud Services.
> org.apache.beam.runners.dataflow.util.PackageUtil - Uploading 351 files from
> PipelineOptions.filesToStage to staging location to prepare for execution.
> org.apache.beam.runners.dataflow.util.PackageUtil - Uploading
> /Users/shengyang/ws/Stubhub-DataPlatform.dataworks-ingestion/target/classes to
> gs://dataflow-staging-us-central1-661544897337/temp/staging/classes-KrjSD-Y0s4i28kG-XmiBiw.jar
>
> These logs printed on the GCP servers:
>
> 2020-06-30 09:44:57.483 HKT  Finished processing stage F0 with 0 errors in 0.28 seconds
> 2020-06-30 09:44:59.600 HKT  Starting MapTask stage s01
> 2020-06-30 09:45:00.916 HKT  in mapfn - get value:myname
> 2020-06-30 09:45:00.934 HKT  Finished processing stage s01 with 0 errors in 1.333 seconds
> 2020-06-30 09:45:03.025 HKT  Starting MapTask stage s01
> 2020-06-30 09:45:03.046 HKT  in mapfn - get value:4
> 2020-06-30 09:45:03.047 HKT  Finished processing stage s01 with 0 errors in 0.022 seconds
> 2020-06-30 09:45:05.148 HKT  Starting MapTask stage s01
> 2020-06-30 09:45:05.166 HKT  in mapfn - get value:2
> 2020-06-30 09:45:05.176 HKT  Finished processing stage s01 with 0 errors in 0.028 seconds
>
> Why does the Spanner JDBC call (in block 1) happen on my local machine when I compile and submit
> the job, while MapFn (in block 2) runs on the server side? I expected all of it to run on the
> server side.
>
>
> At 2020-06-30 00:17:51, "Luke Cwik" <lc...@google.com> wrote:
>
> The intent is that you grant permissions to the account that is running the Dataflow job to the
> resources you want it to access in project B before you start the pipeline. This allows for much
> finer-grained access control and the ability to revoke permissions without having to disable an
> entire account.
>
> I would take a look at the general IAM and security documentation within GCP[1] or open up a
> support case with GCP requesting guidance.
>
> 1: https://cloud.google.com/iam
>
> On Sun, Jun 28, 2020 at 8:56 AM Austin Bennett <whatwouldausti...@gmail.com> wrote:
>
>> I haven't tried yet, but it looks like the connection string asks for the project to be
>> specified. Based on that (and cross-project working for other circumstances), I would imagine
>> it will work, but...? Give it a try!
>>
>> One tricky place might be ensuring proper permissions, in both projects (and without being too
>> open).
>>
>> On Sat, Jun 27, 2020, 5:46 AM Sheng Yang <liff...@163.com> wrote:
>>
>>> Hi,
>>>
>>> I am working on Beam using the Dataflow engine. Recently I am working on reading Spanner data
>>> from a different project. Say I run my Beam Dataflow job in GCP project A, but the Spanner
>>> instance is in GCP project B. I searched all the documents, but can't find any documentation
>>> about SpannerIO reading data with custom credential key files. Right now I am considering
>>> JdbcIO because it accepts custom credentials as parameters and Spanner also has a JDBC API[1].
>>> Do I have something wrong in my description? Or am I considering the correct approach?
>>>
>>> String url = "jdbc:cloudspanner:/projects/my_project_id/"
>>>     + "instances/my_instance_id/"
>>>     + "databases/my_database_name"
>>>     + "?credentials=/home/cloudspanner-keys/my-key.json"
>>>     + ";autocommit=false";
>>> try (Connection connection = DriverManager.getConnection(url)) {
>>>   try (ResultSet rs = connection.createStatement()
>>>       .executeQuery("SELECT SingerId, AlbumId, MarketingBudget FROM Albums")) {
>>>     while (rs.next()) {
>>>       Long singerId = rs.getLong(1);
>>>     }
>>>   }
>>> }
>>>
>>> [1]: https://github.com/googleapis/java-spanner-jdbc
>>>
>>> Thanks,
>>> Sheng
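One more note on the original cross-project question: if you can grant the service account that runs the Dataflow job in project A read access to the Spanner database in project B (per the IAM guidance quoted above), you don't need a key file at all and can use the native SpannerIO connector instead of JDBC. A rough, untested sketch, assuming the beam-sdks-java-io-google-cloud-platform module is on the classpath; the project/instance/database names are placeholders:

import com.google.cloud.spanner.Struct;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.values.PCollection;

// SpannerIO authenticates as the account the job runs as, so IAM grants in
// project B replace the credentials file. All names below are placeholders.
PCollection<Struct> rows =
    pipeline.apply(
        "ReadFromSpanner",
        SpannerIO.read()
            .withProjectId("project-b")      // the project that owns the Spanner instance
            .withInstanceId("instanceId")
            .withDatabaseId("databaseId")
            .withQuery("SELECT name FROM t2"));

Either way, the read then lives inside a transform, so it executes on the workers rather than on the machine that submits the job.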