Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5679#discussion_r173742094 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigInfo.java --- @@ -143,10 +152,66 @@ public int hashCode() { } /** - * Processing mode. + * JSON serializer for {@link ProcessingMode}. */ + @JsonSerialize(using = ProcessingModeSerializer.class) + @JsonDeserialize(using = ProcessingModeDeserializer.class) public enum ProcessingMode { - AT_LEAST_ONCE, - EXACTLY_ONCE + AT_LEAST_ONCE("at_least_once"), + EXACTLY_ONCE("exactly_once"); + + private String value; + + ProcessingMode(String value) { + this.value = value; + } + + public String getValue() { + return value; + } + + public static ProcessingMode fromString(String value) { + for (ProcessingMode mode : ProcessingMode.values()) { + if (mode.value.equalsIgnoreCase(value)) { + return mode; + } + } + + throw new IllegalArgumentException("No constant with value " + value + " found"); + } + + } + + /** + * JSON deserializer for {@link ProcessingMode}. + */ + public static class ProcessingModeSerializer extends StdSerializer<ProcessingMode> { + + public ProcessingModeSerializer() { + super(ProcessingMode.class); + } + + @Override + public void serialize(ProcessingMode mode, JsonGenerator generator, SerializerProvider serializerProvider) + throws IOException { + generator.writeString(mode.getValue()); + } } + + /** + * Processing mode deserializer. + */ + public static class ProcessingModeDeserializer extends StdDeserializer<ProcessingMode> { + + public ProcessingModeDeserializer() { + super(ProcessingMode.class); + } + + @Override + public ProcessingMode deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) + throws IOException { + return ProcessingMode.fromString(jsonParser.getValueAsString()); --- End diff -- replace with `ProcessingMode.valueOf(jsonParser.getValueAsString().toUpperCase())`
---