
A few notes on using SQLAlchemy to work with databases in Python

2024-08-01 19:13:08

While exploring a project built with FastAPI, SQLAlchemy, Pydantic, Redis, and JWT, I accessed the database through SQLAlchemy's asynchronous API. The database operations and the controller operations both inherit from base classes to reduce duplicated code and improve reuse. While designing and testing the interfaces, I tracked down and resolved a few issues, which are recorded here for reference.

1. SQLAlchemy transaction processing

In an asynchronous environment, batch update operations require the use of asynchronous methods to execute queries and commit transactions.

async def update_range(self, obj_in_list: List[DtoType], db: AsyncSession) -> bool:
    """Batch update objects"""
    try:
        async with db.begin():  # Transaction block keeps the batch consistent
            for obj_in in obj_in_list:
                # Look up the existing object
                # (self.model: the ORM model class held by this base class; name assumed)
                query = select(self.model).filter(self.model.id == obj_in.id)
                result = await db.execute(query)
                db_obj = result.scalars().first()

                if db_obj:
                    # Keep only the fields that were explicitly set on the DTO
                    # (Pydantic v2's model_dump() has no skip_defaults argument)
                    update_data = obj_in.model_dump(exclude_unset=True)

                    # Copy the new values onto the ORM object
                    for field, value in update_data.items():
                        setattr(db_obj, field, value)

        return True
    except SQLAlchemyError as e:
        print(e)
        # The transaction block has already rolled back by the time we get here
        return False

In this improved code:

  1. Transaction block: async with db.begin() opens a transaction block so the batch is applied atomically. The block commits automatically when it exits normally and rolls back if an exception is raised.
  2. Query: select(self.model).filter(self.model.id == obj_in.id) builds the query, and await db.execute(query) runs it asynchronously.
  3. Updating object fields: setattr writes each new value onto the object.
  4. Exception handling: SQLAlchemyError is caught and reported. The transaction block has already rolled back by then, so no manual rollback is needed.

This approach keeps batch update operations correct and consistent in an asynchronous environment.

When async with db.begin() manages the transaction, the commit is automatic: if every operation inside the block succeeds, the transaction is committed on exit; if an exception occurs, it is rolled back.

Calling await db.commit() by hand is therefore unnecessary inside the block. If you do not use a transaction block and want to control the commit yourself, you can modify the method as follows:

async def update_range(self, obj_in_list: List[DtoType], db: AsyncSession) -> bool:
    """Batch update objects"""
    try:
        for obj_in in obj_in_list:
            query = select(self.model).filter(self.model.id == obj_in.id)
            result = await db.execute(query)
            db_obj = result.scalars().first()

            if db_obj:
                # exclude_unset keeps only explicitly set fields
                # (Pydantic v2's model_dump() has no skip_defaults argument)
                update_data = obj_in.model_dump(exclude_unset=True)

                for field, value in update_data.items():
                    setattr(db_obj, field, value)

        await db.commit()  # Commit the transaction manually
        return True
    except SQLAlchemyError as e:
        print(e)
        await db.rollback()  # Roll back the transaction if anything fails
        return False

In this manually committed version:

  • After all objects have been updated, await db.commit() commits the transaction.
  • If an exception occurs, await db.rollback() rolls the transaction back.

Choose the transaction management style that fits your needs. The transaction block is usually the safer and cleaner choice.

In an asynchronous environment, the create_update method also has to query the database asynchronously and then either update or create the object.

async def create_update(
    self, obj_in: DtoType, id: PrimaryKeyType, db: AsyncSession
) -> bool:
    """Create or update an object"""
    try:
        # Look up the existing object
        query = select(self.model).filter(self.model.id == id)
        result = await db.execute(query)
        db_obj = result.scalars().first()

        if db_obj:
            # Update the existing object
            return await self.update(obj_in, db)
        else:
            # Create a new object
            return await self.create(obj_in, db)
    except SQLAlchemyError as e:
        print(e)
        # Roll back the transaction if anything fails
        await db.rollback()
        return False

In this code:

  1. Asynchronous query: select(self.model).filter(self.model.id == id) builds the query, and await db.execute(query) runs it.
  2. Getting the object: result.scalars().first() returns the first object in the query result, or None if there is no match.
  3. Calling the update or create method: depending on whether the query found an object, call self.update or self.create. Both methods must be asynchronous and must be called with await.
  4. Exception handling: SQLAlchemyError is caught, and await db.rollback() rolls the transaction back.
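The query-then-dispatch steps above can be sketched in a compact, runnable form. This is a synchronous sketch with in-memory SQLite rather than the article's async base class; the Customer model and the standalone create_update function are hypothetical, and session.get() is used here as a shortcut for the primary-key lookup.

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Customer(Base):  # hypothetical model, for illustration only
    __tablename__ = "customer"
    id = Column(Integer, primary_key=True)
    name = Column(String(50), nullable=False)


engine = create_engine("sqlite://")  # in-memory SQLite database
Base.metadata.create_all(engine)


def create_update(session, customer_id, data):
    """Update the row with this primary key, or create it if it is missing."""
    try:
        db_obj = session.get(Customer, customer_id)
        if db_obj is None:
            session.add(Customer(id=customer_id, **data))
        else:
            for field, value in data.items():
                setattr(db_obj, field, value)
        session.commit()
        return True
    except SQLAlchemyError:
        session.rollback()  # leave the session usable after a failure
        return False


with Session(engine) as session:
    created = create_update(session, 1, {"name": "Alice"})
    updated = create_update(session, 1, {"name": "Alicia"})
    rejected = create_update(session, 2, {"name": None})  # violates NOT NULL
    final_name = session.get(Customer, 1).name
    missing = session.get(Customer, 2)
```

The third call fails at commit time with an integrity error, and the rollback leaves no trace of the rejected row.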

In an asynchronous environment, bulk insertion also has to go through asynchronous methods. Since bulk_insert_mappings may not be directly supported in the asynchronous version of SQLAlchemy, you can use the add_all method to add objects in bulk.

async def save_import(self, data: List[DtoType], db: AsyncSession) -> bool:
    """Batch import objects"""
    try:
        # Convert the DTOs to model instances
        db_objs = [self.model(**obj_in.model_dump()) for obj_in in data]

        # Add all objects to the session
        db.add_all(db_objs)

        # Commit the transaction
        await db.commit()

        return True
    except SQLAlchemyError as e:
        print(e)
        await db.rollback()  # Roll back the transaction if anything fails
        return False

Code description:

  1. Converting DTOs to model instances: [self.model(**obj_in.model_dump()) for obj_in in data] converts the data list of DTOs into a list of model instances.
  2. Batch adding objects: db.add_all(db_objs) adds all the objects to the database session.
  3. Committing the transaction: await db.commit() commits the transaction asynchronously.
  4. Exception handling: SQLAlchemyError is caught, and await db.rollback() rolls the transaction back so the database stays consistent if an error occurs.

This approach performs batch imports correctly in an asynchronous environment and handles the exceptions that may occur.
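To see that a failed batch leaves nothing behind, here is a minimal synchronous sketch using in-memory SQLite. The Customer model, the row dictionaries, and the count_rows helper are hypothetical; the add_all, commit, and rollback calls mirror the async method described above.

```python
from sqlalchemy import Column, Integer, String, create_engine, func, select
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Customer(Base):  # hypothetical model, for illustration only
    __tablename__ = "customer"
    id = Column(Integer, primary_key=True)
    name = Column(String(50), nullable=False)


engine = create_engine("sqlite://")  # in-memory SQLite database
Base.metadata.create_all(engine)


def save_import(session, rows):
    """Insert all rows in one transaction; roll back if any row is rejected."""
    try:
        session.add_all([Customer(**row) for row in rows])
        session.commit()
        return True
    except SQLAlchemyError:
        session.rollback()
        return False


def count_rows(session):
    return session.scalar(select(func.count()).select_from(Customer))


with Session(engine) as session:
    first_ok = save_import(
        session, [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]
    )
    # The second row has no name, so the whole batch is rejected.
    second_ok = save_import(
        session, [{"id": 3, "name": "Carol"}, {"id": 4, "name": None}]
    )
    total = count_rows(session)
```

Because both rows of the second batch belong to the same transaction, the valid row is discarded together with the invalid one.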

 

2. The difference between select(...).where(...) and select(...).filter(...) in SQLAlchemy

In SQLAlchemy, select(...).where(...) and select(...).filter(...) both add conditions to a query. They differ mainly in where they come from and where they are conventionally used.

1. where(...)

  • Definition: where is the method of SQLAlchemy's select object for adding conditions to a query.
  • Usage: query = select(self.model).where(self.model.id == id)
  • Description: where specifies the WHERE clause of the SQL statement. On a select object it behaves the same as filter.

2. filter(...)

  • Definition: filter comes from SQLAlchemy's ORM Query object, where it adds conditions to a query. The select object also provides filter, as a synonym for where.
  • Usage: query = select(self.model).filter(self.model.id == id)
  • Description: filter also specifies the WHERE clause of the SQL statement. It is the familiar spelling in ORM-style queries.

Key differences

  • Context: where belongs to the select object and is typically used to build SQL statements (SQLAlchemy Core), while filter originates from the Query object used for ORM queries (SQLAlchemy ORM). In SQLAlchemy 2.0+, select accepts both, and they produce the same statement.

  • Semantics: in SQLAlchemy Core, where makes it explicit that you are adding to the WHERE clause of the statement. In an ORM query, filter does the same job; the Query object additionally offers filter_by for keyword-style conditions.

Example using where (SQLAlchemy Core style):

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

async def get(self, id: int, db: AsyncSession) -> Optional[ModelType]:
    query = select(self.model).where(self.model.id == id)
    result = await db.execute(query)
    return result.scalars().first()

Example using filter (SQLAlchemy ORM style):

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

async def get(self, id: int, db: AsyncSession) -> Optional[ModelType]:
    query = select(self.model).filter(self.model.id == id)
    result = await db.execute(query)
    return result.scalars().first()

Summary

  • In SQLAlchemy Core, where is the standard method for building query conditions.
  • In SQLAlchemy ORM, filter is the customary way to build query conditions; in Core it is used relatively little.

In SQLAlchemy 2.0 and later, where and filter on select behave the same way, so you can choose either according to habit and need. In practice the choice usually comes down to code context and personal preference.
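One quick way to confirm that the two spellings are interchangeable on select is to compare the SQL they render. The sketch below assumes a hypothetical Customer model and needs no database connection, because str() compiles the statement with the default dialect.

```python
from sqlalchemy import Column, Integer, String, select
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class Customer(Base):  # hypothetical model, for illustration only
    __tablename__ = "customer"
    id = Column(Integer, primary_key=True)
    name = Column(String(50))


stmt_where = select(Customer).where(Customer.id == 1)
stmt_filter = select(Customer).filter(Customer.id == 1)

# Render both statements as SQL text and compare them.
sql_where = str(stmt_where)
sql_filter = str(stmt_filter)

print(sql_where)
print(sql_where == sql_filter)
```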

 

3. What is the difference between model_dump(exclude_unset=True) and model_dump(skip_defaults=True)?

Both options control which fields are left out when a Pydantic model instance is serialized to a dictionary for further processing or transmission. One caveat first: skip_defaults is a deprecated Pydantic v1 argument of .dict(), where it was an alias for exclude_unset. Pydantic v2's model_dump() does not accept it. The v2 option that matches the "skip default values" behavior described below is exclude_defaults=True.

model_dump(exclude_unset=True)

exclude_unset=True leaves out every field that was not explicitly set when the instance was created.

  • Functionality: excludes fields that were never assigned and therefore fell back to their defaults. A field that was passed explicitly is kept, even if its value equals the default.
  • Usage scenario: partial updates, where fields the caller did not supply must be ignored.
# Assume the model has fields 'name' and 'age', and 'age' defaults to 0
model_instance = MyModel(name='Alice')
# 'age' was never set, so exclude_unset=True only includes 'name'
serialized_data = model_instance.model_dump(exclude_unset=True)

model_dump(exclude_defaults=True) (the behavior attributed to skip_defaults)

exclude_defaults=True leaves out every field whose current value equals its default, whether or not it was set explicitly.

  • Functionality: excludes all fields whose values are equal to their default values.
  • Usage scenario: reducing redundant information in the output when default values carry no meaning for the consumer.
# Assume the model has fields 'name' and 'age', and 'age' defaults to 0
model_instance = MyModel(name='Alice', age=0)
# 'age' equals its default, so exclude_defaults=True only includes 'name'
serialized_data = model_instance.model_dump(exclude_defaults=True)

The main difference

  • Exclusion condition

    • exclude_unset=True excludes fields that were not explicitly set on the model instance.
    • exclude_defaults=True excludes fields whose values equal their defaults.
  • Applicable scenarios

    • Use exclude_unset=True to drop fields that were not assigned at instantiation, for example fields the client did not send.
    • Use exclude_defaults=True to drop fields that hold their default values, including ones that were explicitly set to the default.
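The two exclusion rules can be compared side by side. This is a minimal sketch assuming Pydantic v2 and a hypothetical MyModel whose age field defaults to 0; it uses exclude_defaults, because model_dump() in v2 has no skip_defaults argument.

```python
from pydantic import BaseModel


class MyModel(BaseModel):  # hypothetical model, for illustration only
    name: str
    age: int = 0


never_set = MyModel(name="Alice")  # age falls back to its default
set_to_default = MyModel(name="Alice", age=0)  # age passed, equals default
set_to_other = MyModel(name="Alice", age=25)  # age passed, differs from default

# exclude_unset looks at whether the field was passed at all.
unset_a = never_set.model_dump(exclude_unset=True)
unset_b = set_to_default.model_dump(exclude_unset=True)

# exclude_defaults looks at the current value, not at how it got there.
defaults_b = set_to_default.model_dump(exclude_defaults=True)
defaults_c = set_to_other.model_dump(exclude_defaults=True)

print(unset_a, unset_b)
print(defaults_b, defaults_c)
```

The set_to_default instance is the interesting case: exclude_unset keeps age because it was passed explicitly, while exclude_defaults drops it because the value equals the default.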

 

4. Using the **kwargs parameter to implement soft deletion in the interface

For example, in our delete interface, if keyword arguments are passed through kwargs, a soft delete (an update of the record) is performed; otherwise a hard delete (removal of the record) is performed.

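    async def delete_byid(self, id: PrimaryKeyType, db: AsyncSession, **kwargs) -> bool:
        """Delete an object by its primary key

        :param kwargs: column values for a soft delete; omit for a hard delete
        """
        # sa_delete / sa_update are sqlalchemy's delete / update imported under aliases
        if not kwargs:
            result = await db.execute(sa_delete(self.model).where(self.model.id == id))
        else:
            result = await db.execute(
                sa_update(self.model).where(self.model.id == id).values(**kwargs)
            )

        await db.commit()
        return result.rowcount > 0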

The example code is shown below.

# Sample model
from sqlalchemy.orm import declarative_base
from sqlalchemy import Column, Integer, String, Boolean
from sqlalchemy.ext.asyncio import AsyncSession

Base = declarative_base()

class Customer(Base):
    __tablename__ = 'customer'
    id = Column(Integer, primary_key=True)
    name = Column(String)
    is_deleted = Column(Boolean, default=False)

# Sample usage (engine and BaseController come from the surrounding project)
async def main():
    async with AsyncSession(engine) as session:
        controller = BaseController(Customer)

        # Hard delete
        result = await controller.delete_byid(1, session)
        print(f"Hard delete successful: {result}")

        # Soft delete
        result = await controller.delete_byid(2, session, is_deleted=True)
        print(f"Soft delete successful: {result}")

# Run the main program
import asyncio
if __name__ == "__main__":
    asyncio.run(main())

Notes

  1. Model definition: make sure your model contains the is_deleted field and that the field name is spelled correctly.

  2. Passing parameters: pass the kwargs correctly when calling delete_byid. For example, to perform a soft delete, pass is_deleted=True.

  3. Debug output: you can add some debugging output (e.g. print(kwargs)) to confirm that the parameters were passed correctly.

# Example Hard Delete Call
await controller.delete_byid(1, session)

# Example Soft Delete Call
await controller.delete_byid(2, session, is_deleted=True)

If our is_deleted field is of integer type, as shown below, the call is slightly different:

class Customer(Base):
    __tablename__ = "t_customer"

    id = Column(String, primary_key=True, comment="Primary key")
    name = Column(String, comment="Name")
    age = Column(Integer, comment="Age")
    creator = Column(String, comment="Creator")
    createtime = Column(DateTime, comment="Creation time")
    is_deleted = Column(Integer, comment="Deleted flag")

Calling code:

        # Hard delete
        result = await controller.delete_byid("1", session)
        print(f"Hard delete successful: {result}")

        # Soft delete
        result = await controller.delete_byid("2", session, is_deleted=1)
        print(f"Soft delete successful: {result}")

Notes

  1. Model definition: make sure the fields and comments of the Customer model meet your requirements.

  2. Hard and soft deletion

    • Hard delete: removes the record from the database.
    • Soft delete: updates the is_deleted field to mark the record as deleted, without actually removing it.
  3. Passing parameters correctly

    • For a hard delete, no additional parameters are needed.
    • For a soft delete, pass is_deleted=1 as a parameter.

As long as the parameters are passed correctly and the model contains the right fields, both soft and hard deletes will work as expected.
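The hard/soft switch can be exercised end to end with a small synchronous sketch on in-memory SQLite. The Customer model and the standalone delete_byid function are hypothetical stand-ins for the controller method; the row count is read before the commit so the result does not depend on driver behavior after the transaction ends.

```python
from sqlalchemy import Boolean, Column, Integer, String, create_engine
from sqlalchemy import delete as sa_delete, update as sa_update
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Customer(Base):  # hypothetical model, for illustration only
    __tablename__ = "customer"
    id = Column(Integer, primary_key=True)
    name = Column(String(50))
    is_deleted = Column(Boolean, default=False, nullable=False)


engine = create_engine("sqlite://")  # in-memory SQLite database
Base.metadata.create_all(engine)

with Session(engine) as session, session.begin():
    session.add_all([Customer(id=1, name="Alice"), Customer(id=2, name="Bob")])


def delete_byid(session, customer_id, **kwargs):
    """Hard delete when no kwargs are given, otherwise update those columns."""
    if not kwargs:
        stmt = sa_delete(Customer).where(Customer.id == customer_id)
    else:
        stmt = sa_update(Customer).where(Customer.id == customer_id).values(**kwargs)
    affected = session.execute(stmt).rowcount  # read before committing
    session.commit()
    return affected > 0


with Session(engine) as session:
    hard_ok = delete_byid(session, 1)
    soft_ok = delete_byid(session, 2, is_deleted=True)
    missing_ok = delete_byid(session, 99)  # no such row

with Session(engine) as session:
    hard_deleted_row = session.get(Customer, 1)
    soft_deleted_flag = session.get(Customer, 2).is_deleted
```

One design consequence worth noting: because any keyword argument triggers the update branch, a caller that passes an unrelated column by mistake will silently update the row instead of deleting it.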

 

5. What is the difference between Iterable and List when handling interface parameters in Python?

In Python, Iterable and List are two different concepts, each with its own characteristics and uses:

Iterable

Iterable is the broader concept: it refers to any object that can return an iterator. An iterable implements the __iter__() method, which returns an iterator that yields the elements one by one. Almost all container types (lists, tuples, dictionaries, sets, and so on) are iterable. To check whether an object is iterable, use isinstance with collections.abc.Iterable.

Characteristics

  • Generality: Iterable is a generic interface that indicates an object can be iterated over.
  • Laziness: some iterables (generators, for example) are evaluated lazily; they do not compute all elements up front but produce them on demand.
  • Typical examples: lists (List), tuples (Tuple), dictionaries (Dict), sets (Set), and generators (Generator) are all iterable objects.
from collections.abc import Iterable

print(isinstance([1, 2, 3], Iterable))  # True
print(isinstance((1, 2, 3), Iterable))  # True
print(isinstance({1, 2, 3}, Iterable))  # True
print(isinstance({'a': 1}, Iterable))   # True
print(isinstance((x for x in range(3)), Iterable))  # True

List

List is a concrete container type in Python: an ordered collection of elements that may contain duplicates. It is one of the most commonly used mutable sequence types and supports index access, slicing, and many other operations on its elements.

Characteristics

  • Concrete implementation: List is a concrete type, a dynamic array that can store multiple objects.
  • Ordered: a list preserves the insertion order of its elements.
  • Mutable: elements in a list can be added, removed, and updated.
  • Typical example: [1, 2, 3] is a list.
my_list = [1, 2, 3]
print(my_list)  # [1, 2, 3]
my_list.append(4)  # [1, 2, 3, 4]
my_list[0] = 10  # [10, 2, 3, 4]

To summarize:

  • Iterable: a broad concept denoting any object that can be iterated over, not a specific data structure. A generator, for example, is iterable but is not a list.
  • List: a concrete container type, an ordered and mutable collection. Every list is an Iterable, but not every Iterable is a list.

Iterable is an abstract concept, and List is a concrete implementation of it. A List offers many operations and methods for processing data, whereas Iterable is concerned only with whether iteration is possible.

For this reason, an interface that receives a collection is more general if it accepts the Iterable interface rather than a List.

    async def create_range(
        self, obj_in_list: Iterable[DtoType], db: AsyncSession
    ) -> bool:
        """Batch create objects"""
        try:
            # Convert the DTOs to model instances
            db_objs = [self.model(**obj_in.model_dump()) for obj_in in obj_in_list]

            # Add all objects to the session and commit
            db.add_all(db_objs)
            await db.commit()
            return True
        except SQLAlchemyError as e:
            print(e)
            await db.rollback()  # Roll back the transaction if anything fails
            return False
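What accepting Iterable buys the caller can be shown with the standard library alone. In this sketch, CustomerDto and to_rows are hypothetical stand-ins for the DTO type and the conversion step; the function accepts a list, a tuple, or a generator without any change.

```python
from collections.abc import Iterable
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class CustomerDto:  # hypothetical DTO, standing in for a Pydantic model
    name: str


def to_rows(items: Iterable[CustomerDto]) -> List[Dict[str, str]]:
    """Materialize any iterable of DTOs into a list of plain dicts."""
    return [{"name": item.name} for item in items]


names = ["Alice", "Bob"]

from_list = to_rows([CustomerDto(n) for n in names])
from_tuple = to_rows(tuple(CustomerDto(n) for n in names))

lazy = (CustomerDto(n) for n in names)  # generator: items produced on demand
from_generator = to_rows(lazy)
second_pass = to_rows(lazy)  # the generator is already exhausted

print(from_list == from_tuple == from_generator)
print(second_pass)
```

The trade-off is that an Iterable may be single-use, as the exhausted generator shows, so a method that needs to traverse its input twice should convert it to a list first.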

These are a few of the small issues I ran into when using SQLAlchemy to work with databases in Python, summarized here for your reference.