Implementing a simple messenger component for WPF, UWP and Xamarin

The original requirement is that I need to develop a WPF application which implements the socket sender/receiver. First, I created a basic WPF application with MVVM pattern. Then I made a single project to do all the stuff related to socket communication. Next I have to integrate the socket library into the ViewModel project to operate socket connections.

Obviously, we can use event for this purpose. For example, we could have a class named SocketServer that has an event to receive socket packages, then subscribe to it in the ViewModel layer. But that means we have to create the instance of the SocketServerclass, which couples the ViewModel layer with the socket project. I hope to create a mediator to decouple them. So the publisher and subscriber do not need to know each other.

When I use MvvmCross as the MVVM Framework, I found that MvvmCross provides a plugin named Messenger to communicate between ViewModels. But it has dependencies on some MvvmCross libraries, which means if I want to use this plugin in other projects, I have to reference MvvmCross. It is not ideal for my current scenario because actually, the socket project has no requirements to reference MvvmCross. So I made a project that focuses on sub-pub pattern and removed the dependencies to MvvmCross. Now I can reuse it in any WPF, UWP and Xamarin projects. The project is available here: https://github.com/yanxiaodi/CoreMessenger. Let us dive into it for more details.

Sub-Pub pattern

Message

Message is an abstract class to represent a message in this system:

1
2
3
4
5
6
7
8
public abstract class Message
{
public object Sender { get; private set; }
protected Message(object sender)
{
Sender = sender ?? throw new ArgumentNullException(nameof(sender));
}
}

We should create instances of different messages derived from this abstract class. It has an argument called sender, so the subscriber is able to get the instance of the sender. But it is not mandatory.

Subscriptions

BaseSubscription is the base class for subscriptions. The code looks like this:

1
2
3
4
5
6
7
8
9
10
11
12
13
public abstract class BaseSubscription
{
public Guid Id { get; private set; }
public SubscriptionPriority Priority { get; private set; }
public string Tag { get; private set; }
public abstract Task<bool> Invoke(object message);
protected BaseSubscription(SubscriptionPriority priority, string tag)
{
Id = Guid.NewGuid();
Priority = priority;
Tag = tag;
}
}

It has an Id property and a tag property so you could put some tags to differentiate or group the instances of subscriptions. The Priority property is an enum type which is used to indicate the priority of the subscription so the subscriptions will be invoked by the expected order.

There are two types of Subscriptions. One is StrongSubscription:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class StrongSubscription<TMessage> : BaseSubscription where TMessage : Message
{
private readonly Action<TMessage> _action;

public StrongSubscription(Action<TMessage> action,
SubscriptionPriority priority, string tag): base(priority, tag)
{
_action = action;
}
public override async Task<bool> Invoke(object message)
{
var typedMessage = message as TMessage;
if (typedMessage == null)
{
throw new Exception($"Unexpected message {message.ToString()}");
}
await Task.Run(() => _action?.Invoke(typedMessage));
return true;
}
}

It inherits BaseSubscription and overrides the Invoke() method. Basically, it has a field named _action which is defined when you create the instance. When we publish the message, the subscription will call Invoke() method to execute the action. We use Task to wrap the action so we could leverage the benefit of asynchronous operations.

Here is another type of Subscription named WeakSubscription:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class WeakSubscription<TMessage> : BaseSubscription where TMessage : Message
{
private readonly WeakReference<Action<TMessage>> _weakReference;

public WeakSubscription(Action<TMessage> action,
SubscriptionPriority priority, string tag) : base(priority, tag)
{
_weakReference = new WeakReference<Action<TMessage>>(action);
}

public override async Task<bool> Invoke(object message)
{
var typedMessage = message as TMessage;
if (typedMessage == null)
{
throw new Exception($"Unexpected message {message.ToString()}");
}
Action<TMessage> action;
if (!_weakReference.TryGetTarget(out action))
{
return false;
}
await Task.Run(() => action?.Invoke(typedMessage));
return true;
}
}

The difference here is that the action is stored in a WeakReference field. You could learn more here: WeakReference Class . It is used to represent a typed weak reference, which references an object while still allowing that object to be reclaimed by Garbage Collection. Before we use it, we need to check if the target has been collected by GC by using TryGetTarget(T) method. If this method returns false, that means the reference has been collected by GC.

If you use StrongSubscription, the Messenger will keep the strong reference to the callback method and Garbage Collection will not destroy the subscription. In this case, you need to unsubscribe the subscription explicitly to avoid memory leaking. Otherwise, you could use WeakSubscription to remove the subscription when objects go out of scope.

MessengerHub

MessengerHub is a singleton instance in the whole application domain. We don’t need to use Dependency Injection to create the instance because its purpose is explicit and we only have this one instance. Here is an easy way to implement the singleton pattern:

1
2
3
4
5
6
7
8
9
10
11
12
public class MessengerHub
{
private static readonly Lazy<MessengerHub> lazy = new Lazy<MessengerHub>(() => new MessengerHub());
private MessengerHub() { }
public static MessengerHub Instance
{
get
{
return lazy.Value;
}
}
}

MessengerHub maintain a Dictionary to keep the instances of the subscriptions, as shown below:

1
2
private readonly ConcurrentDictionary<Type, ConcurrentDictionary<Guid, BaseSubscription>> _subscriptions =
new ConcurrentDictionary<Type, ConcurrentDictionary<Guid, BaseSubscription>>();

The key of the dictionary is the Type of the Message and the value is a Dictionary that contains a set of subscriptions for this specific Message. Obviously, one type could have multiple subscriptions.

Subscribe

MessageHub exposes several important methods to subscribe/unsubscribe/publish messages.

The Subscribe() method is shown below:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public SubscriptionToken Subscribe<TMessage>(Action<TMessage> action,
ReferenceType referenceType = ReferenceType.Weak,
SubscriptionPriority priority = SubscriptionPriority.Normal, string tag = null) where TMessage : Message
{
if (action == null)
{
throw new ArgumentNullException(nameof(action));
}
BaseSubscription subscription = BuildSubscription(action, referenceType, priority, tag);
return SubscribeInternal(action, subscription);
}

private SubscriptionToken SubscribeInternal<TMessage>(Action<TMessage> action, BaseSubscription subscription)
where TMessage : Message
{
if (!_subscriptions.TryGetValue(typeof(TMessage), out var messageSubscriptions))
{
messageSubscriptions = new ConcurrentDictionary<Guid, BaseSubscription>();
_subscriptions[typeof(TMessage)] = messageSubscriptions;
}
messageSubscriptions[subscription.Id] = subscription;
return new SubscriptionToken(subscription.Id, async () => await UnsubscribeInternal<TMessage>(subscription.Id), action);
}

When we subscribe to the message, we create an instance of the Subscription and add it to the dictionary. It might be a strong reference or weak reference - depending on your choice. Then it will create a SubscriptionToken, which is a class that implements IDisposable interface to manage the subscriptions:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public sealed class SubscriptionToken : IDisposable
{
public Guid Id { get; private set; }
private readonly Action _disposeMe;
private readonly object _dependentObject;

public SubscriptionToken(Guid id, Action disposeMe, object dependentObject)
{
Id = id;
_disposeMe = disposeMe;
_dependentObject = dependentObject;
}

public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}

private void Dispose(bool isDisposing)
{
if (isDisposing)
{
_disposeMe();
}
}
}

When we create the instance of SubscriptionToken, actually we pass a method to dispose itself - so when Dispose method is invoked, it will unsubscribe the subscription first.

Unsubscribe

The method to unsubscribe the message is shown below:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public async Task Unsubscribe<TMessage>(SubscriptionToken subscriptionToken) where TMessage : Message
{
await UnsubscribeInternal<TMessage>(subscriptionToken.Id);
}
private async Task UnsubscribeInternal<TMessage>(Guid subscriptionId) where TMessage : Message
{
if (_subscriptions.TryGetValue(typeof(TMessage), out var messageSubscriptions))
{
if (messageSubscriptions.ContainsKey(subscriptionId))
{
var result = messageSubscriptions.TryRemove(subscriptionId, out BaseSubscription value);
}
}
}

It is quite straightforward. When we unsubscribe the message, the subscription will be removed from the dictionary.

Publish

Well, we have subscribed the message and created instances of subscriptions that are stored in the dictionary. We can publish messages now. The method to publish messages are shown as below:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public async Task Publish<TMessage>(TMessage message) where TMessage : Message
{
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}
List<BaseSubscription> toPublish = null;
Type messageType = message.GetType();

if (_subscriptions.TryGetValue(messageType, out var messageSubscriptions))
{
toPublish = messageSubscriptions.Values.OrderByDescending(x => x.Priority).ToList();
}

if (toPublish == null || toPublish.Count == 0)
{
return;
}

List<Guid> deadSubscriptionIds = new List<Guid>();
foreach (var subscription in toPublish)
{
// Execute the action for this message.
var result = await subscription.Invoke(message);
if (!result)
{
deadSubscriptionIds.Add(subscription.Id);
}
}

if (deadSubscriptionIds.Any())
{
await PurgeDeadSubscriptions(messageType, deadSubscriptionIds);
}
}

When we publish a message, MessageHub will query the dictionary to retrieve the subscriptions for this message, then execute the actions in a loop.

Another thing we need to note is that because some subscriptions might be weak references so we need to check the result of the execution. If it failed, we need to remove it from the subscriptions.

Usage

Install from NuGet:

1
PM> Install-Package FunCoding.CoreMessenger

Use MessengerHub.Instance as the singleton pattern in your whole app domain. It provides these methods:

  • Publish:
1
public async Task Publish<TMessage>(TMessage message)
  • Subscribe:
1
public SubscriptionToken Subscribe<TMessage>(Action<TMessage> action, ReferenceType referenceType = ReferenceType.Weak, SubscriptionPriority priority = SubscriptionPriority.Normal, string tag = null)
  • Unsubscribe:
1
public async Task Unsubscribe<TMessage>(SubscriptionToken subscriptionToken)

Creating the Message class

First, define a Message class inherited from Message between different components, like this:

1
2
3
4
5
6
7
8
public class TestMessage : Message
{
public string ExtraContent { get; private set; }
public TestMessage(object sender, string content) : base(sender)
{
ExtraContent = content;
}
}

Then create an instance of the Message in your component A, as shown below:

1
var message = new TestMessage(this, "Test Content");

Subscription

Define a SubscriptionToken instance to store the subscription. Subscribe the Message in your component B, like this:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class HomeViewModel
{
private readonly SubscriptionToken _subscriptionTokenForTestMessage;
public HomeViewModel()
{
_subscriptionTokenForTestMessage =
MessengerHub.Instance.Subscribe<TestMessage>(OnTestMessageReceived,
ReferenceType.Weak, SubscriptionPriority.Normal);
}

private void OnTestMessageReceived(TestMessage message)
{
#if DEBUG
System.Diagnostics.Debug.WriteLine($"Received messages of type {message.GetType().ToString()}. Content: {message.Content}");
#endif
}
}

Publishing the Message

Publish the Message in your component A:

1
2
3
4
public async Task PublishMessage()
{
await MessengerHub.Instance.Publish(new TestMessage(this, $"Hello World!"));
}

All done!

Parameters

The full signature of the Subscribe method is:

1
2
3
public SubscriptionToken Subscribe<TMessage>(Action<TMessage> action,
ReferenceType referenceType = ReferenceType.Weak,
SubscriptionPriority priority = SubscriptionPriority.Normal, string tag = null) where TMessage : Message

You can specify these parameters:

  • ReferenceType. The default value is ReferenceType.Weak so you do not need to worry about the memory leaking. Once the SubscriptionToken instance goes out of the scope, GC can collect it automatically(But not sure when). If you need to keep a strong reference, specify the parameter as ReferenceType.Strong so that GC cannot collect it.
  • SubscriptionPriority. The default value is SubscriptionPriority.Normal. Sometimes it is required to control the execution orders of the subscriptions for one Message. In this case, specify different priorities for the subscriptions to control the execution orders. Notice that this parameter is not for different Messages.
  • Tag. It is optional to inspect the current status for subscriptions.

Unsubscribe

You can use these methods to unsubscribe the subscription:

  • Use Unsubscribe method, as shown below:

    1
    await MessengerHub.Instance.Unsubscribe<TestMessage>(_subscriptionTokenForTestMessage);
  • Use Dispose method of the SubscriptionToken:

    1
    _subscriptionTokenForTestMessage.Dispose();

In many scenarios, you will not call these methods directly. If you are using the strong subscription type, it might cause memory leaking issue. So ReferenceType.Weak is recommended. Be aware that if the token is not stored in the context, it might be collected by GC immediately. For example:

1
2
3
4
5
6
7
8
9
public void MayNotEverReceiveAMessage()
{
var token = MessengerHub.Instance.Subscribe<TestMessage>((message) => {
// Do something here
});
// token goes out of scope now
// - so will be garbage collected *at some point*
// - so the action may never get called
}

Differences with MvvmCross.Messenger

If you are using MvvmCross to develop your application, please use MvvmCross.Messenger directly. I extracted some main methods and removed dependencies to MvvmCross components so it can be used in any WPF, UWP and Xamarin projects without MvvmCross. Also, the Publish method is always running in the background to avoid blocking the UI. But you should be aware of when you need to return to UI thread - especially when you need to interact with the UI controls. Another difference is that no need to use DI to create the instance of MessageHub which is a singleton instance in all the app domain. It is useful if the solution contains multiple components that need to communicate with each other. DI would make it more complicated.

Thanks