[ 
https://issues.apache.org/jira/browse/FLINK-11706?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wangpeibin updated FLINK-11706:
-------------------------------
    Description: 
The goal is to implement a KeyedStream API to sum with *the field which is 
array*.

The example code with like:
{code:java}
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<Long, Integer[]>> src = env.fromCollection(Arrays.asList(
   new Tuple2<>(1L, new Integer[]{2, 4}),
   new Tuple2<>(1L, new Integer[]{3, 6}),
   new Tuple2<>(1L, new Integer[]{4, 8}),
   new Tuple2<>(2L, new Integer[]{2, 4}),
   new Tuple2<>(2L, new Integer[]{3, 6}),
   new Tuple2<>(2L, new Integer[]{4, 8})
));

src.keyBy(0)
.sum(1}) 
.print();
env.execute();{code}
the expected output should be 
{code:java}
(1,[2, 4]) 
(1,[5, 10])
(1,[9, 18])
(2,[2, 4])
(2,[5, 10])
(2,[9, 18])
{code}
right now this job will throw out exception:
{code:java}
java.lang.RuntimeException: DataStream cannot be summed because the class 
Integer[] does not support the + operator.
{code}

  was:
The goal is to implement a KeyedStream API to sum with *the field which is 
array*.

The example code with like:
{code}
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<Long, Integer[]>> src = env.fromCollection(Arrays.asList(
   new Tuple2<>(1L, new Integer[]{2, 4}),
   new Tuple2<>(1L, new Integer[]{3, 6}),
   new Tuple2<>(1L, new Integer[]{4, 8}),
   new Tuple2<>(2L, new Integer[]{2, 4}),
   new Tuple2<>(2L, new Integer[]{3, 6}),
   new Tuple2<>(2L, new Integer[]{4, 8})
));

src.keyBy(0)
.sum(1}) 
.print();
env.execute();
{code}
right now this job will throw out exception:
{code:java}
java.lang.RuntimeException: DataStream cannot be summed because the class 
Integer[] does not support the + operator.
{code}


> Add the SumFunction to support KeyedStream.sum with field which is array 
> -------------------------------------------------------------------------
>
>                 Key: FLINK-11706
>                 URL: https://issues.apache.org/jira/browse/FLINK-11706
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API
>            Reporter: wangpeibin
>            Assignee: wangpeibin
>            Priority: Minor
>              Labels: pull-request-available
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> The goal is to implement a KeyedStream API to sum with *the field which is 
> array*.
> The example code with like:
> {code:java}
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> DataStream<Tuple2<Long, Integer[]>> src = env.fromCollection(Arrays.asList(
>    new Tuple2<>(1L, new Integer[]{2, 4}),
>    new Tuple2<>(1L, new Integer[]{3, 6}),
>    new Tuple2<>(1L, new Integer[]{4, 8}),
>    new Tuple2<>(2L, new Integer[]{2, 4}),
>    new Tuple2<>(2L, new Integer[]{3, 6}),
>    new Tuple2<>(2L, new Integer[]{4, 8})
> ));
> src.keyBy(0)
> .sum(1}) 
> .print();
> env.execute();{code}
> the expected output should be 
> {code:java}
> (1,[2, 4]) 
> (1,[5, 10])
> (1,[9, 18])
> (2,[2, 4])
> (2,[5, 10])
> (2,[9, 18])
> {code}
> right now this job will throw out exception:
> {code:java}
> java.lang.RuntimeException: DataStream cannot be summed because the class 
> Integer[] does not support the + operator.
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to