Hi Pierre,

This example is written against the syntax of release-1.12, which is about to be released, and it passed my test. In release-1.12, input_types can be omitted and expressions can be used directly. If you are using release-1.11, you only need to adjust the syntax of the udf slightly, following the udf documentation[1].
The Flink table connector supports the Avro format; please refer to the documentation[2].

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/udfs/python_udfs.html#scalar-functions
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/avro.html#avro-format

Best,
Xingbo

On Thu, Dec 3, 2020 at 2:57 AM, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:

> Hi Xingbo,
>
> Nice! This looks a bit hacky, but shows that it can be done ;)
>
> I just got an exception that prevents me from running your code,
> apparently from udf.py:
>
> TypeError: Invalid input_type: input_type should be DataType but contains
> None
>
> Can you please check again?
> If the schema is defined in a .avsc file, do we have to parse it and
> rebuild that syntax (DDL and udf), or is there an existing component
> that could be used?
>
> Thanks a lot!
>
> Best,
>
> On Wed, Dec 2, 2020 at 04:50, Xingbo Huang <hxbks...@gmail.com> wrote:
>
>> Hi Pierre,
>>
>> I wrote a PyFlink implementation; you can see if it meets your needs:
>>
>> from pyflink.datastream import StreamExecutionEnvironment
>> from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes
>> from pyflink.table.udf import udf
>>
>>
>> def test():
>>     env = StreamExecutionEnvironment.get_execution_environment()
>>     env.set_parallelism(1)
>>     t_env = StreamTableEnvironment.create(
>>         env,
>>         environment_settings=EnvironmentSettings.new_instance()
>>         .in_streaming_mode().use_blink_planner().build())
>>
>>     t_env.get_config().get_configuration().set_string(
>>         "taskmanager.memory.task.off-heap.size", '80m')
>>
>>     # 10k nested columns
>>     num_field = 10_000
>>     fields = ['f%s INT' % i for i in range(num_field)]
>>     field_str = ','.join(fields)
>>     t_env.execute_sql(f"""
>>         CREATE TABLE source_table (
>>             f0 BIGINT,
>>             f1 DECIMAL(32,2),
>>             f2 ROW<{field_str}>,
>>             f3 TIMESTAMP(3)
>>         ) WITH (
>>             'connector' = 'datagen',
>>             'number-of-rows' = '2'
>>         )
>>     """)
>>
>>     t_env.execute_sql(f"""
>>         CREATE TABLE print_table (
>>             f0 BIGINT,
>>             f1 DECIMAL(32,2),
>>             f2 ROW<{field_str}>,
>>             f3 TIMESTAMP(3)
>>         ) WITH (
>>             'connector' = 'print'
>>         )
>>     """)
>>     result_type = DataTypes.ROW(
>>         [DataTypes.FIELD("f%s" % i, DataTypes.INT()) for i in range(num_field)])
>>
>>     func = udf(lambda x: x, result_type=result_type)
>>
>>     source = t_env.from_path("source_table")
>>     result = source.select(source.f0, source.f1, func(source.f2), source.f3)
>>     result.execute_insert("print_table")
>>
>>
>> if __name__ == '__main__':
>>     test()
>>
>>
>> Best,
>> Xingbo
>>
>> On Tue, Dec 1, 2020 at 6:10 PM, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>
>>> Hi Xingbo,
>>>
>>> That would mean giving up on using Flink (table) features on the content
>>> of the parsed JSON objects, so definitely a big loss. Let me know if I
>>> missed something.
>>>
>>> Thanks!
>>>
>>> On Tue, Dec 1, 2020 at 07:26, Xingbo Huang <hxbks...@gmail.com> wrote:
>>>
>>>> Hi Pierre,
>>>>
>>>> Have you ever thought of declaring your entire JSON as a string field
>>>> in `Table` and putting the parsing work in a UDF?
>>>>
>>>> Best,
>>>> Xingbo
>>>>
>>>> On Tue, Dec 1, 2020 at 4:13 AM, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>>>
>>>>> Hi Xingbo,
>>>>>
>>>>> Many thanks for your follow-up. Yes, you got it right.
>>>>> So using Table API and a ROW object for the nested output of my UDF,
>>>>> and since types are mandatory, I guess this boils down to:
>>>>> - How to nicely specify the types for the 100k fields: shall I use
>>>>>   TypeInformation [1], or better retrieve it from Schema Registry [2]?
>>>>> - Do I have to put NULL values for all the fields that don't have a
>>>>>   value in my JSON?
>>>>> - Will the resulting Table be "sparse" and suffer performance
>>>>>   limitations?
>>>>> Let me know if Table API and ROW are the right candidates here, or if
>>>>> better alternatives exist.
>>>>> As said, I'd be glad to apply some downstream transformations using
>>>>> key/value access (and possibly some Table <-> Pandas operations). Hope
>>>>> that doesn't make it too long a wish list ;)
>>>>>
>>>>> Thanks a lot!
>>>>>
>>>>> Best regards,
>>>>>
>>>>> [1] https://stackoverflow.com/questions/48696875/how-to-attach-schema-to-a-flink-datastream-on-the-fly
>>>>> [2] https://docs.cloudera.com/csa/1.2.0/datastream-connectors/topics/csa-schema-registry.html
>>>>>
>>>>> On Sat, Nov 28, 2020 at 04:04, Xingbo Huang <hxbks...@gmail.com> wrote:
>>>>>
>>>>>> Hi Pierre,
>>>>>>
>>>>>> Sorry for the late reply.
>>>>>> Your requirement is that your `Table` has a `field` in `Json` format
>>>>>> whose number of keys has reached 100k, and you want to use such a
>>>>>> `field` as the input/output of a `udf`, right? As to whether there is
>>>>>> a limit on the number of nested keys, I am not sure. Other
>>>>>> contributors with experience in this area may have answers. On the
>>>>>> `Python UDF` side, if the type of the key or value of your `Map` is
>>>>>> `Any`, we do not support it now. You need to specify a concrete type.
>>>>>> For more information, please refer to the related documentation[1].
>>>>>>
>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/python_types.html
>>>>>>
>>>>>> Best,
>>>>>> Xingbo
>>>>>>
>>>>>> On Nov 28, 2020, at 12:49 AM, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>>>>>
>>>>>> Hello Wei, Dian, Xingbo,
>>>>>>
>>>>>> Not really sure when it is appropriate to knock on the door of the
>>>>>> community ;)
>>>>>> I just wanted to mention that your feedback on the above topic will
>>>>>> be highly appreciated, as it will condition the choice of framework on
>>>>>> our side for the months to come, and potentially help the community to
>>>>>> cover sparse data with Flink.
>>>>>>
>>>>>> Thanks a lot!
>>>>>>
>>>>>> Have a great weekend
>>>>>>
>>>>>> Best,
>>>>>>
>>>>>> On Fri, Nov 20, 2020 at 10:11, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Wei,
>>>>>>>
>>>>>>> Thanks for the hint. May I please follow up by adding more context
>>>>>>> and ask for your guidance.
>>>>>>>
>>>>>>> In case the Map[String,Any] object in question, returned by Scala:
>>>>>>>
>>>>>>> - Has a defined schema (incl. nested) with up to 100k (!) different
>>>>>>>   possible keys
>>>>>>> - Has only some portion of the keys populated for each record
>>>>>>> - Is convertible to JSON
>>>>>>> - Has to undergo downstream processing in Flink and/or Python UDF
>>>>>>>   with key value access
>>>>>>> - Has to be ultimately stored in a Kafka/AVRO sink
>>>>>>>
>>>>>>> How would you declare the types explicitly in such a case?
>>>>>>>
>>>>>>> Thanks for your support!
>>>>>>>
>>>>>>> Pierre
>>>>>>>
>>>>>>> On Thu, Nov 19, 2020 at 03:54, Wei Zhong <weizhong0...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Pierre,
>>>>>>>>
>>>>>>>> Currently there is no type hint like 'Map[String, Any]'. The
>>>>>>>> recommended way is to declare your type more explicitly.
>>>>>>>>
>>>>>>>> If you insist on doing this, you can try declaring a RAW data
>>>>>>>> type for java.util.HashMap [1], but you may run into some trouble [2]
>>>>>>>> related to the kryo serializers.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Wei
>>>>>>>>
>>>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html#raw
>>>>>>>> [2] https://stackoverflow.com/questions/28157236/kryo-serialization-with-nested-hashmap-with-custom-class
>>>>>>>>
>>>>>>>> On Nov 19, 2020, at 04:31, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> Hi Wei,
>>>>>>>>
>>>>>>>> It works! Thanks a lot for your support.
>>>>>>>> I hadn't tried this last combination for option 1, and I had the
>>>>>>>> wrong syntax for option 2.
>>>>>>>>
>>>>>>>> So to summarize:
>>>>>>>>
>>>>>>>> Methods working:
>>>>>>>> - Current: DataTypeHint in UDF definition + SQL for UDF registering
>>>>>>>> - Outdated: override getResultType in UDF definition
>>>>>>>>   + t_env.register_java_function for UDF registering
>>>>>>>>
>>>>>>>> Type conversions working:
>>>>>>>> - scala.collection.immutable.Map[String,String] =>
>>>>>>>>   org.apache.flink.types.Row => ROW<STRING,STRING>
>>>>>>>> - scala.collection.immutable.Map[String,String] =>
>>>>>>>>   java.util.Map[String,String] => MAP<STRING,STRING>
>>>>>>>>
>>>>>>>> Any hint for Map[String,Any]?
>>>>>>>>
>>>>>>>> Best regards,
>>>>>>>>
>>>>>>>> On Wed, Nov 18, 2020 at 03:26, Wei Zhong <weizhong0...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Pierre,
>>>>>>>>>
>>>>>>>>> Both approaches work on my local machine. This is my code:
>>>>>>>>>
>>>>>>>>> Scala UDF:
>>>>>>>>>
>>>>>>>>> package com.dummy
>>>>>>>>>
>>>>>>>>> import org.apache.flink.api.common.typeinfo.TypeInformation
>>>>>>>>> import org.apache.flink.table.annotation.DataTypeHint
>>>>>>>>> import org.apache.flink.table.api.Types
>>>>>>>>> import org.apache.flink.table.functions.ScalarFunction
>>>>>>>>> import org.apache.flink.types.Row
>>>>>>>>>
>>>>>>>>> /**
>>>>>>>>>  * The Scala UDF.
>>>>>>>>>  */
>>>>>>>>> class dummyMap extends ScalarFunction {
>>>>>>>>>
>>>>>>>>>   // If the udf is registered by the SQL statement, you need to
>>>>>>>>>   // add this type hint
>>>>>>>>>   @DataTypeHint("ROW<s STRING,t STRING>")
>>>>>>>>>   def eval(): Row = {
>>>>>>>>>     Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
>>>>>>>>>   }
>>>>>>>>>
>>>>>>>>>   // If the udf is registered by the method 'register_java_function',
>>>>>>>>>   // you need to override this method.
>>>>>>>>>   override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = {
>>>>>>>>>     // The type of the return values should be TypeInformation
>>>>>>>>>     Types.ROW(Array("s", "t"),
>>>>>>>>>       Array[TypeInformation[_]](Types.STRING(), Types.STRING()))
>>>>>>>>>   }
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> Python code:
>>>>>>>>>
>>>>>>>>> from pyflink.datastream import StreamExecutionEnvironment
>>>>>>>>> from pyflink.table import StreamTableEnvironment
>>>>>>>>>
>>>>>>>>> s_env = StreamExecutionEnvironment.get_execution_environment()
>>>>>>>>> st_env = StreamTableEnvironment.create(s_env)
>>>>>>>>>
>>>>>>>>> # load the scala udf jar file, the path should be modified to yours
>>>>>>>>> # or you can also load the jar file via other approaches
>>>>>>>>> st_env.get_config().get_configuration().set_string(
>>>>>>>>>     "pipeline.jars", "file:///Users/zhongwei/the-dummy-udf.jar")
>>>>>>>>>
>>>>>>>>> # register the udf via the SQL statement
>>>>>>>>> st_env.execute_sql(
>>>>>>>>>     "CREATE FUNCTION dummyMap AS 'com.dummy.dummyMap' LANGUAGE SCALA")
>>>>>>>>> # or register via the method
>>>>>>>>> # st_env.register_java_function("dummyMap", "com.dummy.dummyMap")
>>>>>>>>>
>>>>>>>>> # prepare source and sink
>>>>>>>>> t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
>>>>>>>>> st_env.execute_sql("""create table mySink (
>>>>>>>>>     output_of_my_scala_udf ROW<s STRING,t STRING>
>>>>>>>>> ) with (
>>>>>>>>>     'connector' = 'print'
>>>>>>>>> )""")
>>>>>>>>>
>>>>>>>>> # execute query
>>>>>>>>> t.select("dummyMap()").execute_insert("mySink").get_job_client().get_job_execution_result().result()
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Wei
>>>>>>>>>
>>>>>>>>> On Nov 18, 2020, at 03:28, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Hi Wei,
>>>>>>>>>
>>>>>>>>> True, I'm using the method you mention, but glad to change.
>>>>>>>>> I tried your suggestion instead, but got a similar error.
>>>>>>>>>
>>>>>>>>> Thanks for your support. That is much more tedious than I thought.
>>>>>>>>>
>>>>>>>>> *Option 1 - SQL UDF*
>>>>>>>>>
>>>>>>>>> *SQL UDF*
>>>>>>>>> create_func_ddl = """
>>>>>>>>> CREATE FUNCTION dummyMap
>>>>>>>>> AS 'com.dummy.dummyMap' LANGUAGE SCALA
>>>>>>>>> """
>>>>>>>>>
>>>>>>>>> t_env.execute_sql(create_func_ddl)
>>>>>>>>>
>>>>>>>>> *Error*
>>>>>>>>> Py4JJavaError: An error occurred while calling o672.execute.
>>>>>>>>> : org.apache.flink.table.api.TableException: Result field does not
>>>>>>>>> match requested type. Requested: Row(s: String, t: String); Actual:
>>>>>>>>> GenericType<org.apache.flink.types.Row>
>>>>>>>>>
>>>>>>>>> *Option 2 - Overriding getResultType*
>>>>>>>>>
>>>>>>>>> Back to the old registering method, but overriding getResultType:
>>>>>>>>>
>>>>>>>>> t_env.register_java_function("dummyMap","com.dummy.dummyMap")
>>>>>>>>>
>>>>>>>>> *Scala UDF*
>>>>>>>>> class dummyMap() extends ScalarFunction {
>>>>>>>>>
>>>>>>>>>   def eval(): Row = {
>>>>>>>>>     Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
>>>>>>>>>   }
>>>>>>>>>
>>>>>>>>>   override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = DataTypes.ROW(DataTypes.STRING,DataTypes.STRING)
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> *Error (on compilation)*
>>>>>>>>>
>>>>>>>>> [error] dummyMap.scala:66:90: overloaded method value ROW with alternatives:
>>>>>>>>> [error]   (x$1: org.apache.flink.table.api.DataTypes.AbstractField*)org.apache.flink.table.types.UnresolvedDataType <and>
>>>>>>>>> [error]   ()org.apache.flink.table.types.DataType <and>
>>>>>>>>> [error]   (x$1: org.apache.flink.table.api.DataTypes.Field*)org.apache.flink.table.types.DataType
>>>>>>>>> [error]  cannot be applied to (org.apache.flink.table.types.DataType, org.apache.flink.table.types.DataType)
>>>>>>>>> [error]   override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = DataTypes.ROW(DataTypes.STRING,DataTypes.STRING)
>>>>>>>>> [error]                                                                                          ^
>>>>>>>>> [error] one error found
>>>>>>>>> [error] (Compile / compileIncremental) Compilation failed
>>>>>>>>> [error] Total time: 3 s, completed 17 nov. 2020 à 20:00:01
>>>>>>>>>
>>>>>>>>> On Tue, Nov 17, 2020 at 14:01, Wei Zhong <weizhong0...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Pierre,
>>>>>>>>>>
>>>>>>>>>> I guess your UDF is registered by the method
>>>>>>>>>> 'register_java_function', which uses the old type system. In this
>>>>>>>>>> situation you need to override the 'getResultType' method instead
>>>>>>>>>> of adding a type hint.
>>>>>>>>>>
>>>>>>>>>> You can also try to register your UDF via the "CREATE FUNCTION"
>>>>>>>>>> SQL statement, which accepts the type hint.
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Wei
>>>>>>>>>>
>>>>>>>>>> On Nov 17, 2020, at 19:29, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>> Hi Wei,
>>>>>>>>>>
>>>>>>>>>> Thanks for your suggestion. Same error.
>>>>>>>>>>
>>>>>>>>>> *Scala UDF*
>>>>>>>>>>
>>>>>>>>>> @FunctionHint(output = new DataTypeHint("ROW<s STRING,t STRING>"))
>>>>>>>>>> class dummyMap() extends ScalarFunction {
>>>>>>>>>>   def eval(): Row = {
>>>>>>>>>>     Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
>>>>>>>>>>   }
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> Best regards,
>>>>>>>>>>
>>>>>>>>>> On Tue, Nov 17, 2020 at 10:04, Wei Zhong <weizhong0...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Pierre,
>>>>>>>>>>>
>>>>>>>>>>> You can try to replace the '@DataTypeHint("ROW<s STRING,t STRING>")'
>>>>>>>>>>> with '@FunctionHint(output = new DataTypeHint("ROW<s STRING,t STRING>"))'
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Wei
>>>>>>>>>>>
>>>>>>>>>>> On Nov 17, 2020, at 15:45, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hi Dian, Community,
>>>>>>>>>>>
>>>>>>>>>>> (bringing the thread back to a wider audience)
>>>>>>>>>>>
>>>>>>>>>>> As you suggested, I've tried to use DataTypeHint with Row instead
>>>>>>>>>>> of Map, but even this simple case leads to a type mismatch
>>>>>>>>>>> between the UDF and the Table API.
>>>>>>>>>>> I've also tried other Map objects from Flink
>>>>>>>>>>> (table.data.MapData, flink.types.MapValue,
>>>>>>>>>>> flink.table.api.DataTypes.MAP)
>>>>>>>>>>> in addition to Java (java.util.Map) in combination with
>>>>>>>>>>> DataTypeHint, without success.
>>>>>>>>>>> N.B. I'm using version 1.11.
>>>>>>>>>>>
>>>>>>>>>>> Am I doing something wrong, or am I facing limitations in the
>>>>>>>>>>> toolkit?
>>>>>>>>>>>
>>>>>>>>>>> Thanks in advance for your support!
>>>>>>>>>>>
>>>>>>>>>>> Best regards,
>>>>>>>>>>>
>>>>>>>>>>> *Scala UDF*
>>>>>>>>>>>
>>>>>>>>>>> class dummyMap() extends ScalarFunction {
>>>>>>>>>>>
>>>>>>>>>>>   @DataTypeHint("ROW<s STRING,t STRING>")
>>>>>>>>>>>   def eval(): Row = {
>>>>>>>>>>>     Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
>>>>>>>>>>>   }
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> *Table DDL*
>>>>>>>>>>>
>>>>>>>>>>> my_sink_ddl = f"""
>>>>>>>>>>>     create table mySink (
>>>>>>>>>>>         output_of_my_scala_udf ROW<s STRING,t STRING>
>>>>>>>>>>>     ) with (
>>>>>>>>>>>         ...
>>>>>>>>>>>     )
>>>>>>>>>>> """
>>>>>>>>>>>
>>>>>>>>>>> *Error*
>>>>>>>>>>>
>>>>>>>>>>> Py4JJavaError: An error occurred while calling o2.execute.
>>>>>>>>>>> : org.apache.flink.table.api.ValidationException: Field types of
>>>>>>>>>>> query result and registered TableSink
>>>>>>>>>>> `default_catalog`.`default_database`.`mySink` do not match.
>>>>>>>>>>> Query result schema: [output_of_my_scala_udf:
>>>>>>>>>>> GenericType<org.apache.flink.types.Row>]
>>>>>>>>>>> TableSink schema: [output_of_my_scala_udf: Row(s: String, t:
>>>>>>>>>>> String)]
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Nov 13, 2020 at 11:59, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Thanks Dian, but same error when using an explicit return type:
>>>>>>>>>>>>
>>>>>>>>>>>> class dummyMap() extends ScalarFunction {
>>>>>>>>>>>>
>>>>>>>>>>>>   def eval() : util.Map[java.lang.String,java.lang.String] = {
>>>>>>>>>>>>
>>>>>>>>>>>>     val states = Map("key1" -> "val1", "key2" -> "val2")
>>>>>>>>>>>>
>>>>>>>>>>>>     states.asInstanceOf[util.Map[java.lang.String,java.lang.String]]
>>>>>>>>>>>>
>>>>>>>>>>>>   }
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Nov 13, 2020 at 10:34, Dian Fu <dian0511...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> You need to explicitly define the result type of the UDF. You
>>>>>>>>>>>>> could refer to [1] for more details if you are using Flink 1.11.
>>>>>>>>>>>>> If you are using other versions of Flink, you need to refer to
>>>>>>>>>>>>> the corresponding documentation.
>>>>>>>>>>>>>
>>>>>>>>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Nov 13, 2020, at 4:56 PM, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> ScalarFunction
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Pierre
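
On the .avsc question raised in this thread (whether the schema has to be parsed to rebuild the DDL and udf syntax), here is a minimal sketch of what such parsing could look like. It assumes a record schema made only of primitive types, nullable unions and nested records. The helper names avro_type_to_flink and avsc_to_ddl_columns, the example schema and the type mapping are illustrative assumptions, not part of PyFlink.

```python
import json

# Assumed mapping from Avro primitive types to Flink SQL types
# (an illustrative subset; check it against the Avro format documentation).
AVRO_TO_FLINK = {
    "string": "STRING",
    "int": "INT",
    "long": "BIGINT",
    "float": "FLOAT",
    "double": "DOUBLE",
    "boolean": "BOOLEAN",
    "bytes": "BYTES",
}


def avro_type_to_flink(avro_type):
    """Translate a parsed Avro type into a Flink SQL type string."""
    if isinstance(avro_type, list):
        # A union such as ["null", "int"] marks an optional field; Flink
        # SQL types are nullable by default, so keep the non-null branch.
        branches = [t for t in avro_type if t != "null"]
        if len(branches) != 1:
            raise ValueError("unsupported union: %r" % (avro_type,))
        return avro_type_to_flink(branches[0])
    if isinstance(avro_type, dict):
        if avro_type.get("type") == "record":
            # A nested record becomes a nested ROW<...>.
            return "ROW<%s>" % avsc_to_ddl_columns(avro_type)
        raise ValueError("unsupported complex type: %r" % (avro_type,))
    return AVRO_TO_FLINK[avro_type]


def avsc_to_ddl_columns(record):
    """Build the 'name TYPE, name TYPE' column list for a parsed Avro record."""
    return ", ".join(
        "%s %s" % (field["name"], avro_type_to_flink(field["type"]))
        for field in record["fields"])


# Hypothetical schema, inlined here instead of being read from a .avsc file.
EXAMPLE_AVSC = """
{"type": "record", "name": "Event", "fields": [
  {"name": "id", "type": "long"},
  {"name": "label", "type": ["null", "string"]},
  {"name": "payload", "type": {"type": "record", "name": "Payload", "fields": [
    {"name": "f0", "type": "int"},
    {"name": "f1", "type": ["null", "int"]}
  ]}}
]}
"""

columns = avsc_to_ddl_columns(json.loads(EXAMPLE_AVSC))
ddl = "CREATE TABLE source_table (%s) WITH ('connector' = 'datagen')" % columns
print(ddl)
```

The same parsed schema could also drive the result_type of the udf, so that the DDL and the udf stay in sync. Avro logical types, arrays, maps and field names that need escaping are not handled in this sketch.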