Detecting anomalies in data series is valuable in many contexts, from predictive maintenance to monitoring resource consumption and IT security. The time factor also plays a role: the earlier an anomaly is detected, the better. Ideally, anomalies are detected in real time, immediately after they occur. This can be achieved with just a few tools. In the following, we take a closer look at Kafka, Docker and Python with scikit-learn. The complete implementation of the code fragments shown can be found on GitHub (github.com/NKDataConv/anomalie-erkennung).
The core of Apache Kafka is a simple idea: a log to which data can be appended at will. This continuously growing log is then called a stream. Producers can write data into the stream, and consumers can read this data. This simplicity makes Kafka very powerful and suitable for many purposes. For example, sensor data can be sent into the stream directly after measurement, while anomaly detection software reads and processes the data from the stream. But why should there even be a stream between the data source (the sensors) and the anomaly detection, instead of sending the data to the anomaly detection directly?
There are at least three good reasons for this. First, Kafka can run distributed in a cluster and offer high reliability (provided that it actually runs on a cluster of servers, so that another server can take over in case of failure). If the anomaly detection fails, the stream can be processed after a restart, continuing exactly where it left off. No data is lost.
Secondly, Kafka offers advantages when a single container for real-time anomaly detection is not fast enough. In this case, load balancing is needed to distribute the data. Kafka solves this problem with partitions. Each stream can be divided into several partitions (which are potentially available on several servers). A partition is then assigned to a consumer. This allows multiple consumers (organized in so-called consumer groups) to dock onto the same stream and distribute the load among themselves.
Thirdly, Kafka decouples the individual functionalities. If, for example, the sensor data should additionally be stored in a database, this can be realized by an additional consumer group that docks onto the stream and writes the data to the database. This is why companies like LinkedIn, where Kafka was originally developed, have declared Kafka to be the central nervous system through which all data is passed.
A Kafka cluster can easily be started using Docker. A Docker Compose file is suitable for this: it allows the user to start several Docker containers at the same time. A Docker network is also created (here called kafka_cluster_default), which all containers join automatically and which enables their communication. The following commands start the cluster from the command line (the commands can also be found in the README.md of the GitHub repository for copying):
git clone https://github.com/NKDataConv/anomalie-erkennung.git
cd anomalie-erkennung/
docker-compose -f ./kafka_cluster/docker-compose.yml up -d --build
Here the Kafka Cluster consists of three components:
- The broker is the core component and contains and manages the stream.
- ZooKeeper is an independent tool and is used by Kafka to coordinate several brokers. Even if only one broker is started, ZooKeeper is still needed.
- The schema registry is an optional component, but has established itself as a best practice when working with Kafka.
The Schema Registry solves the following problem: in practice, the responsibility for the producer (i.e., the origin of the data) is often separated from the responsibility for the consumer (i.e., the data processing). To make this cooperation run smoothly, the Schema Registry establishes a kind of contract between the two parties. This contract contains a schema for the data. If the data does not match the schema, the Schema Registry does not allow it to be added to the corresponding stream. The Schema Registry therefore checks conformance to the schema each time data is sent. The schema is also used to serialize the data, using the Avro format. Serialization with Avro allows the schema to evolve, for example by adding additional data fields, while backward compatibility of the schema is ensured. A further advantage of the Schema Registry in combination with Avro is that the data in the Kafka stream ends up without the schema, because the schema is stored in the Schema Registry; only the data itself is in the stream. In contrast, when sending data in JSON format, the names of all fields would have to be sent with each message. This makes the Avro format very efficient for Kafka.
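For illustration, an Avro schema for the sensor data could look like the following sketch (the record and field names are assumptions, not taken from the repository):

```json
{
  "type": "record",
  "name": "SensorReading",
  "fields": [
    {"name": "timestamp", "type": "string"},
    {"name": "cpu", "type": "double"},
    {"name": "network_in", "type": "double"},
    {"name": "disk_read", "type": "double"}
  ]
}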
To test the Kafka cluster, we can send data from the command line into the stream and read it back with a command-line consumer. To do this, we first create a stream. In Kafka, a single stream is also called a topic. The command for this consists of two parts: first, we have to take a detour via the Docker Compose file to address Kafka; the part relevant to Kafka starts with kafka-topics. In the following, the topic test-stream is created:
docker-compose -f kafka_cluster/docker-compose.yml exec broker kafka-topics --create --bootstrap-server localhost:9092 --topic test-stream
Now we can send messages to the topic. To do this, messages can be sent with ENTER after the following command has been issued:
docker-compose -f kafka_cluster/docker-compose.yml exec broker kafka-console-producer --broker-list localhost:9092 --topic test-stream
At the same time, a consumer can be opened in a second terminal, which outputs the messages on the command line:
docker-compose -f kafka_cluster/docker-compose.yml exec broker kafka-console-consumer --bootstrap-server localhost:9092 --topic test-stream --from-beginning
Both processes can be aborted with CTRL + C. The Schema Registry has not yet been used here. It will only be used in the next step.
Kafka Producer with Python
There are several ways to implement a producer for Kafka. Kafka Connect allows you to connect many external tools such as databases or the file system and send their data to the stream. But there are also libraries for various programming languages. For Python, for example, there are at least three working, actively maintained libraries. To simulate data in real time, we use a time series that measures the resource usage of a server. This time series includes CPU usage and the number of bytes of network and disk read operations. This time series was recorded with Amazon CloudWatch and is available on Kaggle. An overview of the data is shown in Figure 1, with a few obvious outliers for each time series.
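The core of the producer logic can be sketched as follows; this is a minimal illustration, not the repository's implementation, and the field names and topic name are assumptions:

```python
import json

def build_message(timestamp, cpu, network_in, disk_read):
    """Serialize one row of the time series as a UTF-8 encoded JSON message."""
    payload = {
        "timestamp": timestamp,    # hypothetical field names for the
        "cpu": cpu,                # CloudWatch metrics described above
        "network_in": network_in,
        "disk_read": disk_read,
    }
    return json.dumps(payload).encode("utf-8")

# Sending with the confluent-kafka library would then look roughly like
# (requires a running broker, hence commented out here):
#
#   from confluent_kafka import Producer
#   producer = Producer({"bootstrap.servers": "localhost:9092"})
#   producer.produce("anomaly_tutorial", value=build_message(...))
#   producer.flush()
```

In the actual setup, the Avro serializer of the Schema Registry would take the place of the plain JSON serialization.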
In the time series, the values are recorded at 5-minute intervals. To save time, they are sent into the stream at 1-second intervals. In the following, a Docker image for the producer is built. The code for the producer can be viewed and adjusted in the subfolder producer. The container is then started as part of the existing network kafka_cluster_default.
docker build ./producer -t kafka-producer
docker run --network="kafka_cluster_default" -it kafka-producer
Now we want to turn to the other side of the stream.
For anomaly detection, it is interesting to implement two consumers: one consumer for real-time detection, and a second for training the model. The training consumer can be stopped after training and restarted later, for example for nightly retraining. This is useful if the data has changed: the algorithm is then trained again and a new model is saved. It is also conceivable to trigger this container at regular intervals using a cron job. In production, however, this should not run completely unattended; an intermediate step that checks the model makes sense at this point – more about this later.
The trained model is then stored in the Docker volume. The volume is mounted in the subfolder data. The first consumer has access to this Docker Volume. This consumer can load the trained model and evaluate the data for anomalies.
On the consumer side, too, libraries exist for many languages. Since Python is often the language of choice for data scientists, it is the obvious choice here. It is interesting to note that the schema no longer needs to be specified on the consumer side: it is automatically obtained via the Schema Registry, and the message is deserialized accordingly. An overview of the containers described so far is shown in Figure 2. Before we also run the consumer, let's take a closer look at what happens algorithmically inside the containers.
For anomaly detection, a colourful bouquet of algorithms exists, and the so-called no-free-lunch theorem makes the choice difficult. The point of this theorem is not that we should bring our own lunch, but that nothing comes for free when choosing an algorithm: we cannot know whether a particular algorithm works best for our data unless we test all algorithms (with all possible settings) – an impossible task. Nevertheless, we can sort the algorithms a bit.
Basically, anomaly detection first distinguishes between supervised and unsupervised learning. If the data indicates when an anomaly occurred, this is referred to as supervised learning, and classification algorithms are suitable: the classes are "anomaly" and "no anomaly". If, on the other hand, the data set does not indicate when an anomaly occurred, one speaks of unsupervised learning. Since this is usually the case in practice, we concentrate here on approaches for unsupervised learning. Any forecasting algorithm can be used for this problem: the next incoming data point is predicted, and when it actually arrives, it is compared with the forecast. Normally, the forecast should be close to the data point; if it is far off, the data point is classified as an anomaly.
What "far off" means exactly can be determined with self-selected threshold values or with statistical tests; the Seasonal Hybrid ESD algorithm developed by Twitter, for example, does the latter. A similar approach is pursued with autoencoders. These are neural networks that reduce the dimensionality of the data in a first step and then try to restore the original data from this reduced representation in a second step. Normally, this works well; if the reconstruction fails for a data point, it is classified as an anomaly. Another common approach is One-Class Support Vector Machines. These draw as narrow a boundary as possible around the training data, with a few data points allowed to fall outside as exceptions. The fraction of exceptions must be given to the algorithm as a percentage and determines what percentage of data points will be flagged as anomalies, so some prior estimate of the number of anomalies is needed. Each new data point is then checked to see whether it lies inside or outside the boundary; outside corresponds to an anomaly.
Another approach that works well is Isolation Forest. The idea behind this algorithm is that anomalies can be distinguished from the rest of the data with the help of a few dividing lines (Figure 3). Data points that can be isolated with the fewest separators are therefore the anomalies (box: “Isolation Forests”).
Isolation Forests are based on several decision trees. A single decision tree in turn consists of nodes and leaves. The decisions are made in the nodes. To do this, a random feature (that is, a column of the data; in our example CPU, network, or disk) and a random split value are selected for the node. If a data point has a value in that feature that is less than the split value, it goes into the left branch, otherwise into the right one. Each branch is followed by the next node, and the whole thing happens again: random feature and random split value. This continues until each data point ends up in its own leaf, which isolates each data point. The procedure is repeated with the same data for several trees. These trees form the forest that gives the algorithm its name.
At this point, each tree has isolated all training data points into leaves. Now we have to decide which leaves, or data points, are anomalies. Here is the key observation: anomalies are easier to isolate. This is also shown in Figure 3, which uses two-dimensional data for illustration. Points that lie somewhat outside the bulk of the data can be isolated with a single split (shown as a single black line on the left), which corresponds to one split in a decision tree. In contrast, data points that lie close together can only be isolated with many splits (shown as multiple dividing lines on the right).
This means that, on average, anomalies require fewer splits to isolate. And this is exactly how Isolation Forests define anomalies: anomalies are the data points that are isolated with the fewest splits. All that remains is to decide what share of the data should be classified as anomalous. This is a parameter that Isolation Forests need for training, so a rough estimate of how many anomalies the data set contains is required. Alternatively, an iterative approach can be used: different values are tried out and the results checked. Another parameter is the number of decision trees in the forest, since the number of splits is averaged over several decision trees. The more decision trees, the more accurate the result. One hundred trees is a generous value that is sufficient for a good result. This may sound like a lot at first, but the computation completes within seconds. This efficiency is another advantage of Isolation Forests.
Isolation Forests are easy to implement with Python and the library scikit-learn. The parameter n_estimators corresponds to the number of decision trees and contamination to the expected proportion of anomalies. The fit method is used to trigger the training process on the data. The model is then stored in the Docker Volume.
from sklearn.ensemble import IsolationForest
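Building on this import, the training step could look roughly like the following sketch (synthetic data here as a stand-in; in the repository, the data comes from the Kafka stream and the model is written to the mounted Docker volume):

```python
import joblib
import numpy as np
from sklearn.ensemble import IsolationForest

rng = np.random.default_rng(0)
X_train = rng.normal(size=(1000, 3))  # stand-in for CPU, network, disk

iso_forest = IsolationForest(
    n_estimators=100,    # number of decision trees in the forest
    contamination=0.01,  # expected fraction of anomalies
    random_state=0,
)
iso_forest.fit(X_train)

# Persist the model so the prediction consumer can load it later.
joblib.dump(iso_forest, "iso_forest.joblib")
```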
From the Docker Volume, the Anomaly Detection Consumer can then call the model and evaluate streaming data using the predict method:
import joblib

iso_forest = joblib.load('/data/iso_forest.joblib')
anomaly = iso_forest.predict(df_predict)
A nice feature of scikit-learn is the simplicity and consistency of its API. Isolation Forests can be swapped for the One-Class Support Vector Machines mentioned above: apart from the import, only the call to IsolationForest has to be replaced by OneClassSVM; the data preparation and the methods remain the same. Let us now turn to data preparation.
Feature Engineering with time series
In machine learning, each column of the data set (CPU, network and disk) is also called a feature. Feature engineering is the preparation of the features for use by the algorithm, and there are a few things to consider. Often the features are scaled to bring all features into a common value range. MinMax scaling, for example, transforms all values into the range 0 to 1: the maximum of the original values maps to 1, the minimum to 0, and all other values are scaled in between. Scaling is a prerequisite for training neural networks, for example. For Isolation Forests in particular, scaling is not necessary.
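MinMax scaling can be sketched in a few lines with scikit-learn (the values here are arbitrary examples):

```python
import numpy as np
from sklearn.preprocessing import MinMaxScaler

X = np.array([[10.0], [20.0], [40.0]])

scaler = MinMaxScaler()           # scales each feature to the range 0 to 1
X_scaled = scaler.fit_transform(X)

print(X_scaled.ravel())  # minimum becomes 0, maximum becomes 1
```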
Another important step is often the conversion of categorical features that are represented by a string. Let’s assume we have an additional column for the processor with the strings Processor_1 and Processor_2, which indicate which processor the measured value refers to. These strings can be converted to zeros and ones using one-hot encoding, where zero represents the category Processor_1 and one represents Processor_2. If there are more than two categories, it makes sense to create a separate feature (or column) for each category. This will result in a Processor_1 column, a Processor_2 column, and so on, with each column consisting of zeros and ones for the corresponding category.
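With pandas, this one-hot encoding is a one-liner; the processor column below is the hypothetical example from the text, not part of the actual data set:

```python
import pandas as pd

# Hypothetical extra column indicating which processor a reading refers to.
df = pd.DataFrame({"processor": ["Processor_1", "Processor_2", "Processor_1"]})

# One-hot encoding: one column of zeros and ones per category.
dummies = pd.get_dummies(df["processor"], dtype=int)
df = pd.concat([df, dummies], axis=1)
```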
Feature engineering also means preparing the existing information in a useful way and providing additional information. For time series in particular, it is often important to extract time information from the timestamp. For example, many time series fluctuate over the course of a day; then it can be important to provide the hour as a feature. If the fluctuations follow the seasons, a useful feature is the month. Or one can differentiate between working days and holidays. All this information can easily be extracted from the timestamp and often provides significant added value to the algorithm. A useful feature in our case could be business hours: this feature is zero between 8 a.m. and 6 p.m. and one otherwise. With the Python library pandas, such features are easily created:
df['hour'] = df.timestamp.dt.hour
df['business_hour'] = ((df.hour < 8) | (df.hour > 18)).astype("int")
It is always important that sufficient data is available. It is therefore useless to introduce the feature month if there is no complete year of training data available. Then the first appearance of the month of May would be detected as an anomaly in live operation, if it was not already present in the training data.
At the same time, one should note that "more is always better" does not apply here. On the contrary! If a feature is created that has nothing to do with the anomalies, the algorithm will still use it for detection and find spurious irregularities. Thinking carefully about the data and the problem is essential here.
Trade-off in anomaly detection
Training an algorithm to correctly detect all anomalies is difficult, if not impossible. You should therefore expect the algorithm to make mistakes. These errors can, however, be controlled to a certain degree. The first type of error occurs when an anomaly is not recognized as such by the algorithm – the anomaly is missed.
The second type of error is the reverse: an anomaly is detected where in reality there is none – a false alarm. Both errors are of course bad.
However, depending on the scenario, one of the two errors may be more critical. If each detected anomaly results in expensive machine maintenance, we want as few false alarms as possible. In health care, on the other hand, anomaly detection might monitor vital signs; there we would rather accept a few false alarms and check on the patient more often than miss an emergency. Most anomaly detection algorithms let us weigh these two errors against each other by setting a threshold value or, in the case of Isolation Forests, via the contamination parameter. The higher this value is set, the more anomalies the algorithm expects and, of course, detects. This reduces the probability of missing an anomaly.
The argument also works the other way round: if the parameter is set low, the few detected anomalies are very likely real ones, but some anomalies may be missed. So here again it is important to think the problem through and find meaningful parameters by iterative trial and error.
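The effect of the contamination parameter can be sketched on synthetic data; the fraction of flagged points tracks the parameter approximately (data and values here are illustrative, not from the tutorial):

```python
import numpy as np
from sklearn.ensemble import IsolationForest

rng = np.random.default_rng(1)
X = rng.normal(size=(500, 3))  # stand-in for the three metrics

# The higher the contamination, the more points are flagged as anomalies.
for contamination in (0.01, 0.05, 0.1):
    model = IsolationForest(contamination=contamination, random_state=0).fit(X)
    n_flagged = int((model.predict(X) == -1).sum())
    print(contamination, n_flagged)
```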
Real-time anomaly detection
Now we have waited long enough and the producer should have written some data into the Kafka stream by now. Time to start training for anomaly detection. This happens in a second terminal with Docker using the following commands:
docker build ./Consumer_ML_Training -t kafka-consumer-training
docker run --network="kafka_cluster_default" --volume $(pwd)/data:/data:rw -it kafka-consumer-training
The training runs on 60 percent of the data; the remaining 40 percent are used for testing the algorithm. For time series, it is important that the training data precedes the test data in time. Otherwise, the model would get an illegitimate glimpse into the future.
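A minimal sketch of such a temporal split (assuming the data is already ordered by timestamp):

```python
import numpy as np

# Stand-in for a time-ordered series: 100 rows, already sorted by timestamp.
X = np.arange(100).reshape(-1, 1)

split = int(len(X) * 0.6)
X_train, X_test = X[:split], X[split:]  # training data strictly precedes test data
```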
An evaluation on the test data is stored in the Docker volume in the form of a graph, together with the trained model. An example evaluation is shown in Figure 4; the vertical red lines correspond to the detected anomalies. For this evaluation, we waited until all data were available in the stream.
If we are satisfied with the result, we can now use the following commands to start the anomaly detection:
docker build ./Consumer_Prediction -t kafka-consumer-prediction
docker run --network="kafka_cluster_default" --volume $(pwd)/data:/data:ro -it kafka-consumer-prediction
As described at the beginning, we can also use several Docker containers for the evaluation in order to distribute the load. For this, the load of the Kafka topic must be spread over two partitions, so let's increase the number of partitions to two:
docker-compose -f kafka_cluster/docker-compose.yml exec broker kafka-topics --alter --zookeeper zookeeper:2181 --topic anomaly_tutorial --partitions 2
Now we simply start a second consumer for anomaly detection with the known command:
docker run --network="kafka_cluster_default" --volume $(pwd)/data:/data:ro -it kafka-consumer-prediction
Kafka now automatically takes care of assigning partitions to consumers; the rebalancing can take a few seconds. An anomaly is reported in the Docker logs with "No anomaly" or "Anomaly at time *". How the anomaly is processed further is left open here. It would be particularly elegant to write the anomaly into a new Kafka topic; an ELK stack could dock onto it via Kafka Connect and visualize the anomalies.