Location>code7788 >text

MCP application development: deploying a server in SSE mode with Python

Popularity:942 ℃/2025-04-11 18:39:02
# NOTE(review): this listing was recovered from a flattened copy in which dotted
# names (module paths and attribute accesses such as ``os.getenv`` or
# ``cursor.execute``) had been stripped. They were reconstructed from context;
# confirm the module paths against the installed ``mcp``, ``mysql-connector-python``
# and ``starlette`` versions.
import os

import uvicorn
from dotenv import load_dotenv
from mcp.server import Server
from mcp.server.sse import SseServerTransport
from mcp.types import TextContent, Tool
from mysql.connector import Error, connect
from starlette.applications import Starlette
from starlette.responses import Response
from starlette.routing import Mount, Route


def get_db_config() -> dict:
    """Read the MySQL connection settings from the environment.

    A ``.env`` file, if present, is loaded first.

    Returns:
        dict with the keys ``host``, ``port``, ``user``, ``password`` and
        ``database``. ``host`` defaults to ``"localhost"`` and ``port`` to
        ``3306``.

    Raises:
        ValueError: if ``MYSQL_USER``, ``MYSQL_PASSWORD`` or ``MYSQL_DATABASE``
            is missing or empty.
    """
    load_dotenv()
    config = {
        "host": os.getenv("MYSQL_HOST", "localhost"),
        "port": int(os.getenv("MYSQL_PORT", "3306")),
        "user": os.getenv("MYSQL_USER"),
        "password": os.getenv("MYSQL_PASSWORD"),
        "database": os.getenv("MYSQL_DATABASE"),
    }
    # Echo the effective configuration for troubleshooting, but never the
    # password itself.
    print({**config, "password": "***" if config["password"] else None})
    if not all([config["user"], config["password"], config["database"]]):
        raise ValueError("The required database configuration is missing")
    return config


def _format_result_set(cursor) -> str:
    """Render the cursor's pending result set as text: header line, then rows.

    Values are joined with commas without any quoting; ``None`` becomes
    ``"NULL"``.
    """
    columns = [desc[0] for desc in cursor.description]
    lines = [",".join(columns)]
    for row in cursor.fetchall():
        lines.append(
            ",".join("NULL" if value is None else str(value) for value in row)
        )
    return "\n".join(lines)


def _run_statements(statements, query_label: str) -> list[TextContent]:
    """Execute ``(sql, params)`` pairs on one connection and collect the output.

    Args:
        statements: iterable of ``(sql, params)`` tuples. ``params`` is a
            sequence bound to ``%s`` placeholders, or empty/``None`` to run
            the SQL text verbatim.
        query_label: text used to identify the request in the log line written
            when the connection itself fails.

    Returns:
        A single-element list whose text holds one section per statement,
        separated by ``"---"`` lines.
    """
    config = get_db_config()
    try:
        with connect(**config) as conn:
            with conn.cursor() as cursor:
                results = []
                for statement, params in statements:
                    try:
                        # Only pass params when there are some, so that a
                        # literal '%' in raw SQL is never treated as a
                        # placeholder.
                        if params:
                            cursor.execute(statement, params)
                        else:
                            cursor.execute(statement)
                        # A description is present only for statements that
                        # produce a result set (SELECT, SHOW, EXPLAIN, ...).
                        if cursor.description:
                            results.append(_format_result_set(cursor))
                        else:
                            # Commit only after non-query statements.
                            conn.commit()
                            results.append(
                                f"The query execution was successful. Influence number of rows: {cursor.rowcount}"
                            )
                    except Error as stmt_error:
                        # A failing statement is reported in the output and
                        # the remaining statements are still executed.
                        results.append(
                            f"Error occurred when executing statement '{statement}': {str(stmt_error)}"
                        )
                return [TextContent(type="text", text="\n---\n".join(results))]
    except Error as e:
        print(f"An error occurred while executing SQL '{query_label}': {e}")
        return [
            TextContent(
                type="text",
                text=f"An error occurred while executing the query: {str(e)}",
            )
        ]


def execute_sql(query: str) -> list[TextContent]:
    """Execute one or more SQL statements and return their textual results.

    Args:
        query: SQL text; several statements may be separated by semicolons.
            The split is purely textual, so a semicolon inside a string
            literal also splits the statement.

    Returns:
        A single-element list of ``TextContent``. Statements that return a
        result set are rendered as a comma-joined header line plus one line
        per row; other statements report the affected row count. Sections
        for multiple statements are separated by ``"---"``. Database errors
        are reported in the returned text rather than raised.
    """
    statements = [stmt.strip() for stmt in query.split(";") if stmt.strip()]
    return _run_statements([(stmt, None) for stmt in statements], query)


def get_table_name(text: str) -> list[TextContent]:
    """Find tables in the configured database whose comment contains ``text``.

    Args:
        text: keyword to look for in ``TABLE_COMMENT``.

    Returns:
        Result of the lookup (schema, table name, table comment) in the same
        text format as :func:`execute_sql`.
    """
    config = get_db_config()
    sql = (
        "SELECT TABLE_SCHEMA, TABLE_NAME, TABLE_COMMENT "
        "FROM information_schema.TABLES "
        "WHERE TABLE_SCHEMA = %s AND TABLE_COMMENT LIKE %s"
    )
    # Bind the values as parameters: ``text`` comes from a tool call and must
    # not be spliced into the SQL string.
    return _run_statements([(sql, (config["database"], f"%{text}%"))], sql)


def get_table_desc(text: str) -> list[TextContent]:
    """Return column names and comments for the given table(s).

    Args:
        text: one table name, or several separated by commas.

    Returns:
        Table name, column name and column comment, ordered by table name and
        column position, in the same text format as :func:`execute_sql`.
    """
    config = get_db_config()
    # ``split`` always yields at least one element, so the IN list is never
    # empty.
    table_names = [name.strip() for name in text.split(",")]
    placeholders = ", ".join(["%s"] * len(table_names))
    sql = (
        "SELECT TABLE_NAME, COLUMN_NAME, COLUMN_COMMENT "
        "FROM information_schema.COLUMNS WHERE TABLE_SCHEMA = %s "
        f"AND TABLE_NAME IN ({placeholders}) "
        "ORDER BY TABLE_NAME, ORDINAL_POSITION"
    )
    return _run_statements([(sql, (config["database"], *table_names))], sql)


def get_lock_tables() -> list[TextContent]:
    """Report current InnoDB lock waits (blocked and blocking transactions).

    Joins ``performance_schema.data_lock_waits`` / ``data_locks`` with
    ``information_schema.INNODB_TRX`` and ``PROCESSLIST``.

    Returns:
        The query result in the same text format as :func:`execute_sql`.
    """
    # Column aliases contain spaces, so each one is backtick-quoted; the alias
    # wording is kept as found in the source.
    # NOTE(review): ``p.COMMAND``, ``p.TIME``, ``p.id`` and ``p2.id`` were
    # missing from the source text and have been reconstructed - confirm.
    sql = """SELECT
    p2.`HOST` AS `is blocked host`,
    p2.`USER` AS `blocked user`,
    r.trx_id AS `The transaction id of the blocked party`,
    r.trx_mysql_thread_id AS `thread number`,
    TIMESTAMPDIFF(SECOND, r.trx_wait_started, CURRENT_TIMESTAMP) AS `Waiting time`,
    r.trx_query AS `blocked query`,
    l.OBJECT_NAME AS `The blocking party locked table`,
    m.LOCK_MODE AS `The lock mode of the blocked party`,
    m.LOCK_TYPE AS `The lock type of the blocked party (table lock or row lock)`,
    m.INDEX_NAME AS `The index locked by the blocking party`,
    m.OBJECT_SCHEMA AS `The database name of the blocked square lock object`,
    m.OBJECT_NAME AS `The table name of the blocked square lock object`,
    m.LOCK_DATA AS `The primary key value of the blocked transaction locked record`,
    p.`HOST` AS `blocks the host`,
    p.`USER` AS `blocking party user`,
    b.trx_id AS `blocking party transaction id`,
    b.trx_mysql_thread_id AS `blocking party thread number`,
    b.trx_query AS `blocking party query`,
    l.LOCK_MODE AS `blocking party lock mode`,
    l.LOCK_TYPE AS `Blocking party lock type (table lock or row lock)`,
    l.INDEX_NAME AS `The blocking party locked index`,
    l.OBJECT_SCHEMA AS `The database name of the blocking square lock object`,
    l.OBJECT_NAME AS `The table name of the blocking square lock object`,
    l.LOCK_DATA AS `The primary key value of the blocking party transaction lock record`,
    IF(p.COMMAND = 'Sleep', CONCAT(p.TIME, 'seconds'), 0) AS `The time when the blocking party's transaction is idle`
FROM performance_schema.data_lock_waits w
INNER JOIN performance_schema.data_locks l ON w.BLOCKING_ENGINE_LOCK_ID = l.ENGINE_LOCK_ID
INNER JOIN performance_schema.data_locks m ON w.REQUESTING_ENGINE_LOCK_ID = m.ENGINE_LOCK_ID
INNER JOIN information_schema.INNODB_TRX b ON b.trx_id = w.BLOCKING_ENGINE_TRANSACTION_ID
INNER JOIN information_schema.INNODB_TRX r ON r.trx_id = w.REQUESTING_ENGINE_TRANSACTION_ID
INNER JOIN information_schema.PROCESSLIST p ON p.id = b.trx_mysql_thread_id
INNER JOIN information_schema.PROCESSLIST p2 ON p2.id = r.trx_mysql_thread_id
ORDER BY `Waiting time` DESC;"""
    return execute_sql(sql)


# Initialize the server
app = Server("operateMysql")


@app.list_tools()
async def list_tools() -> list[Tool]:
    """List the MySQL tools exposed by this server.

    Returns:
        The tool descriptors for ``execute_sql``, ``get_table_name``,
        ``get_table_desc`` and ``get_lock_tables``.
    """
    return [
        Tool(
            name="execute_sql",
            description="Execute SQL on MySQL8.0 database",
            inputSchema={
                "type": "object",
                "properties": {
                    "query": {"type": "string", "description": "SQL statement to execute"}
                },
                "required": ["query"],
            },
        ),
        Tool(
            name="get_table_name",
            description="Search the corresponding table name in the database based on the table Chinese name",
            inputSchema={
                "type": "object",
                "properties": {
                    "text": {"type": "string", "description": "Chinese name of the table to search"}
                },
                "required": ["text"],
            },
        ),
        Tool(
            name="get_table_desc",
            description="Search the corresponding table structure in the database according to the table name, support multi-table query",
            inputSchema={
                "type": "object",
                "properties": {
                    "text": {"type": "string", "description": "Table name to search"}
                },
                "required": ["text"],
            },
        ),
        Tool(
            name="get_lock_tables",
            description="Get the row-level lock of the current mysql server InnoDB",
            inputSchema={"type": "object", "properties": {}},
        ),
    ]


@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    """Dispatch a tool invocation to the matching helper.

    Args:
        name: tool name as advertised by :func:`list_tools`.
        arguments: tool arguments; ``query`` for ``execute_sql``, ``text`` for
            the two table lookup tools.

    Raises:
        ValueError: if a required argument is missing or empty, or the tool
            name is unknown.
    """
    if name == "execute_sql":
        query = arguments.get("query")
        if not query:
            raise ValueError("Missing query statement")
        return execute_sql(query)
    elif name == "get_table_name":
        text = arguments.get("text")
        if not text:
            raise ValueError("Missing table information")
        return get_table_name(text)
    elif name == "get_table_desc":
        text = arguments.get("text")
        if not text:
            raise ValueError("Missing table information")
        return get_table_desc(text)
    elif name == "get_lock_tables":
        return get_lock_tables()

    raise ValueError(f"Unknown tool: {name}")


sse = SseServerTransport("/messages/")


# Handler for SSE connections
async def handle_sse(request):
    """Serve one SSE client: bridge its streams to the MCP server until it disconnects."""
    async with sse.connect_sse(
        request.scope, request.receive, request._send
    ) as streams:
        await app.run(streams[0], streams[1], app.create_initialization_options())
    # Starlette calls whatever the endpoint returns as an ASGI response;
    # returning None raises a TypeError once the client disconnects.
    return Response()


# Create Starlette app with routes
# NOTE(review): debug=True combined with host 0.0.0.0 exposes tracebacks to any
# client that can reach the port - consider disabling outside development.
starlette_app = Starlette(
    debug=True,
    routes=[
        Route("/sse", endpoint=handle_sse),
        Mount("/messages/", app=sse.handle_post_message),
    ],
)


if __name__ == "__main__":
    uvicorn.run(starlette_app, host="0.0.0.0", port=9000)