Sorry I couldn't be more helpful at this moment. Created a JIRA for this
issue: https://jira.apache.org/jira/browse/BEAM-7489

-Rui

On Mon, Jun 3, 2019 at 8:14 PM dekl...@gmail.com <dekl...@gmail.com> wrote:

> Yes, it works but I think it has the same problem. It's a lot slower so it
> took me hours of running it, but by the end the memory usage was high and
> the CPU about 100% so it seems to be the same problem.
>
> Worth noting perhaps that when I use the DirectRunner I have to turn
> enforceImmutability off because of
> https://issues.apache.org/jira/browse/BEAM-1714
>
> On 2019/06/03 17:48:30, Rui Wang <ruw...@google.com> wrote:
> > Ha sorry I was only reading screenshots but ignored your other comments.
> So
> > count fn indeed worked.
> >
> > Can I ask if your sql pipeline works on direct runner?
> >
> >
> > -Rui
> >
> > On Mon, Jun 3, 2019 at 10:39 AM Rui Wang <ruw...@google.com> wrote:
> >
> > > BeamSQL actually only converts SELECT COUNT(*) query to the Java
> pipeline
> > > that calls Java's builtin Count[1] transform.
> > >
> > > Could you implement your pipeline by Count transform to see whether
> this
> > > memory issue still exists? By doing so we could narrow down problem a
> bit.
> > > If using Java directly without going through SQL code path and it
> works, we
> > > will know that BeamSQL does not generate a working pipeline for yoru
> SELECT
> > > COUNT(*) query.
> > >
> > >
> > > [1]:
> > >
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java#L54
> > >
> > > On Sat, Jun 1, 2019 at 9:30 AM dekl...@gmail.com <dekl...@gmail.com>
> > > wrote:
> > >
> > >> Is using Beam SQL just massively more resource-intensive or is there a
> > >> bug somewhere here (in my code or elsewhere)? (I'm using Flink runner)
> > >>
> > >> Here is my code:
> > >> https://pastebin.com/nNtc9ZaG
> > >>
> > >> Here is the error I get (truncated at the end because it's so long and
> > >> seemingly repetitive) when I run the SQL transform and my memory/CPU
> usage
> > >> skyrockets:
> > >> https://pastebin.com/mywmkCQi
> > >>
> > >> For example,
> > >>
> > >> After several early firing triggers, 13-15% CPU, 1.5GB-2GB RAM,
> > >> everything working fine:
> > >>
> > >>
> `rowStream.apply(Combine.globally(Count.<Row>combineFn()).withoutDefaults()).apply(myPrint());`
> > >>
> > >> After a single early firing trigger, CPU usage shoots to 90%+, 4.7GB+
> > >> RAM, soon crashes:
> > >> `rowStream.apply(SqlTransform.query("SELECT COUNT(*) FROM
> > >> PCOLLECTION")).apply(myPrint());`
> > >>
> > >> I can't imagine this is expected behavior but maybe I'm just ignorant
> of
> > >> how SQL is implemented.
> > >>
> > >> Most of this code is just getting Schema Registry Avro Kafka messages
> > >> into a Row stream. There have been discussions on the mailing list
> recently
> > >> about how to do that. This is the best I could do. If that part is
> > >> incorrect I'd be glad to know.
> > >>
> > >> Any help appreciated. Thank you.
> > >>
> > >>
> >
>

Reply via email to