This is an automated email from the ASF dual-hosted git repository.

laszlog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 0b3bc412f2b09f4818cb0c50fccec5cfb3ddfe8d
Author: Noemi Pap-Takacs <[email protected]>
AuthorDate: Wed Mar 5 16:45:19 2025 +0100

    IMPALA-13738 (Part1): Refactor IcebergTable.getPartialInfo()
    
    This patch aims to refactor Iceberg tables in the Catalog.
    It reduces the dependency on the internal hdfsTable_ object in
    IcebergTable.getPartialInfo(). Now we depend less on HdfsTable
    when we load an Iceberg table in local catalog mode.
    The remaining dependencies will be addressed in subsequent patches.
    
    Now creating TPartialPartitionInfo is the responsibility of the
    partitions, not the tables:
    - FeFsPartition holds a basic default implementation for the
      unpartitioned tables, which includes all Iceberg tables (we
      handle Iceberg tables similarly to unpartitioned HDFS tables
      in the Catalog).
    - HdfsPartition extends this implementation with HDFS and
      Hive ACID specific fields.
    
    Testing:
     - existing Iceberg e2e tests in local catalog mode
     - existing Hive ACID tests in local catalog mode
     - ran test_iceberg_with_puffin.py in local catalog mode
     - LocalCatalog FE test
    
    Change-Id: I75d277fb555d0088a2e28111ff529a4605d734fa
    Reviewed-on: http://gerrit.cloudera.org:8080/22605
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../org/apache/impala/catalog/FeFsPartition.java   | 42 +++++++++++
 .../java/org/apache/impala/catalog/FeFsTable.java  |  7 ++
 .../org/apache/impala/catalog/HdfsPartition.java   | 81 ++++++++++++++++++----
 .../java/org/apache/impala/catalog/HdfsTable.java  | 75 ++++----------------
 .../org/apache/impala/catalog/IcebergTable.java    | 73 +++++++++++++++----
 .../apache/impala/catalog/local/LocalFsTable.java  |  5 ++
 6 files changed, 194 insertions(+), 89 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/FeFsPartition.java 
b/fe/src/main/java/org/apache/impala/catalog/FeFsPartition.java
index bcec2a634..40fba33c1 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeFsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeFsPartition.java
@@ -17,6 +17,7 @@
 package org.apache.impala.catalog;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
@@ -28,8 +29,10 @@ import org.apache.hadoop.fs.Path;
 import org.apache.impala.analysis.LiteralExpr;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.thrift.TAccessLevel;
+import org.apache.impala.thrift.TGetPartialCatalogObjectRequest;
 import org.apache.impala.thrift.THdfsPartitionLocation;
 import org.apache.impala.thrift.TNetworkAddress;
+import org.apache.impala.thrift.TPartialPartitionInfo;
 import org.apache.impala.thrift.TPartitionStats;
 import org.apache.impala.util.ListMap;
 
@@ -209,4 +212,43 @@ public interface FeFsPartition {
    * Returns new FeFsPartition that has the delete delta descriptors as file 
descriptors.
    */
   FeFsPartition genDeleteDeltaPartition();
+
+  /**
+   * Creates TPartialPartitionInfo for the default partition (the one and only 
partition
+   * that unpartitioned tables have).
+   */
+  default TPartialPartitionInfo getDefaultPartialPartitionInfo(
+      TGetPartialCatalogObjectRequest req) {
+
+    TPartialPartitionInfo partInfo = new TPartialPartitionInfo(getId());
+
+    if (req.table_info_selector.want_partition_names) {
+      partInfo.setName(getPartitionName());
+    }
+
+    if (req.table_info_selector.want_partition_metadata) {
+      // We do not need to set partition metadata for unpartitioned tables.
+      partInfo.setHas_incremental_stats(hasIncrementalStats());
+    }
+
+    if (req.table_info_selector.want_partition_files) {
+      partInfo.setLast_compaction_id(-1);
+      partInfo.insert_file_descriptors = new ArrayList<>();
+      partInfo.delete_file_descriptors = new ArrayList<>();
+      partInfo.file_descriptors = new ArrayList<>();
+      if (!getTable().isHiveAcid()) {
+        for (FileDescriptor fd: getFileDescriptors()) {
+          partInfo.file_descriptors.add(fd.toThrift());
+        }
+      }
+    }
+
+    if (req.table_info_selector.want_partition_stats) {
+      partInfo.setPartition_stats(getPartitionStatsCompressed());
+    }
+
+    partInfo.setIs_marked_cached(isMarkedCached());
+
+    return partInfo;
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java 
b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
index b4d464b4f..5d66c8e74 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
@@ -277,6 +277,13 @@ public interface FeFsTable extends FeTable {
    */
   SqlConstraints getSqlConstraints();
 
+  /**
+   * @return whether it is a Hive ACID table.
+   */
+  default boolean isHiveAcid() {
+    return false;
+  }
+
   default FileSystem getFileSystem() throws CatalogException {
     FileSystem tableFs;
     try {
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java 
b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
index c3eef49fe..7cece6268 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
@@ -40,6 +40,7 @@ import javax.annotation.Nullable;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.impala.analysis.Expr;
 import org.apache.impala.analysis.LiteralExpr;
@@ -58,12 +59,14 @@ import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TExpr;
 import org.apache.impala.thrift.TExprNode;
+import org.apache.impala.thrift.TGetPartialCatalogObjectRequest;
 import org.apache.impala.thrift.THdfsFileDesc;
 import org.apache.impala.thrift.THdfsPartition;
 import org.apache.impala.thrift.THdfsPartitionLocation;
 import org.apache.impala.thrift.TNetworkAddress;
 import org.apache.impala.thrift.TPartialPartitionInfo;
 import org.apache.impala.thrift.TPartitionStats;
+import org.apache.impala.util.AcidUtils;
 import org.apache.impala.util.HdfsCachingUtil;
 import org.apache.impala.util.ListMap;
 import org.slf4j.Logger;
@@ -592,6 +595,70 @@ public class HdfsPartition extends CatalogObjectImpl
            encodedDeleteFileDescriptors_.size();
   }
 
+  /**
+   * Returns the requested partitions info and the number of files filtered 
out based on
+   * the ACID writeIdList.
+   * For non-ACID tables this number is always 0. This number being null 
indicates that
+   * the file descriptors are missing from the Catalog.
+   */
+  public Pair<TPartialPartitionInfo, Integer> getPartialPartitionInfo(
+      TGetPartialCatalogObjectRequest req, ValidWriteIdList reqWriteIdList) {
+
+    TPartialPartitionInfo partInfo =
+        FeFsPartition.super.getDefaultPartialPartitionInfo(req);
+    // Add HdfsPartition specific information to the basic default partition 
info.
+    if (req.table_info_selector.want_hms_partition) {
+      partInfo.hms_partition = toHmsPartition();
+    }
+    // The special "prototype partition" or the only partition of an 
unpartitioned table
+    // don't have partition metadata.
+    if (req.table_info_selector.want_partition_metadata
+        && table_.isPartitioned()
+        && id_ != CatalogObjectsConstants.PROTOTYPE_PARTITION_ID) {
+      // Don't need to make a copy here since the HMS parameters shouldn't 
change during
+      // the invocation of getPartialPartitionInfo().
+      partInfo.hms_parameters = getParameters();
+      partInfo.write_id = writeId_;
+      partInfo.hdfs_storage_descriptor = fileFormatDescriptor_.toThrift();
+      partInfo.location = getLocationAsThrift();
+    }
+    int numFilesFiltered = 0;
+    if (req.table_info_selector.want_partition_files) {
+      partInfo.setLast_compaction_id(getLastCompactionId());
+      if (table_.isHiveAcid()) {
+        try {
+          if (!getInsertFileDescriptors().isEmpty()) {
+            numFilesFiltered += addFilteredFds(getInsertFileDescriptors(),
+                partInfo.insert_file_descriptors, reqWriteIdList);
+            numFilesFiltered += addFilteredFds(getDeleteFileDescriptors(),
+                partInfo.delete_file_descriptors, reqWriteIdList);
+          } else {
+            numFilesFiltered += addFilteredFds(getFileDescriptors(),
+                partInfo.file_descriptors, reqWriteIdList);
+          }
+        } catch (CatalogException ex) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Could not use cached file descriptors of partition {} 
of table {}"
+                    + " for writeIdList {}", getPartitionName(), 
getTable().getFullName(),
+                reqWriteIdList, ex);
+          }
+          return new Pair<>(partInfo, null);
+        }
+      }
+    }
+    return new Pair<>(partInfo, numFilesFiltered);
+  }
+
+  private int addFilteredFds(List<FileDescriptor> fds, List<THdfsFileDesc> 
thriftFds,
+      ValidWriteIdList writeIdList) throws CatalogException {
+    List<FileDescriptor> filteredFds = new ArrayList<>(fds);
+    int numFilesFiltered = AcidUtils.filterFdsForAcidState(filteredFds, 
writeIdList);
+    for (FileDescriptor fd: filteredFds) {
+      thriftFds.add(fd.toThrift());
+    }
+    return numFilesFiltered;
+  }
+
   public FileMetadataStats getFileMetadataStats() {
     return fileMetadataStats_;
   }
@@ -606,20 +673,6 @@ public class HdfsPartition extends CatalogObjectImpl
     return cachedMsPartitionDescriptor_;
   }
 
-  public void setPartitionMetadata(TPartialPartitionInfo tPart) {
-    // The special "prototype partition" or the only partition of an 
unpartitioned table
-    // don't have partition metadata.
-    if (id_ == CatalogObjectsConstants.PROTOTYPE_PARTITION_ID
-        || !table_.isPartitioned()) {
-      return;
-    }
-    // Don't need to make a copy here since the caller should not modify the 
parameters.
-    tPart.hms_parameters = getParameters();
-    tPart.write_id = writeId_;
-    tPart.hdfs_storage_descriptor = fileFormatDescriptor_.toThrift();
-    tPart.location = getLocationAsThrift();
-  }
-
   /**
    * Returns a Hive-compatible partition object that may be used in calls to 
the
    * metastore.
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java 
b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 020df5e1d..00994419b 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -75,7 +75,6 @@ import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TColumn;
 import org.apache.impala.thrift.TGetPartialCatalogObjectRequest;
 import org.apache.impala.thrift.TGetPartialCatalogObjectResponse;
-import org.apache.impala.thrift.THdfsFileDesc;
 import org.apache.impala.thrift.THdfsPartition;
 import org.apache.impala.thrift.THdfsTable;
 import org.apache.impala.thrift.TNetworkAddress;
@@ -461,6 +460,12 @@ public class HdfsTable extends Table implements FeFsTable {
     return null;
   }
 
+  @Override
+  public boolean isHiveAcid() {
+    if (validWriteIds_ != null) return true;
+    return AcidUtils.isTransactionalTable(msTable_.getParameters());
+  }
+
   /**
    * Marks a partition dirty by registering the partition builder for its new 
instance.
    */
@@ -2293,56 +2298,16 @@ public class HdfsTable extends Table implements 
FeFsTable {
           return new TGetPartialCatalogObjectResponse().setLookup_status(
               CatalogLookupStatus.PARTITION_NOT_FOUND);
         }
-        TPartialPartitionInfo partInfo = new TPartialPartitionInfo(partId);
-
-        if (req.table_info_selector.want_partition_names) {
-          partInfo.setName(part.getPartitionName());
-        }
-
-        if (req.table_info_selector.want_partition_metadata) {
-          part.setPartitionMetadata(partInfo);
-          partInfo.setHas_incremental_stats(part.hasIncrementalStats());
-        }
-        if (req.table_info_selector.want_hms_partition) {
-          partInfo.hms_partition = part.toHmsPartition();
-        }
-
-        if (req.table_info_selector.want_partition_files) {
-          partInfo.setLast_compaction_id(part.getLastCompactionId());
-          try {
-            if (!part.getInsertFileDescriptors().isEmpty()) {
-              partInfo.file_descriptors = new ArrayList<>();
-              partInfo.insert_file_descriptors = new ArrayList<>();
-              numFilesFiltered += 
addFilteredFds(part.getInsertFileDescriptors(),
-                  partInfo.insert_file_descriptors, reqWriteIdList);
-              partInfo.delete_file_descriptors = new ArrayList<>();
-              numFilesFiltered += 
addFilteredFds(part.getDeleteFileDescriptors(),
-                  partInfo.delete_file_descriptors, reqWriteIdList);
-            } else {
-              partInfo.file_descriptors = new ArrayList<>();
-              numFilesFiltered += addFilteredFds(part.getFileDescriptors(),
-                  partInfo.file_descriptors, reqWriteIdList);
-              partInfo.insert_file_descriptors = new ArrayList<>();
-              partInfo.delete_file_descriptors = new ArrayList<>();
-            }
-            hits.inc();
-          } catch (CatalogException ex) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Could not use cached file descriptors of partition {} 
of table"
-                      + " {} for writeIdList {}", part.getPartitionName(), 
getFullName(),
-                  reqWriteIdList, ex);
-            }
-            misses.inc();
-            missingPartitionInfos.put(part, partInfo);
-          }
-        }
-
-        if (req.table_info_selector.want_partition_stats) {
-          partInfo.setPartition_stats(part.getPartitionStatsCompressed());
+        Pair<TPartialPartitionInfo, Integer> partInfoStatus =
+            part.getPartialPartitionInfo(req, reqWriteIdList);
+        if (partInfoStatus.second != null) {
+          hits.inc();
+          numFilesFiltered += partInfoStatus.second;
+        } else {
+          misses.inc();
+          missingPartitionInfos.put(part, partInfoStatus.first);
         }
-
-        partInfo.setIs_marked_cached(part.isMarkedCached());
-        resp.table_info.partitions.add(partInfo);
+        resp.table_info.partitions.add(partInfoStatus.first);
       }
     }
     // In most of the cases, the prefix map only contains one item for the 
table location.
@@ -2373,16 +2338,6 @@ public class HdfsTable extends Table implements 
FeFsTable {
     return resp;
   }
 
-  private int addFilteredFds(List<FileDescriptor> fds, List<THdfsFileDesc> 
thriftFds,
-      ValidWriteIdList writeIdList) throws CatalogException {
-    List<FileDescriptor> filteredFds = new ArrayList<>(fds);
-    int numFilesFiltered = AcidUtils.filterFdsForAcidState(filteredFds, 
writeIdList);
-    for (FileDescriptor fd: filteredFds) {
-      thriftFds.add(fd.toThrift());
-    }
-    return numFilesFiltered;
-  }
-
   private double getFileMetadataCacheHitRate() {
     long hits = metrics_.getCounter(FILEMETADATA_CACHE_HIT_METRIC).getCount();
     long misses = 
metrics_.getCounter(FILEMETADATA_CACHE_MISS_METRIC).getCount();
diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java 
b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
index 38d72914c..09d6e8d12 100644
--- a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
@@ -23,6 +23,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -30,6 +31,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import com.google.common.collect.Lists;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
@@ -42,6 +44,7 @@ import org.apache.impala.analysis.IcebergPartitionTransform;
 import org.apache.impala.catalog.iceberg.GroupedContentFiles;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.service.BackendConfig;
+import org.apache.impala.thrift.CatalogLookupStatus;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TCompressionCodec;
 import org.apache.impala.thrift.TGetPartialCatalogObjectRequest;
@@ -55,6 +58,7 @@ import org.apache.impala.thrift.TIcebergPartitionSpec;
 import org.apache.impala.thrift.TIcebergPartitionStats;
 import org.apache.impala.thrift.TIcebergTable;
 import org.apache.impala.thrift.TPartialPartitionInfo;
+import org.apache.impala.thrift.TSqlConstraints;
 import org.apache.impala.thrift.TTable;
 import org.apache.impala.thrift.TTableDescriptor;
 import org.apache.impala.thrift.TTableType;
@@ -553,12 +557,6 @@ public class IcebergTable extends Table implements 
FeIcebergTable {
           if (!col.getStats().hasNumDistinctValues()
               || snapshot.timestampMillis() >= hmsStatsTimestampMs) {
             col.getStats().setNumDistinctValues(ndv);
-
-            // In local catalog mode, the stats sent from the catalog are 
those of
-            // 'hdfsTable_', not those of this class.
-            Column hdfsTableCol = hdfsTable_.getColumn(col.getName());
-            Preconditions.checkNotNull(hdfsTableCol);
-            hdfsTableCol.getStats().setNumDistinctValues(ndv);
           }
         }
       }
@@ -711,7 +709,7 @@ public class IcebergTable extends Table implements 
FeIcebergTable {
       List<TIcebergPartitionSpec> params) {
     List<IcebergPartitionSpec> ret = new ArrayList<>();
     for (TIcebergPartitionSpec param : params) {
-      // Non-partition iceberg table only has one PartitionSpec with an empty
+      // Non-partitioned iceberg table only has one PartitionSpec with an empty
       // PartitionField set and a partition id
       if (param.getPartition_fields() != null) {
         List<IcebergPartitionField> fields = new ArrayList<>();
@@ -760,16 +758,61 @@ public class IcebergTable extends Table implements 
FeIcebergTable {
   public TGetPartialCatalogObjectResponse getPartialInfo(
       TGetPartialCatalogObjectRequest req) throws CatalogException {
     Preconditions.checkState(isLoaded(), "unloaded table: %s", getFullName());
-    Map<HdfsPartition, TPartialPartitionInfo> missingPartialInfos = new 
HashMap<>();
-    TGetPartialCatalogObjectResponse resp =
-        getHdfsTable().getPartialInfo(req, missingPartialInfos);
-    if (resp.table_info != null) {
-      // Clear HdfsTable virtual columns and add IcebergTable virtual columns.
-      resp.table_info.unsetVirtual_columns();
-      for (VirtualColumn vCol : getVirtualColumns()) {
-        resp.table_info.addToVirtual_columns(vCol.toThrift());
+    TGetPartialCatalogObjectResponse resp = super.getPartialInfo(req);
+    Preconditions.checkState(resp.table_info != null);
+    boolean wantPartitionInfo = req.table_info_selector.want_partition_files
+        || req.table_info_selector.want_partition_metadata
+        || req.table_info_selector.want_partition_names
+        || req.table_info_selector.want_partition_stats;
+    Preconditions.checkState(!req.table_info_selector.want_hms_partition);
+    Collection<Long> partIds = req.table_info_selector.partition_ids;
+
+    if (partIds != null && partIds.isEmpty()) {
+      resp.table_info.partitions = Lists.newArrayListWithCapacity(0);
+    } else if (wantPartitionInfo || partIds != null) {
+      // Caller specified at least one piece of partition info. If they didn't 
explicitly
+      // specify the partitions, it means that they want the info for all 
partitions.
+      // (Iceberg tables are handled as unpartitioned tables, having only 1 
partition.)
+      Preconditions.checkState(partIds == null || partIds.size() == 1);
+      long partId = getPartitionMap().keySet().iterator().next();
+      FeFsPartition part = (FeFsPartition) getPartitionMap().get(partId);
+      if (part == null) {
+        LOG.warn(String.format("Missing partition ID: %s, Table: %s", partId,
+            getFullName()));
+        return new TGetPartialCatalogObjectResponse().setLookup_status(
+            CatalogLookupStatus.PARTITION_NOT_FOUND);
       }
+      TPartialPartitionInfo partInfo = 
part.getDefaultPartialPartitionInfo(req);
+      resp.table_info.partitions = Lists.newArrayList(partInfo);
+    }
+
+    // In most of the cases, the prefix map only contains one item for the 
table location.
+    // Here we always send it since it's small.
+    resp.table_info.setPartition_prefixes(
+        hdfsTable_.partitionLocationCompressor_.getPrefixes());
+
+    if (req.table_info_selector.want_partition_files) {
+      // TODO(todd) we are sending the whole host index even if we returned 
only
+      // one file -- maybe not so efficient, but the alternative is to do a 
bunch
+      // of cloning of file descriptors which might increase memory pressure.
+      resp.table_info.setNetwork_addresses(getHostIndex().getList());
     }
+
+    if (req.table_info_selector.want_table_constraints) {
+      TSqlConstraints sqlConstraints =
+          new TSqlConstraints(getSqlConstraints().getPrimaryKeys(),
+              getSqlConstraints().getForeignKeys());
+      resp.table_info.setSql_constraints(sqlConstraints);
+    }
+    // Publish the isMarkedCached_ marker so coordinators don't need to 
validate
+    // it again which requires additional HDFS RPCs.
+    resp.table_info.setIs_marked_cached(isMarkedCached());
+
+    // Add IcebergTable virtual columns.
+    for (VirtualColumn vCol : getVirtualColumns()) {
+      resp.table_info.addToVirtual_columns(vCol.toThrift());
+    }
+
     if (req.table_info_selector.want_iceberg_table) {
       resp.table_info.setIceberg_table(Utils.getTIcebergTable(this));
       if (!resp.table_info.isSetNetwork_addresses()) {
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java 
b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
index 4f5521ddf..bd1113368 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
@@ -297,6 +297,11 @@ public class LocalFsTable extends LocalTable implements 
FeFsTable {
     return null;
   }
 
+  @Override
+  public boolean isHiveAcid() {
+    return AcidUtils.isTransactionalTable(getMetaStoreTable().getParameters());
+  }
+
   @Override
   public TResultSet getTableStats() {
     return HdfsTable.getTableStats(this);

Reply via email to