JobManager web interface redirect strategy when running in HA

2017-10-31 Thread mrooding
Hi

We're running 3 job managers in high availability cluster mode, backed by
OpenStack/OpenShift. We're currently exposing all 3 job managers using 3
different routes (flink-1.domain.tld, flink-2.domain.tld,
flink-3.domain.tld). When accessing the route of a job manager which isn't
the leader, the user is automatically redirected to the host and port of the
leading job manager. From what I've seen in the source code, the RPC address
and port are used for the redirect. Since the internal hostnames are not
accessible outside the cluster, this obviously doesn't work.

The nicest solution would be a single route (flink.domain.tld) which
correctly delegates requests to the leading job manager. The second-best
solution would probably be the option to declare a public URL in the
Flink configuration file.
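To sketch what I mean by the second option, it could look something like
this in flink-conf.yaml (the public-address key is purely hypothetical; no
such option exists today, which is exactly what this proposal is about):

```yaml
# flink-conf.yaml sketch; only the first two keys exist in Flink today
jobmanager.rpc.address: flink-jobmanager-1.env.svc.cluster.local
jobmanager.web.port: 8081
# Hypothetical: externally reachable URL to use when redirecting
# non-leader requests, instead of the internal rpc address
jobmanager.web.public-address: flink-1.domain.tld
```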

I'd be more than happy to contribute to Flink and add support for this but
I'd love to hear your ideas about it.

Kind regards

Marc




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: JobManager web interface redirect strategy when running in HA

2017-11-06 Thread mrooding
Chesnay, your solution is definitely the best approach. I was already
wondering why the decision was made to support the UI only through the
leading job manager.

Jürgen, I don't think that your solution will work in our setup. We're
currently running 3 services, one for each job manager, because they need to
be able to talk to each other. In the latest version of OpenShift you can
use a StatefulSet to handle these situations, but unfortunately StatefulSets
seem to rely on each node receiving its own persistent volume claim, whereas
Flink seems to share 1 persistent volume claim across all nodes.

I've been going through the Kubernetes documentation about load balancers,
but I'm unable to find a solution which handles both cases:
- each node being available through a cluster name (e.g.
flink-jobmanager-1.env.svc.cluster.local)
- exposing 1 URL which uses the load balancing solution proposed by you
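The closest I could sketch in plain Kubernetes objects (all names here are
just examples from our setup) is one Service per job manager pod for the
stable internal names, plus one umbrella Service over all of them; the
caveat is that the umbrella Service round-robins over all job managers,
leader or not, which is the unsolved part:

```yaml
# One Service per job manager pod gives the stable internal names
# (flink-jobmanager-1.env.svc.cluster.local, ...); example for pod 1:
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-1
  namespace: env
spec:
  selector:
    app: flink-jobmanager
    instance: "1"           # label distinguishing the individual pod
  ports:
    - name: ui
      port: 8081
---
# A single umbrella Service for the UI; note it load-balances across
# every job manager pod rather than targeting the leader:
apiVersion: v1
kind: Service
metadata:
  name: flink
  namespace: env
spec:
  selector:
    app: flink-jobmanager   # matches all job manager pods
  ports:
    - name: ui
      port: 8081
```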

Worst case, we'd have to wait for Flink 1.5 and keep using 3 distinct URLs.
It's not ideal, but there are bigger fish to fry.

Marc





Re: Savepoints and migrating value state data types

2017-11-06 Thread mrooding
Hi Gordon

I've been looking into creating a custom AvroSerializer, without Kryo, which
would support Avro schemas, and I'm starting to wonder if this is actually
the most straightforward way to do it.

If I extend TypeSerializer I would also need to implement a TypeInformation
class to be able to provide my serializer. Implementing all these classes
seems to be quite an ordeal without proper documentation. Are you sure that
this is the right way forward and that there's no other option for using
Avro serialization with schema support in Flink?

Thanks again

Marc







Savepoints and migrating value state data types

2017-09-20 Thread mrooding
Hi

We've got a situation where we're merging several Kafka streams, and for
certain streams we want to retain up to 6 days of history. We're trying to
figure out how we can migrate savepoint data between application updates
when the data type of a certain state buffer changes.

Let's assume that we have 2 streams with the following data types:

case class A(id: String, name: String)
case class B1(id: String, price: Double)

We have a CoProcessFunction which combines the 2 streams and maintains 2
different buffer states:

MapState[String, A] and ValueState[B1]

In our scenario, we're trying to anticipate the data type of B1 changing in
the future. Let's assume that in the foreseeable future, B1 will change to:

case class B2(id: String, price: Double, date: String)
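To make concrete what we mean by migrating, here is a minimal sketch of the
idea, independent of Flink (the versioned byte format and all names are made
up for illustration): a reader that knows the new schema can still read old
records by filling in a default for the missing field, which is essentially
what a schema-aware format like Avro automates:

```scala
import java.io._

// Old record shape (what's in the savepoint) and the new shape we migrate to.
case class B1(id: String, price: Double)
case class B2(id: String, price: Double, date: String)

// A hand-rolled, versioned binary format: a version byte, then the fields.
object VersionedB {
  def writeV1(b: B1): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    out.writeByte(1) // schema version
    out.writeUTF(b.id)
    out.writeDouble(b.price)
    out.flush()
    bytes.toByteArray
  }

  // Reads either version and always yields the new type,
  // defaulting the field that old records don't carry.
  def readAsV2(data: Array[Byte]): B2 = {
    val in = new DataInputStream(new ByteArrayInputStream(data))
    in.readByte() match {
      case 1 => B2(in.readUTF(), in.readDouble(), date = "")
      case 2 => B2(in.readUTF(), in.readDouble(), in.readUTF())
      case v => throw new IOException(s"unknown version $v")
    }
  }
}
```

This is exactly the migration we'd like Flink's state restore to perform for
us instead of failing on the serializer mismatch.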

When we create a savepoint using B1 and then upgrade the application to B2,
the obvious attempt would be to retrieve both the stored ValueState and the
new ValueState:

val oldState = getRuntimeContext.getState(
  new ValueStateDescriptor[B1]("1Buffer", createTypeInformation[B1]))
val newState = getRuntimeContext.getState(
  new ValueStateDescriptor[B2]("2Buffer", createTypeInformation[B2]))

However, as soon as you do, the following error occurs:

Unable to restore keyed state [aBuffer]. For memory-backed keyed state, the
previous serializer of the keyed state must be present; the serializer could
have been removed from the classpath, or its implementation have changed and
could not be loaded. This is a temporary restriction that will be fixed in
future versions.

Our assumption is that the process operator has a specified ID which Flink
uses to save and restore savepoints. The CoProcessFunction's type changed
from CoProcessFunction[A, B1, A] to CoProcessFunction[A, B2, A], and
therefore the savepoint data no longer applies to the operator. Is this
assumption correct?

We've been going through the documentation and source code of Flink, and it
seems like there's no way to achieve this kind of migration. If that's the
case, we'd be interested in contributing to Flink to get this added a.s.a.p.
and would love to get some feedback on how to approach it.

Thanks in advance

Marc





Re: Savepoints and migrating value state data types

2017-10-05 Thread mrooding
Gordon

Thanks for the detailed response. I have verified your assumption and that
is, unfortunately, the case.

I also looked into creating a custom Kryo serializer but I got stuck on
serializing arrays of complex types. It seems like this isn't trivial at all
with Kryo.

As an alternative, I've been looking into using Avro only for the Flink
state buffers. Basically, as a first step, we'd still send JSON messages
through Kafka, but we'd use a custom TypeSerializer that converts the case
classes to bytes using Avro and vice versa. Unfortunately, documentation is
really scarce.

In a different topic,
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Serialization-performance-td12019.html,
it says that Avro is a bit of an odd citizen and that the AvroSerializer
provided by Flink uses Kryo. This confirms what I've found by going through
the source code of Flink myself. 

I hope that you can provide me with some pointers. Is extending
TypeSerializer[T] the best way forward if we only want to use Avro for state
buffers and thus utilize Avro's schema migration facilities?

Any pointers would be greatly appreciated!

Kind regards

Marc


