Hi Xingbo,

Nice ! This looks a bit hacky, but shows that it can be done ;)

I just got an exception preventing me running your code, apparently from
udf.py:

TypeError: Invalid input_type: input_type should be DataType but contains
None

Can you pls check again ?
If the schema is defined is a .avsc file, do we have to parse it and
rebuild those syntax (ddl and udf) and or is there an existing component
that could be used ?

Thanks a lot !

Best,


Le mer. 2 déc. 2020 à 04:50, Xingbo Huang <hxbks...@gmail.com> a écrit :

> Hi Pierre,
>
> I wrote a PyFlink implementation, you can see if it meets your needs:
>
>
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import StreamTableEnvironment, EnvironmentSettings,
> DataTypes
> from pyflink.table.udf import udf
>
>
> def test():
>     env = StreamExecutionEnvironment.get_execution_environment()
>     env.set_parallelism(1)
>     t_env = StreamTableEnvironment.create(env,
>
> environment_settings=EnvironmentSettings.new_instance()
>
> .in_streaming_mode().use_blink_planner().build())
>
> t_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size",
>                                                           '80m')
>
>     # 10k nested columns
>     num_field = 10_000
>     fields = ['f%s INT' % i for i in range(num_field)]
>     field_str = ','.join(fields)
>     t_env.execute_sql(f"""
>             CREATE TABLE source_table (
>             f0 BIGINT,
>             f1 DECIMAL(32,2),
>             f2 ROW<${field_str}>,
>             f3 TIMESTAMP(3)
>         ) WITH (
>           'connector' = 'datagen',
>           'number-of-rows' = '2'
>         )
>     """)
>
>     t_env.execute_sql(f"""
>         CREATE TABLE print_table (
>          f0 BIGINT,
>          f1 DECIMAL(32,2),
>          f2 ROW<${field_str}>,
>          f3 TIMESTAMP(3)
>         ) WITH (
>          'connector' = 'print'
>         )
>     """)
>     result_type = DataTypes.ROW(
>         [DataTypes.FIELD("f%s" % i, DataTypes.INT()) for i in
> range(num_field)])
>
>     func = udf(lambda x: x, result_type=result_type)
>
>     source = t_env.from_path("source_table")
>     result = source.select(source.f0, source.f1, func(source.f2),
> source.f3)
>     result.execute_insert("print_table")
>
>
> if __name__ == '__main__':
>     test()
>
>
>  Best,
>  Xingbo
>
> Pierre Oberholzer <pierre.oberhol...@gmail.com> 于2020年12月1日周二 下午6:10写道:
>
>> Hi Xingbo,
>>
>> That would mean giving up on using Flink (table) features on the content
>> of the parsed JSON objects, so definitely a big loss. Let me know if I
>> missed something.
>>
>> Thanks !
>>
>> Le mar. 1 déc. 2020 à 07:26, Xingbo Huang <hxbks...@gmail.com> a écrit :
>>
>>> Hi Pierre,
>>>
>>> Have you ever thought of declaring your entire json as a string field in
>>> `Table` and putting the parsing work in UDF?
>>>
>>> Best,
>>> Xingbo
>>>
>>> Pierre Oberholzer <pierre.oberhol...@gmail.com> 于2020年12月1日周二 上午4:13写道:
>>>
>>>> Hi Xingbo,
>>>>
>>>> Many thanks for your follow up. Yes you got it right.
>>>> So using Table API and a ROW object for the nested output of my UDF,
>>>> and since types are mandatory, I guess this boils down to:
>>>> - How to nicely specify the types for the 100k fields : shall I use
>>>> TypeInformation [1] or better retrieve it from Schema Registry [2] ?
>>>> - Do I have to put NULL values for all the fields that don't have a
>>>> value in my JSON ?
>>>> - Will the resulting Table be "sparse" and suffer performance
>>>> limitations ?
>>>> Let me know if Table API and ROW are the right candidates here, or if
>>>> other better alternatives exist.
>>>> As said I'd be glad to apply some downstream transformations using
>>>> key,value access (and possibly some Table <-> Pandas operations). Hope that
>>>> doesn't make it a too long wish list ;)
>>>>
>>>> Thanks a lot !
>>>>
>>>> Best regards,
>>>>
>>>> [1]
>>>> https://stackoverflow.com/questions/48696875/how-to-attach-schema-to-a-flink-datastream-on-the-fly
>>>> [2]
>>>> https://docs.cloudera.com/csa/1.2.0/datastream-connectors/topics/csa-schema-registry.html
>>>>
>>>> Le sam. 28 nov. 2020 à 04:04, Xingbo Huang <hxbks...@gmail.com> a
>>>> écrit :
>>>>
>>>>> Hi Pierre,
>>>>>
>>>>> Sorry for the late reply.
>>>>> Your requirement is that your `Table` has a `field` in `Json` format
>>>>> and its key has reached 100k, and then you want to use such a `field` as
>>>>> the input/output of `udf`, right? As to whether there is a limit on the
>>>>> number of nested key, I am not quite clear. Other contributors with
>>>>> experience in this area may have answers. On the part of `Python UDF`, if
>>>>> the type of key or value of your `Map` is `Any`, we do not support it now.
>>>>> You need to specify a specific type. For more information, please refer to
>>>>> the related document[1].
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/python_types.html
>>>>>
>>>>> Best,
>>>>> Xingbo
>>>>>
>>>>> 2020年11月28日 上午12:49,Pierre Oberholzer <pierre.oberhol...@gmail.com>
>>>>> 写道:
>>>>>
>>>>> Hello Wei, Dian, Xingbo,
>>>>>
>>>>> Not really sure when it is appropriate to knock on the door of the
>>>>> community ;)
>>>>> I just wanted to mention that your feedback on the above topic will be
>>>>> highly appreciated as it will condition the choice of framework on our 
>>>>> side
>>>>> for the months to come, and potentially help the community to cover sparse
>>>>> data with Flink.
>>>>>
>>>>> Thanks a lot !
>>>>>
>>>>> Have a great week-end
>>>>>
>>>>> Best,
>>>>>
>>>>> Le ven. 20 nov. 2020 à 10:11, Pierre Oberholzer <
>>>>> pierre.oberhol...@gmail.com> a écrit :
>>>>>
>>>>>> Hi Wei,
>>>>>>
>>>>>> Thanks for the hint. May I please follow up by adding more context
>>>>>> and ask for your guidance.
>>>>>>
>>>>>> In case the bespoken Map[String,Any] object returned by Scala:
>>>>>>
>>>>>> - Has a defined schema (incl. nested) with up to 100k (!) different
>>>>>> possible keys
>>>>>> - Has only some portion of the keys populated for each record
>>>>>> - Is convertible to JSON
>>>>>> - Has to undergo downstream processing in Flink and/or Python UDF
>>>>>> with key value access
>>>>>> - Has to be ultimately stored in a Kafka/AVRO sink
>>>>>>
>>>>>> How would you declare the types explicitly in such a case ?
>>>>>>
>>>>>> Thanks for your support !
>>>>>>
>>>>>> Pierre
>>>>>>
>>>>>> Le jeu. 19 nov. 2020 à 03:54, Wei Zhong <weizhong0...@gmail.com> a
>>>>>> écrit :
>>>>>>
>>>>>>> Hi Pierre,
>>>>>>>
>>>>>>> Currently there is no type hint like ‘Map[String, Any]’. The
>>>>>>> recommended way is declaring your type more explicitly.
>>>>>>>
>>>>>>> If you insist on doing this, you can try to declaring a RAW data
>>>>>>> type for java.util.HashMap [1], but you may encounter some troubles [2]
>>>>>>> related to the kryo serializers.
>>>>>>>
>>>>>>> Best,
>>>>>>> Wei
>>>>>>>
>>>>>>> [1]
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html#raw
>>>>>>> [2]
>>>>>>> https://stackoverflow.com/questions/28157236/kryo-serialization-with-nested-hashmap-with-custom-class
>>>>>>>
>>>>>>>
>>>>>>> 在 2020年11月19日,04:31,Pierre Oberholzer <pierre.oberhol...@gmail.com>
>>>>>>> 写道:
>>>>>>>
>>>>>>> Hi Wei,
>>>>>>>
>>>>>>> It works ! Thanks a lot for your support.
>>>>>>> I hadn't tried this last combination for option 1, and I had wrong
>>>>>>> syntax for option 2.
>>>>>>>
>>>>>>> So to summarize..
>>>>>>>
>>>>>>> Methods working:
>>>>>>> - Current: DataTypeHint in UDF definition + SQL for UDF registering
>>>>>>> - Outdated: override getResultType in UDF definition
>>>>>>> + t_env.register_java_function for UDF registering
>>>>>>>
>>>>>>> Type conversions working:
>>>>>>> - scala.collection.immutable.Map[String,String] =>
>>>>>>> org.apache.flink.types.Row => ROW<STRING,STRING>
>>>>>>> - scala.collection.immutable.Map[String,String] =>
>>>>>>> java.util.Map[String,String] => MAP<STRING,STRING>
>>>>>>>
>>>>>>> Any hint for Map[String,Any] ?
>>>>>>>
>>>>>>> Best regards,
>>>>>>>
>>>>>>> Le mer. 18 nov. 2020 à 03:26, Wei Zhong <weizhong0...@gmail.com> a
>>>>>>> écrit :
>>>>>>>
>>>>>>>> Hi Pierre,
>>>>>>>>
>>>>>>>> Those 2 approaches all work in my local machine, this is my code:
>>>>>>>>
>>>>>>>> Scala UDF:
>>>>>>>>
>>>>>>>> package com.dummy
>>>>>>>>
>>>>>>>> import org.apache.flink.api.common.typeinfo.TypeInformation
>>>>>>>> import org.apache.flink.table.annotation.DataTypeHint
>>>>>>>> import org.apache.flink.table.api.Types
>>>>>>>> import org.apache.flink.table.functions.ScalarFunction
>>>>>>>> import org.apache.flink.types.Row
>>>>>>>>
>>>>>>>> /**
>>>>>>>>   * The scala UDF.
>>>>>>>>   */
>>>>>>>> class dummyMap extends ScalarFunction {
>>>>>>>>
>>>>>>>>   // If the udf would be registered by the SQL statement, you need add 
>>>>>>>> this typehint
>>>>>>>>   @DataTypeHint("ROW<s STRING,t STRING>")
>>>>>>>>   def eval(): Row = {
>>>>>>>>
>>>>>>>>     Row.of(java.lang.String.valueOf("foo"), 
>>>>>>>> java.lang.String.valueOf("bar"))
>>>>>>>>
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   // If the udf would be registered by the method 
>>>>>>>> 'register_java_function', you need override this
>>>>>>>>   // method.
>>>>>>>>   override def getResultType(signature: Array[Class[_]]): 
>>>>>>>> TypeInformation[_] = {
>>>>>>>>     // The type of the return values should be TypeInformation
>>>>>>>>     Types.ROW(Array("s", "t"), 
>>>>>>>> Array[TypeInformation[_]](Types.STRING(), Types.STRING()))
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>> Python code:
>>>>>>>>
>>>>>>>> from pyflink.datastream import StreamExecutionEnvironment
>>>>>>>> from pyflink.table import StreamTableEnvironment
>>>>>>>>
>>>>>>>> s_env = StreamExecutionEnvironment.get_execution_environment()
>>>>>>>> st_env = StreamTableEnvironment.create(s_env)
>>>>>>>>
>>>>>>>> # load the scala udf jar file, the path should be modified to yours
>>>>>>>> # or your can also load the jar file via other approaches
>>>>>>>> st_env.get_config().get_configuration().set_string("pipeline.jars",
>>>>>>>> "file:///Users/zhongwei/the-dummy-udf.jar")
>>>>>>>>
>>>>>>>> # register the udf via
>>>>>>>> st_env.execute_sql("CREATE FUNCTION dummyMap AS
>>>>>>>> 'com.dummy.dummyMap' LANGUAGE SCALA")
>>>>>>>> # or register via the method
>>>>>>>> # st_env.register_java_function("dummyMap", "com.dummy.dummyMap")
>>>>>>>>
>>>>>>>> # prepare source and sink
>>>>>>>> t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')],
>>>>>>>> ['a', 'b', 'c'])
>>>>>>>> st_env.execute_sql("""create table mySink (
>>>>>>>>         output_of_my_scala_udf ROW<s STRING,t STRING>
>>>>>>>>     ) with (
>>>>>>>>         'connector' = 'print'
>>>>>>>>     )""")
>>>>>>>>
>>>>>>>> # execute query
>>>>>>>>
>>>>>>>> t.select("dummyMap()").execute_insert("mySink").get_job_client().get_job_execution_result().result()
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Wei
>>>>>>>>
>>>>>>>> 在 2020年11月18日,03:28,Pierre Oberholzer <pierre.oberhol...@gmail.com>
>>>>>>>> 写道:
>>>>>>>>
>>>>>>>> Hi Wei,
>>>>>>>>
>>>>>>>> True, I'm using the method you mention, but glad to change.
>>>>>>>> I tried your suggestion instead, but got a similar error.
>>>>>>>>
>>>>>>>> Thanks for your support. That is much more tedious than I thought.
>>>>>>>>
>>>>>>>> *Option 1 - SQL UDF*
>>>>>>>>
>>>>>>>> *SQL UDF*
>>>>>>>> create_func_ddl = """
>>>>>>>> CREATE FUNCTION dummyMap
>>>>>>>>   AS 'com.dummy.dummyMap' LANGUAGE SCALA
>>>>>>>> """
>>>>>>>>
>>>>>>>> t_env.execute_sql(create_func_ddl)
>>>>>>>>
>>>>>>>> *Error*
>>>>>>>> Py4JJavaError: An error occurred while calling o672.execute.
>>>>>>>> : org.apache.flink.table.api.TableException: Result field does not
>>>>>>>> match requested type. Requested: Row(s: String, t: String); Actual:
>>>>>>>> GenericType<org.apache.flink.types.Row>
>>>>>>>>
>>>>>>>> *Option 2 *- *Overriding getResultType*
>>>>>>>>
>>>>>>>> Back to the old registering method, but overriding getResultType:
>>>>>>>>
>>>>>>>> t_env.register_java_function("dummyMap","com.dummy.dummyMap")
>>>>>>>>
>>>>>>>> *Scala UDF*
>>>>>>>> class dummyMap() extends ScalarFunction {
>>>>>>>>
>>>>>>>>   def eval(): Row = {
>>>>>>>>
>>>>>>>>       Row.of(java.lang.String.valueOf("foo"),
>>>>>>>> java.lang.String.valueOf("bar"))
>>>>>>>>
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   override def getResultType(signature: Array[Class[_]]):
>>>>>>>> TypeInformation[_] = DataTypes.ROW(DataTypes.STRING,DataTypes.STRING)
>>>>>>>> }
>>>>>>>>
>>>>>>>> *Error (on compilation)*
>>>>>>>>
>>>>>>>> [error] dummyMap.scala:66:90: overloaded method value ROW with
>>>>>>>> alternatives:
>>>>>>>> [error]   (x$1:
>>>>>>>> org.apache.flink.table.api.DataTypes.AbstractField*)org.apache.flink.table.types.UnresolvedDataType
>>>>>>>> <and>
>>>>>>>> [error]   ()org.apache.flink.table.types.DataType <and>
>>>>>>>> [error]   (x$1:
>>>>>>>> org.apache.flink.table.api.DataTypes.Field*)org.apache.flink.table.types.DataType
>>>>>>>> [error]  cannot be applied to
>>>>>>>> (org.apache.flink.table.types.DataType,
>>>>>>>> org.apache.flink.table.types.DataType)
>>>>>>>> [error]   override def getResultType(signature: Array[Class[_]]):
>>>>>>>> TypeInformation[_] = DataTypes.ROW(DataTypes.STRING,DataTypes.STRING)
>>>>>>>> [error]
>>>>>>>>                               ^
>>>>>>>> [error] one error found
>>>>>>>> [error] (Compile / compileIncremental) Compilation failed
>>>>>>>> [error] Total time: 3 s, completed 17 nov. 2020 à 20:00:01
>>>>>>>>
>>>>>>>> Le mar. 17 nov. 2020 à 14:01, Wei Zhong <weizhong0...@gmail.com> a
>>>>>>>> écrit :
>>>>>>>>
>>>>>>>>> Hi Pierre,
>>>>>>>>>
>>>>>>>>> I guess your UDF is registered by the method
>>>>>>>>> 'register_java_function' which uses the old type system. In this 
>>>>>>>>> situation
>>>>>>>>> you need to override the 'getResultType' method instead of adding type
>>>>>>>>> hint.
>>>>>>>>>
>>>>>>>>> You can also try to register your UDF via the "CREATE FUNCTION"
>>>>>>>>> sql statement, which accepts the type hint.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Wei
>>>>>>>>>
>>>>>>>>> 在 2020年11月17日,19:29,Pierre Oberholzer <pierre.oberhol...@gmail.com>
>>>>>>>>> 写道:
>>>>>>>>>
>>>>>>>>> Hi Wei,
>>>>>>>>>
>>>>>>>>> Thanks for your suggestion. Same error.
>>>>>>>>>
>>>>>>>>> *Scala UDF*
>>>>>>>>>
>>>>>>>>> @FunctionHint(output = new DataTypeHint("ROW<s STRING,t STRING>"))
>>>>>>>>> class dummyMap() extends ScalarFunction {
>>>>>>>>>   def eval(): Row = {
>>>>>>>>>     Row.of(java.lang.String.valueOf("foo"),
>>>>>>>>> java.lang.String.valueOf("bar"))
>>>>>>>>>   }
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> Best regards,
>>>>>>>>>
>>>>>>>>> Le mar. 17 nov. 2020 à 10:04, Wei Zhong <weizhong0...@gmail.com>
>>>>>>>>> a écrit :
>>>>>>>>>
>>>>>>>>>> Hi Pierre,
>>>>>>>>>>
>>>>>>>>>> You can try to replace the '@DataTypeHint("ROW<s STRING,t
>>>>>>>>>> STRING>")' with '@FunctionHint(output = new DataTypeHint("ROW<s 
>>>>>>>>>> STRING,t
>>>>>>>>>> STRING>”))'
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Wei
>>>>>>>>>>
>>>>>>>>>> 在 2020年11月17日,15:45,Pierre Oberholzer <
>>>>>>>>>> pierre.oberhol...@gmail.com> 写道:
>>>>>>>>>>
>>>>>>>>>> Hi Dian, Community,
>>>>>>>>>>
>>>>>>>>>> (bringing the thread back to wider audience)
>>>>>>>>>>
>>>>>>>>>> As you suggested, I've tried to use DataTypeHint with Row instead
>>>>>>>>>> of Map but also this simple case leads to a type mismatch
>>>>>>>>>> between UDF and Table API.
>>>>>>>>>> I've also tried other Map objects from Flink (table.data.MapData,
>>>>>>>>>> flink.types.MapValue, flink.table.api.DataTypes.MAP) in addition to 
>>>>>>>>>> Java
>>>>>>>>>> (java.util.Map) in combination with DataTypeHint,
>>>>>>>>>> without success.
>>>>>>>>>> N.B. I'm using version 1.11.
>>>>>>>>>>
>>>>>>>>>> Am I doing something wrong or am I facing limitations in the
>>>>>>>>>> toolkit ?
>>>>>>>>>>
>>>>>>>>>> Thanks in advance for your support !
>>>>>>>>>>
>>>>>>>>>> Best regards,
>>>>>>>>>>
>>>>>>>>>> *Scala UDF*
>>>>>>>>>>
>>>>>>>>>> class dummyMap() extends ScalarFunction {
>>>>>>>>>>
>>>>>>>>>>  @DataTypeHint("ROW<s STRING,t STRING>")
>>>>>>>>>>  def eval(): Row = {
>>>>>>>>>>
>>>>>>>>>>     Row.of(java.lang.String.valueOf("foo"),
>>>>>>>>>> java.lang.String.valueOf("bar"))
>>>>>>>>>>
>>>>>>>>>>   }
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> *Table DDL*
>>>>>>>>>>
>>>>>>>>>> my_sink_ddl = f"""
>>>>>>>>>>     create table mySink (
>>>>>>>>>>         output_of_my_scala_udf ROW<s STRING,t STRING>
>>>>>>>>>>     ) with (
>>>>>>>>>>         ...
>>>>>>>>>>     )
>>>>>>>>>> """
>>>>>>>>>>
>>>>>>>>>> *Error*
>>>>>>>>>>
>>>>>>>>>> Py4JJavaError: An error occurred while calling o2.execute.
>>>>>>>>>> : org.apache.flink.table.api.ValidationException: Field types of
>>>>>>>>>> query result and registered TableSink
>>>>>>>>>> `default_catalog`.`default_database`.`mySink` do not match.
>>>>>>>>>> Query result schema: [output_of_my_scala_udf:
>>>>>>>>>> GenericType<org.apache.flink.types.Row>]
>>>>>>>>>> TableSink schema:    [output_of_my_scala_udf: Row(s: String, t:
>>>>>>>>>> String)]
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Le ven. 13 nov. 2020 à 11:59, Pierre Oberholzer <
>>>>>>>>>> pierre.oberhol...@gmail.com> a écrit :
>>>>>>>>>>
>>>>>>>>>>> Thanks Dian, but same error when using explicit returned type:
>>>>>>>>>>>
>>>>>>>>>>> class dummyMap() extends ScalarFunction {
>>>>>>>>>>>
>>>>>>>>>>>   def eval() : util.Map[java.lang.String,java.lang.String] = {
>>>>>>>>>>>
>>>>>>>>>>>     val states = Map("key1" -> "val1", "key2" -> "val2")
>>>>>>>>>>>
>>>>>>>>>>> states.asInstanceOf[util.Map[java.lang.String,java.lang.String]]
>>>>>>>>>>>
>>>>>>>>>>>   }
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> Le ven. 13 nov. 2020 à 10:34, Dian Fu <dian0511...@gmail.com> a
>>>>>>>>>>> écrit :
>>>>>>>>>>>
>>>>>>>>>>>> You need to explicitly defined the result type the UDF. You
>>>>>>>>>>>> could refer to [1] for more details if you are using Flink 1.11. 
>>>>>>>>>>>> If you are
>>>>>>>>>>>> using other versions of Flink, you need to refer to the 
>>>>>>>>>>>> corresponding
>>>>>>>>>>>> documentation.
>>>>>>>>>>>>
>>>>>>>>>>>> [1]
>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide
>>>>>>>>>>>>
>>>>>>>>>>>> 在 2020年11月13日,下午4:56,Pierre Oberholzer <
>>>>>>>>>>>> pierre.oberhol...@gmail.com> 写道:
>>>>>>>>>>>>
>>>>>>>>>>>> ScalarFunction
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Pierre
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Pierre
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Pierre
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Pierre
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Pierre
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Pierre
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Pierre
>>>>>
>>>>>
>>>>>
>>>>
>>>> --
>>>> Pierre
>>>>
>>> --
>> Pierre
>>
>

-- 
Pierre

Reply via email to