[
https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15476158#comment-15476158
]
ASF GitHub Bot commented on FLINK-3599:
---------------------------------------
GitHub user ggevay commented on a diff in the pull request:
https://github.com/apache/flink/pull/2211#discussion_r78136985
--- Diff:
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerGenerator.java
---
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static
org.apache.flink.api.java.typeutils.PojoTypeInfo.accessStringForField;
+import static
org.apache.flink.api.java.typeutils.PojoTypeInfo.modifyStringForField;
+
+public final class PojoSerializerGenerator<T> {
+ private static final String packageName =
"org.apache.flink.api.java.typeutils.runtime.generated";
+
+ private final Class<T> clazz;
+ private final Field[] refFields;
+ private final TypeSerializer<?>[] fieldSerializers;
+ private final ExecutionConfig config;
+ private String code;
+
+ public PojoSerializerGenerator(
+ Class<T> clazz,
+ TypeSerializer<?>[] fields,
+ Field[] reflectiveFields,
+ ExecutionConfig config) {
+ this.clazz = checkNotNull(clazz);
+ this.refFields = checkNotNull(reflectiveFields);
+ this.fieldSerializers = checkNotNull(fields);
+ this.config = checkNotNull(config);
+ for (int i = 0; i < this.refFields.length; i++) {
+ this.refFields[i].setAccessible(true);
+ }
+ }
+
+ public TypeSerializer<T> createSerializer() {
+ final String className = clazz.getCanonicalName().replace('.',
'_') + "_GeneratedSerializer";
+ final String fullClassName = packageName + "." + className;
+ Class<?> serializerClazz;
+ code = InstantiationUtil.getCodeForCachedClass(fullClassName);
+ if (code == null) {
+ generateCode(className);
+ }
+ if(config.isWrapGeneratedClassesEnabled()) {
+ return new GenTypeSerializerProxy<>(clazz,
fullClassName, code, fieldSerializers, config);
+ }
+ try {
+ serializerClazz =
InstantiationUtil.compile(clazz.getClassLoader(), fullClassName, code);
+ }
+ catch (Exception e) {
+ throw new RuntimeException("Unable to generate
serializer: " + className, e);
+ }
+ Constructor<?>[] ctors = serializerClazz.getConstructors();
+ assert ctors.length == 1;
+ try {
+ return (TypeSerializer<T>) ctors[0].newInstance(new
Object[]{clazz, fieldSerializers, config});
+ } catch (Exception e) {
+ throw new RuntimeException("Unable to instantiate
serializer: " + className, e);
+ }
+
+ }
+
+ private void generateCode(String className) {
+ assert fieldSerializers.length > 0;
+ String typeName = clazz.getCanonicalName();
+ StringBuilder members = new StringBuilder();
+ for (int i = 0; i < fieldSerializers.length; ++i) {
+ members.append(String.format("final TypeSerializer
f%d;\n", i));
+ }
+ StringBuilder initMembers = new StringBuilder();
+ for (int i = 0; i < fieldSerializers.length; ++i) {
+ initMembers.append(String.format("f%d =
serializerFields[%d];\n", i, i));
+ }
+ StringBuilder createFields = new StringBuilder();
+ for (int i = 0; i < fieldSerializers.length; ++i) {
+ createFields.append(String.format("((" + typeName +
")t)." + modifyStringForField(refFields[i],
+ "f%d.createInstance()") + ";\n", i));
+ }
+ StringBuilder copyFields = new StringBuilder();
+ copyFields.append("Object value;\n");
+ for (int i = 0; i < fieldSerializers.length; ++i) {
+ if (refFields[i].getType().isPrimitive()) {
+ copyFields.append(String.format("((" + typeName
+ ")target)." + modifyStringForField(refFields[i],
+ "f%d.copy(((" + typeName + ")from)." +
accessStringForField(refFields[i])) + ");\n", i));
+ } else {
+ copyFields.append(String.format(
+ "value = ((" + typeName + ")from)." +
accessStringForField(refFields[i]) + ";\n" +
+ "if (value != null) {\n" +
+ " ((" + typeName + ")target)." +
modifyStringForField(refFields[i], "f%d.copy(value)") + ";\n" +
+ "} else {\n" +
+ " ((" + typeName + ")target)." +
modifyStringForField(refFields[i], "null") + ";\n" +
+ "}\n", i));
+ }
+ }
+ StringBuilder reuseCopyFields = new StringBuilder();
+ reuseCopyFields.append("Object value;\n");
+ reuseCopyFields.append("Object reuseValue;\n");
+ reuseCopyFields.append("Object copy;\n");
+ for (int i = 0; i < fieldSerializers.length; ++i) {
+ if (refFields[i].getType().isPrimitive()) {
+ reuseCopyFields.append(String.format("((" +
typeName + ")reuse)." + modifyStringForField(refFields[i],
+ "f%d.copy(((" + typeName + ")from)." +
accessStringForField(refFields[i]) + ")") + ";\n", i));
+ } else {
+ reuseCopyFields.append(String.format(
+ "value = ((" + typeName + ")from)." +
accessStringForField(refFields[i]) + ";\n" +
+ "if (value != null) {\n" +
+ " reuseValue = ((" + typeName +
")reuse)." + accessStringForField(refFields[i]) + ";\n" +
+ " if (reuseValue != null) {\n" +
--- End diff --
Ah yes, sorry!
> GSoC: Code Generation in Serializers
> ------------------------------------
>
> Key: FLINK-3599
> URL: https://issues.apache.org/jira/browse/FLINK-3599
> Project: Flink
> Issue Type: Improvement
> Components: Type Serialization System
> Reporter: Márton Balassi
> Assignee: Gabor Horvath
> Labels: gsoc2016, mentor
>
> The current implementation of the serializers can be a
> performance bottleneck in some scenarios. These performance problems were
> also reported on the mailing list recently [1].
> E.g. the PojoSerializer uses reflection for accessing the fields, which is
> slow [2].
> For the complete proposal see [3].
> [1]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Tuple-performance-and-the-curious-JIT-compiler-td10666.html
> [2]
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java#L369
> [3]
> https://docs.google.com/document/d/1VC8lCeErx9kI5lCMPiUn625PO0rxR-iKlVqtt3hkVnk
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)