The exception is thrown by this check:

if (!type.isTupleType()) {
    throw new InvalidProgramException("Specifying keys via field positions is only valid "
        + "for tuple data types. Type: " + type);
}
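To see why a plain Integer stream trips the check above, here is a minimal standalone sketch in plain Java. It does not use Flink: `PositionalKeySketch`, its `Tuple1` stand-in, and `keyByPosition0` are hypothetical names made up for illustration, and the sketch inspects element instances, whereas Flink checks the stream's type information. It only mirrors the idea that a positional key needs a type with numbered fields.

```java
class PositionalKeySketch {

    /** Hypothetical stand-in for a one-field tuple; this is NOT Flink's Tuple1 class. */
    static final class Tuple1<T> {
        final T f0;

        Tuple1(T f0) {
            this.f0 = f0;
        }
    }

    /** Assumption for this sketch: only Tuple1 counts as a "tuple type". */
    static boolean isTupleType(Object element) {
        return element instanceof Tuple1;
    }

    /** Returns field 0 of a tuple, or rejects non-tuples with the message from the thread. */
    static Object keyByPosition0(Object element) {
        if (!isTupleType(element)) {
            throw new IllegalArgumentException(
                "Specifying keys via field positions is only valid for tuple data types. Type: "
                    + element.getClass().getSimpleName());
        }
        return ((Tuple1<?>) element).f0;
    }

    public static void main(String[] args) {
        // A wrapped value has a field 0 to key on.
        System.out.println(keyByPosition0(new Tuple1<>(1)));

        // A bare Integer has no fields, so the positional key is rejected.
        try {
            keyByPosition0(1);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Keying by position 0 only makes sense when each element has a field 0. In Flink, that means either wrapping the values in a tuple type or, as far as I know, passing a KeySelector to keyBy() instead of a field position.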
So, as Saikat says, you may need a stream of tuples. Keep in mind that ReduceFunction combines two values of the same type, so once the stream holds tuples, the reduce function has to take and return that tuple type as well:

public interface ReduceFunction<T> extends Function, Serializable {

    /**
     * The core method of ReduceFunction, combining two values into one value of the same type.
     * The reduce function is consecutively applied to all values of a group until only a single value remains.
     *
     * @param value1 The first value to combine.
     * @param value2 The second value to combine.
     * @return The combined value of both input values.
     *
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
     *                   to fail and may trigger recovery.
     */
    T reduce(T value1, T value2) throws Exception;
}

Thanks

Chenguang He
Graduate Student
Department of Computer Science
Iowa State University

On April 18, 2016 at 1:30:19 PM, Saikat Maitra (saikat.mai...@gmail.com) wrote:

Hello Jitendra,

I am new to the Flink community but may have seen this issue earlier. Can you try to use DataStream<Tuple1<Integer>> instead of KeyedStream integers4?

Regards
Saikat

On Mon, Apr 18, 2016 at 7:37 PM, Jitendra Agrawal <jitendra.agra...@northgateps.com> wrote:

> Hi Team,
>
> Problem description: when I call the *reduce()* method on a KeyedStream
> object, I get this exception:
>
> "*org.apache.flink.api.common.InvalidProgramException: Specifying keys via
> field positions is only valid for tuple data types. Type: Integer*".
>
> StreamExecutionEnvironment env =
>     StreamExecutionEnvironment.getExecutionEnvironment();
> KeyedStream integers4 = env.fromElements(1,2,3).keyBy(0);
> DataStream doubleIntegers4 = integers4.reduce(new ReduceFunction<Integer>() {
>
>     @Override
>     public Integer reduce(Integer arg0, Integer arg1) throws Exception {
>         return arg0 + arg1;
>     }
> });
>
> Can you please tell me how to solve this exception, or provide a link or
> example with a complete solution?
>
> Your help will be appreciated.
>
> Thanks & Regards
>
> *Jitendra Agrawal*
> Mumbai