How to submit a Beam Python pipeline to a GKE Flink cluster

2023-02-03 Thread P Singh
Hi Team,

I have set up a Flink cluster on GKE and am trying to submit a Beam
pipeline with the options below. I was able to run this on a local machine,
but I don't understand what the environment_config should be. What should I
put here instead of localhost:5?

Please help.
options = PipelineOptions([
    "--runner=FlinkRunner",
    "--flink_version=1.14",
    "--flink_master=localhost:8081",
    "--environment_type=EXTERNAL",  # EXTERNAL
    "--environment_config=localhost:5",
])
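For reference, with --environment_type=EXTERNAL the environment_config value
is the host:port of a Beam SDK worker pool that the Flink task managers can
reach; 50000 is the conventional worker-pool port. Below is a minimal sketch,
not a confirmed setup: it assumes a Python SDK harness container
(apache/beam_python3.10_sdk started with --worker_pool) running as a sidecar
on each task manager pod, and the Flink REST endpoint port-forwarded to
localhost:8081.

# Sketch only: the sidecar deployment and the port numbers are assumptions
# about the cluster layout, not details taken from the question above.
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    "--runner=FlinkRunner",
    "--flink_version=1.14",
    # Reachable e.g. via: kubectl port-forward svc/flink-jobmanager 8081:8081
    "--flink_master=localhost:8081",
    "--environment_type=EXTERNAL",
    # Address at which each task manager reaches its co-located SDK harness.
    "--environment_config=localhost:50000",
])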


Re: [EXTERNAL] Re: Re: ElasticsearchIO write to trigger a percolator

2023-02-03 Thread Kaggal, Vinod C. (Vinod C.), M.S. via user
I appreciate your help! I do see that 2.41.0 returns a PCollectionTuple; this
is helpful.

Thank you!
Vinod

From: Evan Galpin 
Date: Thursday, February 2, 2023 at 6:21 PM
To: user@beam.apache.org , Kaggal, Vinod C. (Vinod C), 
M.S. 
Cc: Chamikara Jayalath 
Subject: [EXTERNAL] Re: Re: ElasticsearchIO write to trigger a percolator
I believe that 2.41.0 is the "oldest" safe version[1] to use as there were 
initially some bugs introduced when migrating from PDone to outputting the 
write results.

[1] 
https://github.com/apache/beam/commit/2cb2ee2ba3b5efb0f08880a9325f092485b3ccf2

On Thu, Feb 2, 2023 at 3:16 PM Kaggal, Vinod C. (Vinod C.), M.S. via user
<user@beam.apache.org> wrote:
Thank you for the response!

We are currently using:
2.37.0
2.33.0

Is there a compatible ElasticsearchIO version that may work with our Beam
version?

Thank you!
Vinod

From: Chamikara Jayalath <chamik...@google.com>
Date: Thursday, February 2, 2023 at 6:01 PM
To: user@beam.apache.org <user@beam.apache.org>, Kaggal, Vinod C. (Vinod
C), M.S. <kaggal.vi...@mayo.edu>
Subject: [EXTERNAL] Re: ElasticsearchIO write to trigger a percolator


On Thu, Feb 2, 2023 at 1:56 PM Kaggal, Vinod C. (Vinod C.), M.S. via user
<user@beam.apache.org> wrote:
Hello! Thank you for all the hard work on implementing these useful libraries.

Background: We have been using Apache Storm in production for some time (over 8
years) and have recently switched over to Beam. One of the topologies we had
in Storm was to ingest data, index it to Elastic (write), and right after the
write trigger a percolator query so that a downstream task would be triggered.
The DAGs, defined using Flux, allowed us to chain these steps ([read from
source] -> [process] -> [index] -> [percolator] -> [trigger]).

Beam:
We are able to accomplish much of that using Beam. However, using
ElasticsearchIO, the index step results in a PDone, and hence we cannot chain
the remaining tasks ([percolator] -> [trigger other events]).

The question I have is this: is there a way to trigger a percolator query after 
writing a document to elastic using ElasticsearchIO.Write()?

Which version of Beam are you using?
It seems that for recent versions of Beam we actually return PCollections from
the ElasticsearchIO sink, so this should not be an issue.

https://github.com/apache/beam/blob/24f40aab7aa4c4f9eea6dc90c0baa22bb17a962e/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L1842

Thanks,
Cham


Your thoughts would be appreciated!
Vinod



Re: How to submit a Beam Python pipeline to a GKE Flink cluster

2023-02-03 Thread Talat Uyarer via user
Hi,

Do you use the Flink operator or a manually deployed session cluster?

Thanks

On Fri, Feb 3, 2023, 4:32 AM P Singh  wrote:

> Hi Team,
>
> I have set up a Flink cluster on GKE and am trying to submit a Beam
> pipeline with the options below. I was able to run this on a local machine,
> but I don't understand what the environment_config should be. What should I
> put here instead of localhost:5?
>
> Please help.
> options = PipelineOptions([
>     "--runner=FlinkRunner",
>     "--flink_version=1.14",
>     "--flink_master=localhost:8081",
>     "--environment_type=EXTERNAL",  # EXTERNAL
>     "--environment_config=localhost:5",
> ])
>


[Question] [CassandraIO] Using a query

2023-02-03 Thread Adam Scott
Hi All,

Does anyone have an example of using CassandraIO to query a table?

The following mentions: "Alternatively, one may use CassandraIO.readAll()
.withCoder(SerializableCoder.of(Person.class)) to query a subset of the
Cassandra database by creating a PCollection of CassandraIO.Read
each with their own query or RingRange."
https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/cassandra/CassandraIO.html

Hoping there would be an example of this.

TIA,
Adam


Re: How to submit a Beam Python pipeline to a GKE Flink cluster

2023-02-03 Thread Robert Bradshaw via user
You should be able to omit the environment_type and environment_config
variables and they will be populated automatically. For running
locally, the flink_master parameter is not needed either (one will be
started up automatically).
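Concretely, a minimal sketch of the advice above, assuming the cluster's REST
endpoint has been made reachable at localhost:8081 (e.g. via port-forwarding):

# Only the runner, Flink version, and cluster address are specified; Beam
# populates environment_type/environment_config automatically. Omitting
# flink_master as well makes Beam start a local cluster instead.
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    "--runner=FlinkRunner",
    "--flink_version=1.14",
    "--flink_master=localhost:8081",
])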

On Fri, Feb 3, 2023 at 12:51 PM Talat Uyarer via user
 wrote:
>
> Hi,
>
> Do you use Flink operator or manually deployed session cluster ?
>
> Thanks
>
> On Fri, Feb 3, 2023, 4:32 AM P Singh  wrote:
>>
>> Hi Team,
>>
>> I have set up a Flink cluster on GKE and am trying to submit a Beam
>> pipeline with the options below. I was able to run this on a local machine,
>> but I don't understand what the environment_config should be. What should I
>> put here instead of localhost:5?
>>
>> Please help.
>> options = PipelineOptions([
>>     "--runner=FlinkRunner",
>>     "--flink_version=1.14",
>>     "--flink_master=localhost:8081",
>>     "--environment_type=EXTERNAL",  # EXTERNAL
>>     "--environment_config=localhost:5",
>> ])


Re: [Question] [CassandraIO] Using a query

2023-02-03 Thread Vincent Marquez
There are some examples in the test code that should be easy enough to
follow.

Here is an example of just querying the entire table:

https://github.com/apache/beam/blob/master/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java#L460

Here's an example of using readAll to only pull certain keys:

https://github.com/apache/beam/blob/master/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java#L381

*~Vincent*


On Fri, Feb 3, 2023 at 12:59 PM Adam Scott  wrote:

> Hi All,
>
> Does anyone have an example of using CassandraIO to query a table?
>
> The following mentions: "Alternatively, one may use CassandraIO.readAll()
> .withCoder(SerializableCoder.of(Person.class)) to query a subset of the
> Cassandra database by creating a PCollection of CassandraIO.Read
> each with their own query or RingRange."
>
> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/cassandra/CassandraIO.html
>
> Hoping there would be an example of this.
>
> TIA,
> Adam


Re: Beam SQL Alias issue while using With Clause

2023-02-03 Thread Talat Uyarer via user
Hi Andrew,

Thank you for your MR; I appreciate your help in solving the issue. I
reran our tests and they are partially passing now with your fix. However,
there is one more issue with the WITH clause.

When I run the following query, Beam somehow loses the type of a column:

WITH tempTable AS (SELECT * FROM PCOLLECTION WHERE
PCOLLECTION.`user_info`.`name` = 'User1') SELECT * FROM tempTable

I haven't tested on Beam master; I ran with your latest patch on our code
base. This is the output:

14:00:30.095 [Test worker] INFO
 o.a.b.sdk.extensions.sql.impl.CalciteQueryPlanner - SQL:
WITH `tempTable` AS (SELECT `PCOLLECTION`.`id`, `PCOLLECTION`.`value`,
`PCOLLECTION`.`user_info`
FROM `beam`.`PCOLLECTION` AS `PCOLLECTION`
WHERE `PCOLLECTION`.`user_info`.`name` = 'User1') (SELECT `tempTable`.`id`,
`tempTable`.`value`, `tempTable`.`user_info`
FROM `tempTable` AS `tempTable`)
14:00:30.106 [Test worker] DEBUG
o.a.b.v.calcite.v1_28_0.org.apache.calcite.sql2rel - Plan after converting
SqlNode to RelNode
LogicalProject(id=[$0], value=[$1], user_info=[$2])
  LogicalFilter(condition=[=($2.name, 'User1')])
    BeamIOSourceRel(table=[[beam, PCOLLECTION]])

14:00:30.107 [Test worker] DEBUG
o.a.b.v.calcite.v1_28_0.org.apache.calcite.sql2rel - Plan after converting
SqlNode to RelNode
LogicalProject(id=[$0], value=[$1], user_info=[$2])
  LogicalFilter(condition=[=($2.name, 'User1')])
    BeamIOSourceRel(table=[[beam, PCOLLECTION]])

14:00:30.109 [Test worker] INFO
 o.a.b.sdk.extensions.sql.impl.CalciteQueryPlanner - SQLPlan>
LogicalProject(id=[$0], value=[$1], user_info=[ROW($2)])
  LogicalFilter(condition=[=($2.name, 'User1')])
    LogicalProject(id=[$0], value=[$1], name=[$2.name])
      BeamIOSourceRel(table=[[beam, PCOLLECTION]])

14:00:30.173 [Test worker] DEBUG
o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - PLANNER =
org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.plan.volcano.IterativeRuleDriver@1c510081;
COST = {inf}
14:00:30.173 [Test worker] DEBUG
o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - Pop match: rule
[BeamEnumerableConverterRule(in:BEAM_LOGICAL,out:ENUMERABLE)] rels [#27]
14:00:30.173 [Test worker] DEBUG
o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - call#118: Apply rule
[BeamEnumerableConverterRule(in:BEAM_LOGICAL,out:ENUMERABLE)] to
[rel#27:BeamIOSourceRel.BEAM_LOGICAL(table=[beam, PCOLLECTION])]
14:00:30.174 [Test worker] DEBUG
o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - Transform to: rel#41
via BeamEnumerableConverterRule(in:BEAM_LOGICAL,out:ENUMERABLE)
14:00:30.175 [Test worker] DEBUG
o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - call#118 generated 1
successors:
[rel#41:BeamEnumerableConverter.ENUMERABLE(input=BeamIOSourceRel#27)]
14:00:30.175 [Test worker] DEBUG
o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - PLANNER =
org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.plan.volcano.IterativeRuleDriver@1c510081;
COST = {inf}
14:00:30.175 [Test worker] DEBUG
o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - Pop match: rule
[ProjectToCalcRule] rels [#33]
14:00:30.175 [Test worker] DEBUG
o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - call#136: Apply rule
[ProjectToCalcRule] to
[rel#33:LogicalProject.NONE(input=RelSubset#32,inputs=0..1,exprs=[$2.name])]
14:00:30.177 [Test worker] DEBUG
o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - Transform to: rel#44
via ProjectToCalcRule
14:00:30.178 [Test worker] DEBUG
o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - call#136 generated 1
successors:
[rel#44:LogicalCalc.NONE(input=RelSubset#32,expr#0..2={inputs},expr#3=$t2.name,proj#0..1={exprs},2=$t3)]
14:00:30.178 [Test worker] DEBUG
o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - PLANNER =
org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.plan.volcano.IterativeRuleDriver@1c510081;
COST = {inf}
14:00:30.178 [Test worker] DEBUG
o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - Pop match: rule
[FilterToCalcRule] rels [#35]
14:00:30.178 [Test worker] DEBUG
o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - call#160: Apply rule
[FilterToCalcRule] to
[rel#35:LogicalFilter.NONE(input=RelSubset#34,condition==($2.name,
'User1'))]

fieldList must not be null, type = VARCHAR
java.lang.AssertionError: fieldList must not be null, type = VARCHAR
at
org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rel.type.RelDataTypeImpl.getFieldList(RelDataTypeImpl.java:164)
at
org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexFieldAccess.checkValid(RexFieldAccess.java:76)
at
org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexFieldAccess.<init>(RexFieldAccess.java:64)
at
org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexShuttle.visitFieldAccess(RexShuttle.java:208)
at
org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitFieldAccess(RexProgramBuilder.java:911)
at
org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitFieldAccess(RexProgramBuil

Re: How to submit a Beam Python pipeline to a GKE Flink cluster

2023-02-03 Thread P Singh
Hi,

I have done this manually, though it seems there are Beam version issues; I am
using 2.43. Is it compatible with Flink 1.14 and Python 3.10? Most of the
threads indicate that Beam doesn't really support the latest versions.
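As a quick preflight, a trivial sketch for confirming which interpreter and
SDK versions are actually in play; the 2.43/3.10/1.14 pairing mentioned above
is an assumption to verify against the Beam release notes, not something this
snippet can establish:

# Prints the local Python and Beam SDK versions before submitting.
import sys
import apache_beam as beam

print(f"Python: {sys.version_info.major}.{sys.version_info.minor}")
print(f"Beam:   {beam.__version__}")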

On Sat, 4 Feb 2023 at 02:51, Robert Bradshaw via user 
wrote:

> You should be able to omit the environment_type and environment_config
> variables and they will be populated automatically. For running
> locally, the flink_master parameter is not needed either (one will be
> started up automatically).
>
> On Fri, Feb 3, 2023 at 12:51 PM Talat Uyarer via user
>  wrote:
> >
> > Hi,
> >
> > Do you use Flink operator or manually deployed session cluster ?
> >
> > Thanks
> >
> > On Fri, Feb 3, 2023, 4:32 AM P Singh 
> > wrote:
> >>
> >> Hi Team,
> >>
> >> I have set up a Flink cluster on GKE and am trying to submit a Beam
> >> pipeline with the options below. I was able to run this on a local machine,
> >> but I don't understand what the environment_config should be. What should I
> >> put here instead of localhost:5?
> >>
> >> Please help.
> >> options = PipelineOptions([
> >>     "--runner=FlinkRunner",
> >>     "--flink_version=1.14",
> >>     "--flink_master=localhost:8081",
> >>     "--environment_type=EXTERNAL",  # EXTERNAL
> >>     "--environment_config=localhost:5",
> >> ])
>