[ https://issues.apache.org/jira/browse/FLINK-3786?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15262348#comment-15262348 ]
ASF GitHub Bot commented on FLINK-3786: --------------------------------------- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1928#discussion_r61449670 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigDecSerializer.java --- @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.common.typeutils.base; + +import java.io.IOException; +import java.math.BigDecimal; +import java.math.BigInteger; +import org.apache.flink.annotation.Internal; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +@Internal +public final class BigDecSerializer extends TypeSerializerSingleton<BigDecimal> { + + private static final long serialVersionUID = 1L; + + public static final BigDecSerializer INSTANCE = new BigDecSerializer(); + + @Override + public boolean isImmutableType() { + return true; + } + + @Override + public BigDecimal createInstance() { + return BigDecimal.ZERO; + } + + @Override + public BigDecimal copy(BigDecimal from) { + return from; + } + + @Override + public BigDecimal copy(BigDecimal from, BigDecimal reuse) { + return from; + } + + @Override + public int getLength() { + return -1; + } + + @Override + public void serialize(BigDecimal record, DataOutputView target) throws IOException { + // null value support + if (record == null) { + BigIntSerializer.writeBigInteger(null, target); + return; + } + // fast paths for 0, 1, 10 + else if (record == BigDecimal.ZERO) { + BigIntSerializer.writeBigInteger(BigInteger.ZERO, target); + target.writeInt(0); + return; + } + else if (record == BigDecimal.ONE) { + BigIntSerializer.writeBigInteger(BigInteger.ONE, target); + target.writeInt(0); + return; + } + else if (record == BigDecimal.TEN) { + BigIntSerializer.writeBigInteger(BigInteger.TEN, target); + target.writeInt(0); + return; + } + // default + BigIntSerializer.writeBigInteger(record.unscaledValue(), target); + target.writeInt(record.scale()); + } + + @Override + public BigDecimal deserialize(DataInputView source) throws IOException { + return readBigDecimal(source); + } + + @Override + public BigDecimal deserialize(BigDecimal reuse, DataInputView source) throws IOException { + return readBigDecimal(source); + } + + @Override + public void copy(DataInputView 
source, DataOutputView target) throws IOException { + final boolean isNull = BigIntSerializer.copyBigInteger(source, target); + if (!isNull) { + final int scale = source.readInt(); + target.writeInt(scale); + } + } + + @Override + public boolean canEqual(Object obj) { + return obj instanceof BigDecSerializer; + } + + // -------------------------------------------------------------------------------------------- + // Static Helpers for BigInteger Serialization + // -------------------------------------------------------------------------------------------- + + public static BigDecimal readBigDecimal(DataInputView source) throws IOException { + final BigInteger unscaledValue = BigIntSerializer.readBigInteger(source); + if (unscaledValue == null) { + return null; + } + final int scale = source.readInt(); + // fast-path for 0, 1, 10 + if (scale == 0) { + if (unscaledValue == BigInteger.ZERO) { --- End diff -- Check with equals() rather than ==, since == compares object references and not the numeric values (e.g. unscaledValue.equals(BigInteger.ZERO)). > Add BigDecimal and BigInteger as Basic types > -------------------------------------------- > > Key: FLINK-3786 > URL: https://issues.apache.org/jira/browse/FLINK-3786 > Project: Flink > Issue Type: New Feature > Components: Core > Reporter: Timo Walther > Assignee: Timo Walther > > We already had the discussion on the mailing list some months ago about > adding BigDecimal and BigInteger as basic types. > Especially for business or scientific applications it > makes sense to support the BigInteger and BigDecimal types natively. In > my opinion they are as important as Date or Void and should be added as > BasicTypes. The Table API would also benefit from it. > http://mail-archives.apache.org/mod_mbox/flink-dev/201511.mbox/%3c564cad71.8070...@apache.org%3E -- This message was sent by Atlassian JIRA (v6.3.4#6332)