Hi AU,

> The problem with this approach is that I'm looking for a standard flatMap 
> anonymous function whose output could differ every time: 1. the number of 
> elements within the array can vary, and 2. the data types can vary likewise. 
> Since neither is fixed, the TypeInformation I return would change with every 
> execution.

As far as I know, there is no such standard flatMap function. The table 
definition requires a fixed number of columns, and even if Flink can infer 
the column types, it requires that those types are fixed. For your case, the 
number of columns in the table should be the maximum possible number of 
elements. If a record has fewer elements, you should pad the remaining 
columns defined by the table before returning. For the case where elements in 
the same column may have different types, you can either convert them to a 
uniform column type defined by the table, or define a custom type that can 
handle the different element types.
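A minimal plain-Java sketch of that padding/normalization idea (the class and constant names are hypothetical; in a real job this logic would live inside the flatMap before building the Row):

```java
import java.util.Arrays;
import java.util.List;

public class PadDemo {
    static final int MAX_COLUMNS = 5; // maximum possible number of elements

    // Pad the record to MAX_COLUMNS and normalize every element to a
    // uniform column type (String here) so the table schema stays fixed.
    static Object[] toFixedWidthRow(List<?> elements) {
        Object[] columns = new Object[MAX_COLUMNS];
        for (int i = 0; i < MAX_COLUMNS; i++) {
            // Present elements are stringified; missing ones are padded with null.
            columns[i] = i < elements.size() ? String.valueOf(elements.get(i)) : null;
        }
        return columns;
    }

    public static void main(String[] args) {
        // A record with 3 mixed-type elements becomes a fixed 5-column row.
        System.out.println(Arrays.toString(toFixedWidthRow(Arrays.asList(1, "a", 2.5))));
    }
}
```

The fixed-width Object[] can then be passed to Row.of so every record matches the declared schema.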



Best,
Haibo

At 2019-08-07 23:05:51, "Andres Angel" <ingenieroandresan...@gmail.com> wrote:

Hello Victor ,


You are totally right, so now this turns into: is Flink capable of handling 
these cases, where I would need to define the type info in the row and the 
Table would infer the columns separated by a comma or something similar?


thanks
AU


On Wed, Aug 7, 2019 at 10:33 AM Victor Wong <jiasheng.w...@outlook.com> wrote:


Hi Andres,

 

I’d like to share my thoughts:

When you register a “Table”, you need to specify its “schema”, so how can you 
register the table when the number of elements/columns and the data types are 
both nondeterministic?

Correct me if I misunderstood your meaning.

 

Best,

Victor

 

From: Andres Angel <ingenieroandresan...@gmail.com>
Date: Wednesday, August 7, 2019 at 9:55 PM
To: Haibo Sun <sunhaib...@163.com>
Cc: user <user@flink.apache.org>
Subject: Re: FlatMap returning Row<> based on ArrayList elements()

 

Hello everyone, let me be more precise about what I'm looking for, because 
your example is right and very accurate in the way it turns an array into a 
Row() object.

I have done it seamlessly:

 

out.collect(Row.of(pelements.toArray()));

 

Then I printed and the outcome is as expected:

 

5d2df2c2e7370c7843dad9ca,359731,13333196,156789925,619381
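As a side note on why that single call produces several fields: Row.of takes varargs, and passing an Object[] (which toArray() returns) spreads it into one field per array element. A plain-Java sketch of the same mechanics (the fieldsOf helper is hypothetical, standing in for Row.of):

```java
import java.util.Arrays;
import java.util.List;

public class VarargsDemo {
    // Mimics a varargs factory like Row.of(Object... values): an Object[]
    // argument is spread into the varargs parameter, one entry per field.
    static int fieldsOf(Object... values) {
        return values.length;
    }

    public static void main(String[] args) {
        List<Object> pelements = Arrays.asList("5d2df2c2e7370c7843dad9ca", 359731, 13333196);
        // toArray() returns Object[], so each element becomes its own field.
        System.out.println(fieldsOf(pelements.toArray()));
        // Casting to Object forces the whole array to be ONE field instead.
        System.out.println(fieldsOf((Object) pelements.toArray()));
    }
}
```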

 

Now I need to register this DS as a table and here is basically how I'm 
planning to do it:

 

tenv.registerDataStream("newtable", ds, "col1,col2,col3,col4,col5");

 

However, this fails at the DataStream registration because I need to specify 
the RowTypeInfo. That is the big deal, because yes, I know I could use 
something like:

 

 

TypeInformation<?>[] types = {
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.INT_TYPE_INFO,
        BasicTypeInfo.INT_TYPE_INFO,
        BasicTypeInfo.INT_TYPE_INFO,
        BasicTypeInfo.INT_TYPE_INFO};

DataStream<Row> ds = previousds.flatMap(new FlatMapFunction<List<Integer>, Row>() {
    @Override
    public void flatMap(List<Integer> value, Collector<Row> out) throws Exception {
        out.collect(Row.of(value.toArray(new Integer[0])));
    }
}).returns(new RowTypeInfo(types));

 

 

The problem with this approach is that I'm looking for a standard flatMap 
anonymous function whose output could differ every time: 1. the number of 
elements within the array can vary, and 2. the data types can vary likewise. 
Since neither is fixed, the TypeInformation I return would change with every 
execution.

 

How could I approach this?

 

thanks so much

AU

 

 

On Wed, Aug 7, 2019 at 3:57 AM Haibo Sun <sunhaib...@163.com> wrote:

Hi Andres Angel,

 

I guess people don't understand your problem (including me). I don't know if 
the following sample code is what you want, if not, can you describe the 
problem more clearly?

 

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.fromElements(Arrays.asList(1, 2, 3, 4, 5))
    .flatMap(new FlatMapFunction<List<Integer>, Row>() {
        @Override
        public void flatMap(List<Integer> value, Collector<Row> out) throws Exception {
            out.collect(Row.of(value.toArray(new Integer[0])));
        }
    }).print();

 

env.execute("test job");

 

Best,

Haibo


At 2019-07-30 02:30:27, "Andres Angel" <ingenieroandresan...@gmail.com> wrote:



Hello everyone,

 

I need to take input data inside an anonymous function and turn it into 
several Row elements. Originally I would have done something like 
Row.of(1,2,3,4), but these elements can change on the fly as part of my 
function. This is why I have decided to store them in a list, and right now 
it looks something like this:

 

 

Now, I need to return Row<> elements through my out Collector based on this 
list. I checked the Flink documentation, but lambda functions are not supported: 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/java_lambdas.html . 
Then I thought, OK, I can copy my ArrayList into a Tuple and pass this tuple as 
Row.of(myTuple):

 

                    Tuple mytuple = Tuple.newInstance(5);
                    for (int i = 0; i < pelements.size(); i++) {
                        mytuple.setField(pelements.get(i), i);
                    }
                    out.collect(Row.of(mytuple));

 

 

However, it doesn't work, because this is parsed as a single element in the 
sqlQuery step. How could I do something like:

 

pelements.forEach(n->out.collect(Row.of(n)));
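In spirit, the difference between the Tuple attempt and that forEach is one emitted record versus one record per element. A plain-Java sketch with a java.util.function.Consumer standing in for Flink's Collector (names are illustrative only):

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class EmitDemo {
    public static void main(String[] args) {
        List<Integer> pelements = Arrays.asList(1, 2, 3, 4, 5);
        Consumer<Object> out = System.out::println; // stands in for Collector<Row>

        // Tuple-style: the whole list is wrapped into ONE emitted record.
        out.accept(pelements);

        // forEach-style: each element becomes its OWN emitted record,
        // i.e. the pelements.forEach(n -> out.collect(Row.of(n))) idea.
        pelements.forEach(out::accept);
    }
}
```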

 

Thanks so much
