from import sessionmaker Session = sessionmaker(bind=engine) session = Session() (new_user) ()
With the above correspondence, SQLAlchemy allows developers to interact with the database in an object-oriented manner, providing a Pythonic interface to manipulate the database.
2、SQLAlchemy synchronization operations
SQLAlchemy provides both synchronous and asynchronous operations, which are suitable for different application scenarios. The following is a description of how to encapsulate the synchronous and asynchronous operations of SQLAlchemy:
For synchronization operations, SQLAlchemy uses the traditional blocking approach to database operations. First, define a baseSession
cap (a poem)Engine
Object:
from sqlalchemy import create_engine from import declarative_base, sessionmaker from typing import Generator from import settings # General Synchronization engine = create_engine(settings.DB_URI) SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) def get_db() -> Generator: """Creating a SQLAlchemy Database Session - Synchronizing Processes.""" try: db = SessionLocal() yield db finally: ()
As mentioned earlier, the use of SQLAlchemy can achieve a unified model of different databases, we can correspond to the creation of different databases connection (engine), the following is a regular several relational database connection processing.
# mysql database engine engine = create_engine( "mysql+pymysql://root:[email protected]:3306/WinFramework", pool_recycle=3600, # echo=True, ) # Sqlite Database Engine engine = create_engine("sqlite:///testdir//") # PostgreSQL Database Engine engine = create_engine( "postgresql+psycopg2://postgres:123456@localhost:5432/winframework", # echo=True, ) # SQLServer Database Engine engine = create_engine( "mssql+pymssql://sa:123456@localhost/WinFramework?tds_version=7.0", # echo=True, )
We can encapsulate some operations based on how the database operates with CRUD, as shown below.
class CRUDOperations: def __init__(self, model): = model def create(self, db, obj_in): db_obj = (**obj_in.dict()) (db_obj) () (db_obj) return db_obj def get(self, db, id): return ().filter( == id).first() def update(self, db, db_obj, obj_in): obj_data = obj_in.dict(exclude_unset=True) for field in obj_data: setattr(db_obj, field, obj_data[field]) () (db_obj) return db_obj def remove(self, db, id): obj = ().get(id) (obj) () return obj
To use it, build the data access class to operate it, as shown in the test code below.
crud_user = CRUDOperations(User) # Create with get_db() as db: user = crud_user.create(db, user_data) # Read with get_db() as db: user = crud_user.get(db, user_id) # Update with get_db() as db: user = crud_user.update(db, user, user_data) # Delete with get_db() as db: crud_user.remove(db, user_id)
3、SQLAlchemy asynchronous operation package
For asynchronous operations, SQLAlchemy uses theAsyncSession
to manage asynchronous transactions.
First, define an asynchronousSession
cap (a poem)Engine
Object:
from sqlalchemy import create_engine, URL from import AsyncSession, async_sessionmaker, create_async_engine from typing import AsyncGenerator def create_engine_and_session(url: str | URL): try: # database engine engine = create_async_engine(url, pool_pre_ping=True) except Exception as e: print("❌ Database link failure {}", e) () else: db_session = async_sessionmaker( bind=engine, autoflush=False, expire_on_commit=False ) return engine, db_session # asynchronous processing async_engine, async_session = create_engine_and_session(settings.DB_URI_ASYNC) async def get_db() -> AsyncGenerator[AsyncSession, None]: """Creating a SQLAlchemy Database Session - Asynchronous Processing.""" async with async_session() as session: yield session
Similar to the handling of synchronization, but with a different object to implement, and the function uses a combination of async await to implement asynchronous operations.
In order to realize a similar encapsulation pattern in my SQLSugar development framework, we refer to the way the base class CRUD is defined in the SQLSugar development framework to realize the encapsulation processing of multiple interfaces.
With reference to the above implementation, let's look at the code for handling wrapper classes in Python using generics.
ModelType = TypeVar("ModelType", bound=Base) PrimaryKeyType = TypeVar("PrimaryKeyType", int, str, float) # Limit the type of primary key PageDtoType = TypeVar("PageDtoType", bound=BaseModel) DtoType = TypeVar("DtoType", bound=BaseModel) class BaseCrud(Generic[ModelType, PrimaryKeyType, PageDtoType, DtoType]): """ Basic CRUD operation classes """ def __init__(self, model: Type[ModelType]): """ Base class object for database access operations (CRUD). **Parameters** * `model`: A SQLAlchemy model class """ = model
In this way, we can define the different types, and the associated information about the processing classes, through generalizations.
The data interface for the asynchronous definition of get_all's return all in this base class function is shown below.
async def get_all( self, sorting: Optional[str], db: AsyncSession ) -> List[ModelType] | None: """Get a list of objects based on a list of ID strings :param sorting: format: name asc or name asc,age desc """ query = select() if sorting: query = self.apply_sorting(query, sorting) result = await (query) items = ().all() return items
And the operation functions that correspond to obtaining a single object are as follows.
async def get(self, id: PrimaryKeyType, db: AsyncSession) -> Optional[ModelType]: """Get an object based on the primary key""" query = select().filter( == id) result = await (query) item = ().first() return item
And the operation function to create the object, as shown below.
async def create(self, obj_in: DtoType, db: AsyncSession, **kwargs) -> bool: """Creating an object and using kwargs extends the fields used to create the object. :param obj_in: object input data :param kwargs: Extended fields, e.g. format: is_deleted=0, is_active=1 """ try: if kwargs: instance = (**obj_in.model_dump(), **kwargs) else: instance = (**obj_in.model_dump()) # type: ignore (instance) await () return True except SQLAlchemyError as e: print(e) await () return False
This asynchronous functioncreate
Designed to create an object in a database via SQLAlchemy, while allowing the creation of an object via thekwargs
parameter dynamically extends the fields when the object is created.
-
async def
: indicates that this is an asynchronous function that can be used in conjunction with theawait
Used together. -
self
: This is a class method, soself
References instances of the class. -
obj_in: DtoType
:obj_in
is a Data Transfer Object (DTO) that contains data that needs to be inserted into the database.DtoType
is a generic type used to represent DTO objects. -
db: AsyncSession
:db
is an asynchronous session of SQLAlchemy (AsyncSession
) for interacting with the database. -
**kwargs
: Accepts any number of keyword arguments, allowing additional fields to be passed dynamically at object creation time.
-
obj_in.model_dump()
:: Assumptionsobj_in
is a Pydantic model or a similar structure that can be used by themodel_dump()
method into dictionary format for creating SQLAlchemy model instances. -
(**obj_in.model_dump(), **kwargs)
: Useobj_in
The fields in and through thekwargs
The extension fields passed in to instantiate the SQLAlchemy model object. If thekwargs
are non-empty, they are unpacked and passed into the model constructor as extra fields.
-
(instance)
: Adds the newly created object to the current database session. -
await ()
:: Commit the transaction to save the new object to the database.
-
SQLAlchemyError
: Catch all SQLAlchemy related errors. -
await ()
: Roll back transactions in the event of an exception to prevent incomplete or incorrect data from being committed.
With the above encapsulation, we can test the processing example of the call
from import customer as customer_crud from import Customer from pydantic import BaseModel from import CustomerDto, CustomerPageDto async def test_list_customer(): async with get_db() as db: print("get_list") totalCount, items = await customer_crud.get_list( CustomerPageDto(skipCount=0, maxResultCount=10, name="test"), db, ) print(totalCount, items) for customer in customers: print(, ) print("get_by_name") name = "test" customer = await customer_crud.get_by_name( name, db, ) if customer: print(, ) else: print(f"{name} not found") print("soft delete") result = await customer_crud.delete_byid(, db, is_deleted=1) print("Operation results:", result) print("soft delete_byids") result = await customer_crud.delete_byids( ["11122", "2C5F8672-2AA7-4B14-85AD-DF56F5BF7F1F"], db, is_deleted=1 ) print(f"Soft delete successful: {result}") print("update_by_column") result = await customer_crud.update_by_column( "id", , {"age": 30}, db ) print("Operation results:", result) await ()
Differences between synchronous and asynchronous processing:
- synchronous operation For traditional blocking application scenarios such as command line tools or simple scripts.
-
asynchronous operation More suitable for asynchronous frameworks such as
FastAPI
, which can improve performance in high concurrency scenarios.
By encapsulating database operations, you can make the code more reusable and maintainable and support different types of operation scenarios.