Thanks Matthias, one less thing to worry about in the future :)

--
Alessandro Tagliapietra


On Sat, Apr 20, 2019 at 11:23 AM Matthias J. Sax <matth...@confluent.io>
wrote:

> Just a side note. There is currently work in progress on
> https://issues.apache.org/jira/browse/KAFKA-3729 that should fix the
> configuration problem for Serdes.
>
> -Matthias
>
> On 4/19/19 9:12 PM, Alessandro Tagliapietra wrote:
> > Hi Bruno,
> > thanks a lot for checking the code, regarding the SpecificAvroSerde I've
> > found that using
> >
> > final Serde<InputList> valueSpecificAvroSerde = new
> SpecificAvroSerde<>();
> > final Map<String, String> serdeConfig =
> > Collections.singletonMap("schema.registry.url", "http://localhost:8081
> ");
> > valueSpecificAvroSerde.configure(serdeConfig, false);
> >
> > and then in aggregate()
> >
> > Materialized.with(Serdes.String(), valueSpecificAvroSerde)
> >
> > fixed the issue.
> >
> > Thanks in advance for the windowing help, very appreciated.
> > In the meantime I'll try to make some progress on the rest.
> >
> > Have a great weekend
> >
> > --
> > Alessandro Tagliapietra
> >
> >
> > On Fri, Apr 19, 2019 at 2:09 PM Bruno Cadonna <br...@confluent.io>
> wrote:
> >
> >> Hi Alessandro,
> >>
> >> I had a look at your code. Regarding your question whether you use the
> >> SpecificAvroSerde correctly, take a look at the following documentation:
> >>
> >>
> https://docs.confluent.io/current/streams/developer-guide/datatypes.html
> >>
> >> I haven't had the time yet to take a closer look at your problems with
> the
> >> aggregation. I will have a look next week.
> >>
> >> Have a nice weekend,
> >> Bruno
> >>
> >> On Wed, Apr 17, 2019 at 4:43 PM Alessandro Tagliapietra <
> >> tagliapietra.alessan...@gmail.com> wrote:
> >>
> >>> So I've started with a new app with the archetype:generate as in
> >>> https://kafka.apache.org/22/documentation/streams/tutorial
> >>>
> >>> I've pushed a sample repo here: https://github.com/alex88/kafka-test
> >>> The avro schemas are a Metric with 2 fields: timestamp and production
> >> and a
> >>> MetricList with a list of records (Metric) to be able to manually do
> the
> >>> aggregation.
> >>> Right now the aggregation is simple just for the purpose of the sample
> >> repo
> >>> and to easily see if we're getting wrong values.
> >>>
> >>> What I wanted to achieve is:
> >>>  - have a custom generator that generates 1 message per second with
> >>> production = 1 with 1 ore more separate message keys which in my case
> are
> >>> the sensor IDs generating the data
> >>>  - a filter that removes out of order messages by having a state that
> >>> stores key (sensorID) -> last timestamp
> >>>  - a window operation that for this example just sums the values in
> each
> >> 10
> >>> seconds windows
> >>>
> >>> To show where I'm having issues I've setup multiple branches for the
> >> repo:
> >>>  - *issue-01 <https://github.com/alex88/kafka-test/tree/issue-01>* is
> >> the
> >>> one I had initially "Failed to flush state store
> >>> KSTREAM-AGGREGATE-STATE-STORE-0000000003" that I tried to solve using
> >>>
> >>>
> >>
> https://stackoverflow.com/questions/55186727/kafka-streams-2-1-1-class-cast-while-flushing-timed-aggregation-to-store
> >>>  - *issue-02 <https://github.com/alex88/kafka-test/tree/issue-02>* is
> >> the
> >>> one after I've tried to solve above problem with the materializer
> (maybe
> >>> the SpecificAvroSerde is wrong?)
> >>>  - *issue-03 <https://github.com/alex88/kafka-test/tree/issue-03>*
> after
> >>> fixing issue-02 (by using groupByKey(Grouped.with(Serdes.String(), new
> >>> SpecificAvroSerde<>()))) everything seems to be working, if you let
> both
> >>> the producer and stream running, you'll see that the stream receives 10
> >>> messages (with the timestamp incrementing 1 second for each message)
> like
> >>> this:
> >>>
> >>> S1 with filtered metric{"timestamp": 160000, "production": 1}
> >>> S1 with filtered metric{"timestamp": 161000, "production": 1}
> >>> S1 with filtered metric{"timestamp": 162000, "production": 1}
> >>> S1 with filtered metric{"timestamp": 163000, "production": 1}
> >>> S1 with filtered metric{"timestamp": 164000, "production": 1}
> >>> S1 with filtered metric{"timestamp": 165000, "production": 1}
> >>> S1 with filtered metric{"timestamp": 166000, "production": 1}
> >>> S1 with filtered metric{"timestamp": 167000, "production": 1}
> >>> S1 with filtered metric{"timestamp": 168000, "production": 1}
> >>> S1 with filtered metric{"timestamp": 169000, "production": 1}
> >>>
> >>> and at the 10 seconds interval something like:
> >>>
> >>> S1 with computed metric {"timestamp": 160000, "production": 10}
> >>> S1 with computed metric {"timestamp": 170000, "production": 10}
> >>> S1 with computed metric {"timestamp": 180000, "production": 10}
> >>>
> >>> and so on...
> >>> Now there are two problems, after stopping and restarting the stream
> >>> processor (by sending SIGINT via IntelliJ since I start the class main
> >> with
> >>> it) it happens:
> >>>  - sometimes the aggregated count is wrong, if I have it start
> windowing
> >>> for 7 seconds (e.g. seconds 11-17), restart the stream, after restart
> it
> >>> might just emit a value for the new 3 missing seconds (seconds 18-20)
> and
> >>> the aggregated value is 3 not 10
> >>>  - sometimes the window outputs twice, in the example where I restart
> the
> >>> stream processor I might get as output
> >>>
> >>> S1 with filtered metric{"timestamp": 154000, "production": 1}
> >>> S1 with computed metric {"timestamp": 160000, "production": 5}
> >>> S1 with filtered metric{"timestamp": 155000, "production": 1}
> >>> S1 with filtered metric{"timestamp": 156000, "production": 1}
> >>> S1 with filtered metric{"timestamp": 157000, "production": 1}
> >>> S1 with filtered metric{"timestamp": 158000, "production": 1}
> >>> S1 with filtered metric{"timestamp": 159000, "production": 1}
> >>> S1 with filtered metric{"timestamp": 160000, "production": 1}
> >>> S1 with filtered metric{"timestamp": 161000, "production": 1}
> >>> S1 with computed metric {"timestamp": 160000, "production": 10}
> >>> S1 with filtered metric{"timestamp": 162000, "production": 1}
> >>>
> >>> as you can see, window for timestamp 160000 is duplicated
> >>>
> >>> Is this because the window state isn't persisted across restarts?
> >>> My ultimate goal is to have the window part emit only once and resume
> >>> processing across restarts, while avoiding processing out of order data
> >>> (that's the purpose of the TimestampIncrementalFilter)
> >>>
> >>> Thank you in advance
> >>>
> >>> --
> >>> Alessandro Tagliapietra
> >>>
> >>>
> >>> On Tue, Apr 16, 2019 at 9:48 PM Alessandro Tagliapietra <
> >>> tagliapietra.alessan...@gmail.com> wrote:
> >>>
> >>>> Hi Bruno,
> >>>>
> >>>> I'm using the confluent docker images 5.2.1, so kafka 2.2.
> >>>> Anyway I'll try to make a small reproduction repo with all the
> >> different
> >>>> cases soon.
> >>>>
> >>>> Thank you
> >>>>
> >>>> --
> >>>> Alessandro Tagliapietra
> >>>>
> >>>>
> >>>> On Tue, Apr 16, 2019 at 1:02 PM Bruno Cadonna <br...@confluent.io>
> >>> wrote:
> >>>>
> >>>>> Hi Alessandro,
> >>>>>
> >>>>> What version of Kafka do you use?
> >>>>>
> >>>>> Could you please give a more detailed example for the issues with the
> >>> two
> >>>>> keys you see?
> >>>>>
> >>>>> Could the following bug be related to the duplicates you see?
> >>>>>
> >>>>>
> >>>>>
> >>>
> >>
> https://issues.apache.org/jira/browse/KAFKA-7895?jql=project%20%3D%20KAFKA%20AND%20issuetype%20%3D%20Bug%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened)%20AND%20component%20%3D%20streams%20AND%20text%20~%20%22duplicate%22
> >>>>>
> >>>>> How do you restart the processor?
> >>>>>
> >>>>> Best,
> >>>>> Bruno
> >>>>>
> >>>>> On Mon, Apr 15, 2019 at 11:02 PM Alessandro Tagliapietra <
> >>>>> tagliapietra.alessan...@gmail.com> wrote:
> >>>>>
> >>>>>> Thank you Bruno,
> >>>>>>
> >>>>>> I'll look into those, however average is just a simple thing I'm
> >>> trying
> >>>>>> right now just to get an initial windowing flow working.
> >>>>>> In the future I'll probably still need the actual values for other
> >>>>>> calculations. We won't have more than 60 elements per window for
> >> sure.
> >>>>>>
> >>>>>> So far to not manually serialize/deserialize the array list I've
> >>>>> created an
> >>>>>> Avro model with an array field containing the values.
> >>>>>> I had issues with suppress as explained here
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>
> >>>
> >>
> https://stackoverflow.com/questions/55699096/kafka-aggregate-with-materialized-with-specific-avro-serve-gives-nullpointerexce/55699198#55699198
> >>>>>>
> >>>>>> but I got that working.
> >>>>>> So far everything seems to be working, except a couple things:
> >>>>>>  - if I generate data with 1 key, I correctly get a value each 10
> >>>>> seconds,
> >>>>>> if I later start generating data with another key (while key 1 is
> >>> still
> >>>>>> generating) the windowing emits a value only after the timestamp of
> >>> key
> >>>>> 2
> >>>>>> reaches the last generated window
> >>>>>>  - while generating data, if I restart the processor as soon as it
> >>>>> starts
> >>>>>> it sometimes generates 2 aggregates for the same window even if I'm
> >>>>> using
> >>>>>> the suppress
> >>>>>>
> >>>>>> Anyway, I'll look into your link and try to find out the cause of
> >>> these
> >>>>>> issues, probably starting from scratch with a simpler example
> >>>>>>
> >>>>>> Thank you for your help!
> >>>>>>
> >>>>>> --
> >>>>>> Alessandro Tagliapietra
> >>>>>>
> >>>>>> On Mon, Apr 15, 2019 at 10:08 PM Bruno Cadonna <br...@confluent.io>
> >>>>> wrote:
> >>>>>>
> >>>>>>> Hi Alessandro,
> >>>>>>>
> >>>>>>> Have a look at this Kafka Usage Pattern for computing averages
> >>> without
> >>>>>>> using an ArrayList.
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns#KafkaStreamUsagePatterns-Howtocomputean(windowed)average
> >>>>>>> ?
> >>>>>>>
> >>>>>>> The advantages of this pattern over the ArrayList approach is the
> >>>>> reduced
> >>>>>>> space needed to compute the aggregate. Note that you will still
> >> need
> >>>>> to
> >>>>>>> implement a SerDe. However, the SerDe should be a bit easier to
> >>>>> implement
> >>>>>>> than a SerDe for an ArrayList.
> >>>>>>>
> >>>>>>> Hope that helps.
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Bruno
> >>>>>>>
> >>>>>>> On Mon, Apr 15, 2019 at 4:57 PM Alessandro Tagliapietra <
> >>>>>>> tagliapietra.alessan...@gmail.com> wrote:
> >>>>>>>
> >>>>>>>> Sorry but it seemed harder than I thought,
> >>>>>>>>
> >>>>>>>> to have the custom aggregation working I need to get an
> >> ArrayList
> >>> of
> >>>>>> all
> >>>>>>>> the values in the window, so far my aggregate DSL method creates
> >>> an
> >>>>>>>> ArrayList on the initializer and adds each value to the list in
> >>> the
> >>>>>>>> aggregator.
> >>>>>>>> Then I think I'll have to provide a serder to change the output
> >>>>> type of
> >>>>>>>> that method.
> >>>>>>>> I was looking at
> >>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>
> >>
> https://stackoverflow.com/questions/46365884/issue-with-arraylist-serde-in-kafka-streams-api
> >>>>>>>> but
> >>>>>>>> that seems more towards a list of longs and already uses
> >>> longSerde.
> >>>>>>>> I'm currently trying to implement another avro model that has a
> >>>>> field
> >>>>>> of
> >>>>>>>> type array so I can use the regular avro serializer to implement
> >>>>> this.
> >>>>>>>> Should I create my own serdes instead or is this the right way?
> >>>>>>>>
> >>>>>>>> Thank you in advance
> >>>>>>>>
> >>>>>>>> --
> >>>>>>>> Alessandro Tagliapietra
> >>>>>>>>
> >>>>>>>> On Mon, Apr 15, 2019 at 3:42 PM Alessandro Tagliapietra <
> >>>>>>>> tagliapietra.alessan...@gmail.com> wrote:
> >>>>>>>>
> >>>>>>>>> Thank you Bruno and Matthias,
> >>>>>>>>>
> >>>>>>>>> I've modified the transformer to implement the
> >>>>>> ValueTransformerWithKey
> >>>>>>>>> interface and everything is working fine.
> >>>>>>>>> I've now to window the data and manually aggregate each window
> >>>>> data
> >>>>>>> since
> >>>>>>>>> I've to do some averages and sum of differences.
> >>>>>>>>> So far I've just having some issues with message types since
> >> I'm
> >>>>>>> changing
> >>>>>>>>> the data type when aggregating the window but I think it's an
> >>> easy
> >>>>>>>> problem.
> >>>>>>>>>
> >>>>>>>>> Thank you again
> >>>>>>>>> Best
> >>>>>>>>>
> >>>>>>>>> --
> >>>>>>>>> Alessandro Tagliapietra
> >>>>>>>>>
> >>>>>>>>> On Sun, Apr 14, 2019 at 11:26 AM Bruno Cadonna <
> >>>>> br...@confluent.io>
> >>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi Alessandro,
> >>>>>>>>>>
> >>>>>>>>>> the `TransformSupplier` is internally wrapped with a
> >>>>>>>> `ProcessorSupplier`,
> >>>>>>>>>> so the statement
> >>>>>>>>>>
> >>>>>>>>>> `transform` is essentially equivalent to adding the
> >> Transformer
> >>>>> via
> >>>>>>>>>> Topology#addProcessor() to your processor topology
> >>>>>>>>>>
> >>>>>>>>>> is correct.
> >>>>>>>>>>
> >>>>>>>>>> If you do not change the key, you should definitely use one
> >> of
> >>>>> the
> >>>>>>>>>> overloads of `transformValues` to avoid internal data
> >>>>>> redistribution.
> >>>>>>> In
> >>>>>>>>>> your case the overload with `ValueTransformerWithKeySupplier`
> >>> as
> >>>>>>>> suggested
> >>>>>>>>>> by Matthias would fit.
> >>>>>>>>>>
> >>>>>>>>>> Best,
> >>>>>>>>>> Bruno
> >>>>>>>>>>
> >>>>>>>>>> On Sat, Apr 13, 2019 at 12:51 PM Matthias J. Sax <
> >>>>>>> matth...@confluent.io
> >>>>>>>>>
> >>>>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> There is also `ValueTransformerWithKey` that gives you
> >>>>> read-only
> >>>>>>> acess
> >>>>>>>>>>> to the key.
> >>>>>>>>>>>
> >>>>>>>>>>> -Matthias
> >>>>>>>>>>>
> >>>>>>>>>>> On 4/12/19 5:34 PM, Alessandro Tagliapietra wrote:
> >>>>>>>>>>>> Hi Bruno,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thank you for the quick answer.
> >>>>>>>>>>>>
> >>>>>>>>>>>> I'm actually trying to do that since it seems there is
> >>>>> really no
> >>>>>>> way
> >>>>>>>>>> to
> >>>>>>>>>>>> have it use `Processor<K, V>`.
> >>>>>>>>>>>> I just wanted (if that would've made any sense) to use
> >> the
> >>>>>>> Processor
> >>>>>>>>>> in
> >>>>>>>>>>>> both DSL and non-DSL pipelines.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Anyway, regarding `transformValues()` I don't think I can
> >>>>> use it
> >>>>>>> as
> >>>>>>>> I
> >>>>>>>>>>> need
> >>>>>>>>>>>> the message key since that is the discriminating value
> >> for
> >>>>> the
> >>>>>>>> filter
> >>>>>>>>>> (I
> >>>>>>>>>>>> want to exclude old values per sensor ID so per message
> >>> key)
> >>>>>>>>>>>>
> >>>>>>>>>>>> Right now I've this
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>
> >>
> https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfiltertransformer-java
> >>>>>>>>>>>> and
> >>>>>>>>>>>> i'm using it with `transform()` .
> >>>>>>>>>>>>
> >>>>>>>>>>>> One thing I've found confusing is this
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>
> >>
> https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#streams-developer-guide-dsl-process
> >>>>>>>>>>>>
> >>>>>>>>>>>> transform is essentially equivalent to adding the
> >>> Transformer
> >>>>>> via
> >>>>>>>>>>>>> Topology#addProcessor() to yourprocessor topology
> >>>>>>>>>>>>> <
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>
> >>
> https://docs.confluent.io/current/streams/concepts.html#streams-concepts-processor-topology
> >>>>>>>>>>>>
> >>>>>>>>>>>>> .
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> is it? Doesn't `transform` need a TransformSupplier while
> >>>>>>>>>> `addProcessor`
> >>>>>>>>>>>> uses a ProcessorSupplier?
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thank you again for your help
> >>>>>>>>>>>>
> >>>>>>>>>>>> --
> >>>>>>>>>>>> Alessandro Tagliapietra
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Fri, Apr 12, 2019 at 5:04 PM Bruno Cadonna <
> >>>>>> br...@confluent.io
> >>>>>>>>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Hi Alessandro,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Have you considered using `transform()` (actually in
> >> your
> >>>>> case
> >>>>>>> you
> >>>>>>>>>>> should
> >>>>>>>>>>>>> use `transformValues()`) instead of `.process()`?
> >>>>> `transform()`
> >>>>>>> and
> >>>>>>>>>>>>> `transformValues()` are stateful operations similar to
> >>>>>> `.process`
> >>>>>>>> but
> >>>>>>>>>>> they
> >>>>>>>>>>>>> return a `KStream`. On a `KStream` you can then apply a
> >>>>>> windowed
> >>>>>>>>>>>>> aggregation.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Hope that helps.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Best,
> >>>>>>>>>>>>> Bruno
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Fri, Apr 12, 2019 at 4:31 PM Alessandro Tagliapietra
> >> <
> >>>>>>>>>>>>> tagliapietra.alessan...@gmail.com> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hi there,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I'm just starting with Kafka and I'm trying to create a
> >>>>> stream
> >>>>>>>>>>> processor
> >>>>>>>>>>>>>> that in multiple stages:
> >>>>>>>>>>>>>>  - filters messages using a kv store so that only
> >>> messages
> >>>>>> with
> >>>>>>>>>> higher
> >>>>>>>>>>>>>> timestamp gets processed
> >>>>>>>>>>>>>>  - aggregates the message metrics by minute giving e.g.
> >>> the
> >>>>>> avg
> >>>>>>> of
> >>>>>>>>>>> those
> >>>>>>>>>>>>>> metrics in that minute
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> The message is simple, the key is the sensor ID and the
> >>>>> value
> >>>>>> is
> >>>>>>>>>> e.g. {
> >>>>>>>>>>>>>> timestamp: UNIX_TIMESTAMP, speed: INT }.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I've started by creating a processor to use the kv
> >> store
> >>>>> and
> >>>>>>>> filter
> >>>>>>>>>> old
> >>>>>>>>>>>>>> messages:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>
> >>
> https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfilter-java
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Then I was trying to implement windowing, I saw very
> >> nice
> >>>>>>>> windowing
> >>>>>>>>>>>>>> examples for the DSL but none for the Processor API
> >>> (only a
> >>>>>>> small
> >>>>>>>>>>>>> reference
> >>>>>>>>>>>>>> to the windowed store), can someone point me in the
> >> right
> >>>>>>>> direction?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Now, since I wasn't able to find any example I tried to
> >>> use
> >>>>>> the
> >>>>>>>> DSL
> >>>>>>>>>> but
> >>>>>>>>>>>>>> haven't found a way to use my processor with it, I saw
> >>> this
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>
> >>
> https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#applying-processors-and-transformers-processor-api-integration
> >>>>>>>>>>>>>> but
> >>>>>>>>>>>>>> it explains mostly transformers not processors. I also
> >>> saw
> >>>>>> after
> >>>>>>>>>> that
> >>>>>>>>>>> the
> >>>>>>>>>>>>>> example usage of the processor but `.process(...)`
> >>> returns
> >>>>>> void,
> >>>>>>>> so
> >>>>>>>>>> I
> >>>>>>>>>>>>>> cannot have a KStream from a processor?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thank you all in advance
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> --
> >>>>>>>>>>>>>> Alessandro Tagliapietra
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> >
>
>

Reply via email to