
Encapsulation of synchronous and asynchronous operations in SQLAlchemy and regular CRUD handling in Python development.

2024-08-22 11:39:14
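from sqlalchemy.orm import sessionmaker

# Create a session factory bound to the engine, then open a session
Session = sessionmaker(bind=engine)
session = Session()

# Stage the new object and persist it
session.add(new_user)
session.commit()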

With the above correspondence, SQLAlchemy allows developers to interact with the database in an object-oriented manner, providing a Pythonic interface to manipulate the database.
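To make this mapping concrete, here is a minimal sketch that runs against an in-memory SQLite database. The `User` model, its columns, and the sample data are assumptions for illustration only; they are not taken from the original article.

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class User(Base):
    """Hypothetical model: one class maps to one table."""

    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String(50))


# An in-memory SQLite database keeps the sketch self-contained
engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)

Session = sessionmaker(bind=engine)
session = Session()

# One object corresponds to one row
session.add(User(name="alice"))
session.commit()

# Querying returns objects rather than raw rows
found = session.query(User).filter(User.name == "alice").first()
session.close()
print(found.id, found.name)
```

The same model code would work against MySQL or PostgreSQL by changing only the engine URL, which is the unification the text refers to.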

2. SQLAlchemy synchronous operations

SQLAlchemy supports both synchronous and asynchronous operations, which suit different application scenarios. The following describes how to encapsulate each of them.

For synchronous operations, SQLAlchemy performs database operations in the traditional blocking manner. First, define the base Session and Engine objects:

from sqlalchemy import create_engine
from sqlalchemy.orm import declarative_base, sessionmaker
from typing import Generator
from  import settings

# General Synchronization
engine = create_engine(settings.DB_URI)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

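def get_db() -> Generator:
    """Create a SQLAlchemy database session (synchronous)."""
    # Create the session before the try block so db is always bound in finally
    db = SessionLocal()
    try:
        yield db
    finally:
        # Always release the session, even if the caller raised
        db.close()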

As mentioned earlier, SQLAlchemy provides a unified model across different databases, so we only need to create the matching connection (engine) for each one. The following shows engine creation for several common relational databases.

# mysql database engine
engine = create_engine(
    # Replace <password> and <host> with your own values
    "mysql+pymysql://root:<password>@<host>:3306/WinFramework",
    pool_recycle=3600,
    # echo=True,
)

# Sqlite Database Engine
engine = create_engine("sqlite:///testdir//")

# PostgreSQL Database Engine
engine = create_engine(
    "postgresql+psycopg2://postgres:123456@localhost:5432/winframework",
    # echo=True,
)

# SQLServer Database Engine
engine = create_engine(
    "mssql+pymssql://sa:123456@localhost/WinFramework?tds_version=7.0",
    # echo=True,
)
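All of these URLs share the shape `dialect+driver://user:password@host:port/database`. A password containing characters such as `@` or `/` must be URL-encoded before it is embedded in the string. The helper below is a sketch of that step using only the standard library; `build_db_url` and the sample credentials are hypothetical names for illustration.

```python
from urllib.parse import quote_plus


def build_db_url(driver: str, user: str, password: str,
                 host: str, port: int, database: str) -> str:
    """Assemble driver://user:password@host:port/database, escaping the password."""
    return f"{driver}://{user}:{quote_plus(password)}@{host}:{port}/{database}"


# A password with special characters would break the URL if left unescaped
url = build_db_url(
    "postgresql+psycopg2", "postgres", "p@ss/word", "localhost", 5432, "winframework"
)
print(url)
```

The resulting string can be passed to `create_engine` in the same way as the hard-coded URLs above.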

Based on the usual CRUD operations against the database, we can encapsulate them in a class, as shown below.

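class CRUDOperations:
    def __init__(self, model):
        # The SQLAlchemy model class this instance operates on
        self.model = model

    def create(self, db, obj_in):
        # Build a model instance from the input object's fields
        db_obj = self.model(**obj_in.dict())
        db.add(db_obj)
        db.commit()
        # Reload database-generated values such as the primary key
        db.refresh(db_obj)
        return db_obj

    def get(self, db, id):
        return db.query(self.model).filter(self.model.id == id).first()

    def update(self, db, db_obj, obj_in):
        # Only apply the fields that were explicitly set on the input object
        obj_data = obj_in.dict(exclude_unset=True)
        for field in obj_data:
            setattr(db_obj, field, obj_data[field])
        db.commit()
        db.refresh(db_obj)
        return db_obj

    def remove(self, db, id):
        obj = db.query(self.model).get(id)
        db.delete(obj)
        db.commit()
        return obj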

To use it, create a data access object for the model and call its methods, as shown in the test code below.

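from contextlib import contextmanager

crud_user = CRUDOperations(User)

# get_db is a plain generator function; wrap it so it can be used in a with statement
db_session = contextmanager(get_db)

# Create
with db_session() as db:
    user = crud_user.create(db, user_data)

# Read
with db_session() as db:
    user = crud_user.get(db, user_id)

# Update
with db_session() as db:
    user = crud_user.update(db, user, user_data)

# Delete
with db_session() as db:
    crud_user.remove(db, user_id)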
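A generator function such as `get_db` cannot be used directly in a `with` statement; it first has to be wrapped with `contextlib.contextmanager` (frameworks such as FastAPI do the equivalent when it is used as a dependency). The sketch below illustrates the lifecycle with a `FakeSession` class, a hypothetical stand-in for a real SQLAlchemy session, so that it runs without a database.

```python
from contextlib import contextmanager


class FakeSession:
    """Hypothetical stand-in for a SQLAlchemy Session; it only records close()."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


def get_db():
    db = FakeSession()
    try:
        yield db
    finally:
        # Runs on normal exit and when the with-body raises
        db.close()


# Turn the generator function into a context manager factory
db_context = contextmanager(get_db)

with db_context() as db:
    normal_session = db

try:
    with db_context() as db:
        failed_session = db
        raise ValueError("simulated failure inside the with-body")
except ValueError:
    pass

print(normal_session.closed, failed_session.closed)  # True True
```

In both cases the `finally` branch closes the session, which is why the session is released even when a CRUD call fails.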

 

3. Encapsulating SQLAlchemy asynchronous operations

For asynchronous operations, SQLAlchemy uses AsyncSession to manage asynchronous transactions.

First, define the asynchronous Session and Engine objects:

import sys
from typing import AsyncGenerator

from sqlalchemy import create_engine, URL
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine


def create_engine_and_session(url: str | URL):
    try:
        # Create the asynchronous database engine
        engine = create_async_engine(url, pool_pre_ping=True)
    except Exception as e:
        print(f"❌ Database connection failed: {e}")
        # Stop here; nothing can work without an engine
        sys.exit()
    else:
        # Session factory bound to the asynchronous engine
        db_session = async_sessionmaker(
            bind=engine, autoflush=False, expire_on_commit=False
        )
        return engine, db_session


# asynchronous processing
async_engine, async_session = create_engine_and_session(settings.DB_URI_ASYNC)


async def get_db() -> AsyncGenerator[AsyncSession, None]:
    """Creating a SQLAlchemy Database Session - Asynchronous Processing."""
    async with async_session() as session:
        yield session

This is similar to the synchronous version, but it is built on different objects, and the functions use async and await to perform the operations asynchronously.
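As with the synchronous version, an async generator function has to be wrapped before it can be used with `async with` outside a framework; `contextlib.asynccontextmanager` does this. The following sketch uses a `FakeAsyncSession` class, a hypothetical stand-in for `AsyncSession`, so it runs with the standard library alone.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncGenerator


class FakeAsyncSession:
    """Hypothetical stand-in for AsyncSession; it only records close()."""

    def __init__(self):
        self.closed = False

    async def close(self):
        self.closed = True


async def get_db() -> AsyncGenerator[FakeAsyncSession, None]:
    session = FakeAsyncSession()
    try:
        yield session
    finally:
        # Awaited when the async with block exits
        await session.close()


# Wrap the async generator so it supports "async with"
get_db_context = asynccontextmanager(get_db)


async def main() -> FakeAsyncSession:
    async with get_db_context() as db:
        assert not db.closed  # still open inside the block
        return db


used_session = asyncio.run(main())
print(used_session.closed)  # True
```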

To follow the encapsulation pattern used in my SQLSugar development framework, we refer to how the CRUD base class is defined there and wrap multiple interfaces in the same way.

With reference to the above implementation, let's look at the code for handling wrapper classes in Python using generics.

from typing import Generic, Type, TypeVar

from pydantic import BaseModel

# Base is the project's SQLAlchemy declarative base, defined elsewhere
ModelType = TypeVar("ModelType", bound=Base)
PrimaryKeyType = TypeVar("PrimaryKeyType", int, str, float)  # Limit the type of primary key
PageDtoType = TypeVar("PageDtoType", bound=BaseModel)
DtoType = TypeVar("DtoType", bound=BaseModel)


class BaseCrud(Generic[ModelType, PrimaryKeyType, PageDtoType, DtoType]):
    """
    Base class for CRUD operations
    """

    def __init__(self, model: Type[ModelType]):
        """
        Base class object for database access operations (CRUD).
        **Parameters**
        * `model`: A SQLAlchemy model class
        """
        self.model = model

In this way, generics let us declare the different types involved and the information each processing class needs.
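To show how a subclass binds these type parameters, here is a simplified sketch with only two of them and a plain dictionary in place of the database. `InMemoryCrud`, `Customer`, and `CustomerCrud` are hypothetical names for illustration and are not the article's classes.

```python
from dataclasses import dataclass
from typing import Dict, Generic, Optional, Type, TypeVar

ModelType = TypeVar("ModelType")
KeyType = TypeVar("KeyType", int, str)  # restrict the primary key type


class InMemoryCrud(Generic[ModelType, KeyType]):
    """Generic base class; a dict stands in for the database table."""

    def __init__(self, model: Type[ModelType]):
        self.model = model
        self._rows: Dict[KeyType, ModelType] = {}

    def create(self, key: KeyType, **fields) -> ModelType:
        obj = self.model(**fields)
        self._rows[key] = obj
        return obj

    def get(self, key: KeyType) -> Optional[ModelType]:
        return self._rows.get(key)


@dataclass
class Customer:
    name: str
    age: int


class CustomerCrud(InMemoryCrud[Customer, str]):
    """Binds the type parameters: Customer objects with str keys."""


customer_crud = CustomerCrud(Customer)
customer_crud.create("c1", name="test", age=30)
print(customer_crud.get("c1"))
```

With the parameters bound in the subclass, a type checker can infer that `customer_crud.get(...)` returns `Optional[Customer]` without any extra annotations at the call site.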

The asynchronous get_all function of this base class, which returns all records, is shown below.

    async def get_all(
        self, sorting: Optional[str], db: AsyncSession
    ) -> List[ModelType] | None:
        """Get all objects, optionally sorted

        :param sorting: format: name asc or name asc,age desc
        """
        query = select(self.model)
        if sorting:
            query = self.apply_sorting(query, sorting)

        # Execute the query and unwrap each row into a model instance
        result = await db.execute(query)
        items = result.scalars().all()
        return items
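The body of apply_sorting is not shown in the article. As a sketch of how a sorting string in the documented format ("name asc" or "name asc,age desc") could be interpreted, the standard-library example below parses it and applies it to a list of dictionaries. `parse_sorting` and `sort_rows` are hypothetical helpers; a SQLAlchemy version would presumably map each pair to a column's `asc()` or `desc()` expression instead.

```python
from typing import Dict, List, Tuple


def parse_sorting(sorting: str) -> List[Tuple[str, bool]]:
    """Turn 'name asc,age desc' into [(field, descending), ...]."""
    result = []
    for part in sorting.split(","):
        tokens = part.split()
        if not tokens:
            continue  # skip empty segments such as a trailing comma
        descending = len(tokens) > 1 and tokens[1].lower() == "desc"
        result.append((tokens[0], descending))
    return result


def sort_rows(rows: List[Dict], sorting: str) -> List[Dict]:
    """Sort rows by several keys; later keys are applied first (stable sort)."""
    for field, descending in reversed(parse_sorting(sorting)):
        rows = sorted(rows, key=lambda row: row[field], reverse=descending)
    return rows


rows = [
    {"name": "bob", "age": 20},
    {"name": "amy", "age": 30},
    {"name": "amy", "age": 25},
]
sorted_rows = sort_rows(rows, "name asc,age desc")
print(sorted_rows)
```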

The function for fetching a single object is as follows.

    async def get(self, id: PrimaryKeyType, db: AsyncSession) -> Optional[ModelType]:
        """Get an object based on the primary key"""
        query = select(self.model).filter(self.model.id == id)

        result = await db.execute(query)
        # first() returns None when no row matches
        item = result.scalars().first()

        return item

The function for creating an object is shown below.

    async def create(self, obj_in: DtoType, db: AsyncSession, **kwargs) -> bool:
        """Create an object; kwargs extends the fields used to create it.

        :param obj_in: object input data
        :param kwargs: Extended fields, e.g. format: is_deleted=0, is_active=1
        """
        try:
            if kwargs:
                instance = self.model(**obj_in.model_dump(), **kwargs)
            else:
                instance = self.model(**obj_in.model_dump())  # type: ignore

            db.add(instance)
            await db.commit()
            return True
        except SQLAlchemyError as e:
            print(e)
            # Undo the pending changes so the session stays usable
            await db.rollback()
            return False

The asynchronous create function creates an object in the database via SQLAlchemy, and its kwargs parameter allows the fields to be extended dynamically when the object is created.

  • async def: declares an asynchronous function, which is called with await.
  • self: this is an instance method, so self refers to the instance of the class.
  • obj_in: DtoType: obj_in is a Data Transfer Object (DTO) that contains the data to be inserted into the database. DtoType is a generic type that represents DTO objects.
  • db: AsyncSession: db is a SQLAlchemy asynchronous session (AsyncSession) used to interact with the database.
  • **kwargs: accepts any number of keyword arguments, allowing additional fields to be passed dynamically at object creation time.
  • obj_in.model_dump(): assumes obj_in is a Pydantic model or a similar structure whose model_dump() method converts it into a dictionary for creating the SQLAlchemy model instance.
  • self.model(**obj_in.model_dump(), **kwargs): instantiates the SQLAlchemy model object from the fields in obj_in plus the extension fields passed in through kwargs. If kwargs is non-empty, it is unpacked and passed to the model constructor as extra fields.
  • db.add(instance): adds the newly created object to the current database session.
  • await db.commit(): commits the transaction to save the new object to the database.
  • SQLAlchemyError: catches all SQLAlchemy-related errors.
  • await db.rollback(): rolls back the transaction when an exception occurs, to prevent incomplete or incorrect data from being committed.
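One detail of the `**obj_in.model_dump(), **kwargs` pattern is worth seeing in isolation: if a key in kwargs repeats a field that the DTO already supplies, Python raises a TypeError rather than silently overriding it. The sketch below uses dataclasses as hypothetical stand-ins for the Pydantic DTO and the SQLAlchemy model, so it runs with the standard library alone.

```python
from dataclasses import asdict, dataclass


@dataclass
class CustomerDto:
    """Hypothetical stand-in for the Pydantic DTO."""

    name: str
    age: int


@dataclass
class CustomerRow:
    """Hypothetical stand-in for the SQLAlchemy model."""

    name: str
    age: int
    is_deleted: int = 0
    is_active: int = 0


def build_instance(obj_in: CustomerDto, **kwargs) -> CustomerRow:
    # Same shape as self.model(**obj_in.model_dump(), **kwargs)
    return CustomerRow(**asdict(obj_in), **kwargs)


# Extra fields are merged into the constructor call
row = build_instance(CustomerDto(name="test", age=30), is_active=1)
print(row)

# A key that appears in both sources is rejected, not overridden
try:
    build_instance(CustomerDto(name="test", age=30), name="other")
    duplicate_rejected = False
except TypeError:
    duplicate_rejected = True
print(duplicate_rejected)  # True
```

Note that this TypeError is not a SQLAlchemyError, so in a function shaped like create above it would propagate to the caller instead of returning False.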

With the above encapsulation in place, we can test it with the following calls.

from  import customer as customer_crud
from  import Customer
from pydantic import BaseModel
from  import CustomerDto, CustomerPageDto

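from contextlib import asynccontextmanager

# get_db is an async generator function; wrap it so it works with "async with"
get_db_context = asynccontextmanager(get_db)


async def test_list_customer():
    async with get_db_context() as db: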

        print("get_list")
        totalCount, items = await customer_crud.get_list(
            CustomerPageDto(skipCount=0, maxResultCount=10, name="test"),
            db,
        )
        print(totalCount, items)
        for customer in items:
            print(, )

        print("get_by_name")
        name = "test"
        customer = await customer_crud.get_by_name(
            name,
            db,
        )
        if customer:
            print(, )
        else:
            print(f"{name} not found")

        print("soft delete")
        result = await customer_crud.delete_byid(, db, is_deleted=1)
        print("Operation results:", result)

        print("soft delete_byids")
        result = await customer_crud.delete_byids(
            ["11122", "2C5F8672-2AA7-4B14-85AD-DF56F5BF7F1F"], db, is_deleted=1
        )
        print(f"Soft delete successful: {result}")

        print("update_by_column")
        result = await customer_crud.update_by_column(
            "id", , {"age": 30}, db
        )
        print("Operation results:", result)

        await ()

Differences between synchronous and asynchronous processing:

  • Synchronous operations suit traditional blocking application scenarios, such as command-line tools or simple scripts.
  • Asynchronous operations are better suited to asynchronous frameworks such as FastAPI and can improve performance in high-concurrency scenarios.
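The difference can be illustrated without a database. In the sketch below, `time.sleep` and `asyncio.sleep` simulate the waiting time of a query; the function names and the 0.1 second delay are assumptions for illustration. Three blocking calls run one after another, while three awaited calls overlap their waiting time.

```python
import asyncio
import time


def blocking_query(delay: float) -> float:
    """Simulated synchronous query: blocks the thread while waiting."""
    time.sleep(delay)
    return delay


async def async_query(delay: float) -> float:
    """Simulated asynchronous query: yields control while waiting."""
    await asyncio.sleep(delay)
    return delay


def run_sync(delays):
    start = time.perf_counter()
    results = [blocking_query(d) for d in delays]
    return results, time.perf_counter() - start


async def run_async(delays):
    start = time.perf_counter()
    # gather runs the coroutines concurrently on one event loop
    results = await asyncio.gather(*(async_query(d) for d in delays))
    return list(results), time.perf_counter() - start


delays = [0.1, 0.1, 0.1]
sync_results, sync_elapsed = run_sync(delays)
async_results, async_elapsed = asyncio.run(run_async(delays))
print(f"sync: {sync_elapsed:.2f}s, async: {async_elapsed:.2f}s")
```

The gain comes from overlapping waits, so it only appears when several operations are in flight at once, as in a web server handling many requests.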

By encapsulating database operations, you can make the code more reusable and maintainable and support different types of operation scenarios.