Skip to main content
Glama

Alibaba Cloud DMS MCP Server

Official
by aliyun
server.py11.9 kB
import json import logging import os import sys import random import string from datetime import datetime from typing import Dict, Any, List from pydantic import Field from alibabacloud_dts20200101 import models as dts_20200101_models from alibabacloud_dts20200101.client import Client as DtsClient from alibabacloud_tea_openapi.models import Config from alibabacloud_tea_util import models as util_models from mcp.server.fastmcp import FastMCP logger = logging.getLogger(__name__) mcp = FastMCP( name="dts-mcp-server" ) g_db_list = '''{ "dtstest": { "name": "dtstest", "all": false, "Table": { "table1": { "name": "table1", "all": true } } } } ''' g_reserved = '''{ "targetTableMode": "0", "dbListCaseChangeMode": "default", "isAnalyzer": false, "eventMove": false, "tableAnalyze": false, "whitelist.dms.online.ddl.enable": false, "sqlparser.dms.original.ddl": true, "whitelist.ghost.online.ddl.enable": false, "sqlparser.ghost.original.ddl": false, "privilegeMigration": false, "definer": false, "privilegeDbList": "[]", "maxRetryTime": 43200, "retry.blind.seconds": 600, "srcSSL": "0", "srcMySQLType": "HighAvailability", "destSSL": "0", "a2aFlag": "2.0", "channelInfo": "mcp", "autoStartModulesAfterConfig": "none" } ''' def get_dts_client(region_id: str): config = Config( access_key_id=os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'), access_key_secret=os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'), security_token=os.getenv('ALIBABA_CLOUD_SECURITY_TOKEN'), region_id=region_id, protocol="https", connect_timeout=10 * 1000, read_timeout=300 * 1000 ) client = DtsClient(config) return client @mcp.tool(name="configureDtsJob", description="Configure a dts job.", annotations={"title": "配置DTS任务", "readOnlyHint": False, "destructiveHint": False}) async def configure_dts_job( region_id: str = Field(description="The region id of the dts job (e.g., 'cn-hangzhou')"), job_type: str = Field(description="The type of job (synchronization job: SYNC, migration job: MIGRATION, data check job: CHECK)"), source_endpoint_region: str = Field(description="The source endpoint region ID"), source_endpoint_instance_type: str = Field(description="The source endpoint instance type (RDS, ECS, EXPRESS, CEN, DG)"), source_endpoint_engine_name: str = Field(description="The source endpoint engine name (MySQL, PostgreSQL, SQLServer)"), source_endpoint_instance_id: str = Field(description="The source endpoint instance ID (e.g., 'rm-xxx')"), source_endpoint_user_name: str = Field(description="The source endpoint user name"), source_endpoint_password: str = Field(description="The source endpoint password"), destination_endpoint_region: str = Field(description="The destination endpoint region ID"), destination_endpoint_instance_type: str = Field(description="The destination endpoint instance type (RDS, ECS, EXPRESS, CEN, DG)"), destination_endpoint_engine_name: str = Field(description="The destination endpoint engine name (MySQL, PostgreSQL, SQLServer)"), destination_endpoint_instance_id: str = Field(description="The destination endpoint instance ID (e.g., 'rm-xxx')"), destination_endpoint_user_name: str = Field(description="The destination endpoint user name"), destination_endpoint_password: str = Field(description="The destination endpoint password"), db_list: Dict[str, Any] = Field(description='The database objects in JSON format, example 1: migration dtstest database, db_list should like {"dtstest":{"name":"dtstest","all":true}}; example 2: migration one table task01 in dtstest database, db_list should like {"dtstest":{"name":"dtstest","all":false,"Table":{"task01":{"name":"task01","all":true}}}}; example 3: migration two tables task01 and task02 in dtstest database, db_list should like {"dtstest":{"name":"dtstest","all":false,"Table":{"task01":{"name":"task01","all":true},"task02":{"name":"task02","all":true}}}}') ) -> Dict[str, Any]: '''Configure a dts job. Args: region_id: Region ID. job_type: The type of job (synchronization job: SYNC, migration job: MIGRATION, data check job: CHECK). source_endpoint_region: The source endpoint region ID. source_endpoint_instance_type: The source endpoint instance type (RDS, ECS, EXPRESS, CEN, DG) source_endpoint_engine_name: The source endpoint engine name (MySQL, PostgreSQL, SQLServer) source_endpoint_instance_id: The source endpoint instance ID (e.g., "rm-xxx"). source_endpoint_user_name: The source endpoint user name. source_endpoint_password: The source endpoint password. destination_endpoint_region: The destination endpoint region ID. destination_endpoint_instance_type: The destination endpoint instance type (RDS, ECS, EXPRESS, CEN, DG) destination_endpoint_engine_name: The destination endpoint engine name (MySQL, PostgreSQL, SQLServer) destination_endpoint_instance_id: The destination endpoint instance ID (e.g., "rm-xxx"). destination_endpoint_user_name: The destination endpoint user name. destination_endpoint_password: The destination endpoint password. db_list: The database objects in JSON format, example 1: migration dtstest database, db_list should like {"dtstest":{"name":"dtstest","all":true}}; example 2: migration one table task01 in dtstest database, db_list should like {"dtstest":{"name":"dtstest","all":false,"Table":{"task01":{"name":"task01","all":true}}}}; example 3: migration two tables task01 and task02 in dtstest database, db_list should like {"dtstest":{"name":"dtstest","all":false,"Table":{"task01":{"name":"task01","all":true},"task02":{"name":"task02","all":true}}}}. Returns: Dict[str, Any]: Response containing the configured job details. ''' try: db_list_str = json.dumps(db_list, separators=(',', ':')) logger.info(f"Configure dts job with db_list: {db_list_str}") # init dts client client = get_dts_client(region_id) runtime = util_models.RuntimeOptions() # create dts instance create_dts_instance_request = dts_20200101_models.CreateDtsInstanceRequest( region_id=region_id, type=job_type, source_region=source_endpoint_region, destination_region=destination_endpoint_region, source_endpoint_engine_name=source_endpoint_engine_name, destination_endpoint_engine_name=destination_endpoint_engine_name, pay_type='PostPaid', quantity=1, min_du=1, max_du=16, instance_class='micro' ) create_dts_instance_response = client.create_dts_instance_with_options(create_dts_instance_request, runtime) logger.info(f"Create dts instance response: {create_dts_instance_response.body.to_map()}") dts_job_id = create_dts_instance_response.body.to_map()['JobId'] # configure dts job ran_job_name = 'dtsmcp-' + ''.join(random.sample(string.ascii_letters + string.digits, 6)) custom_reserved = json.loads(g_reserved) dts_mcp_channel = os.getenv('DTS_MCP_CHANNEL') if dts_mcp_channel and len(dts_mcp_channel) > 0: logger.info(f"Configure dts job with custom dts mcp channel: {dts_mcp_channel}") custom_reserved['channelInfo'] = dts_mcp_channel custom_reserved_str = json.dumps(custom_reserved, separators=(',', ':')) logger.info(f"Configure dts job with reserved: {custom_reserved_str}") configure_dts_job_request = dts_20200101_models.ConfigureDtsJobRequest( region_id=region_id, dts_job_name=ran_job_name, source_endpoint_instance_type=source_endpoint_instance_type, source_endpoint_engine_name=source_endpoint_engine_name, source_endpoint_instance_id=source_endpoint_instance_id, source_endpoint_region=source_endpoint_region, source_endpoint_user_name=source_endpoint_user_name, source_endpoint_password=source_endpoint_password, destination_endpoint_instance_type=destination_endpoint_instance_type, destination_endpoint_instance_id=destination_endpoint_instance_id, destination_endpoint_engine_name=destination_endpoint_engine_name, destination_endpoint_region=destination_endpoint_region, destination_endpoint_user_name=destination_endpoint_user_name, destination_endpoint_password=destination_endpoint_password, structure_initialization=True, data_initialization=True, data_synchronization=False, job_type=job_type, db_list=db_list_str, reserve=custom_reserved_str ) if dts_job_id and len(dts_job_id) > 0: configure_dts_job_request.dts_job_id = dts_job_id configure_dts_job_response = client.configure_dts_job_with_options(configure_dts_job_request, runtime) logger.info(f"Configure dts job response: {configure_dts_job_response.body.to_map()}") return configure_dts_job_response.body.to_map() except Exception as e: logger.error(f"Error occurred while configure dts job: {str(e)}") raise e @mcp.tool(name="startDtsJob", description="Start a dts job.", annotations={"title": "启动DTS任务", "readOnlyHint": False, "destructiveHint": False}) async def start_dts_job( region_id: str = Field(description="The region id of the dts job (e.g., 'cn-hangzhou')"), dts_job_id: str = Field(description="The job id of the dts job") ) -> Dict[str, Any]: """Start a dts job. Args: region_id: Region ID. dts_job_id: the dts job id. Returns: Dict[str, Any]: Response containing the start result details. """ try: client = get_dts_client(region_id) request = dts_20200101_models.StartDtsJobRequest( region_id=region_id, dts_job_id=dts_job_id ) runtime = util_models.RuntimeOptions() response = client.start_dts_job_with_options(request, runtime) return response.body.to_map() except Exception as e: logger.error(f"Error occurred while start dts job: {str(e)}") raise e @mcp.tool(name="getDtsJob", description="Get a dts job detail information.", annotations={"title": "查询DTS任务详细信息", "readOnlyHint": True}) async def describe_dts_job_detail( region_id: str = Field(description="The region id of the dts job (e.g., 'cn-hangzhou')"), dts_job_id: str = Field(description="The job id of the dts job") ) -> Dict[str, Any]: """Get dts job detail information. Args: region_id: Region ID. dts_job_id: the dts job id. Returns: Dict[str, Any]: Response containing the dts job detail information. """ try: client = get_dts_client(region_id) request = dts_20200101_models.DescribeDtsJobDetailRequest( region_id=region_id, dts_job_id=dts_job_id ) runtime = util_models.RuntimeOptions() response = client.describe_dts_job_detail_with_options(request, runtime) return response.body.to_map() except Exception as e: logger.error(f"Error occurred while describe dts job detail: {str(e)}") raise e def main(): mcp.run(transport=os.getenv('SERVER_TRANSPORT', 'stdio')) if __name__ == '__main__': # Initialize and run the server main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/aliyun/alibabacloud-dms-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server