This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/seatunnel-tools.git
The following commit(s) were added to refs/heads/main by this push:
new b969c64 [Feature][MCP] Add SeaTunnel mcp tool (#1)
b969c64 is described below
commit b969c64575f1590abf26e66a48c876eee4146f06
Author: ocean-zhc <[email protected]>
AuthorDate: Thu Aug 14 16:47:33 2025 +0800
[Feature][MCP] Add SeaTunnel mcp tool (#1)
---
README.md | 16 +
seatunnel-mcp/.env.example | 26 ++
seatunnel-mcp/.gitignore | 43 ++
seatunnel-mcp/.pre-commit-config.yaml | 53 +++
seatunnel-mcp/CHANGELOG.md | 49 ++
seatunnel-mcp/Dockerfile | 37 ++
seatunnel-mcp/LICENSE | 190 ++++++++
seatunnel-mcp/README.md | 190 ++++++++
seatunnel-mcp/README_CN.md | 244 ++++++++++
seatunnel-mcp/RESTful API V2.openapi.json | 508 +++++++++++++++++++++
seatunnel-mcp/docker-compose.yml | 52 +++
seatunnel-mcp/docs/DEVELOPER_GUIDE.md | 186 ++++++++
seatunnel-mcp/docs/QUICK_START.md | 176 +++++++
seatunnel-mcp/docs/USER_GUIDE.md | 273 +++++++++++
seatunnel-mcp/docs/img/img.png | Bin 0 -> 143625 bytes
seatunnel-mcp/docs/img/seatunnel-mcp-logo.png | Bin 0 -> 390468 bytes
seatunnel-mcp/examples/simple_job.conf | 44 ++
seatunnel-mcp/pyproject.toml | 82 ++++
seatunnel-mcp/requirements-dev.txt | 6 +
seatunnel-mcp/run.bat | 52 +++
seatunnel-mcp/run.sh | 49 ++
seatunnel-mcp/src/seatunnel_mcp/__init__.py | 22 +
seatunnel-mcp/src/seatunnel_mcp/__main__.py | 78 ++++
seatunnel-mcp/src/seatunnel_mcp/cli.py | 206 +++++++++
seatunnel-mcp/src/seatunnel_mcp/client.py | 313 +++++++++++++
seatunnel-mcp/src/seatunnel_mcp/schema.py | 94 ++++
seatunnel-mcp/src/seatunnel_mcp/tools.py | 404 ++++++++++++++++
seatunnel-mcp/tests/__init__.py | 17 +
seatunnel-mcp/tests/integration/__init__.py | 17 +
seatunnel-mcp/tests/integration/test_api_client.py | 239 ++++++++++
seatunnel-mcp/tests/test_client.py | 249 ++++++++++
seatunnel-mcp/tests/test_tools.py | 223 +++++++++
32 files changed, 4138 insertions(+)
diff --git a/README.md b/README.md
index 8372d24..9573521 100644
--- a/README.md
+++ b/README.md
@@ -2,4 +2,20 @@
The repository contains tools for Apache SeaTunnel.
+## Tool 1 - SeaTunnel MCP Server
+
+What is MCP?
+- MCP (Model Context Protocol) is an open protocol for connecting LLMs to
tools, data, and systems. With SeaTunnel MCP, you can operate SeaTunnel
directly from an LLM-powered interface while keeping the server-side logic
secure and auditable.
+- Learn more: https://github.com/modelcontextprotocol
+
+SeaTunnel MCP Server
+- Source folder: [seatunnel-mcp/](seatunnel-mcp/)
+- English README: [seatunnel-mcp/README.md](seatunnel-mcp/README.md)
+- Chinese: [seatunnel-mcp/README_CN.md](seatunnel-mcp/README_CN.md)
+- Quick Start:
[seatunnel-mcp/docs/QUICK_START.md](seatunnel-mcp/docs/QUICK_START.md)
+- User Guide:
[seatunnel-mcp/docs/USER_GUIDE.md](seatunnel-mcp/docs/USER_GUIDE.md)
+- Developer Guide:
[seatunnel-mcp/docs/DEVELOPER_GUIDE.md](seatunnel-mcp/docs/DEVELOPER_GUIDE.md)
+
+For screenshots, demo video, features, installation and usage instructions,
please refer to the README in the seatunnel-mcp directory.
+
Get the main project from [Apache
SeaTunnel](https://github.com/apache/seatunnel)
\ No newline at end of file
diff --git a/seatunnel-mcp/.env.example b/seatunnel-mcp/.env.example
new file mode 100644
index 0000000..23c2441
--- /dev/null
+++ b/seatunnel-mcp/.env.example
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+# SeaTunnel MCP Server Configuration
+
+# MCP server configuration
+MCP_HOST=127.0.0.1
+MCP_PORT=8080
+
+# SeaTunnel API configuration
+SEATUNNEL_API_URL=http://localhost:8090
+SEATUNNEL_API_KEY=your_api_key_here
\ No newline at end of file
diff --git a/seatunnel-mcp/.gitignore b/seatunnel-mcp/.gitignore
new file mode 100644
index 0000000..12e1866
--- /dev/null
+++ b/seatunnel-mcp/.gitignore
@@ -0,0 +1,43 @@
+# Python
+__pycache__/
+*.py[cod]
+*$py.class
+*.so
+.Python
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+wheels/
+*.egg-info/
+.installed.cfg
+*.egg
+
+# Virtual environments
+venv/
+env/
+ENV/
+.venv/
+.env/
+
+# Testing
+.coverage
+htmlcov/
+.pytest_cache/
+.tox/
+
+# IDE
+.idea/
+.vscode/
+*.swp
+*.swo
+
+# Project specific
+.env
\ No newline at end of file
diff --git a/seatunnel-mcp/.pre-commit-config.yaml
b/seatunnel-mcp/.pre-commit-config.yaml
new file mode 100644
index 0000000..bc9bd62
--- /dev/null
+++ b/seatunnel-mcp/.pre-commit-config.yaml
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+repos:
+- repo: https://github.com/pre-commit/pre-commit-hooks
+ rev: v4.4.0
+ hooks:
+ - id: trailing-whitespace
+ - id: end-of-file-fixer
+ - id: check-yaml
+ - id: check-added-large-files
+ - id: check-json
+ - id: debug-statements
+ - id: check-merge-conflict
+
+- repo: https://github.com/pycqa/isort
+ rev: 5.12.0
+ hooks:
+ - id: isort
+ args: ["--profile", "black", "--line-length", "100"]
+
+- repo: https://github.com/psf/black
+ rev: 23.3.0
+ hooks:
+ - id: black
+ args: ["--line-length", "100"]
+
+- repo: https://github.com/pycqa/flake8
+ rev: 6.0.0
+ hooks:
+ - id: flake8
+ args: ["--max-line-length", "100", "--extend-ignore", "E203,E501"]
+ exclude: ^tests/
+
+- repo: https://github.com/pre-commit/mirrors-mypy
+ rev: v1.3.0
+ hooks:
+ - id: mypy
+ additional_dependencies: [types-requests, types-setuptools,
types-PyYAML]
+ exclude: ^tests/
\ No newline at end of file
diff --git a/seatunnel-mcp/CHANGELOG.md b/seatunnel-mcp/CHANGELOG.md
new file mode 100644
index 0000000..e6acfe6
--- /dev/null
+++ b/seatunnel-mcp/CHANGELOG.md
@@ -0,0 +1,49 @@
+# Changelog
+
+记录 SeaTunnel MCP 项目的所有重要变更。
+
+格式基于 [Keep a Changelog](https://keepachangelog.com/zh-CN/1.0.0/),
+本项目遵循 [语义化版本](https://semver.org/lang/zh-CN/spec/v2.0.0.html)。
+
+## [1.1.0] - 2025-04-10
+
+### 新增
+
+- 添加 `submit-jobs` 和`submit-job/upload` 工具用于批量提交作业
+ - 允许通过单个 API 调用同时提交多个作业
+ - 用户输入直接作为请求体传递给 API,无需额外封装
+ - 支持标准 JSON 格式的作业配置
+
+### 改进
+
+- 更新文档,增加批量提交作业的示例和说明
+- 添加全面的测试覆盖新功能
+
+## [1.0.0] - 2025-04-01
+
+### 新增
+
+- 首个稳定正式版本
+- 完善文档和测试
+- 加强错误处理
+- 改进用户体验
+
+## [0.1.0] - 2023-07-15
+
+### 新增
+
+- 初始版本发布
+- 实现基本的 SeaTunnel REST API 客户端
+- 实现基于 MCP 的 SeaTunnel 工具集
+ - 连接管理工具
+ - 作业管理工具
+ - 系统监控工具
+- 创建项目文档
+ - 用户指南
+ - 开发者指南
+- 添加启动脚本和示例配置
+- 添加基本单元测试
+
+### 已修复
+
+- 无(初始版本)
\ No newline at end of file
diff --git a/seatunnel-mcp/Dockerfile b/seatunnel-mcp/Dockerfile
new file mode 100644
index 0000000..49af152
--- /dev/null
+++ b/seatunnel-mcp/Dockerfile
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+FROM python:3.12-slim
+
+WORKDIR /app
+
+# 复制项目文件
+COPY . .
+
+# 安装依赖
+RUN pip install --no-cache-dir -e .
+
+# 暴露端口
+EXPOSE 8080
+
+# 设置环境变量
+ENV MCP_HOST=0.0.0.0
+ENV MCP_PORT=8080
+ENV SEATUNNEL_API_URL=http://seatunnel:8090
+
+# 启动服务
+CMD ["python", "-m", "src.seatunnel_mcp"]
\ No newline at end of file
diff --git a/seatunnel-mcp/LICENSE b/seatunnel-mcp/LICENSE
new file mode 100644
index 0000000..0ea9a6d
--- /dev/null
+++ b/seatunnel-mcp/LICENSE
@@ -0,0 +1,190 @@
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ Copyright 2023 SeaTunnel MCP Team
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
\ No newline at end of file
diff --git a/seatunnel-mcp/README.md b/seatunnel-mcp/README.md
new file mode 100644
index 0000000..91bea0c
--- /dev/null
+++ b/seatunnel-mcp/README.md
@@ -0,0 +1,190 @@
+# SeaTunnel MCP Server
+
+A Model Context Protocol (MCP) server for interacting with SeaTunnel through
LLM interfaces like Claude.
+
+
+
+
+
+## Operation Video
+
+To help you better understand the features and usage of SeaTunnel MCP, we
provide a video demonstration. Please refer to the link below or directly check
the video file in the project documentation directory.
+
+https://www.youtube.com/watch?v=JaLA8EkZD7Q
+
+[](https://www.youtube.com/watch?v=JaLA8EkZD7Q)
+
+
+> **Tip**: If the video does not play directly, make sure your device supports
MP4 format and try opening it with a modern browser or video player.
+
+
+## Features
+
+* Job management (submit, stop, monitor)
+* System monitoring and information retrieval
+* REST API interaction with SeaTunnel services
+* Built-in logging and monitoring tools
+* Dynamic connection configuration
+* Comprehensive job information and statistics
+
+## Installation
+
+```bash
+# Clone repository
+git clone <repository_url>
+cd seatunnel-mcp
+
+# Create virtual environment and install
+python -m venv .venv
+source .venv/bin/activate # On Windows: .venv\Scripts\activate
+pip install -e .
+```
+
+## Requirements
+
+* Python ≥ 3.12
+* Running SeaTunnel instance
+* Node.js (for testing with MCP Inspector)
+
+## Usage
+
+### Environment Variables
+
+```
+SEATUNNEL_API_URL=http://localhost:8090 # Default SeaTunnel REST API URL
+SEATUNNEL_API_KEY=your_api_key # Optional: Default SeaTunnel API key
+```
+
+### Dynamic Connection Configuration
+
+The server provides tools to view and update connection settings at runtime:
+
+* `get-connection-settings`: View current connection URL and API key status
+* `update-connection-settings`: Update URL and/or API key to connect to a
different SeaTunnel instance
+
+Example usage through MCP:
+
+```json
+// Get current settings
+{
+ "name": "get-connection-settings"
+}
+
+// Update connection settings
+{
+ "name": "update-connection-settings",
+ "arguments": {
+ "url": "http://new-host:8090",
+ "api_key": "new-api-key"
+ }
+}
+```
+
+### Job Management
+
+The server provides tools to submit and manage SeaTunnel jobs:
+
+* `submit-job`: Submit a new job with job configuration
+* `submit-jobs`: Submit multiple jobs in batch
+* `stop-job`: Stop a running job
+* `get-job-info`: Get detailed information about a specific job
+* `get-running-jobs`: List all currently running jobs
+* `get-finished-jobs`: List all finished jobs by state (FINISHED, CANCELED,
FAILED, etc.)
+
+### Running the Server
+
+```bash
+python -m src.seatunnel_mcp
+```
+
+### Usage with Claude Desktop
+
+To use this with Claude Desktop, add the following to your
`claude_desktop_config.json`:
+
+```json
+{
+ "mcpServers": {
+ "seatunnel": {
+ "command": "python",
+ "args": ["-m", "src.seatunnel_mcp"],
+ "cwd": "Project root directory"
+ }
+ }
+}
+```
+
+### Testing with MCP Inspector
+
+```bash
+npx @modelcontextprotocol/inspector python -m src.seatunnel_mcp
+```
+
+## Available Tools
+
+### Connection Management
+
+* `get-connection-settings`: View current SeaTunnel connection URL and API key
status
+* `update-connection-settings`: Update URL and/or API key to connect to a
different instance
+
+### Job Management
+
+* `submit-job`: Submit a new job with configuration in HOCON format
+* `submit-job/upload`: submit job source upload configuration file
+* `submit-jobs`: Submit multiple jobs in batch, directly passing user input as
request body
+* `stop-job`: Stop a running job with optional savepoint
+* `get-job-info`: Get detailed information about a specific job
+* `get-running-jobs`: List all currently running jobs
+* `get-running-job`: Get details about a specific running job
+* `get-finished-jobs`: List all finished jobs by state
+
+### System Monitoring
+
+* `get-overview`: Get an overview of the SeaTunnel cluster
+* `get-system-monitoring-information`: Get detailed system monitoring
information
+
+## Changelog
+
+### v1.2.0 (2025-06-10)
+
+**New Features in v1.2.0**
+- **SSE Support**: Added `st-mcp-sse` for real-time communication with
SeaTunnel MCP via Server-Sent Events (SSE). Corresponding sse branch
+- **UV/Studio Mode**: Added `st-mcp-uv` (or `st-mcp-studio`) to support
running the MCP server using the `uv` tool for improved performance and async
support. Corresponding to uv branch
+
+#### Example `claude_desktop_config.json`:
+
+```json
+{
+ "mcpServers": {
+ "st-mcp-sse": {
+ "url": "http://your-server:18080/sse"
+ },
+ "st-mcp-uv": {
+ "command": "uv",
+ "args": ["run", "seatunnel-mcp"],
+ "env": {
+ "SEATUNNEL_API_URL": "http://127.0.0.1:8080"
+ }
+ }
+ }
+}
+
+```
+
+### v1.1.0 (2025-04-10)
+
+- **New Feature**: Added `submit-jobs` and `submit-job/upload` tool for batch
job submission and Document submission operations
+ - Allows submitting multiple jobs at once with a single API call
+ - Input is passed directly as the request body to the API
+ - Supports JSON format for job configurations
+ - Allow submission of jobs based on documents
+
+### v1.0.0 (Initial Release)
+
+- Initial release with basic SeaTunnel integration capabilities
+- Job management tools (submit, stop, monitor)
+- System monitoring tools
+- Connection configuration utilities
+
+## License
+
+Apache License
\ No newline at end of file
diff --git a/seatunnel-mcp/README_CN.md b/seatunnel-mcp/README_CN.md
new file mode 100644
index 0000000..9f345dc
--- /dev/null
+++ b/seatunnel-mcp/README_CN.md
@@ -0,0 +1,244 @@
+# SeaTunnel MCP 服务器
+
+SeaTunnel MCP(Model Context Protocol)服务器,提供与大型语言模型(如 Claude)交互的能力,使其能够操作
SeaTunnel 任务。
+
+
+
+
+
+## 操作视频
+
+为了帮助您更好地了解 SeaTunnel MCP 的功能和使用方法,我们提供了一段操作视频演示。请参考以下链接或直接在项目文档目录中查看视频文件。
+
+https://www.bilibili.com/video/BV1UXZgY8EqS
+
+> **提示**:如果视频无法直接播放,请确保您的设备支持 MP4 格式,并尝试使用现代浏览器或视频播放器打开。
+
+
+## 功能特点
+
+* **作业管理**:提交、停止、监控 SeaTunnel 作业
+* **系统监控**:获取集群概览和详细的系统监控信息
+* **REST API 交互**:与 SeaTunnel 服务进行无缝交互
+* **内置日志和监控工具**:全面的日志和监控功能
+* **动态连接配置**:能够在运行时切换不同的 SeaTunnel 实例
+* **全面的作业信息**:提供详细的作业运行状态和统计数据
+
+## 安装
+
+```bash
+# 克隆仓库
+git clone <仓库URL>
+cd seatunnel-mcp
+
+# 创建虚拟环境并安装
+python -m venv .venv
+source .venv/bin/activate # Windows 系统: .venv\Scripts\activate
+pip install -e .
+
+# 或者直接使用提供的脚本
+./run.sh # Linux/Mac
+run.bat # Windows
+```
+
+## 系统要求
+
+* Python ≥ 3.12
+* 运行中的 SeaTunnel 实例
+* Node.js(用于 MCP Inspector 测试)
+
+## 使用方法
+
+### 环境变量配置
+
+```
+SEATUNNEL_API_URL=http://localhost:8090 # 默认 SeaTunnel REST API URL
+SEATUNNEL_API_KEY=your_api_key # 可选:默认 SeaTunnel API 密钥
+```
+
+### 命令行工具
+
+SeaTunnel MCP 提供了命令行工具,方便启动和配置服务器:
+
+```bash
+# 显示帮助信息
+seatunnel-mcp --help
+
+# 初始化环境配置文件
+seatunnel-mcp init
+
+# 运行 MCP 服务器
+seatunnel-mcp run --api-url http://your-seatunnel:8090
+
+# 为 Claude Desktop 配置 MCP 服务器
+seatunnel-mcp configure-claude
+```
+
+### 动态连接配置
+
+服务器提供了工具来查看和更新运行时的连接设置:
+
+* `get-connection-settings`:查看当前连接 URL 和 API 密钥状态
+* `update-connection-settings`:更新 URL 和/或 API 密钥以连接到不同的 SeaTunnel 实例
+
+MCP 使用示例:
+
+```json
+// 获取当前设置
+{
+ "name": "get-connection-settings"
+}
+
+// 更新连接设置
+{
+ "name": "update-connection-settings",
+ "arguments": {
+ "url": "http://new-host:8090",
+ "api_key": "new-api-key"
+ }
+}
+```
+
+### 作业管理
+
+服务器提供工具来提交和管理 SeaTunnel 作业:
+
+* `submit-job`:提交新作业
+* `submit-jobs`:批量提交多个作业
+* `stop-job`:停止运行中的作业
+* `get-job-info`:获取特定作业的详细信息
+* `get-running-jobs`:列出所有正在运行的作业
+* `get-running-job`:获取特定运行中作业的详情
+* `get-finished-jobs`:按状态列出已完成的作业
+
+### 运行服务器
+
+```bash
+python -m src.seatunnel_mcp
+# 或者使用命令行工具
+seatunnel-mcp run
+```
+
+### 与 Claude Desktop 集成
+
+要在 Claude Desktop 中使用,请在 `claude_desktop_config.json` 中添加以下内容:
+
+```json
+{
+ "mcpServers": {
+ "seatunnel": {
+ "command": "python",
+ "args": ["-m", "src.seatunnel_mcp"],
+ "cwd": "Project root directory"
+ }
+ }
+}
+```
+
+### 使用 MCP Inspector 测试
+
+```bash
+npx @modelcontextprotocol/inspector python -m src.seatunnel_mcp
+```
+
+## 可用工具
+
+### 连接管理
+
+* `get-connection-settings`:查看当前 SeaTunnel 连接 URL 和 API 密钥状态
+* `update-connection-settings`:更新 URL 和/或 API 密钥以连接到不同实例
+
+### 作业管理
+
+* `submit-job`:提交新作业
+* `submit-jobs`:批量提交多个作业,直接将用户输入作为请求体传递
+* `submit-job/upload`:提交作业来源上传配置文件
+* `stop-job`:停止运行中的作业
+* `get-job-info`:获取特定作业的详细信息
+* `get-running-jobs`:列出所有正在运行的作业
+* `get-running-job`:获取特定运行中作业的详情
+* `get-finished-jobs`:按状态列出已完成的作业
+
+### 系统监控
+
+* `get-overview`:获取 SeaTunnel 集群概览
+* `get-system-monitoring-information`:获取详细的系统监控信息
+
+
+
+## 开发
+
+如果您想为项目贡献代码:
+
+1. 克隆仓库并设置开发环境:
+ ```bash
+ python -m venv .venv
+ source .venv/bin/activate
+ pip install -e ".[dev]"
+ ```
+
+2. 安装预提交钩子:
+ ```bash
+ pip install pre-commit
+ pre-commit install
+ ```
+
+3. 运行测试:
+ ```bash
+ pytest -xvs tests/
+ ```
+
+详细的开发指南请参阅 [开发者指南](docs/DEVELOPER_GUIDE.md)。
+
+## 贡献
+
+1. Fork 仓库
+2. 创建功能分支
+3. 提交变更
+4. 创建 Pull Request
+
+## 更新日志
+
+### v1.2.0 (2025-06-09)
+**v1.2.0 新功能**
+
+- **SSE 实时通信**:新增 `st-mcp-sse`,支持通过 Server-Sent Events(SSE)协议与 SeaTunnel MCP
实现实时数据推送。 对应sse分支
+- **UV/Studio 模式**:新增 `st-mcp-uv`(或 `st-mcp-studio`),支持通过 `uv` 工具运行 MCP
服务器,提升异步和高性能场景下的运行效率。对应uv分支
+
+#### `claude_desktop_config.json` 配置示例:
+
+```json
+{
+ "mcpServers": {
+ "st-mcp-sse": {
+ "url": "http://your-server:18080/sse"
+ },
+ "st-mcp-uv": {
+ "command": "uv",
+ "args": ["run", "seatunnel-mcp"],
+ "env": {
+ "SEATUNNEL_API_URL": "http://127.0.0.1:8080"
+ }
+ }
+ }
+}
+```
+
+### v1.1.0 (2025-04-10)
+
+- **新功能**:添加了 `submit-jobs` 和`submit-job/upload` 工具用于批量提交作业 和 文件提交作业
+ - 允许通过单个 API 调用同时提交多个作业
+ - 用户输入直接作为请求体传递给 API
+ - 支持 JSON 格式的作业配置
+ - 允许根据文件提交作业
+
+### v1.0.0 (初始版本)
+
+- 初始版本,具备基本的 SeaTunnel 集成能力
+- 作业管理工具(提交、停止、监控)
+- 系统监控工具
+- 连接配置实用工具
+
+## 许可证
+
+Apache License
\ No newline at end of file
diff --git a/seatunnel-mcp/RESTful API V2.openapi.json b/seatunnel-mcp/RESTful
API V2.openapi.json
new file mode 100644
index 0000000..d81bb57
--- /dev/null
+++ b/seatunnel-mcp/RESTful API V2.openapi.json
@@ -0,0 +1,508 @@
+{
+ "openapi": "3.0.1",
+ "info": {
+ "title": "RESTful API V2",
+ "description": "",
+ "version": "1.0.0"
+ },
+ "tags": [],
+ "paths": {
+ "/submit-job": {
+ "post": {
+ "summary": "hive2es_qymp",
+ "deprecated": false,
+ "description": "",
+ "tags": [],
+ "parameters": [
+ {
+ "name": "jobId",
+ "in": "query",
+ "description": "",
+ "required": false,
+ "schema": {
+ "type": "string"
+ }
+ },
+ {
+ "name": "jobName",
+ "in": "query",
+ "description": "",
+ "required": false,
+ "example": "hive2es_qymp",
+ "schema": {
+ "type": "string"
+ }
+ },
+ {
+ "name": "isStartWithSavePoint",
+ "in": "query",
+ "description": "",
+ "required": false,
+ "schema": {
+ "type": "string"
+ }
+ },
+ {
+ "name": "format",
+ "in": "query",
+ "description": "",
+ "required": false,
+ "example": "hocon",
+ "schema": {
+ "type": "string"
+ }
+ }
+ ],
+ "requestBody": {
+ "content": {
+ "text/plain": {
+ "schema": {
+ "type": "string"
+ },
+ "example": "env {\n job.mode = \"batch\"\n}\n \nsource {\n
Jdbc {\n url = \"jdbc:hive2://ip:10000/default\"\n user =
\"hive\"\n password = \"hive\"\n driver =
\"org.apache.hive.jdbc.HiveDriver\"\n connection_check_timeout_sec =
100\n query = \"select * from tabl1 limit 100\"\n }\n}\n\ntransform
{\n}\n\nsink {\n Elasticsearch {\n hosts = [\"http://ip:9200\"]\n
index = \"index1\"\n username = \"\"\n [...]
+ }
+ }
+ },
+ "responses": {
+ "200": {
+ "description": "",
+ "content": {
+ "application/json": {
+ "schema": {
+ "type": "object",
+ "properties": {}
+ }
+ }
+ },
+ "headers": {}
+ }
+ },
+ "security": []
+ }
+ },
+ "/submit-jobs": {
+ "post": {
+ "summary": "批量提交作业",
+ "deprecated": false,
+ "description": "",
+ "tags": [],
+ "parameters": [],
+ "requestBody": {
+ "content": {
+ "application/json": {
+ "schema": {
+ "type": "object",
+ "properties": {}
+ },
+ "example": [
+ {
+ "params": {
+ "jobId": "123456",
+ "jobName": "SeaTunnel-01"
+ },
+ "env": {
+ "job.mode": "batch"
+ },
+ "source": [
+ {
+ "plugin_name": "FakeSource",
+ "plugin_output": "fake",
+ "row.num": 1000,
+ "schema": {
+ "fields": {
+ "name": "string",
+ "age": "int",
+ "card": "int"
+ }
+ }
+ }
+ ],
+ "transform": [],
+ "sink": [
+ {
+ "plugin_name": "Console",
+ "plugin_input": [
+ "fake"
+ ]
+ }
+ ]
+ },
+ {
+ "params": {
+ "jobId": "1234567",
+ "jobName": "SeaTunnel-02"
+ },
+ "env": {
+ "job.mode": "batch"
+ },
+ "source": [
+ {
+ "plugin_name": "FakeSource",
+ "plugin_output": "fake",
+ "row.num": 1000,
+ "schema": {
+ "fields": {
+ "name": "string",
+ "age": "int",
+ "card": "int"
+ }
+ }
+ }
+ ],
+ "transform": [],
+ "sink": [
+ {
+ "plugin_name": "Console",
+ "plugin_input": [
+ "fake"
+ ]
+ }
+ ]
+ }
+ ]
+ }
+ }
+ },
+ "responses": {
+ "200": {
+ "description": "",
+ "content": {
+ "application/json": {
+ "schema": {
+ "type": "object",
+ "properties": {}
+ }
+ }
+ },
+ "headers": {}
+ }
+ },
+ "security": []
+ }
+ },
+ "/submit-job/upload": {
+ "post": {
+ "summary": "提交作业来源上传配置文件",
+ "deprecated": false,
+ "description": "",
+ "tags": [],
+ "parameters": [
+ {
+ "name": "jobId",
+ "in": "query",
+ "description": "",
+ "required": false,
+ "example": "",
+ "schema": {
+ "type": "string"
+ }
+ },
+ {
+ "name": "jobName",
+ "in": "query",
+ "description": "",
+ "required": false,
+ "example": "hive2es_qymp_file",
+ "schema": {
+ "type": "string"
+ }
+ },
+ {
+ "name": "isStartWithSavePoint",
+ "in": "query",
+ "description": "",
+ "required": false,
+ "example": "",
+ "schema": {
+ "type": "string"
+ }
+ },
+ {
+ "name": "format",
+ "in": "query",
+ "description": "",
+ "required": false,
+ "example": "hocon",
+ "schema": {
+ "type": "string"
+ }
+ }
+ ],
+ "requestBody": {
+ "content": {
+ "multipart/form-data": {
+ "schema": {
+ "type": "object",
+ "properties": {
+ "config_file": {
+ "format": "binary",
+ "type": "string",
+ "example": ""
+ }
+ }
+ },
+ "examples": {}
+ }
+ }
+ },
+ "responses": {
+ "200": {
+ "description": "",
+ "content": {
+ "application/json": {
+ "schema": {
+ "type": "object",
+ "properties": {}
+ }
+ }
+ },
+ "headers": {}
+ }
+ },
+ "security": []
+ }
+ },
+ "/overview": {
+ "get": {
+ "summary": "返回Zeta集群的概览",
+ "deprecated": false,
+ "description": "",
+ "tags": [],
+ "parameters": [
+ {
+ "name": "tag1",
+ "in": "query",
+ "description": "",
+ "required": false,
+ "example": "value1",
+ "schema": {
+ "type": "string"
+ }
+ },
+ {
+ "name": "tag2",
+ "in": "query",
+ "description": "",
+ "required": false,
+ "example": "value2",
+ "schema": {
+ "type": "string"
+ }
+ }
+ ],
+ "responses": {
+ "200": {
+ "description": "",
+ "content": {
+ "application/json": {
+ "schema": {
+ "type": "object",
+ "properties": {}
+ }
+ }
+ },
+ "headers": {}
+ }
+ },
+ "security": []
+ }
+ },
+ "/running-jobs": {
+ "get": {
+ "summary": "返回所有作业及其当前状态的概览",
+ "deprecated": false,
+ "description": "",
+ "tags": [],
+ "parameters": [],
+ "responses": {
+ "200": {
+ "description": "",
+ "content": {
+ "application/json": {
+ "schema": {
+ "type": "object",
+ "properties": {}
+ }
+ }
+ },
+ "headers": {}
+ }
+ },
+ "security": []
+ }
+ },
+ "/job-info/{jobId}": {
+ "get": {
+ "summary": "返回作业的详细信息",
+ "deprecated": false,
+ "description": "",
+ "tags": [],
+ "parameters": [
+ {
+ "name": "jobId",
+ "in": "path",
+ "description": "",
+ "required": true,
+ "example": 934327465587769300,
+ "schema": {
+ "type": "number"
+ }
+ }
+ ],
+ "responses": {
+ "200": {
+ "description": "",
+ "content": {
+ "application/json": {
+ "schema": {
+ "type": "object",
+ "properties": {}
+ }
+ }
+ },
+ "headers": {}
+ }
+ },
+ "security": []
+ }
+ },
+ "/running-job/{jobId}": {
+ "get": {
+ "summary": "返回作业的详细信息",
+ "deprecated": false,
+ "description": "",
+ "tags": [],
+ "parameters": [
+ {
+ "name": "jobId",
+ "in": "path",
+ "description": "",
+ "required": true,
+ "example": 1,
+ "schema": {
+ "type": "number"
+ }
+ }
+ ],
+ "responses": {
+ "200": {
+ "description": "",
+ "content": {
+ "application/json": {
+ "schema": {
+ "type": "object",
+ "properties": {}
+ }
+ }
+ },
+ "headers": {}
+ }
+ },
+ "security": []
+ }
+ },
+ "/finished-jobs/{state}": {
+ "get": {
+ "summary": "返回所有已完成的作业信息",
+ "deprecated": false,
+ "description": "",
+ "tags": [],
+ "parameters": [
+ {
+ "name": "state",
+ "in": "path",
+ "description": "finished job status.
FINISHED,CANCELED,FAILED,UNKNOWABLE",
+ "required": true,
+ "example": "FINISHED",
+ "schema": {
+ "type": "string"
+ }
+ }
+ ],
+ "responses": {
+ "200": {
+ "description": "",
+ "content": {
+ "application/json": {
+ "schema": {
+ "type": "object",
+ "properties": {}
+ }
+ }
+ },
+ "headers": {}
+ }
+ },
+ "security": []
+ }
+ },
+ "/system-monitoring-information": {
+ "get": {
+ "summary": "返回系统监控信息",
+ "deprecated": false,
+ "description": "",
+ "tags": [],
+ "parameters": [],
+ "responses": {
+ "200": {
+ "description": "",
+ "content": {
+ "application/json": {
+ "schema": {
+ "type": "object",
+ "properties": {}
+ }
+ }
+ },
+ "headers": {}
+ }
+ },
+ "security": []
+ }
+ },
+ "/stop-job": {
+ "post": {
+ "summary": "停止作业",
+ "deprecated": false,
+ "description": "",
+ "tags": [],
+ "parameters": [],
+ "requestBody": {
+ "content": {
+ "application/json": {
+ "schema": {
+ "type": "object",
+ "properties": {}
+ },
+ "example": {
+ "jobId": 944829786826473500,
+ "isStopWithSavePoint": false
+ }
+ }
+ }
+ },
+ "responses": {
+ "200": {
+ "description": "",
+ "content": {
+ "application/json": {
+ "schema": {
+ "type": "object",
+ "properties": {}
+ }
+ }
+ },
+ "headers": {}
+ }
+ },
+ "security": []
+ }
+ }
+ },
+ "components": {
+ "schemas": {},
+ "securitySchemes": {}
+ },
+ "servers": [],
+ "security": []
+}
\ No newline at end of file
diff --git a/seatunnel-mcp/docker-compose.yml b/seatunnel-mcp/docker-compose.yml
new file mode 100644
index 0000000..e4e8955
--- /dev/null
+++ b/seatunnel-mcp/docker-compose.yml
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: '3'
+
+services:
+ seatunnel:
+ image: apache/seatunnel:latest
+ container_name: seatunnel
+ ports:
+ - "8090:8090"
+ volumes:
+ - ./examples:/opt/seatunnel/config
+ networks:
+ - seatunnel-net
+ healthcheck:
+ test: ["CMD", "curl", "-f", "http://localhost:8090/overview"]
+ interval: 10s
+ timeout: 5s
+ retries: 3
+ start_period: 30s
+
+ seatunnel-mcp:
+ build:
+ context: .
+ dockerfile: Dockerfile
+ container_name: seatunnel-mcp
+ ports:
+ - "8080:8080"
+ environment:
+ - SEATUNNEL_API_URL=http://seatunnel:8090
+ depends_on:
+ seatunnel:
+ condition: service_healthy
+ networks:
+ - seatunnel-net
+
+networks:
+ seatunnel-net:
+ driver: bridge
\ No newline at end of file
diff --git a/seatunnel-mcp/docs/DEVELOPER_GUIDE.md
b/seatunnel-mcp/docs/DEVELOPER_GUIDE.md
new file mode 100644
index 0000000..0c7e20e
--- /dev/null
+++ b/seatunnel-mcp/docs/DEVELOPER_GUIDE.md
@@ -0,0 +1,186 @@
+# SeaTunnel MCP 开发者指南
+
+本指南为 SeaTunnel MCP 项目的开发者提供指导,包括如何设置开发环境、代码贡献流程和测试方法。
+
+## 开发环境设置
+
+1. 克隆仓库并设置开发环境:
+
+```bash
+# 克隆仓库
+git clone <repository_url>
+cd seatunnel-mcp
+
+# 创建并激活虚拟环境
+python -m venv .venv
+source .venv/bin/activate # Windows: .venv\Scripts\activate
+
+# 安装开发依赖
+pip install -e ".[dev]"
+```
+
+2. 安装预提交钩子(可选但推荐):
+
+```bash
+pip install pre-commit
+pre-commit install
+```
+
+## 项目结构
+
+```
+seatunnel-mcp/
+├── docs/ # 文档
+├── examples/ # 示例配置文件
+├── src/
+│ └── seatunnel_mcp/ # 主要源代码
+│ ├── __init__.py
+│ ├── __main__.py # 入口点
+│ ├── client.py # SeaTunnel API 客户端
+│ ├── tools.py # MCP 工具定义
+│ └── schema.py # 数据模型定义
+├── tests/ # 测试
+├── .env.example # 环境变量示例
+├── pyproject.toml # 项目配置
+└── README.md # 项目说明
+```
+
+## 开发流程
+
+### 添加新的工具
+
+1. 首先在 `client.py` 中为 SeaTunnel API 添加相应的方法
+2. 在 `tools.py` 中创建相应的 MCP 工具
+3. 更新 `get_all_tools()` 函数以包含新工具
+4. 添加单元测试
+
+示例:添加一个新的工具来获取日志信息
+
+```python
+# 在 client.py 中
+def get_job_logs(self, jobId: Union[str, int]) -> Dict[str, Any]:
+ """获取作业日志信息。"""
+ response = self._make_request("GET", f"/job-logs/{jobId}")
+ return response.json()
+
+# 在 tools.py 中
+def get_job_logs_tool(client: SeaTunnelClient) -> Tool:
+ """获取作业日志的工具。"""
+ async def get_job_logs(jobId: Union[str, int]) -> Dict[str, Any]:
+ return client.get_job_logs(jobId=jobId)
+
+ return Tool(
+ name="get-job-logs",
+ description="获取指定作业的日志信息",
+ fn=AsyncToolFn(get_job_logs),
+ parameters=models.ParametersSchema(
+ properties={
+ "jobId": models.ParameterSchema(
+ type="string",
+ description="作业 ID",
+ ),
+ },
+ required=["jobId"],
+ ),
+ )
+
+# 更新 get_all_tools 函数
+def get_all_tools(client: SeaTunnelClient) -> List[Tool]:
+ return [
+ # ... 现有工具
+ get_job_logs_tool(client),
+ ]
+```
+
+### 代码风格
+
+- 使用 [Black](https://black.readthedocs.io/) 进行代码格式化
+- 使用 [isort](https://pycqa.github.io/isort/) 排序导入
+- 使用 [mypy](http://mypy-lang.org/) 进行类型检查
+- 使用 [flake8](https://flake8.pycqa.org/) 进行代码风格检查
+
+你可以使用以下命令执行这些检查:
+
+```bash
+# 代码格式化
+black src tests
+
+# 导入排序
+isort src tests
+
+# 类型检查
+mypy src
+
+# 代码风格检查
+flake8 src tests
+```
+
+## 测试
+
+### 单元测试
+
+使用 pytest 运行单元测试:
+
+```bash
+pytest -xvs tests/
+```
+
+带覆盖率报告:
+
+```bash
+pytest --cov=src tests/
+```
+
+### 手动测试
+
+使用 MCP Inspector 进行手动测试:
+
+```bash
+npx @modelcontextprotocol/inspector python -m src.seatunnel_mcp
+```
+
+### 集成测试
+
+对于集成测试,你需要一个运行中的 SeaTunnel 实例。可以使用 Docker 启动一个测试实例:
+
+```bash
+docker run -d --name seatunnel -p 8090:8090 apache/seatunnel:latest
+```
+
+然后运行集成测试:
+
+```bash
+pytest -xvs tests/integration/
+```
+
+## 文档
+
+- 所有公共函数、类和方法应有清晰的 docstring
+- 遵循 [Google Python 风格指南](https://google.github.io/styleguide/pyguide.html)
+- 保持 README.md 和用户指南的最新状态
+
+## 提交 Pull Request
+
+1. 确保所有测试通过
+2. 更新相关文档
+3. 如果添加了新功能,请同时添加相应的测试
+4. 提交 PR 并在描述中详细说明变更内容
+
+## 发布流程
+
+1. 更新版本号(在 `__init__.py` 和 `pyproject.toml` 中)
+2. 更新 CHANGELOG.md
+3. 创建一个新的 git tag
+4. 构建并上传到 PyPI:
+
+```bash
+python -m build
+twine upload dist/*
+```
+
+## 问题与支持
+
+如有问题或需要支持,可以:
+- 提交 GitHub Issue
+- 在 PR 中讨论
+- 联系项目维护者
\ No newline at end of file
diff --git a/seatunnel-mcp/docs/QUICK_START.md
b/seatunnel-mcp/docs/QUICK_START.md
new file mode 100644
index 0000000..e92e244
--- /dev/null
+++ b/seatunnel-mcp/docs/QUICK_START.md
@@ -0,0 +1,176 @@
+# SeaTunnel MCP 快速入门指南
+
+本指南将帮助您快速开始使用 SeaTunnel MCP 服务器,并通过 Claude 等大型语言模型与 SeaTunnel 进行交互。
+
+## 准备工作
+
+确保您已安装以下软件:
+
+- Python 3.9 或更高版本
+- 运行中的 SeaTunnel 实例(或使用 Docker Compose)
+- Claude Desktop(可选,如需与 Claude 集成)
+
+## 安装
+
+### 方法 1:直接安装
+
+```bash
+# 克隆仓库
+git clone <仓库URL>
+cd seatunnel-mcp
+
+# 创建虚拟环境并安装
+python -m venv .venv
+source .venv/bin/activate # Windows: .venv\Scripts\activate
+pip install -e .
+```
+
+### 方法 2:使用 Docker Compose
+
+```bash
+# 克隆仓库
+git clone <仓库URL>
+cd seatunnel-mcp
+
+# 启动 SeaTunnel 和 MCP 服务器
+docker-compose up -d
+```
+
+## 基本用法
+
+### 1. 启动 MCP 服务器
+
+```bash
+# 使用命令行工具
+seatunnel-mcp run
+
+# 或直接运行模块
+python -m src.seatunnel_mcp
+```
+
+### 2. 使用 MCP Inspector 测试
+
+MCP Inspector 是一个用于测试 MCP 服务器的工具。通过它,您可以直接与 MCP 服务器交互,无需 Claude 或其他 LLM。
+
+```bash
+# 安装 Node.js 和 npm 后
+npx @modelcontextprotocol/inspector python -m src.seatunnel_mcp
+```
+
+这将打开一个网页界面,您可以在其中测试各种 MCP 工具。
+
+### 3. 与 Claude Desktop 集成
+
+```bash
+# 配置 Claude Desktop
+seatunnel-mcp configure-claude
+```
+
+然后,重启 Claude Desktop,并在与 Claude 的对话中尝试使用 SeaTunnel。
+
+## 常见使用场景示例
+
+### 提交一个简单的作业
+
+在 Claude 中,您可以这样与 SeaTunnel 交互:
+
+> 请帮我提交一个从 HDFS 到 Elasticsearch 的 SeaTunnel 作业,将 users 表中的数据导入到 users-index
索引中。
+
+Claude 将帮助您创建配置并提交作业,例如:
+
+```
+我将帮您提交一个从 HDFS 到 Elasticsearch 的作业。
+
+首先,让我检查当前的连接设置。
+[Claude 获取连接设置]
+
+现在,我将提交作业配置。以下是我准备的配置:
+
+env {
+ job.mode = "batch"
+}
+
+source {
+ Hdfs {
+ path = "/data/users"
+ format = "json"
+ }
+}
+
+sink {
+ Elasticsearch {
+ hosts = ["http://elasticsearch:9200"]
+ index = "users-index"
+ }
+}
+
+[Claude 提交作业]
+
+作业已成功提交!作业 ID 是 123456。您可以稍后查询这个作业的状态。
+```
+
+### 查看运行中的作业
+
+> 请显示当前所有正在运行的 SeaTunnel 作业。
+
+Claude 将使用 `get-running-jobs` 工具获取并展示所有运行中的作业。
+
+### 停止一个作业
+
+> 请停止作业 ID 为 123456 的作业。
+
+Claude 将使用 `stop-job` 工具停止指定的作业。
+
+## 高级用法
+
+### 创建作业模板
+
+您可以要求 Claude 记住某些常用的作业配置作为模板:
+
+> 请记住这个配置作为 "hdfs-to-es" 模板:
+> ```
+> env {
+> job.mode = "batch"
+> }
+> source {
+> Hdfs {
+> path = "/data/${path}"
+> format = "${format}"
+> }
+> }
+> sink {
+> Elasticsearch {
+> hosts = ["${es_host}"]
+> index = "${index}"
+> }
+> }
+> ```
+
+然后,您可以这样使用模板:
+
+> 使用 "hdfs-to-es" 模板创建作业,参数是:path=users, format=json,
es_host=http://elasticsearch:9200, index=users-index
+
+### 动态切换 SeaTunnel 实例
+
+如果您需要连接到不同的 SeaTunnel 实例:
+
+> 请将连接切换到测试环境的 SeaTunnel,URL 是 http://test-seatunnel:8090
+
+Claude 将使用 `update-connection-settings` 工具更新连接设置。
+
+## 故障排除
+
+如果您遇到问题:
+
+1. 确保 SeaTunnel 实例正在运行并可访问
+2. 检查 MCP 服务器的日志输出
+3. 使用 MCP Inspector 测试各个工具是否正常工作
+4. 检查环境变量和连接设置
+
+详细的故障排除指南请参阅 [用户指南](USER_GUIDE.md) 的"故障排除"部分。
+
+## 下一步
+
+- 阅读 [用户指南](USER_GUIDE.md) 了解更多功能和选项
+- 查看 [示例文件夹](../examples/) 获取更多示例配置
+- 如果您想贡献代码,请参阅 [开发者指南](DEVELOPER_GUIDE.md)
\ No newline at end of file
diff --git a/seatunnel-mcp/docs/USER_GUIDE.md b/seatunnel-mcp/docs/USER_GUIDE.md
new file mode 100644
index 0000000..1a00659
--- /dev/null
+++ b/seatunnel-mcp/docs/USER_GUIDE.md
@@ -0,0 +1,273 @@
+# SeaTunnel MCP User Guide
+
+This guide explains how to use the SeaTunnel Model Context Protocol (MCP)
server with Claude and other LLM interfaces.
+
+## Overview
+
+The SeaTunnel MCP server provides a way for Large Language Models (LLMs) like
Claude to interact with a SeaTunnel cluster. It enables you to:
+
+- Submit and manage SeaTunnel jobs
+- Monitor job status and system health
+- Configure and manage SeaTunnel settings
+
+## Setting Up
+
+1. Install the SeaTunnel MCP server:
+ ```bash
+ git clone <repository_url>
+ cd seatunnel-mcp
+ python -m venv .venv
+ source .venv/bin/activate # On Windows: .venv\Scripts\activate
+ pip install -e .
+ ```
+
+2. Configure environment variables by copying `.env.example` to `.env` and
modifying as needed:
+ ```bash
+ cp .env.example .env
+ # Edit .env to set your SeaTunnel API URL and other settings
+ ```
+
+3. Start the MCP server:
+ ```bash
+ python -m src.seatunnel_mcp
+ ```
+
+## Using with Claude Desktop
+
+To use the SeaTunnel MCP server with Claude Desktop:
+
+1. Add the following to your `claude_desktop_config.json`:
+ ```json
+ {
+ "mcpServers": {
+ "seatunnel": {
+ "command": "python",
+ "args": ["-m", "src.seatunnel_mcp"]
+ }
+ }
+ }
+ ```
+
+2. Restart Claude Desktop for the changes to take effect.
+
+## Available Tools
+
+When interacting with Claude, you can use these tools through natural language:
+
+### Connection Management
+
+- **Get connection settings**: Ask Claude to show the current SeaTunnel
connection settings.
+
+ Example: "What are the current SeaTunnel connection settings?"
+
+- **Update connection settings**: Ask Claude to update the connection to a
different SeaTunnel instance.
+
+ Example: "Change the SeaTunnel connection to http://new-server:8090 with API
key 'my_new_key'"
+
+### Job Management
+
+- **Submit a job**: Ask Claude to submit a new SeaTunnel job.
+
+ Example:
+ ```
+ Please submit this job to SeaTunnel:
+
+ env {
+ job.mode = "batch"
+ }
+
+ source {
+ Jdbc {
+ url = "jdbc:hive2://host:10000/default"
+ query = "select * from test limit 100"
+ }
+ }
+
+ sink {
+ Elasticsearch {
+ hosts = ["http://elastic:9200"]
+ index = "my-index"
+ }
+ }
+ ```
+
+- **Submit multiple jobs in batch**: Ask Claude to submit multiple jobs at
once. Your input will be sent directly as the API request body.
+
+ Example:
+ ```
+ Please submit these jobs in batch:
+
+ [
+ {
+ "params": {
+ "jobId": "123456",
+ "jobName": "Job-1"
+ },
+ "env": {
+ "job.mode": "batch"
+ },
+ "source": [
+ {
+ "plugin_name": "FakeSource",
+ "plugin_output": "fake",
+ "row.num": 1000,
+ "schema": {
+ "fields": {
+ "name": "string",
+ "age": "int"
+ }
+ }
+ }
+ ],
+ "transform": [],
+ "sink": [
+ {
+ "plugin_name": "Console",
+ "plugin_input": ["fake"]
+ }
+ ]
+ },
+ {
+ "params": {
+ "jobId": "789012",
+ "jobName": "Job-2"
+ },
+ "env": {
+ "job.mode": "batch"
+ },
+ "source": [
+ {
+ "plugin_name": "FakeSource",
+ "plugin_output": "fake",
+ "row.num": 500,
+ "schema": {
+ "fields": {
+ "id": "int",
+ "value": "string"
+ }
+ }
+ }
+ ],
+ "transform": [],
+ "sink": [
+ {
+ "plugin_name": "Console",
+ "plugin_input": ["fake"]
+ }
+ ]
+ }
+ ]
+ ```
+
+- **Stop a job**: Ask Claude to stop a running job.
+
+ Example: "Please stop job with ID 12345"
+
+- **Get job information**: Ask Claude to show information about a specific job.
+
+ Example: "Show me details about job 12345"
+
+- **Get running jobs**: Ask Claude to list all running jobs.
+
+ Example: "What jobs are currently running?"
+
+- **Get finished jobs**: Ask Claude to list finished jobs by state.
+
+ Example: "Show me all completed jobs" or "List all failed jobs"
+
+### System Monitoring
+
+- **Get cluster overview**: Ask Claude to show an overview of the SeaTunnel
cluster.
+
+ Example: "Give me an overview of the SeaTunnel cluster"
+
+- **Get system monitoring information**: Ask Claude to show detailed system
monitoring information.
+
+ Example: "Show me the system monitoring information"
+
+## Example Conversations
+
+Here are some example conversations with Claude using the SeaTunnel MCP server:
+
+### Submitting a Job
+
+**You**: I need to transfer data from a Hive table to Elasticsearch. Can you
help me submit a job for this?
+
+**Claude**: I can help you submit a job to transfer data from Hive to
Elasticsearch. Let me walk you through creating and submitting a job
configuration.
+
+First, let me get the current connection settings to make sure we're connected
to the right SeaTunnel instance.
+
+[Claude gets connection settings]
+
+We're currently connected to [SeaTunnel URL]. Now, let's create and submit a
job configuration.
+
+Here's a job I can submit for you that will transfer data from a Hive table to
Elasticsearch. I'll need a few details:
+
+1. The Hive connection URL
+2. Which table you want to query
+3. The Elasticsearch host URL
+4. The target index name
+
+Based on the information you've provided, I'll submit this job:
+
+[Claude submits job with proper configuration]
+
+The job has been submitted successfully! The job ID is [job ID]. You can
monitor its status by asking me about this job's status.
+
+### Monitoring Jobs
+
+**You**: What jobs are currently running?
+
+**Claude**: Let me check the currently running jobs for you.
+
+[Claude gets running jobs]
+
+Here are the currently running jobs in your SeaTunnel cluster:
+
+[Claude displays job information in a formatted table]
+
+Would you like to see more details about any specific job?
+
+## Troubleshooting
+
+If you encounter issues:
+
+1. Check that the SeaTunnel API is accessible at the configured URL
+2. Verify that any API keys are correct
+3. Check the MCP server logs for error messages
+4. Ensure the job configurations follow the correct format
+
+## Advanced Usage
+
+### Custom Connection Settings
+
+You can configure multiple SeaTunnel connections by asking Claude to update
the connection settings:
+
+"Connect to our production SeaTunnel at https://prod-seatunnel:8090 with API
key 'prod_key'"
+
+"Connect to our development SeaTunnel at https://dev-seatunnel:8090"
+
+### Job Configuration Templates
+
+You can ask Claude to save and reuse job templates:
+
+"Remember this job configuration as 'hive-to-elastic' template:
+```
+env {
+ job.mode = "batch"
+}
+source {
+ Jdbc {
+ url = "jdbc:hive2://host:10000/default"
+ query = "select * from ${table} limit ${limit}"
+ }
+}
+sink {
+ Elasticsearch {
+ hosts = ["http://elastic:9200"]
+ index = "${index}"
+ }
+}
+```"
+
+Then later: "Submit a job using the 'hive-to-elastic' template with
table='users', limit=1000, and index='users-index'"
\ No newline at end of file
diff --git a/seatunnel-mcp/docs/img/img.png b/seatunnel-mcp/docs/img/img.png
new file mode 100644
index 0000000..dbc6a4c
Binary files /dev/null and b/seatunnel-mcp/docs/img/img.png differ
diff --git a/seatunnel-mcp/docs/img/seatunnel-mcp-logo.png
b/seatunnel-mcp/docs/img/seatunnel-mcp-logo.png
new file mode 100644
index 0000000..ee33653
Binary files /dev/null and b/seatunnel-mcp/docs/img/seatunnel-mcp-logo.png
differ
diff --git a/seatunnel-mcp/examples/simple_job.conf
b/seatunnel-mcp/examples/simple_job.conf
new file mode 100644
index 0000000..6372c37
--- /dev/null
+++ b/seatunnel-mcp/examples/simple_job.conf
@@ -0,0 +1,44 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+env {
+ job.mode = "batch"
+}
+
+source {
+ Jdbc {
+ url = "jdbc:hive2://localhost:10000/default"
+ user = "hive"
+ password = "password"
+ driver = "org.apache.hive.jdbc.HiveDriver"
+ connection_check_timeout_sec = 100
+ query = "select * from test_table limit 100"
+ }
+}
+
+transform {
+ # No transformations in this simple example
+}
+
+sink {
+ Elasticsearch {
+ hosts = ["http://localhost:9200"]
+ index = "test-index"
+ username = ""
+ password = ""
+ schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
+ data_save_mode = "APPEND"
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-mcp/pyproject.toml b/seatunnel-mcp/pyproject.toml
new file mode 100644
index 0000000..6bd416f
--- /dev/null
+++ b/seatunnel-mcp/pyproject.toml
@@ -0,0 +1,82 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+[build-system]
+requires = ["setuptools>=42", "wheel"]
+build-backend = "setuptools.build_meta"
+
+[project]
+name = "seatunnel-mcp"
+version = "0.1.0"
+description = "A Model Context Protocol (MCP) server for interacting with
SeaTunnel through LLM interfaces"
+readme = "README.md"
+requires-python = ">=3.12"
+license = {text = "Apache License"}
+authors = [
+ {name = "SeaTunnel MCP Team"}
+]
+dependencies = [
+ "fastapi>=0.95.0",
+ "uvicorn>=0.21.1",
+ "pydantic>=2.0.0",
+ "httpx>=0.24.0",
+ "python-dotenv>=1.0.0",
+ "requests>=2.28.2",
+ "mcp>=0.1.0",
+]
+
+[project.optional-dependencies]
+dev = [
+ "pytest>=7.0.0",
+ "black>=23.1.0",
+ "isort>=5.12.0",
+ "mypy>=1.0.1",
+ "flake8>=6.0.0",
+ "pytest-cov>=4.1.0",
+ "pytest-asyncio>=0.21.0",
+ "build>=1.0.3",
+ "twine>=4.0.2",
+ "pre-commit>=3.3.2",
+]
+
+[project.scripts]
+seatunnel-mcp = "src.seatunnel_mcp.cli:main"
+
+[tool.setuptools]
+packages = ["src.seatunnel_mcp"]
+
+[tool.black]
+line-length = 100
+target-version = ["py39"]
+
+[tool.isort]
+profile = "black"
+line_length = 100
+
+[tool.mypy]
+python_version = "3.12"
+warn_return_any = true
+warn_unused_configs = true
+disallow_untyped_defs = true
+disallow_incomplete_defs = true
+check_untyped_defs = true
+disallow_untyped_decorators = false
+no_implicit_optional = true
+strict_optional = true
+
+[tool.pytest.ini_options]
+testpaths = ["tests"]
+python_files = "test_*.py"
+python_functions = "test_*"
\ No newline at end of file
diff --git a/seatunnel-mcp/requirements-dev.txt
b/seatunnel-mcp/requirements-dev.txt
new file mode 100644
index 0000000..9b32d35
--- /dev/null
+++ b/seatunnel-mcp/requirements-dev.txt
@@ -0,0 +1,6 @@
+pytest>=7.0.0
+black>=23.1.0
+isort>=5.12.0
+mypy>=1.0.1
+flake8>=6.0.0
+pytest-cov>=4.1.0
\ No newline at end of file
diff --git a/seatunnel-mcp/run.bat b/seatunnel-mcp/run.bat
new file mode 100644
index 0000000..72683f5
--- /dev/null
+++ b/seatunnel-mcp/run.bat
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+@echo off
+REM Run the SeaTunnel MCP server
+REM This script sets up the environment and runs the server
+
+REM Check if virtual environment exists, if not create it
+if not exist .venv (
+ echo Creating virtual environment...
+ python -m venv .venv
+)
+
+REM Activate virtual environment
+call .venv\Scripts\activate.bat
+
+REM Install dependencies if needed
+pip show uvicorn >nul 2>&1
+if %ERRORLEVEL% neq 0 (
+ echo Installing dependencies...
+ pip install -e .
+)
+
+REM Check if .env file exists, if not create from example
+if not exist .env (
+ if exist .env.example (
+ echo Creating .env file from .env.example...
+ copy .env.example .env
+ echo Please review and update the .env file with your SeaTunnel API
settings.
+ ) else (
+ echo Warning: .env.example file not found. Please create a .env file
manually.
+ )
+)
+
+REM Run the server
+echo Starting SeaTunnel MCP server...
+python -m src.seatunnel_mcp
+
+REM Deactivate virtual environment when finished
+call deactivate
\ No newline at end of file
diff --git a/seatunnel-mcp/run.sh b/seatunnel-mcp/run.sh
new file mode 100755
index 0000000..20c3fa4
--- /dev/null
+++ b/seatunnel-mcp/run.sh
@@ -0,0 +1,49 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+#!/bin/bash
+
+# Run the SeaTunnel MCP server
+# This script sets up the environment and runs the server
+
+# Check if virtual environment exists, if not create it
+if [ ! -d ".venv" ]; then
+ echo "Creating virtual environment..."
+ python -m venv .venv
+fi
+
+# Activate virtual environment
+source .venv/bin/activate
+
+# Install dependencies if needed
+if ! command -v uvicorn &> /dev/null; then
+ echo "Installing dependencies..."
+ pip install -e .
+fi
+
+# Check if .env file exists, if not create from example
+if [ ! -f ".env" ]; then
+ if [ -f ".env.example" ]; then
+ echo "Creating .env file from .env.example..."
+ cp .env.example .env
+ echo "Please review and update the .env file with your SeaTunnel API
settings."
+ else
+ echo "Warning: .env.example file not found. Please create a .env file
manually."
+ fi
+fi
+
+# Run the server
+echo "Starting SeaTunnel MCP server..."
+python -m src.seatunnel_mcp
\ No newline at end of file
diff --git a/seatunnel-mcp/src/seatunnel_mcp/__init__.py
b/seatunnel-mcp/src/seatunnel_mcp/__init__.py
new file mode 100644
index 0000000..462f65e
--- /dev/null
+++ b/seatunnel-mcp/src/seatunnel_mcp/__init__.py
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""SeaTunnel MCP Server.
+
+A Model Context Protocol server for interacting with SeaTunnel through LLM
interfaces.
+"""
+
+__version__ = "0.1.0"
\ No newline at end of file
diff --git a/seatunnel-mcp/src/seatunnel_mcp/__main__.py
b/seatunnel-mcp/src/seatunnel_mcp/__main__.py
new file mode 100644
index 0000000..b5ac93d
--- /dev/null
+++ b/seatunnel-mcp/src/seatunnel_mcp/__main__.py
@@ -0,0 +1,78 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Main entry point for the SeaTunnel MCP server."""
+
+import os
+import sys
+import logging
+from typing import Dict, Any, Optional
+
+from dotenv import load_dotenv
+from mcp.server.fastmcp import FastMCP
+
+from .client import SeaTunnelClient
+from .tools import get_all_tools
+
+# Setup logging
+logging.basicConfig(
+ level=logging.INFO,
+ format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
+)
+logger = logging.getLogger(__name__)
+
+# Load environment variables
+load_dotenv()
+
+# Default values
+DEFAULT_HOST = "127.0.0.1"
+DEFAULT_PORT = 8080
+DEFAULT_API_URL = "http://localhost:8090" # Default SeaTunnel API URL
+
+
+def main():
+ """Run the SeaTunnel MCP server."""
+ # Get configuration from environment
+ host = os.environ.get("MCP_HOST", DEFAULT_HOST)
+ port = int(os.environ.get("MCP_PORT", DEFAULT_PORT))
+ api_url = os.environ.get("SEATUNNEL_API_URL", DEFAULT_API_URL)
+ api_key = os.environ.get("SEATUNNEL_API_KEY", None)
+
+ # Create SeaTunnel client
+ client = SeaTunnelClient(base_url=api_url, api_key=api_key)
+
+ # Create MCP server
+ server = FastMCP(
+ name="SeaTunnel MCP Server",
+ instructions="A Model Context Protocol server for interacting with
SeaTunnel through LLM interfaces",
+ log_level="INFO",
+ host=host,
+ port=port,
+ )
+
+ # Register all tools
+ tools = get_all_tools(client)
+ for tool_fn in tools:
+ # 直接添加函数作为工具
+ server.add_tool(tool_fn)
+
+ # Run server
+ logger.info(f"Starting SeaTunnel MCP server at http://{host}:{port}")
+ server.run()
+
+
+if __name__ == "__main__":
+ main()
\ No newline at end of file
diff --git a/seatunnel-mcp/src/seatunnel_mcp/cli.py
b/seatunnel-mcp/src/seatunnel_mcp/cli.py
new file mode 100644
index 0000000..246656d
--- /dev/null
+++ b/seatunnel-mcp/src/seatunnel_mcp/cli.py
@@ -0,0 +1,206 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""SeaTunnel MCP 命令行界面。
+
+为 SeaTunnel MCP 服务器提供命令行工具,便于启动、管理和配置服务器。
+"""
+
+import os
+import sys
+import argparse
+import logging
+import json
+from typing import Optional, Dict, Any, List
+
+from dotenv import load_dotenv
+
+from . import __version__
+from .__main__ import main as run_server
+
+
+def setup_logging(level: str) -> None:
+ """设置日志级别。
+
+ Args:
+ level: 日志级别 (debug, info, warning, error, critical)
+ """
+ numeric_level = getattr(logging, level.upper(), None)
+ if not isinstance(numeric_level, int):
+ raise ValueError(f"无效的日志级别: {level}")
+
+ logging.basicConfig(
+ level=numeric_level,
+ format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
+ )
+
+
+def print_version() -> None:
+ """打印版本信息。"""
+ print(f"SeaTunnel MCP 版本: {__version__}")
+
+
+def create_env_file(env_file: str) -> None:
+ """创建环境变量配置文件。
+
+ Args:
+ env_file: 环境变量文件路径
+ """
+ if os.path.exists(env_file):
+ print(f"错误: 文件已存在: {env_file}")
+ sys.exit(1)
+
+ example_file =
os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))),
".env.example")
+
+ if os.path.exists(example_file):
+ with open(example_file, "r") as src, open(env_file, "w") as dst:
+ dst.write(src.read())
+ print(f"已创建环境变量文件: {env_file}")
+ else:
+ # 如果找不到示例文件,创建一个基本的配置
+ with open(env_file, "w") as f:
+ f.write("# SeaTunnel MCP 服务器配置\n\n")
+ f.write("# MCP 服务器配置\n")
+ f.write("MCP_HOST=127.0.0.1\n")
+ f.write("MCP_PORT=8080\n\n")
+ f.write("# SeaTunnel API 配置\n")
+ f.write("SEATUNNEL_API_URL=http://localhost:8090\n")
+ f.write("SEATUNNEL_API_KEY=your_api_key_here\n")
+ print(f"已创建基本环境变量文件: {env_file}")
+
+
+def configure_mcp_for_claude_desktop(config_file: Optional[str] = None) ->
None:
+ """为 Claude Desktop 配置 MCP 服务器。
+
+ Args:
+ config_file: Claude Desktop 配置文件路径,如果未提供则使用默认路径
+ """
+ if config_file is None:
+ # 尝试找到默认配置文件
+ home_dir = os.path.expanduser("~")
+ default_paths = [
+ os.path.join(home_dir, ".claude", "claude_desktop_config.json"),
+ os.path.join(home_dir, "AppData", "Roaming", "claude",
"claude_desktop_config.json"),
+ os.path.join(home_dir, "Library", "Application Support", "claude",
"claude_desktop_config.json"),
+ ]
+
+ for path in default_paths:
+ if os.path.exists(path):
+ config_file = path
+ break
+
+ if config_file is None:
+ print("错误: 无法找到 Claude Desktop 配置文件")
+ print("请手动指定配置文件路径: seatunnel-mcp configure-claude --config-file
PATH")
+ sys.exit(1)
+
+ # 读取现有配置或创建新配置
+ config = {}
+ if os.path.exists(config_file):
+ try:
+ with open(config_file, "r") as f:
+ config = json.load(f)
+ except json.JSONDecodeError:
+ print(f"警告: {config_file} 包含无效的 JSON,将创建新配置")
+
+ # 添加 SeaTunnel MCP 配置
+ if "mcpServers" not in config:
+ config["mcpServers"] = {}
+
+ config["mcpServers"]["seatunnel"] = {
+ "command": "python",
+ "args": ["-m", "src.seatunnel_mcp"]
+ }
+
+ # 保存配置
+ os.makedirs(os.path.dirname(config_file), exist_ok=True)
+ with open(config_file, "w") as f:
+ json.dump(config, f, indent=2)
+
+ print(f"已为 Claude Desktop 配置 SeaTunnel MCP 服务器: {config_file}")
+
+
+def main() -> None:
+ """命令行入口点。"""
+ parser = argparse.ArgumentParser(description="SeaTunnel MCP 服务器命令行工具")
+
+ # 全局选项
+ parser.add_argument("-v", "--version", action="store_true", help="显示版本信息")
+ parser.add_argument("--log-level", choices=["debug", "info", "warning",
"error", "critical"],
+ default="info", help="设置日志级别 (默认: info)")
+
+ # 子命令
+ subparsers = parser.add_subparsers(dest="command", help="命令")
+
+ # 运行服务器
+ run_parser = subparsers.add_parser("run", help="运行 MCP 服务器")
+ run_parser.add_argument("--host", help="监听主机 (默认: 从环境变量获取)")
+ run_parser.add_argument("--port", type=int, help="监听端口 (默认: 从环境变量获取)")
+ run_parser.add_argument("--api-url", help="SeaTunnel API URL (默认:
从环境变量获取)")
+ run_parser.add_argument("--api-key", help="SeaTunnel API 密钥 (默认: 从环境变量获取)")
+ run_parser.add_argument("--env-file", help="环境变量文件路径 (默认: .env)")
+
+ # 初始化环境变量文件
+ init_parser = subparsers.add_parser("init", help="初始化环境变量文件")
+ init_parser.add_argument("--env-file", default=".env", help="环境变量文件路径 (默认:
.env)")
+
+ # 为 Claude Desktop 配置 MCP
+ claude_parser = subparsers.add_parser("configure-claude", help="为 Claude
Desktop 配置 MCP 服务器")
+ claude_parser.add_argument("--config-file", help="Claude Desktop 配置文件路径")
+
+ args = parser.parse_args()
+
+ # 设置日志级别
+ setup_logging(args.log_level)
+
+ # 显示版本信息
+ if args.version:
+ print_version()
+ return
+
+ # 处理命令
+ if args.command == "run":
+ # 加载环境变量
+ if args.env_file:
+ load_dotenv(args.env_file)
+ else:
+ load_dotenv()
+
+ # 设置环境变量
+ if args.host:
+ os.environ["MCP_HOST"] = args.host
+ if args.port:
+ os.environ["MCP_PORT"] = str(args.port)
+ if args.api_url:
+ os.environ["SEATUNNEL_API_URL"] = args.api_url
+ if args.api_key:
+ os.environ["SEATUNNEL_API_KEY"] = args.api_key
+
+ # 运行服务器
+ run_server()
+
+ elif args.command == "init":
+ create_env_file(args.env_file)
+
+ elif args.command == "configure-claude":
+ configure_mcp_for_claude_desktop(args.config_file)
+
+ else:
+ parser.print_help()
+
+
+if __name__ == "__main__":
+ main()
\ No newline at end of file
diff --git a/seatunnel-mcp/src/seatunnel_mcp/client.py
b/seatunnel-mcp/src/seatunnel_mcp/client.py
new file mode 100644
index 0000000..8cd6336
--- /dev/null
+++ b/seatunnel-mcp/src/seatunnel_mcp/client.py
@@ -0,0 +1,313 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""SeaTunnel API client for interacting with the REST API."""
+
+import json
+import logging
+from typing import Dict, List, Any, Optional, Union
+import httpx
+
+logger = logging.getLogger(__name__)
+
+
+class SeaTunnelClient:
+ """Client for interacting with the SeaTunnel REST API."""
+
+ def __init__(self, base_url: str, api_key: Optional[str] = None):
+ """Initialize the client.
+
+ Args:
+ base_url: Base URL of the SeaTunnel REST API.
+ api_key: Optional API key for authentication.
+ """
+ self.base_url = base_url
+ self.api_key = api_key
+ self.headers = {"Content-Type": "application/json"}
+ if api_key:
+ self.headers["Authorization"] = f"Bearer {api_key}"
+
+ def update_connection_settings(self, url: Optional[str] = None, api_key:
Optional[str] = None) -> Dict[str, Any]:
+ """Update connection settings.
+
+ Args:
+ url: New base URL for the SeaTunnel REST API.
+ api_key: New API key for authentication.
+
+ Returns:
+ Dict with updated connection settings.
+ """
+ if url:
+ self.base_url = url
+ if api_key:
+ self.api_key = api_key
+ self.headers["Authorization"] = f"Bearer {api_key}" if api_key
else None
+
+ return self.get_connection_settings()
+
+ def get_connection_settings(self) -> Dict[str, Any]:
+ """Get current connection settings.
+
+ Returns:
+ Dict with current connection settings.
+ """
+ return {
+ "url": self.base_url,
+ "has_api_key": self.api_key is not None,
+ }
+
+ def _make_request(self, method: str, endpoint: str, **kwargs) ->
httpx.Response:
+ """Make a request to the SeaTunnel API.
+
+ Args:
+ method: HTTP method.
+ endpoint: API endpoint.
+ **kwargs: Additional arguments for the request.
+
+ Returns:
+ Response from the API.
+
+ Raises:
+ httpx.HTTPStatusError: If the request fails.
+ """
+ url = f"{self.base_url}{endpoint}"
+ headers = kwargs.pop("headers", {})
+
+ # Don't add the default Content-Type header if we're uploading files
+ if "files" not in kwargs:
+ # Create a new dictionary with self.headers as the base
+ merged_headers = dict(self.headers)
+ # If custom headers exist, they override the default ones
+ merged_headers.update(headers)
+ headers = merged_headers
+ else:
+ # Only add the Authorization header when using files
+ if "Authorization" in self.headers:
+ headers["Authorization"] = self.headers["Authorization"]
+
+ try:
+ with httpx.Client() as client:
+ response = client.request(method, url, headers=headers,
**kwargs)
+ response.raise_for_status()
+ return response
+ except httpx.HTTPStatusError as e:
+ logger.error(f"HTTP error: {e}")
+ raise
+ except httpx.RequestError as e:
+ logger.error(f"Request error: {e}")
+ raise
+
+ def submit_job(
+ self,
+ job_content: str,
+ jobName: Optional[str] = None,
+ jobId: Optional[str] = None,
+ isStartWithSavePoint: Optional[bool] = None,
+ format: str = "hocon"
+ ) -> Dict[str, Any]:
+ """Submit a new job.
+
+ Args:
+ job_content: Job configuration content.
+ jobName: Optional job name.
+ jobId: Optional job ID.
+ isStartWithSavePoint: Whether to start with savepoint.
+ format: Job configuration format (hocon, json, yaml).
+
+ Returns:
+ Response from the API.
+ """
+ params = {}
+ if jobName:
+ params["jobName"] = jobName
+ if jobId is not None:
+ params["jobId"] = str(jobId) # Convert jobId to string
+ if isStartWithSavePoint is not None:
+ params["isStartWithSavePoint"] = str(isStartWithSavePoint).lower()
+ if format:
+ params["format"] = format
+
+ response = self._make_request(
+ "POST",
+ "/submit-job",
+ params=params,
+ content=job_content,
+ headers={"Content-Type": "text/plain"}
+ )
+
+ return response.json()
+
+ def submit_jobs(
+ self,
+ request_body: Any
+ ) -> Dict[str, Any]:
+ """Submit multiple jobs in batch.
+
+ Args:
+ request_body: The direct request body to send to the API.
+ It will be used as-is without modification.
+
+ Returns:
+ Response from the API.
+ """
+ response = self._make_request(
+ "POST",
+ "/submit-jobs",
+ json=request_body,
+ )
+
+ return response.json()
+
+ def submit_job_upload(
+ self,
+ config_file: Union[str, Any],
+ jobName: Optional[str] = None,
+ jobId: Optional[Union[str, int]] = None,
+ isStartWithSavePoint: Optional[bool] = None,
+ format: Optional[str] = None,
+ ) -> Dict[str, Any]:
+ """Submit a new job using file upload.
+
+ Args:
+ config_file: Either a file path string or a file-like object. If a
path string is provided,
+ the file will be opened and submitted in the
multipart/form-data request body.
+ jobName: Optional job name (sent as a query parameter).
+ jobId: Optional job ID (sent as a query parameter). Can be a
string or integer, will be converted to string.
+ isStartWithSavePoint: Whether to start with savepoint (sent as a
query parameter).
+ format: Job configuration format (hocon, json, yaml) (sent as a
query parameter).
+ If not provided, it will be determined from the file name.
+
+ Returns:
+ Response from the API.
+ """
+ params = {}
+ if jobName:
+ params["jobName"] = jobName
+ if jobId is not None:
+ params["jobId"] = str(jobId) # Convert jobId to string
+ if isStartWithSavePoint is not None:
+ params["isStartWithSavePoint"] = str(isStartWithSavePoint).lower()
+
+ if format:
+ params["format"] = format
+
+ # If config_file is a string, assume it's a file path and open the file
+ file_to_close = None
+ try:
+ if isinstance(config_file, str):
+ file_to_close = open(config_file, 'rb')
+ files = {'config_file': file_to_close}
+ else:
+ # Assume it's already a file-like object
+ files = {'config_file': config_file}
+
+ response = self._make_request(
+ "POST",
+ "/submit-job/upload",
+ params=params,
+ files=files
+ )
+
+ return response.json()
+ finally:
+ # Ensure we close the file if we opened it
+ if file_to_close:
+ file_to_close.close()
+
+ def stop_job(self, jobId: Union[str, int], isStartWithSavePoint: bool =
False) -> Dict[str, Any]:
+ """Stop a running job.
+
+ Args:
+ jobId: Job ID.
+ isStartWithSavePoint: Whether to stop with savepoint.
+
+ Returns:
+ Response from the API.
+ """
+ data = {
+ "jobId": jobId,
+ "isStopWithSavePoint": isStartWithSavePoint
+ }
+
+ response = self._make_request("POST", "/stop-job", json=data)
+ return response.json()
+
+ def get_job_info(self, jobId: Union[str, int]) -> Dict[str, Any]:
+ """Get information about a job.
+
+ Args:
+ jobId: Job ID.
+
+ Returns:
+ Response from the API.
+ """
+ response = self._make_request("GET", f"/job-info/{jobId}")
+ return response.json()
+
+ def get_running_job(self, jobId: Union[str, int]) -> Dict[str, Any]:
+ """Get information about a running job.
+
+ Args:
+ jobId: Job ID.
+
+ Returns:
+ Response from the API.
+ """
+ response = self._make_request("GET", f"/running-job/{jobId}")
+ return response.json()
+
+ def get_running_jobs(self) -> Dict[str, Any]:
+ """Get all running jobs.
+
+ Returns:
+ Response from the API.
+ """
+ response = self._make_request("GET", "/running-jobs")
+ return response.json()
+
+ def get_finished_jobs(self, state: str) -> Dict[str, Any]:
+ """Get all finished jobs by state.
+
+ Args:
+ state: Job state (FINISHED, CANCELED, FAILED, UNKNOWABLE).
+
+ Returns:
+ Response from the API.
+ """
+ response = self._make_request("GET", f"/finished-jobs/{state}")
+ return response.json()
+
+ def get_overview(self, tags: Optional[Dict[str, str]] = None) -> Dict[str,
Any]:
+ """Get cluster overview.
+
+ Args:
+ tags: Optional tags for filtering.
+
+ Returns:
+ Response from the API.
+ """
+ params = tags or {}
+ response = self._make_request("GET", "/overview", params=params)
+ return response.json()
+
+ def get_system_monitoring_information(self) -> Dict[str, Any]:
+ """Get system monitoring information.
+
+ Returns:
+ Response from the API.
+ """
+ response = self._make_request("GET", "/system-monitoring-information")
+ return response.json()
\ No newline at end of file
diff --git a/seatunnel-mcp/src/seatunnel_mcp/schema.py
b/seatunnel-mcp/src/seatunnel_mcp/schema.py
new file mode 100644
index 0000000..1099efe
--- /dev/null
+++ b/seatunnel-mcp/src/seatunnel_mcp/schema.py
@@ -0,0 +1,94 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""MCP schemas for the SeaTunnel MCP tools."""
+
+from typing import Dict, List, Any, Optional, Union
+from pydantic import BaseModel, Field
+
+__all__ = [
+ "ConnectionSettings",
+ "SubmitJobRequest",
+ "SubmitJobUploadRequest",
+ "SubmitJobsRequest",
+ "StopJobRequest",
+ "JobStateType",
+]
+
+
+class ConnectionSettings(BaseModel):
+ """Connection settings for the SeaTunnel API."""
+
+ url: str = Field(..., description="Base URL of the SeaTunnel REST API")
+ has_api_key: bool = Field(..., description="Whether an API key is set")
+
+
+class UpdateConnectionSettings(BaseModel):
+ """Update connection settings for the SeaTunnel API."""
+
+ url: Optional[str] = Field(None, description="New base URL for the
SeaTunnel REST API")
+ api_key: Optional[str] = Field(None, description="New API key for
authentication")
+
+
+class SubmitJobRequest(BaseModel):
+ """Request for submitting a job."""
+
+ job_content: str = Field(..., description="Job configuration content in
specified format")
+ jobName: Optional[str] = Field(None, description="Optional job name")
+ jobId: Optional[Union[str, int]] = Field(None, description="Optional job
ID. Can be a string or integer.")
+ is_start_with_save_point: Optional[bool] = Field(None,
description="Whether to start with savepoint")
+ format: str = Field("hocon", description="Job configuration format (hocon,
json, yaml)")
+
+
+class SubmitJobUploadRequest(BaseModel):
+ """Request for submitting a job via file upload."""
+
+ config_file: Union[str, Any] = Field(..., description="Configuration file
path or file object (multipart/form-data body parameter)")
+ jobName: Optional[str] = Field(None, description="Optional job name (query
parameter)")
+ jobId: Optional[Union[str, int]] = Field(None, description="Optional job
ID (query parameter). Can be a string or integer.")
+ is_start_with_save_point: Optional[bool] = Field(None,
description="Whether to start with savepoint (query parameter)")
+ format: Optional[str] = Field(None, description="Job configuration format
(hocon, json, yaml) (query parameter). If not provided, determined from the
file name")
+
+
+class SubmitJobsRequest(BaseModel):
+ """Request for submitting multiple jobs in batch."""
+
+ request_body: Any = Field(..., description="Direct request body to send to
the API")
+
+
+class StopJobRequest(BaseModel):
+ """Request for stopping a job."""
+
+ jobId: Union[str, int] = Field(..., description="Job ID")
+ isStartWithSavePoint: bool = Field(False, description="Whether to stop
with savepoint")
+
+
+class JobInfoRequest(BaseModel):
+ """Request for getting job information."""
+
+ jobId: Union[str, int] = Field(..., description="Job ID")
+
+
+class FinishedJobsRequest(BaseModel):
+ """Request for getting finished jobs."""
+
+ state: str = Field(..., description="Job state (FINISHED, CANCELED,
FAILED, UNKNOWABLE)")
+
+
+class OverviewRequest(BaseModel):
+ """Request for getting cluster overview."""
+
+ tags: Optional[Dict[str, str]] = Field(None, description="Optional tags
for filtering")
\ No newline at end of file
diff --git a/seatunnel-mcp/src/seatunnel_mcp/tools.py
b/seatunnel-mcp/src/seatunnel_mcp/tools.py
new file mode 100644
index 0000000..a6b37b2
--- /dev/null
+++ b/seatunnel-mcp/src/seatunnel_mcp/tools.py
@@ -0,0 +1,404 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""MCP tools for interacting with the SeaTunnel REST API."""
+
+import json
+import logging
+from typing import Dict, List, Any, Optional, Union, Callable, Iterable
+from functools import wraps
+
+from mcp.server.fastmcp.tools import Tool
+from mcp.types import TextContent, ImageContent, EmbeddedResource
+
+from .client import SeaTunnelClient
+
+logger = logging.getLogger(__name__)
+
+
+def get_connection_settings_tool(client: SeaTunnelClient) -> Callable:
+ """Get a tool for retrieving connection settings.
+
+ Args:
+ client: SeaTunnel client instance.
+
+ Returns:
+ Function that can be registered as a tool.
+ """
+ async def get_connection_settings() -> Dict[str, Any]:
+ """Get current connection settings."""
+ result = client.get_connection_settings()
+ return result
+
+ get_connection_settings.__name__ = "get-connection-settings"
+ get_connection_settings.__doc__ = "Get current SeaTunnel connection URL
and API key status"
+
+ return get_connection_settings
+
+
+def update_connection_settings_tool(client: SeaTunnelClient) -> Callable:
+ """Get a tool for updating connection settings.
+
+ Args:
+ client: SeaTunnel client instance.
+
+ Returns:
+ Function that can be registered as a tool.
+ """
+ async def update_connection_settings(url: Optional[str] = None, api_key:
Optional[str] = None) -> Dict[str, Any]:
+ """Update connection settings.
+
+ Args:
+ url: New base URL for the SeaTunnel REST API.
+ api_key: New API key for authentication.
+
+ Returns:
+ Updated connection settings.
+ """
+ result = client.update_connection_settings(url=url, api_key=api_key)
+ return result
+
+ update_connection_settings.__name__ = "update-connection-settings"
+ update_connection_settings.__doc__ = "Update URL and/or API key to connect
to a different SeaTunnel instance"
+
+ return update_connection_settings
+
+
+def submit_job_tool(client: SeaTunnelClient) -> Callable:
+ """Get a tool for submitting a job.
+
+ Args:
+ client: SeaTunnel client instance.
+
+ Returns:
+ Function that can be registered as a tool.
+ """
+ async def submit_job(
+ job_content: str,
+ jobName: Optional[str] = None,
+ jobId: Optional[Union[str, int]] = None,
+ isStartWithSavePoint: Optional[bool] = None,
+ format: str = "hocon",
+ ) -> Dict[str, Any]:
+ """Submit a new job.
+
+ Args:
+ job_content: Job configuration content.
+ jobName: Optional job name.
+ jobId: Optional job ID. Can be a string or integer, will be
converted to string.
+ isStartWithSavePoint: Whether to start with savepoint.
+ format: Job configuration format (hocon, json, yaml).
+
+ Returns:
+ Response from the API.
+ """
+ result = client.submit_job(
+ job_content=job_content,
+ jobName=jobName,
+ jobId=jobId,
+ isStartWithSavePoint=isStartWithSavePoint,
+ format=format,
+ )
+ return result
+
+ submit_job.__name__ = "submit-job"
+ submit_job.__doc__ = "Submit a new job to the SeaTunnel cluster with
configuration content"
+
+ return submit_job
+
+
+def submit_job_upload_tool(client: SeaTunnelClient) -> Callable:
+ """Get a tool for submitting a job using file upload.
+
+ Args:
+ client: SeaTunnel client instance.
+
+ Returns:
+ Function that can be registered as a tool.
+ """
+ async def submit_job_upload(
+ config_file: Union[str, Any],
+ jobName: Optional[str] = None,
+ jobId: Optional[Union[str, int]] = None,
+ isStartWithSavePoint: Optional[bool] = None,
+ format: Optional[str] = None,
+ ) -> Dict[str, Any]:
+ """Submit a new job using file upload.
+
+ Args:
+ config_file: Either a file path string or a file-like object. If a
path string is provided,
+ the file will be opened and submitted in the
multipart/form-data request body.
+ jobName: Optional job name (sent as a query parameter).
+ jobId: Optional job ID (sent as a query parameter). Can be a
string or integer, will be converted to string.
+ isStartWithSavePoint: Whether to start with savepoint (sent as a
query parameter).
+ format: Job configuration format (hocon, json, yaml) (sent as a
query parameter).
+ If not provided, it will be determined from the file name.
+
+ Returns:
+ Response from the API.
+ """
+ result = client.submit_job_upload(
+ config_file=config_file,
+ jobName=jobName,
+ jobId=jobId,
+ isStartWithSavePoint=isStartWithSavePoint,
+ format=format,
+ )
+ return result
+
+ submit_job_upload.__name__ = "submit-job-upload"
+ submit_job_upload.__doc__ = "Submit a new job to the SeaTunnel cluster by
uploading a configuration file"
+
+ return submit_job_upload
+
+
+def submit_jobs_tool(client: SeaTunnelClient) -> Callable:
+ """Get a tool for submitting multiple jobs in batch.
+
+ Args:
+ client: SeaTunnel client instance.
+
+ Returns:
+ Function that can be registered as a tool.
+ """
+ async def submit_jobs(
+ request_body: Any
+ ) -> Dict[str, Any]:
+ """Submit multiple jobs in batch.
+
+ Args:
+ request_body: The direct request body to send to the API.
+ It will be used as-is without modification.
+
+ Returns:
+ Response from the API.
+ """
+ result = client.submit_jobs(request_body=request_body)
+ return result
+
+ submit_jobs.__name__ = "submit-jobs"
+ submit_jobs.__doc__ = "Submit multiple jobs in batch to the SeaTunnel
cluster. The input will be sent directly as the request body."
+
+ return submit_jobs
+
+
+def stop_job_tool(client: SeaTunnelClient) -> Callable:
+ """Get a tool for stopping a running job.
+
+ Args:
+ client: SeaTunnel client instance.
+
+ Returns:
+ Function that can be registered as a tool.
+ """
+ async def stop_job(jobId: Union[str, int], isStartWithSavePoint: bool =
False) -> Dict[str, Any]:
+ """Stop a running job.
+
+ Args:
+ jobId: Job ID. Can be a string or integer.
+ isStartWithSavePoint: Whether to stop with savepoint.
+
+ Returns:
+ Response from the API.
+ """
+ result = client.stop_job(jobId=jobId,
isStartWithSavePoint=isStartWithSavePoint)
+ return result
+
+ stop_job.__name__ = "stop-job"
+ stop_job.__doc__ = "Stop a running job by providing the jobId and optional
isStartWithSavePoint flag"
+
+ return stop_job
+
+
+def get_job_info_tool(client: SeaTunnelClient) -> Callable:
+ """Get a tool for retrieving job information.
+
+ Args:
+ client: SeaTunnel client instance.
+
+ Returns:
+ Function that can be registered as a tool.
+ """
+ async def get_job_info(jobId: Union[str, int]) -> Dict[str, Any]:
+ """Get information about a job.
+
+ Args:
+ jobId: Job ID (used as path parameter in /job-info/{jobId}). Can
be a string or integer.
+
+ Returns:
+ Response from the API.
+ """
+ result = client.get_job_info(jobId=jobId)
+ return result
+
+ get_job_info.__name__ = "get-job-info"
+ get_job_info.__doc__ = "Get detailed information about a specific job by
providing the jobId as a path parameter"
+
+ return get_job_info
+
+
+def get_running_job_tool(client: SeaTunnelClient) -> Callable:
+ """Get a tool for retrieving information about a running job.
+
+ Args:
+ client: SeaTunnel client instance.
+
+ Returns:
+ Function that can be registered as a tool.
+ """
+ async def get_running_job(jobId: Union[str, int]) -> Dict[str, Any]:
+ """Get information about a running job.
+
+ Args:
+ jobId: Job ID (used as path parameter in /running-job/{jobId}).
Can be a string or integer.
+
+ Returns:
+ Response from the API.
+ """
+ result = client.get_running_job(jobId=jobId)
+ return result
+
+ get_running_job.__name__ = "get-running-job"
+ get_running_job.__doc__ = "Get details about a specific running job by
providing the jobId as a path parameter"
+
+ return get_running_job
+
+
+def get_running_jobs_tool(client: SeaTunnelClient) -> Callable:
+ """Get a tool for retrieving all running jobs.
+
+ Args:
+ client: SeaTunnel client instance.
+
+ Returns:
+ Function that can be registered as a tool.
+ """
+ async def get_running_jobs() -> Dict[str, Any]:
+ """Get all running jobs.
+
+ Returns:
+ Response from the API.
+ """
+ result = client.get_running_jobs()
+ return result
+
+ get_running_jobs.__name__ = "get-running-jobs"
+ get_running_jobs.__doc__ = "List all currently running jobs"
+
+ return get_running_jobs
+
+
+def get_finished_jobs_tool(client: SeaTunnelClient) -> Callable:
+ """Get a tool for retrieving all finished jobs by state.
+
+ Args:
+ client: SeaTunnel client instance.
+
+ Returns:
+ Function that can be registered as a tool.
+ """
+ async def get_finished_jobs(state: str) -> Dict[str, Any]:
+ """Get all finished jobs by state.
+
+ Args:
+ state: Job state (FINISHED, CANCELED, FAILED, UNKNOWABLE) (used as
path parameter in /finished-jobs/{state}).
+
+ Returns:
+ Response from the API.
+ """
+ result = client.get_finished_jobs(state=state)
+ return result
+
+ get_finished_jobs.__name__ = "get-finished-jobs"
+ get_finished_jobs.__doc__ = "List all finished jobs by state (FINISHED,
CANCELED, FAILED, UNKNOWABLE) using the state as a path parameter"
+
+ return get_finished_jobs
+
+
+def get_overview_tool(client: SeaTunnelClient) -> Callable:
+ """Get a tool for retrieving cluster overview.
+
+ Args:
+ client: SeaTunnel client instance.
+
+ Returns:
+ Function that can be registered as a tool.
+ """
+ async def get_overview(tags: Optional[Dict[str, str]] = None) -> Dict[str,
Any]:
+ """Get cluster overview.
+
+ Args:
+ tags: Optional tags for filtering.
+
+ Returns:
+ Response from the API.
+ """
+ result = client.get_overview(tags=tags)
+ return result
+
+ get_overview.__name__ = "get-overview"
+ get_overview.__doc__ = "Get an overview of the SeaTunnel cluster"
+
+ return get_overview
+
+
+def get_system_monitoring_information_tool(client: SeaTunnelClient) ->
Callable:
+ """Get a tool for retrieving system monitoring information.
+
+ Args:
+ client: SeaTunnel client instance.
+
+ Returns:
+ Function that can be registered as a tool.
+ """
+ async def get_system_monitoring_information() -> Dict[str, Any]:
+ """Get system monitoring information.
+
+ Returns:
+ Response from the API.
+ """
+ result = client.get_system_monitoring_information()
+ return result
+
+ get_system_monitoring_information.__name__ =
"get-system-monitoring-information"
+ get_system_monitoring_information.__doc__ = "Get detailed system
monitoring information"
+
+ return get_system_monitoring_information
+
+
+def get_all_tools(client: SeaTunnelClient) -> List[Callable]:
+ """Get all MCP tools.
+
+ Args:
+ client: SeaTunnelClient instance.
+
+ Returns:
+ List of all tool functions.
+ """
+ return [
+ get_connection_settings_tool(client),
+ update_connection_settings_tool(client),
+ submit_job_tool(client),
+ submit_job_upload_tool(client),
+ submit_jobs_tool(client),
+ stop_job_tool(client),
+ get_job_info_tool(client),
+ get_running_job_tool(client),
+ get_running_jobs_tool(client),
+ get_finished_jobs_tool(client),
+ get_overview_tool(client),
+ get_system_monitoring_information_tool(client),
+ ]
\ No newline at end of file
diff --git a/seatunnel-mcp/tests/__init__.py b/seatunnel-mcp/tests/__init__.py
new file mode 100644
index 0000000..c7de8e2
--- /dev/null
+++ b/seatunnel-mcp/tests/__init__.py
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Test package for SeaTunnel MCP."""
\ No newline at end of file
diff --git a/seatunnel-mcp/tests/integration/__init__.py
b/seatunnel-mcp/tests/integration/__init__.py
new file mode 100644
index 0000000..37ca2f5
--- /dev/null
+++ b/seatunnel-mcp/tests/integration/__init__.py
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""集成测试包。"""
\ No newline at end of file
diff --git a/seatunnel-mcp/tests/integration/test_api_client.py
b/seatunnel-mcp/tests/integration/test_api_client.py
new file mode 100644
index 0000000..967406e
--- /dev/null
+++ b/seatunnel-mcp/tests/integration/test_api_client.py
@@ -0,0 +1,239 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""SeaTunnel API 客户端集成测试。
+
+注意:这些测试需要一个运行中的 SeaTunnel 实例。
+您可以使用 Docker 启动一个测试实例:
+
+```bash
+docker run -d --name seatunnel -p 8090:8090 apache/seatunnel:latest
+```
+
+或者,您可以在测试中使用模拟服务器。
+"""
+
+import os
+import pytest
+import httpx
+from unittest.mock import patch
+
+from src.seatunnel_mcp.client import SeaTunnelClient
+
+
+# 跳过集成测试,除非明确启用
+pytestmark = pytest.mark.skipif(
+ os.environ.get("ENABLE_INTEGRATION_TESTS") != "1",
+ reason="需要设置 ENABLE_INTEGRATION_TESTS=1 环境变量来运行集成测试"
+)
+
+# SeaTunnel API URL,可以通过环境变量覆盖
+API_URL = os.environ.get("SEATUNNEL_API_URL", "http://localhost:8090")
+API_KEY = os.environ.get("SEATUNNEL_API_KEY", None)
+
+
[email protected]
+def client():
+ """创建一个 SeaTunnel 客户端实例用于测试。"""
+ return SeaTunnelClient(base_url=API_URL, api_key=API_KEY)
+
+
+def test_connection(client):
+ """测试与 SeaTunnel API 的连接。"""
+ try:
+ # 获取概览信息(这是一个简单的端点,通常可用)
+ response = client.get_overview()
+ assert isinstance(response, dict)
+ except httpx.RequestError as e:
+ pytest.skip(f"无法连接到 SeaTunnel API: {e}")
+ except httpx.HTTPStatusError as e:
+ pytest.skip(f"SeaTunnel API 响应错误: {e}")
+
+
+def test_get_running_jobs(client):
+ """测试获取运行中的作业。"""
+ try:
+ response = client.get_running_jobs()
+ assert isinstance(response, dict)
+ except httpx.HTTPStatusError as e:
+ if e.response.status_code == 404:
+ pytest.skip("get_running_jobs 端点不可用")
+ raise
+
+
+def test_get_finished_jobs(client):
+ """测试获取已完成的作业。"""
+ try:
+ response = client.get_finished_jobs(state="FINISHED")
+ assert isinstance(response, dict)
+ except httpx.HTTPStatusError as e:
+ if e.response.status_code == 404:
+ pytest.skip("get_finished_jobs 端点不可用")
+ raise
+
+
+def test_get_system_monitoring_information(client):
+ """测试获取系统监控信息。"""
+ try:
+ response = client.get_system_monitoring_information()
+ assert isinstance(response, dict)
+ except httpx.HTTPStatusError as e:
+ if e.response.status_code == 404:
+ pytest.skip("get_system_monitoring_information 端点不可用")
+ raise
+
+
+def test_submit_and_stop_job(client):
+ """测试提交和停止作业。
+
+ 注意:这个测试会提交一个真实的作业,可能会消耗资源。
+ """
+ # 定义一个简单的测试作业配置
+ job_config = """
+ env {
+ job.mode = "batch"
+ }
+
+ source {
+ FakeSource {
+ row.num = 10
+ schema = {
+ fields {
+ id = int
+ name = string
+ }
+ }
+ }
+ }
+
+ sink {
+ Console {}
+ }
+ """
+
+ try:
+ # 提交作业
+ submit_response = client.submit_job(
+ job_content=job_config,
+ jobName="integration_test_job",
+ format="hocon"
+ )
+ assert isinstance(submit_response, dict)
+
+ # 检查是否获取到作业 ID
+ assert "jobId" in submit_response or "jobId" in submit_response
+
+ # 获取作业 ID
+ jobId = submit_response.get("jobId", submit_response.get("jobId"))
+
+ # 尝试停止作业(可能已经完成)
+ try:
+ stop_response = client.stop_job(jobId=jobId)
+ assert isinstance(stop_response, dict)
+ except httpx.HTTPStatusError as e:
+ if e.response.status_code == 404:
+ # 作业可能已经完成,这是正常的
+ pass
+ elif e.response.status_code == 400:
+ # 作业可能已经停止,这也是正常的
+ pass
+ else:
+ raise
+ except httpx.HTTPStatusError as e:
+ if e.response.status_code == 404:
+ pytest.skip("submit_job 或 stop_job 端点不可用")
+ raise
+
+
+def test_submit_jobs(client):
+ """测试批量提交作业。
+
+ 注意:这个测试会提交真实的作业,可能会消耗资源。
+ """
+ # 定义两个简单的测试作业作为请求体
+ request_body = [
+ {
+ "params": {
+ "jobId": "batch_test_1",
+ "jobName": "batch_test_job_1"
+ },
+ "env": {
+ "job.mode": "batch"
+ },
+ "source": [
+ {
+ "plugin_name": "FakeSource",
+ "plugin_output": "fake",
+ "row.num": 10,
+ "schema": {
+ "fields": {
+ "id": "int",
+ "name": "string"
+ }
+ }
+ }
+ ],
+ "transform": [],
+ "sink": [
+ {
+ "plugin_name": "Console",
+ "plugin_input": ["fake"]
+ }
+ ]
+ },
+ {
+ "params": {
+ "jobId": "batch_test_2",
+ "jobName": "batch_test_job_2"
+ },
+ "env": {
+ "job.mode": "batch"
+ },
+ "source": [
+ {
+ "plugin_name": "FakeSource",
+ "plugin_output": "fake",
+ "row.num": 10,
+ "schema": {
+ "fields": {
+ "id": "int",
+ "name": "string"
+ }
+ }
+ }
+ ],
+ "transform": [],
+ "sink": [
+ {
+ "plugin_name": "Console",
+ "plugin_input": ["fake"]
+ }
+ ]
+ }
+ ]
+
+ try:
+ # 批量提交作业
+ submit_response = client.submit_jobs(request_body=request_body)
+ assert isinstance(submit_response, dict)
+
+ # 验证响应包含作业ID
+ assert "jobIds" in submit_response or "jobs" in submit_response
+
+ except httpx.HTTPStatusError as e:
+ if e.response.status_code == 404:
+ pytest.skip("submit_jobs 端点不可用")
+ raise
\ No newline at end of file
diff --git a/seatunnel-mcp/tests/test_client.py
b/seatunnel-mcp/tests/test_client.py
new file mode 100644
index 0000000..c4fbe09
--- /dev/null
+++ b/seatunnel-mcp/tests/test_client.py
@@ -0,0 +1,249 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Tests for the SeaTunnel client."""
+
+import pytest
+import httpx
+from unittest.mock import patch, MagicMock
+
+from src.seatunnel_mcp.client import SeaTunnelClient
+
+
[email protected]
+def client():
+ """Create a client for testing."""
+ return SeaTunnelClient(base_url="http://localhost:8090",
api_key="test_key")
+
+
+def test_init(client):
+ """Test client initialization."""
+ assert client.base_url == "http://localhost:8090"
+ assert client.api_key == "test_key"
+ assert client.headers == {
+ "Content-Type": "application/json",
+ "Authorization": "Bearer test_key",
+ }
+
+
+def test_get_connection_settings(client):
+ """Test get_connection_settings."""
+ settings = client.get_connection_settings()
+ assert settings == {
+ "url": "http://localhost:8090",
+ "has_api_key": True,
+ }
+
+
+def test_update_connection_settings(client):
+ """Test update_connection_settings."""
+ settings = client.update_connection_settings(
+ url="http://new-host:8090",
+ api_key="new_key",
+ )
+ assert client.base_url == "http://new-host:8090"
+ assert client.api_key == "new_key"
+ assert client.headers["Authorization"] == "Bearer new_key"
+ assert settings == {
+ "url": "http://new-host:8090",
+ "has_api_key": True,
+ }
+
+
+@patch("httpx.Client")
+def test_submit_job(mock_client, client):
+ """Test submit_job."""
+ mock_response = MagicMock()
+ mock_response.json.return_value = {"jobId": "123"}
+ mock_response.raise_for_status.return_value = None
+
+ mock_client_instance = MagicMock()
+ mock_client_instance.request.return_value = mock_response
+ mock_client.return_value.__enter__.return_value = mock_client_instance
+
+ job_content = "env { job.mode = \"batch\" }"
+ result = client.submit_job(
+ job_content=job_content,
+ jobName="test_job",
+ format="hocon",
+ )
+
+ mock_client_instance.request.assert_called_once_with(
+ "POST",
+ "http://localhost:8090/submit-job",
+ headers={
+ "Content-Type": "text/plain",
+ "Authorization": "Bearer test_key",
+ },
+ params={"jobName": "test_job", "format": "hocon"},
+ content=job_content,
+ )
+
+ assert result == {"jobId": "123"}
+
+
+@patch("httpx.Client")
+def test_submit_jobs(mock_client, client):
+ """Test submit_jobs."""
+ mock_response = MagicMock()
+ mock_response.json.return_value = {"jobIds": ["123", "456"]}
+ mock_response.raise_for_status.return_value = None
+
+ mock_client_instance = MagicMock()
+ mock_client_instance.request.return_value = mock_response
+ mock_client.return_value.__enter__.return_value = mock_client_instance
+
+ # 直接作为请求体的任意数据
+ request_body = [
+ {
+ "params": {"jobId": "123", "jobName": "job-1"},
+ "env": {"job.mode": "batch"},
+ "source": [{"plugin_name": "FakeSource", "plugin_output": "fake"}],
+ "transform": [],
+ "sink": [{"plugin_name": "Console", "plugin_input": ["fake"]}]
+ },
+ {
+ "params": {"jobId": "456", "jobName": "job-2"},
+ "env": {"job.mode": "batch"},
+ "source": [{"plugin_name": "FakeSource", "plugin_output": "fake"}],
+ "transform": [],
+ "sink": [{"plugin_name": "Console", "plugin_input": ["fake"]}]
+ }
+ ]
+
+ result = client.submit_jobs(request_body=request_body)
+
+ mock_client_instance.request.assert_called_once_with(
+ "POST",
+ "http://localhost:8090/submit-jobs",
+ headers={
+ "Content-Type": "application/json",
+ "Authorization": "Bearer test_key",
+ },
+ json=request_body,
+ )
+
+ assert result == {"jobIds": ["123", "456"]}
+
+
+@patch("httpx.Client")
+def test_submit_job_upload(mock_client, client):
+ """Test submit_job_upload."""
+ mock_response = MagicMock()
+ mock_response.json.return_value = {"jobId": "123"}
+ mock_response.raise_for_status.return_value = None
+
+ mock_client_instance = MagicMock()
+ mock_client_instance.request.return_value = mock_response
+ mock_client.return_value.__enter__.return_value = mock_client_instance
+
+ # Mock file-like object
+ config_file = MagicMock()
+ config_file.name = "test_job.conf"
+
+ result = client.submit_job_upload(
+ config_file=config_file,
+ jobName="test_job",
+ )
+
+ mock_client_instance.request.assert_called_once_with(
+ "POST",
+ "http://localhost:8090/submit-job/upload",
+ headers={
+ "Authorization": "Bearer test_key",
+ },
+ params={"jobName": "test_job"},
+ files={'config_file': config_file},
+ )
+
+ assert result == {"jobId": "123"}
+
+
+@patch("httpx.Client")
+def test_submit_job_upload_json(mock_client, client):
+ """Test submit_job_upload with JSON file."""
+ mock_response = MagicMock()
+ mock_response.json.return_value = {"jobId": "123"}
+ mock_response.raise_for_status.return_value = None
+
+ mock_client_instance = MagicMock()
+ mock_client_instance.request.return_value = mock_response
+ mock_client.return_value.__enter__.return_value = mock_client_instance
+
+ # Mock file-like object with json extension
+ config_file = MagicMock()
+ config_file.name = "test_job.json"
+
+ result = client.submit_job_upload(
+ config_file=config_file,
+ jobName="test_job",
+ format="json", # Explicitly specify format
+ )
+
+ mock_client_instance.request.assert_called_once_with(
+ "POST",
+ "http://localhost:8090/submit-job/upload",
+ headers={
+ "Authorization": "Bearer test_key",
+ },
+ params={"jobName": "test_job", "format": "json"},
+ files={'config_file': config_file},
+ )
+
+ assert result == {"jobId": "123"}
+
+
+@patch("httpx.Client")
+@patch("builtins.open")
+def test_submit_job_upload_path(mock_open, mock_client, client):
+ """Test submit_job_upload with a file path."""
+ mock_response = MagicMock()
+ mock_response.json.return_value = {"jobId": "123"}
+ mock_response.raise_for_status.return_value = None
+
+ mock_client_instance = MagicMock()
+ mock_client_instance.request.return_value = mock_response
+ mock_client.return_value.__enter__.return_value = mock_client_instance
+
+ # Mock the file object returned by open()
+ mock_file = MagicMock()
+ mock_open.return_value = mock_file
+
+ file_path = "/path/to/test_job.conf"
+ result = client.submit_job_upload(
+ config_file=file_path,
+ jobName="test_job",
+ jobId="987654321",
+ )
+
+ # Check that open was called with the file path
+ mock_open.assert_called_once_with(file_path, 'rb')
+
+ # Check that the request was made with the proper parameters
+ mock_client_instance.request.assert_called_once_with(
+ "POST",
+ "http://localhost:8090/submit-job/upload",
+ headers={
+ "Authorization": "Bearer test_key",
+ },
+ params={"jobName": "test_job", "jobId": "987654321"},
+ files={'config_file': mock_file},
+ )
+
+ # Verify the mock file was closed after the request
+ mock_file.close.assert_called_once()
+
+ assert result == {"jobId": "123"}
\ No newline at end of file
diff --git a/seatunnel-mcp/tests/test_tools.py
b/seatunnel-mcp/tests/test_tools.py
new file mode 100644
index 0000000..9801cdb
--- /dev/null
+++ b/seatunnel-mcp/tests/test_tools.py
@@ -0,0 +1,223 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Tests for the SeaTunnel MCP tools."""
+
+import pytest
+from unittest.mock import MagicMock
+
+from src.seatunnel_mcp.client import SeaTunnelClient
+from src.seatunnel_mcp.tools import (
+ get_connection_settings_tool,
+ update_connection_settings_tool,
+ submit_job_tool,
+ submit_job_upload_tool,
+ submit_jobs_tool,
+ stop_job_tool,
+ get_job_info_tool,
+ get_running_job_tool,
+ get_running_jobs_tool,
+ get_finished_jobs_tool,
+ get_overview_tool,
+ get_system_monitoring_information_tool,
+ get_all_tools,
+)
+
+
[email protected]
+def mock_client():
+ """Create a mock client for testing."""
+ client = MagicMock(spec=SeaTunnelClient)
+ client.get_connection_settings.return_value = {
+ "url": "http://localhost:8090",
+ "has_api_key": True,
+ }
+ client.update_connection_settings.return_value = {
+ "url": "http://new-host:8090",
+ "has_api_key": True,
+ }
+ client.submit_job.return_value = {"jobId": "123"}
+ client.submit_job_upload.return_value = {"jobId": "123"}
+ client.submit_jobs.return_value = {"jobIds": ["123", "456"]}
+ client.stop_job.return_value = {"status": "success"}
+ client.get_job_info.return_value = {"jobId": "123", "status": "RUNNING"}
+ client.get_running_job.return_value = {"jobId": "123", "status": "RUNNING"}
+ client.get_running_jobs.return_value = {"jobs": [{"jobId": "123",
"status": "RUNNING"}]}
+ client.get_finished_jobs.return_value = {"jobs": [{"jobId": "456",
"status": "FINISHED"}]}
+ client.get_overview.return_value = {"cluster": "info"}
+ client.get_system_monitoring_information.return_value = {"system": "info"}
+ return client
+
+
[email protected]
+async def test_get_connection_settings_tool(mock_client):
+ """Test get_connection_settings_tool."""
+ tool = get_connection_settings_tool(mock_client)
+ assert tool.name == "get-connection-settings"
+ result = await tool.fn()
+ mock_client.get_connection_settings.assert_called_once()
+ assert result == {
+ "url": "http://localhost:8090",
+ "has_api_key": True,
+ }
+
+
[email protected]
+async def test_update_connection_settings_tool(mock_client):
+ """Test update_connection_settings_tool."""
+ tool = update_connection_settings_tool(mock_client)
+ assert tool.name == "update-connection-settings"
+ result = await tool.fn(url="http://new-host:8090", api_key="new_key")
+ mock_client.update_connection_settings.assert_called_once_with(
+ url="http://new-host:8090", api_key="new_key"
+ )
+ assert result == {
+ "url": "http://new-host:8090",
+ "has_api_key": True,
+ }
+
+
[email protected]
+async def test_submit_job_tool(mock_client):
+ """Test submit_job_tool."""
+ tool = submit_job_tool(mock_client)
+ assert tool.name == "submit-job"
+ job_content = "env { job.mode = \"batch\" }"
+ result = await tool.fn(
+ job_content=job_content,
+ jobName="test_job",
+ format="hocon",
+ )
+ mock_client.submit_job.assert_called_once_with(
+ job_content=job_content,
+ jobName="test_job",
+ jobId=None,
+ isStartWithSavePoint=None,
+ format="hocon",
+ )
+ assert result == {"jobId": "123"}
+
+
[email protected]
+async def test_submit_job_upload_tool(mock_client):
+ """Test submit_job_upload_tool."""
+ tool = submit_job_upload_tool(mock_client)
+ assert tool.name == "submit-job-upload"
+
+ # Mock file-like object
+ config_file = MagicMock()
+ config_file.name = "test_job.conf"
+
+ result = await tool.fn(
+ config_file=config_file,
+ jobName="test_job",
+ )
+ mock_client.submit_job_upload.assert_called_once_with(
+ config_file=config_file,
+ jobName="test_job",
+ jobId=None,
+ isStartWithSavePoint=None,
+ format=None,
+ )
+ assert result == {"jobId": "123"}
+
+
[email protected]
+async def test_submit_job_upload_tool_path(mock_client):
+ """Test submit_job_upload_tool with a file path."""
+ tool = submit_job_upload_tool(mock_client)
+ assert tool.name == "submit-job-upload"
+
+ file_path = "/path/to/config.conf"
+ result = await tool.fn(
+ config_file=file_path,
+ jobName="test_job",
+ jobId="987654321",
+ )
+ mock_client.submit_job_upload.assert_called_once_with(
+ config_file=file_path,
+ jobName="test_job",
+ jobId="987654321",
+ isStartWithSavePoint=None,
+ format=None,
+ )
+ assert result == {"jobId": "123"}
+
+
[email protected]
+async def test_submit_jobs_tool(mock_client):
+ """Test submit_jobs_tool."""
+ # Set up return value for submit_jobs
+ mock_client.submit_jobs.return_value = {"jobIds": ["123", "456"]}
+
+ # Create the tool
+ tool = submit_jobs_tool(mock_client)
+ assert tool.name == "submit-jobs"
+
+ # 直接作为请求体的任意数据
+ request_body = [
+ {
+ "params": {"jobId": "123", "jobName": "job-1"},
+ "env": {"job.mode": "batch"},
+ "source": [{"plugin_name": "FakeSource", "plugin_output": "fake"}],
+ "transform": [],
+ "sink": [{"plugin_name": "Console", "plugin_input": ["fake"]}]
+ },
+ {
+ "params": {"jobId": "456", "jobName": "job-2"},
+ "env": {"job.mode": "batch"},
+ "source": [{"plugin_name": "FakeSource", "plugin_output": "fake"}],
+ "transform": [],
+ "sink": [{"plugin_name": "Console", "plugin_input": ["fake"]}]
+ }
+ ]
+
+ # Call the tool
+ result = await tool.fn(request_body=request_body)
+
+ # Verify the client method was called correctly
+ mock_client.submit_jobs.assert_called_once_with(request_body=request_body)
+
+ # Check the result
+ assert result == {"jobIds": ["123", "456"]}
+
+
[email protected]
+async def test_stop_job_tool(mock_client):
+ """Test stop_job_tool."""
+ tool = stop_job_tool(mock_client)
+ assert tool.name == "stop-job"
+ result = await tool.fn(jobId="123")
+ mock_client.stop_job.assert_called_once_with(jobId="123")
+ assert result == {"status": "success"}
+
+
+def test_get_all_tools(mock_client):
+ """Test get_all_tools."""
+ tools = get_all_tools(mock_client)
+ assert len(tools) == 12
+ tool_names = [tool.__name__ for tool in tools]
+ assert "get-connection-settings" in tool_names
+ assert "update-connection-settings" in tool_names
+ assert "submit-job" in tool_names
+ assert "submit-jobs" in tool_names
+ assert "stop-job" in tool_names
+ assert "get-job-info" in tool_names
+ assert "get-running-job" in tool_names
+ assert "get-running-jobs" in tool_names
+ assert "get-finished-jobs" in tool_names
+ assert "get-overview" in tool_names
+ assert "get-system-monitoring-information" in tool_names
\ No newline at end of file