Operation throttle: part 2


Last time, I introduced the topic of fixed operation rates and how one might implement this functionality with ad-hoc code. Today, I will generalize the concept with a simple Throttle library.

As I mentioned before, we have to keep track of two time quantities if we want a properly functioning throttle: the time spent on the operation itself and the wait time. Let’s explore a few concrete scenarios to get a feel for how this would look.

Low latency, high rate

Imagine we want to perform 2000 operations per second, and each operation completes in 1/10 of a millisecond. If the operation took zero time to complete, the wait time would be 1/2000 seconds = 0.5 ms. Accounting for the true latency (0.1 ms), we subtract the two and arrive at an actual delay of 0.4 ms per operation. However, as we saw before, sleeping for very short intervals is too inaccurate. So we will want to batch these delays and sleep only once enough has accumulated. If our minimum wait time is 10 ms, we would have to perform 10 / 0.4 = 25 operations before we need to wait that long. On a time chart, it would look like this:

@ = op, z = sleep

    |    @@@@@@@@@@@@@@@@@@@@@@@@@zzzzzzzz / zzzzzzz@@@@@@@
    |             :         :         :    /  :         :
    | T |----:----:----:----:----:----:--- / -:----:----:--  . . . ->
    |(ms)   0.5  1.0  1.5  2.0  2.5  3.0    12.0 12.5 13.0

The pattern would repeat like this, giving us an effective rate of 25 ops per 12.5 ms = 2000 ops/sec, as intended.
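The arithmetic behind this chart can be checked with a few lines of C#. This is a hypothetical helper (BatchMath is not part of the Throttle library); it works in whole TimeSpan ticks, 10,000 per millisecond, so that 0.1 ms and 0.4 ms stay exact instead of passing through fractional milliseconds.

```csharp
using System;

// Hypothetical helper, not part of the Throttle library: reproduces the
// "low latency, high rate" arithmetic in whole ticks.
public static class BatchMath
{
    // Delay owed per operation: the ideal period (1 / rate) minus the operation's latency.
    public static TimeSpan DelayPerOp(double opsPerSecond, TimeSpan latency)
    {
        TimeSpan period = TimeSpan.FromTicks((long)(TimeSpan.TicksPerSecond / opsPerSecond));
        return period - latency;
    }

    // Smallest number of operations whose owed delay reaches the minimum sleep.
    // Assumes delayPerOp is positive.
    public static long OpsPerBatch(TimeSpan delayPerOp, TimeSpan minimumDelay)
    {
        // Integer ceiling division on ticks.
        return (minimumDelay.Ticks + delayPerOp.Ticks - 1) / delayPerOp.Ticks;
    }

    // Length of one batch cycle: n operations plus the n delays owed for them.
    public static TimeSpan CycleLength(long n, TimeSpan latency, TimeSpan delayPerOp)
    {
        return TimeSpan.FromTicks(n * (latency.Ticks + delayPerOp.Ticks));
    }
}
```

With 2000 ops/sec, a latency of TimeSpan.FromTicks(1000) (0.1 ms), and a 10 ms minimum, DelayPerOp returns 0.4 ms, OpsPerBatch returns 25, and CycleLength returns 12.5 ms, which is the 2000 ops/sec shown in the chart.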

High latency, low rate

Let’s slow things down immensely and imagine a two-second operation to be performed 10 times per minute, that is, once every 6 seconds. Using similar calculations as above, we can plan to delay for 6 - 2 = 4 seconds after every operation. With such a long delay, the relative error from an imprecise sleep is negligible, so no batching is necessary.

Low latency, low rate

Imagine the previous scenario but with an operation that only takes 1/10 of a second. We would simply delay for 5.9 seconds instead of 4.0.
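Both low-rate cases reduce to one subtraction: the ideal spacing (60 s / 10 = 6 s) minus the operation's latency. A hypothetical helper (LowRateMath, again separate from the library) makes that explicit, still working in ticks:

```csharp
using System;

// Hypothetical helper for the low-rate scenarios, where each sleep is long
// enough that no batching is needed.
public static class LowRateMath
{
    // Ideal spacing between operation starts for a given per-minute rate.
    public static TimeSpan Period(double opsPerMinute) =>
        TimeSpan.FromTicks((long)(TimeSpan.TicksPerMinute / opsPerMinute));

    // Time left to sleep after each operation: spacing minus the operation's latency.
    public static TimeSpan DelayPerOp(double opsPerMinute, TimeSpan latency) =>
        Period(opsPerMinute) - latency;
}
```

DelayPerOp(10.0, TimeSpan.FromSeconds(2.0)) is 4 seconds and DelayPerOp(10.0, TimeSpan.FromMilliseconds(100.0)) is 5.9 seconds. In both cases, ten operations plus ten delays add up to exactly one minute.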

High latency, high rate

This is more of a degenerate case. We have an operation that we would like to perform, say, 100 times per second, but the operation itself takes 100 ms. The best rate we could ever achieve is 1 op/100 ms = 10 ops/sec. So in this case, we would keep missing our target rate and therefore never actually delay.
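To see why the throttle never sleeps here, it helps to track the owed delay directly. In this hypothetical sketch (DegenerateCase is not the library's code), each operation adds the 10 ms ideal period and subtracts the 100 ms it actually took, so the balance only moves further below zero and can never reach a positive minimum sleep.

```csharp
using System;

// Hypothetical sketch of the degenerate case: the owed delay goes negative on
// every operation, so it never reaches the minimum sleep.
public static class DegenerateCase
{
    // Simulates n operations and returns the accumulated (owed) delay.
    public static TimeSpan AccumulatedDelay(TimeSpan period, TimeSpan latency, int n)
    {
        TimeSpan owed = TimeSpan.Zero;
        for (int i = 0; i < n; ++i)
        {
            // e.g. 10 ms - 100 ms = -90 ms per operation at 100 ops/sec
            owed += period - latency;
        }
        return owed;
    }

    // Best achievable rate when operations run back to back with no sleeping.
    public static double MaxOpsPerSecond(TimeSpan latency) =>
        (double)TimeSpan.TicksPerSecond / latency.Ticks;
}
```

For 75 operations the balance ends at -6750 ms (75 × -90 ms), and MaxOpsPerSecond returns 10. Because the Throttle in this post carries such a negative balance forward, operations that later became faster would run without sleeping until that balance was paid back.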

The above scenarios are rather idealized in that the waiting time is perfectly accurate and all operations have uniform latency. In reality this is unlikely. Let’s look at the first scenario again through a more realistic lens.

Low latency, high rate with jitter

Let’s imagine the 2000 ops/sec scenario again, but with operation latency that alternates between 0.1 and 0.2 ms, and with sleeps that are never exact: the sleep error alternates between -0.2 ms and +0.1 ms (the first sleep ends 0.2 ms early, the next ends 0.1 ms late, and so on). Given the alternating latency, the per-operation delay alternates between 0.4 and 0.3 ms, or 0.7 ms every two operations. After 14 operations we would have accumulated 4.9 ms of delay, and after 28 operations, 9.8 ms. After operation 29, the total reaches 10.2 ms, so we would request a 10.2 ms sleep. In reality we would only sleep 10.0 ms (-0.2 ms), putting us 0.2 ms early for the next operation. That 0.2 ms becomes extra delay to carry forward into the next calculation.

Since the first batch ended at operation 29, the latency pattern starting at operation 30 is 0.2 then 0.1 ms, with delays of 0.3 then 0.4 ms. As before, 28 operations accumulate 9.8 ms of delay. With the extra 0.2 ms carried forward, we now have exactly enough to sleep for 10 ms. This time the sleep ends 0.1 ms late (10.1 ms), so we carry forward a negative delay of -0.1 ms. Starting from that deficit, the third batch needs 29 operations to reach 10 ms, and that sleep again ends 0.2 ms early (9.8 ms). The calculations continue in the same way from there.
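The bookkeeping in this walkthrough can be replayed mechanically. The following standalone sketch (a hypothetical JitterWalkthrough class, separate from the Throttle implemented later in this post) works in whole ticks, where 0.1 ms is 1,000 ticks, and records each batch's operation count and actual sleep time.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical re-enactment of the jitter walkthrough in whole ticks
// (1 tick = 100 ns). It mirrors the prose, not the Throttle class itself.
public static class JitterWalkthrough
{
    public static List<(int Ops, long SleptTicks)> Run(int totalOps)
    {
        const long period = 5_000;          // 1/2000 s = 0.5 ms
        const long minimumDelay = 100_000;  // 10 ms
        var batches = new List<(int Ops, long SleptTicks)>();
        long owed = 0;       // delay accumulated, including any carried-forward error
        int opsInBatch = 0;
        int sleeps = 0;

        for (int i = 0; i < totalOps; ++i)
        {
            long latency = (i % 2 == 0) ? 1_000 : 2_000;  // 0.1 ms, 0.2 ms alternating
            owed += period - latency;
            ++opsInBatch;

            if (owed >= minimumDelay)
            {
                // Sleep error alternates: -0.2 ms on the first sleep, +0.1 ms on the next.
                long slept = owed + ((sleeps++ % 2 == 0) ? -2_000 : 1_000);
                batches.Add((opsInBatch, slept));
                owed -= slept;  // carry the error forward into the next batch
                opsInBatch = 0;
            }
        }

        return batches;
    }
}
```

Run(100) yields three batches: 29 operations followed by a 10.0 ms sleep, 28 followed by 10.1 ms, and 29 followed by 9.8 ms, with 14 operations left over.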

I admit, the above prose makes the situation seem very complicated. But the implementation is actually not terribly difficult. Writing the correct tests with proper expected results is really the hardest part, and we have basically set them up in English above. So let’s get coding!

First, we need a Rate struct to express the operation rate. Starting with the tests:

    [TestClass]
    public class RateTest
    {
        [TestMethod]
        public void PerSecondTest()
        {
            Rate.PerSecond(0.5).Time.Should().Be(TimeSpan.FromSeconds(2.0d));
            Rate.PerSecond(1).Time.Should().Be(TimeSpan.FromSeconds(1.0d));
            Rate.PerSecond(2).Time.Should().Be(TimeSpan.FromSeconds(0.5d));
            Rate.PerSecond(100).Time.Should().Be(TimeSpan.FromSeconds(0.01d));
            Rate.PerSecond(-1).Time.Should().Be(TimeSpan.MaxValue);
            Rate.PerSecond(0).Time.Should().Be(TimeSpan.MaxValue);
            Rate.PerSecond(1000000000).Time.Should().Be(TimeSpan.Zero);
        }

        [TestMethod]
        public void PerMinuteTest()
        {
            Rate.PerMinute(0.5).Time.Should().Be(TimeSpan.FromSeconds(120.0d));
            Rate.PerMinute(1).Time.Should().Be(TimeSpan.FromSeconds(60.0d));
            Rate.PerMinute(2).Time.Should().Be(TimeSpan.FromSeconds(30.0d));
            Rate.PerMinute(120).Time.Should().Be(TimeSpan.FromSeconds(0.5d));
            Rate.PerMinute(-1).Time.Should().Be(TimeSpan.MaxValue);
            Rate.PerMinute(0).Time.Should().Be(TimeSpan.MaxValue);
            Rate.PerMinute(1000000000).Time.Should().Be(TimeSpan.Zero);
        }
    }

This implementation passes the tests, including the given boundary conditions (negative, zero, and too large):

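    public struct Rate
    {
        private readonly TimeSpan time;

        private Rate(TimeSpan time)
        {
            this.time = time;
        }

        // Ideal time between the starts of consecutive operations.
        public TimeSpan Time => this.time;

        public static Rate PerSecond(double ops)
        {
            // Zero, negative, or NaN rates mean "never": use the longest possible interval.
            // Checking explicitly avoids casting infinity to long, which C# leaves unspecified.
            if (!(ops > 0))
            {
                return new Rate(TimeSpan.MaxValue);
            }

            // TimeSpan.TicksPerSecond is 10,000,000; rates above that truncate to zero ticks.
            double ticks = TimeSpan.TicksPerSecond / ops;
            if (ticks >= long.MaxValue)
            {
                return new Rate(TimeSpan.MaxValue);
            }

            return new Rate(TimeSpan.FromTicks((long)ticks));
        }

        public static Rate PerMinute(double ops)
        {
            return PerSecond(ops / 60);
        }
    }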

Since the Throttle needs a time reference and a method to initiate a delay, we need a clock abstraction. Let’s start with that:

    public interface IClock
    {
        // Gets the elapsed time so far.
        TimeSpan Elapsed { get; }

        // Delays execution for the given interval.
        void Delay(TimeSpan interval);
    }

The real version of the clock looks like this:

    public sealed class RealClock : IClock
    {
        private readonly Stopwatch stopwatch;

        public RealClock()
        {
            this.stopwatch = Stopwatch.StartNew();
        }

        public TimeSpan Elapsed => this.stopwatch.Elapsed;

        public void Delay(TimeSpan interval) => Thread.Sleep(interval);
    }

We also need a fake version for testing purposes:

    internal sealed class FakeClock : IClock
    {
        private readonly Func<TimeSpan, TimeSpan> onDelay;

        public FakeClock(Func<TimeSpan, TimeSpan> onDelay)
        {
            this.onDelay = onDelay;
        }

        public TimeSpan Elapsed { get; set; }

        public void Delay(TimeSpan interval)
        {
            this.Elapsed += this.onDelay(interval);
        }
    }

The fake version allows us to set the elapsed time explicitly. It also provides a callback on every delay to determine how much time should be added to the clock (useful once we get to the “jitter” scenario).

Now to implement the first test scenario above:

    [TestClass]
    public class ThrottleTest
    {
        [TestMethod]
        public void LowLatencyHighRate()
        {
            StringBuilder log = new StringBuilder();
            TimeSpan latency = MS(0.1d);
            Func<TimeSpan, TimeSpan> callback = (TimeSpan t) =>
            {
                log.AppendFormat("[z*{0}]", t.TotalMilliseconds);
                return t;
            };
            FakeClock clock = new FakeClock(callback);
            Throttle throttle = new Throttle(Rate.PerSecond(2000.0d), clock);

            for (int i = 0; i < 60; ++i)
            {
                clock.Elapsed += latency;
                log.Append("@");
                throttle.Wait();
            }

            log.ToString().Should().Be(
                "@@@@@@@@@@@@@@@@@@@@@@@@@[z*10]" +
                "@@@@@@@@@@@@@@@@@@@@@@@@@[z*10]" +
                "@@@@@@@@@@");
            clock.Elapsed.Should().Be(MS(26.0d));
        }

        private static TimeSpan MS(double ms) => TimeSpan.FromTicks((long)(ms * 10000));
    }

Note the use of the MS helper method for dealing with fractional milliseconds. This is necessary because TimeSpan.FromMilliseconds does not reliably preserve fractional milliseconds; its documentation states:

“The value parameter is converted to ticks, and that number of ticks is used to initialize the new TimeSpan. Therefore, value will only be considered accurate to the nearest millisecond.”
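The MS helper converts by casting ms * 10000 to long, which truncates toward zero. A floating-point product that lands a hair below a whole number would then lose a tick. A variant that rounds to the nearest tick instead is a small change; this is a suggested alternative (the TimeSpanHelpers name is hypothetical), not the helper used in the tests.

```csharp
using System;

public static class TimeSpanHelpers
{
    // Hypothetical variant of the MS helper: converts fractional milliseconds to a
    // TimeSpan at full tick resolution (TimeSpan.TicksPerMillisecond = 10,000),
    // rounding to the nearest tick rather than truncating.
    public static TimeSpan MS(double ms) =>
        TimeSpan.FromTicks((long)Math.Round(ms * TimeSpan.TicksPerMillisecond));
}
```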

This implementation is sufficient to pass this test (and, as we will see, every other test in this post):

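    public sealed class Throttle
    {
        // Sleeping for less than this is too inaccurate, so shorter delays are batched.
        private static readonly TimeSpan MinimumDelay = TimeSpan.FromMilliseconds(10.0d);

        private readonly TimeSpan delay;   // ideal time per operation (1 / rate)
        private readonly IClock clock;

        private TimeSpan mark1;      // clock time at the end of the previous Wait
        private TimeSpan nextDelay;  // accumulated delay owed (negative when behind schedule)

        public Throttle(Rate rate, IClock clock)
        {
            this.delay = rate.Time;
            this.clock = clock;
        }

        public void Wait()
        {
            // Current time; (mark2 - mark1) is how long the operation took.
            TimeSpan mark2 = this.clock.Elapsed;

            // Owe the ideal per-operation time minus what the operation already used.
            this.nextDelay += this.delay - (mark2 - this.mark1);
            if (this.nextDelay >= MinimumDelay)
            {
                this.clock.Delay(this.nextDelay);
                this.mark1 = this.clock.Elapsed;

                // Subtract the time actually slept; any overshoot or undershoot
                // carries forward into the next calculation.
                this.nextDelay -= this.mark1 - mark2;
            }
            else
            {
                this.mark1 = this.clock.Elapsed;
            }
        }
    }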

Next test:

        [TestMethod]
        public void HighLatencyLowRate()
        {
            StringBuilder log = new StringBuilder();
            TimeSpan latency = MS(2000.0d);
            Func<TimeSpan, TimeSpan> callback = (TimeSpan t) =>
            {
                log.AppendFormat("[z*{0}]", t.TotalMilliseconds);
                return t;
            };
            FakeClock clock = new FakeClock(callback);
            Throttle throttle = new Throttle(Rate.PerMinute(10.0d), clock);

            for (int i = 0; i < 10; ++i)
            {
                clock.Elapsed += latency;
                log.Append("@");
                throttle.Wait();
            }

            log.ToString().Should().Be(
                "@[z*4000]@[z*4000]@[z*4000]@[z*4000]@[z*4000]" +
                "@[z*4000]@[z*4000]@[z*4000]@[z*4000]@[z*4000]");
            clock.Elapsed.Should().Be(MS(60000.0d));
        }

This one passes right out of the gate. How about this next test?

        [TestMethod]
        public void LowLatencyLowRate()
        {
            StringBuilder log = new StringBuilder();
            TimeSpan latency = MS(100.0d);
            Func<TimeSpan, TimeSpan> callback = (TimeSpan t) =>
            {
                log.AppendFormat("[z*{0}]", t.TotalMilliseconds);
                return t;
            };
            FakeClock clock = new FakeClock(callback);
            Throttle throttle = new Throttle(Rate.PerMinute(10.0d), clock);

            for (int i = 0; i < 10; ++i)
            {
                clock.Elapsed += latency;
                log.Append("@");
                throttle.Wait();
            }

            log.ToString().Should().Be(
                "@[z*5900]@[z*5900]@[z*5900]@[z*5900]@[z*5900]" +
                "@[z*5900]@[z*5900]@[z*5900]@[z*5900]@[z*5900]");
            clock.Elapsed.Should().Be(MS(60000.0d));
        }

It passes as well. Now for the degenerate case:

        [TestMethod]
        public void HighLatencyHighRate()
        {
            StringBuilder log = new StringBuilder();
            TimeSpan latency = MS(100.0d);
            Func<TimeSpan, TimeSpan> callback = (TimeSpan t) =>
            {
                log.AppendFormat("[z*{0}]", t.TotalMilliseconds);
                return t;
            };
            FakeClock clock = new FakeClock(callback);
            Throttle throttle = new Throttle(Rate.PerSecond(100.0d), clock);

            for (int i = 0; i < 75; ++i)
            {
                clock.Elapsed += latency;
                log.Append("@");
                throttle.Wait();
            }

            log.ToString().Should().Be(
                "@@@@@@@@@@@@@@@@@@@@@@@@@" +
                "@@@@@@@@@@@@@@@@@@@@@@@@@" +
                "@@@@@@@@@@@@@@@@@@@@@@@@@");
            clock.Elapsed.Should().Be(MS(7500.0d));
        }

Still passes! Now for the more complicated jitter scenario:

        [TestMethod]
        public void LowLatencyHighRateWithJitter()
        {
            StringBuilder log = new StringBuilder();
            int delayCount = 0;
            Func<TimeSpan, TimeSpan> callback = (TimeSpan t) =>
            {
                // Clock jitter
                t += (delayCount++ % 2 == 0) ? MS(-0.2d) : MS(0.1d);
                log.AppendFormat("[z*{0}]", t.TotalMilliseconds);
                return t;
            };
            FakeClock clock = new FakeClock(callback);
            Throttle throttle = new Throttle(Rate.PerSecond(2000.0d), clock);

            for (int i = 0; i < 100; ++i)
            {
                // Operation jitter
                TimeSpan latency = (i % 2 == 0) ? MS(0.1d) : MS(0.2d);
                clock.Elapsed += latency;
                log.Append("@");
                throttle.Wait();
            }

            log.ToString().Should().Be(
                "@@@@@@@@@@@@@@@@@@@@@@@@@@@@@[z*10]" +
                "@@@@@@@@@@@@@@@@@@@@@@@@@@@@[z*10.1]" +
                "@@@@@@@@@@@@@@@@@@@@@@@@@@@@@[z*9.8]" +
                "@@@@@@@@@@@@@@");
            clock.Elapsed.Should().Be(
                MS(15.0d) + // total operation time = (0.1 + 0.2) * 50
                MS(10.0d + 10.1d + 9.8d)); // total wait time
        }

This one also works. At this point, we can be reasonably confident in our basic implementation.

For good measure, let’s go back to our original benchmark from last time and replace the Thread.Sleep with Throttle.Wait. Since Throttle is stateful, we have to modify the benchmark slightly to avoid side effects between iterations: each run now creates its own Throttle.

    using System.Net.Sockets;
    using System.Text;
    using BenchmarkDotNet.Attributes;

    public class ThrottlePerf
    {
        private UdpClient client;

        [Params(2000)]
        public int R; // rate per second

        [Params(1000)]
        public int N; // operations per benchmark run

        [GlobalSetup]
        public void Setup()
        {
            // UDP port 17 is the Quote of the Day (QOTD) service.
            this.client = new UdpClient("localhost", 17);
        }

        [Benchmark]
        public void Run()
        {
            // A fresh Throttle per run, so no state carries over from a previous iteration.
            Throttle throttle = new Throttle(Rate.PerSecond(this.R), new RealClock());
            for (int i = 0; i < this.N; ++i)
            {
                byte[] request = Encoding.ASCII.GetBytes("QOTD");
                this.client.Send(request, request.Length);
                throttle.Wait();
            }
        }

        [GlobalCleanup]
        public void Cleanup()
        {
            this.client.Close();
        }
    }

The results are close: 1000 operations at 2000 ops/sec should take 500 ms, and the measured mean of 494.6 ms is only about 1% faster than that:

Method |    R |    N |     Mean |     Error |    StdDev |
------ |----- |----- |---------:|----------:|----------:|
   Run | 2000 | 1000 | 494.6 ms | 0.2404 ms | 0.2131 ms |

Well, that’s more than enough for now. I will leave the async version as an exercise to the reader.
