take
One of our current projects persists its data with EF Core + MySQL and is developed code-first. The data is split across databases, and new databases are also added dynamically at run time; at the current partitioning granularity there will be roughly one to two hundred databases once the split is complete, and the data in each database is isolated from the others.
We borrowed the UnitOfWork implementation from the open-source Arch/UnitOfWork repository on GitHub. The core idea is that every API request carries a database name, and before any CRUD operation runs, the DbContext is first switched to the target database. We added a few steps to the switch, such as checking whether the database has been created, checking whether the connection is usable, and determining whether a table-structure migration is needed.
/// <summary>
/// Switches the context to the target database. The databases must be on the same machine.
/// Note: this only applies to MySQL.
/// </summary>
/// <param name="database">Name of the target database.</param>
public void ChangeDatabase(string database)
{
// Check the connection (the code for this step is elided in the article)
......
// Check if the database has been created (the code for this step is elided in the article)
......
// NOTE(review): from here down to the migration check, the listing lost identifiers and gained
// duplicated lines and unbalanced braces during extraction. Presumably it switches an open
// connection to the target database in place and otherwise rewrites the database name inside
// the connection string with the regex below — confirm against the original listing.
var connection = _context.();
if (())
{
(database); }
}
if (()) { (database); }
} else
var connectionString = ((" ", ""), @"(? <=[Dd]atabase=)\w+(? =;)", database, );
= connectionString;
}
// Determine whether a table-structure migration needs to be performed
if(_context.. ().Any())
{
// Custom migration logic; the context itself is passed to the call
_context...(_context);
}
}
However, when several operations run Migrate against the same database at the same time, a problem appears. For example, if an "add a table" step has already been executed by the first migration, the second migration does not know that it has already run, tries to create the table again, and fails with a "table already exists" error.
So we add a lock around Migrate: before migrating the current database, a caller must first acquire the lock, and the outcome decides what it does next. Because this service cannot access Redis, the lock is implemented with the database here; Redis would of course be a better fit — the locking mechanism shown is just one way to approach the problem.
Implementing Migration Locks with Databases
1. Create a new MigrationLocks table to implement the migration lock
- Lock operations do not rely on DbContext instances
- Before executing Migrate, try to acquire the lock; before acquiring it, create the table if it does not exist:
CREATE TABLE IF NOT EXISTS MigrationLocks ( LockName VARCHAR(255) PRIMARY KEY, LockedAt DATETIME NOT NULL );
- Successful insertion of a row into the table is considered a successful lock acquisition, and the primary key is the name of the library to be migrated.
INSERT INTO MigrationLocks (LockName, LockedAt) VALUES (@database, NOW());
- Deleting this record after the migration is complete is considered a successful lock release;
DELETE FROM MigrationLocks WHERE LockName = @database;
- To prevent "deadlocks" (a lock left behind by a migration that never released it), the lock's state is checked before each attempt to acquire it, and a lock that has been held for more than 5 minutes is released (normally a migration does not take more than 5 minutes to run). Note that a stale lock is one whose LockedAt is earlier than five minutes ago:
SELECT COUNT(*) FROM MigrationLocks WHERE LockName = @database AND LockedAt < NOW() - INTERVAL 5 MINUTE;
2. Wrap up the MigrateLock implementation.
/// <summary>
/// Migration lock: lets one caller at a time claim the right to migrate a database.
/// </summary>
public interface IMigrateLock
{
/// <summary>
/// Tries to acquire the migration lock using the given connection.
/// </summary>
/// <param name="connection">Connection on which the lock statements are executed.</param>
/// <returns><c>true</c> when the lock was acquired; otherwise <c>false</c>.</returns>
bool TryAcquireLock(IDbConnection connection);
/// <summary>
/// Asynchronously tries to acquire the migration lock using the given connection.
/// </summary>
/// <param name="connection">Connection on which the lock statements are executed.</param>
/// <returns>A task whose result is <c>true</c> when the lock was acquired; otherwise <c>false</c>.</returns>
Task<bool> TryAcquireLockAsync(IDbConnection connection);
/// <summary>
/// Releases the migration lock using the given connection.
/// </summary>
/// <param name="connection">Connection on which the release statement is executed.</param>
void ReleaseLock(IDbConnection connection);
/// <summary>
/// Asynchronously releases the migration lock using the given connection.
/// </summary>
/// <param name="connection">Connection on which the release statement is executed.</param>
/// <returns>A task that completes when the release attempt has finished.</returns>
Task ReleaseLockAsync(IDbConnection connection);
}
/// <summary>
/// Database-backed migration lock. A row in the <c>MigrationLocks</c> table, keyed by the name of the
/// connection's current database, marks that database as "migration in progress".
/// </summary>
/// <remarks>
/// Statements are executed through Dapper's <see cref="IDbConnection"/> extension methods
/// (<c>Execute</c> / <c>ExecuteAsync</c>). NOTE(review): the call names and the <c>connection.Database</c>
/// arguments were lost in the extracted listing and are restored here from the argument shapes — confirm.
/// </remarks>
public class MigrateLock : IMigrateLock
{
    private const string CreateTableSql = @"
CREATE TABLE IF NOT EXISTS MigrationLocks (
LockName VARCHAR(255) PRIMARY KEY,
LockedAt DATETIME NOT NULL
);";

    // Deletes the lock row only when it is OLDER than 5 minutes. The original check used
    // "LockedAt > NOW() - INTERVAL 5 MINUTE", which matched fresh locks and therefore released
    // locks that were still legitimately held while never clearing stale ones. Doing the age test
    // inside the DELETE also avoids removing a lock that another caller acquired between a
    // separate SELECT and DELETE.
    private const string ReleaseStaleLockSql = "DELETE FROM MigrationLocks WHERE LockName = @database AND LockedAt < NOW() - INTERVAL 5 MINUTE;";
    private const string AcquireLockSql = "INSERT INTO MigrationLocks (LockName, LockedAt) VALUES (@database, NOW());";
    private const string ReleaseLockSql = "DELETE FROM MigrationLocks WHERE LockName = @database;";

    private readonly ILogger<MigrateLock> _logger;

    /// <summary>
    /// Creates the lock with the logger used to report lock activity.
    /// </summary>
    /// <param name="logger">Logger for acquire/release diagnostics.</param>
    public MigrateLock(ILogger<MigrateLock> logger)
    {
        _logger = logger;
    }

    /// <summary>
    /// Tries to acquire the lock by inserting a row named after the connection's database.
    /// </summary>
    /// <param name="connection">Connection on which the lock statements are executed.</param>
    /// <returns>
    /// <c>true</c> when exactly one row was inserted; <c>false</c> when the insert did not succeed,
    /// including when it failed with an exception (which is logged, not rethrown).
    /// </returns>
    public bool TryAcquireLock(IDbConnection connection)
    {
        var lockName = connection.Database;
        try
        {
            ReleaseStaleLock(connection);
            var result = connection.Execute(AcquireLockSql, new { database = lockName });
            if (result == 1)
            {
                _logger.LogInformation("Lock acquired: {LockName}", lockName);
                return true;
            }

            _logger.LogWarning("Failed to acquire lock: {LockName}", lockName);
            return false;
        }
        catch (Exception ex)
        {
            LogAcquireFailure(ex, lockName);
            return false;
        }
    }

    /// <summary>
    /// Asynchronously tries to acquire the lock by inserting a row named after the connection's database.
    /// </summary>
    /// <param name="connection">Connection on which the lock statements are executed.</param>
    /// <returns>
    /// A task whose result is <c>true</c> when exactly one row was inserted; <c>false</c> when the insert
    /// did not succeed, including when it failed with an exception (which is logged, not rethrown).
    /// </returns>
    public async Task<bool> TryAcquireLockAsync(IDbConnection connection)
    {
        var lockName = connection.Database;
        try
        {
            await ReleaseStaleLockAsync(connection);
            var result = await connection.ExecuteAsync(AcquireLockSql, new { database = lockName });
            if (result == 1)
            {
                _logger.LogInformation("Lock acquired: {LockName}", lockName);
                return true;
            }

            _logger.LogWarning("Failed to acquire lock: {LockName}", lockName);
            return false;
        }
        catch (Exception ex)
        {
            LogAcquireFailure(ex, lockName);
            return false;
        }
    }

    /// <summary>
    /// Releases the lock by deleting the row named after the connection's database.
    /// Failures are logged and swallowed so that releasing never throws.
    /// </summary>
    /// <param name="connection">Connection on which the release statement is executed.</param>
    public void ReleaseLock(IDbConnection connection)
    {
        var lockName = connection.Database;
        try
        {
            connection.Execute(ReleaseLockSql, new { database = lockName });
            _logger.LogInformation("Lock released: {LockName}", lockName);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error releasing lock: {LockName}", lockName);
        }
    }

    /// <summary>
    /// Asynchronously releases the lock by deleting the row named after the connection's database.
    /// Failures are logged and swallowed so that releasing never throws.
    /// </summary>
    /// <param name="connection">Connection on which the release statement is executed.</param>
    /// <returns>A task that completes when the release attempt has finished.</returns>
    public async Task ReleaseLockAsync(IDbConnection connection)
    {
        var lockName = connection.Database;
        try
        {
            await connection.ExecuteAsync(ReleaseLockSql, new { database = lockName });
            _logger.LogInformation("Lock released: {LockName}", lockName);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error releasing lock: {LockName}", lockName);
        }
    }

    /// <summary>
    /// Ensures the lock table exists and removes this database's lock if it is older than 5 minutes.
    /// </summary>
    private void ReleaseStaleLock(IDbConnection connection)
    {
        connection.Execute(CreateTableSql);
        var removed = connection.Execute(ReleaseStaleLockSql, new { database = connection.Database });
        if (removed > 0)
        {
            _logger.LogWarning("Lock exists and is older than 5 minutes. Releasing old lock.");
        }
    }

    /// <summary>
    /// Asynchronously ensures the lock table exists and removes this database's lock if it is older than 5 minutes.
    /// </summary>
    private async Task ReleaseStaleLockAsync(IDbConnection connection)
    {
        await connection.ExecuteAsync(CreateTableSql);
        var removed = await connection.ExecuteAsync(ReleaseStaleLockSql, new { database = connection.Database });
        if (removed > 0)
        {
            _logger.LogWarning("Lock exists and is older than 5 minutes. Releasing old lock.");
        }
    }

    /// <summary>
    /// Logs a failed acquire: a duplicate-key failure means another caller holds the lock (warning);
    /// anything else is reported as an error.
    /// </summary>
    private void LogAcquireFailure(Exception ex, string lockName)
    {
        if (ex.Message.Contains("Duplicate"))
        {
            _logger.LogWarning("Failed acquiring lock due to duplicate entry: {LockName}", lockName);
        }
        else
        {
            _logger.LogError(ex, "Error acquiring lock: {LockName}", lockName);
        }
    }
}
3. Wrap up the MigrateExecutor implementation.
/// <summary>
/// Database migration executor.
/// </summary>
public interface IMigrateExcutor
{
/// <summary>
/// Performs the migration for the given context.
/// </summary>
/// <param name="dbContext">Context whose database is migrated.</param>
void Migrate(DbContext dbContext);
/// <summary>
/// Asynchronously performs the migration for the given context.
/// </summary>
/// <param name="dbContext">Context whose database is migrated.</param>
/// <returns>A task that completes when the migration attempt has finished.</returns>
Task MigrateAsync(DbContext dbContext);
/// <summary>
/// Performs the migration in a concurrent scenario, guarded by the migration lock.
/// </summary>
/// <param name="dbContext">Context whose database is migrated.</param>
/// <param name="wait">Whether to wait until the ongoing migration is complete</param>
void ConcurrentMigrate(DbContext dbContext, bool wait = true);
/// <summary>
/// Asynchronously performs the migration in a concurrent scenario, guarded by the migration lock.
/// </summary>
/// <param name="dbContext">Context whose database is migrated.</param>
/// <param name="wait">Whether to wait until the ongoing migration is complete</param>
/// <returns>A task that completes when the migration attempt has finished.</returns>
Task ConcurrentMigrateAsync(DbContext dbContext, bool wait = true);
/// <summary>
/// Performs the migration in a concurrent scenario, using the supplied connection for the lock.
/// </summary>
/// <param name="dbContext">Context whose database is migrated.</param>
/// <param name="connection">Connection passed to the migration lock.</param>
/// <param name="wait">Whether to wait until the ongoing migration is complete</param>
void ConcurrentMigrate(DbContext dbContext, IDbConnection connection, bool wait = true);
/// <summary>
/// Asynchronously performs the migration in a concurrent scenario, using the supplied connection for the lock.
/// </summary>
/// <param name="dbContext">Context whose database is migrated.</param>
/// <param name="connection">Connection passed to the migration lock.</param>
/// <param name="wait">Whether to wait until the ongoing migration is complete</param>
/// <returns>A task that completes when the migration attempt has finished.</returns>
Task ConcurrentMigrateAsync(DbContext dbContext, IDbConnection connection, bool wait = true);
}
/// <summary>
/// Database Migration Enforcer
/// </summary>
public class MigrateExcutor : IMigrateExcutor
{
private readonly IMigrateLock _migrateLock;
private readonly ILogger<MigrateExcutor> _logger;
/// <summary>
/// Creates the executor from the migration lock and the logger it reports to.
/// </summary>
/// <param name="migrateLock">Lock used to serialize concurrent migrations.</param>
/// <param name="logger">Logger for migration diagnostics.</param>
public MigrateExcutor(IMigrateLock migrateLock, ILogger<MigrateExcutor> logger)
    => (_migrateLock, _logger) = (migrateLock, logger);
/// <summary>
/// Runs the migration step for <paramref name="dbContext"/> when the pending check reports any entries.
/// A failure is logged as "Migration failed" and then handed to <c>HandleError</c>, which either
/// recovers or rethrows the exception.
/// </summary>
/// <param name="dbContext">Context whose database is migrated.</param>
public void Migrate(DbContext dbContext)
{
try
{
// NOTE(review): the call target was lost in extraction; presumably the context's
// pending-migrations query — confirm.
if (().Any())
{
// NOTE(review): call target lost in extraction; presumably applies the pending migrations — confirm.
();
}
}
catch (Exception e)
{
_logger.LogError(e, "Migration failed");
HandleError(dbContext, e);
}
}
/// <summary>
/// Asynchronous counterpart of <c>Migrate</c>: runs the migration step for <paramref name="dbContext"/>
/// when the awaited pending check reports any entries. A failure is logged as "Migration failed" and
/// then handed to <c>HandleErrorAsync</c>, which either recovers or rethrows the exception.
/// </summary>
/// <param name="dbContext">Context whose database is migrated.</param>
/// <returns>A task that completes when the migration attempt has finished.</returns>
public async Task MigrateAsync(DbContext dbContext)
{
try
{
// NOTE(review): the awaited call target was lost in extraction; presumably the context's
// asynchronous pending-migrations query — confirm.
if ((await ()).Any())
{
// NOTE(review): call target lost in extraction; presumably applies the pending migrations — confirm.
await ();
}
}
catch (Exception e)
{
_logger.LogError(e, "Migration failed");
await HandleErrorAsync(dbContext, e);
}
}
/// <summary>
/// Performs the migration in a concurrent scenario. Returns immediately when the pending check
/// reports nothing; otherwise obtains a connection, delegates to the overload that takes a
/// connection, and disposes the connection when done.
/// </summary>
/// <param name="dbContext">Context whose database is migrated.</param>
/// <param name="wait">Whether to wait until the ongoing migration is complete</param>
public void ConcurrentMigrate(DbContext dbContext, bool wait = true)
{
// NOTE(review): call target lost in extraction; presumably the context's pending-migrations query — confirm.
if (!().Any())
{
return;
}
// NOTE(review): both the connection-factory call and the expression whose .Database is read were
// lost in extraction; the connection obtained here is the one handed to the lock — confirm what creates it.
using var connection = (().Database);
ConcurrentMigrate(dbContext, connection, wait);
}
/// <summary>
/// Asynchronously performs the migration in a concurrent scenario: obtains a connection, delegates to
/// the overload that takes a connection, and disposes the connection when done.
/// </summary>
/// <param name="dbContext">Context whose database is migrated.</param>
/// <param name="wait">Whether to wait until the ongoing migration is complete</param>
/// <returns>A task that completes when the migration attempt has finished.</returns>
public async Task ConcurrentMigrateAsync(DbContext dbContext, bool wait = true)
{
// NOTE(review): unlike the synchronous overload, this guard is NOT negated, so the method returns
// when the check reports entries and continues when it reports none. That looks inverted
// (a "!" is probably missing) — confirm and fix.
// NOTE(review): the awaited call target was lost in extraction; presumably the context's
// asynchronous pending-migrations query — confirm.
if ((await ()).Any())
{
return;
}
// NOTE(review): both the awaited connection-factory call and the expression whose .Database is read
// were lost in extraction — confirm what creates the connection.
await using var connection = await (().Database);
await ConcurrentMigrateAsync(dbContext, connection, wait);
}
/// <summary>
/// Performs the migration in a concurrent scenario, serialized through the migration lock.
/// Returns immediately when nothing is pending. Otherwise it tries to take the lock: on success it
/// migrates and always releases the lock afterwards; on failure it either retries after a short
/// delay (<paramref name="wait"/> is <c>true</c>) or gives up (<paramref name="wait"/> is <c>false</c>).
/// </summary>
/// <remarks>
/// NOTE(review): <c>dbContext.Database.GetPendingMigrations()</c> and <c>Thread.Sleep</c> are restored
/// from a listing whose call targets were lost in extraction — confirm.
/// </remarks>
/// <param name="dbContext">Context whose database is migrated.</param>
/// <param name="connection">Connection passed to the migration lock.</param>
/// <param name="wait">Whether to wait until the ongoing migration is complete</param>
public void ConcurrentMigrate(DbContext dbContext, IDbConnection connection, bool wait = true)
{
    // Retry interval. The original slept 20000 ms while logging "wait for 2 seconds";
    // the delay now matches the message.
    const int RetryDelayMilliseconds = 2000;

    if (!dbContext.Database.GetPendingMigrations().Any())
    {
        return;
    }

    while (true)
    {
        if (_migrateLock.TryAcquireLock(connection))
        {
            try
            {
                // Migrate re-checks for pending migrations, so a caller that waited for another
                // process to finish does no work here.
                Migrate(dbContext);
                return;
            }
            finally
            {
                _migrateLock.ReleaseLock(connection);
            }
        }

        if (!wait)
        {
            // The original fell through to the next loop iteration here and never terminated.
            _logger.LogInformation("Migration is locked, skip");
            return;
        }

        _logger.LogWarning("Migration is locked, wait for 2 seconds");
        Thread.Sleep(RetryDelayMilliseconds);
    }
}
/// <summary>
/// Asynchronously performs the migration in a concurrent scenario, serialized through the migration lock.
/// Returns immediately when nothing is pending. Otherwise it tries to take the lock: on success it
/// migrates and always releases the lock afterwards; on failure it either retries after a short
/// delay (<paramref name="wait"/> is <c>true</c>) or gives up (<paramref name="wait"/> is <c>false</c>).
/// </summary>
/// <remarks>
/// NOTE(review): <c>dbContext.Database.GetPendingMigrationsAsync()</c> is restored from a listing whose
/// call targets were lost in extraction — confirm.
/// </remarks>
/// <param name="dbContext">Context whose database is migrated.</param>
/// <param name="connection">Connection passed to the migration lock.</param>
/// <param name="wait">Whether to wait until the ongoing migration is complete</param>
/// <returns>A task that completes when the migration attempt has finished or was skipped.</returns>
public async Task ConcurrentMigrateAsync(DbContext dbContext, IDbConnection connection, bool wait = true)
{
    // Retry interval. The original used 20000 ms while logging "wait for 2 seconds";
    // the delay now matches the message.
    const int RetryDelayMilliseconds = 2000;

    // The original guard lacked the negation and returned precisely when migrations WERE pending,
    // so the asynchronous path never migrated. It now mirrors the synchronous overload.
    if (!(await dbContext.Database.GetPendingMigrationsAsync()).Any())
    {
        return;
    }

    while (true)
    {
        if (await _migrateLock.TryAcquireLockAsync(connection))
        {
            try
            {
                // MigrateAsync re-checks for pending migrations, so a caller that waited for
                // another process to finish does no work here.
                await MigrateAsync(dbContext);
                return;
            }
            finally
            {
                await _migrateLock.ReleaseLockAsync(connection);
            }
        }

        if (!wait)
        {
            _logger.LogInformation("Migration is locked, skip");
            return;
        }

        _logger.LogWarning("Migration is locked, wait for 2 seconds");
        // Awaited delay instead of a blocking, un-awaited call, so no thread is tied up while waiting.
        await Task.Delay(RetryDelayMilliseconds);
    }
}
/// <summary>
/// Recovery step invoked after a failed migration. Builds three migration-id lists and, when the
/// entry at the computed error index of <c>hasChangeList</c> differs from the first entry of
/// <c>needChangeList</c> but ends with the same suffix (everything from that entry's "_" onward),
/// rewrites that row's MigrationId in <c>__EFMigrationsHistory</c> and retries. In every other case
/// the exception <paramref name="e"/> is thrown again.
/// </summary>
/// <param name="dbContext">Context whose migration failed.</param>
/// <param name="e">The exception raised by the failed migration.</param>
private void HandleError(DbContext dbContext, Exception e)
{
// NOTE(review): the three call targets were lost in extraction; judging by the names these are
// presumably the pending, all-known and already-applied migration ids — confirm.
var needChangeList = ().ToList();
var allChangeList = ().ToList();
var hasChangeList = ().ToList();
// NOTE(review): the operands of this comparison and of the errIndex subtraction were lost in
// extraction (presumably counts of the lists above) — confirm before relying on this logic.
if ( + > )
{
int errIndex = - ;
if ( - 1 == errIndex && hasChangeList[errIndex] != needChangeList[0])
{
// Suffix of the first needed id, starting at its "_" (the search's second argument was lost in extraction).
int index = needChangeList[0].IndexOf("_", );
string errSuffix = needChangeList[0].Substring(index, needChangeList[0].Length - index);
if (hasChangeList[errIndex].EndsWith(errSuffix))
{
// Same suffix, different prefix: rename the history row to the needed id, then run the
// (extraction-garbled) follow-up call, presumably the migration again — confirm.
// NOTE(review): the SQL is built by string interpolation; the values are migration ids
// rather than user input, but a parameterized statement would be safer.
($"Update __EFMigrationsHistory set MigrationId = '{needChangeList[0]}' where MigrationId = '{hasChangeList[errIndex]}'");
();
}
else
{
// NOTE(review): "throw e;" resets the stack trace of the original failure.
throw e;
}
}
else
{
throw e;
}
}
else
{
throw e;
}
// Reached only on the recovery path above.
_logger.LogInformation("Migration failed, but success on second try.");
}
/// <summary>
/// Asynchronous counterpart of <c>HandleError</c>. Builds three migration-id lists and, when the
/// entry at the computed error index of <c>hasChangeList</c> differs from the first entry of
/// <c>needChangeList</c> but ends with the same suffix (everything from that entry's "_" onward),
/// rewrites that row's MigrationId in <c>__EFMigrationsHistory</c> and retries. In every other case
/// the exception <paramref name="e"/> is thrown again.
/// </summary>
/// <param name="dbContext">Context whose migration failed.</param>
/// <param name="e">The exception raised by the failed migration.</param>
/// <returns>A task that completes when the recovery attempt has finished.</returns>
private async Task HandleErrorAsync(DbContext dbContext, Exception e)
{
// NOTE(review): the three call targets were lost in extraction; judging by the names these are
// presumably the pending, all-known and already-applied migration ids — confirm.
var needChangeList = (await ()).ToList();
var allChangeList = ().ToList();
var hasChangeList = (await ()).ToList();
// NOTE(review): the operands of this comparison and of the errIndex subtraction were lost in
// extraction (presumably counts of the lists above) — confirm before relying on this logic.
if ( + > )
{
int errIndex = - ;
if ( - 1 == errIndex && hasChangeList[errIndex] != needChangeList[0])
{
// Suffix of the first needed id, starting at its "_" (the search's second argument was lost in extraction).
int index = needChangeList[0].IndexOf("_", );
string errSuffix = needChangeList[0].Substring(index, needChangeList[0].Length - index);
if (hasChangeList[errIndex].EndsWith(errSuffix))
{
// Same suffix, different prefix: rename the history row to the needed id, then await the
// (extraction-garbled) follow-up call, presumably the migration again — confirm.
// NOTE(review): the SQL is built by string interpolation; the values are migration ids
// rather than user input, but a parameterized statement would be safer.
await ($"Update __EFMigrationsHistory set MigrationId = '{needChangeList[0]}' where MigrationId = '{hasChangeList[errIndex]}'");
await ();
}
else
{
// NOTE(review): "throw e;" resets the stack trace of the original failure.
throw e;
}
}
else
{
throw e;
}
}
else
{
throw e;
}
// Reached only on the recovery path above.
_logger.LogInformation("Migration failed, but success on second try.");
}
}