wuyunfeng commented on a change in pull request #3454:
URL: https://github.com/apache/incubator-doris/pull/3454#discussion_r421200957
##########
File path: fe/src/main/java/org/apache/doris/external/EsIndexState.java
##########
@@ -76,66 +68,45 @@ public TNetworkAddress randomAddress(Map<String,
EsNodeInfo> nodesInfo) {
EsNodeInfo[] nodeInfos = (EsNodeInfo[]) nodesInfo.values().toArray();
return nodeInfos[seed].getPublishAddress();
}
-
- public static EsIndexState parseIndexStateV55(String indexName, JSONObject
indicesRoutingMap,
- JSONObject nodesMap,
- JSONObject indicesMetaMap, PartitionInfo partitionInfo) throws
AnalysisException {
+
+ public static EsIndexState parseIndexState(String indexName, JSONObject
nodesMap,
Review comment:
EsIndexState maybe need a better name
##########
File path: fe/src/main/java/org/apache/doris/external/EsIndexState.java
##########
@@ -76,66 +68,45 @@ public TNetworkAddress randomAddress(Map<String,
EsNodeInfo> nodesInfo) {
EsNodeInfo[] nodeInfos = (EsNodeInfo[]) nodesInfo.values().toArray();
return nodeInfos[seed].getPublishAddress();
}
-
- public static EsIndexState parseIndexStateV55(String indexName, JSONObject
indicesRoutingMap,
- JSONObject nodesMap,
- JSONObject indicesMetaMap, PartitionInfo partitionInfo) throws
AnalysisException {
+
+ public static EsIndexState parseIndexState(String indexName, JSONObject
nodesMap,
Review comment:
Can you move this parse method into RestClient? such as `getHttpNodes`
##########
File path: fe/src/main/java/org/apache/doris/external/EsIndexState.java
##########
@@ -76,66 +68,45 @@ public TNetworkAddress randomAddress(Map<String,
EsNodeInfo> nodesInfo) {
EsNodeInfo[] nodeInfos = (EsNodeInfo[]) nodesInfo.values().toArray();
return nodeInfos[seed].getPublishAddress();
}
-
- public static EsIndexState parseIndexStateV55(String indexName, JSONObject
indicesRoutingMap,
- JSONObject nodesMap,
- JSONObject indicesMetaMap, PartitionInfo partitionInfo) throws
AnalysisException {
+
+ public static EsIndexState parseIndexState(String indexName, JSONObject
nodesMap,
+ JSONArray shards) {
EsIndexState indexState = new EsIndexState(indexName);
- JSONObject shardRoutings =
indicesRoutingMap.getJSONObject(indexName).getJSONObject("shards");
- for (String shardKey : shardRoutings.keySet()) {
+ int length = shards.length();
+ for (int i = 0; i < length; i++) {
List<EsShardRouting> singleShardRouting = Lists.newArrayList();
- JSONArray shardRouting = shardRoutings.getJSONArray(shardKey);
- for (int i = 0; i < shardRouting.length(); ++i) {
- JSONObject shard = shardRouting.getJSONObject(i);
+ JSONArray shardsArray = shards.getJSONArray(i);
+ int arrayLength = shardsArray.length();
+ for (int j = 0; j < arrayLength; j++) {
+ JSONObject shard = shardsArray.getJSONObject(j);
String shardState = shard.getString("state");
if ("STARTED".equalsIgnoreCase(shardState)) {
Review comment:
take the `relocating` state into account maybe as well?
##########
File path: fe/src/main/java/org/apache/doris/external/EsRestClient.java
##########
@@ -49,48 +49,52 @@
.readTimeout(10, TimeUnit.SECONDS)
.build();
- private String basicAuth;
-
- private int nextClient = 0;
+ private Request.Builder builder;
private String[] nodes;
private String currentNode;
+ private int currentNodeIndex = 0;
public EsRestClient(String[] nodes, String authUser, String authPassword) {
this.nodes = nodes;
+ this.builder = new Request.Builder();
if (!Strings.isEmpty(authUser) && !Strings.isEmpty(authPassword)) {
- basicAuth = Credentials.basic(authUser, authPassword);
+ this.builder.addHeader(HttpHeaders.AUTHORIZATION,
Credentials.basic(authUser, authPassword));
}
- selectNextNode();
+ this.currentNode = nodes[currentNodeIndex];
}
- private boolean selectNextNode() {
- if (nextClient >= nodes.length) {
- return false;
+ private void selectNextNode() {
+ currentNodeIndex++;
+ // reroute, because the previously failed node may have already been
restored
+ if (currentNodeIndex >= nodes.length) {
+ currentNodeIndex = 0;
}
- currentNode = nodes[nextClient++];
- return true;
+ currentNode = nodes[currentNodeIndex];
}
public Map<String, EsNodeInfo> getHttpNodes() throws Exception {
Map<String, Map<String, Object>> nodesData = get("_nodes/http",
"nodes");
if (nodesData == null) {
return Collections.emptyMap();
}
- Map<String, EsNodeInfo> nodes = new HashMap<>();
+ Map<String, EsNodeInfo> nodesMap = new HashMap<>();
for (Map.Entry<String, Map<String, Object>> entry :
nodesData.entrySet()) {
EsNodeInfo node = new EsNodeInfo(entry.getKey(), entry.getValue());
if (node.hasHttp()) {
- nodes.put(node.getId(), node);
+ nodesMap.put(node.getId(), node);
}
}
- return nodes;
+ return nodesMap;
}
- public String getIndexMetaData(String indexName) {
- String path = "_cluster/state?indices=" + indexName
- + "&metric=routing_table,nodes,metadata&expand_wildcards=open";
+ public String getIndexMapping(String indexName) {
Review comment:
it is better to return the `top` needed, such as above `Map<String,
EsNodeInfo> getHttpNodes()`
##########
File path: fe/src/main/java/org/apache/doris/external/EsRestClient.java
##########
@@ -49,48 +49,52 @@
.readTimeout(10, TimeUnit.SECONDS)
.build();
- private String basicAuth;
-
- private int nextClient = 0;
+ private Request.Builder builder;
private String[] nodes;
private String currentNode;
+ private int currentNodeIndex = 0;
public EsRestClient(String[] nodes, String authUser, String authPassword) {
this.nodes = nodes;
+ this.builder = new Request.Builder();
if (!Strings.isEmpty(authUser) && !Strings.isEmpty(authPassword)) {
- basicAuth = Credentials.basic(authUser, authPassword);
+ this.builder.addHeader(HttpHeaders.AUTHORIZATION,
Credentials.basic(authUser, authPassword));
}
- selectNextNode();
+ this.currentNode = nodes[currentNodeIndex];
}
- private boolean selectNextNode() {
- if (nextClient >= nodes.length) {
- return false;
+ private void selectNextNode() {
+ currentNodeIndex++;
+ // reroute, because the previously failed node may have already been
restored
+ if (currentNodeIndex >= nodes.length) {
+ currentNodeIndex = 0;
}
- currentNode = nodes[nextClient++];
- return true;
+ currentNode = nodes[currentNodeIndex];
}
public Map<String, EsNodeInfo> getHttpNodes() throws Exception {
Map<String, Map<String, Object>> nodesData = get("_nodes/http",
"nodes");
if (nodesData == null) {
return Collections.emptyMap();
}
- Map<String, EsNodeInfo> nodes = new HashMap<>();
+ Map<String, EsNodeInfo> nodesMap = new HashMap<>();
for (Map.Entry<String, Map<String, Object>> entry :
nodesData.entrySet()) {
EsNodeInfo node = new EsNodeInfo(entry.getKey(), entry.getValue());
if (node.hasHttp()) {
- nodes.put(node.getId(), node);
+ nodesMap.put(node.getId(), node);
}
}
- return nodes;
+ return nodesMap;
}
- public String getIndexMetaData(String indexName) {
- String path = "_cluster/state?indices=" + indexName
- + "&metric=routing_table,nodes,metadata&expand_wildcards=open";
+ public String getIndexMapping(String indexName) {
+ String path = indexName + "/_mapping";
return execute(path);
+ }
+ public String getSearchShards(String indexName) {
Review comment:
same above
##########
File path: fe/src/main/java/org/apache/doris/external/EsStateStore.java
##########
@@ -81,30 +68,34 @@ public void deRegisterTable(long tableId) {
esTables.remove(tableId);
LOG.info("deregister table [{}] from sync list", tableId);
}
-
+
@Override
protected void runAfterCatalogReady() {
for (EsTable esTable : esTables.values()) {
try {
EsRestClient client = new EsRestClient(esTable.getSeeds(),
- esTable.getUserName(), esTable.getPasswd());
- // if user not specify the es version, try to get the remote
cluster versoin
- // in the future, we maybe need this version
- String indexMetaData =
client.getIndexMetaData(esTable.getIndexName());
- if (indexMetaData == null) {
+ esTable.getUserName(), esTable.getPasswd());
+
+ String indexMapping =
client.getIndexMapping(esTable.getIndexName());
+ if (indexMapping == null) {
continue;
}
- EsTableState esTableState = parseClusterState55(indexMetaData,
esTable);
+ loadEsIndexMapping(indexMapping, esTable);
+
+ String shardLocation =
client.getSearchShards(esTable.getIndexName());
+ EsTableState esTableState = loadEsSearchShards(shardLocation,
esTable);
if (esTableState == null) {
continue;
}
+
if (EsTable.TRANSPORT_HTTP.equals(esTable.getTransport())) {
Map<String, EsNodeInfo> nodesInfo = client.getHttpNodes();
esTableState.addHttpAddress(nodesInfo);
}
esTable.setEsTableState(esTableState);
Review comment:
maybe we should rename such `XXXTableState` method name?
##########
File path: fe/src/main/java/org/apache/doris/external/EsRestClient.java
##########
@@ -137,10 +135,9 @@ private String execute(String path) {
if (!(currentNode.startsWith("http://") ||
currentNode.startsWith("https://"))) {
currentNode = "http://" + currentNode;
}
-
Request request = builder.get()
- .url(currentNode + "/" + path)
- .build();
+ .url(currentNode + "/" + path)
+ .build();
LOG.trace("es rest client request URL: {}", currentNode + "/" +
path);
Review comment:
```suggestion
if (LOG.isTraceEnabled) {
LOG.trace("es rest client request URL: {}", currentNode + "/" +
path);
}
```
##########
File path: fe/src/main/java/org/apache/doris/external/EsStateStore.java
##########
@@ -130,74 +121,30 @@ public void loadTableFromCatalog() {
}
}
- private EsTableState loadEsIndexMetadataV55(final EsTable esTable) {
- OkHttpClient.Builder clientBuilder = new OkHttpClient.Builder();
- clientBuilder.authenticator(new Authenticator() {
- @Override
- public Request authenticate(Route route, Response response) throws
IOException {
- String credential = Credentials.basic(esTable.getUserName(),
esTable.getPasswd());
- return response.request().newBuilder().header("Authorization",
credential).build();
- }
- });
- String[] seeds = esTable.getSeeds();
- for (String seed : seeds) {
- String url = seed + "/_cluster/state?indices="
- + esTable.getIndexName()
- +
"&metric=routing_table,nodes,metadata&expand_wildcards=open";
- String basicAuth = "";
- try {
- Request request = new Request.Builder()
- .get()
- .url(url)
- .addHeader("Authorization", basicAuth)
- .build();
- Call call = clientBuilder.build().newCall(request);
- Response response = call.execute();
- String responseStr = response.body().string();
- if (response.isSuccessful()) {
- try {
- EsTableState esTableState =
parseClusterState55(responseStr, esTable);
- if (esTableState != null) {
- return esTableState;
- }
- } catch (Exception e) {
- LOG.warn("errors while parse response msg {}",
responseStr, e);
- }
- } else {
- LOG.info("errors while call es [{}] to get state info {}",
url, responseStr);
- }
- } catch (Exception e) {
- LOG.warn("errors while call es [{}]", url, e);
- }
- }
- return null;
- }
-
- @VisibleForTesting
- public EsTableState parseClusterState55(String responseStr, EsTable
esTable)
- throws DdlException, AnalysisException,
ExternalDataSourceException {
- JSONObject jsonObject = new JSONObject(responseStr);
- String clusterName = jsonObject.getString("cluster_name");
- JSONObject nodesMap = jsonObject.getJSONObject("nodes");
- // we build the doc value context for fields maybe used for scanning
- // "properties": {
- // "city": {
- // "type": "text", // text field does not have docvalue
- // "fields": {
- // "raw": {
- // "type": "keyword"
- // }
- // }
- // }
- // }
- // then the docvalue context provided the mapping between the select
field and real request field :
- // {"city": "city.raw"}
- JSONObject indicesMetaMap =
jsonObject.getJSONObject("metadata").getJSONObject("indices");
- JSONObject indexMetaMap =
indicesMetaMap.optJSONObject(esTable.getIndexName());
- if (indexMetaMap != null && (esTable.isKeywordSniffEnable() ||
esTable.isDocValueScanEnable())) {
- JSONObject mappings = indexMetaMap.optJSONObject("mappings");
+ // Configure keyword and doc_values by mapping
+ public void loadEsIndexMapping(String indexMapping, EsTable esTable) {
+ JSONObject jsonObject = new JSONObject(indexMapping);
Review comment:
Can we move all `parse json` into one location?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]