WCF PollingDuplexHttp Services, Silverlight, and the task parallel library-lessons learned

The PollingDuplexHttp binding (PollingDuplexHttpBinding) has been in the Silverlight SDK for a while now, and it’s a great way to build a service that can push messages to a client.  While Silverlight does support sockets and net.tcp, the restriction to ports 4502-4534 can be problematic, since firewall ports may need to be opened both on client machines and in the data center.  With duplex HTTP, though, everything goes through port 443 (or 80 if you don’t care about encryption), so your traffic is much more likely to make it through, which eases deployment headaches.  If you’re not familiar with the binding, there are some good blog posts out there that describe it in detail.

That being said, the binding isn’t without its issues, and having built a few services with it, I’ve learned some things that would have been nice to know going in:

A simple duplex service:

To start with, consider a service like this one:

    using System;
    using System.ServiceModel;

    [ServiceContract(CallbackContract=typeof(IDuplexClient))]
    public interface ITestService
    {
        [OperationContract]
        string GetCurrentTimeString();
        [OperationContract]
        DateTime GetCurrentTime();
        [OperationContract]
        void RequestTimeUpdates();
        [OperationContract]
        void StopTimeUpdates();
    }

    [ServiceContract]
    public interface IDuplexClient
    {
        [OperationContract(IsOneWay = true)]
        void PublishTime(DateTime time);
    }

This service has a couple of methods to get the current time, as well as methods to subscribe to and stop time updates.  The callback contract has a single one-way operation, PublishTime(), that the service calls to notify the client that the time has changed.  A sample implementation of the RequestTimeUpdates() method might look like this:

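// static so that every client session shares one list: with the default
// per-session instancing (unless the service uses InstanceContextMode.Single),
// an instance field would only ever hold the current client.
static readonly List<IDuplexClient> clients = new List<IDuplexClient>();

public void RequestTimeUpdates()
{
    // Grab the client callback channel. This works because each client
    // proxy generates its own unique callback channel.
    IDuplexClient c = OperationContext.Current.GetCallbackChannel<IDuplexClient>();
    lock (clients)
    {
        if (!clients.Contains(c))
            clients.Add(c);
    }
}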

In this case, the callback channel is unique per client proxy, and storing it in a list is one possibility (more on this later).  Another method could then publish to all registered Silverlight clients like this:

private void Publish()
{
    lock(clients)
    {
        foreach (IDuplexClient c in clients)
            c.PublishTime(DateTime.Now);
    }
}
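Neither snippet shows the deregistration side (StopTimeUpdates()). Here is a minimal sketch of both halves, assuming the callback channel is passed in as a parameter rather than read from OperationContext.Current.GetCallbackChannel<IDuplexClient>(), so that it compiles and runs without WCF. SubscriberRegistry, Snapshot and FakeClient are hypothetical names, and the contract is repeated without its WCF attributes.

```csharp
using System;
using System.Collections.Generic;

// Same shape as the callback contract above, minus the WCF attributes.
public interface IDuplexClient
{
    void PublishTime(DateTime time);
}

// Hypothetical stand-in for a real callback channel.
public class FakeClient : IDuplexClient
{
    public int Received;
    public void PublishTime(DateTime time) { Received++; }
}

// Hypothetical registry: Subscribe is the body of RequestTimeUpdates(),
// Unsubscribe the body of StopTimeUpdates().
public class SubscriberRegistry
{
    private readonly List<IDuplexClient> clients = new List<IDuplexClient>();

    public void Subscribe(IDuplexClient channel)
    {
        lock (clients)
        {
            if (!clients.Contains(channel))
                clients.Add(channel);
        }
    }

    // Removing a channel that isn't registered is a harmless no-op.
    public void Unsubscribe(IDuplexClient channel)
    {
        lock (clients)
        {
            clients.Remove(channel);
        }
    }

    // Copy taken under the lock, so a publisher can iterate it without
    // holding the lock while callbacks block.
    public List<IDuplexClient> Snapshot()
    {
        lock (clients)
        {
            return new List<IDuplexClient>(clients);
        }
    }
}
```

The Snapshot() copy is one way to avoid holding the lock for the duration of every callback call, which matters once a callback can block for a full timeout.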

Now, at this point the service will work, but a few questions come up.  For example, what happens if a client registers and then exits without calling StopTimeUpdates()?  As it turns out, the callback call times out, and until it does, it blocks the thread that is publishing (and, because of the lock, anyone else trying to publish or subscribe).  To handle that case, you can try something like this:

private void Publish()
{
    lock(clients)
    {
        List<IDuplexClient> toRemove = new List<IDuplexClient>();
        foreach (IDuplexClient c in clients)
        {
            try
            {
                c.PublishTime(DateTime.Now);
            }
            catch (Exception)
            {
                // Any failure (typically a TimeoutException) drops the subscriber.
                toRemove.Add(c);
            }
        }
        foreach (IDuplexClient c in toRemove)
            clients.Remove(c);
    }
}
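To see the pruning without a real dead Silverlight client, here is the same loop run against hypothetical test doubles: DeadClient throws a TimeoutException right away, whereas a real vanished client would only fail after the full timeout. SequentialPublisher, LiveClient and DeadClient are illustrative names, and the contract is repeated without WCF attributes.

```csharp
using System;
using System.Collections.Generic;

public interface IDuplexClient
{
    void PublishTime(DateTime time);
}

// Hypothetical test doubles standing in for real callback channels.
public class LiveClient : IDuplexClient
{
    public int Received;
    public void PublishTime(DateTime time) { Received++; }
}

public class DeadClient : IDuplexClient
{
    public int Attempts;
    public void PublishTime(DateTime time)
    {
        Attempts++;
        // A real dead client would block for the send timeout before failing.
        throw new TimeoutException("client went away");
    }
}

public class SequentialPublisher
{
    public readonly List<IDuplexClient> Clients = new List<IDuplexClient>();

    public void Publish()
    {
        lock (Clients)
        {
            var toRemove = new List<IDuplexClient>();
            foreach (IDuplexClient c in Clients)
            {
                try
                {
                    c.PublishTime(DateTime.Now);
                }
                catch (Exception)
                {
                    // Any failure drops the subscriber, so each dead client
                    // costs at most one timeout.
                    toRemove.Add(c);
                }
            }
            // Removing inside the foreach would throw InvalidOperationException.
            foreach (IDuplexClient c in toRemove)
                Clients.Remove(c);
        }
    }
}
```

Publishing twice shows the effect: the dead client is attempted once and then dropped, while the live client keeps receiving.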

Yes, this is a bit extreme, but now ANY exception when calling the callback will remove the client from the subscriptions, so each dead client costs at most one timeout (rather than timing out on every publish).  Of course, the problem now is that if you have, say, 10 clients, and 9 of them exit without unregistering, those timeouts (which default to 60 seconds) happen sequentially.  A third iteration of the publish method might look like this:

private void Publish()
{
    lock (clients)
    {
        // The callbacks run on several threads at once, so failures are
        // collected in a thread-safe bag; List<T>.Add is not safe to call
        // concurrently.
        var toRemove = new System.Collections.Concurrent.ConcurrentBag<IDuplexClient>();
        Action<IDuplexClient> invokeAction = (callback) =>
        {
            try
            {
                callback.PublishTime(DateTime.Now);
            }
            catch (Exception)
            {
                toRemove.Add(callback);
            }
        };
        System.Threading.Tasks.Parallel.ForEach(clients, invokeAction);

        foreach (IDuplexClient r in toRemove)
            clients.Remove(r);
    }
}

Now, by invoking the callbacks in parallel, the dead clients time out concurrently rather than one after another.  Parallel.ForEach doesn’t promise a separate thread for every callback, so a publish can still cost more than one 60 second timeout, but far fewer than one per dead client.  In our previous example, the 10th client no longer has to wait 9 minutes for its event.  The thread calling Publish, however, will still be blocked for up to a timeout period while the callbacks run, which, depending on your architecture, might mean yet another level of threading.  On top of that, the code to fire an event is starting to get messy, and duplicating it for every event in a service with many events is best avoided.
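The parallel version can be exercised the same way. This sketch uses hypothetical LiveClient/DeadClient doubles (no WCF) and runs nine failing subscribers next to two live ones to check that a single publish prunes all nine. Failures go into a ConcurrentBag<T> because the loop body runs on several threads at once and List<T>.Add is not thread-safe.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public interface IDuplexClient
{
    void PublishTime(DateTime time);
}

// Hypothetical test doubles; Interlocked keeps the counter correct even if
// the loop calls it from several threads.
public class LiveClient : IDuplexClient
{
    public int Received;
    public void PublishTime(DateTime time) { Interlocked.Increment(ref Received); }
}

public class DeadClient : IDuplexClient
{
    public void PublishTime(DateTime time)
    {
        // A real dead client would block for the timeout first.
        throw new TimeoutException("client went away");
    }
}

public class ParallelPublisher
{
    public readonly List<IDuplexClient> Clients = new List<IDuplexClient>();

    public void Publish()
    {
        lock (Clients)
        {
            // Thread-safe collection for failures reported from worker threads.
            var toRemove = new ConcurrentBag<IDuplexClient>();
            Parallel.ForEach(Clients, callback =>
            {
                try
                {
                    callback.PublishTime(DateTime.Now);
                }
                catch (Exception)
                {
                    toRemove.Add(callback);
                }
            });
            // Back on the calling thread: safe to modify the list now.
            foreach (IDuplexClient r in toRemove)
                Clients.Remove(r);
        }
    }
}
```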

The Callback Manager

To address these problems, I’ve wrapped these ideas up in a generic callback manager for duplex services:

// Requires: System, System.Collections.Concurrent, System.Collections.Generic,
// System.Threading.Tasks. Logger and ErrorLevel are the application's own
// logging helpers (not shown).
public class CallbackManager<TKey, TValue>
{
    // key -> set of subscribers. The inner dictionary is used as a concurrent
    // set; its bool value is always true.
    private readonly ConcurrentDictionary<TKey, ConcurrentDictionary<TValue, bool>> m_dict =
        new ConcurrentDictionary<TKey, ConcurrentDictionary<TValue, bool>>();

    internal CallbackManager()
    {
    }

    public bool AddSubscription(TKey key, TValue value)
    {
        // GetOrAdd closes the race where two clients subscribe to a new key
        // at the same time and the loser's TryAdd fails.
        return m_dict.GetOrAdd(key, k => new ConcurrentDictionary<TValue, bool>())
                     .TryAdd(value, true);
    }

    public bool RemoveSubscription(TKey key, TValue value)
    {
        ConcurrentDictionary<TValue, bool> subscribers;
        bool ignored;
        // TryGetValue instead of ContainsKey + indexer: the key could be
        // removed in between.
        return m_dict.TryGetValue(key, out subscribers)
            && subscribers.TryRemove(value, out ignored);
    }

    // Remove ALL subscribers for a key.
    public bool RemoveSubscription(TKey key)
    {
        ConcurrentDictionary<TValue, bool> removed;
        return m_dict.TryRemove(key, out removed);
    }

    public ICollection<TKey> GetKeys()
    {
        return m_dict.Keys;
    }

    // If you fire an event, it will only ever be for one key...
    private ICollection<TValue> GetSubscribers(TKey key)
    {
        ConcurrentDictionary<TValue, bool> subscribers;
        if (m_dict.TryGetValue(key, out subscribers))
            return subscribers.Keys;
        return new List<TValue>(); // empty
    }

    private ICollection<KeyValuePair<TValue, bool>> GetSubscribersEx(TKey key)
    {
        ConcurrentDictionary<TValue, bool> subscribers;
        if (m_dict.TryGetValue(key, out subscribers))
            return subscribers.ToArray(); // snapshot of the current subscribers
        return new List<KeyValuePair<TValue, bool>>(); // empty
    }

    public void FireEvent(TKey key, Action<TValue, bool> action)
    {
        // Run the fan-out on a thread-pool thread so the calling service
        // operation never waits on a slow or dead client.
        Task.Factory.StartNew(() =>
        {
            try
            {
                // Each call is wrapped so that a failure removes only that subscriber.
                Action<KeyValuePair<TValue, bool>> a = callback =>
                {
                    try
                    {
                        action(callback.Key, callback.Value);
                    }
                    catch (Exception ex)
                    {
                        Logger.LogError(ErrorLevel.Information, "Exception sending to callback channel-removing subscription-" + ex.ToString());
                        RemoveSubscription(key, callback.Key);
                    }
                };
                Parallel.ForEach(GetSubscribersEx(key), a);
            }
            catch (Exception ex)
            {
                // Anything escaping the loop would otherwise go unobserved on
                // the background thread.
                Logger.LogError(ErrorLevel.Information, "Exception invoking callback-" + ex.ToString());
            }
        });
    }
}

This class contains methods to manage a collection of subscribers, as well as the main FireEvent method, which wraps the earlier findings into one generic method.  It takes a delegate that makes the callback call you want, and invokes it once per subscriber, in parallel.  The parallel loop itself runs on a background thread, so the service operation that fires the event returns immediately instead of waiting out a timeout.  One side effect is that firing several events in quick succession can mean several attempts on the same dead proxy before its first timeout removes it, but the code handles these cases (a second RemoveSubscription simply returns false), and it’s better than blocking the service.

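It’s also worth noting that the delegate passed into FireEvent has to be wrapped in another delegate (i.e. I can’t just Parallel.ForEach the action I’m given).  If I did, the first failing callback would make the loop stop starting new iterations, and the exception would only surface after the loop, wrapped in an AggregateException, with no record of which subscriber to remove.  The wrapper catches each exception while the failing subscriber is still in hand.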
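The difference is easy to see in isolation. In this sketch (WrapperDemo and the string "subscribers" are made up for illustration), passing a throwing action straight to Parallel.ForEach ends in an AggregateException, while wrapping each call lets every subscriber be attempted and records exactly which one failed.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class WrapperDemo
{
    // Passes the action directly: a failure stops new iterations from
    // starting and is rethrown as an AggregateException after the loop.
    public static bool ThrowsWithoutWrapper(IEnumerable<string> subscribers, Action<string> action)
    {
        try
        {
            Parallel.ForEach(subscribers, action);
            return false;
        }
        catch (AggregateException)
        {
            return true;
        }
    }

    // Wraps the action per item: every subscriber is attempted and each
    // failure is recorded against the subscriber that caused it.
    public static List<string> FailedWithWrapper(IEnumerable<string> subscribers, Action<string> action)
    {
        var failed = new ConcurrentBag<string>();
        Parallel.ForEach(subscribers, s =>
        {
            try
            {
                action(s);
            }
            catch (Exception)
            {
                failed.Add(s);
            }
        });
        return new List<string>(failed);
    }
}
```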

One other thing to note: ConcurrentDictionary is used here with a dummy second (bool) type parameter simply because System.Collections.Concurrent has no concurrent set type (ConcurrentBag<T> can’t remove a specific item, so it doesn’t fit).  I’m not sure why, but in the meantime the extra type parameter does the job, and the concurrent classes are incredibly useful: you can modify a collection while it’s being enumerated elsewhere, and you aren’t at the mercy of someone forgetting a lock.
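A small sketch of that property, with illustrative helper names: a ConcurrentDictionary<int, bool> used as a set (the bool is a dummy) can have entries removed while it is being enumerated, whereas the same change to a List<T> makes the enumerator throw InvalidOperationException.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

public static class SetDemo
{
    // Removes every even number while enumerating; ConcurrentDictionary's
    // enumerator tolerates changes made during enumeration.
    public static int RemoveEvensFromConcurrentSet(int n)
    {
        var set = new ConcurrentDictionary<int, bool>(); // value is a dummy
        for (int i = 0; i < n; i++)
            set.TryAdd(i, true);
        foreach (var pair in set)
        {
            bool ignored;
            if (pair.Key % 2 == 0)
                set.TryRemove(pair.Key, out ignored);
        }
        return set.Count;
    }

    // The same pattern on a List<T> fails on the next MoveNext().
    public static bool ListThrowsWhenModified(int n)
    {
        var list = new List<int>();
        for (int i = 0; i < n; i++)
            list.Add(i);
        try
        {
            foreach (int x in list)
                if (x % 2 == 0)
                    list.Remove(x);
            return false;
        }
        catch (InvalidOperationException)
        {
            return true;
        }
    }
}
```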

So finally, this means that firing an event in the service code looks something like this:

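    // Created once and shared by all sessions (e.g. a static field on the service).
    m_TimeUpdateChannels = new CallbackManager<string, IDuplexClient>();

    // In RequestTimeUpdates():
    m_TimeUpdateChannels.AddSubscription("key",
        OperationContext.Current.GetCallbackChannel<IDuplexClient>());

    // Wherever the event should fire:
    m_TimeUpdateChannels.FireEvent("key", (callback, b) =>
    {
        // do something else if I want
        callback.PublishTime(DateTime.Now);
    });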

That’s much cleaner than writing an individual method to fire each event.
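Because FireEvent returns void, nothing outside the manager can tell when delivery has finished, which makes it awkward to test or to drain on shutdown. One possible variation, sketched here rather than taken from the post, returns the Task so callers can wait on it when they need to. MiniCallbackManager is a hypothetical trimmed-down stand-in (plain Action<TValue>, no logging, and Task.Run, which needs .NET 4.5 or later; Task.Factory.StartNew is the .NET 4 equivalent).

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical, trimmed-down variant of CallbackManager whose FireEvent
// returns the Task doing the delivery.
public class MiniCallbackManager<TKey, TValue>
{
    private readonly ConcurrentDictionary<TKey, ConcurrentDictionary<TValue, bool>> subs =
        new ConcurrentDictionary<TKey, ConcurrentDictionary<TValue, bool>>();

    public bool AddSubscription(TKey key, TValue value)
    {
        return subs.GetOrAdd(key, k => new ConcurrentDictionary<TValue, bool>())
                   .TryAdd(value, true);
    }

    public bool RemoveSubscription(TKey key, TValue value)
    {
        ConcurrentDictionary<TValue, bool> set;
        bool ignored;
        return subs.TryGetValue(key, out set) && set.TryRemove(value, out ignored);
    }

    public int SubscriberCount(TKey key)
    {
        ConcurrentDictionary<TValue, bool> set;
        return subs.TryGetValue(key, out set) ? set.Count : 0;
    }

    // Returns as soon as the work is queued. The Task completes once every
    // subscriber has been tried and the failing ones have been removed.
    public Task FireEvent(TKey key, Action<TValue> action)
    {
        return Task.Run(() =>
        {
            ConcurrentDictionary<TValue, bool> set;
            if (!subs.TryGetValue(key, out set))
                return;
            // Keys is a snapshot, so removing subscribers mid-loop is safe.
            Parallel.ForEach(set.Keys, subscriber =>
            {
                try
                {
                    action(subscriber);
                }
                catch (Exception)
                {
                    RemoveSubscription(key, subscriber);
                }
            });
        });
    }
}
```

In a service, FireEvent("key", c => c.PublishTime(DateTime.Now)) can still be called fire-and-forget; only callers that care about completion, such as tests or shutdown code, need to keep the returned Task.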

So, with a scheme like this in place, writing and managing a duplex service gets a whole lot easier.  If you do need to keep track of subscribers, this at least encapsulates that bookkeeping, and it keeps most of the WCF plumbing in one place, so the service can concentrate on business logic.

This entry was posted in Silverlight, WCF.

5 Responses to WCF PollingDuplexHttp Services, Silverlight, and the task parallel library-lessons learned

  1. Pingback: WCF PollingDuplexHttp Services, Silverlight, and the task parallel library-lessons learned

  2. Pingback: WCF PollingDuplexHttp Services, Silverlight, and the task parallel library-lessons learned – www.nalli.net

  3. Werner says:

    There is still a problem with duplex services that I can’t find a solution for on the web.

    If a client makes a call to the server that causes a duplex call to a dead client, that timeout exception causes the entire service to break. Subsequent calls from the client to the server fail because the client state is faulted. How can I get around this problem?

    • chrisbardon says:

      You’re saying that your client A is calling a method on service B, which tries to notify client C with a duplex event; the call to C fails with a timeout, and that faults the connection A has to B, right? If so, you should be able to catch and handle the TimeoutException on the service if you don’t want it to fault your channel (although you may want it to generate a fault to indicate an error condition). In my example, the callback manager swallows all exceptions generated by a callback, since I don’t want firing an event to fault a proxy.

      Of course, if you’re using a long-lived client proxy, it could become faulted at any point, so you should be prepared to recreate it if necessary.

  4. Werner says:

    You understand the problem 100% correctly! I made lots of mistakes, though. Yesterday I was writing up a massive response full of complexities, but every time I wrote something down I checked it first so as not to waste time on things I could have verified myself. In the end I never made the post, because it became clear that there were lots of things going on that I did not understand. WCF is tricky because it is prone to concurrency issues, and you don’t want to waste someone’s time on a blog with a problem where strange concurrency issues are lurking about.

    In any case, your post put me on the right path: I found that I had my service timeouts configured incorrectly. I was also unaware of the full implications of the new multiple-messages-per-poll mode. I also had concurrency issues in my logger that caused some of the channel failures I was complaining about in my post above (which means what I said there was wrong). And the biggest mistake I made, which you highlighted, is that I never really coded the service to handle random disconnects from either side. Once I started thinking about how the system would handle those, it forced me to design my approach better, which eventually led me to (what I believe is) a working version.

    My usage scenario is slightly different than yours, though. I need clients to subscribe to each other, whereas yours only caters for the server to subscribe to a client. Ugly hacks were needed in the CallbackManager to make this work, but in essence the key to making it work was your original findings and implementation above, which I am very grateful for. Thanks for that.
