xintongsong commented on code in PR #332:
URL: https://github.com/apache/flink-agents/pull/332#discussion_r2605064694


##########
python/flink_agents/api/vector_stores/vector_store.py:
##########
@@ -176,3 +250,74 @@ def query_embedding(self, embedding: List[float], limit: 
int = 10, **kwargs: Any
         Returns:
             List of documents matching the search criteria
         """
+
+    @abstractmethod
+    def add_embedding(
+        self,
+        *,
+        documents: List[Document],
+        embeddings: List[List[float]],
+        collection_name: str | None = None,
+        **kwargs: Any,
+    ) -> List[str]:
+        """Add documents with pre-computed embeddings to the vector store.
+
+        Args:
+            documents: Documents to add to the vector store
+            embeddings: Pre-computed embedding vector for each document
+            collection_name: The collection name of the documents to add. 
Optional.
+            **kwargs: Vector store-specific parameters (collection, namespace, 
etc.)
+
+        Returns:
+            List of document IDs that were added to the vector store
+        """
+
+
+class Collection(BaseModel):
+    """Represents a collection of documents."""
+    name: str
+    size: int

Review Comment:
   What is this size used for and how do we keep it consistent?



##########
python/flink_agents/api/memory/long_term_memory.py:
##########
@@ -0,0 +1,294 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+import importlib
+from abc import ABC, abstractmethod
+from datetime import datetime
+from enum import Enum
+from typing import Any, Dict, List, Type
+
+from pydantic import (
+    BaseModel,
+    Field,
+    field_serializer,
+    model_validator,
+)
+from typing_extensions import override
+
+from flink_agents.api.chat_message import ChatMessage
+from flink_agents.api.prompts.prompt import Prompt
+
+ItemType = str | ChatMessage
+
+
+class CompactionStrategyType(Enum):
+    """Strategy for compact memory set."""
+
+    SUMMARIZATION = "summarization"
+
+
+class CompactionStrategy(BaseModel, ABC):
+    """Strategy for compact memory set."""
+
+    @property
+    @abstractmethod
+    def type(self) -> CompactionStrategyType:
+        """Return type of this strategy."""
+
+
+class SummarizationStrategy(CompactionStrategy):
+    """Summarization strategy."""
+
+    model: str
+    prompt: str | Prompt | None = None
+
+    @property
+    @override
+    def type(self) -> CompactionStrategyType:
+        return CompactionStrategyType.SUMMARIZATION
+
+
+class LongTermMemoryBackend(Enum):
+    """Backend for Long-Term Memory."""
+
+    VectorStore = "vectorstore"

Review Comment:
   ```suggestion
       EXTERNAL_VECTOR_STORE = "external_vector_store"
   ```



##########
python/flink_agents/api/memory/long_term_memory.py:
##########
@@ -0,0 +1,294 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+import importlib
+from abc import ABC, abstractmethod
+from datetime import datetime
+from enum import Enum
+from typing import Any, Dict, List, Type
+
+from pydantic import (
+    BaseModel,
+    Field,
+    field_serializer,
+    model_validator,
+)
+from typing_extensions import override
+
+from flink_agents.api.chat_message import ChatMessage
+from flink_agents.api.prompts.prompt import Prompt
+
+ItemType = str | ChatMessage
+
+
+class CompactionStrategyType(Enum):
+    """Strategy for compact memory set."""
+
+    SUMMARIZATION = "summarization"
+
+
+class CompactionStrategy(BaseModel, ABC):
+    """Strategy for compact memory set."""
+
+    @property
+    @abstractmethod
+    def type(self) -> CompactionStrategyType:
+        """Return type of this strategy."""
+
+
+class SummarizationStrategy(CompactionStrategy):
+    """Summarization strategy."""
+
+    model: str
+    prompt: str | Prompt | None = None
+
+    @property
+    @override
+    def type(self) -> CompactionStrategyType:
+        return CompactionStrategyType.SUMMARIZATION
+
+
+class LongTermMemoryBackend(Enum):
+    """Backend for Long-Term Memory."""
+
+    VectorStore = "vectorstore"
+
+
+class DatetimeRange(BaseModel):
+    """Represents a datetime range."""
+
+    start: datetime
+    end: datetime
+
+
+class MemorySetItem(BaseModel):
+    """Represents a long term memory item retrieved from vector store.
+
+    Attributes:
+        memory_set_name: The name of the memory set this item belongs to.
+        id: The id of this item.
+        value: The value of this item.
+        compacted: Whether this item has been compacted.
+        created_time: The timestamp this item was added to the memory set.
+        last_accessed_time: The timestamp this item was last accessed.
+        additional_metadata: Additional metadata for this item.
+    """
+
+    memory_set_name: str
+    id: str
+    value: Any
+    compacted: bool = False
+    created_time: datetime | DatetimeRange = None
+    last_accessed_time: datetime
+    additional_metadata: Dict[str, Any] | None = None
+
+
+class MemorySet(BaseModel):
+    """Represents a long term memory set contains memory items.
+
+    Attributes:
+        name: The name of this memory set.
+        item_type: The type of items stored in this set.
+        capacity: The capacity of this memory set.
+        compaction_strategy: Compaction strategy and additional arguments used
+        to compact memory set.
+        size: The size of this memory set.
+    """
+
+    name: str
+    item_type: Type[str] | Type[ChatMessage]
+    capacity: int
+    compaction_strategy: CompactionStrategy
+    size: int = Field(default=0, exclude=True)
+    ltm: "BaseLongTermMemory" = Field(default=None, exclude=True)
+
+    @field_serializer("item_type")
+    def _serialize_item_type(self, item_type: Type) -> Dict[str, str]:
+        return {"module": item_type.__module__, "name": item_type.__name__}
+
+    @field_serializer("compaction_strategy")
+    def _serialize_compaction_strategy(
+        self, compaction_strategy: CompactionStrategy
+    ) -> Dict[str, str]:
+        data = compaction_strategy.model_dump()
+        data.update(
+            {
+                "module": compaction_strategy.__class__.__module__,
+                "name": compaction_strategy.__class__.__name__,
+            }
+        )
+        return data
+
+    @model_validator(mode="before")
+    def _deserialize_item_type(self) -> "MemorySet":
+        if isinstance(self["item_type"], Dict):
+            module = importlib.import_module(self["item_type"]["module"])
+            self["item_type"] = getattr(module, self["item_type"]["name"])
+        if isinstance(self["compaction_strategy"], Dict):
+            module = 
importlib.import_module(self["compaction_strategy"].pop("module"))
+            clazz = getattr(module, self["compaction_strategy"].pop("name"))
+            self["compaction_strategy"] = clazz.model_validate(
+                self["compaction_strategy"]
+            )
+        return self
+
+    def add(
+        self, items: ItemType | List[ItemType], ids: str | List[str] | None = 
None
+    ) -> None:
+        """Add a memory item to the set, currently only support item with
+        type str or ChatMessage.
+
+        If the capacity of this memory set is reached, will trigger reduce
+        operation to manage the memory set size.
+
+        Args:
+            items: The items to be inserted to this set.
+            ids: The ids of the items to be inserted. Optional.
+        """
+        self.ltm.add(memory_set=self, memory_items=items, ids=ids)
+
+    def get(
+        self, ids: str | List[str] | None = None
+    ) -> MemorySetItem | List[MemorySetItem]:
+        """Retrieve memory items. If no item id provided, will return all 
items.
+
+        Args:
+            ids: The ids of the items to retrieve.
+
+        Returns:
+            The memory items retrieved.
+        """
+        return self.ltm.get(memory_set=self, ids=ids)
+
+    def search(self, query: str, limit: int, **kwargs: Any) -> 
List[MemorySetItem]:
+        """Retrieve n memory items related to the query.
+
+        Args:
+            query: The query to search for.
+            limit: The number of items to retrieve.
+            **kwargs: Additional arguments for search.
+        """
+        return self.ltm.search(memory_set=self, query=query, limit=limit, 
**kwargs)
+
+
+class BaseLongTermMemory(ABC, BaseModel):
+    """Base Abstract class for long term memory."""
+
+    @abstractmethod
+    def get_or_create_memory_set(
+        self,
+        name: str,
+        item_type: str | Type[ChatMessage],
+        capacity: int,
+        compaction_strategy: CompactionStrategy,
+    ) -> MemorySet:
+        """Create a memory set, if the memory set already exists, return it.
+
+        Args:
+            name: The name of the memory set.
+            item_type: The type of the memory item.
+            capacity: The capacity of the memory set.
+            compaction_strategy: The compaction strategy and arguments for
+            storge management.
+
+        Returns:
+            The created memory set.
+        """
+
+    @abstractmethod
+    def get_memory_set(self, name: str) -> MemorySet:
+        """Get the memory set.
+
+        Args:
+            name: The name of the memory set.
+
+        Returns:
+            The memory set.
+        """
+
+    @abstractmethod
+    def delete_memory_set(self, name: str) -> MemorySet:

Review Comment:
   It probably makes sense to just return a boolean. The memory set should no 
longer be bound to the underlying store.



##########
python/flink_agents/api/vector_stores/vector_store.py:
##########
@@ -176,3 +250,74 @@ def query_embedding(self, embedding: List[float], limit: 
int = 10, **kwargs: Any
         Returns:
             List of documents matching the search criteria
         """
+
+    @abstractmethod
+    def add_embedding(

Review Comment:
   `add_embedding` and `query_embedding` are not meant for users to call. We 
should probably mark them as protected (i.e., prefix their names with an underscore).



##########
python/flink_agents/runtime/memory/compaction_functions.py:
##########
@@ -0,0 +1,149 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+from typing import TYPE_CHECKING, List, Type, cast
+
+from flink_agents.api.chat_message import ChatMessage, MessageRole
+from flink_agents.api.memory.long_term_memory import (
+    BaseLongTermMemory,
+    MemorySet,
+    MemorySetItem,
+    SummarizationStrategy,
+)
+from flink_agents.api.resource import ResourceType
+from flink_agents.api.runner_context import RunnerContext
+
+if TYPE_CHECKING:
+    from flink_agents.api.chat_models.chat_model import BaseChatModelSetup
+    from flink_agents.api.prompts.prompt import Prompt
+
+
+def summarize(

Review Comment:
   Tuning the compaction strategy for ideal performance may take more effort 
than this PR can accommodate. But at least the abstraction should not assume 
that this returns only one message.



##########
python/flink_agents/api/memory/long_term_memory.py:
##########
@@ -0,0 +1,294 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+import importlib
+from abc import ABC, abstractmethod
+from datetime import datetime
+from enum import Enum
+from typing import Any, Dict, List, Type
+
+from pydantic import (
+    BaseModel,
+    Field,
+    field_serializer,
+    model_validator,
+)
+from typing_extensions import override
+
+from flink_agents.api.chat_message import ChatMessage
+from flink_agents.api.prompts.prompt import Prompt
+
+ItemType = str | ChatMessage
+
+
+class CompactionStrategyType(Enum):
+    """Strategy for compact memory set."""
+
+    SUMMARIZATION = "summarization"
+
+
+class CompactionStrategy(BaseModel, ABC):
+    """Strategy for compact memory set."""
+
+    @property
+    @abstractmethod
+    def type(self) -> CompactionStrategyType:
+        """Return type of this strategy."""
+
+
+class SummarizationStrategy(CompactionStrategy):
+    """Summarization strategy."""
+
+    model: str
+    prompt: str | Prompt | None = None
+
+    @property
+    @override
+    def type(self) -> CompactionStrategyType:
+        return CompactionStrategyType.SUMMARIZATION
+
+
+class LongTermMemoryBackend(Enum):
+    """Backend for Long-Term Memory."""
+
+    VectorStore = "vectorstore"
+
+
+class DatetimeRange(BaseModel):
+    """Represents a datetime range."""
+
+    start: datetime
+    end: datetime
+
+
+class MemorySetItem(BaseModel):
+    """Represents a long term memory item retrieved from vector store.
+
+    Attributes:
+        memory_set_name: The name of the memory set this item belongs to.
+        id: The id of this item.
+        value: The value of this item.
+        compacted: Whether this item has been compacted.
+        created_time: The timestamp this item was added to the memory set.
+        last_accessed_time: The timestamp this item was last accessed.
+        additional_metadata: Additional metadata for this item.
+    """
+
+    memory_set_name: str
+    id: str
+    value: Any
+    compacted: bool = False
+    created_time: datetime | DatetimeRange = None
+    last_accessed_time: datetime
+    additional_metadata: Dict[str, Any] | None = None
+
+
+class MemorySet(BaseModel):
+    """Represents a long term memory set contains memory items.
+
+    Attributes:
+        name: The name of this memory set.
+        item_type: The type of items stored in this set.
+        capacity: The capacity of this memory set.
+        compaction_strategy: Compaction strategy and additional arguments used
+        to compact memory set.
+        size: The size of this memory set.
+    """
+
+    name: str
+    item_type: Type[str] | Type[ChatMessage]
+    capacity: int
+    compaction_strategy: CompactionStrategy
+    size: int = Field(default=0, exclude=True)

Review Comment:
   It's fragile to maintain the size separately from the actual underlying 
collection. We might consider getting the size from the store instead.



##########
python/flink_agents/runtime/memory/compaction_functions.py:
##########
@@ -0,0 +1,149 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+from typing import TYPE_CHECKING, List, Type, cast
+
+from flink_agents.api.chat_message import ChatMessage, MessageRole
+from flink_agents.api.memory.long_term_memory import (
+    BaseLongTermMemory,
+    MemorySet,
+    MemorySetItem,
+    SummarizationStrategy,
+)
+from flink_agents.api.resource import ResourceType
+from flink_agents.api.runner_context import RunnerContext
+
+if TYPE_CHECKING:
+    from flink_agents.api.chat_models.chat_model import BaseChatModelSetup
+    from flink_agents.api.prompts.prompt import Prompt
+
+
+def summarize(

Review Comment:
   It doesn't make sense to always compact the entire long-term memory into a 
single message. I think we should only merge similar messages and discard 
meaningless ones. As a result, we should get a smaller set of messages with 
higher information density.



##########
python/flink_agents/api/vector_stores/vector_store.py:
##########
@@ -176,3 +250,74 @@ def query_embedding(self, embedding: List[float], limit: 
int = 10, **kwargs: Any
         Returns:
             List of documents matching the search criteria
         """
+
+    @abstractmethod
+    def add_embedding(
+        self,
+        *,
+        documents: List[Document],
+        embeddings: List[List[float]],
+        collection_name: str | None = None,
+        **kwargs: Any,
+    ) -> List[str]:
+        """Add documents with pre-computed embeddings to the vector store.
+
+        Args:
+            documents: Documents to add to the vector store
+            embeddings: Pre-computed embedding vector for each document
+            collection_name: The collection name of the documents to add. 
Optional.
+            **kwargs: Vector store-specific parameters (collection, namespace, 
etc.)
+
+        Returns:
+            List of document IDs that were added to the vector store
+        """
+
+
+class Collection(BaseModel):
+    """Represents a collection of documents."""
+    name: str
+    size: int
+    metadata: Dict[str, Any] | None = None
+
+
+class CollectionManageableVectorStore(BaseVectorStore, ABC):
+    """Base abstract class for vector store which support collection 
management."""
+
+    @abstractmethod
+    def get_or_create_collection(
+        self, name: str, metadata: Dict[str, Any]
+    ) -> Collection:
+        """Get a collection, or create it if it doesn't exist.
+
+        Args:
+            name: Name of the collection
+            metadata: Metadata of the collection
+        Returns:
+            The retrieved or created collection
+        """
+
+    @abstractmethod
+    def get_collection(self, name: str) -> Collection:
+        """Get a collection, raise an exception if it doesn't exist.
+
+        Args:
+            name: Name of the collection
+        Returns:
+            The retrieved collection
+        """
+
+    @abstractmethod
+    def delete_collection(self, name: str) -> Collection:
+        """Delete a collection.
+
+        Args:
+            name: Name of the collection
+        Returns:
+            The deleted collection
+        """
+
+def maybe_cast_to_list(value: Any | List[Any]) -> List[Any] | None:

Review Comment:
   Should this be private?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to