Re: long lived standalone job session cluster in kubernetes

2019-02-11 Thread Till Rohrmann
Hi Heath,

I just learned that people from Alibaba already made some good progress
with FLINK-9953. I'm currently talking to them in order to see how we can
merge this contribution into Flink as fast as possible. Since I'm quite
busy due to the upcoming release I hope that other community members will
help out with the reviewing once the PRs are opened.

Cheers,
Till

On Fri, Feb 8, 2019 at 8:50 PM Heath Albritton  wrote:

> Has any progress been made on this?  There are a number of folks in
> the community looking to help out.
>
>
> -H
>
> On Wed, Dec 5, 2018 at 10:00 AM Till Rohrmann 
> wrote:
> >
> > Hi Derek,
> >
> > there is this issue [1] which tracks the active Kubernetes integration.
> Jin Sun already started implementing some parts of it. There should also be
> some PRs open for it. Please check them out.
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-9953
> >
> > Cheers,
> > Till
> >
> > On Wed, Dec 5, 2018 at 6:39 PM Derek VerLee 
> wrote:
> >>
> >> Sounds good.
> >>
> >> Is someone working on this automation today?
> >>
> >> If not, although my time is tight, I may be able to work on a PR for
> getting us started down the path of Kubernetes-native cluster mode.
> >>
> >>
> >> On 12/4/18 5:35 AM, Till Rohrmann wrote:
> >>
> >> Hi Derek,
> >>
> >> what I would recommend is to trigger the cancel-with-savepoint
> command [1]. This will create a savepoint and terminate the job execution.
> Next you simply need to respawn the job cluster, providing it with the
> savepoint to resume from.
> >>
> >> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#cancel-job-with-savepoint
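For reference, the CLI invocation behind [1] looks roughly like this (the job id and
target directory are placeholders):

    bin/flink cancel -s hdfs:///flink/savepoints <jobId>

The command prints the savepoint path, which the respawned job cluster is then
pointed at as the savepoint to resume from.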
> >>
> >> Cheers,
> >> Till
> >>
> >> On Tue, Dec 4, 2018 at 10:30 AM Andrey Zagrebin <
> and...@data-artisans.com> wrote:
> >>>
> >>> Hi Derek,
> >>>
> >>> I think your automation steps look good.
> >>> Recreating deployments should not take long
> >>> and as you mention, this way you can avoid unpredictable old/new
> version collisions.
> >>>
> >>> Best,
> >>> Andrey
> >>>
> >>> > On 4 Dec 2018, at 10:22, Dawid Wysakowicz 
> wrote:
> >>> >
> >>> > Hi Derek,
> >>> >
> >>> > I am not an expert in kubernetes, so I will cc Till, who should be
> able
> >>> > to help you more.
> >>> >
> >>> > As for the automation for similar process I would recommend having a
> >>> > look at dA platform[1] which is built on top of kubernetes.
> >>> >
> >>> > Best,
> >>> >
> >>> > Dawid
> >>> >
> >>> > [1] https://data-artisans.com/platform-overview
> >>> >
> >>> > On 30/11/2018 02:10, Derek VerLee wrote:
> >>> >>
> >>> >> I'm looking at the job cluster mode, it looks great and I am
> >>> >> considering migrating our jobs off our "legacy" session cluster and
> >>> >> into Kubernetes.
> >>> >>
> >>> >> I do need to ask some questions because I haven't found a lot of
> >>> >> details in the documentation about how it works yet, and I gave up
> >>> >> following the DI around in the code after a while.
> >>> >>
> >>> >> Let's say I have a deployment for the job "leader" in HA with ZK,
> and
> >>> >> another deployment for the taskmanagers.
> >>> >>
> >>> >> I want to upgrade the code or configuration and start from a
> >>> >> savepoint, in an automated way.
> >>> >>
> >>> >> Best I can figure, I can not just update the deployment resources in
> >>> >> kubernetes and allow the containers to restart in an arbitrary
> order.
> >>> >>
> >>> >> Instead, I expect sequencing is important, something along the lines
> >>> >> of this:
> >>> >>
> >>> >> 1. issue savepoint command on leader
> >>> >> 2. wait for savepoint
> >>> >> 3. destroy all leader and taskmanager containers
> >>> >> 4. deploy new leader, with savepoint url
> >>> >> 5. deploy new taskmanagers
> >>> >>
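A rough sketch of that sequencing, assuming the standard Flink CLI and kubectl
(resource and file names are illustrative, and the exact way the savepoint path is
passed to the new leader depends on your image/entrypoint):

    # 1+2: trigger cancel-with-savepoint and capture the reported savepoint path
    SAVEPOINT=$(bin/flink cancel -s hdfs:///flink/savepoints $JOB_ID | grep -o 'hdfs://[^ ]*')

    # 3: tear down the old leader and taskmanagers
    kubectl delete deployment flink-job-leader flink-taskmanager

    # 4+5: deploy the new version, injecting $SAVEPOINT into the leader's entrypoint args
    kubectl apply -f job-leader-deployment.yaml
    kubectl apply -f taskmanager-deployment.yaml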
> >>> >>
> >>> >> For example, I imagine old taskmanagers (with an old version of my
> >>> >> job) attaching to the new leader and causing a problem.
> >>> >>
> >>> >> Does that sound right, or am I overthinking it?
> >>> >>
> >>> >> If not, has anyone tried implementing any automation for this yet?
> >>> >>
> >>> >
> >>>
>


Re: Running JobManager as Deployment instead of Job

2019-02-11 Thread Till Rohrmann
Hi Vishal,

you can also keep the same cluster id when cancelling a job with savepoint
and then resuming a new job from it. Terminating the job should clean up
all state in Zk.
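
For illustration, with the standard CLI (the savepoint directory, job id and jar are
placeholders):

    bin/flink cancel -s hdfs:///flink/savepoints <jobId>
    bin/flink run -s <savepointPath> --allowNonRestoredState new-job.jar

--allowNonRestoredState is only needed when the savepoint contains state that no
longer maps to any operator in the new job.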

Cheers,
Till

On Fri, Feb 8, 2019 at 11:26 PM Vishal Santoshi 
wrote:

> In one case however, we do want to retain the same cluster id ( think
> ingress on k8s  and thus SLAs with external touch points ) but it is
> essentially a new job ( added an incompatible change but at the interface
> level it retains the same contract ) , the only way seems to be to remove
> the chroot/subcontext from ZK, and relaunch, essentially deleting any
> vestiges of the previous incarnation. And that is fine if that is indeed
> the process.
>
>
> On Fri, Feb 8, 2019 at 7:58 AM Till Rohrmann  wrote:
>
>> If you keep the same cluster id, the upgraded job should pick up
>> checkpoints from the completed checkpoint store. However, I would recommend
>> taking a savepoint and resuming from it, because then you can
>> also specify, for example, that you allow non-restored state.
>>
>> Cheers,
>> Till
>>
>> On Fri, Feb 8, 2019 at 11:20 AM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> Is the rationale of using a jobID 00* also roughly the same? As in, a
>>> Flink job cluster is a single job and thus a single job id suffices? I am
>>> more wondering about the case where we are making a compatible change to a
>>> job and want to resume ( given we are in HA mode and thus have a
>>> chroot/subcontext on ZK for the job cluster ) ,  it would make no sense to
>>> give a brand new job id ?
>>>
>>> On Thu, Feb 7, 2019 at 4:42 AM Till Rohrmann 
>>> wrote:
>>>
 Hi Sergey,

 the rationale why we are using a K8s job instead of a deployment is
 that a Flink job cluster should terminate after it has successfully
 executed the Flink job. This is unlike a session cluster which should run
 forever and for which a K8s deployment would be better suited.

 If a K8s deployment works better for your use case, then I would
 suggest changing the `job-cluster-job.yaml` accordingly.

 Cheers,
 Till

 On Tue, Feb 5, 2019 at 4:12 PM Sergey Belikov 
 wrote:

> Hi,
>
> my team is currently experimenting with Flink running in Kubernetes
> (job cluster setup). And we found out that with JobManager being deployed
> as "Job" we can't just simply update certain values in job's yaml, e.g.
> spec.template.spec.containers.image (
> https://github.com/kubernetes/kubernetes/issues/48388#issuecomment-319493817).
> This causes certain troubles in our CI/CD pipelines so we are thinking
> about using "Deployment" instead of "Job".
>
> With that being said I'm wondering what was the motivation behind
> using "Job" resource for deploying JobManager? And are there any pitfalls
> related to using Deployment and not Job for JobManager?
>
> Thank you in advance.
> --
> Best regards,
> Sergey Belikov
>



Re: Sliding window buffering on restart without save point

2019-02-11 Thread Konstantin Knauf
Hi William,

first of all, I would like to give you two pointers regarding state
migration:

* If you set UUIDs for all operators you can change the topology of your
job without breaking the compatibility with the savepoint [1]. State will
be matched to the operator with the same UUID.
* In Flink 1.7.x (I noticed you run 1.6.2, but still ;)) the community
has improved the state migration capabilities of Flink quite a lot [2]. In
particular, state migration will work out-of-the-box with Avro Types as
long as schema changes are backward compatible in the Avro-sense, i.e.
adding a field with a default value would not be a problem anymore.  In
case of a window, the state type is determined by either the type of your
events (no pre-aggregation) or your aggregation type (with
pre-aggregation).
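
A minimal fragment illustrating the first pointer (operator names, the Event type and
the aggregation are placeholders, not from your job):

    // explicit UUIDs: savepoint state is matched by these IDs rather than by auto-generated hashes
    env.addSource(new FlinkKafkaConsumer<>("events", schema, props)).uid("kafka-source")
       .keyBy(e -> e.getKey())
       .timeWindow(Time.minutes(60), Time.minutes(10))   // size T, slide T/6
       .aggregate(new MyAggregation()).uid("sliding-aggregate")
       .addSink(sink).uid("sink");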

If you follow these suggestions, you might be able to avoid a good amount
of the savepoint incompatibilities. The strategy for the remaining cases
depends, of course. Do you use an idempotent sink, i.e. is it ok if the job
emits the same (correct) result twice? More generally, what happens with the
results of the aggregations?

Depending on your requirements
https://issues.apache.org/jira/browse/FLINK-11458 might also be able to
help you in the future.

Cheers,

Konstantin

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/production_ready.html#set-uuids-for-operators
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/state/schema_evolution.html
On Fri, Feb 8, 2019 at 9:52 AM shater93 
wrote:

> Hello,
>
> I have a Flink pipeline processing data in several overlapping (sliding)
> windows such that they span [t_i, t_i + T], where t_i is the window starting
> time and T is the window size. The overlap is such that t_(i+1) - t_i = T/6
> (i.e. at any point in time there are 6 overlapping windows).
>
> When deploying in my CI/CD process to Kubernetes, there are sometimes
> serialisation problems due to changes of the Flink DAG, checkpoint state,
> etc., as a symptom of, for instance, a change in the definition of these
> classes (adding/removing a field). This means that the process cannot
> start from the savepoint that I save during a deploy. How could this
> be managed in an efficient way? I understand that the way I am using
> windowing is not optimal here, so let's not focus on those solutions.
>
> Currently, my only approach is:
> * Shut down the streaming process in a controlled manner (redeploying the
> running version with new configs, terminating the stream when events
> arrive after a certain time-point)
> * After termination, move the time-point (offset, I am using Kafka)
> backwards in time, in this case by T + eps, to allow rebuffering of the
> windows.
> * Start the service reading from the new time-point, but not emitting any
> output events until it has passed a defined time-point (in this case the
> time-point of termination).
>
> Do you have any suggestions on how to improve this process?
>
> Best regards and thanks in advance for any input,
> William
>
>
>  Flink Version: 1.6.2
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


-- 

Konstantin Knauf | Solutions Architect

+49 160 91394525



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen


Re: HA HDFS

2019-02-11 Thread Konstantin Knauf
Hi Steve,

HDFS can be used as checkpoint storage and plays a crucial role in Flink's
fault tolerance mechanism. HDFS alone will not suffice to get a Flink HA
setup, though. You also need Zookeeper for JobManager HA.

Flink configuration:

high-availability: zookeeper
high-availability.zookeeper.quorum: <host>:<port>
high-availability.cluster-id: /cluster_one # important: customize per cluster
high-availability.storageDir: hdfs:///flink/recovery


Regarding the steps to setup checkpointing to HDFS. You don't need to set
the "fs.default-scheme". As "state.backend" you use either "rocksdb" or
"filesystem". They both use HFDS for checkpoints. You can also set
"env.hadoop.conf.dir" in the Flink configuration as opposed to the
environment variable. The environment variable will then be set by the
Flink scripts.
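
For illustration, a minimal checkpointing section of flink-conf.yaml could then look
like this (paths are placeholders):

    state.backend: rocksdb
    state.checkpoints.dir: hdfs:///flink/checkpoints
    state.savepoints.dir: hdfs:///flink/savepoints
    env.hadoop.conf.dir: /etc/hadoop/conf   # wherever hdfs-site.xml / core-site.xml are mounted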

Of course, in a basic Kubernetes setup TaskManagers and JobManager pods
should both be controlled by a Deployment, so that they are brought up
again after a fault.

Hope this helps, and please let me know if you run into any issues.

Best,

Konstantin


On Wed, Feb 6, 2019 at 7:47 PM Steven Nelson 
wrote:

> I am working on a POC High Availability installation of Flink on top of
> Kubernetes with HDFS as a data storage location. I am not finding much
> documentation on doing this, or I am finding the documentation in parts and
> maybe getting it put together correctly. I think it falls between being an
> HDFS thing and a Flink thing.
>
> I am deploying to Kubernetes using the flink:1.7.0-hadoop27-scala_2.11
> container off of docker hub.
>
> I think these are the things I need to do
> 1) Setup an hdfs-site.xml file per
> https://hadoop.apache.org/docs/r2.4.1/hadoop-project-dist/hadoop-hdfs/HDFSHighAvailabilityWithQJM.html#Deployment
> 2) Set the HADOOP_CONF_DIR environment variable to the location of that
> file per
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#hdfs
> 3) Create a flink-conf.yaml file that looks something like
> fs.default-scheme: hdfs://
> state.backend: rocksdb
> state.savepoints.dir: hdfs://flink/savepoints
> state.checkpoints.dir: hdfs://flink/checkpoints
> 4) Dance a little jig when it works.
>
> Has anyone set this up? If so, am I missing anything?
>
> -Steve
>
>
>

-- 

Konstantin Knauf | Solutions Architect

+49 160 91394525



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen


Re: Broadcast state before events stream consumption

2019-02-11 Thread Konstantin Knauf
Hi Chirag, Hi Vadim,

from the top of my head, I see two options here:

* Buffer the "fast" stream inside the KeyedBroadcastProcessFunction until
relevant (whatever this means in your use case) broadcast events have
arrived. Advantage: operationally easy, events are emitted as early as
possible. Disadvantage: state size might become very large, depending on
the nature of the broadcast stream it might be hard to know, when the
"relevant broadcast events have arrived".

* Start your job and only consume the broadcast stream (by configuration).
Once the stream is "fully processed", i.e. has caught up, take a savepoint.
Finally, start the job from this savepoint with the correct "fast" stream.
There is a small race condition between taking the savepoint and restarting
the job, which might matter (or not) depending on your use case.
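
A minimal sketch of the first option (the Event, Rule and Result types, the ruleId
field and the notion of "relevance" are all illustrative):

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BufferUntilBroadcastArrives
        extends KeyedBroadcastProcessFunction<String, Event, Rule, Result> {

    public static final MapStateDescriptor<String, Rule> RULES =
            new MapStateDescriptor<>("rules", Types.STRING, Types.POJO(Rule.class));

    private transient ListState<Event> buffer;

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffered-events", Event.class));
    }

    @Override
    public void processElement(Event event, ReadOnlyContext ctx, Collector<Result> out) throws Exception {
        Rule rule = ctx.getBroadcastState(RULES).get(event.ruleId);
        if (rule == null) {
            // relevant broadcast data has not arrived yet -> buffer the fast element
            buffer.add(event);
            return;
        }
        // flush anything buffered for this key, then handle the current element
        for (Event buffered : buffer.get()) {
            out.collect(apply(rule, buffered));
        }
        buffer.clear();
        out.collect(apply(rule, event));
    }

    @Override
    public void processBroadcastElement(Rule rule, Context ctx, Collector<Result> out) throws Exception {
        ctx.getBroadcastState(RULES).put(rule.id, rule);
        // buffered elements of a key are flushed lazily, on that key's next fast element;
        // ctx.applyToKeyedState(...) could be used here to flush eagerly instead
    }

    private Result apply(Rule rule, Event event) {
        return new Result(event, rule);  // the actual lookup / enrichment logic
    }
}

It would be wired up with something like
events.keyBy(e -> e.key).connect(rules.broadcast(BufferUntilBroadcastArrives.RULES)).process(new BufferUntilBroadcastArrives()),
using the same MapStateDescriptor on both sides.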

This topic is related to event-time alignment in sources, which has been
actively discussed in the community in the past and we might be able to
solve this in a similar way in the future.

Cheers,

Konstantin

On Fri, Feb 8, 2019 at 5:48 PM Chirag Dewan  wrote:

> Hi Vadim,
>
> I would be interested in this too.
>
> Presently, I have to read my lookup source in the *open* method and keep
> it in a cache. By doing that I cannot make use of the broadcast state until,
> of course, the first emit comes on the *Broadcast* stream.
>
> The problem with making the event stream wait is the lack of knowledge that I
> have read all the data from the lookup source. There is no possibility of
> having a special marker in the data as well for my use case.
>
> So pre loading the data seems to be the only option right now.
>
> Thanks,
>
> Chirag
>
>
>
> On Friday, 8 February, 2019, 7:45:37 pm IST, Vadim Vararu <
> vadim.var...@adswizz.com> wrote:
>
>
> Hi all,
>
> I need to use the broadcast state mechanism (
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html)
> for the next scenario.
>
> I have a reference data stream (slow) and an events stream (fast running)
> and I want to do a kind of lookup in the reference stream for each
> event. The broadcast state mechanism seems to fit perfect the scenario.
>
> From documentation:
> *As an example where broadcast state can emerge as a natural fit, one can
> imagine a low-throughput stream containing a set of rules which we want to
> evaluate against all elements coming from another stream.*
>
> However, I am not sure what is the correct way to delay the consumption of
> the fast running stream until the slow one is fully read (in case of a
> file) or until a marker is emitted (in case of some other source). Is there
> any way to accomplish that? It doesn't seem to be a rare use case.
>
> Thanks, Vadim.
>


-- 

Konstantin Knauf | Solutions Architect

+49 160 91394525



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen


Re: HA HDFS

2019-02-11 Thread Vishal Santoshi
One more thing I had to do.

In an HA setup, the TMs are not able to resolve the job manager's random port
through the jobmanager.rpc.port setting.
Set high-availability.jobmanager.port to a predefined port in your
flink-conf.yaml and expose that port via the job-manager deployment and
job-manager service resources as well.
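
For example (the port number and resource names are placeholders):

    # flink-conf.yaml
    high-availability.jobmanager.port: 50010

    # jobmanager-service.yaml (fragment)
    ports:
    - name: ha-rpc
      port: 50010
      targetPort: 50010

The same containerPort also needs to be listed in the JobManager deployment spec.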

On Mon, Feb 11, 2019 at 4:09 AM Konstantin Knauf 
wrote:

> Hi Steve,
>
> HDFS can be used as checkpoint storage and plays a crucial role in Flink's
> fault tolerance mechanism. HDFS alone will not suffice to get a Flink HA
> setup, though. You also need Zookeeper for JobManager HA.
>
> Flink configuration:
>
> high-availability: zookeeper
> high-availability.zookeeper.quorum: <host>:<port>
> high-availability.cluster-id: /cluster_one # important: customize per cluster
> high-availability.storageDir: hdfs:///flink/recovery
>
>
> Regarding the steps to setup checkpointing to HDFS. You don't need to set
> the "fs.default-scheme". As "state.backend" you use either "rocksdb" or
> "filesystem". They both use HFDS for checkpoints. You can also set
> "env.hadoop.conf.dir" in the Flink configuration as opposed to the
> environment variable. The environment variable will then be set by the
> Flink scripts.
>
> Of course, in a basic Kubernetes setup TaskManagers and JobManager pods
> should both be controlled by a Deployment, so that they are brought up
> again after a fault.
>
> Hope this helps, and please let me know if you run into any issues.
>
> Best,
>
> Konstantin
>
>
> On Wed, Feb 6, 2019 at 7:47 PM Steven Nelson 
> wrote:
>
>> I am working on a POC High Availability installation of Flink on top of
>> Kubernetes with HDFS as a data storage location. I am not finding much
>> documentation on doing this, or I am finding the documentation in parts and
>> maybe getting it put together correctly. I think it falls between being an
>> HDFS thing and a Flink thing.
>>
>> I am deploying to Kubernetes using the flink:1.7.0-hadoop27-scala_2.11
>> container off of docker hub.
>>
>> I think these are the things I need to do
>> 1) Setup an hdfs-site.xml file per
>> https://hadoop.apache.org/docs/r2.4.1/hadoop-project-dist/hadoop-hdfs/HDFSHighAvailabilityWithQJM.html#Deployment
>> 2) Set the HADOOP_CONF_DIR environment variable to the location of that
>> file per
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#hdfs
>> 3) Create a flink-conf.yaml file that looks something like
>> fs.default-scheme: hdfs://
>> state.backend: rocksdb
>> state.savepoints.dir: hdfs://flink/savepoints
>> state.checkpoints.dir: hdfs://flink/checkpoints
>> 4) Dance a little jig when it works.
>>
>> Has anyone set this up? If so, am I missing anything?
>>
>> -Steve
>>
>>
>>
>
> --
>
> Konstantin Knauf | Solutions Architect
>
> +49 160 91394525
>
> 
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Data Artisans GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
>


Re: Reduce one event under multiple keys

2019-02-11 Thread Fabian Hueske
Hi Stephen,

A window is created with the first record that is assigned to it.
If the windows are based on time and a key, then no window will be created
(and no space will be occupied) if there is no first record for a key and
time interval.

Anyway, if tracking the number of open files & average opening time is your
use case, you might want to implement the logic with a ProcessFunction
instead of a window.
The reason is that time windows don't share state, i.e., the
information about an opened but not yet closed file would not be "carried
over" to the next window.
However, if you use a ProcessFunction, you are responsible for cleaning up
the state.
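
A minimal sketch of that ProcessFunction approach, keyed by a string built from
(source, path); the FileEvent and OpenDuration types, their fields, and the 24h
clean-up deadline are illustrative:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class OpenFileTracker extends KeyedProcessFunction<String, FileEvent, OpenDuration> {

    private static final long STALE_MS = 24 * 60 * 60 * 1000L;  // drop opens never matched by a close

    private transient ValueState<Long> openedAt;

    @Override
    public void open(Configuration parameters) {
        openedAt = getRuntimeContext().getState(new ValueStateDescriptor<>("opened-at", Long.class));
    }

    @Override
    public void processElement(FileEvent event, Context ctx, Collector<OpenDuration> out) throws Exception {
        if ("open".equals(event.action)) {
            openedAt.update(event.timestamp);
            // timer makes sure the entry is eventually cleaned up even if no close ever arrives
            ctx.timerService().registerEventTimeTimer(event.timestamp + STALE_MS);
        } else if ("close".equals(event.action)) {
            Long start = openedAt.value();
            if (start != null) {
                out.collect(new OpenDuration(ctx.getCurrentKey(), event.timestamp - start));
                openedAt.clear();
            }
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<OpenDuration> out) throws Exception {
        Long start = openedAt.value();
        if (start != null && timestamp >= start + STALE_MS) {
            openedAt.clear();  // this is the "cleaning up the state yourself" part
        }
    }
}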

Hope this helps,
Fabian

Am So., 10. Feb. 2019 um 20:36 Uhr schrieb Stephen Connolly <
stephen.alan.conno...@gmail.com>:

>
>
> On Sun, 10 Feb 2019 at 09:09, Chesnay Schepler  wrote:
>
>> This sounds reasonable to me.
>>
>> I'm a bit confused by this question: "*Additionally, I am (naïevely)
>> hoping that if a window has no events for a particular key, the
>> memory/storage costs are zero for that key.*"
>>
>> Are you asking whether a key that was received in window X (as part of an
>> event) is still present in window x+1? If so, then the answer is no; a key
>> will only be present in a given window if an event was received that fits
>> into that window.
>>
>
> To confirm:
>
> So let's say I'm tracking the average time a file is opened in folders.
>
> In window N we get the events:
>
> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/README.txt"}
>
> {"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/README.txt","duration":"67"}
> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/User guide.txt"}
> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/Admin guide.txt"}
>
> So there will be aggregates stored for
> ("ca:fe:ba:be","/"), ("ca:fe:ba:be","/foo"), ("ca:fe:ba:be","/foo/bar"),
> ("ca:fe:ba:be","/foo/bar/README.txt"), etc
>
> In window N+1 we do not get any events at all.
>
> So the memory used by my aggregation functions from window N will be freed
> and the storage will be effectively zero (modulo any follow on processing
> that might be on a longer window)
>
> This seems to be what you are saying... in which case my naïeve hope was
> not so naïve! w00t!
>
>
>>
>> On 08.02.2019 13:21, Stephen Connolly wrote:
>>
>> Ok, I'll try and map my problem into something that should be familiar to
>> most people.
>>
>> Consider collection of PCs, each of which has a unique ID, e.g.
>> ca:fe:ba:be, de:ad:be:ef, etc.
>>
>> Each PC has a tree of local files. Some of the file paths are
>> coincidentally the same names, but there is no file sharing between PCs.
>>
>> I need to produce metrics about how often files are opened and how long
>> they are open for.
>>
>> I need for every X minute tumbling window not just the cumulative
>> averages for each PC, but the averages for each file as well as the
>> cumulative averegaes for each folder and their sub-folders.
>>
>> I have a stream of events like
>>
>>
>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/README.txt"}{"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/README.txt","duration":"67"}{"source":"de:ad:be:ef","action":"open","path":"/foo/manchu/README.txt"}
>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/User
>> guide.txt"}{"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/Admin
>> guide.txt"}{"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/User
>> guide.txt","duration":"97"}{"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/Admin
>> guide.txt","duration":"196"}
>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/manchu/README.txt"}
>> {"source":"de:ad:be:ef","action":"open","path":"/bar/foo/README.txt"}
>>
>> So from that I would like to know stuff like:
>>
>> ca:fe:ba:be had 4/X opens per minute in the X minute window
>> ca:fe:ba:be had 3/X closes per minute in the X minute window and the
>> average time open was (67+97+196)/3=120... there is no guarantee that the
>> closes will be matched with opens in the same window, which is why I'm only
>> tracking them separately
>> de:ad:be:ef had 2/X opens per minute in the X minute window
>> ca:fe:ba:be /foo had 4/X opens per minute in the X minute window
>> ca:fe:ba:be /foo had 3/X closes per minute in the X minute window and the
>> average time open was 120
>> de:ad:be:ef /foo had 1/X opens per minute in the X minute window
>> de:ad:be:ef /bar had 1/X opens per minute in the X minute window
>> de:ad:be:ef /foo/manchu had 1/X opens per minute in the X minute window
>> de:ad:be:ef /bar/foo had 1/X opens per minute in the X minute window
>> de:ad:be:ef /foo/manchu/README.txt had 1/X opens per minute in the X
>> minute window
>> de:ad:be:ef /bar/foo/README.txt had 1/X opens per minute in the X minute
>> window
>> etc
>>
>> What I think I want to do is turn each event into a series of events with
>> different keys, so that
>>
>> {"source":"ca:fe:ba:be","action":"open","path":"/f

Re: Is there a windowing strategy that allows a different offset per key?

2019-02-11 Thread Fabian Hueske
Hi Stephen,

First of all, yes, windows computing and emitting at the same time can
cause pressure on the downstream system.

There are a few ways how you can achieve this:
* use a custom window assigner. A window assigner decides into which window
a record is assigned. This is the approach you suggested.
* use a regular window and add an operator that buffers the window results
and releases them with randomized delay.
* use a ProcessFunction which allows you to control the timing of
computations yourself.
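
A sketch of the first option; the element hash below is a stand-in for whatever hash
function you supply, and it must be stable per key so that all records of a key land
in the same, consistently shifted windows:

import java.util.Collection;
import java.util.Collections;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

/** Tumbling event-time windows whose start offset is derived from the element itself. */
public class PerKeyOffsetTumblingWindows extends WindowAssigner<Object, TimeWindow> {

    private final long size;

    public PerKeyOffsetTumblingWindows(long sizeMillis) {
        this.size = sizeMillis;
    }

    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        // stand-in: in a real job, extract the id part of the key from the element and
        // hash that, so every record of a key gets the same offset
        long offset = Math.floorMod(element.hashCode(), size);
        long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
        return Collections.singletonList(new TimeWindow(start, start + size));
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return EventTimeTrigger.create();
    }

    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    @Override
    public boolean isEventTime() {
        return true;
    }
}

It would then be used via .window(new PerKeyOffsetTumblingWindows(Time.minutes(15).toMilliseconds()))
in place of TumblingEventTimeWindows.of(...).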

A few months ago, there was a similar discussion on the dev mailing list
[1] (didn't read the thread) started by Rong (in CC).
Maybe, he can share some ideas / experiences as well.

Cheers,
Fabian

[1]
https://lists.apache.org/thread.html/0d1e41302b89378f88693bf4fdb52c23d4b240160b5a10c163d9c46c@%3Cdev.flink.apache.org%3E


Am So., 10. Feb. 2019 um 21:03 Uhr schrieb Stephen Connolly <
stephen.alan.conno...@gmail.com>:

> Looking into the code in TumblingEventTimeWindows:
>
> @Override
> public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
>     if (timestamp > Long.MIN_VALUE) {
>         // Long.MIN_VALUE is currently assigned when no timestamp is present
>         long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
>         return Collections.singletonList(new TimeWindow(start, start + size));
>     } else {
>         throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
>             "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
>             "'DataStream.assignTimestampsAndWatermarks(...)'?");
>     }
> }
>
> So I think I can just write my own where the offset is derived from
> hashing the element using my hash function.
>
> Good plan or bad plan?
>
>
> On Sun, 10 Feb 2019 at 19:55, Stephen Connolly <
> stephen.alan.conno...@gmail.com> wrote:
>
>> I would like to process a stream of data from different customers,
>> producing output say once every 15 minutes. The results will then be loaded
>> into another system for storage and querying.
>>
>> I have been using TumblingEventTimeWindows in my prototype, but I am
>> concerned that all the windows will start and stop at the same time and
>> cause batch load effects on the back-end data store.
>>
>> What I think I would like is that the windows could have a different
>> start offset for each key, (using a hash function that I would supply)
>>
>> Thus deterministically, key "ca:fe:ba:be" would always start based on an
>> initial offset of 00:07 UTC while say key "de:ad:be:ef" would always start
>> based on an initial offset of say 00:02 UTC
>>
>> Is this possible? Or do I just have to find some way of queuing up my
>> writes using back-pressure?
>>
>> Thanks in advance
>>
>> -stephenc
>>
>> P.S. I can trade assistance with Flink for assistance with Maven or
>> Jenkins if my questions are too wearisome!
>>
>


Re: fllink 1.7.1 and RollingFileSink

2019-02-11 Thread Fabian Hueske
Hi Vishal,

Kostas (in CC) should be able to help here.

Best, Fabian

Am Mo., 11. Feb. 2019 um 00:05 Uhr schrieb Vishal Santoshi <
vishal.santo...@gmail.com>:

> Any one ?
>
> On Sun, Feb 10, 2019 at 2:07 PM Vishal Santoshi 
> wrote:
>
>> You don't have to. Thank you for the input.
>>
>> On Sun, Feb 10, 2019 at 1:56 PM Timothy Victor  wrote:
>>
>>> My apologies for not seeing your use case properly.   The constraint on
>>> rolling policy is only applicable for bulk formats such as Parquet as
>>> highlighted in the docs.
>>>
>>> As for your questions, I'll have to defer to others more familiar with
>>> it.   I mostly just use bulk formats such as avro and parquet.
>>>
>>> Tim
>>>
>>>
>>> On Sun, Feb 10, 2019, 12:40 PM Vishal Santoshi <
>>> vishal.santo...@gmail.com wrote:
>>>
 That said, in the DefaultRollingPolicy it seems the check is on the
 file size (mimics the check in shouldRollOnEvent()).

 I guess the question is:

 Is the call to shouldRollOnCheckpoint done by the checkpointing
 thread?

 Are the calls to the other 2 methods, shouldRollOnEvent and
 shouldRollOnProcessingTime, done on the execution thread, i.e. inlined?





 On Sun, Feb 10, 2019 at 1:17 PM Vishal Santoshi <
 vishal.santo...@gmail.com> wrote:

> Thanks for the quick reply.
>
> I am confused. If this was a more full-featured BucketingSink, I would
> imagine that based on shouldRollOnEvent and shouldRollOnProcessingTime, an
> in-progress file could go into the pending phase and on checkpoint the
> pending part file would be finalized. For exactly-once, any in-progress
> files will have their length snapshotted to the checkpoint and used to
> truncate the file (if supported) or dropped as a part-length file (if
> truncate is not supported) if a resume from a checkpoint were to happen,
> to indicate what part of the finalized file (finalized when resumed) was
> valid. And I had always assumed (and there is no doc otherwise) that
> shouldRollOnCheckpoint would be similar to the other 2, apart from the fact
> that it does the roll and finalize step in a single step on a checkpoint.
>
>
> Am I better off using BucketingSink? When to use BucketingSink and
> when to use RollingSink is not clear at all, even though on the surface it
> sure looks like RollingSink is a better version of BucketingSink (or not).
>
> Regards.
>
>
>
> On Sun, Feb 10, 2019 at 12:09 PM Timothy Victor 
> wrote:
>
>> I think the only rolling policy that can be used is
>> CheckpointRollingPolicy to ensure exactly once.
>>
>> Tim
>>
>> On Sun, Feb 10, 2019, 9:13 AM Vishal Santoshi <
>> vishal.santo...@gmail.com wrote:
>>
>>> Can StreamingFileSink be used instead of 
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/filesystem_sink.html,
>>> even though it looks like it could.
>>>
>>>
>>> This code for example
>>>
>>>
>>> StreamingFileSink
>>>     .forRowFormat(new Path(PATH),
>>>             new SimpleStringEncoder<KafkaRecord>())
>>>     .withBucketAssigner(new KafkaRecordBucketAssigner(DEFAULT_FORMAT_STRING, ZONE_ID))
>>>     .withRollingPolicy(new RollingPolicy<KafkaRecord, String>() {
>>>         @Override
>>>         public boolean shouldRollOnCheckpoint(PartFileInfo<String> partFileState) throws IOException {
>>>             return false;
>>>         }
>>>
>>>         @Override
>>>         public boolean shouldRollOnEvent(PartFileInfo<String> partFileState,
>>>                                          KafkaRecord element) throws IOException {
>>>             return partFileState.getSize() > 1024 * 1024 * 1024l;
>>>         }
>>>
>>>         @Override
>>>         public boolean shouldRollOnProcessingTime(PartFileInfo<String> partFileState,
>>>                                                   long currentTime) throws IOException {
>>>             return currentTime - partFileState.getLastUpdateTime() > 10 * 60 * 1000l ||
>>>                     currentTime - partFileState.getCreationTime() > 120 * 60 * 1000l;
>>>         }
>>>     })
>>>     .build();
>>>
>>>
>>> few things I see and am not sure I follow about the new RollingFil

Re: Is there a windowing strategy that allows a different offset per key?

2019-02-11 Thread Stephen Connolly
On Mon, 11 Feb 2019 at 09:54, Fabian Hueske  wrote:

> Hi Stephen,
>
> First of all, yes, windows computing and emitting at the same time can
> cause pressure on the downstream system.
>
> There are a few ways how you can achieve this:
> * use a custom window assigner. A window assigner decides into which
> window a record is assigned. This is the approach you suggested.
>

Thanks for the link. Yes I think the custom window assigner is most
certainly the way to go for my use case. Even more specifically because the
offsets I want to use are going to be based on a subset of the assigned key
not the full assigned key (if you see my other mails this week, the key I
window is a composite key of (id,path) but I want to have all the offsets
for any specific id be the same, irrespective of the path, so the
theoretical need of access to the full key that was driving Rong's original
idea for an RFE to the WindowAssignerContext is not even necessary for my
case)


> * use a regular window and add an operator that buffers the window results
> and releases them with randomized delay.
> * use a ProcessFunction which allows you to control the timing of
> computations yourself.
>
> A few months ago, there was a similar discussion on the dev mailing list
> [1] (didn't read the thread) started by Rong (in CC).
> Maybe, he can share some ideas / experiences as well.
>

Would be awesome if Rong can share any learnings he has encountered since


>
> Cheers,
> Fabian
>
> [1]
> https://lists.apache.org/thread.html/0d1e41302b89378f88693bf4fdb52c23d4b240160b5a10c163d9c46c@%3Cdev.flink.apache.org%3E
>
>
> Am So., 10. Feb. 2019 um 21:03 Uhr schrieb Stephen Connolly <
> stephen.alan.conno...@gmail.com>:
>
>> Looking into the code in TumblingEventTimeWindows:
>>
>> @Override
>> public Collection<TimeWindow> assignWindows(Object element, long
>> timestamp, WindowAssignerContext context) {
>> if (timestamp > Long.MIN_VALUE) {
>> // Long.MIN_VALUE is currently assigned when no timestamp is present
>> long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
>> return Collections.singletonList(new TimeWindow(start, start + size));
>> } else {
>> throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no
>> timestamp marker). " +
>> "Is the time characteristic set to 'ProcessingTime', or did you forget to
>> call " +
>> "'DataStream.assignTimestampsAndWatermarks(...)'?");
>> }
>> }
>>
>> So I think I can just write my own where the offset is derived from
>> hashing the element using my hash function.
>>
>> Good plan or bad plan?
>>
>>
>> On Sun, 10 Feb 2019 at 19:55, Stephen Connolly <
>> stephen.alan.conno...@gmail.com> wrote:
>>
>>> I would like to process a stream of data from different customers,
>>> producing output say once every 15 minutes. The results will then be loaded
>>> into another system for storage and querying.
>>>
>>> I have been using TumblingEventTimeWindows in my prototype, but I am
>>> concerned that all the windows will start and stop at the same time and
>>> cause batch load effects on the back-end data store.
>>>
>>> What I think I would like is that the windows could have a different
>>> start offset for each key, (using a hash function that I would supply)
>>>
>>> Thus deterministically, key "ca:fe:ba:be" would always start based on an
>>> initial offset of 00:07 UTC while say key "de:ad:be:ef" would always start
>>> based on an initial offset of say 00:02 UTC
>>>
>>> Is this possible? Or do I just have to find some way of queuing up my
>>> writes using back-pressure?
>>>
>>> Thanks in advance
>>>
>>> -stephenc
>>>
>>> P.S. I can trade assistance with Flink for assistance with Maven or
>>> Jenkins if my questions are too wearisome!
>>>
>>


Re: Reduce one event under multiple keys

2019-02-11 Thread Stephen Connolly
On Mon, 11 Feb 2019 at 09:42, Fabian Hueske  wrote:

> Hi Stephen,
>
> A window is created with the first record that is assigned to it.
> If the windows are based on time and a key, than no window will be created
> (and not space be occupied) if there is not a first record for a key and
> time interval.
>
> Anyway, if tracking the number of open files & average opening time is
> your use case, you might want to implement the logic with a ProcessFunction
> instead of a window.
> The reason is that time windows don't share state, i.e., the
> information about an opened but not yet closed file would not be "carried
> over" to the next window.
> However, if you use a ProcessFunction, you are responsible for cleaning up
> the state.
>

Ahh but I am cheating by ensuring the events are rich enough that I do not
need to match them.

I get the "open" (they are not really "open" events - I have mapped to an
analogy... it might be more like build job start events... or not... I'm
not at liberty to say ;-) ) events because I need to count the number of
"open"s per time period.

I get the "close" events and they include the duration plus other
information that can then be transformed into the required metrics... yes I
could derive the "open" from the "close" by subtracting the duration but:

1. they would cross window boundaries quite often, leading to repeated
fetch-update-write operations on the backing data store
2. they wouldn't be as "live" and one of the things we need to know is how
many "open"s there are in the previous window... given some durations can
be many days, waiting for the "close" event to create the "open" metric
would not be a good plan.

Basically, I am pushing some of the calculations to the edge where there is
state that makes those calculations cheap and then the rich events are
*hopefully* easy to aggregate with just simple aggregation functions that
only need to maintain the running total... at least that's what the PoC I
am experimenting with Flink should show


>
> Hope this helps,
> Fabian
>
> Am So., 10. Feb. 2019 um 20:36 Uhr schrieb Stephen Connolly <
> stephen.alan.conno...@gmail.com>:
>
>>
>>
>> On Sun, 10 Feb 2019 at 09:09, Chesnay Schepler 
>> wrote:
>>
>>> This sounds reasonable to me.
>>>
>>> I'm a bit confused by this question: "*Additionally, I am (naïevely)
>>> hoping that if a window has no events for a particular key, the
>>> memory/storage costs are zero for that key.*"
>>>
>>> Are you asking whether a key that was received in window X (as part of
>>> an event) is still present in window x+1? If so, then the answer is no; a
>>> key will only be present in a given window if an event was received that
>>> fits into that window.
>>>
>>
>> To confirm:
>>
>> So let's say I'm tracking the average time a file is opened in folders.
>>
>> In window N we get the events:
>>
>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/README.txt"}
>>
>> {"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/README.txt","duration":"67"}
>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/User guide.txt"}
>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/Admin guide.txt"}
>>
>> So there will be aggregates stored for
>> ("ca:fe:ba:be","/"), ("ca:fe:ba:be","/foo"), ("ca:fe:ba:be","/foo/bar"),
>> ("ca:fe:ba:be","/foo/bar/README.txt"), etc
>>
>> In window N+1 we do not get any events at all.
>>
>> So the memory used by my aggregation functions from window N will be
>> freed and the storage will be effectively zero (modulo any follow on
>> processing that might be on a longer window)
>>
>> This seems to be what you are saying... in which case my naïeve hope was
>> not so naïve! w00t!
>>
>>
>>>
>>> On 08.02.2019 13:21, Stephen Connolly wrote:
>>>
>>> Ok, I'll try and map my problem into something that should be familiar
>>> to most people.
>>>
>>> Consider collection of PCs, each of which has a unique ID, e.g.
>>> ca:fe:ba:be, de:ad:be:ef, etc.
>>>
>>> Each PC has a tree of local files. Some of the file paths are
>>> coincidentally the same names, but there is no file sharing between PCs.
>>>
>>> I need to produce metrics about how often files are opened and how long
>>> they are open for.
>>>
>>> I need for every X minute tumbling window not just the cumulative
>>> averages for each PC, but the averages for each file as well as the
>>> cumulative averegaes for each folder and their sub-folders.
>>>
>>> I have a stream of events like
>>>
>>>
>>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/README.txt"}{"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/README.txt","duration":"67"}{"source":"de:ad:be:ef","action":"open","path":"/foo/manchu/README.txt"}
>>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/User
>>> guide.txt"}{"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/Admin
>>> guide.txt"}{"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/User
>>> guide.txt","duration":"97"}{"source":"ca:fe:ba:be","action":"close","pa

Re: Flink Standalone cluster - logging problem

2019-02-11 Thread Gary Yao
Hi,

Can you define what you mean by "job logs"? For code that is run on the
cluster, i.e., JM or TM, you should add your config to log4j.properties. The
log4j-cli.properties file is only used by the Flink CLI process.

Best,
Gary

On Mon, Feb 11, 2019 at 7:39 AM simpleusr  wrote:

> Hi Chesnay,
>
> below is the content for my log4j-cli.properties file. I expect my job logs
> (packaged under com.mycompany.xyz) to be written to the file2 appender. However,
> no file is generated with the prefix XYZ. I restarted the cluster, canceled and
> resubmitted several times, but none of that helped.
>
>
> log4j.rootLogger=INFO, file
>
> # Log all infos in the given file
> log4j.appender.file=org.apache.log4j.FileAppender
> log4j.appender.file.file=${log.file}
> log4j.appender.file.append=false
> log4j.appender.file.layout=org.apache.log4j.PatternLayout
> log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS}
> %-5p %-60c %x - %m%n
>
> log4j.appender.file2=org.apache.log4j.FileAppender
> log4j.appender.file2.file=XYZ-${log.file}
> log4j.appender.file2.append=false
> log4j.appender.file2.layout=org.apache.log4j.PatternLayout
> log4j.appender.file2.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS}
> %-5p %-60c %x - %m%n
>
>
> # Log output from org.apache.flink.yarn to the console. This is used by the
> # CliFrontend class when using a per-job YARN cluster.
> log4j.logger.org.apache.flink.yarn=INFO, console
> log4j.logger.org.apache.flink.yarn.cli.FlinkYarnSessionCli=INFO, console
> log4j.logger.org.apache.hadoop=INFO, console
>
> log4j.appender.console=org.apache.log4j.ConsoleAppender
> log4j.appender.console.layout=org.apache.log4j.PatternLayout
> log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS}
> %-5p %-60c %x - %m%n
>
> # suppress the warning that hadoop native libraries are not loaded
> (irrelevant for the client)
> log4j.logger.org.apache.hadoop.util.NativeCodeLoader=OFF
>
> # suppress the irrelevant (wrong) warnings from the netty channel handler
>
> log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR,
> file
>
>
> log4j.logger.com.hazelcast=INFO, file2
> log4j.logger.com.mycompany.xyz=DEBUG, file2
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Flink Standalone cluster - logging problem

2019-02-11 Thread simpleusr
Hi Gary,

By "job logs" I mean all the loggers under a subpackage of com.mycompany.xyz 
. 

We are using the ./bin/flink run command for job execution, that's why I modified
log4j-cli.properties. Modification of log4j.properties also did not help...

Regards



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink Standalone cluster - logging problem

2019-02-11 Thread Selvaraj chennappan
Could you please try modifying conf/logback.xml?

Regards,
Selvaraj C

On Mon, Feb 11, 2019 at 4:32 PM simpleusr  wrote:

> Hi Gary,
>
> By "job logs" I mean all the loggers under a subpackage of
> com.mycompany.xyz
> .
>
> We are using the ./bin/flink run command for job execution, that's why I modified
> log4j-cli.properties. Modification of log4j.properties also did not help...
>
> Regards
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


-- 





Regards,
Selvaraj C


Re: Flink Standalone cluster - logging problem

2019-02-11 Thread simpleusr
Hi Selveraj,

This did not help either. 

Thanks



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: No resource available error while testing HA

2019-02-11 Thread Gary Yao
Hi Averell,

Logback has this feature [1] but it is not enabled out of the box. You will
have
to enable the JMX agent by setting the com.sun.management.jmxremote system
property [2][3]. I have not tried this out, though.
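
For illustration, that would amount to roughly the following (the port and the
disabled authentication are assumptions, not suitable for production as-is):

    # flink-conf.yaml
    env.java.opts: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=9999 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false

    <!-- logback.xml -->
    <configuration>
      <jmxConfigurator />
      <!-- existing appenders / loggers unchanged -->
    </configuration>

Logger levels can then be changed at runtime through any JMX client, e.g. jconsole.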

Best,
Gary

[1] https://logback.qos.ch/manual/jmxConfig.html
[2]
https://docs.oracle.com/javase/8/docs/technotes/guides/management/agent.html
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html#env-java-opts

On Thu, Feb 7, 2019 at 11:51 AM Averell  wrote:

> Hi Gary,
>
> I am trying to reproduce that problem.
> BTW, is it possible to change the log level (I'm using logback) for a running
> job?
>
> Thanks and regards,
> Averell
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Anyone tried to do blue-green topology deployments?

2019-02-11 Thread Stephen Connolly
I have my main application updating with a blue-green deployment strategy
whereby a new version (always called green) starts receiving an initial
fraction of the web traffic and then - based on the error rates - we
progress the % of traffic until 100% of traffic is being handled by the
green version. At which point we decommission blue and green is the new
blue when the next version comes along.

Applied to Flink, my initial thought is that you would run the two
topologies in parallel, but the first action of each topology would be a
filter based on the key.

You basically would use a consistent transformation of the key into a
number between 0 and 100 and the filter would be:

(key) -> color == green ? f(key) < level : f(key) >= level

Then I can use a suitable metric to determine if the new topology is
working and ramp up or down the level.

One issue I foresee is what happens if the level changes mid-window, I will
have output from both topologies when the window ends.

In the case of my output, which is aggregatable, I will get the same
results from two rows as from one row *provided* that the switch from blue
to green is synchronized between the two topologies. That sounds like a
hard problem though.


Another thought I had was to let the web front-end decide based on the same
key vs level approach. Rather than submit the raw event, I would add the
target topology to the event and the filter just selects based on whether
it is the target topology. This has the advantage that I know each event
will only ever be processed by one of green or blue. Heck I could even use
the main web application's blue-green deployment to drive the flink blue
green deployment as due to the way I structure my results I don't care if I
get two rows of counts for a time window or one row of counts, because I'm
adding up the total counts across multiple rows and sum is sum!


Anyone else had to try and deal with this type of thing?

-stephenc


Re: Anyone tried to do blue-green topology deployments?

2019-02-11 Thread Stephen Connolly
On Mon, 11 Feb 2019 at 13:26, Stephen Connolly <
stephen.alan.conno...@gmail.com> wrote:

> I have my main application updating with a blue-green deployment strategy
> whereby a new version (always called green) starts receiving an initial
> fraction of the web traffic and then - based on the error rates - we
> progress the % of traffic until 100% of traffic is being handled by the
> green version. At which point we decommission blue and green is the new
> blue when the next version comes along.
>
> Applied to Flink, my initial thought is that you would run the two
> topologies in parallel, but the first action of each topology would be a
> filter based on the key.
>
> You basically would use a consistent transformation of the key into a
> number between 0 and 100 and the filter would be:
>
> (key) -> color == green ? f(key) < level : f(key) >= level
>
> Then I can use a suitable metric to determine if the new topology is
> working and ramp up or down the level.
>
> One issue I foresee is what happens if the level changes mid-window, I
> will have output from both topologies when the window ends.
>
> In the case of my output, which is aggregatable, I will get the same
> results from two rows as from one row *provided* that the switch from blue
> to green is synchronized between the two topologies. That sounds like a
> hard problem though.
>
>
> Another thought I had was to let the web front-end decide based on the
> same key vs level approach. Rather than submit the raw event, I would add
> the target topology to the event and the filter just selects based on
> whether it is the target topology. This has the advantage that I know each
> event will only ever be processed by one of green or blue. Heck I could
> even use the main web application's blue-green deployment to drive the
> flink blue green deployment
>

In other words, if a blue web node receives an event upload it adds "blue",
whereas if a green web node receives an event upload it adds "green" (not
quite those strings but rather the web deployment sequence number). This
has the advantage that the web nodes do not need to parse the event
payload. The % of web traffic will result in the matching % of events being
sent to blue and green. Also this means that all keys get processed at the
target % during the deployment, which can help flush out bugs.

I can therefore stop the old topology at > 1 window after the green web
node started getting 100% of traffic in order to allow any existing windows
in flight to flush all the way to the datastore...

Out of order events would be tagged as green once green is 100% of traffic,
and so can be processed correctly...

And I can completely ignore topology migration serialization issues...

Sounding very tempting... there must be something wrong...

(or maybe my data storage plan just allows me to make this kind of
optimization!)


> as due to the way I structure my results I don't care if I get two rows of
> counts for a time window or one row of counts, because I'm adding up the
> total counts across multiple rows and sum is sum!
>

>
> Anyone else had to try and deal with this type of thing?
>
> -stephenc
>
>
>


Re: Anyone tried to do blue-green topology deployments?

2019-02-11 Thread Stephen Connolly
Another possibility would be injecting pseudo events into the source and
having a stateful filter.

The event would be something like “key X is now owned by green”.

I can do that because getting a list of keys seen in the past X minutes is
cheap (we have it already)

But it’s unclear what impact would be adding such state to the filter

On Mon 11 Feb 2019 at 13:33, Stephen Connolly <
stephen.alan.conno...@gmail.com> wrote:

>
>
> On Mon, 11 Feb 2019 at 13:26, Stephen Connolly <
> stephen.alan.conno...@gmail.com> wrote:
>
>> I have my main application updating with a blue-green deployment strategy
>> whereby a new version (always called green) starts receiving an initial
>> fraction of the web traffic and then - based on the error rates - we
>> progress the % of traffic until 100% of traffic is being handled by the
>> green version. At which point we decommission blue and green is the new
>> blue when the next version comes along.
>>
>> Applied to Flink, my initial thought is that you would run the two
>> topologies in parallel, but the first action of each topology would be a
>> filter based on the key.
>>
>> You basically would use a consistent transformation of the key into a
>> number between 0 and 100 and the filter would be:
>>
>> (key) -> color == green ? f(key) < level : f(key) >= level
>>
>> Then I can use a suitable metric to determine if the new topology is
>> working and ramp up or down the level.
>>
>> One issue I foresee is what happens if the level changes mid-window, I
>> will have output from both topologies when the window ends.
>>
>> In the case of my output, which is aggregatable, I will get the same
>> results from two rows as from one row *provided* that the switch from blue
>> to green is synchronized between the two topologies. That sounds like a
>> hard problem though.
>>
>>
>> Another thought I had was to let the web front-end decide based on the
>> same key vs level approach. Rather than submit the raw event, I would add
>> the target topology to the event and the filter just selects based on
>> whether it is the target topology. This has the advantage that I know each
>> event will only ever be processed by one of green or blue. Heck I could
>> even use the main web application's blue-green deployment to drive the
>> flink blue green deployment
>>
>
> In other words, if a blue web node receives an event upload it adds
> "blue", whereas if a green web node receives an event upload it adds
> "green" (not quite those strings but rather the web deployment sequence
> number). This has the advantage that the web nodes do not need to parse the
> event payload. The % of web traffic will result in the matching % of events
> being sent to blue and green. Also this means that all keys get processed
> at the target % during the deployment, which can help flush out bugs.
>
> I can therefore stop the old topology at > 1 window after the green web
> node started getting 100% of traffic in order to allow any existing windows
> in flight to flush all the way to the datastore...
>
> Out of order events would be tagged as green once green is 100% of
> traffic, and so can be processed correctly...
>
> And I can completely ignore topology migration serialization issues...
>
> Sounding very tempting... there must be something wrong...
>
> (or maybe my data storage plan just allows me to make this kind of
> optimization!)
>
>
>> as due to the way I structure my results I don't care if I get two rows
>> of counts for a time window or one row of counts, because I'm adding up the
>> total counts across multiple rows and sum is sum!
>>
>
>>
>> Anyone else had to try and deal with this type of thing?
>>
>> -stephenc
>>
>>
>> --
Sent from my phone


Re: Flink Standalone cluster - logging problem

2019-02-11 Thread Gary Yao
Hi,

Are you logging from your own operator implementations, and you expect these
log messages to end up in a file prefixed with XYZ-? If that is the case,
modifying log4j-cli.properties will not be expedient as I wrote earlier.

You should modify the log4j.properties on all hosts that are running the
JobManager (JM) and TaskManagers (TM). Consequently, the log files can only
be
found on the hosts that are running the JM and TMs.

Furthermore, I see a problem with the following line in your log4j
configuration:

log4j.appender.file2.file=XYZ-${log.file}

Here, ${log.file} can be an absolute path, which means you would end up
prefixing the whole path instead of just the filename.
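
One untested variant that avoids the problem by turning the prefix into a suffix of
${log.file} (and that stops the messages from also going to the root logger's file):

    log4j.appender.file2=org.apache.log4j.FileAppender
    log4j.appender.file2.file=${log.file}.xyz.log
    log4j.appender.file2.append=false
    log4j.appender.file2.layout=org.apache.log4j.PatternLayout
    log4j.appender.file2.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

    log4j.logger.com.mycompany.xyz=DEBUG, file2
    log4j.additivity.com.mycompany.xyz=false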

If this does not help, please share a minimum working example with us.

Best,
Gary


On Mon, Feb 11, 2019 at 12:02 PM simpleusr  wrote:

> Hi Gary,
>
> By "job logs" I mean all the loggers under a subpackage of
> com.mycompany.xyz
> .
>
> We are using the ./bin/flink run command for job execution, that's why I modified
> log4j-cli.properties. Modification of log4j.properties also did not help...
>
> Regards
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Anyone tried to do blue-green topology deployments?

2019-02-11 Thread Stephen Connolly
On Mon, 11 Feb 2019 at 14:10, Stephen Connolly <
stephen.alan.conno...@gmail.com> wrote:

> Another possibility would be injecting pseudo events into the source and
> having a stateful filter.
>
> The event would be something like “key X is now owned by green”.
>
> I can do that because getting a list of keys seen in the past X minutes is
> cheap (we have it already)
>
> But it’s unclear what impact would be adding such state to the filter
>

Hmmm might not need to be quite so stateful, if the filter was implemented
as a BroadcastProcessFunction or a KeyedBroadcastProcessFunction, I could
run the key -> threshold and compare to the level from the Broadcast
context... that way the broadcast events wouldn't need to be associated
with any specific key and could just be {"level":56}
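
A rough sketch of that filter (the Event type, its key accessor and the plain-integer
broadcast element are illustrative):

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BlueGreenFilter extends BroadcastProcessFunction<Event, Integer, Event> {

    public static final MapStateDescriptor<String, Integer> LEVEL =
            new MapStateDescriptor<>("level", Types.STRING, Types.INT);

    private final boolean green;

    public BlueGreenFilter(boolean green) {
        this.green = green;
    }

    @Override
    public void processElement(Event event, ReadOnlyContext ctx, Collector<Event> out) throws Exception {
        Integer level = ctx.getBroadcastState(LEVEL).get("level");
        int bucket = Math.floorMod(event.getKey().hashCode(), 100);  // f(key) in [0, 100)
        int l = level == null ? 0 : level;                           // before any broadcast, green gets nothing
        if (green ? bucket < l : bucket >= l) {
            out.collect(event);
        }
    }

    @Override
    public void processBroadcastElement(Integer level, Context ctx, Collector<Event> out) throws Exception {
        ctx.getBroadcastState(LEVEL).put("level", level);  // e.g. {"level":56} parsed upstream
    }
}

Blue and green would run the same function, each constructed with its own colour,
both consuming the same broadcast stream of level updates.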


>
> On Mon 11 Feb 2019 at 13:33, Stephen Connolly <
> stephen.alan.conno...@gmail.com> wrote:
>
>>
>>
>> On Mon, 11 Feb 2019 at 13:26, Stephen Connolly <
>> stephen.alan.conno...@gmail.com> wrote:
>>
>>> I have my main application updating with a blue-green deployment
>>> strategy whereby a new version (always called green) starts receiving an
>>> initial fraction of the web traffic and then - based on the error rates -
>>> we progress the % of traffic until 100% of traffic is being handled by the
>>> green version. At which point we decommission blue and green is the new
>>> blue when the next version comes along.
>>>
>>> Applied to Flink, my initial thought is that you would run the two
>>> topologies in parallel, but the first action of each topology would be a
>>> filter based on the key.
>>>
>>> You basically would use a consistent transformation of the key into a
>>> number between 0 and 100 and the filter would be:
>>>
>>> (key) -> color == green ? f(key) < level : f(key) >= level
>>>
>>> Then I can use a suitable metric to determine if the new topology is
>>> working and ramp up or down the level.
>>>
>>> One issue I foresee is what happens if the level changes mid-window: I
>>> will have output from both topologies when the window ends.
>>>
>>> In the case of my output, which is aggregatable, I will get the same
>>> results from two rows as from one row *provided* that the switch from blue
>>> to green is synchronized between the two topologies. That sounds like a
>>> hard problem though.
>>>
>>>
>>> Another thought I had was to let the web front-end decide based on the
>>> same key vs level approach. Rather than submit the raw event, I would add
>>> the target topology to the event and the filter just selects based on
>>> whether it is the target topology. This has the advantage that I know each
>>> event will only ever be processed by one of green or blue. Heck I could
>>> even use the main web application's blue-green deployment to drive the
>>> Flink blue-green deployment.
>>>
>>
>> In other words, if a blue web node receives an event upload it adds
>> "blue", whereas if a green web node receives an event upload it adds
>> "green" (not quite those strings but rather the web deployment sequence
>> number). This has the advantage that the web nodes do not need to parse the
>> event payload. The % of web traffic will result in the matching % of events
>> being sent to blue and green. Also this means that all keys get processed
>> at the target % during the deployment, which can help flush out bugs.
>>
>> I can therefore stop the old topology at > 1 window after the green web
>> node started getting 100% of traffic in order to allow any existing windows
>> in flight to flush all the way to the datastore...
>>
>> Out of order events would be tagged as green once green is 100% of
>> traffic, and so can be processed correctly...
>>
>> And I can completely ignore topology migration serialization issues...
>>
>> Sounding very tempting... there must be something wrong...
>>
>> (or maybe my data storage plan just allows me to make this kind of
>> optimization!)
>>
>>
>>> as due to the way I structure my results I don't care if I get two rows
>>> of counts for a time window or one row of counts, because I'm adding up the
>>> total counts across multiple rows and sum is sum!
>>>
>>
>>>
>>> Anyone else had to try and deal with this type of thing?
>>>
>>> -stephenc
>>>
>>>
>>> --
> Sent from my phone
>


Re: stream of large objects

2019-02-11 Thread Aggarwal, Ajay
I looked a little into broadcast state and while it's interesting I don't think
it will help me. Since broadcast state is kept entirely in memory, I am worried
about the memory requirements if I make all these LargeMessages part of broadcast
state. Furthermore, these LargeMessages need to be processed in a keyed context,
so sharing all of them across all downstream tasks does not seem efficient.


From: Chesnay Schepler 
Date: Sunday, February 10, 2019 at 4:57 AM
To: "Aggarwal, Ajay" , "user@flink.apache.org" 

Subject: Re: stream of large objects




The Broadcast State may be interesting to you.

On 08.02.2019 15:57, Aggarwal, Ajay wrote:
Yes, another KeyBy will be used. The “small size” messages will be strings of 
length 500 to 1000.

Is there a concept of “global” state in flink? Is it possible to keep these 
lists in global state and only pass the list reference (by name?) in the 
LargeMessage?


From: Chesnay Schepler 
Date: Friday, February 8, 2019 at 8:45 AM
To: "Aggarwal, Ajay" 
, 
"user@flink.apache.org" 

Subject: Re: stream of large objects

Whether a LargeMessage is serialized depends on how the job is structured.
For example, if you were to only apply map/filter functions after the 
aggregation it is likely they wouldn't be serialized.
If you were to apply another keyBy they will be serialized again.
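
As a rough illustration of that point (all type and method names below are
assumed placeholders, not code from this thread):

DataStream<LargeMessage> aggregated = smallMessages
    .keyBy(SmallMessage::getKey)
    .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
    .aggregate(new AssembleLargeMessage()); // assumed AggregateFunction building the list

aggregated
    .filter(m -> !m.getMessages().isEmpty()) // same parallelism, forwarded locally: no network shuffle
    .keyBy(LargeMessage::getKey)             // re-keying: every LargeMessage is serialized and shuffled again
    .map(new ProcessLargeMessage());         // assumed downstream MapFunction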

When you say "small size" messages, what are we talking about here?

On 07.02.2019 20:37, Aggarwal, Ajay wrote:
In my use case my source stream contain small size messages, but as part of 
flink processing I will be aggregating them into large messages and further 
processing will happen on these large messages. The structure of this large 
message will be something like this:

   class LargeMessage {
       String key;
       List<String> messages; // this is where the aggregation of the smaller messages happens
   }

In some cases this list field of LargeMessage can get very large (1000's of
messages). Is it OK to create an intermediate stream of these LargeMessages?
What should I be concerned about while designing the Flink job, specifically
with parallelism in mind? As these LargeMessages flow from one Flink subtask to
another, do they get serialized/deserialized?

Thanks.







Re: How to load multiple same-format files with single batch job?

2019-02-11 Thread françois lacombe
Hi Fabian,

I've run into issues implementing a custom InputFormat with my existing
code.

Can this be used in combination with a custom BatchTableSource?
As I understand your solution, I should move my source to implementations
like :

tableEnvironment
  .connect(...)
  .withFormat(...)
  .withSchema(...)
  .inAppendMode()
  .registerTableSource("MyTable")

right?

I currently have a BatchTableSource class which produces a DataSet from
a single GeoJSON file.
This doesn't sound compatible with a custom InputFormat, does it?

Thanks in advance for any additional hint, all the best

François
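
For what it is worth, a rough sketch of how the two can fit together in Flink 1.7
(GeoJsonInputFormat is an assumed FileInputFormat<Row> implementation; the
directory path and schema handling are illustrative):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sources.BatchTableSource;
import org.apache.flink.types.Row;

public class GeoJsonDirectoryTableSource implements BatchTableSource<Row> {

    private final String directoryPath; // e.g. "file:///data/geojson/" (a directory, not a single file)
    private final TableSchema schema;

    public GeoJsonDirectoryTableSource(String directoryPath, TableSchema schema) {
        this.directoryPath = directoryPath;
        this.schema = schema;
    }

    @Override
    public DataSet<Row> getDataSet(ExecutionEnvironment execEnv) {
        // A file-based input format pointed at a directory enumerates all files in it;
        // each file becomes one or more splits that are read by the parallel source tasks.
        GeoJsonInputFormat format = new GeoJsonInputFormat(directoryPath, schema);
        return execEnv.createInput(format, schema.toRowType());
    }

    @Override
    public TypeInformation<Row> getReturnType() {
        return schema.toRowType();
    }

    @Override
    public TableSchema getTableSchema() {
        return schema;
    }

    @Override
    public String explainSource() {
        return "GeoJsonDirectoryTableSource(" + directoryPath + ")";
    }
}

It can then be registered with tableEnv.registerTableSource("MyTable", ...) and
queried as before.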

On Mon, Feb 4, 2019 at 12:10, Fabian Hueske  wrote:

> Hi,
>
> The files will be read in a streaming fashion.
> Typically files are broken down into processing splits that are
> distributed to tasks for reading.
> How a task reads a file split depends on the implementation, but usually
> the format reads the split as a stream and does not read the split as a
> whole before emitting records.
>
> Best,
> Fabian
>
> On Mon, Feb 4, 2019 at 12:06, françois lacombe <
> francois.laco...@dcbrain.com> wrote:
>
>> Hi Fabian,
>>
>> Thank you for this input.
>> This is interesting.
>>
>> With such an input format, will all the files be loaded in memory
>> before being processed, or will they be streamed?
>>
>> All the best
>> François
>>
>> On Tue, Jan 29, 2019 at 22:20, Fabian Hueske  wrote:
>>
>>> Hi,
>>>
>>> You can point a file-based input format to a directory and the input
>>> format should read all files in that directory.
>>> That works as well for TableSources that are internally use file-based
>>> input formats.
>>> Is that what you are looking for?
>>>
>>> Best, Fabian
>>>
>>> On Mon, Jan 28, 2019 at 17:22, françois lacombe <
>>> francois.laco...@dcbrain.com> wrote:
>>>
 Hi all,

 I'm wondering whether it's possible, and what's the best way, to load
 multiple files from a JSON source into a JDBC sink?
 I'm running Flink 1.7.0.

 Let's say I have about 1500 files with the same structure (same format,
 schema, everything) and I want to load them with a *batch* job.
 Can Flink handle loading each and every file in a single source
 and send the data to my JDBC sink?
 I wish I could provide the URL of the directory containing my thousand
 files to the batch source to make it load all of them sequentially.
 My sources and sinks are currently implemented for BatchTableSource; I
 guess the cost of making them available for streaming would be quite
 high for me at the moment.

 Has someone ever done this?
 Am I wrong to expect to do this with a batch job?

 All the best

 François Lacombe


    

 


>>>
>>
>>    
>> 
>> 
>>
>>
>

-- 

       
   





Flink Job sometimes does not stop as expected on cancelling.

2019-02-11 Thread Mahantesh Patil
Hi Team,

We have Flink jobs running in cluster mode. When I cancel a job and check
its status, it still shows as running.

Below are the logs generated. I do not see any useful information. Could you
point me in the right direction to debug this? Our Flink jobs do heavy
processing, and my understanding is that on cancelling, Flink waits until the
operators complete processing to store the savepoint. So is this happening
because one of the operators is hung waiting for a resource, and is there a way
to configure a timeout?
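
For reference, a flink-conf.yaml sketch of the task cancellation knobs (the
values are illustrative, not recommendations; please check the configuration
docs for your Flink version):

# Interval (ms) between repeated attempts to cancel a task that has not reacted yet.
task.cancellation.interval: 30000
# Timeout (ms) after which a cancellation that is still hanging is escalated to a
# fatal TaskManager error, which frees the slot.
task.cancellation.timeout: 180000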


Appreciate your help.



10 Jan 2019 20:36:51,490 INFO org.apache.flink.runtime.taskmanager.Task -
Triggering cancellation of task code Source: control -> control-filter
(1/1) (c6f50328bb09ce1ad389e5ea614fc8a9).
10 Jan 2019 20:36:51,490 INFO org.apache.flink.runtime.taskmanager.Task -
Attempting to cancel task interaction-details (1/1)
(dc946a863e34f66a95df3a8e8d540cab).
10 Jan 2019 20:36:51,490 INFO org.apache.flink.runtime.taskmanager.Task -
interaction-details (1/1) (dc946a863e34f66a95df3a8e8d540cab) switched from
RUNNING to CANCELING.
10 Jan 2019 20:36:51,491 INFO org.apache.flink.runtime.taskmanager.Task -
Triggering cancellation of task code interaction-details (1/1)
(dc946a863e34f66a95df3a8e8d540cab).
10 Jan 2019 20:36:51,491 INFO org.apache.flink.runtime.taskmanager.Task -
Attempting to cancel task current-longests (1/1)
(2f1cee63ab479a8e9e492c990311ccc1).
10 Jan 2019 20:36:51,491 INFO org.apache.flink.runtime.taskmanager.Task -
current-longests (1/1) (2f1cee63ab479a8e9e492c990311ccc1) switched from
RUNNING to CANCELING.
10 Jan 2019 20:36:51,491 INFO org.apache.flink.runtime.taskmanager.Task -
Triggering cancellation of task code current-longests (1/1)
(2f1cee63ab479a8e9e492c990311ccc1).
10 Jan 2019 20:36:51,492 INFO org.apache.flink.runtime.taskmanager.Task -
Attempting to cancel task interactions-count (1/1)
(aff734ce398978cdc3ac40a1e530d0ed).
10 Jan 2019 20:36:51,492 INFO org.apache.flink.runtime.taskmanager.Task -
interactions-count (1/1) (aff734ce398978cdc3ac40a1e530d0ed) switched from
RUNNING to CANCELING.
10 Jan 2019 20:36:51,628 INFO org.apache.flink.runtime.taskmanager.Task -
Triggering cancellation of task code interactions-count (1/1)
(aff734ce398978cdc3ac40a1e530d0ed).
10 Jan 2019 20:36:51,680 INFO org.apache.flink.runtime.taskmanager.Task -
Attempting to cancel task agents-working-on-interactions (1/1)
(15aa5e81a66a46127ab85719b9df7279).
10 Jan 2019 20:36:51,680 INFO org.apache.flink.runtime.taskmanager.Task -
agents-working-on-interactions (1/1) (15aa5e81a66a46127ab85719b9df7279)
switched from RUNNING to CANCELING.
10 Jan 2019 20:36:51,680 INFO org.apache.flink.runtime.taskmanager.Task -
Triggering cancellation of task code agents-working-on-interactions (1/1)
(15aa5e81a66a46127ab85719b9df7279).
10 Jan 2019 20:36:51,811 ERROR
org.apache.flink.streaming.runtime.tasks.StreamTask - Could not shut down
timer service
java.lang.InterruptedException
at
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2067)
at
java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1475)
at
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService.shutdownAndAwaitPending(SystemProcessingTimeService.java:197)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:317)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
10 Jan 2019 20:36:51,811 ERROR
org.apache.flink.streaming.runtime.tasks.StreamTask - Could not shut down
timer service
java.lang.InterruptedException
at
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2067)
at
java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1475)
at
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService.shutdownAndAwaitPending(SystemProcessingTimeService.java:197)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:317)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
10 Jan 2019 20:36:51,813 INFO org.apache.flink.runtime.taskmanager.Task -
Attempting to cancel task subscription-management (1/1)
(f5e529744e645e6acf00d2c5aab2006e).
10 Jan 2019 20:37:07,156 INFO org.apache.flink.runtime.taskmanager.Task -
subscription-management (1/1) (f5e529744e645e6acf00d2c5aab2006e) switched
from RUNNING to CANCELING.
10 Jan 2019 20:37:07,157 INFO org.apache.flink.runtime.taskmanager.Task -
Triggering cancellation of task code subscription-management (1/1)
(f5e529744e645e6acf00d2c5aab2006e).
10 Jan 2019 20:37:07,164 INFO org.apache.flink.runtime.taskmanager.Task -
Attempting to cancel task filter-business-metrics -> Sink: data_feed (1/1)
(6c229624ea4735215fe98f5458c58c8e).
10 Jan 2019 20:37:07,164 INFO org.apache.flink.runtime.taskmanager.T

Re: Can an Aggregate the key from a WindowedStream.aggregate()

2019-02-11 Thread Rong Rong
Hi Stephen,

Chesnay was right, you will have to use a more complex version of the
window processing function.
Perhaps your goal can be achieved by this specific function with incremental
aggregation [1]. If not, you can always use the regular process window
function [2].
Both of these methods have access to the KEY information you require.

Thanks,
Rong

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/windows.html#processwindowfunction-with-incremental-aggregation
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/windows.html#processwindowfunction
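
A compact sketch of option [1] (Event, CountAggregate and the 15-minute window
are illustrative assumptions, not code from this thread):

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class KeyedWindowCounts {

    /** Incremental part: keeps only a running count per key and window. */
    public static class CountAggregate implements AggregateFunction<Event, Long, Long> {
        @Override public Long createAccumulator() { return 0L; }
        @Override public Long add(Event value, Long acc) { return acc + 1; }
        @Override public Long getResult(Long acc) { return acc; }
        @Override public Long merge(Long a, Long b) { return a + b; }
    }

    /** Post-aggregation part: receives the key plus the single pre-aggregated value. */
    public static class EmitWithKey
            extends ProcessWindowFunction<Long, Tuple2<String, Long>, String, TimeWindow> {
        @Override
        public void process(String key, Context ctx, Iterable<Long> counts,
                            Collector<Tuple2<String, Long>> out) {
            out.collect(Tuple2.of(key, counts.iterator().next()));
        }
    }

    public static DataStream<Tuple2<String, Long>> countsPerKey(DataStream<Event> events) {
        return events
                .keyBy(Event::getKey)
                .window(TumblingEventTimeWindows.of(Time.minutes(15)))
                .aggregate(new CountAggregate(), new EmitWithKey());
    }
}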



On Sun, Feb 10, 2019 at 11:29 AM Stephen Connolly <
stephen.alan.conno...@gmail.com> wrote:

>
>
> On Sun, 10 Feb 2019 at 09:48, Chesnay Schepler  wrote:
>
>> There are also versions of WindowedStream#aggregate that accept an
>> additional WindowFunction/ProcessWindowFunction, which do have access to
>> the key via apply()/process() respectively. These functions are called
>> post aggregation.
>>
>
> Cool I'll chase those down
>
>
>>
>> On 08.02.2019 18:24, stephen.alan.conno...@gmail.com wrote:
>> > If I write my aggregation logic as a WindowFunction then I get access
>> to the key as the first parameter in WindowFunction.apply(...) however the
>> Javadocs for calling WindowedStream.apply(WindowFunction) state:
>> >
>> >> Note that this function requires that all data in the windows is
>> buffered until the window
>> >> is evaluated, as the function provides no means of incremental
>> aggregation.
>> > Which sounds bad.
>> >
>> > It seems the recommended alternative is to use one of the
>> WindowedStream.aggregate(AggregateFunction) overloads; however, I cannot see how to get
>> access to the key...
>> >
>> > Is my only solution to transform my data into a Tuple if I need access
>> to the key post aggregation?
>> >
>> > Thanks in advance
>> >
>> > -stephenc
>> >
>>
>>


Re: Is there a windowing strategy that allows a different offset per key?

2019-02-11 Thread Rong Rong
Hi Stephen,

Yes, we had a discussion regarding dynamic offsets and keys [1]. The
main idea was the same: we don't have many complex operators after the
window operator, thus a huge spike of traffic will occur after firing on
the window boundary. In that discussion, the best idea was to plug in a
special *WindowAssigner* in which you can customize the offset strategy.

The only thing is that the *KeySelector* you use before windowing has to
be stateless (i.e. every invocation of the *getKey(IN value)* function with the
same input value should return an identical result). In your case, if the id field
is used to determine the offset, you can always do that by extracting the id
from the key tuple of (id, path).

Hope these helps.

Thanks,
Rong


[1]
https://docs.google.com/document/d/1fEhbcRgxxX8zFYD_iMBG1DCbHmTcTRfRQFXelPhMFiY/edit?usp=sharing
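
A rough sketch of such an assigner (only an illustration of the idea, not the
implementation discussed in [1]; extractId(...) stands in for pulling the id out
of the element or its (id, path) key):

import java.util.Collection;
import java.util.Collections;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class PerKeyOffsetTumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {

    private final long size;

    public PerKeyOffsetTumblingEventTimeWindows(Time size) {
        this.size = size.toMilliseconds();
    }

    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp,
                                                WindowAssignerContext context) {
        if (timestamp > Long.MIN_VALUE) {
            // Deterministic offset in [0, size) derived from the element itself.
            long offset = Math.floorMod((long) extractId(element).hashCode(), size);
            long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
            return Collections.singletonList(new TimeWindow(start, start + size));
        }
        throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). "
                + "Did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?");
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return EventTimeTrigger.create();
    }

    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    @Override
    public boolean isEventTime() {
        return true;
    }

    private static String extractId(Object element) {
        // Hypothetical: replace with however the id is carried in the element.
        return element.toString();
    }
}

It would then be used as .window(new PerKeyOffsetTumblingEventTimeWindows(Time.minutes(15)))
in place of TumblingEventTimeWindows.of(...).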

On Mon, Feb 11, 2019 at 2:20 AM Stephen Connolly <
stephen.alan.conno...@gmail.com> wrote:

>
>
> On Mon, 11 Feb 2019 at 09:54, Fabian Hueske  wrote:
>
>> Hi Stephen,
>>
>> First of all, yes, windows computing and emitting at the same time can
>> cause pressure on the downstream system.
>>
>> There are a few ways how you can achieve this:
>> * use a custom window assigner. A window assigner decides into which
>> window a record is assigned. This is the approach you suggested.
>>
>
> Thanks for the link. Yes I think the custom window assigner is most
> certainly the way to go for my use case. Even more specifically because the
> offsets I want to use are going to be based on a subset of the assigned key
> not the full assigned key (if you see my other mails this week, the key I
> window is a composite key of (id,path) but I want to have all the offsets
> for any specific id be the same, irrespective of the path, so the
> theoretical need of access to the full key that was driving Rong's original
> idea for an RFE to the WindowAssignerContext is not even necessary for my
> case)
>
>
>> * use a regular window and add an operator that buffers the window
>> results and releases them with randomized delay.
>> * use a ProcessFunction which allows you to control the timing of
>> computations yourself.
>>
>> A few months ago, there was a similar discussion on the dev mailing list
>> [1] (didn't read the thread) started by Rong (in CC).
>> Maybe, he can share some ideas / experiences as well.
>>
>
> Would be awesome if Rong can share any learnings he has encountered since
>
>
>>
>> Cheers,
>> Fabian
>>
>> [1]
>> https://lists.apache.org/thread.html/0d1e41302b89378f88693bf4fdb52c23d4b240160b5a10c163d9c46c@%3Cdev.flink.apache.org%3E
>>
>>
>> Am So., 10. Feb. 2019 um 21:03 Uhr schrieb Stephen Connolly <
>> stephen.alan.conno...@gmail.com>:
>>
>>> Looking into the code in TumblingEventTimeWindows:
>>>
>>> @Override
>>> public Collection<TimeWindow> assignWindows(Object element, long timestamp,
>>>         WindowAssignerContext context) {
>>>     if (timestamp > Long.MIN_VALUE) {
>>>         // Long.MIN_VALUE is currently assigned when no timestamp is present
>>>         long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
>>>         return Collections.singletonList(new TimeWindow(start, start + size));
>>>     } else {
>>>         throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
>>>             "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
>>>             "'DataStream.assignTimestampsAndWatermarks(...)'?");
>>>     }
>>> }
>>>
>>> So I think I can just write my own where the offset is derived from
>>> hashing the element using my hash function.
>>>
>>> Good plan or bad plan?
>>>
>>>
>>> On Sun, 10 Feb 2019 at 19:55, Stephen Connolly <
>>> stephen.alan.conno...@gmail.com> wrote:
>>>
 I would like to process a stream of data from different customers,
 producing output say once every 15 minutes. The results will then be loaded
 into another system for storage and querying.

 I have been using TumblingEventTimeWindows in my prototype, but I am
 concerned that all the windows will start and stop at the same time and
 cause batch load effects on the back-end data store.

 What I think I would like is that the windows could have a different
 start offset for each key, (using a hash function that I would supply)

 Thus, deterministically, key "ca:fe:ba:be" would always start based on
 an initial offset of 00:07 UTC while, say, key "de:ad:be:ef" would always
 start based on an initial offset of say 00:02 UTC

 Is this possible? Or do I just have to find some way of queuing up my
 writes using back-pressure?

 Thanks in advance

 -stephenc

 P.S. I can trade assistance with Flink for assistance with Maven or
 Jenkins if my questions are too wearisome!

>>>


Dynamic Rules Creation For Flink CEP

2019-02-11 Thread Titus Rakkesh
Dears,
  We have a use case of creating and uploading new patterns
dynamically to change the app behavior at runtime. Does the current Flink
version support that?
Thanks


Re: Dynamic Rules Creation For Flink CEP

2019-02-11 Thread Dian Fu
This feature is not supported yet. There is a JIRA
(https://issues.apache.org/jira/browse/FLINK-7129) tracking it.

> On Feb 12, 2019, at 9:32 AM, Titus Rakkesh  wrote:
>
> Dears,
>   We have a use case of creating and uploading new patterns
> dynamically to change the app behavior at runtime. Does the current Flink
> version support that?
> Thanks



Re: Dynamic Rules Creation For Flink CEP

2019-02-11 Thread dhanuka ranasinghe
I have written a blog post; it does not use CEP, but I hope you can apply the
same approach to CEP as well.

http://dhanuka84.blogspot.com/2019/02/rule-execution-as-streaming-process.html?m=1




On Tue, 12 Feb 2019, 11:46 Dian Fu wrote:

> This feature is not supported yet. There is a JIRA
> (https://issues.apache.org/jira/browse/FLINK-7129) tracking it.
>
> > On Feb 12, 2019, at 9:32 AM, Titus Rakkesh wrote:
> >
> > Dears,
> >   We have a use case of creating and uploading new patterns
> > dynamically to change the app behavior at runtime. Does the current Flink
> > version support that?
> > Thanks
>
>


Frequent Heartbeat timeout

2019-02-11 Thread sohimankotia
Hi,

I am using Flink 1.5.5. I have a streaming job with 25 * 6 (150) parallelism.
I am facing heartbeat timeouts too frequently, even during off-peak hours,
which should rule out memory issues.

Also, I enabled debug logs for Flink and observed that the heartbeat request is
getting triggered every 5 seconds.

*Flink-conf.yml*

akka.ask.timeout: 120s
akka.client.timeout: 900s
akka.framesize: 256477376b
akka.lookup.timeout: 60s
akka.tcp.timeout: 900s
akka.watch.heartbeat.interval: 120s
akka.watch.heartbeat.pause: 900s


If that is the case, could this be causing false alerts?
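
For reference, the JobMaster/TaskManager heartbeats in the logs below are
configured via the heartbeat options in flink-conf.yaml; a sketch (the values
are illustrative, check the docs of your version for the actual defaults):

heartbeat.interval: 10000   # ms between heartbeat requests
heartbeat.timeout: 50000    # ms without a response before the peer is considered lost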


Logs :

2019-02-12 09:40:20,279 DEBUG org.apache.flink.yarn.YarnResourceManager 
   
- Received new slot report from TaskManager
container_e187_1544779926156_149366_01_25.
2019-02-12 09:40:20,279 DEBUG
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Received
slot report from instance cee153db98a847c6ee779c31d60fd990.
*2019-02-12 09:40:24,951 DEBUG org.apache.flink.runtime.jobmaster.JobMaster 

- Trigger heartbeat request.*
2019-02-12 09:40:24,955 DEBUG org.apache.flink.runtime.jobmaster.JobMaster  
   
- Received heartbeat from container_e187_1544779926156_149366_01_06.
2019-02-12 09:40:24,956 DEBUG org.apache.flink.runtime.jobmaster.JobMaster  
   
- Received heartbeat from container_e187_1544779926156_149366_01_04.
2019-02-12 09:40:24,956 DEBUG org.apache.flink.runtime.jobmaster.JobMaster  
   
- Received heartbeat from container_e187_1544779926156_149366_01_26.
2019-02-12 09:40:24,958 DEBUG org.apache.flink.runtime.jobmaster.JobMaster  
   
- Received heartbeat from container_e187_1544779926156_149366_01_20.
2019-02-12 09:40:24,959 DEBUG org.apache.flink.runtime.jobmaster.JobMaster  
   
- Received heartbeat from container_e187_1544779926156_149366_01_10.
2019-02-12 09:40:24,960 DEBUG org.apache.flink.runtime.jobmaster.JobMaster  
   
- Received heartbeat from container_e187_1544779926156_149366_01_09.
2019-02-12 09:40:24,962 DEBUG org.apache.flink.runtime.jobmaster.JobMaster  
   
- Received heartbeat from container_e187_1544779926156_149366_01_25.
2019-02-12 09:40:24,963 DEBUG org.apache.flink.runtime.jobmaster.JobMaster  
   
- Received heartbeat from container_e187_1544779926156_149366_01_14.
2019-02-12 09:40:24,964 DEBUG org.apache.flink.runtime.jobmaster.JobMaster  
   
- Received heartbeat from container_e187_1544779926156_149366_01_11.
2019-02-12 09:40:24,966 DEBUG org.apache.flink.runtime.jobmaster.JobMaster  
   
- Received heartbeat from container_e187_1544779926156_149366_01_12.
2019-02-12 09:40:24,967 DEBUG org.apache.flink.runtime.jobmaster.JobMaster  
   
- Received heartbeat from container_e187_1544779926156_149366_01_18.
2019-02-12 09:40:24,969 DEBUG org.apache.flink.runtime.jobmaster.JobMaster  
   
- Received heartbeat from container_e187_1544779926156_149366_01_05.
2019-02-12 09:40:24,970 DEBUG org.apache.flink.runtime.jobmaster.JobMaster  
   
- Received heartbeat from container_e187_1544779926156_149366_01_24.
2019-02-12 09:40:24,971 DEBUG org.apache.flink.runtime.jobmaster.JobMaster  
   
- Received heartbeat from container_e187_1544779926156_149366_01_22.
2019-02-12 09:40:24,975 DEBUG org.apache.flink.runtime.jobmaster.JobMaster  
   
- Received heartbeat from container_e187_1544779926156_149366_01_21.
2019-02-12 09:40:24,977 DEBUG org.apache.flink.runtime.jobmaster.JobMaster  
   
- Received heartbeat from container_e187_1544779926156_149366_01_08.
2019-02-12 09:40:24,978 DEBUG org.apache.flink.runtime.jobmaster.JobMaster  
   
- Received heartbeat from container_e187_1544779926156_149366_01_15.
2019-02-12 09:40:24,981 DEBUG org.apache.flink.runtime.jobmaster.JobMaster  
   
- Received heartbeat from container_e187_1544779926156_149366_01_16.
2019-02-12 09:40:24,983 DEBUG org.apache.flink.runtime.jobmaster.JobMaster  
   
- Received heartbeat from container_e187_1544779926156_149366_01_03.
2019-02-12 09:40:24,984 DEBUG org.apache.flink.runtime.jobmaster.JobMaster  
   
- Received heartbeat from container_e187_1544779926156_149366_01_13.
2019-02-12 09:40:24,985 DEBUG org.apache.flink.runtime.jobmaster.JobMaster  
   
- Received heartbeat from container_e187_1544779926156_149366_01_19.
2019-02-12 09:40:24,986 DEBUG org.apache.flink.runtime.jobmaster.JobMaster  
   
- Received heartbeat from container_e187_1544779926156_149366_01_07.
2019-02-12 09:40:24,987 DEBUG org.apache.flink.runtime.jobmaster.JobMaster  
   
- Received heartbeat from container_e187_1544779926156_149366_01_02.
2019-02-12 09:40:24,989 DEBUG org.apach

Limit in batch flink sql job

2019-02-11 Thread yinhua.dai
Why does Flink say "Limiting the result without sorting is not allowed as it
could lead to arbitrary results" when I use LIMIT in batch mode?

SELECT * FROM csvSource limit 10;
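
For context: without an ORDER BY there is no defined row order across the
parallel partitions, so LIMIT alone would return an arbitrary subset. Adding an
ORDER BY on some column (the column name below is hypothetical) is the usual way
to make the query deterministic:

SELECT * FROM csvSource ORDER BY someColumn LIMIT 10;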



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/