Introduction to Time Series Databases - InfluxDB
Aneshka Goyal
AWS Certified Solutions Architect | Software Development Engineer III at Egencia, An American Express Global Business Travel Company
What is Time Series Data?
As the title suggests, this blog discusses time series databases, and InfluxDB in particular. But before we start on databases, it is important to discuss the data itself.
Time series data is captured over a period of time and ordered chronologically. The primary characteristic of a time series is that it is indexed or listed in time order, which is a critical distinction from other types of data sets. This means that if we were to plot the points of time series data on a graph, one of our axes would always be time. Apart from having time as a critical index, time series data is generally appended but never modified: each new record arrives with a different time value and is recorded as a new entry, so the data is effectively immutable.
Such data has numerous applications across industries: electrical activity in the brain, application logs and metrics (CPU, memory utilisation), stock prices, heartbeats per minute, and so on.
Time series data can be classified into two types: metrics, which are measurements captured at regular intervals (such as CPU utilisation sampled every minute), and events, which are recorded at irregular intervals as and when they occur (such as application log entries).
For some applications, such as stock markets, time series analysis can be used for trend forecasting.
Time series data can thus be found all around us, and in the growing world of cloud computing, IoT devices, ML and AI its presence is felt even more. It therefore becomes crucial to store it efficiently and to make querying and analysis performant.
What is a Time Series Database?
A time series database (TSDB) is a database optimised for time-stamped or time series data, that is, measurements or events that are tracked, monitored, downsampled, and aggregated over time. In the past, financial data was the main kind of time series data. Nowadays, with Internet of Things sensors in the picture and with scalability and monitoring becoming prime aspects of deploying applications on the cloud, many systems beyond financial applications emit events or metrics, and these need a proficient database to store and analyse them efficiently. Hence the need for time series databases to handle time series data.
In this blog post we are going to introduce and apply one of the most popular time series databases, InfluxDB.
Understanding InfluxDB
InfluxDB is an open-source time series database (TSDB) developed by the company InfluxData. It is used for storage and retrieval of time series data in fields such as operations, monitoring, application metrics, Internet of Things sensor data, and real-time analytics. Before diving into how it stores time series data and makes analysis and querying efficient, let's familiarise ourselves with some InfluxDB terminology.
Bucket - A bucket is a named location where time series data is stored in InfluxDB 2.0. In InfluxDB each combination of a database and a retention policy (database/retention-policy) represents a bucket.
Database - A logical container for users, retention policies, continuous queries, and time series data.
Measurement - A measurement acts as a container for tags, fields, and timestamps. Use a measurement name that describes your data. It is analogous to a table in a SQL database.
Tags - The key-value pairs (tag key and tag value) in the InfluxDB data structure that record metadata. Tags are an optional part of the data structure, but they are useful for storing commonly queried metadata; tags are indexed, so queries on tags are performant. Query tip: compare tags to fields; fields are not indexed.
Fields - The key-value pairs (field key and field value) in an InfluxDB data structure that record the actual data values. Fields are required in InfluxDB data structures and they are not indexed: queries on field values scan all points that match the specified time range and, as a result, are not performant relative to tags.
Continuous Queries - Continuous queries (CQ) are InfluxQL queries that run automatically and periodically on real-time data and store query results in a specified measurement. They are like stored procedures in SQL and can be used for processing and aggregation use cases.
Point - In InfluxDB, a point represents a single data record, similar to a row in a SQL database table. Each point consists of a measurement, a tag set, a field set, and a timestamp, and is uniquely identified by its series and timestamp.
Series - A logical grouping of data defined by a shared measurement, tag set (the collection of tag keys and tag values on a point), and field key.
Retention Policy (RP) - Describes how long InfluxDB keeps data (duration), how many copies of the data to store in the cluster (replication factor), and the time range covered by shard groups (shard group duration). RPs are unique per database and, along with the measurement and tag set, define a series. The default retention policy is autogen, under which data lives forever.
Thus InfluxDB groups a database tied to a retention policy into one bucket. A database can have multiple measurements, and each measurement stores related data. A point in InfluxDB is a measurement together with a tag set, a field set, and a timestamp. Whenever we refer to something as a set, we mean both the key and the value. Tags are indexed while fields are not, though both are key-value pairs. Fields hold the actual data (like CPU or temperature metrics), while tags hold metadata such as the machine (host) name (with tag values like m1, m2). To logically group the data of a measurement, tag set, and field key we have the term series. Let's take the example of a measurement called temperature, which stores the machine temperatures emitted by hosts m1, m2, and m3. One field key is temperature_val, whose values are numbers (float type) representing the temperature of that machine. Here our tag key is the host name and the field key is temperature_val, so we will have exactly 3 series (temperature|host|m1|temperature_val, temperature|host|m2|temperature_val, and temperature|host|m3|temperature_val). Thus we say that the data cardinality is 3.
InfluxDB uses line protocol, a text-based format for writing points. The line protocol syntax is as below.
+-----------+--------+-+---------+-+---------+
|measurement|,tag_set| |field_set| |timestamp|
+-----------+--------+-+---------+-+---------+
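For example, a point from the temperature scenario above would look like this in line protocol (the trailing timestamp, in nanoseconds since the epoch, is illustrative):

temperature,host=m1 temperature_val=37.2 1718000000000000000

If the timestamp is omitted, InfluxDB assigns the server's current time on write.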
Time series data can become irrelevant for processing as it gets older, hence we get to specify a retention policy over the data and how many times it needs to be replicated. By default data is not deleted; the default policy is autogen.
Point to note: InfluxDB is not a full CRUD database but more like CR-ud, prioritising the performance of creating and reading data over update and delete. This is because time series events or metrics are rarely updated but are always added as new records, and we have retention policies to delete data that loses relevance based on its age. Also, drop or delete operates on a series (see the definition above) rather than on individual points, for performance reasons; an illustration of series-level deletion follows below.
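As a hedged illustration in InfluxQL (the InfluxDB 1.x query language, using names from the temperature example in this post), deletion targets a whole series rather than individual points:

DROP SERIES FROM "temperature" WHERE "host" = 'm1'

This removes all points belonging to the matching series in one operation.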
How is time series data stored in InfluxDB - Storage Engine?
To be able to deal with time series data, it is very important to understand how InfluxDB stores it internally, so that we understand how it is able to efficiently handle the data and the use cases around it.
The InfluxDB storage engine ensures that data is safely written to disk, that queried data is returned complete and correct, and that data remains accurate first and performant second after compaction and deletion.
The InfluxDB storage engine looks very similar to an LSM (Log-Structured Merge) tree. It has a write ahead log and a collection of read-only data files, similar in concept to SSTables (Sorted String Tables) in an LSM tree. TSM files contain sorted, compressed series data.
Before we dive into how the storage engine works, let's familiarise ourselves with a few more key terms around it.
Shard - A shard contains encoded and compressed time series data for a given time range defined by the shard group duration. All points in a series within the specified shard group duration are stored in the same shard. A single shard contains multiple series, one or more TSM files on disk, and belongs to one and only one shard group.
Shard Group - A shard group belongs to an InfluxDB bucket and contains time series data for a specific time range defined by the shard group duration.
Shard group duration - Specifies the time range for each shard group and determines how often to create a new shard group. By default, InfluxDB sets the shard group duration according to the retention period of the bucket: a retention period of up to 2 days gets a shard group duration of 1 hour, a period between 2 days and 6 months gets 1 day, and a period above 6 months (including infinite retention) gets 7 days.
Point to note: It is possible to configure a custom shard group duration as well.
Shard groups older than the bucket's retention policy are deleted periodically and automatically. InfluxDB always writes to uncompacted shards (hot shards). When a shard is no longer actively written to, InfluxDB compacts its data, resulting in a "cold" shard. Compaction merges and rewrites the TSM files belonging to a shard to optimise reads.
For writes to be durable, they first go to a WAL (write ahead log), an append-only file. The WAL simply appends recently written data and is not optimised for reads or querying. The WAL data (points) is also cached in memory until it is written to TSM files, keeping it read-optimised in the meantime. Queries to the storage engine merge data from the cache with data from the TSM files and return results.
To efficiently compact and store data, the storage engine groups field values by series key, and then orders those field values by time. (A series key is defined by measurement, tag key and value, and field key.) The storage engine uses a Time-Structured Merge tree (TSM) data format. TSM files store compressed series data in a columnar format. To improve efficiency, the storage engine only stores differences (or deltas) between values in a series. Column-oriented storage lets the engine read by series key and omit extraneous data. The end goal is to organise values for a series together into long runs to best optimise compression and scanning queries.
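To build intuition for this delta encoding, here is a minimal Java sketch. It is our own illustration, not InfluxDB's actual implementation, which layers further encodings (such as run-length and simple8b) on top of the deltas:

import java.util.Arrays;

public class DeltaEncodingSketch {

    // Store only the first value and the differences between neighbours.
    // Sorted timestamps produce small, repetitive deltas that compress well.
    static long[] encode(long[] timestamps) {
        long[] deltas = new long[timestamps.length];
        deltas[0] = timestamps[0];
        for (int i = 1; i < timestamps.length; i++) {
            deltas[i] = timestamps[i] - timestamps[i - 1];
        }
        return deltas;
    }

    public static void main(String[] args) {
        // Nanosecond timestamps ten seconds apart: huge absolute values,
        // but tiny identical deltas after encoding.
        long[] ts = {1718000000000000000L, 1718000010000000000L, 1718000020000000000L};
        System.out.println(Arrays.toString(encode(ts)));
        // -> [1718000000000000000, 10000000000, 10000000000]
    }
}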
TSM files are also compacted and merged with existing TSM files to bring down the number of such files and further compact the data for read performance.
On top of TSM files, for read performance we have the Time Series Index (TSI).
As data cardinality (the number of series) grows, queries read more series keys and become slower. The Time Series Index (TSI) ensures queries remain fast as data cardinality grows. The TSI stores series keys grouped by measurement, tag, and field. This allows the database to answer two questions well: what measurements, tag keys, tag values, and field keys exist (the schema), and, given a measurement, tags, and fields, which series keys match.
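Those schema questions can be asked directly in Flux; a small sketch, using the bucket name from this post's demo:

import "influxdata/influxdb/schema"

// List the measurements that exist in the bucket (served from the index,
// without scanning the underlying TSM data).
schema.measurements(bucket: "influxdbDemo")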
Now we know how, under the hood, InfluxDB stores, organises, compacts, and indexes data points and series to give us correct and quick responses. This also answers the question of what makes time series databases different, and why a plain NoSQL or SQL database cannot simply be leveraged: time series databases are explicitly optimised for the distinct nature of time series data. We also know the terminology used in InfluxDB and its storage engine.
Hands On!
Let's now see things in action while we create a simple Spring Boot Java application that writes some time series data to InfluxDB and also queries the mean aggregation.
InfluxDB setup
For our application to talk to InfluxDB, a prerequisite is to have a database running that our application can connect to. We will leverage Docker to run a container with the InfluxDB image. We will also map a local volume to the container volume so that whatever data we write to our database remains persisted across container starts and stops, and similarly for the configuration of our container.
Below is how our docker-compose file should look.
version: '3.6'
services:
  influxdb:
    image: influxdb:2
    container_name: influxdb
    restart: unless-stopped
    ports:
      - '8086:8086'
    volumes:
      - ./influxdb_data/data:/var/lib/influxdb2
      - ./influxdb_data/config:/etc/influxdb2
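Assuming the file is saved as docker-compose.yml, the container can then be started in the background with:

docker compose up -d

(or docker-compose up -d with the standalone binary).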
We will be able to access our InfluxDB on localhost port 8086.
The first screen we land on lets us configure our username, password, organisation, and bucket name. Once we are done, it gives us the operator or admin API token. This is like a super token, but for this demo we will generate another API token restricted to our bucket. These tokens are used by our application to connect to our InfluxDB instance, so if a token restricted to one bucket is used, we will only be able to access the contents of that bucket and not the others in our organisation.
This setup is a one-time step; once done, we can subsequently log into the InfluxDB console using the username and password set in step one.
Java Application Setup
Now that we have InfluxDB up and running, we will set up a Spring Boot application using Spring Initializr.
Our application will simply expose APIs to write time series data, i.e. data about a particular host's temperature, to a measurement called temperature. We will tag our data by host (tag key host, tag value the host name); the field key will be temperature_val and the field value will be the actual temperature as a floating point number.
The pom file for our application looks something like this.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.3.0</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>influxdbdemo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>influxdbdemo</name>
    <description>Demo project for Spring Boot with influxdb</description>
    <properties>
        <java.version>17</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.influxdb/influxdb-client-java -->
        <dependency>
            <groupId>com.influxdb</groupId>
            <artifactId>influxdb-client-java</artifactId>
            <version>7.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
Here the artifact of importance, which allows us to build a connection and write to and read from the DB, is influxdb-client-java.
Since Spring Boot does not give us boilerplate auto-configuration for an InfluxDB connection, we write a configuration class that creates an InfluxDB client, which we will use to write and read data. The code for the same looks like below.
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class InfluxConfiguration {

    @Value("${spring.influxdb.url}")
    private String url;

    @Value("${spring.influxdb.token}")
    private String token;

    @Bean
    public InfluxDBClient influxDBClient() {
        return InfluxDBClientFactory.create(url, token.toCharArray());
    }
}
Here the token is the one obtained for our bucket influxdbDemo, providing read and write access to the bucket for the application; the url is http://localhost:8086, where InfluxDB is reachable.
All these values are part of our application.properties file.
spring.application.name=influxdbdemo
spring.influxdb.url=http://localhost:8086
spring.influxdb.token=<add token here>
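A hedged aside, not part of the original setup: to keep the token out of source control, Spring's property placeholder syntax can read it from an environment variable (here a hypothetical INFLUXDB_TOKEN) instead of hard-coding it:

spring.influxdb.token=${INFLUXDB_TOKEN}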
Whenever the application is stopped, we want to close the connection to InfluxDB. This is done by writing an event listener that listens to the ContextClosedEvent.
import com.influxdb.client.InfluxDBClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

@Component
public class ContextClosedEventListener {

    private final InfluxDBClient influxDBClient;

    @Autowired
    public ContextClosedEventListener(InfluxDBClient influxDBClient) {
        this.influxDBClient = influxDBClient;
    }

    @EventListener(ContextClosedEvent.class)
    public void onContextClosedEvent(ContextClosedEvent contextClosedEvent) {
        System.out.println("closing influx db connection");
        influxDBClient.close();
    }
}
So far we have created a bucket, generated an API token, and written the boilerplate code to establish a connection with InfluxDB and close that connection when it is no longer needed.
Next let's write a controller for our application. There are three ways of writing data to our database, demonstrated by the three POST endpoints below (example curl invocations follow the controller).
First we write a simple point (as per the definition discussed above) to our database. We invoke the API with /point; here we write a point with host tag value m1 and temperature_val field value 37.2.
Next we write data with host tag value m2 and temperature_val field value 42.3, but this time we leverage the line protocol to insert the value instead of building a point. The API we hit is /line-protocol.
Next we write data with host tag value m3 and temperature_val field value 66.80, but here we leverage a Java POJO to insert the value instead of building a point or using line protocol. The API we hit is /class-object.
Point to note: for the other APIs we created a single value, but we call this API 4 times to insert 4 points (data records).
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/v1/demo")
public class InfluxDemoController {

    private final InfluxDemoService influxDemoService;

    @Autowired
    public InfluxDemoController(InfluxDemoService influxDemoService) {
        this.influxDemoService = influxDemoService;
    }

    @PostMapping("/point")
    public void createPoint(@RequestParam("field") float field, @RequestParam("tag") String tag) {
        influxDemoService.createPoint(field, tag);
    }

    @PostMapping("/line-protocol")
    public void createLineProtocol(@RequestParam("field") float field, @RequestParam("tag") String tag) {
        influxDemoService.createLineProtocol(field, tag);
    }

    @PostMapping("/class-object")
    public void createClassObject(@RequestParam("field") float field, @RequestParam("tag") String tag) {
        influxDemoService.createClassObj(field, tag);
    }

    @GetMapping("/temperature")
    public void printTemperature() {
        influxDemoService.getTemperature();
    }
}
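With the application running (on Spring Boot's default port 8080, an assumption since no server.port is configured here), the endpoints can be exercised with curl, for example:

curl -X POST "http://localhost:8080/v1/demo/point?field=37.2&tag=m1"
curl -X POST "http://localhost:8080/v1/demo/line-protocol?field=42.3&tag=m2"
curl -X POST "http://localhost:8080/v1/demo/class-object?field=66.8&tag=m3"
curl "http://localhost:8080/v1/demo/temperature"

These invocations mirror the sample values discussed above.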
The service code that actually calls the client for InfluxDB is as below.
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.QueryApi;
import com.influxdb.client.WriteApiBlocking;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.write.Point;
import com.influxdb.query.FluxRecord;
import com.influxdb.query.FluxTable;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.time.Instant;
import java.util.List;

@Service
public class InfluxDemoService {

    private static final String BUCKET = "influxdbDemo";
    private static final String ORG = "dummy";

    private final InfluxDBClient influxDBClient;

    @Autowired
    public InfluxDemoService(InfluxDBClient influxDBClient) {
        this.influxDBClient = influxDBClient;
    }

    public void createPoint(float field, String tag) {
        try {
            Point point = Point.measurement("temperature")
                    .addTag("host", tag)
                    .addField("temperature_val", field)
                    .time(Instant.now(), WritePrecision.NS);
            WriteApiBlocking writeApi = influxDBClient.getWriteApiBlocking();
            writeApi.writePoint(BUCKET, ORG, point);
        } catch (Exception e) {
            System.out.println("Error occurred -----> " + e.getMessage());
        }
    }

    public void createLineProtocol(float field, String tag) {
        try {
            WriteApiBlocking writeApi = influxDBClient.getWriteApiBlocking();
            String lineProtocol = "temperature,host=" + tag + " temperature_val=" + field;
            writeApi.writeRecord(BUCKET, ORG, WritePrecision.NS, lineProtocol);
        } catch (Exception e) {
            System.out.println("Error occurred -----> " + e.getMessage());
        }
    }

    public void createClassObj(float field, String tag) {
        try {
            Temperature temperature = new Temperature(tag, field, Instant.now());
            WriteApiBlocking writeApi = influxDBClient.getWriteApiBlocking();
            writeApi.writeMeasurement(BUCKET, ORG, WritePrecision.NS, temperature);
        } catch (Exception e) {
            System.out.println("Error occurred -----> " + e.getMessage());
        }
    }

    public void getTemperature() {
        try {
            String flux = "from(bucket: \"influxdbDemo\")\n" +
                    "  |> range(start: -24h)\n" +
                    "  |> filter(fn: (r) => r._measurement == \"temperature\")\n" +
                    "  |> filter(fn: (r) => r._field == \"temperature_val\")\n" +
                    "  |> window(every: 24h)\n" +
                    "  |> mean()";
            QueryApi queryApi = influxDBClient.getQueryApi();
            List<FluxTable> tables = queryApi.query(flux, ORG);
            for (FluxTable fluxTable : tables) {
                List<FluxRecord> records = fluxTable.getRecords();
                for (FluxRecord fluxRecord : records) {
                    System.out.println(fluxRecord.getValueByKey("host") + ": " + fluxRecord.getValueByKey("_value"));
                }
            }
        } catch (Exception e) {
            System.out.println("Error occurred -----> " + e.getMessage());
        }
    }
}
Here we have our org (the same org name we configured while initially setting up InfluxDB) and bucket name as constants.
We create a point for the temperature measurement, with the tag and field values specified in the request, and simply write that point using the InfluxDB client we created.
Similarly we also create entries using line protocol and the Java POJO in the next two methods respectively.
Our InfluxDB table looks like this; it's the simple table view for our data.
The POJO for temperature measurement looks like this.
import com.influxdb.annotations.Column;
import com.influxdb.annotations.Measurement;

import java.time.Instant;

@Measurement(name = "temperature")
public class Temperature {

    @Column(tag = true)
    private String host;

    @Column(name = "temperature_val")
    private float temperatureVal;

    @Column(timestamp = true)
    Instant time;

    public Temperature(String host, float temperatureVal, Instant time) {
        this.host = host;
        this.temperatureVal = temperatureVal;
        this.time = time;
    }
}
Thus by all three of these ways we are able to write points to our InfluxDB, which are then available for further querying, aggregation, and processing.
Next we will leverage Flux to invoke our last endpoint, which lets us query this data.
The last endpoint we have is /temperature, a GET endpoint; the service code is simply a Flux query executed by our InfluxDB client.
Flux Query
A Flux query does the following: it retrieves a specified amount of data from a source, filters the data based on time or column values, processes and shapes it into the expected result, and returns that result.
Flux is a query and scripting language that plays a role similar to SQL in relational databases. Flux queries are written as pipelines in which the pipe-forward operator (|>) sends the output of one function as input to the next.
Let's take a look at the Flux query in our Java code.
"from(bucket: \"influxdbDemo\")\n" +
" |> range(start: -24h)\n" +
" |> filter(fn: (r) => r._measurement == \"temperature\")\n" +
" |> filter(fn: (r) => r._field == \"temperature_val\")\n" +
" |> window(every: 24h)\n"+
" |> mean()";
Here we specify a source bucket, our influxdbDemo bucket (to which we have been writing data points). We specify a range of the last 24 hours to get hold of all the points written. Then we filter for a specific measurement name, our temperature measurement, and for all records that have the field key temperature_val (the field key we have been adding corresponding field values for).
Next we apply a function to give us the mean of the values for each series; to calculate it we take the mean over a window of 24 hours (so that we have just one window in which all the data for a series is available).
Note that a smaller window size is also possible; the data is then grouped into each window and the mean is computed over the data in every window so created. A variant with one-hour windows is sketched below.
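As a sketch of that smaller-window variant, the same query with one-hour windows computes a mean per hour per series instead of a single overall mean:

from(bucket: "influxdbDemo")
  |> range(start: -24h)
  |> filter(fn: (r) => r["_measurement"] == "temperature")
  |> filter(fn: (r) => r["_field"] == "temperature_val")
  |> window(every: 1h)
  |> mean()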
As expected, we can execute the same query on the InfluxDB UI as well.
Before executing the mean calculation query, let's first execute the simple query that returns the results without applying any mean function.
The query looks like below; it's the same query but without computing the mean over a window.
from(bucket: "influxdbDemo")
  |> range(start: -24h)
  |> filter(fn: (r) => r["_measurement"] == "temperature")
  |> filter(fn: (r) => r["_field"] == "temperature_val")
  |> yield()
The output of the query returns 3 series (see the definition of series in InfluxDB above).
Now we execute the mean computation query on the InfluxDB UI and see the difference.
from(bucket: "influxdbDemo")
  |> range(start: -24h)
  |> filter(fn: (r) => r["_measurement"] == "temperature")
  |> filter(fn: (r) => r["_field"] == "temperature_val")
  |> window(every: 24h)
  |> mean()
The output is again three series, but this time we see the aggregated mean value. Since m1 and m2 had one temperature_val each, we see the same values back; for m3, since we had 4 values, we see the mean across those as 54.55.
Getting back to our application: now that we understand Flux as a query language and the expected execution result, let's invoke the same mean query from our application.
String flux = "from(bucket: \"influxdbDemo\")\n" +
        "  |> range(start: -24h)\n" +
        "  |> filter(fn: (r) => r._measurement == \"temperature\")\n" +
        "  |> filter(fn: (r) => r._field == \"temperature_val\")\n" +
        "  |> window(every: 24h)\n" +
        "  |> mean()";
QueryApi queryApi = influxDBClient.getQueryApi();
List<FluxTable> tables = queryApi.query(flux, ORG);
The last line of the code block above takes our Flux query, which already has the bucket name specified; the ORG constant holds the same org name we have been using while writing data.
for (FluxTable fluxTable : tables) {
    List<FluxRecord> records = fluxTable.getRecords();
    for (FluxRecord fluxRecord : records) {
        System.out.println(fluxRecord.getValueByKey("host") + ": " + fluxRecord.getValueByKey("_value"));
    }
}
Next we simply loop over the result and print the host tag value and the field value contained in _value (corresponding to the field key temperature_val).
The output of the execution looks like this.
P.S. The precision issue we see here arises because we used float, which is 32-bit, while InfluxDB stores floating point values with 64-bit precision, so using double would give us an exact match; a sketch of that change follows below. Check out the other host m4, where metrics were published as double instead of float and we got back exactly the same value.
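As a sketch, the corresponding one-line change in the POJO above would be:

@Column(name = "temperature_val")
private double temperatureVal;

(The constructor parameter and the controller's @RequestParam type would change from float to double accordingly.)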
InfluxDB is not limited to simple querying and aggregations. We can also create continuous queries (akin to stored procedures) and tasks, which run at scheduled intervals, perform aggregations on data, and write the results to a different bucket or measurement. (Tasks can be used for downsampling just like continuous queries, but also for alerting, data transformations, etc.)
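As a hedged sketch of such a task in Flux (the destination bucket temperature_downsampled is hypothetical and would need to exist), downsampling our temperature data every hour into 10-minute means could look like:

option task = {name: "downsample-temperature", every: 1h}

from(bucket: "influxdbDemo")
  |> range(start: -task.every)
  |> filter(fn: (r) => r["_measurement"] == "temperature")
  |> filter(fn: (r) => r["_field"] == "temperature_val")
  |> aggregateWindow(every: 10m, fn: mean)
  |> to(bucket: "temperature_downsampled")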
Prometheus vs InfluxDB
This blog would be incomplete without comparing Prometheus with InfluxDB. Prometheus is another popular choice for capturing time series metrics and building monitoring systems.
Key Differences:
Data Model: Prometheus focuses on metrics with key-value pairs, while InfluxDB uses a more structured approach with measurements, tags, fields, and timestamps.
Query Language: PromQL (Prometheus Query Language) for Prometheus; InfluxQL and Flux for InfluxDB. Each has syntax and capabilities tailored to its respective database.
Data Collection: Prometheus primarily pulls metrics from endpoints, whereas InfluxDB supports both pull and push-based models.
Alerting: Both provide alerting capabilities, but Prometheus integrates alerting seamlessly with its querying language.
Scalability: Both are designed to scale horizontally, but InfluxDB emphasises clustering and sharding earlier in its design.
Purpose: Prometheus is focused on monitoring and alerting, while InfluxDB is more versatile, used for IoT data storage, application metrics monitoring, and more.
So there are some differences and some similarities, and it is not that one is better than the other; as with most design decisions, the needs of the application and the use case at hand determine which one to choose.
Summary
In this blog post we explored a different type of data, time series data, and how it is growing and expanding beyond the traditional financial and stocks domain. We explored one of the most widely used time series databases and understood the storage engine internals that make it performant when dealing with time series data, its indexing, and its querying. We built a hands-on application, connected it to a Docker-containerised InfluxDB to publish time series data, and queried the same using a query language called Flux. We also explored writing and executing these queries in the InfluxDB UI. Though a range of visualisations is possible in the InfluxDB UI, InfluxDB can also be connected as a data source to the popular dashboarding tool Grafana, which allows us to visualise our metrics and create dashboards out of our series data.