Aleksey Plekhanov created IGNITE-14588:
------------------------------------------

             Summary: Calcite integration: Wrong processing of nested aggregates
                 Key: IGNITE-14588
                 URL: https://issues.apache.org/jira/browse/IGNITE-14588
             Project: Ignite
          Issue Type: Bug
            Reporter: Aleksey Plekhanov
            Assignee: Aleksey Plekhanov


A wrong plan is created when nested aggregates are used.
For example, this query:
{{SELECT avg(salary) FROM (SELECT avg(salary) as salary FROM employer UNION ALL SELECT salary FROM employer)}}
generates the following plan:
{noformat}
IgniteReduceHashAggregate(group=[{}], AVG(SALARY)=[AVG($0)])
  IgniteExchange(distribution=[single])
    IgniteMapHashAggregate(group=[{}], AVG(SALARY)=[AVG($0)])
      IgniteUnionAll(all=[true])
        IgniteSingleHashAggregate(group=[{}], SALARY=[AVG($0)])
          IgniteIndexScan(table=[[PUBLIC, EMPLOYER]], index=[_key_PK], requiredColumns=[{3}])
        IgniteIndexScan(table=[[PUBLIC, EMPLOYER]], index=[_key_PK], requiredColumns=[{3}])
{noformat}

With this plan, the subquery aggregate ({{IgniteSingleHashAggregate}}) is executed locally on each node, over only that node's data, so the query can produce wrong results.
For example:
{code:java}
    @Test
    public void aggregateNested() throws Exception {
        String cacheName = "employer";

        IgniteCache<Integer, Employer> employer = client.getOrCreateCache(
            new CacheConfiguration<Integer, Employer>()
            .setName(cacheName)
            .setSqlSchema("PUBLIC")
            .setIndexedTypes(Integer.class, Employer.class)
            .setBackups(2)
        );

        awaitPartitionMapExchange(true, true, null);

        // Two keys whose primary partition is on node 0, one key whose primary partition is on node 1.
        List<Integer> keysNode0 = primaryKeys(grid(0).cache(cacheName), 2);
        List<Integer> keysNode1 = primaryKeys(grid(1).cache(cacheName), 1);

        employer.putAll(ImmutableMap.of(
            keysNode0.get(0), new Employer("Igor", 1d),
            keysNode0.get(1), new Employer("Roman", 2d),
            keysNode1.get(0), new Employer("Nikolay", 3d)
        ));

        QueryEngine engine = Commons.lookupComponent(grid(1).context(), QueryEngine.class);

        List<FieldsQueryCursor<List<?>>> qry = engine.query(null, "PUBLIC",
            "SELECT avg(salary) FROM " +
                "(SELECT avg(salary) as salary FROM employer UNION ALL " +
                "SELECT salary FROM employer)");

        assertEquals(1, qry.size());

        List<List<?>> rows = qry.get(0).getAll();
        assertEquals(1, rows.size());
        assertEquals(2d, F.first(F.first(rows)));
    }
{code}

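With this reproducer the expected result is 2: the subquery yields avg(1, 2, 3) = 2, and the outer query yields avg(2, 1, 2, 3) = 2. The actual result is 2.1, because the subquery aggregate is computed per node: avg(1, 2) = 1.5 on node 0 and avg(3) = 3 on node 1, so the outer query computes avg(1.5, 3, 1, 2, 3) = 2.1.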
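
The arithmetic above can be checked with a small standalone sketch. It does not use Ignite or Calcite: the class {{NestedAvgSketch}} and its methods are hypothetical names introduced only for illustration, and the node layout ({{1, 2}} on node 0, {{3}} on node 1) is taken from the reproducer. It models only where the inner aggregate runs, not the actual plan execution.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration class, not part of Ignite.
class NestedAvgSketch {
    /** Arithmetic mean of the given values. */
    static double avg(List<Double> vals) {
        double sum = 0;

        for (double v : vals)
            sum += v;

        return sum / vals.size();
    }

    /** All salaries from all nodes, i.e. what a plain scan of the table returns. */
    static List<Double> allRows(double[][] salariesByNode) {
        List<Double> rows = new ArrayList<>();

        for (double[] node : salariesByNode) {
            for (double s : node)
                rows.add(s);
        }

        return rows;
    }

    /** Expected semantics: the inner AVG is computed once over every row. */
    static double expected(double[][] salariesByNode) {
        List<Double> rows = allRows(salariesByNode);
        List<Double> union = new ArrayList<>();

        union.add(avg(rows)); // SELECT avg(salary) AS salary FROM employer
        union.addAll(rows);   // UNION ALL SELECT salary FROM employer

        return avg(union);
    }

    /** Behaviour described in the issue: each node emits its own inner AVG into the union. */
    static double innerAggregatedPerNode(double[][] salariesByNode) {
        List<Double> union = new ArrayList<>();

        for (double[] node : salariesByNode) {
            List<Double> local = new ArrayList<>();

            for (double s : node)
                local.add(s);

            union.add(avg(local)); // One inner AVG row per node instead of one in total.
        }

        union.addAll(allRows(salariesByNode));

        return avg(union);
    }

    public static void main(String[] args) {
        double[][] salariesByNode = {{1d, 2d}, {3d}};

        System.out.println("expected: " + expected(salariesByNode));
        System.out.println("inner aggregate per node: " + innerAggregatedPerNode(salariesByNode));
    }
}
```

With the reproducer's data, the first method corresponds to the expected value 2 and the second to the observed value 2.1. The extra row in the union (one inner average per node rather than a single one) is what shifts the outer average.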

Root cause: the default {{passThroughDistribution}} is not suitable for "reduce aggregate" and "single aggregate" nodes.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
