using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
public class BasicMessageBus
{
private static readonly BasicMessageBus _instance = new();
public static BasicMessageBus Inst => _instance;
private readonly Subject<object> _messages = new();
public IObservable<T> Subscribe<T>() => _messages.OfType<T>();
public void Send(object message) => _messages.OnNext(message);
}
1
hez2010 15 天前
没有看到哪里有内存泄露的风险。
调用 Subscribe 的方法所在的对象如果被回收了那对 _messages 的引用也就自动没了,除非你是在哪个具有 static 生命周期的对象中调用了 Subscribe 。 |
2
coder001 12 天前
进程内队列? 为啥不用 Channel
https://learn.microsoft.com/zh-cn/dotnet/api/system.threading.channels.channel?view=net-8.0 另外,如果是事件总线,可以考虑引入泛型之类的花样类型匹配订阅筛选器 这是自用的事件总线实现,目前大规模用在工作生产环境和玩具项目,未发现明显性能瓶颈 IEventBus.cs ``` public interface IEventBus { bool Subscript<T>(Action<T> callBack); bool UnSubscript<T>(Action<T> callBack); bool Publish<T>(); bool Publish<T>(T obj); } ``` AnyPublishEvent.cs ``` /// <summary> /// 任何事件发布,用于统计或通配 /// </summary> [DisplayName("*")] public record AnyPublishEvent(Type Type, object? Obj); ``` InProcessEventBusBase.cs ``` public abstract class InProcessEventBusBase(ILogger<InProcessEventBusBase> logger) : IEventBus { private readonly Dictionary<Type, HashSet<Delegate>> _dicTypeToHandlers = []; public bool Subscript<T>(Action<T> callBack) { var type = typeof(T); lock (_dicTypeToHandlers) { if (!_dicTypeToHandlers.TryGetValue(type, out var handlers)) { handlers = _dicTypeToHandlers[type] = []; } return handlers.Add(callBack); // 忽略重复 } } public bool UnSubscript<T>(Action<T> callBack) { lock (_dicTypeToHandlers) { if (_dicTypeToHandlers.TryGetValue(typeof(T), out var handlers)) { var unSubscript = handlers.Remove(callBack); if (handlers.Count == 0) _dicTypeToHandlers.Remove(typeof(T)); return unSubscript; } return false; } } public bool Publish<T>() { PublishInternal(new AnyPublishEvent(typeof(T), default)); return PublishInternal<T?>(default); } public bool Publish<T>(T obj) { PublishInternal(new AnyPublishEvent(typeof(T), obj)); return PublishInternal(obj); } private bool PublishInternal<T>(T eventValue) { var type = typeof(T); Delegate[] subscripts; lock (_dicTypeToHandlers) { if (!_dicTypeToHandlers.TryGetValue(type, out var handlers)) return false; subscripts = [.. handlers]; } foreach (var del in subscripts) { try { ((Action<T>)del)(eventValue); } catch (Exception e) { logger.LogError(e, nameof(Publish)); } } return true; } } ``` |
3
coder001 12 天前
(看来似乎回帖没有代码格式支持,而且 gist 连接展开的特性似乎也没了,凑合看吧🌚)
|