[ https://issues.apache.org/jira/browse/HIVE-25596?focusedWorklogId=675609&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-675609 ]
ASF GitHub Bot logged work on HIVE-25596: ----------------------------------------- Author: ASF GitHub Bot Created on: 04/Nov/21 01:07 Start Date: 04/Nov/21 01:07 Worklog Time Spent: 10m Work Description: pkumarsinha commented on a change in pull request #2724: URL: https://github.com/apache/hive/pull/2724#discussion_r741567981 ########## File path: ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java ########## @@ -58,14 +59,15 @@ public void setMetricsMBean(ObjectName metricsMBean) { public ReplicationMetricCollector(String dbName, Metadata.ReplicationType replicationType, String stagingDir, long dumpExecutionId, HiveConf conf) { + this.conf = conf; checkEnabledForTests(conf); String policy = conf.get(Constants.SCHEDULED_QUERY_SCHEDULENAME); long executionId = conf.getLong(Constants.SCHEDULED_QUERY_EXECUTIONID, 0L); if (!StringUtils.isEmpty(policy) && executionId > 0) { isEnabled = true; metricCollector = MetricCollector.getInstance().init(conf); MetricSink.getInstance().init(conf); - Metadata metadata = new Metadata(dbName, replicationType, stagingDir); + Metadata metadata = new Metadata(dbName, replicationType, testingModeEnabled() ? "dummyDir" :stagingDir); Review comment: nit: shift this staging logic calculation to a separate method ########## File path: ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFDeserialize.java ########## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.udf.generic; + +import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageEncoder; +import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder; +import org.apache.hadoop.hive.ql.exec.Description; +import org.apache.hadoop.hive.ql.exec.UDFArgumentException; +import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils; + +/** + * GenericUDFDeserializeString. + * + */ +@Description(name = "deserialize", + value="_FUNC_(message, encodingFormat) - Returns deserialized string of encoded message.", + extended="Example:\n" + + " > SELECT _FUNC_('H4sIAAAAAAAA/ytJLS4BAAx+f9gEAAAA', 'gzip(json-2.0)') FROM src LIMIT 1;\n" Review comment: The base64 encoding is missed out in the description here, i.e. even though the passed parameter is "gzip(json-2.0)", it won't actually work unless the passed content is base64 encoded. Moreover, the json part of "gzip(json-2.0)" is confusing at the UDF level: even if the underlying string is not json, the UDF will work just fine. Mandating that the user pass json even when the content is not json doesn't sit well. For a UDF, wouldn't just gzip have been fine? 
########## File path: ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFDeserialize.java ########## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.udf.generic; + +import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageEncoder; +import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder; +import org.apache.hadoop.hive.ql.exec.Description; +import org.apache.hadoop.hive.ql.exec.UDFArgumentException; +import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils; + +/** + * GenericUDFDeserializeString. 
+ * + */ +@Description(name = "deserialize", + value="_FUNC_(message, encodingFormat) - Returns deserialized string of encoded message.", + extended="Example:\n" + + " > SELECT _FUNC_('H4sIAAAAAAAA/ytJLS4BAAx+f9gEAAAA', 'gzip(json-2.0)') FROM src LIMIT 1;\n" + + " test") +public class GenericUDFDeserialize extends GenericUDF { + + private static final int ARG_COUNT = 2; // Number of arguments to this UDF + private static final String FUNC_NAME = "deserialize"; // External Name + + private transient PrimitiveObjectInspector stringOI = null; + private transient PrimitiveObjectInspector encodingFormat = null; + + @Override + public ObjectInspector initialize(ObjectInspector[] arguments) + throws UDFArgumentException { + if (arguments.length != ARG_COUNT) { + throw new UDFArgumentException("The function " + FUNC_NAME + " accepts " + ARG_COUNT + " arguments."); + } + for (ObjectInspector arg: arguments) { + if (arg.getCategory() != ObjectInspector.Category.PRIMITIVE || + PrimitiveObjectInspectorUtils.PrimitiveGrouping.STRING_GROUP != PrimitiveObjectInspectorUtils.getPrimitiveGrouping( + ((PrimitiveObjectInspector)arg).getPrimitiveCategory())){ + throw new UDFArgumentTypeException(0, "The arguments to " + FUNC_NAME + " must be a string/varchar"); + } + } + stringOI = (PrimitiveObjectInspector) arguments[0]; + encodingFormat = (PrimitiveObjectInspector) arguments[1]; + return PrimitiveObjectInspectorFactory.javaStringObjectInspector; + } + + @Override + public Object evaluate(DeferredObject[] arguments) throws HiveException { + String value = PrimitiveObjectInspectorUtils.getString(arguments[0].get(), stringOI); + String messageFormat = PrimitiveObjectInspectorUtils.getString(arguments[1].get(), encodingFormat); + if (value == null) { + return null; + } else if (messageFormat == null || messageFormat.isEmpty() || JSONMessageEncoder.FORMAT.equalsIgnoreCase(value)) { + return value; + } else if (GzipJSONMessageEncoder.FORMAT.equalsIgnoreCase(messageFormat)) { + return 
GzipJSONMessageEncoder.getInstance().getDeserializer().deSerializeGenericString(value); + } else { + throw new HiveException("Invalid message format provided: " + messageFormat + " for message: " + value); Review comment: add test for this format not supported case ########## File path: ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFDeserialize.java ########## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.udf.generic; + +import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageEncoder; +import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder; +import org.apache.hadoop.hive.ql.exec.Description; +import org.apache.hadoop.hive.ql.exec.UDFArgumentException; +import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils; + +/** + * GenericUDFDeserializeString. + * + */ +@Description(name = "deserialize", + value="_FUNC_(message, encodingFormat) - Returns deserialized string of encoded message.", Review comment: Should it be compressionFormat ? ########## File path: standalone-metastore/metastore-server/src/test/resources/sql/postgres/upgrade-3.1.3000-to-4.0.0.postgres.sql ########## @@ -51,6 +51,10 @@ CREATE TABLE "REPLICATION_METRICS" ( --Increase the size of RM_PROGRESS to accomodate the replication statistics ALTER TABLE "REPLICATION_METRICS" ALTER "RM_PROGRESS" TYPE varchar(24000); +ALTER TABLE "REPLICATION_METRICS" ALTER "RM_PROGRESS" TYPE varchar(10000); Review comment: Has this downsizing been tested? 
########## File path: ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricSink.java ########## @@ -78,6 +83,12 @@ public void setup() throws Exception { MetricSink metricSinkSpy = Mockito.spy(MetricSink.getInstance()); Mockito.doReturn(1L).when(metricSinkSpy).getFrequencyInSecs(); metricSinkSpy.init(conf); + encoder = MessageFactory.getDefaultInstanceForReplMetrics(conf); + deserializer = encoder.getDeserializer(); Review comment: if deserializer has to be obtained from encoder anyway, why deserializer is an instance variable? ########## File path: ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricSink.java ########## @@ -65,6 +68,8 @@ @RunWith(MockitoJUnitRunner.class) public class TestReplicationMetricSink { + MessageEncoder encoder; + MessageDeserializer deserializer; Review comment: Can you add comment on why the above two are not encoder/decoder or serializer/deserializer. It may look confusing otherwise ########## File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageDeserializer.java ########## @@ -330,4 +330,9 @@ public DeletePartitionColumnStatMessage getDeletePartitionColumnStatMessage(Stri throw new IllegalArgumentException("Could not construct UpdatePartitionColumnStatMessage", e); } } + + @Override + public String deSerializeGenericString(String messageBody) { + return messageBody; Review comment: Default impl could have been in base itself? 
########## File path: ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/MetricSink.java ########## @@ -116,14 +117,17 @@ public void run() { int totalMetricsSize = metrics.size(); List<ReplicationMetrics> replicationMetricsList = new ArrayList<>(totalMetricsSize); ObjectMapper mapper = new ObjectMapper(); + MessageEncoder encoder = MessageFactory.getDefaultInstanceForReplMetrics(conf); + MessageSerializer serializer = encoder.getSerializer(); for (int index = 0; index < totalMetricsSize; index++) { ReplicationMetric metric = metrics.removeFirst(); ReplicationMetrics persistentMetric = new ReplicationMetrics(); persistentMetric.setDumpExecutionId(metric.getDumpExecutionId()); persistentMetric.setScheduledExecutionId(metric.getScheduledExecutionId()); persistentMetric.setPolicy(metric.getPolicy()); - persistentMetric.setProgress(mapper.writeValueAsString(metric.getProgress())); - persistentMetric.setMetadata(mapper.writeValueAsString(metric.getMetadata())); + persistentMetric.setProgress(serializer.serialize(mapper.writeValueAsString(metric.getProgress()))); + persistentMetric.setMetadata(serializer.serialize(mapper.writeValueAsString(metric.getMetadata()))); Review comment: a) What would be max size of metadata content in plain text format? and how much is it post compression? b) metadata also now getting stored in compressed format but there is no reduction in size of the column. 
########## File path: ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/MetricSink.java ########## @@ -116,14 +117,17 @@ public void run() { int totalMetricsSize = metrics.size(); List<ReplicationMetrics> replicationMetricsList = new ArrayList<>(totalMetricsSize); ObjectMapper mapper = new ObjectMapper(); + MessageEncoder encoder = MessageFactory.getDefaultInstanceForReplMetrics(conf); + MessageSerializer serializer = encoder.getSerializer(); for (int index = 0; index < totalMetricsSize; index++) { ReplicationMetric metric = metrics.removeFirst(); ReplicationMetrics persistentMetric = new ReplicationMetrics(); persistentMetric.setDumpExecutionId(metric.getDumpExecutionId()); persistentMetric.setScheduledExecutionId(metric.getScheduledExecutionId()); persistentMetric.setPolicy(metric.getPolicy()); - persistentMetric.setProgress(mapper.writeValueAsString(metric.getProgress())); - persistentMetric.setMetadata(mapper.writeValueAsString(metric.getMetadata())); + persistentMetric.setProgress(serializer.serialize(mapper.writeValueAsString(metric.getProgress()))); + persistentMetric.setMetadata(serializer.serialize(mapper.writeValueAsString(metric.getMetadata()))); Review comment: How does this justify a need to compress the metadata field in that case? I think we should focus on the worst-case size and then see the change post compression. That way we can decide on: a) whether we really need compression for the metadata column b) if so, how much the column size should be. ########## File path: ql/src/test/org/apache/hadoop/hive/ql/udf/generic/TestGenericUDFDeserialize.java ########## @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.udf.generic; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.messaging.MessageEncoder; +import org.apache.hadoop.hive.metastore.messaging.MessageFactory; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.exec.UDFArgumentException; +import org.apache.hadoop.io.Text; +import org.junit.Test; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertEquals; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; + +/** + * TestGenericUDFGzipJsonDeserialize. 
+ */ +public class TestGenericUDFDeserialize { + + @Test + public void testOneArg() throws HiveException { + GenericUDFDeserialize udf = new GenericUDFDeserialize(); + ObjectInspector valueOI1 = PrimitiveObjectInspectorFactory.writableStringObjectInspector; + ObjectInspector valueOI2 = PrimitiveObjectInspectorFactory.writableStringObjectInspector; + UDFArgumentException ex = null; + try { + udf.initialize(new ObjectInspector[]{valueOI1}); + } catch (UDFArgumentException e) { + ex = e; + } + assertNotNull("The function deserialize() accepts 2 argument.", ex); + ex = null; + try { + udf.initialize(new ObjectInspector[]{valueOI2, valueOI1}); + } catch (UDFArgumentException e) { + ex = e; + } + assertNull("The function deserialize() accepts 2 argument.", ex); + } + + @Test + public void testGZIPJsonDeserializeString() throws HiveException { + GenericUDFDeserialize udf = new GenericUDFDeserialize(); + udf.initialize(new ObjectInspector[]{PrimitiveObjectInspectorFactory.writableStringObjectInspector, + PrimitiveObjectInspectorFactory.writableStringObjectInspector}); + GenericUDF.DeferredObject[] args = new GenericUDF.DeferredObject[2]; + String expectedOutput = "test"; + MessageEncoder encoder = MessageFactory.getDefaultInstanceForReplMetrics(new HiveConf()); + String serializedMsg = encoder.getSerializer().serialize(expectedOutput); + args[0] = new GenericUDF.DeferredJavaObject(new Text(serializedMsg)); + args[1] = new GenericUDF.DeferredJavaObject(new Text(encoder.getMessageFormat())); + Object actualOutput = udf.evaluate(args).toString(); + assertEquals("deserialize() test", expectedOutput, actualOutput != null ? 
actualOutput : null); + } + + @Test + public void testInvalidMessageString() throws HiveException { + GenericUDFDeserialize udf = new GenericUDFDeserialize(); + udf.initialize(new ObjectInspector[]{PrimitiveObjectInspectorFactory.writableStringObjectInspector, + PrimitiveObjectInspectorFactory.writableStringObjectInspector}); + GenericUDF.DeferredObject[] args = new GenericUDF.DeferredObject[2]; + String expectedOutput = "test"; + MessageEncoder encoder = MessageFactory.getDefaultInstanceForReplMetrics(new HiveConf()); + String serializedMsg = encoder.getSerializer().serialize(expectedOutput); + args[0] = new GenericUDF.DeferredJavaObject(new Text(serializedMsg)); + args[1] = new GenericUDF.DeferredJavaObject(new Text("randomSerialization")); + HiveException ex = null; + try { + Object actualOutput = udf.evaluate(args).toString(); + } catch (HiveException e) { + ex = e; + } + assertNotNull("Invalid message format provided.", ex); Review comment: Add assertion to compare the exception message ########## File path: ql/src/test/org/apache/hadoop/hive/ql/udf/generic/TestGenericUDFDeserialize.java ########## @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.udf.generic; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.messaging.MessageEncoder; +import org.apache.hadoop.hive.metastore.messaging.MessageFactory; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.exec.UDFArgumentException; +import org.apache.hadoop.io.Text; +import org.junit.Test; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertEquals; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; + +/** + * TestGenericUDFGzipJsonDeserialize. + */ +public class TestGenericUDFDeserialize { + + @Test + public void testOneArg() throws HiveException { + GenericUDFDeserialize udf = new GenericUDFDeserialize(); + ObjectInspector valueOI1 = PrimitiveObjectInspectorFactory.writableStringObjectInspector; + ObjectInspector valueOI2 = PrimitiveObjectInspectorFactory.writableStringObjectInspector; + UDFArgumentException ex = null; + try { + udf.initialize(new ObjectInspector[]{valueOI1}); + } catch (UDFArgumentException e) { + ex = e; + } + assertNotNull("The function deserialize() accepts 2 argument.", ex); + ex = null; + try { + udf.initialize(new ObjectInspector[]{valueOI2, valueOI1}); + } catch (UDFArgumentException e) { + ex = e; + } + assertNull("The function deserialize() accepts 2 argument.", ex); + } + + @Test + public void testGZIPJsonDeserializeString() throws HiveException { + GenericUDFDeserialize udf = new GenericUDFDeserialize(); + udf.initialize(new ObjectInspector[]{PrimitiveObjectInspectorFactory.writableStringObjectInspector, + PrimitiveObjectInspectorFactory.writableStringObjectInspector}); + GenericUDF.DeferredObject[] args = new GenericUDF.DeferredObject[2]; + String expectedOutput = "test"; + MessageEncoder encoder = 
MessageFactory.getDefaultInstanceForReplMetrics(new HiveConf()); + String serializedMsg = encoder.getSerializer().serialize(expectedOutput); + args[0] = new GenericUDF.DeferredJavaObject(new Text(serializedMsg)); + args[1] = new GenericUDF.DeferredJavaObject(new Text(encoder.getMessageFormat())); + Object actualOutput = udf.evaluate(args).toString(); + assertEquals("deserialize() test", expectedOutput, actualOutput != null ? actualOutput : null); + } + + @Test + public void testInvalidMessageString() throws HiveException { + GenericUDFDeserialize udf = new GenericUDFDeserialize(); + udf.initialize(new ObjectInspector[]{PrimitiveObjectInspectorFactory.writableStringObjectInspector, Review comment: nit: format -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 675609) Time Spent: 4.5h (was: 4h 20m) > Compress Hive Replication Metrics while storing > ----------------------------------------------- > > Key: HIVE-25596 > URL: https://issues.apache.org/jira/browse/HIVE-25596 > Project: Hive > Issue Type: Improvement > Reporter: Haymant Mangla > Assignee: Haymant Mangla > Priority: Major > Labels: pull-request-available > Attachments: CompressedRM_Progress(k=10), CompressedRM_Progress(k=5), > PlainTextRM_Progress(k=10), PlainTextRM_Progress(k=5) > > Time Spent: 4.5h > Remaining Estimate: 0h > > Compress the json fields of sys.replication_metrics table to optimise RDBMS > space usage. -- This message was sent by Atlassian Jira (v8.3.4#803005)