Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/6062#discussion_r191440434
--- Diff:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java
---
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+
+/**
+ * Implementation of {@link InternalTimer} for the {@link
InternalTimerHeap}.
+ *
+ * @param <K> Type of the keys to which timers are scoped.
+ * @param <N> Type of the namespace to which timers are scoped.
+ */
+@Internal
+public final class TimerHeapInternalTimer<K, N> implements
InternalTimer<K, N> {
+
+ /** The index that indicates that a tracked internal timer is not
tracked. */
+ private static final int NOT_MANAGED_BY_TIMER_QUEUE_INDEX =
Integer.MIN_VALUE;
+
+ private final long timestamp;
+
+ private final K key;
+
+ private final N namespace;
+
+ /**
+ * This field holds the current physical index of this timer when it is
managed by a timer heap so that we can
+ * support fast deletes.
+ */
+ private transient int timerHeapIndex;
+
+ TimerHeapInternalTimer(long timestamp, K key, N namespace) {
+ this.timestamp = timestamp;
+ this.key = key;
+ this.namespace = namespace;
+ this.timerHeapIndex = NOT_MANAGED_BY_TIMER_QUEUE_INDEX;
+ }
+
+ @Override
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ @Override
+ public K getKey() {
+ return key;
+ }
+
+ @Override
+ public N getNamespace() {
+ return namespace;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o instanceof InternalTimer) {
+ InternalTimer<?, ?> timer = (InternalTimer<?, ?>) o;
+ return timestamp == timer.getTimestamp()
+ && key.equals(timer.getKey())
+ && namespace.equals(timer.getNamespace());
+ }
+
+ return false;
+ }
+
+ /**
+ * Returns the current index of this timer in the owning timer heap.
+ */
+ int getTimerHeapIndex() {
+ return timerHeapIndex;
+ }
+
+ /**
+ * Sets the current index of this timer in the owning timer heap and
should only be called by the managing heap.
+ * @param timerHeapIndex the new index in the timer heap.
+ */
+ void setTimerHeapIndex(int timerHeapIndex) {
+ this.timerHeapIndex = timerHeapIndex;
+ }
+
+ /**
+ * This method can be called to indicate that the timer is no longer
managed be a timer heap, e.g. because it as
+ * removed.
+ */
+ void removedFromTimerQueue() {
+ setTimerHeapIndex(NOT_MANAGED_BY_TIMER_QUEUE_INDEX);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = (int) (timestamp ^ (timestamp >>> 32));
+ result = 31 * result + key.hashCode();
+ result = 31 * result + namespace.hashCode();
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "Timer{" +
+ "timestamp=" + timestamp +
+ ", key=" + key +
+ ", namespace=" + namespace +
+ '}';
+ }
+
+ /**
+ * A {@link TypeSerializer} used to serialize/deserialize a {@link
TimerHeapInternalTimer}.
+ */
+ public static class TimerSerializer<K, N> extends
TypeSerializer<InternalTimer<K, N>> {
+
+ private static final long serialVersionUID =
1119562170939152304L;
+
+ private final TypeSerializer<K> keySerializer;
+
+ private final TypeSerializer<N> namespaceSerializer;
--- End diff --
Can these serializers be `null`? If not, then let's mark them as `@Nonnull`
---