import os
import uvicorn
from import SseServerTransport
from import connect, Error
from import Server
from import Tool, TextContent
from import Starlette
from import Route, Mount
from dotenv import load_dotenv
def get_db_config():
"""Get database configuration information from environment variables
return:
dict: Contains the configuration information required for database connection
- host: database host address
- port: database port
- user: database username
- password: database password
- database: database name
abnormal:
ValueError: Thrown when required configuration information is missing"""
#Loading .env file
load_dotenv()
config = {
"host": ("MYSQL_HOST", "localhost"),
"port": int(("MYSQL_PORT", "3306")),
"user": ("MYSQL_USER"),
"password": ("MYSQL_PASSWORD"),
"database": ("MYSQL_DATABASE"),
}
print(config)
if not all([config["user"], config["password"], config["database"]]):
raise ValueError("The required database configuration is missing")
return config
def execute_sql(query: str) -> list[TextContent]:
"""Execute SQL query statements
parameter:
query (str): SQL statement to be executed, supports multiple statements separated by semicolons
return:
list[TextContent]: A list of TextContents containing the query results
- For SELECT query: Returns the result in CSV format, including column names and data
- For SHOW TABLES: Return all table names in the database
- For other queries: Return execution status and number of rows affected
- The results of multiple statements are separated by "---"
abnormal:
Error: Thrown when database connection or query execution fails"""
config = get_db_config()
try:
with connect(**config) as conn:
with () as cursor:
statements = [() for stmt in (";") if ()]
results = []
for statement in statements:
try:
(statement)
#Check whether the statement returns a result set (SELECT, SHOW, EXPLAIN, etc.)
if :
columns = [desc[0] for desc in ]
rows = ()
#Convert the data of each line into a string, and specializes in processing None values
formatted_rows = []
for row in rows:
formatted_row = [
"NULL" if value is None else str(value)
for value in row
]
formatted_rows.append(",".join(formatted_row))
#Merge column names and data into CSV format
(
"\n".join([",".join(columns)] + formatted_rows)
)
#If the statement does not return the result set (INSERT, UPDATE, DELETE, etc.)
else:
() #Submit only when non-query statements
(f"The query execution was successful. Influence number of rows: {}")
except Error as stmt_error:
#When an error occurs in a single statement, the error is recorded and the execution continues.
(
f"Error occurred when executing statement '{statement}': {str(stmt_error)}"
)
#You can choose whether to continue executing subsequent statements here, currently it is to continue
return [TextContent(type="text", text="\n---\n".join(results))]
except Error as e:
print(f"An error occurred while executing SQL '{query}': {e}")
return [TextContent(type="text", text=f"An error occurred while executing the query: {str(e)}")]
def get_table_name(text: str) -> list[TextContent]:
"""Search the table name in the database based on the Chinese annotation of the table
parameter:
text (str): Chinese annotation keywords to search for
return:
list[TextContent]: A list of TextContents containing the query results
- Return matching table name, database name and table annotation information
- The result is returned in CSV format, containing column names and data"""
config = get_db_config()
sql = "SELECT TABLE_SCHEMA, TABLE_NAME, TABLE_COMMENT "
sql += f"FROM information_schema.TABLES WHERE TABLE_SCHEMA = '{config['database']}' AND TABLE_COMMENT LIKE '%{text}%';"
return execute_sql(sql)
def get_table_desc(text: str) -> list[TextContent]:
"""Get the field structure information of the specified table
parameter:
text (str): The table name to be queryed, multiple table names are separated by commas
return:
list[TextContent]: A list of TextContents containing the query results
- Return the information of the table's field name, field comment, etc.
- Results are sorted by table name and field order
- The result is returned in CSV format, containing column names and data"""
config = get_db_config()
#Split the entered table name into a list by comma
table_names = [() for name in (",")]
#Build IN conditions
table_condition = "','".join(table_names)
sql = "SELECT TABLE_NAME, COLUMN_NAME, COLUMN_COMMENT "
sql += (
f"FROM information_schema.COLUMNS WHERE TABLE_SCHEMA = '{config['database']}' "
)
sql += f"AND TABLE_NAME IN ('{table_condition}') ORDER BY TABLE_NAME, ORDINAL_POSITION;"
return execute_sql(sql)
def get_lock_tables() -> list[TextContent]:
sql = """SELECT
p2.`HOST` AS is blocked host,
p2.`USER` AS blocked user,
r.trx_id AS The transaction id of the blocked party,
r.trx_mysql_thread_id AS thread number,
TIMESTAMPDIFF(SECOND, r.trx_wait_started, CURRENT_TIMESTAMP) AS Waiting time,
r.trx_query AS blocked query,
l.OBJECT_NAME AS The blocking party locked table,
m.LOCK_MODE AS The lock mode of the blocked party,
m.LOCK_TYPE AS 'The lock type of the blocked party (table lock or row lock)',
m.INDEX_NAME AS The index locked by the blocking party,
m.OBJECT_SCHEMA AS The database name of the blocked square lock object,
m.OBJECT_NAME AS The table name of the blocked square lock object,
m.LOCK_DATA AS The primary key value of the blocked transaction locked record,
p.`HOST` AS blocks the host,
p.`USER` AS blocking party user,
b.trx_id AS blocking party transaction id,
b.trx_mysql_thread_id AS blocking party thread number,
b.trx_query AS blocking party query,
l.LOCK_MODE AS blocking party lock mode,
l.LOCK_TYPE AS 'Blocking party lock type (table lock or row lock)',
l.INDEX_NAME AS The blocking party locked index,
l.OBJECT_SCHEMA AS The database name of the blocking square lock object,
l.OBJECT_NAME AS The table name of the blocking square lock object,
l.LOCK_DATA AS The primary key value of the blocking party transaction lock record,
IF( = 'Sleep', CONCAT(, 'seconds'), 0) AS The time when the blocking party's transaction is idle
FROM performance_schema.data_lock_waits w
INNER JOIN performance_schema.data_locks l ON w.BLOCKING_ENGINE_LOCK_ID = l.ENGINE_LOCK_ID
INNER JOIN performance_schema.data_locks m ON w.REQUESTING_ENGINE_LOCK_ID = m.ENGINE_LOCK_ID
INNER JOIN information_schema.INNODB_TRX b ON b.trx_id = w.BLOCKING_ENGINE_TRANSACTION_ID
INNER JOIN information_schema.INNODB_TRX r ON r.trx_id = w.REQUESTING_ENGINE_TRANSACTION_ID
INNER JOIN information_schema.PROCESSLIST p ON = b.trx_mysql_thread_id
INNER JOIN information_schema.PROCESSLIST p2 ON = r.trx_mysql_thread_id
ORDER BY Waiting time DESC;"""
return execute_sql(sql)
#Initialize the server
app = Server("operateMysql")
@app.list_tools()
async def list_tools() -> list[Tool]:
"""List available MySQL tools
return:
list[Tool]: Tool list, currently only execute_sql tool"""
return [
Tool(
name="execute_sql",
description="Execute SQL on MySQL8.0 database",
inputSchema={
"type": "object",
"properties": {
"query": {"type": "string", "description": "SQL statement to execute"}
},
"required": ["query"],
},
),
Tool(
name="get_table_name",
description="Search the corresponding table name in the database based on the table Chinese name",
inputSchema={
"type": "object",
"properties": {
"text": {"type": "string", "description": "Chinese name of the table to search"}
},
"required": ["text"],
},
),
Tool(
name="get_table_desc",
description="Search the corresponding table structure in the database according to the table name, support multi-table query",
inputSchema={
"type": "object",
"properties": {
"text": {"type": "string", "description": "Table name to search"}
},
"required": ["text"],
},
),
Tool(
name="get_lock_tables",
description="Get the row-level lock of the current mysql server InnoDB",
inputSchema={"type": "object", "properties": {}},
),
]
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
if name == "execute_sql":
query = ("query")
if not query:
raise ValueError("Missing query statement")
return execute_sql(query)
elif name == "get_table_name":
text = ("text")
if not text:
raise ValueError("Missing table information")
return get_table_name(text)
elif name == "get_table_desc":
text = ("text")
if not text:
raise ValueError("Missing table information")
return get_table_desc(text)
elif name == "get_lock_tables":
return get_lock_tables()
raise ValueError(f"Unknown tool: {name}")
sse = SseServerTransport("/messages/")
# Handler for SSE connections
async def handle_sse(request):
async with sse.connect_sse(
, , request._send
) as streams:
await (streams[0], streams[1], app.create_initialization_options())
# Create Starlette app with routes
starlette_app = Starlette(
debug=True,
routes=[
Route("/sse", endpoint=handle_sse),
Mount("/messages/", app=sse.handle_post_message),
],
)
if __name__ == "__main__":
(starlette_app, host="0.0.0.0", port=9000)