Location>code7788 >text

Implementing a lightweight local event bus in a .Net Web project Framework

Popularity:832 ℃/2024-09-27 11:30:32

一、事件总线设计方案

1.1、事件总线的概念

  • 事件总线是一个事件管理器,负责统一处理系统中所有事件的发布和订阅。
  • 事件总线模式通过提供一种松耦合的方式来促进系统内部的业务模块之间的通信,从而增强系统的灵活性和可维护性。

1.2、实现的功能目标

  • 注入事件总线服务到DI容器,自动注入整个程序集的事件;
  • 每个事件处理程序能够自动依赖注入;
  • 通过特性标注事件消息模型、事件处理器;
  • 事件总线服务提供一个发布事件的方法,根据消息模型,自动找到并触发对应的事件处理程序,并传递事件参数。

二、使用案例

2.1、事件消息模型

  • 需要继承 EventArgs
public class UserTestEventArgs : EventArgs
{
    public string UserId { get; set; }
    public string UserName { get; set; }
}

2.2、事件处理程序

  • 该事件模型,触发的事件处理程序,会自动依赖注入
[LocalEventHandler(typeof(UserTestEventArgs), 1)]
public class UserTest1EventHandler(SingletonTestService singletonService, ScopeTestService scopeService, TransientTestService transientService) : ILocalEventHandler<UserTestEventArgs>
{
    public Task OnEventHandlerAsync(object sender, UserTestEventArgs e)
    {
        ($"事件1被'{().Name}'触发,参数:" + (e));
        try
        {
            ();
            ();
            ();
        }
        catch (Exception ex)
        {
        }
        return ;
    }
}

2.3、注入事件总线服务到DI

  • 在 或 中,注入服务
(typeof(UserTest1EventHandler).Assembly); // 注入事件总线服务,自动注册这个程序集内的所有事件处理器。

2.4、使用事件总线服务,触发事件

  • 通过构造函数依赖注入,拿到事件总线服务 ILocalEventBus
  • 调用事件总线服务,发布事件消息,触发事件处理程序 (this, args);
// 事件总线测试控制器
public class EventTestController(
    ILocalEventBus eventBus, // 主构造函数,依赖注入事件总线服务
    IUserService userService // 测试服务
    ) : ControllerBase
{
    [HttpPost]
    public Task Test(UserTestEventArgs args) // UserTestEventArgs 事件消息模型
    {
        var users = ();
        return (this, args); // 发布事件消息,触发事件处理程序。 this:触发事件的对象  args:事件消息
    }
}

三、事件总线功能开发

3.1、本地事件总线 服务接口

  • 事件的发布方法设计,基于 .Net 标准事件模式 思想。
  • 这里需要泛型参数:事件消息模型类型,以便在触发事件时可以找到注册的该消息模型对应的事件处理器。
/// <summary>
/// 本地事件总线
/// </summary>
public interface ILocalEventBus
{
    /// <summary>
    /// 触发对应 事件消息模型 对应的 事件处理程序
    /// </summary>
    /// <typeparam name="TEventArgs">事件消息模型类型</typeparam>
    /// <param name="sender">触发事件的对象</param>
    /// <param name="args">事件消息模型</param>
    Task PublishAsync<TEventArgs>(object sender, TEventArgs args) where TEventArgs : EventArgs;
}

3.2、事件处理器 泛型接口

/// <summary>
/// 本地事件 事件处理程序接口
/// </summary>
/// <typeparam name="TEventArgs">事件消息模型</typeparam>
public interface ILocalEventHandler<TEventArgs> where TEventArgs : EventArgs
{
    /// <summary>
    /// 事件处理程序方法
    /// </summary>
    /// <param name="sender">事件触发者</param>
    /// <param name="e">事件消息</param>
    Task OnEventHandlerAsync(object sender, TEventArgs e);
}

3.3、本地事件处理程序 特性

  • 本地事件处理程序 特性 :用于在事件处理器上标注。
  • 【MessageType】 指定该消息处理器,接受的消息模型类型。 在事件触发时,通过消息类型,找到该消息处理器,并调用。
  • 【Sort】如果多个消息处理器,声明接受同一个类型的消息模型。那么当这个类型的消息发布时,会触发这些多个事件处理程序,会通过指定的该触发顺序挨个触发。其中一个事件处理器执行报错,不会影响其他的。
/// <summary>
/// 本地事件处理程序 特性
/// </summary>
[AttributeUsage(, AllowMultiple = false)]
public class LocalEventHandlerAttribute : Attribute
{
    /// <summary>
    /// 事件消息模型类,需要继承EventArgs
    /// </summary>
    public Type MessageType { get; set; }
    /// <summary>
    /// 触发顺序 正序
    /// </summary>
    public int Sort { get; set; }

    /// <summary>
    /// 构造函数
    /// </summary>
    /// <param name="messageType">事件消息类型</param>
    /// <param name="sort">触发顺序 正序</param>
    public LocalEventHandlerAttribute(Type messageType, int sort = 0)
    {
        if(!(typeof(EventArgs)))
        {
            throw new Exception($"【LocalEventBus】The MessageType '{}' can not assignable from '{nameof(EventArgs)}'");
        }
        MessageType = messageType;
        Sort = sort;
    }
}

3.4、事件处理程序信息

/// <summary>
/// 事件处理程序模型
/// </summary>
public class LocalEventHandlerModel
{
    /// <summary>
    /// 触发顺序
    /// </summary>
    public int Sort { get; set; }
    /// <summary>
    /// 事件处理程序类型
    /// </summary>
    public Type HandlerType { get; set; }
}

3.5、事件总线服务

using ;
using ;
using ;

namespace ;

/// <summary>
/// local event bus
/// </summary>
public sealed class LocalEventBus(IHttpContextAccessor httpContextAccessor) : ILocalEventBus
{
    /// <summary>
    /// Event Message Type - A collection of types of corresponding event handlers
    /// </summary>
    private ConcurrentDictionary<Type, ConcurrentBag<LocalEventHandlerModel>> Events = new ConcurrentDictionary<Type, ConcurrentBag<LocalEventHandlerModel>>();

    /// <summary>
    /// trigger correspondence event message model (EMM) corresponding event handler
    /// </summary>
    /// <typeparam name="TEventArgs">event message model (EMM)类型</typeparam>
    /// <param name="sender">The object that triggered the event</param>
    /// <param name="args">event message model (EMM)</param>
    public async Task PublishAsync<TEventArgs>(object sender, TEventArgs args)
        where TEventArgs : EventArgs
    {
        var argType = typeof(TEventArgs);
        if (!(argType, out ConcurrentBag<LocalEventHandlerModel>? handlers))
            return;
        if (handlers == null || == 0)
            return;
        foreach (var handlerModel in (x => ))
        {
            try
            {
                // at this time pass (a bill or inspection etc) DI 和event handler类型 get event handler实例
                var handlerInstance = httpContextAccessor?.HttpContext?.RequestServices?.GetService();
                if (handlerInstance != null)
                {
                    var method = ("OnEventHandlerAsync");
                    Task? task = (Task?)method?.Invoke(handlerInstance, [sender, args]);
                    if (task != null)
                        await task;
                }
            }
            catch (Exception)
            {
            }
        }
    }

    /// <summary>
    /// 向事件总线中添加多个event handler
    /// </summary>
    /// <param name="handlerDic">Event Message Type - correspondingevent handler模型列表 dictionaries</param>
    public void AddHandlers(Dictionary<Type, List<LocalEventHandlerModel>> handlerDic)
    {
        if (handlerDic == null || == 0)
            return;
        foreach (var item in handlerDic)
        {
            if ((, out ConcurrentBag<LocalEventHandlerModel> handlerModels))
            {
                foreach (var value in )
                {
                    (value);
                }
            }
            else
            {
                (, new ConcurrentBag<LocalEventHandlerModel>());
            }
        }
    }

}

3.6. Event Bus Service Dependency Injection Handling

/// <summary>
/// Local Event Bus Service Extensions
/// </summary> /// Local Event Bus Service Extensions
public static class LocalEventBusServiceExtensions
{
    /// <summary> /// LocalEventBusServiceExtensions {
    /// Registering the event bus
    /// Register the event handler classes in the entire assembly with the EventHandler feature with the event bus.
    /// <param name="handlerAssembly"> The assembly in which the event handler resides</param> /// </param>
    /// </summary>
    public static void AddLocalEventBus(this IServiceCollection services, Assembly handlerAssembly)
    {
        var handlerTypes = ().Where(x => <LocalEventHandlerAttribute>() ! = null);
        Dictionary<Type, List<LocalEventHandlerModel>> handlerDic = new();
        foreach (Type handlerType in handlerTypes)
        {
            (new ServiceDescriptor(handlerType, handlerType, )); // Inject the event handler into the container so that the event handler can also use dependency injection like a normal service
            var attribute = <LocalEventHandlerAttribute>();
            if (attribute == null)
                continue;

            var handlerModel = new LocalEventHandlerModel() { Sort = , HandlerType = handlerType }; var handlerModel = new LocalEventHandlerModel() { Sort = , HandlerType = handlerType }; }
            if (())
            {
                handlerDic[].Add(handlerModel);
            }
            else
            {
                (, new List<LocalEventHandlerModel>() { handlerModel }); }
            }
        }
        LocalEventBus? eventBus = ().GetService<ILocalEventBus>() as LocalEventBus; // This method may be called multiple times, single-case processing

        (); // Depend on the Http context, get the scope of the service at each event trigger via HttpContext
        <ILocalEventBus, LocalEventBus>(sp => // Register the event bus as a single instance service.
        {
            IHttpContextAccessor httpContextAccessor = <IHttpContextAccessor>();
            eventBus = eventBus ? new LocalEventBus(httpContextAccessor);
            (handlerDic); // Add the parsed event handler to the event bus
            return eventBus.
        }); }
    }
}