Sorry I couldn't be more helpful at this moment. Created a JIRA for this issue: https://jira.apache.org/jira/browse/BEAM-7489
-Rui

On Mon, Jun 3, 2019 at 8:14 PM dekl...@gmail.com <dekl...@gmail.com> wrote:

> Yes, it works but I think it has the same problem. It's a lot slower so it
> took me hours of running it, but by the end the memory usage was high and
> the CPU about 100% so it seems to be the same problem.
>
> Worth noting perhaps that when I use the DirectRunner I have to turn
> enforceImmutability off because of
> https://issues.apache.org/jira/browse/BEAM-1714
>
> On 2019/06/03 17:48:30, Rui Wang <ruw...@google.com> wrote:
> > Ha sorry I was only reading screenshots but ignored your other comments.
> > So count fn indeed worked.
> >
> > Can I ask if your sql pipeline works on direct runner?
> >
> > -Rui
> >
> > On Mon, Jun 3, 2019 at 10:39 AM Rui Wang <ruw...@google.com> wrote:
> >
> > > BeamSQL actually only converts SELECT COUNT(*) query to the Java
> > > pipeline that calls Java's builtin Count[1] transform.
> > >
> > > Could you implement your pipeline by Count transform to see whether
> > > this memory issue still exists? By doing so we could narrow down the
> > > problem a bit. If using Java directly without going through SQL code
> > > path and it works, we will know that BeamSQL does not generate a
> > > working pipeline for your SELECT COUNT(*) query.
> > >
> > > [1]:
> > > https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java#L54
> > >
> > > On Sat, Jun 1, 2019 at 9:30 AM dekl...@gmail.com <dekl...@gmail.com>
> > > wrote:
> > >
> > >> Is using Beam SQL just massively more resource-intensive or is there a
> > >> bug somewhere here (in my code or elsewhere)? (I'm using Flink runner)
> > >>
> > >> Here is my code:
> > >> https://pastebin.com/nNtc9ZaG
> > >>
> > >> Here is the error I get (truncated at the end because it's so long and
> > >> seemingly repetitive) when I run the SQL transform and my memory/CPU
> > >> usage skyrockets:
> > >> https://pastebin.com/mywmkCQi
> > >>
> > >> For example,
> > >>
> > >> After several early firing triggers, 13-15% CPU, 1.5GB-2GB RAM,
> > >> everything working fine:
> > >>
> > >> `rowStream.apply(Combine.globally(Count.<Row>combineFn()).withoutDefaults()).apply(myPrint());`
> > >>
> > >> After a single early firing trigger, CPU usage shoots to 90%+, 4.7GB+
> > >> RAM, soon crashes:
> > >>
> > >> `rowStream.apply(SqlTransform.query("SELECT COUNT(*) FROM PCOLLECTION")).apply(myPrint());`
> > >>
> > >> I can't imagine this is expected behavior but maybe I'm just ignorant
> > >> of how SQL is implemented.
> > >>
> > >> Most of this code is just getting Schema Registry Avro Kafka messages
> > >> into a Row stream. There have been discussions on the mailing list
> > >> recently about how to do that. This is the best I could do. If that
> > >> part is incorrect I'd be glad to know.
> > >>
> > >> Any help appreciated. Thank you.
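
For anyone reading the thread later: the variant that worked, `Combine.globally(Count.<Row>combineFn())`, reduces the stream through a small accumulator that is created, updated per element, merged across partial results, and finally extracted. Below is a minimal plain-Java sketch of that lifecycle. It does not use Beam; `CountCombineSketch`, `CountFn` and `countInBundles` are hypothetical names for illustration only, and the sketch says nothing about how the SQL code path plans the same query.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CountCombineSketch {

    /**
     * Hypothetical counting combiner. It only mirrors the
     * create / add / merge / extract shape of a combine function;
     * it is not Beam's CombineFn class.
     */
    static final class CountFn {
        long[] createAccumulator() {
            return new long[] {0L};
        }

        long[] addInput(long[] acc, Object input) {
            acc[0] += 1; // the element's value is irrelevant for a count
            return acc;
        }

        long[] mergeAccumulators(List<long[]> accs) {
            long[] merged = createAccumulator();
            for (long[] acc : accs) {
                merged[0] += acc[0];
            }
            return merged;
        }

        long extractOutput(long[] acc) {
            return acc[0];
        }
    }

    /** Counts elements bundle by bundle, then merges the partial counts. */
    public static long countInBundles(List<List<String>> bundles) {
        CountFn fn = new CountFn();
        List<long[]> partials = new ArrayList<>();
        for (List<String> bundle : bundles) {
            long[] acc = fn.createAccumulator();
            for (String element : bundle) {
                acc = fn.addInput(acc, element);
            }
            partials.add(acc);
        }
        return fn.extractOutput(fn.mergeAccumulators(partials));
    }

    public static void main(String[] args) {
        List<List<String>> bundles = Arrays.asList(
            Arrays.asList("a", "b"),
            Arrays.asList("c"));
        System.out.println(countInBundles(bundles)); // prints 3
    }
}
```

The state held per partial result is a single long, which is why a count expressed this way would be expected to stay cheap no matter how many elements arrive.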