sgomezvillamor commented on code in PR #12456:
URL: https://github.com/apache/hudi/pull/12456#discussion_r1888173721
##########
hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java:
##########
@@ -26,97 +26,228 @@
import org.apache.hudi.sync.common.HoodieSyncClient;
import org.apache.hudi.sync.common.HoodieSyncException;
import org.apache.hudi.sync.datahub.config.DataHubSyncConfig;
+import org.apache.hudi.sync.datahub.config.HoodieDataHubDatasetIdentifier;
+import org.apache.hudi.sync.datahub.util.SchemaFieldsUtil;
+import com.linkedin.common.BrowsePathEntry;
+import com.linkedin.common.BrowsePathEntryArray;
+import com.linkedin.common.BrowsePathsV2;
import com.linkedin.common.Status;
+import com.linkedin.common.SubTypes;
+import com.linkedin.common.UrnArray;
import com.linkedin.common.urn.DatasetUrn;
-import com.linkedin.data.template.SetMode;
-import com.linkedin.data.template.StringMap;
-import com.linkedin.dataset.DatasetProperties;
-import com.linkedin.schema.ArrayType;
-import com.linkedin.schema.BooleanType;
-import com.linkedin.schema.BytesType;
-import com.linkedin.schema.EnumType;
-import com.linkedin.schema.FixedType;
-import com.linkedin.schema.MapType;
-import com.linkedin.schema.NullType;
-import com.linkedin.schema.NumberType;
+import com.linkedin.common.urn.Urn;
+import com.linkedin.container.Container;
+import com.linkedin.container.ContainerProperties;
+import com.linkedin.data.template.StringArray;
+import com.linkedin.domain.Domains;
+import com.linkedin.metadata.aspect.patch.builder.DatasetPropertiesPatchBuilder;
+import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.schema.OtherSchema;
-import com.linkedin.schema.RecordType;
-import com.linkedin.schema.SchemaField;
-import com.linkedin.schema.SchemaFieldArray;
-import com.linkedin.schema.SchemaFieldDataType;
import com.linkedin.schema.SchemaMetadata;
-import com.linkedin.schema.StringType;
-import com.linkedin.schema.UnionType;
+import datahub.client.MetadataWriteResponse;
import datahub.client.rest.RestEmitter;
import datahub.event.MetadataChangeProposalWrapper;
-import org.apache.avro.AvroTypeException;
+import io.datahubproject.schematron.converters.avro.AvroSchemaConverter;
import org.apache.avro.Schema;
import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
public class DataHubSyncClient extends HoodieSyncClient {
+ private static final Logger LOG = LoggerFactory.getLogger(DataHubSyncClient.class);
+
protected final DataHubSyncConfig config;
private final DatasetUrn datasetUrn;
+ private final Urn databaseUrn;
+ private final String tableName;
+ private final String databaseName;
 private static final Status SOFT_DELETE_FALSE = new Status().setRemoved(false);
 public DataHubSyncClient(DataHubSyncConfig config, HoodieTableMetaClient metaClient) {
super(config, metaClient);
this.config = config;
- this.datasetUrn = config.datasetIdentifier.getDatasetUrn();
+ HoodieDataHubDatasetIdentifier datasetIdentifier =
+ config.getDatasetIdentifier();
+ this.datasetUrn = datasetIdentifier.getDatasetUrn();
+ this.databaseUrn = datasetIdentifier.getDatabaseUrn();
+ this.tableName = datasetIdentifier.getTableName();
+ this.databaseName = datasetIdentifier.getDatabaseName();
}
@Override
public Option<String> getLastCommitTimeSynced(String tableName) {
 throw new UnsupportedOperationException("Not supported: `getLastCommitTimeSynced`");
}
+ protected String getLastCommitTime() {
+ try {
+ return getActiveTimeline().lastInstant().get().requestedTime();
+ } catch (Exception e) {
+ LOG.error("Failed to get last commit time", e);
+ return null;
+ }
+ }
+
@Override
public void updateLastCommitTimeSynced(String tableName) {
- updateTableProperties(tableName, Collections.singletonMap(HOODIE_LAST_COMMIT_TIME_SYNC, getActiveTimeline().lastInstant().get().requestedTime()));
+ String lastCommitTime = getLastCommitTime();
Review Comment:
Addressed in commit ddf806add467f71deb6a4dd7c05c471945ac53e9.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]