C# 发布-订阅模式

#CSharp #PubSub
发布-订阅模式
有些时候,发布-订阅模式也会被称为观察者模式,但其实,如果细分的话,这两者之间还是有些细微的差别的。
观察者模式是在主题者(Subject)内部,维护一个观察者(Observer)列表。然后当主题者的状态发生变更的时候,主题者可以通知观察者发生了变化。观察者可以主动访问主题者,获得变更。也可以在通知观察者的时候,直接带上变更的消息。
在观察者模式中,主题者和观察者还是存在部分的耦合。因此,我们可以引入一个中介,来接触这种耦合。
类 A(发布者)发生了变化,将这个变化以消息或者事件的形式,发送给中介类,然后中介类将这个消息,转发给订阅这个消息的类 B(订阅者),这样发布者和订阅者并不直接产生依赖,他们都仅依赖于中介类(这个类可以是一个消息队列,也可以是你自定义的一个类型)。
代码
发布者基类:
public class EventPublisher
{
private CustomEventHub EventHub { get; }
public EventPublisher()
{
EventHub = CustomEventHub.GetInstance();
}
public void PublishEvent<T>(EventType eventType, T value)
{
EventHub.Publish(new CustomEvent { Type = eventType, Value = value});
}
}
订阅者基类:
public class EventSubscriber
{
private CustomEventHub EventHub { get; }
public EventSubscriber()
{
EventHub = CustomEventHub.GetInstance();
}
public void Subscribe(EventType eventType, Action<CustomEvent> action)
{
EventHub.AddSubscriber(eventType, action);
}
}
中介类:(为保证只有一个,这里额外加入了单例模式)
public class CustomEventHub
{
private static readonly CustomEventHub Instance = new();
private readonly Dictionary<EventType, IList<Action<CustomEvent>>> _eventTypeAndSubscribers = new();
public static CustomEventHub GetInstance() => Instance;
private CustomEventHub() { }
public void Publish(CustomEvent customEvent)
{
var eventType = customEvent.Type;
if (!_eventTypeAndSubscribers.ContainsKey(eventType)) { return; }
var subscribers = _eventTypeAndSubscribers[eventType];
foreach (var subscriberAction in subscribers)
{
subscriberAction.Invoke(customEvent);
}
}
public void AddSubscriber(EventType eventType, Action<CustomEvent> action)
{
if (_eventTypeAndSubscribers.ContainsKey(eventType))
{
_eventTypeAndSubscribers[eventType].Add(action);
}
else
{
_eventTypeAndSubscribers.Add(eventType, new List<Action<CustomEvent>> { action });
}
}
}
自定义事件:
public class CustomEvent
{
public EventType Type { get; set; }
public object Value { get; set; }
}
事件类型:
public enum EventType
{
TimeChanged
}
实际应用
时间变化时,发出事件通知:
public class TimeProvider : EventPublisher
{
private Timer _timer = null!;
public double Interval { get; set; }
public DateTime CurrentTime { get; set; }
public void Init()
{
CurrentTime = new DateTime(1900, 1, 1);
_timer = new Timer(Interval)
{
AutoReset = true
};
_timer.Elapsed += OnTimeChanged;
}
private void OnTimeChanged(object? sender, ElapsedEventArgs e)
{
Console.Write($"TimeChanged From {CurrentTime :HH:mm:ss}");
CurrentTime = CurrentTime.Add(TimeSpan.FromMilliseconds(Interval));
Console.WriteLine($" To {CurrentTime :HH:mm:ss}");
PublishEvent(EventType.TimeChanged, CurrentTime);
}
public void Start()
{
_timer.Enabled = true;
GC.KeepAlive(_timer);
}
public void Stop()
{
_timer.Enabled = false;
}
}
收到时间变化的事件后,打印一个“Hello World”:
public class SimulatorSystem : EventSubscriber
{
private CustomEventHub _eventHub;
public void Init()
{
Subscribe(EventType.TimeChanged, OnTimeChanged);
}
private void OnTimeChanged(CustomEvent customEvent)
{
if (customEvent.Type != EventType.TimeChanged) { throw new ArgumentException("Invalid event type"); }
Console.WriteLine("Hello World");
}
}



