felixcheung commented on a change in pull request #3340: [ZEPPELIN-4068] 
Implement MongoNotebookRepo
URL: https://github.com/apache/zeppelin/pull/3340#discussion_r268418951
 
 

 ##########
 File path: 
zeppelin-plugins/notebookrepo/mongo/src/main/java/org/apache/zeppelin/notebook/repo/MongoNotebookRepo.java
 ##########
 @@ -0,0 +1,390 @@
+package org.apache.zeppelin.notebook.repo;
+
+import static com.mongodb.client.model.Filters.and;
+import static com.mongodb.client.model.Filters.eq;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.ArrayUtils;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+import org.bson.types.ObjectId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import com.mongodb.MongoClient;
+import com.mongodb.MongoClientURI;
+import com.mongodb.client.AggregateIterable;
+import com.mongodb.client.FindIterable;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.model.UpdateOptions;
+import com.mongodb.client.model.Updates;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.notebook.Note;
+import org.apache.zeppelin.notebook.NoteInfo;
+import org.apache.zeppelin.user.AuthenticationInfo;
+
+/**
+ * Backend for storing Notebook on MongoDB.
+ */
+public class MongoNotebookRepo implements NotebookRepo {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(MongoNotebookRepo.class);
+
+  private ZeppelinConfiguration conf;
+
+  private MongoClient client;
+
+  private MongoDatabase db;
+
+  private MongoCollection<Document> notes;
+
+  private MongoCollection<Document> folders;
+
+  private String folderName;
+
+  public MongoNotebookRepo() {
+  }
+
+  @Override
+  public void init(ZeppelinConfiguration zConf) throws IOException {
+    this.conf = zConf;
+    client = new MongoClient(new MongoClientURI(conf.getMongoUri()));
+    db = client.getDatabase(conf.getMongoDatabase());
+    notes = db.getCollection(conf.getMongoCollection());
+    folderName = conf.getMongoFolder();
+    folders = db.getCollection(folderName);
+
+    if (conf.getMongoAutoimport()) {
+      // import local notes into MongoDB
+      insertFileSystemNotes();
+    }
+  }
+
+  private void insertFileSystemNotes() throws IOException {
+    NotebookRepo vfsRepo = new VFSNotebookRepo();
+    vfsRepo.init(this.conf);
+    Map<String, NoteInfo> infos = vfsRepo.list(null);
+    for (NoteInfo info : infos.values()) {
+      Note note = vfsRepo.get(info.getId(), info.getPath(), null);
+      save(note, null);
+    }
+
+    vfsRepo.close();
+  }
+
+  @Override
+  public Map<String, NoteInfo> list(AuthenticationInfo subject) throws 
IOException {
+    Map<String, NoteInfo> infos = new HashMap<>();
+
+    Document match = new Document("$match", new Document(Fields.IS_DIR, 
false));
+    Document graphLookup = new Document("$graphLookup"
+        , new Document("from", folderName)
+        .append("startWith", "$" + Fields.PID)
+        .append("connectFromField", Fields.PID)
+        .append("connectToField", Fields.ID)
+        .append("as", "fullPath"));
+
+
+    ArrayList<Document> list = Lists.newArrayList(match, graphLookup);
+
+    AggregateIterable<Document> aggregate = folders.aggregate(list);
+    for (Document document : aggregate) {
+      String id = document.getString(Fields.ID);
+      String name = document.getString(Fields.NAME);
+      List<Document> fullPath = document.get("fullPath", List.class);
+
+      StringBuilder sb = new StringBuilder();
+      for (Document pathNode : fullPath) {
+        sb.append("/").append(pathNode.getString(Fields.NAME));
+      }
+
+      String fullPathStr = sb.append("/").append(name).toString();
+
+      NoteInfo noteInfo = new NoteInfo(id, fullPathStr);
+      infos.put(id, noteInfo);
+    }
+
+    return infos;
+  }
+
+  @Override
+  public Note get(String noteId, String notePath, AuthenticationInfo subject) 
throws IOException {
+    ////validate folder exists
+    //String[] pathArray = toPathArray(notePath, false);
+    //findFolder(pathArray);
+    LOG.info("====get{}:{}", noteId, notePath);
+
+    return getNote(noteId, notePath);
+  }
+
+  private Note getNote(String noteId, String notePath) throws IOException {
+    Document doc = notes.find(eq(Fields.ID, noteId)).first();
+    if (doc == null) {
+      throw new IOException("Note '" + noteId + "' in path '" + notePath + 
"'not found");
+    }
+
+    return documentToNote(doc);
+  }
+
+  @Override
+  public void save(Note note, AuthenticationInfo subject) throws IOException {
+    LOG.info("====save {}:", note);
+    String[] pathArray = toPathArray(note.getPath(), false);
+    String pId = completeFolder(pathArray);
+    saveNotePath(note.getId(), note.getName(), pId);
+    saveNote(note);
+  }
+
+  /**
+   * create until parent folder if not exists, save note to path.
+   *
+   * @param noteId note id
+   * @param pId    note parent folder id
+   */
+  private void saveNotePath(String noteId, String noteName, String pId) {
+    Document filter = new Document(Fields.ID, noteId);
+    Document doc = new Document(Fields.ID, noteId)
+        .append(Fields.PID, pId)
+        .append(Fields.IS_DIR, false)
+        .append(Fields.NAME, noteName);
+
+    folders.replaceOne(filter, doc, new UpdateOptions().upsert(true));
+  }
+
+  private void saveNote(Note note) {
+    Document doc = noteToDocument(note);
+    notes.replaceOne(eq(Fields.ID, note.getId()), doc, new 
UpdateOptions().upsert(true));
+  }
+
+  @Override
+  public void move(String noteId, String notePath, String newNotePath
+      , AuthenticationInfo subject) throws IOException {
+    LOG.info("====move{}:{}:{}", noteId, notePath, newNotePath);
+    String[] pathArray = toPathArray(newNotePath, true);
+    String[] parentPathArray = Arrays.copyOfRange(pathArray, 0, 
pathArray.length - 1);
+    String noteName = pathArray[pathArray.length - 1];
+    String pId = completeFolder(parentPathArray);
+    moveNote(noteId, pId, noteName);
+  }
+
+  private void moveNote(String noteId, String parentId, String noteName) {
+    Document doc = new Document("$set"
+        , new Document(Fields.PID, parentId)
+        .append(Fields.NAME, noteName));
+
+    folders.updateOne(eq(Fields.ID, noteId), doc);
+    notes.updateOne(eq(Fields.ID, noteId), Updates.set(Fields.NAME, noteName));
+  }
+
+  @Override
+  public void move(String folderPath, String newFolderPath
+      , AuthenticationInfo subject) throws IOException {
+    LOG.info("====move folder {}:{}", folderPath, newFolderPath);
+    if (folderPath.equals(newFolderPath)) {
+      return;
+    }
+
+    String[] pathArray = toPathArray(folderPath, true);
+    String[] newPathArray = toPathArray(newFolderPath, true);
+    String[] newFolderParentArray = Arrays.copyOfRange(newPathArray, 0, 
newPathArray.length - 1);
+    String id = findFolder(pathArray);
+    String newPId = completeFolder(newFolderParentArray);
+    String newFolderName = newPathArray[newPathArray.length - 1];
+
+    Document doc = new Document("$set"
+        , new Document(Fields.ID, id)
+        .append(Fields.PID, newPId)
+        .append(Fields.IS_DIR, true)
+        .append(Fields.NAME, newFolderName));
+
+    folders.updateOne(eq(Fields.ID, id), doc);
+  }
+
+  @Override
+  public void remove(String noteId, String notePath
+      , AuthenticationInfo subject) throws IOException {
+    LOG.info("====remove {}:{}", noteId, notePath);
+    folders.deleteOne(eq(Fields.ID, noteId));
+    notes.deleteOne(eq(Fields.ID, noteId));
+
+    //clean empty folder
+    String[] pathArray = toPathArray(notePath, false);
+    for (int i = pathArray.length; i >= 0; i--) {
+      String[] current = Arrays.copyOfRange(pathArray, 0, i);
+      String folderId = findFolder(current);
+      boolean isEmpty = folders.count(eq(Fields.PID, folderId)) <= 0;
+      if (isEmpty) {
+        folders.deleteOne(eq(Fields.ID, folderId));
 
 Review comment:
   this perhaps could have a race condition? if a new note is being added in 
the same folder

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to