Hi Aljoscha,

I like that you are pushing in this direction. However, IMHO you
misinterpret the current approach. It does not assume that tuples
arrive in order; the current approach has no notion of a
pre-defined order (for example, the order in which the events are
created). There is only the notion of "arrival order" at the operator.
From this "arrival order" perspective, the results are correct(!).

Windowing in the current approach means, for example, "sum up an
attribute of all events you *received* in the last 5 seconds". That is a
different meaning than "sum up an attribute of all events that *occurred*
in the last 5 seconds". Both queries are valid, and IMHO Flink should
support both.
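To make the distinction concrete, here is a minimal, hypothetical sketch in plain Java (not the Flink API; the names `eventTimeSums` and `arrivalTimeSums` are made up for illustration) that buckets the same events both ways. Each event is a `{timestamp, value}` pair, and the array order stands for the arrival order at the operator:

```java
import java.util.*;

// Hypothetical plain-Java sketch (not the Flink API; all names here are
// made up) contrasting the two window meanings discussed above.
public class WindowSemantics {

    // "Occurred" semantics: bucket by the timestamp carried in the event.
    static Map<Long, Long> eventTimeSums(long[][] events, long size) {
        Map<Long, Long> sums = new TreeMap<>();
        for (long[] e : events) {
            sums.merge((e[0] / size) * size, e[1], Long::sum);
        }
        return sums;
    }

    // "Received" semantics: bucket by a simulated arrival clock that ticks
    // once per element and ignores the event's own timestamp entirely.
    static Map<Long, Long> arrivalTimeSums(long[][] events, long size) {
        Map<Long, Long> sums = new TreeMap<>();
        long arrivalClock = 0;
        for (long[] e : events) {
            sums.merge((arrivalClock / size) * size, e[1], Long::sum);
            arrivalClock += 1; // one tick per received element
        }
        return sums;
    }

    public static void main(String[] args) {
        // The third event occurred early (t=1) but arrives last.
        long[][] events = {{0, 10}, {6, 20}, {1, 30}};
        System.out.println(eventTimeSums(events, 5));   // {0=40, 5=20}
        System.out.println(arrivalTimeSums(events, 5)); // {0=60}
    }
}
```

Both results are "correct" for their own semantics; they simply answer different questions about the stream.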


-Matthias



On 06/25/2015 03:03 PM, Aljoscha Krettek wrote:
> Yes, now this also processes about 3 mio elements (window size 5 sec, slide
> 1 sec) but it still fluctuates a lot between 1 mio. and 5 mio.
> 
> Performance is not my main concern, however. My concern is that the current
> model assumes elements to arrive in order, which is simply not true.
> 
> In your code you have these lines for specifying the window:
> .window(Time.of(1l, TimeUnit.SECONDS))
> .every(Time.of(1l, TimeUnit.SECONDS))
> 
> Although this semantically specifies a tumbling window of size 1 sec I'm
> afraid it uses the sliding window logic internally (because of the
> .every()).
> 
> In my tests I only have the first line.
> 
> 
> On Thu, 25 Jun 2015 at 14:32 Gábor Gévay <gga...@gmail.com> wrote:
> 
>> I'm very sorry, I had a bug in the InversePreReducer. It should be
>> fixed now. Can you please run it again?
>>
>> I also tried to reproduce some of your performance numbers, but I'm
>> getting less than a tenth of yours. For example, in the Tumbling
>> case, Current/Reduce produces only ~100000 for me. Do you have any
>> idea what I could be doing wrong? My code:
>> http://pastebin.com/zbEjmGhk
>> I am running it on a 2 GHz Core i7.
>>
>> Best regards,
>> Gabor
>>
>>
>> 2015-06-25 12:31 GMT+02:00 Aljoscha Krettek <aljos...@apache.org>:
>>> Hi,
>>> I also ran the tests on top of PR 856 (inverse reducer) now. The results
>>> seem incorrect. When I insert a Thread.sleep(1) in the tuple source, all
>>> the previous tests reported around 3600 tuples (Size 5 sec, Slide 1 sec)
>>> (Theoretically there would be 5000 tuples in 5 seconds; the shortfall is
>>> due to overhead). These are the results for the inverse reduce optimisation:
>>> (Tuple 0,38)
>>> (Tuple 0,829)
>>> (Tuple 0,1625)
>>> (Tuple 0,2424)
>>> (Tuple 0,3190)
>>> (Tuple 0,3198)
>>> (Tuple 0,-339368)
>>> (Tuple 0,-1315725)
>>> (Tuple 0,-2932932)
>>> (Tuple 0,-5082735)
>>> (Tuple 0,-7743256)
>>> (Tuple 0,75701046)
>>> (Tuple 0,642829470)
>>> (Tuple 0,2242018381)
>>> (Tuple 0,5190708618)
>>> (Tuple 0,10060360311)
>>> (Tuple 0,-94254951)
>>> (Tuple 0,-219806321293)
>>> (Tuple 0,-1258895232699)
>>> (Tuple 0,-4074432596329)
>>>
>>> One line is one emitted window count. This is what happens when I remove
>>> the Thread.sleep(1):
>>> (Tuple 0,660676)
>>> (Tuple 0,2553733)
>>> (Tuple 0,3542696)
>>> (Tuple 0,1)
>>> (Tuple 0,1107035)
>>> (Tuple 0,2549491)
>>> (Tuple 0,4100387)
>>> (Tuple 0,-8406583360092)
>>> (Tuple 0,-8406582150743)
>>> (Tuple 0,-8406580427190)
>>> (Tuple 0,-8406580427190)
>>> (Tuple 0,-8406580427190)
>>> (Tuple 0,6847279255682044995)
>>> (Tuple 0,6847279255682044995)
>>> (Tuple 0,-5390528042713628318)
>>> (Tuple 0,-5390528042711551780)
>>> (Tuple 0,-5390528042711551780)
>>>
>>> So at some point the pre-reducer seems to go haywire and does not recover
>>> from it. The good thing is that it does produce results now, where the
>>> previous Current/Reduce would simply hang and not produce any output.
>>>
>>> On Thu, 25 Jun 2015 at 12:02 Gábor Gévay <gga...@gmail.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> Aljoscha, can you please try the performance test of Current/Reduce
>>>> with the InversePreReducer in PR 856? (If you just call sum, it will
>>>> use an InversePreReducer.) It would be an interesting test, because
>>>> the inverse function optimization really depends on the stream being
>>>> ordered, and I think it has the potential of being faster than
>>>> Next/Reduce. Especially if the window size is much larger than the
>>>> slide size.
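For readers following the thread, the inverse-function idea can be sketched as follows (a hypothetical plain-Java simulation with made-up names, not the actual InversePreReducer from PR 856): keep a running sum, fold in each arriving element, and when the window slides, subtract the elements that fall out instead of re-reducing the whole buffer. This is O(1) per element regardless of window size, but it silently assumes that the oldest buffered element is exactly the one that should leave the window, i.e. that arrival order matches the window order:

```java
import java.util.ArrayDeque;

// Hypothetical sketch of an inverse-reduce pre-aggregator for sum (made-up
// names; not the code in PR 856). The running sum is updated incrementally
// instead of recomputing over the whole window buffer.
public class InverseSumPreReducer {
    private final ArrayDeque<Long> buffer = new ArrayDeque<>();
    private long sum = 0;

    // Forward reduce: fold the arriving element into the running sum.
    public void add(long value) {
        buffer.addLast(value);
        sum += value;
    }

    // Inverse reduce: undo the contribution of the oldest 'count' elements
    // when the window slides past them. Only correct if FIFO order equals
    // the order the window semantics assume.
    public void evict(int count) {
        for (int i = 0; i < count && !buffer.isEmpty(); i++) {
            sum -= buffer.removeFirst();
        }
    }

    public long currentSum() { return sum; }
}
```

If elements arrive out of order, evict() subtracts the wrong contributions, and unlike a recomputing pre-reducer the error never washes out in later windows, which would be consistent with the runaway values reported elsewhere in this thread.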
>>>>
>>>> Best regards,
>>>> Gabor
>>>>
>>>>
>>>> 2015-06-25 11:36 GMT+02:00 Aljoscha Krettek <aljos...@apache.org>:
>>>>> I think I'll have to elaborate a bit, so I created a proof-of-concept
>>>>> implementation of my ideas and ran some throughput measurements to
>>>>> alleviate concerns about performance.
>>>>>
>>>>> First, though, I want to highlight again why the current approach does
>>>> not
>>>>> work with out-of-order elements (which, again, occur constantly due to
>>>> the
>>>>> distributed nature of the system). This is the example I posted
>> earlier:
>>>>> https://gist.github.com/aljoscha/a367012646ab98208d27. The plan looks
>>>> like
>>>>> this:
>>>>>
>>>>> +--+
>>>>> | | Source
>>>>> +--+
>>>>> |
>>>>> +-----+
>>>>> | |
>>>>> | +--+
>>>>> | | | Identity Map
>>>>> | +--+
>>>>> | |
>>>>> +-----+
>>>>> |
>>>>> +--+
>>>>> | | Window
>>>>> +--+
>>>>> |
>>>>> |
>>>>> +--+
>>>>> | | Sink
>>>>> +--+
>>>>>
>>>>> So all it does is pass the elements through an identity map and then
>>>> merge
>>>>> them again before the window operator. The source emits ascending
>>>> integers
>>>>> and the window operator has a custom timestamp extractor that uses the
>>>>> integer itself as the timestamp and should create windows of size 4
>> (that
>>>>> is, elements with timestamps 0-3 are one window, the next are the
>>>>> elements with timestamps 4-7, and so on). Since the topology basically
>>>>> doubles the elements from the source, I would expect to get these windows:
>>>>> Window: 0, 0, 1, 1, 2, 2, 3, 3
>>>>> Window: 4, 4, 5, 5, 6, 6, 7, 7
>>>>>
>>>>> The output is this, however:
>>>>> Window: 0, 1, 2, 3,
>>>>> Window: 4, 0, 1, 2, 3, 4, 5, 6, 7,
>>>>> Window: 8, 9, 10, 11,
>>>>> Window: 12, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15,
>>>>> Window: 16, 17, 18, 19,
>>>>> Window: 20, 21, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23,
>>>>> Window: 24, 25, 26, 27,
>>>>> Window: 28, 29, 30, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31,
>>>>>
>>>>> The reason is that the elements simply arrive out-of-order. Imagine
>> what
>>>>> would happen if the elements actually arrived with some delay from
>>>>> different operations.
>>>>>
>>>>> Now, on to the performance numbers. The proof-of-concept I created is
>>>>> available here:
>>>>> https://github.com/aljoscha/flink/tree/event-time-window-fn-mock. The
>>>> basic
>>>>> idea is that sources assign the current timestamp when emitting
>> elements.
>>>>> They also periodically emit watermarks that tell us that no elements
>> with
>>>>> an earlier timestamp will be emitted. The watermarks propagate through
>>>> the
>>>>> operators. The window operator looks at the timestamp of an element
>> and
>>>>> puts it into the buffer that corresponds to that window. When the
>> window
>>>>> operator receives a watermark it will look at the in-flight windows
>>>>> (basically the buffers) and emit those windows where the window-end is
>>>>> before the watermark.
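As a rough illustration of that mechanism (a toy plain-Java simulation with made-up names, not the actual proof-of-concept branch): elements are bucketed by the event-time window their timestamp falls into, and a watermark flushes every buffer whose window end it covers:

```java
import java.util.*;

// Toy sketch of watermark-driven window emission (hypothetical names).
// Elements are buffered per window; a watermark triggers emission of all
// windows that can no longer receive earlier-timestamped elements.
public class WatermarkWindowOperator {
    private final long windowSize;
    private final TreeMap<Long, List<Long>> buffers = new TreeMap<>();

    public WatermarkWindowOperator(long windowSize) {
        this.windowSize = windowSize;
    }

    // Assign the element to the window its *timestamp* falls into,
    // regardless of when it arrives at the operator.
    public void element(long timestamp, long value) {
        long windowStart = (timestamp / windowSize) * windowSize;
        buffers.computeIfAbsent(windowStart, k -> new ArrayList<>()).add(value);
    }

    // Emit every in-flight window whose end lies at or before the
    // watermark: no element with an earlier timestamp can still arrive.
    public List<List<Long>> watermark(long watermark) {
        List<List<Long>> emitted = new ArrayList<>();
        while (!buffers.isEmpty()
                && buffers.firstKey() + windowSize <= watermark) {
            emitted.add(buffers.remove(buffers.firstKey()));
        }
        return emitted;
    }
}
```

The key property is that a late element still lands in the buffer of the window it belongs to, as long as its window has not yet been flushed by a watermark.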
>>>>>
>>>>> For measuring throughput I did the following: The source emits tuples
>> of
>>>>> the form ("tuple", 1) in an infinite loop. The window operator sums up
>>>> the
>>>>> tuples, thereby counting how many tuples the window operator can
>> handle
>>>> in
>>>>> a given time window. There are two different implementations for the
>>>>> summation: 1) simply summing up the values in a mapWindow(), where you
>>>> get
>>>>> a List of all tuples and simply iterate over it; 2) using sum(1),
>> which
>>>> is
>>>>> implemented as a reduce() (that uses the pre-reducer optimisations).
>>>>>
>>>>> These are the performance numbers (Current is the current
>> implementation,
>>>>> Next is my proof-of-concept):
>>>>>
>>>>> Tumbling (1 sec):
>>>>>  - Current/Map: 1.6 mio
>>>>>  - Current/Reduce: 2 mio
>>>>>  - Next/Map: 2.2 mio
>>>>>  - Next/Reduce: 4 mio
>>>>>
>>>>> Sliding (5 sec, slide 1 sec):
>>>>>  - Current/Map: ca 3 mio (fluctuates a lot)
>>>>>  - Current/Reduce: No output
>>>>>  - Next/Map: ca 4 mio (fluctuates)
>>>>>  - Next/Reduce: 10 mio
>>>>>
>>>>> The Next/Reduce variant can basically scale indefinitely with window
>> size
>>>>> because the internal state does not rely on the number of elements
>> (it is
>>>>> just the current sum). The pre-reducer for sliding windows cannot
>>>>> handle the amount of tuples; it produces no output. For the two Map variants
>> the
>>>>> performance fluctuates because they always keep all the elements in an
>>>>> internal buffer before emission, this seems to tax the garbage
>> collector
>>>> a
>>>>> bit and leads to random pauses.
>>>>>
>>>>> One thing that should be noted is that I had to disable the
>> fake-element
>>>>> emission thread, otherwise the Current versions would deadlock.
>>>>>
>>>>> So, I started working on this because I thought that out-of-order
>>>>> processing would be necessary for correctness. And it certainly is. But
>>>>> the proof-of-concept also shows that performance can be greatly improved.
>>>>>
>>>>> On Wed, 24 Jun 2015 at 09:46 Gyula Fóra <gyula.f...@gmail.com> wrote:
>>>>>>
>>>>>> I agree lets separate these topics from each other so we can get
>> faster
>>>>>> resolution.
>>>>>>
>>>>>> There is already a state discussion in the thread we started with
>> Paris.
>>>>>>
>>>>>> On Wed, Jun 24, 2015 at 9:24 AM Kostas Tzoumas <ktzou...@apache.org>
>>>>> wrote:
>>>>>>
>>>>>>> I agree with supporting out-of-order out of the box :-), even if
>> this
>>>>> means
>>>>>>> a major refactoring. This is the right time to refactor the
>> streaming
>>>>> API
>>>>>>> before we pull it out of beta. I think that this is more important
>>>> than
>>>>> new
>>>>>>> features in the streaming API, which can be prioritized once the
>> API
>>>> is
>>>>> out
>>>>>>> of beta (meaning, that IMO this is the right time to stall PRs
>> until
>>>> we
>>>>>>> agree on the design).
>>>>>>>
>>>>>>> There are three sections in the document: windowing, state, and
>> API.
>>>> How
>>>>>>> convoluted are those with each other? Can we separate the
>> discussion
>>>> or
>>>>> do
>>>>>>> we need to discuss those all together? I think part of the
>> difficulty
>>>> is
>>>>>>> that we are discussing three design choices at once.
>>>>>>>
>>>>>>> On Tue, Jun 23, 2015 at 7:43 PM, Ted Dunning <
>> ted.dunn...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Out of order is ubiquitous in the real-world.  Typically, what
>>>>> happens is
>>>>>>>> that businesses will declare a maximum allowable delay for
>> delayed
>>>>>>>> transactions and will commit to results when that delay is
>> reached.
>>>>>>>> Transactions that arrive later than this cutoff are collected
>>>>> specially
>>>>>>> as
>>>>>>>> corrections which are reported/used when possible.
>>>>>>>>
>>>>>>>> Clearly, ordering can also be violated during processing, but if
>> the
>>>>> data
>>>>>>>> is originally out of order the situation can't be repaired by any
>>>>>>> protocol
>>>>>>>> fixes that prevent transactions from becoming disordered, but has to
>>>>>>>> be handled
>>>>>>>> at the data level.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Jun 23, 2015 at 9:29 AM, Aljoscha Krettek <
>>>> aljos...@apache.org
>>>>>>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I also don't like big changes but sometimes they are necessary.
>>>> The
>>>>>>>> reason
>>>>>>>>> why I'm so adamant about out-of-order processing is that
>>>>> out-of-order
>>>>>>>>> elements are not some exception that occurs once in a while;
>> they
>>>>> occur
>>>>>>>>> constantly in a distributed system. For example, in this:
>>>>>>>>> https://gist.github.com/aljoscha/a367012646ab98208d27 the
>>>> resulting
>>>>>>>>> windows
>>>>>>>>> are completely bogus because the current windowing system
>> assumes
>>>>>>>> elements
>>>>>>>>> to globally arrive in order, which is simply not true. (The
>>>> example
>>>>>>> has a
>>>>>>>>> source that generates increasing integers. Then these pass
>>>> through a
>>>>>>> map
>>>>>>>>> and are unioned with the original DataStream before a window
>>>>> operator.)
>>>>>>>>> This simulates elements arriving from different operators at a
>>>>>>> windowing
>>>>>>>>> operator. The example is also DOP=1, I imagine this to get
>> worse
>>>>> with
>>>>>>>>> higher DOP.
>>>>>>>>>
>>>>>>>>> What do you mean by costly? As I said, I have a
>> proof-of-concept
>>>>>>>> windowing
>>>>>>>>> operator that can handle out-of-order elements. This is an
>> example
>>>>>>> using
>>>>>>>>> the current Flink API:
>>>>>>>>> https://gist.github.com/aljoscha/f8dce0691732e344bbe8.
>>>>>>>>> (It is an infinite source of tuples and a 5 second window
>> operator
>>>>> that
>>>>>>>>> counts the tuples.) The first problem is that this code
>> deadlocks
>>>>>>> because
>>>>>>>>> of the thread that emits fake elements. If I disable the fake
>>>>> element
>>>>>>>> code
>>>>>>>>> it works, but the throughput using my mockup is 4 times higher.
>>>> The
>>>>>>> gap
>>>>>>>>> widens dramatically if the window size increases.
>>>>>>>>>
>>>>>>>>> So, it actually increases performance (unless I'm making a
>> mistake
>>>>> in
>>>>>>> my
>>>>>>>>> explorations) and can handle elements that arrive out-of-order
>>>>> (which
>>>>>>>>> happens basically always in real-world windowing use-cases).
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, 23 Jun 2015 at 12:51 Stephan Ewen <se...@apache.org>
>>>> wrote:
>>>>>>>>>
>>>>>>>>>> What I like a lot about Aljoscha's proposed design is that we
>>>>> need no
>>>>>>>>>> different code for "system time" vs. "event time". It only
>>>>> differs in
>>>>>>>>> where
>>>>>>>>>> the timestamps are assigned.
>>>>>>>>>>
>>>>>>>>>> The OOP approach also gives you the semantics of total
>> ordering
>>>>>>> without
>>>>>>>>>> imposing merges on the streams.
>>>>>>>>>>
>>>>>>>>>> On Tue, Jun 23, 2015 at 10:43 AM, Matthias J. Sax <
>>>>>>>>>> mj...@informatik.hu-berlin.de> wrote:
>>>>>>>>>>
>>>>>>>>>>> I agree that there should be multiple alternatives the
>> user(!)
>>>>> can
>>>>>>>>>>> choose from. Partial out-of-order processing works for
>>>> many/most
>>>>>>>>>>> aggregates. However, if you consider
>> Event-Pattern-Matching,
>>>>> global
>>>>>>>>>>> ordering is necessary (even if the performance penalty
>> might
>>>> be
>>>>>>>> high).
>>>>>>>>>>>
>>>>>>>>>>> I would also keep "system-time windows" as an alternative
>> to
>>>>>>> "source
>>>>>>>>>>> assigned ts-windows".
>>>>>>>>>>>
>>>>>>>>>>> It might also be interesting to consider the following
>> paper
>>>> for
>>>>>>>>>>> overlapping windows: "Resource sharing in continuous
>>>>> sliding-window
>>>>>>>>>>> aggregates"
>>>>>>>>>>>
>>>>>>>>>>>> https://dl.acm.org/citation.cfm?id=1316720
>>>>>>>>>>>
>>>>>>>>>>> -Matthias
>>>>>>>>>>>
>>>>>>>>>>> On 06/23/2015 10:37 AM, Gyula Fóra wrote:
>>>>>>>>>>>> Hey
>>>>>>>>>>>>
>>>>>>>>>>>> I think we should not block PRs unnecessarily if your
>>>>> suggested
>>>>>>>>> changes
>>>>>>>>>>>> might touch them at some point.
>>>>>>>>>>>>
>>>>>>>>>>>> Also I still think we should not put everything in the
>>>>> Datastream
>>>>>>>>>> because
>>>>>>>>>>>> it will be a huge mess.
>>>>>>>>>>>>
>>>>>>>>>>>> Also we need to agree on the out of order processing,
>>>> whether
>>>>> we
>>>>>>>> want
>>>>>>>>>> it
>>>>>>>>>>>> the way you proposed it (which is quite costly). Another
>>>>>>> alternative
>>>>>>>>>>>> approach there, which fits in the current windowing, is to filter
>>>>>>>>>>>> out out-of-order events and apply a special handling operator on
>> them.
>>>>> This
>>>>>>>>> would
>>>>>>>>>> be
>>>>>>>>>>>> fairly lightweight.
>>>>>>>>>>>>
>>>>>>>>>>>> My point is that we need to consider some alternative
>>>>> solutions.
>>>>>>>> And
>>>>>>>>> we
>>>>>>>>>>>> should not block contributions along the way.
>>>>>>>>>>>>
>>>>>>>>>>>> Cheers
>>>>>>>>>>>> Gyula
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Jun 23, 2015 at 9:55 AM Aljoscha Krettek <
>>>>>>>>> aljos...@apache.org>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> The reason I posted this now is that we need to think
>> about
>>>>> the
>>>>>>>> API
>>>>>>>>>> and
>>>>>>>>>>>>> windowing before proceeding with the PRs of Gabor
>> (inverse
>>>>>>> reduce)
>>>>>>>>> and
>>>>>>>>>>>>> Gyula (removal of "aggregate" functions on DataStream).
>>>>>>>>>>>>>
>>>>>>>>>>>>> For the windowing, I think that the current model does
>> not
>>>>> work
>>>>>>>> for
>>>>>>>>>>>>> out-of-order processing. Therefore, the whole windowing
>>>>>>>>> infrastructure
>>>>>>>>>>> will
>>>>>>>>>>>>> basically have to be redone. Meaning also that any work
>> on
>>>>> the
>>>>>>>>>>>>> pre-aggregators or optimizations that we do now becomes
>>>>> useless.
>>>>>>>>>>>>>
>>>>>>>>>>>>> For the API, I proposed to restructure the interactions
>>>>> between
>>>>>>>> all
>>>>>>>>>> the
>>>>>>>>>>>>> different *DataStream classes and grouping/windowing.
>> (See
>>>>> API
>>>>>>>>> section
>>>>>>>>>>> of
>>>>>>>>>>>>> the doc I posted.)
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, 22 Jun 2015 at 21:56 Gyula Fóra <
>>>> gyula.f...@gmail.com
>>>>>>
>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Aljoscha,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for the nice summary, this is a very good
>>>> initiative.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I added some comments to the respective sections
>> (where I
>>>>> didn't
>>>>>>>>> fully
>>>>>>>>>>>>> agree
>>>>>>>>>>>>>> :).).
>>>>>>>>>>>>>> At some point I think it would be good to have a public
>>>>> hangout
>>>>>>>>>> session
>>>>>>>>>>>>> on
>>>>>>>>>>>>>> this, which could make a more dynamic discussion.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>> Gyula
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Aljoscha Krettek <aljos...@apache.org> ezt írta
>> (időpont:
>>>>>>> 2015.
>>>>>>>>> jún.
>>>>>>>>>>>>> 22.,
>>>>>>>>>>>>>> H, 21:34):
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>> with people proposing changes to the streaming part I
>>>> also
>>>>>>>> wanted
>>>>>>>>> to
>>>>>>>>>>>>>> throw
>>>>>>>>>>>>>>> my hat into the ring. :D
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> During the last few months, while I was getting
>>>> acquainted
>>>>>>> with
>>>>>>>>> the
>>>>>>>>>>>>>>> streaming system, I wrote down some thoughts I had
>> about
>>>>> how
>>>>>>>>> things
>>>>>>>>>>>>> could
>>>>>>>>>>>>>>> be improved. Hopefully, they are in somewhat coherent
>>>> shape
>>>>>>> now,
>>>>>>>>> so
>>>>>>>>>>>>>> please
>>>>>>>>>>>>>>> have a look if you are interested in this:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>
>>>>
>> https://docs.google.com/document/d/1rSoHyhUhm2IE30o5tkR8GEetjFvMRMNxvsCfoPsW6_4/edit?usp=sharing
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> This mostly covers:
>>>>>>>>>>>>>>>  - Timestamps assigned at sources
>>>>>>>>>>>>>>>  - Out-of-order processing of elements in window
>>>> operators
>>>>>>>>>>>>>>>  - API design
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Please let me know what you think. Comment in the
>>>> document
>>>>> or
>>>>>>>> here
>>>>>>>>>> in
>>>>>>>>>>>>> the
>>>>>>>>>>>>>>> mailing list.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I have a PR in the makings that would introduce source
>>>>>>>> timestamps
>>>>>>>>>> and
>>>>>>>>>>>>>>> watermarks for keeping track of them. I also hacked a
>>>>>>>>>> proof-of-concept
>>>>>>>>>>>>>> of a
>>>>>>>>>>>>>>> windowing system that is able to process out-of-order
>>>>> elements
>>>>>>>>>> using a
>>>>>>>>>>>>>>> FlatMap operator. (It uses panes to perform efficient
>>>>>>>>>>>>> pre-aggregations.)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>> Aljoscha
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>
>>
> 
