[ 
https://issues.apache.org/jira/browse/FLINK-10157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16620319#comment-16620319
 ] 

ASF GitHub Bot commented on FLINK-10157:
----------------------------------------

StefanRRichter commented on a change in pull request #6707: [FLINK-10157] 
[State TTL] Allow `null` user values in map state with TTL
URL: https://github.com/apache/flink/pull/6707#discussion_r218486113
 
 

 ##########
 File path: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializer.java
 ##########
 @@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
+import 
org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Serializer wrapper to add support of null value serialization.
+ *
+ * <p>If the target serializer does not support null values of its type,
+ * you can use this class to wrap this serializer.
+ * This is a generic treatment of null value serialization
+ * which comes with the cost of additional byte in the final serialized value.
+ * The {@code NullableSerializer} will intercept null value serialization case
+ * and prepend the target serialized value with a boolean flag marking whether 
it is null or not.
+ * <pre> {@code
+ * TypeSerializer<T> originalSerializer = ...;
+ * TypeSerializer<T> serializerWithNullValueSupport = 
NullableSerializer.wrap(originalSerializer);
+ * // or
+ * TypeSerializer<T> serializerWithNullValueSupport = 
NullableSerializer.wrapIfNullIsNotSupported(originalSerializer);
+ * }}</pre>
+ *
+ * @param <T> type to serialize
+ */
+public class NullableSerializer<T> extends TypeSerializer<T> {
+       private static final long serialVersionUID = 3335569358214720033L;
+
+       private final TypeSerializer<T> originalSerializer;
+
+       private NullableSerializer(TypeSerializer<T> originalSerializer) {
+               Preconditions.checkNotNull(originalSerializer, "The original 
serializer cannot be null");
+               this.originalSerializer = originalSerializer;
+       }
+
+       /**
+        * This method tries to serialize null value with the {@code 
originalSerializer}
+        * and wraps it in case of {@link NullPointerException}, otherwise it 
returns the {@code originalSerializer}.
+        */
+       public static <T> TypeSerializer<T> 
wrapIfNullIsNotSupported(TypeSerializer<T> originalSerializer) {
+               return checkIfNullSupported(originalSerializer) ? 
originalSerializer : wrap(originalSerializer);
+       }
+
+       private static <T> boolean checkIfNullSupported(TypeSerializer<T> 
originalSerializer) {
+               try {
+                       originalSerializer.serialize(null, new 
DataOutputSerializer(1));
+                       
Preconditions.checkArgument(originalSerializer.copy(null) == null);
+               } catch (NullPointerException | IOException e) {
+                       return false;
+               }
+               return true;
+       }
+
+       /** This method wraps the {@code originalSerializer} with the {@code 
NullableSerializer} if not already wrapped. */
+       public static <T> TypeSerializer<T> wrap(TypeSerializer<T> 
originalSerializer) {
+               return originalSerializer instanceof NullableSerializer ?
+                       originalSerializer : new 
NullableSerializer<>(originalSerializer);
+       }
+
+       @Override
+       public boolean isImmutableType() {
+               return originalSerializer.isImmutableType();
+       }
+
+       @Override
+       public TypeSerializer<T> duplicate() {
+               return new NullableSerializer<>(originalSerializer.duplicate());
+       }
+
+       @Override
+       public T createInstance() {
+               return originalSerializer.createInstance();
+       }
+
+       @Override
+       public T copy(T from) {
+               return from == null ? null : originalSerializer.copy(from);
+       }
+
+       @Override
+       public T copy(T from, T reuse) {
+               return from == null ? null :
+                       (reuse == null ? originalSerializer.copy(from) : 
originalSerializer.copy(from, reuse));
+       }
+
+       @Override
+       public int getLength() {
+               int len = originalSerializer.getLength();
+               return len < 0 ? len : len + 1;
+       }
+
+       @Override
+       public void serialize(T record, DataOutputView target) throws 
IOException {
+               if (record == null) {
+                       target.writeBoolean(true);
+               } else {
+                       target.writeBoolean(false);
+                       originalSerializer.serialize(record, target);
+               }
+       }
+
+       @Override
+       public T deserialize(DataInputView source) throws IOException {
+               boolean isNull = source.readBoolean();
+               return isNull ? null : originalSerializer.deserialize(source);
+       }
+
+       @Override
+       public T deserialize(T reuse, DataInputView source) throws IOException {
+               boolean isNull = source.readBoolean();
+               return isNull ? null : (reuse == null ?
+                       originalSerializer.deserialize(source) : 
originalSerializer.deserialize(reuse, source));
+       }
+
+       @Override
+       public void copy(DataInputView source, DataOutputView target) throws 
IOException {
+               boolean isNull = source.readBoolean();
+               target.writeBoolean(isNull);
+               if (!isNull) {
+                       originalSerializer.copy(source, target);
+               }
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               return obj == this ||
+                       (obj != null && obj.getClass() == getClass() &&
+                               originalSerializer.equals(((NullableSerializer) 
originalSerializer).originalSerializer));
+       }
+
+       @Override
+       public boolean canEqual(Object obj) {
+               return (obj != null && obj.getClass() == getClass());
 
 Review comment:
   ...and also check if the originalSerializer `canEqual`?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Allow `null` user values in map state with TTL
> ----------------------------------------------
>
>                 Key: FLINK-10157
>                 URL: https://issues.apache.org/jira/browse/FLINK-10157
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.6.0
>         Environment: Flink:1.6.0
> Scala:2.11
> JDK:1.8
>            Reporter: chengjie.wu
>            Assignee: Andrey Zagrebin
>            Priority: Minor
>              Labels: pull-request-available
>         Attachments: StateWithOutTtlTest.scala, StateWithTtlTest.scala
>
>
> Thanks for the StateTtl feature,this is exactly what I need now! But I found 
> an issue.
> In the previous version or when StateTtl is not enabled,MapState allows 
> `null` value,that means after
> {code:java}
> mapState.put("key", null){code}
> , then
> {code:java}
> mapState.contains("key"){code}
> will return {color:#ff0000}*true*{color}, but when StateTtl is enabled,
> {code:java}
> mapState.contains("key"){code}
> will return {color:#ff0000}*false*{color}(*the key has not expired*).
>  So I think the field `userValue` in 
> `org.apache.flink.runtime.state.ttl.TtlValue` should allow `null` value. User 
> state is null may not means the TtlValue should be null.
>  
> {code:java}
> /**
>  * This class wraps user value of state with TTL.
>  *
>  * @param <T> Type of the user value of state with TTL
>  */
> class TtlValue<T> implements Serializable {
>  private final T userValue;
>  private final long lastAccessTimestamp;
> TtlValue(T userValue, long lastAccessTimestamp) {
>  Preconditions.checkNotNull(userValue);
>  this.userValue = userValue;
>  this.lastAccessTimestamp = lastAccessTimestamp;
>  }
> T getUserValue() {
>  return userValue;
>  }
> long getLastAccessTimestamp() {
>  return lastAccessTimestamp;
>  }
> }
> {code}
> Am I understanding right?
> This is my test class.
> [^StateWithTtlTest.scala] [^StateWithOutTtlTest.scala]
> ^Thanks!:)^



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to