I've decided to force myself into the habit of blogging about my PhD topic, in the hope that people who find these posts have an easier time comprehending the subject matter than I did. This, and future posts, will attempt to focus on a particular method at a time. I will present the maths for reference, but generally speaking the explanations will use as little maths as possible. My hope is that this will be at a digestible level for software engineers who have a practical interest in these techniques.
What is the topic? Change Detection, specifically in streaming data. If that sounds like a pretty broad area to you, then you're absolutely right. We have a little bit of ground to cover first to explain what I mean by Change Detection and how it relates to other similar things we could do.
I will assume that you have some data that you collect incrementally. Let's say you record the time taken to process every request in your web application. You'd like to be alerted if there's a significant change in the average time taken to process a request: an increase may indicate a DDoS attack, while a decrease may mean an error is causing requests to bail out prematurely.
What you have here is a data stream. We can write that like so:
$$ X = \left \{ \vec{x}_1, \vec{x}_2, ... \right \} $$
So, $ \vec{x}_t $ is a piece of data arriving at time $t$. The little arrow above the $x$ means that this is a vector, i.e. the piece of data could have one or more elements.
Anyway, we collect the request times for our web app. At midnight, a group of Russian hackers decide to pwn you with a botnet. Request completion times start to increase. Clearly this will be reflected in the data we collect. We will have what is called a Change Point at 00:00.
How do we define a change point? The data we are observing is changing in response to something. This purely conceptual "something" is what we call a Data Source (Class, Concept). We say that the data before the event is drawn from one data source, $S_1$, and the data after it from another, $S_2$. A change point, therefore, is where there is a switch from one data source to another. Our change point at time $t$ is denoted by:
$$\left \{ ..., \vec{x}_{t-2}, \vec{x}_{t-1} \right \} \in S_1$$
$$\left \{ \vec{x}_{t}, \vec{x}_{t+1}, ... \right \} \in S_2$$
($\in$ is used here to mean 'is drawn from')
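To make the definition concrete, here is a minimal sketch of a synthetic stream with a single change point. It assumes a univariate stream with Gaussian noise, and `SyntheticStream` and its parameters are hypothetical names used only for illustration. Indices are 0-based, so `changeAt` plays the role of $t$: everything before it comes from $S_1$, everything from it onwards from $S_2$.

```java
import java.util.Random;

/**
 * Hypothetical helper: builds a synthetic univariate stream with one abrupt
 * change point. Values before changeAt are drawn from S1 = N(mean1, sd^2),
 * values from changeAt onwards from S2 = N(mean2, sd^2).
 */
final class SyntheticStream {

    private SyntheticStream() {}

    static double[] withChangePoint(int length, int changeAt,
                                    double mean1, double mean2,
                                    double sd, long seed) {
        if (changeAt < 0 || changeAt > length) {
            throw new IllegalArgumentException("changeAt must lie in [0, length]");
        }
        Random rng = new Random(seed);  // fixed seed so the stream is reproducible
        double[] stream = new double[length];
        for (int t = 0; t < length; t++) {
            double mean = (t < changeAt) ? mean1 : mean2;  // S1 before t, S2 from t
            stream[t] = mean + sd * rng.nextGaussian();
        }
        return stream;
    }

    public static void main(String[] args) {
        // e.g. request times around 10 ms that jump to around 110 ms at index 200
        double[] x = withChangePoint(400, 200, 10.0, 110.0, 2.0, 42L);
        System.out.printf("x[199] = %.1f, x[200] = %.1f%n", x[199], x[200]);
    }
}
```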
Change Detection, then, is the task of detecting that $S_1$ has changed to $S_2$ as soon as possible after $t$. We don't know what has caused the change, and we don't know what that change will look like in the data. This isn't a classification problem, and we don't have any prior knowledge, heuristics or training data to help us.
If you decide to read around this subject, you'll quickly become confused about what the boundary is between this and
That's a story for another blog post. Just know that Change Detection is subtly different, and the methods we use are often cross-applicable. These each refer to a very similar problem with slightly tweaked context.
Ok, I think we've covered enough to get started now. We're going to implement a simple form of Cumulative Sum (CUSUM) to detect significant changes in the mean of our input data.
CUSUM is a technique for Sequential Analysis, which means it processes data one at a time (sequentially) and, after each observation, decides whether to keep going or to stop and signal change. Remember our data stream:
$$ X = \left \{ \vec{x}_1, \vec{x}_2, ... \right \}$$
For each observation, we calculate how far it lies from a target value and add that difference to the sum from the previous step, clamping the result at zero. That is what makes it cumulative. The CUSUM procedure is:
$$ g_0 = 0 $$ $$ g_{t+1} = \max(0, g_t + x_t - \omega_t) $$
Where $x_t$ is our observation, $g_t$ is the cumulative sum at time $t$ and $\omega_t$ is our target value at time $t$.
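The recurrence itself is only a few lines of code. The sketch below assumes a constant target $\omega$ (the formula allows it to vary with $t$), and `CusumRecurrence.cumulativeSums` is a hypothetical helper, not part of the example project.

```java
/**
 * Minimal sketch of the CUSUM recurrence g_{t+1} = max(0, g_t + x_t - omega),
 * assuming a constant target value omega.
 */
final class CusumRecurrence {

    private CusumRecurrence() {}

    /** Returns g, where g[0] = 0 and g[t + 1] = max(0, g[t] + x[t] - omega). */
    static double[] cumulativeSums(double[] x, double omega) {
        double[] g = new double[x.length + 1];
        g[0] = 0.0;  // g_0 = 0
        for (int t = 0; t < x.length; t++) {
            // add this observation's deviation from the target, never dropping below zero
            g[t + 1] = Math.max(0.0, g[t] + x[t] - omega);
        }
        return g;
    }

    public static void main(String[] args) {
        double[] g = cumulativeSums(new double[] {11, 7, 14, 15}, 10.0);
        System.out.println(java.util.Arrays.toString(g));  // [0.0, 1.0, 0.0, 4.0, 9.0]
    }
}
```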
We're also going to define a threshold, $\lambda$ such that:
$$H_1 \text{ if } g_t > \lambda \text{ (else } H_0\text{)}$$
or, in plain English,
$$\text{change if } g_t > \lambda \text{ (else continue)}$$
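A small worked example may help here. The numbers are made up for illustration: a constant target $\omega = 10$, a threshold $\lambda = 8$ and the observations $11, 7, 14, 15$, starting from $g = 0$:

$$\begin{aligned}
x = 11: &\quad g = \max(0,\ 0 + 11 - 10) = 1 \\
x = 7: &\quad g = \max(0,\ 1 + 7 - 10) = 0 \\
x = 14: &\quad g = \max(0,\ 0 + 14 - 10) = 4 \\
x = 15: &\quad g = \max(0,\ 4 + 15 - 10) = 9
\end{aligned}$$

The sum first exceeds $\lambda$ at the fourth observation ($9 > 8$), so that is where change is signalled. The $\max(0, \cdot)$ matters: without it the running total would be $1, -2, 2, 7$, and the below-target reading of $7$ would mask the increase that follows.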
That's still a bit abstract - how do we get $\omega_t$, our target value? If we had all the data in advance, we could make this the mean. In the figure below, I've taken the mean and standard deviation of the first 25 data points as the basis for the CUSUM chart on the right.
You can see that what we end up with is an accumulation of the differences between the observed value and the target value. If this stacks up too high, we signal change. That's all there is to it. In this CUSUM chart, I used $5\sigma$ as the threshold.
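The offline procedure can be sketched as follows. This is an illustration under stated assumptions, not the code behind the figure: it uses the population standard deviation of the baseline window, runs the one-sided CUSUM above (which only accumulates increases) over the whole series, and `OfflineCusum.firstAlarm` is a hypothetical name.

```java
/**
 * Sketch of an offline CUSUM: estimate the target (mean) and standard deviation
 * from the first baselineSize points, run the one-sided CUSUM over the series,
 * and return the first index where g exceeds thresholdSigmas * sigma, or -1.
 */
final class OfflineCusum {

    private OfflineCusum() {}

    static int firstAlarm(double[] x, int baselineSize, double thresholdSigmas) {
        if (baselineSize < 1 || baselineSize > x.length) {
            throw new IllegalArgumentException("baselineSize must be in [1, x.length]");
        }
        // Target value: mean of the baseline window.
        double mean = 0.0;
        for (int i = 0; i < baselineSize; i++) mean += x[i];
        mean /= baselineSize;

        // Population standard deviation of the baseline window.
        double sumSq = 0.0;
        for (int i = 0; i < baselineSize; i++) sumSq += (x[i] - mean) * (x[i] - mean);
        double sigma = Math.sqrt(sumSq / baselineSize);

        double lambda = thresholdSigmas * sigma;
        double g = 0.0;
        for (int t = 0; t < x.length; t++) {
            g = Math.max(0.0, g + x[t] - mean);  // only upward deviations accumulate
            if (g > lambda) {
                return t;                        // H1: change signalled at index t
            }
        }
        return -1;                               // H0 held for the whole series
    }
}
```

For the setup described for the figure, this would be called with `baselineSize = 25` and `thresholdSigmas = 5`.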
In our request monitoring scenario, we're dealing with streaming data and we don't have anything in advance to calculate a mean. We're therefore going to approximate a mean as we go ($\mu$) and make that the target value. We'll also add an extra parameter, $\delta$, to specify the magnitude of change we find acceptable, ending up with:
$$ g_{t+1} = \max(0, g_t + x_t - \mu - \delta) $$
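Approximating the mean as we go needs an estimate that can be updated one observation at a time, without storing the history. One common choice is Welford's online algorithm, sketched below; `RunningStats` is a hypothetical class name used for illustration.

```java
/**
 * Sketch of Welford's online algorithm for a running mean and variance.
 * Each update is O(1) and keeps no history, which suits streaming data.
 */
final class RunningStats {
    private long n = 0;
    private double mean = 0.0;
    private double m2 = 0.0;  // sum of squared deviations from the current mean

    void update(double x) {
        n++;
        double delta = x - mean;   // deviation from the old mean
        mean += delta / n;         // updated mean
        m2 += delta * (x - mean);  // product of old and new deviations
    }

    long count() { return n; }

    double mean() { return mean; }

    /** Population variance; 0 until at least one observation has been seen. */
    double variance() { return n > 0 ? m2 / n : 0.0; }

    double stdDev() { return Math.sqrt(variance()); }
}
```

In the streaming update, $\mu$ would be `mean()`, and $\delta$ and $\lambda$ can be expressed as multiples of `stdDev()`. Note that `m2` on its own is a sum of squared deviations, not a variance; it has to be divided by the count before taking the square root.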
Enough abstract stuff, let's write some code. For a start, we can define an interface for our change detection process.
```java
public interface ChangeDetector<T> {

    /**
     * Provide the next item in the stream
     * @param observation the next item in the stream
     */
    void update(T observation);

    /**
     * Did the detector signal change at the last item?
     * @return true, if it did.
     */
    boolean isChange();

    /**
     * Has the detector seen enough items to detect change?
     * @return true, if it has.
     */
    boolean isReady();

    /**
     * Reset the detector, wiping any memory component it retains.
     */
    void reset();

}
```
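To show how a caller is expected to drive this interface (call `update`, then check `isChange`, and `reset` when appropriate), here is a deliberately naive, hypothetical implementation that flags any value above a fixed limit, together with a small driver loop. `FixedLimitDetector` and `DetectorDriver` are illustrative names, not part of the example project, and the interface is repeated without its Javadoc so the sketch compiles on its own.

```java
import java.util.List;

interface ChangeDetector<T> {
    void update(T observation);
    boolean isChange();
    boolean isReady();
    void reset();
}

/** Hypothetical, deliberately naive detector: flags any value above a fixed limit. */
final class FixedLimitDetector implements ChangeDetector<Double> {
    private final double limit;
    private final long readyAfter;
    private long count = 0;
    private boolean change = false;

    FixedLimitDetector(double limit, long readyAfter) {
        this.limit = limit;
        this.readyAfter = readyAfter;
    }

    @Override
    public void update(Double observation) {
        count++;
        // Only allowed to signal once enough observations have been seen.
        change = isReady() && observation > limit;
    }

    @Override
    public boolean isChange() { return change; }

    @Override
    public boolean isReady() { return count >= readyAfter; }

    @Override
    public void reset() {
        count = 0;
        change = false;
    }
}

final class DetectorDriver {
    private DetectorDriver() {}

    /** Feeds the stream to the detector; returns the index of the first signalled change, or -1. */
    static int firstChange(ChangeDetector<Double> detector, List<Double> stream) {
        for (int i = 0; i < stream.size(); i++) {
            detector.update(stream.get(i));
            if (detector.isChange()) {
                detector.reset();  // assumed policy: start afresh after alerting
                return i;
            }
        }
        return -1;
    }
}
```

Resetting as soon as change is signalled is only one possible policy, assumed here for illustration; a monitoring service might instead keep running and alert on every signal.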
To put our CUSUM detector together, we need to implement `ChangeDetector<Double>`, because we will take a single numeric value at each time step.
```java
public class CUSUMChangeDetector implements ChangeDetector<Double> {

    private static final double DEFAULT_MAGNITUDE = 0.05;
    private static final double DEFAULT_THRESHOLD = 3;
    private static final long DEFAULT_READY_AFTER = 50;

    private double cusumPrev = 0;
    private double cusum;
    private double magnitude;
    private double threshold;
    private double magnitudeMultiplier;
    private double thresholdMultiplier;
    private long readyAfter;

    private long observationCount = 0;
    private double runningMean = 0.0;
    // Running sum of squared deviations from the mean (Welford's M2),
    // not the variance itself.
    private double runningVariance = 0.0;

    private boolean change = false;

    /**
     * Create a CUSUM detector
     * @param magnitudeMultiplier Magnitude of acceptable change in stddevs
     * @param thresholdMultiplier Threshold in stddevs
     * @param readyAfter Number of observations before allowing change to be signalled
     */
    public CUSUMChangeDetector(double magnitudeMultiplier,
                               double thresholdMultiplier,
                               long readyAfter) {
        this.magnitudeMultiplier = magnitudeMultiplier;
        this.thresholdMultiplier = thresholdMultiplier;
        this.readyAfter = readyAfter;
    }

    public CUSUMChangeDetector() {
        this(DEFAULT_MAGNITUDE, DEFAULT_THRESHOLD, DEFAULT_READY_AFTER);
    }

    @Override
    public void update(Double xi) {
        ++observationCount;

        // Instead of providing the target mean as a parameter as
        // we would in an offline test, we calculate it as we go to
        // create a target of normality.
        double newMean = runningMean + (xi - runningMean) / observationCount;
        runningVariance += (xi - runningMean) * (xi - newMean);
        runningMean = newMean;
        // Divide the sum of squares by the count to get the (population)
        // variance before taking the square root.
        double std = Math.sqrt(runningVariance / observationCount);

        magnitude = magnitudeMultiplier * std;
        threshold = thresholdMultiplier * std;

        cusum = Math.max(0, cusumPrev + (xi - runningMean - magnitude));

        if (isReady()) {
            this.change = cusum > threshold;
        }

        cusumPrev = cusum;
    }

    @Override
    public boolean isChange() {
        return change;
    }

    @Override
    public boolean isReady() {
        return this.observationCount >= readyAfter;
    }

    @Override
    public void reset() {
        // Wipe all accumulated state, including the variance and the last decision.
        this.cusum = 0;
        this.cusumPrev = 0;
        this.runningMean = 0;
        this.runningVariance = 0;
        this.observationCount = 0;
        this.change = false;
    }

}
```
Now, using the power of Spring Boot, let's plug this in to monitor some request times. I'm not going to paste all the code here, but I've provided a full example in this git repository.
I have written the following test to assert that our detector can spot a change:
```java
// Fragment of the test class in the example repository: mockMvc, detector,
// provider and log are fields of that class, and status() is statically
// imported from MockMvcResultMatchers.
@Test
public void changeDetectionTest() throws Exception {
    // 200 requests at normal speed should not signal change
    for (int i = 0; i < 200; i++) {
        mockMvc.perform(MockMvcRequestBuilders.get("/data"))
                .andExpect(status().isOk());
        Assert.assertFalse(detector.isChange());
    }

    int at = 0;
    boolean detected = false;

    long delay = 0;
    for (int i = 0; i < 1000; i++) {

        // Introduce abrupt change
        if (i % 100 == 0) {
            delay += 100;
            provider.setDelay(delay);
            log.info("Artificially slowed requests by {}ms", delay);
        }

        mockMvc.perform(MockMvcRequestBuilders.get("/data"))
                .andExpect(status().isOk());

        if (detector.isChange()) {
            at = i;
            detected = true;
            break;
        }
    }

    Assert.assertTrue(detected);
    if (detected) {
        log.info("Detected change {} observations after change point", at);
    }
}
```
If you go and run the example test, you will see something like this:
```
2017-10-03 10:29:40.923 INFO 19836 --- [ main] me.faithfull.cd4soft.DataControllerTest : Artificially slowed requests by 100ms
2017-10-03 10:29:42.146 INFO 19836 --- [ main] m.f.cd4soft.ChangeDetectionService : CHANGE DETECTED! Time to send a midnight page to the sysadmin. Anomalous value: 102
2017-10-03 10:29:42.146 INFO 19836 --- [ main] me.faithfull.cd4soft.DataControllerTest : Detected change 11 observations after change point
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 3.565 sec - in me.faithfull.cd4soft.DataControllerTest
```
And that's a basic and practical CUSUM.
I'm well aware that CUSUM may not be the best-suited change detection approach for this particular problem. It is, however, a very simple approach applied to a very common practical problem, and that is the tack I wish to take in this series because I feel it aids understanding.
If you are modelling change in request times, you may wish to model them as a Poisson process and investigate its evolution.
Thanks to Jon Snyder and Dan Silverglate, who pointed out a bug in the calculation of the variance within the `CUSUMChangeDetector` class.