Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197719642 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlListStateTest.java --- @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.runtime.state.internal.InternalListState; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +/** Test suite for {@link TtlListState}. */ +public class TtlListStateTest extends TtlStateTestBase<TtlListState<?, ?, Integer>, List<Integer>, Iterable<Integer>> { + @Override + TtlListState<?, ?, Integer> createState() { + return new TtlListState<>(new MockInternalListState<>(), ttlConfig, timeProvider, null); + } + + @Override + void initTestValues() { + updater = v -> ttlState.addAll(v); + getter = () -> StreamSupport.stream(ttlState.get().spliterator(), false).collect(Collectors.toList()); + originalGetter = () -> ttlState.original.get(); + + emptyValue = Collections.emptyList(); + + updateValue1 = Arrays.asList(5, 7, 10); + updateValue2 = Arrays.asList(8, 9, 11); + + getValue1 = updateValue1; + getValue2 = updateValue2; + } + + private static class MockInternalListState<K, N, T> + extends MockInternalKvState<K, N, List<T>> + implements InternalListState<K, N, T> { + + MockInternalListState() { + value = new ArrayList<>(); + } + + @Override + public void update(List<T> elements) { + updateInternal(elements); + } + + @Override + public void addAll(List<T> elements) { + value.addAll(elements); + } + + @Override + public void mergeNamespaces(N target, Collection<N> sources) { --- End diff -- Wouldn't it make sense to check that merging namespaces also works correctly with the TTL state and to fix validate the contract what has to happen to the timestamps in this case?
---