JobManager web interface redirect strategy when running in HA
Hi,

We're running 3 job managers in high-availability cluster mode, backed by OpenStack/OpenShift. We're currently exposing all 3 job managers through 3 different routes (flink-1.domain.tld, flink-2.domain.tld, flink-3.domain.tld). When accessing the route of a job manager which isn't the leader, the user is automatically redirected to the host and port of the leading job manager. From what I've seen in the source code, the RPC address and port are used for this redirect. Since the internal hostnames are not accessible outside the cluster, this obviously doesn't work.

The nicest solution would be a single route (flink.domain.tld) which correctly delegates requests to the leading job manager. The second best solution would probably be the possibility to declare a public URL in the Flink configuration file.

I'd be more than happy to contribute to Flink and add support for this, but I'd love to hear your ideas about it.

Kind regards,
Marc
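P.S. To make the second option concrete, here's a hypothetical sketch of what such a setting could look like in flink-conf.yaml. Note that web.public-url is a made-up key for illustration only; it does not exist in Flink today, which is exactly the part I'd like to contribute:

    jobmanager.rpc.address: flink-jobmanager-1.env.svc.cluster.local
    jobmanager.rpc.port: 6123
    # Hypothetical option: the externally reachable URL that the web UI
    # would redirect to, instead of the internal rpc address and port.
    web.public-url: https://flink-1.domain.tld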
Re: JobManager web interface redirect strategy when running in HA
Chesnay, your solution is definitely the best approach. I was already wondering why the decision was made to only support the UI through the leading job manager.

Jürgen, I don't think that your solution will work in our setup. We're currently running 3 services, one for each job manager. We need a service per job manager because they obviously need to be able to talk to each other. In the latest version of OpenShift you can use a StatefulSet to handle these situations, but unfortunately StatefulSets seem to rely on each node receiving its own persistent volume claim, whereas Flink seems to share 1 persistent volume claim across all nodes.

I've been going through the Kubernetes documentation about load balancers, but I'm unable to find a solution which handles both cases:
- each node being available through a cluster name (e.g. flink-jobmanager-1.env.svc.cluster.local)
- exposing 1 URL which uses the load balancing solution proposed by you

Worst case, we would have to wait for Flink 1.5 and keep using 3 distinct URLs. It's not ideal, but there are also bigger fish to tackle.

Marc
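P.S. For reference, a minimal sketch of the dual-service setup I was exploring; the label names and values below are my own placeholders. One service per job manager keeps the stable internal cluster names, while the aggregate service would load-balance the UI, but that only becomes useful once every job manager can serve UI requests:

    # One service per job manager instance (repeated for jobmanager-2/-3),
    # giving each node a stable internal name for RPC:
    apiVersion: v1
    kind: Service
    metadata:
      name: flink-jobmanager-1
    spec:
      selector:
        app: flink
        component: jobmanager
        instance: jobmanager-1
      ports:
        - name: rpc
          port: 6123
    ---
    # One aggregate service across all job managers for the web UI; only
    # works as a single entry point once non-leaders can serve the UI:
    apiVersion: v1
    kind: Service
    metadata:
      name: flink-ui
    spec:
      selector:
        app: flink
        component: jobmanager
      ports:
        - name: ui
          port: 8081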
Re: Savepoints and migrating value state data types
Hi Gordon,

I've been looking into creating a custom AvroSerializer without Kryo which would support Avro schemas, and I'm starting to wonder whether this is actually the most straightforward way to do it. If I extend TypeSerializer, I would also need to implement a TypeInformation class to be able to provide my serializer. Implementing all of these classes seems to be quite the ordeal without proper documentation.

Are you sure that this is the right way forward, and that there's no other option for using Avro serialization with schema support in Flink?

Thanks again,
Marc
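P.S. For context, this is roughly the surface I'd be signing up for. A bare skeleton only; MyAvroSerializer is a placeholder name, the method set is what I see on TypeSerializer in Flink 1.3/1.4, and all the real work is left unimplemented:

    import org.apache.flink.api.common.typeutils.{CompatibilityResult, TypeSerializer, TypeSerializerConfigSnapshot}
    import org.apache.flink.core.memory.{DataInputView, DataOutputView}

    class MyAvroSerializer[T](clazz: Class[T]) extends TypeSerializer[T] {
      override def isImmutableType: Boolean = false
      override def duplicate(): TypeSerializer[T] = new MyAvroSerializer[T](clazz)
      override def createInstance(): T = ???
      override def copy(from: T): T = ???
      override def copy(from: T, reuse: T): T = copy(from)
      override def getLength: Int = -1 // variable-length records
      override def serialize(record: T, target: DataOutputView): Unit = ???
      override def deserialize(source: DataInputView): T = ???
      override def deserialize(reuse: T, source: DataInputView): T = deserialize(source)
      override def copy(source: DataInputView, target: DataOutputView): Unit =
        serialize(deserialize(source), target)
      // The part that matters for schema migration on restore:
      override def snapshotConfiguration(): TypeSerializerConfigSnapshot = ???
      override def ensureCompatibility(snapshot: TypeSerializerConfigSnapshot): CompatibilityResult[T] = ???
      override def equals(obj: Any): Boolean = obj.isInstanceOf[MyAvroSerializer[_]]
      override def canEqual(obj: Any): Boolean = obj.isInstanceOf[MyAvroSerializer[_]]
      override def hashCode(): Int = clazz.hashCode()
    }

And on top of that, a TypeInformation[T] subclass whose createSerializer hands out this serializer.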
Savepoints and migrating value state data types
Hi,

We've got a situation where we're merging several Kafka streams, and for certain streams we want to retain up to 6 days of history. We're trying to figure out how we can migrate savepoint data between application updates when the data type of a certain state buffer changes.

Let's assume that we have 2 streams with the following data types:

    case class A(id: String, name: String)
    case class B1(id: String, price: Double)

We have a CoProcessFunction which combines the 2 streams and maintains 2 different buffer states: MapState[String, A] and ValueState[B1].

In our scenario, we're trying to anticipate the data type of B1 changing in the future. Let's assume that in the foreseeable future, B1 will change to:

    case class B2(id: String, price: Double, date: String)

When we create a snapshot using B1 and then upgrade the application to B2, the obvious attempt would be to try and retrieve both the stored ValueState and the new ValueState:

    val oldState = getRuntimeContext.getState(
      new ValueStateDescriptor[B1]("1Buffer", createTypeInformation[B1]))
    val newState = getRuntimeContext.getState(
      new ValueStateDescriptor[B2]("2Buffer", createTypeInformation[B2]))

However, as soon as you do, the following error occurs:

    Unable to restore keyed state [aBuffer]. For memory-backed keyed state,
    the previous serializer of the keyed state must be present; the serializer
    could have been removed from the classpath, or its implementation have
    changed and could not be loaded. This is a temporary restriction that will
    be fixed in future versions.

Our assumption is that the process operator has a specified ID which Flink uses to save and restore savepoints. The CoProcessFunction types changed from CoProcessFunction[A, B1, A] to CoProcessFunction[A, B2, A], and therefore the savepoint data no longer applies to the operator. Is this assumption correct?

We've been going through the documentation and source code of Flink, and it seems like there's no way to achieve this kind of migration. If that is the case, we'd be interested in contributing to Flink to get this added a.s.a.p., and would love some feedback on how to approach this.

Thanks in advance,
Marc
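P.S. In case it helps diagnose this, here's how we wire up the operator. A trimmed sketch: streamA/streamB and MyCoProcessFunction stand in for our actual sources and the CoProcessFunction[A, B1, A] described above, and the uid string is an arbitrary example:

    import org.apache.flink.streaming.api.scala._

    // Pin an explicit, stable ID on the co-process operator so savepoint
    // state can be matched back to it across application upgrades.
    val combined: DataStream[A] = streamA
      .connect(streamB)
      .keyBy(_.id, _.id)
      .process(new MyCoProcessFunction)
      .uid("a-b-buffer")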
Re: Savepoints and migrating value state data types
Gordon,

Thanks for the detailed response. I have verified your assumption and that is, unfortunately, the case. I also looked into creating a custom Kryo serializer, but I got stuck on serializing arrays of complex types; it seems like this isn't trivial at all with Kryo.

As an alternative, I've been looking into using Avro only for the Flink buffers. Basically, as a first step, we'd still be sending JSON messages through Kafka, but we would use a custom TypeSerializer that converts the case classes to bytes using Avro and vice versa. Unfortunately, documentation is really scarce. In a different topic, http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Serialization-performance-td12019.html, it says that Avro is a bit of an odd citizen and that the AvroSerializer provided by Flink uses Kryo. This confirms what I've found by going through the source code of Flink myself.

I hope that you can provide me with some pointers. Is extending TypeSerializer[T] the best way forward if we only want to use Avro for state buffers, and thus utilize Avro's schema migration facilities? Any pointers would be greatly appreciated!

Kind regards,
Marc
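P.S. To make the idea concrete, this is roughly the Avro round-trip I imagine the serialize/deserialize methods wrapping. A sketch only, using Avro's reflect-based reader/writer on the B1 case class from the other thread; whether Avro reflection handles Scala case classes cleanly is exactly the kind of thing I'd still need to verify:

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
    import org.apache.avro.Schema
    import org.apache.avro.io.{DecoderFactory, EncoderFactory}
    import org.apache.avro.reflect.{ReflectData, ReflectDatumReader, ReflectDatumWriter}

    case class B1(id: String, price: Double)

    val readerSchema: Schema = ReflectData.get().getSchema(classOf[B1])

    def toBytes(value: B1): Array[Byte] = {
      val out = new ByteArrayOutputStream()
      val encoder = EncoderFactory.get().binaryEncoder(out, null)
      new ReflectDatumWriter[B1](readerSchema).write(value, encoder)
      encoder.flush()
      out.toByteArray
    }

    // The writer's schema would come from the savepoint (via the serializer's
    // config snapshot); passing both schemas is what drives Avro's migration.
    def fromBytes(bytes: Array[Byte], writerSchema: Schema): B1 = {
      val decoder = DecoderFactory.get().binaryDecoder(new ByteArrayInputStream(bytes), null)
      new ReflectDatumReader[B1](writerSchema, readerSchema).read(null.asInstanceOf[B1], decoder)
    }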