Arrange Act Assert

Jag Reehal's thinking on things, mostly product development

Working with Real-Time Data? TimescaleDB Might Be the Simple, Scalable Solution You Need

12 Nov 2024

Throughout my career, I've worked on many projects that required real-time processing, from examination scripts to device telemetry data. Those projects used a variety of technologies and architectural patterns that were the best options available at the time.

When I catch up with colleagues who worked with me on these projects, we often discuss how we would approach those same challenges today.

Traditional methods for handling real-time data, such as load balancers, web sockets, and message queues, each have specific limitations.

In this post, I'll explain why TimescaleDB is my choice for architectures built around time-series data, where accuracy and scalability are essential to keeping information efficient and up to date.

All the code used in this post, including instructions on how to run it locally, is available in this GitHub repository.

Why Choose TimescaleDB for Real-Time Data?

Let's start by considering how real-time data might typically be managed in a traditional PostgreSQL database.

You’d often need to set up indexes for efficient querying of the latest values, as shown here:

-- Create an index to help PostgreSQL efficiently retrieve the latest values by patient and metric
CREATE INDEX ON patient_data (patient_id, metric, timestamp DESC);

-- Query to retrieve the latest values per patient and metric
SELECT DISTINCT ON (patient_id, metric)
    patient_id, metric, value, timestamp
FROM patient_data
ORDER BY patient_id, metric, timestamp DESC;

While this approach works, it becomes increasingly inefficient as the dataset grows. The database must scan through all records for each patient and metric to find the latest values, even with appropriate indexing.

TimescaleDB takes a fundamentally different approach, offering several key advantages:

  - Hypertables that automatically partition data by time, so queries for recent values only touch the relevant chunks.
  - Native time-based ordering, so events are stored and queried in true event order.
  - Continuous aggregates that keep historical summaries up to date as data arrives.
  - Retention policies that automatically manage storage for large datasets.
  - Full PostgreSQL compatibility, so existing SQL tooling and drivers keep working.

These technical advantages make TimescaleDB particularly well-suited for several common use cases:

  - IoT device telemetry and monitoring.
  - Health metrics, such as patient vitals.
  - Order and trade event processing, where sequence matters.
  - Real-time dashboards and historical trend analysis.

Challenges with Ordering in Real-Time Systems

Ensuring accurate, time-ordered data processing can be complex with traditional tools. Here’s a look at common issues in real-time systems using message queues and streaming platforms:

Message Queues

I've encountered issues when using message queues like SQS, which, at best, process messages in the order they arrive, not necessarily by their true event time. This can lead to inaccurate or inconsistent metrics for IoT health and device data, where order matters for meaningful analysis. This limitation risks producing misleading data, especially in systems where the correct sequence of events is crucial.

Caching Solutions (e.g., Redis)

Although Redis is more commonly used to cache data or as a message broker, I've also used it for real-time processing. It works well in many cases but isn't optimised to preserve strict event order. Without built-in time ordering, events can be processed out of sequence under high load or distributed conditions, compromising accuracy in scenarios that rely on precise event timing, such as monitoring health metrics or device telemetry.

Streaming Platforms

When I've used platforms like Kafka, which preserve order within a partition, they lack global time-based ordering across partitions. This can lead to issues when precise event sequencing is required.

For applications like order management or health monitoring, these limitations can lead to unpredictable behaviour. Here’s a diagram showing how out-of-order processing can occur:

sequenceDiagram
    participant User
    participant MessageQueue as Message Queue (e.g., SQS)
    participant Redis as Caching Solution (e.g., Redis)
    participant StreamingPlatform as Streaming Platform (e.g., Kafka)
    participant AppServer as App Server
    participant Database as Database

    User->>MessageQueue: Send Order (Event Time: T1)
    User->>MessageQueue: Send Cancel Order (Event Time: T2)
    Note over MessageQueue: Messages may arrive out of order
    MessageQueue->>AppServer: Cancel Order (T2)
    MessageQueue->>AppServer: Order (T1)
    AppServer->>Database: Apply Cancel Before Order, causes inconsistency

    User->>Redis: Send Health Metric (Event Time: T1)
    User->>Redis: Send Health Update (Event Time: T2)
    Note over Redis: High load may process events out of order
    Redis->>AppServer: Health Update (T2)
    Redis->>AppServer: Health Metric (T1)

    User->>StreamingPlatform: Send Trade Event (Event Time: T1)
    User->>StreamingPlatform: Send Price Update (Event Time: T2)
    Note over StreamingPlatform: Order maintained in partitions only
    StreamingPlatform->>AppServer: Price Update (T2)
    StreamingPlatform->>AppServer: Trade Event (T1)

How TimescaleDB Solves the Ordering Challenge

TimescaleDB, built on PostgreSQL, is designed for high-frequency, timestamped data, ensuring that data is stored and queried in true event order. This enables precise, reliable real-time updates and state management.

Here’s how TimescaleDB supports critical real-time requirements:

  1. Consistent State Management: Events are processed in chronological order, ensuring state accuracy.
  2. Efficient Real-Time Access: TimescaleDB’s time-based ordering allows immediate access to the latest data without reordering.
  3. Automatic Partitioning and Retention Policies: TimescaleDB partitions data by time and automatically manages storage, making it efficient and scalable for large datasets (see the retention policy sketch after this list).
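
On the retention point, TimescaleDB 2.x ships a built-in policy function, so there's no need for hand-rolled cleanup jobs. A minimal sketch against the patient_data table used in this post; the 30-day window is an illustrative choice, not a recommendation:

-- Automatically drop chunks whose data is older than 30 days
SELECT add_retention_policy('patient_data', INTERVAL '30 days');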

Example: Real-Time Data Processing with Upserts in TimescaleDB

Let’s walk through a healthcare example where we track patient vitals such as pulse and temperature. We’ll use an upsert pattern to retain only the most recent data for each patient and metric.


Table Setup with Time-Series Optimisation

We create a patient_data table with a primary key on (patient_id, metric, timestamp). This setup allows multiple records per patient and metric with different timestamps, and we then convert the table into a TimescaleDB hypertable for time-series performance.

-- Create a table with a primary key for patient ID, metric, and timestamp
CREATE TABLE IF NOT EXISTS patient_data (
    patient_id TEXT NOT NULL,
    metric TEXT NOT NULL,
    value DOUBLE PRECISION,
    timestamp TIMESTAMPTZ NOT NULL,
    PRIMARY KEY (patient_id, metric, timestamp)
);

-- Convert the table into a TimescaleDB hypertable, optimised for time-series data
SELECT create_hypertable('patient_data', 'timestamp', if_not_exists => TRUE);
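
If you want to see the automatic partitioning at work, TimescaleDB exposes the chunks behind a hypertable. A quick check, assuming some data has already been ingested:

-- List the time-based chunks TimescaleDB has created for the hypertable
SELECT show_chunks('patient_data');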

Materialised View for the Latest Value

To query the latest value for each patient and metric, we use a materialised view that refreshes periodically:

-- Create a materialised view for real-time access patterns
-- Optimises read performance by pre-computing latest values
-- Trade-off: Additional storage for improved query performance
CREATE MATERIALIZED VIEW IF NOT EXISTS latest_pulse AS
SELECT DISTINCT ON (patient_id)
    patient_id,
    metric,
    timestamp AS latest_timestamp,
    value AS latest_value
FROM patient_data
WHERE metric = 'pulse'
ORDER BY patient_id, timestamp DESC;
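
A plain materialised view doesn't refresh itself, so "refreshes periodically" means scheduling the standard PostgreSQL refresh command yourself, for example from cron or pg_cron (the scheduling mechanism is up to you):

-- Re-compute the view contents; run this on a schedule
REFRESH MATERIALIZED VIEW latest_pulse;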

Continuous Aggregate for Historical Summaries

In addition to the latest values, it’s useful to track historical trends, such as hourly or daily averages. TimescaleDB’s continuous aggregates make this easy by automatically updating as new data is ingested.

For example, to track the hourly average pulse for each patient, we can create the following continuous aggregate:

-- Create a continuous aggregate to calculate hourly average pulse for each patient
CREATE MATERIALIZED VIEW IF NOT EXISTS hourly_avg_pulse
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 hour', timestamp) AS bucket,
    patient_id,
    AVG(value) AS avg_pulse
FROM patient_data
WHERE metric = 'pulse'
GROUP BY bucket, patient_id;

This setup automatically aggregates data into hourly buckets, giving us an efficient view of historical trends over time. Continuous aggregates simplify query performance by summarising data automatically as it arrives.
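
To keep the aggregate fresh without manual refreshes, TimescaleDB 2.x lets you attach a refresh policy to it. A minimal sketch; the offsets and schedule below are illustrative choices you'd tune to your ingestion rate:

-- Every 30 minutes, refresh data between 3 hours and 1 hour old
SELECT add_continuous_aggregate_policy('hourly_avg_pulse',
    start_offset => INTERVAL '3 hours',
    end_offset => INTERVAL '1 hour',
    schedule_interval => INTERVAL '30 minutes');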

Scalable Data Ingestion with Batch Processing

Handling high-frequency data in real time often requires inserting large volumes of records quickly. Using psycopg2’s execute_batch, we can perform batch data ingestion, which is more efficient than inserting each record individually.

In the ingest_data_batch function below, we gather data into batches and insert them all at once:

import os
import psycopg2
from psycopg2.extras import execute_batch
from datetime import datetime, timezone
import random
import time
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

def ingest_data_batch(batch_size=10, interval=5):
    database_url = os.getenv("DATABASE_URL")
    if not database_url:
        print("Error: DATABASE_URL is not set in the environment.")
        raise ValueError("DATABASE_URL is not set in the environment.")

    try:
        conn = psycopg2.connect(database_url)
        print("Connected to the database successfully.")
    except psycopg2.OperationalError as e:
        print(f"Database connection error: {e}")
        return

    with conn.cursor() as cursor:
        patients = ["patient1", "patient2", "patient3"]
        metrics = ["pulse", "temperature"]

        while True:
            data_batch = []
            # Generate random data for each patient and metric
            for _ in range(batch_size):
                patient = random.choice(patients)
                metric = random.choice(metrics)
                value = random.uniform(60, 100) if metric == "pulse" else random.uniform(36, 38)
                timestamp = datetime.now(timezone.utc)  # timezone-aware UTC for the TIMESTAMPTZ column
                data_batch.append((patient, metric, value, timestamp))

            # Execute batch insert for efficiency
            try:
                execute_batch(
                    cursor,
                    """
                    INSERT INTO patient_data (patient_id, metric, value, timestamp)
                    VALUES (%s, %s, %s, %s)
                    ON CONFLICT (patient_id, metric, timestamp) DO NOTHING;
                    """,
                    data_batch,
                )
                conn.commit()
                print(f"Batch of {batch_size} records upserted successfully.")
            except psycopg2.Error as e:
                print(f"Database error during batch insert: {e}")
                conn.rollback()

            # Wait before the next batch insert
            print(f"Next update in {interval} seconds...\n")
            time.sleep(interval)

if __name__ == "__main__":
    print("Starting batch data ingestion process.")
    ingest_data_batch(batch_size=10, interval=5)

This batch ingestion function:

  - Connects using the DATABASE_URL environment variable and fails fast if it's missing.
  - Generates random pulse and temperature readings for a set of test patients.
  - Inserts each batch in a single round trip using execute_batch.
  - Uses ON CONFLICT ... DO NOTHING so duplicate (patient_id, metric, timestamp) rows are ignored.
  - Commits after each batch and rolls back on database errors.
  - Sleeps for the configured interval before the next batch.

Querying the Last X Values

For flexible querying, TimescaleDB allows easy retrieval of the latest and historical values. Here’s a query to get the last 5 values for a specific patient:

SELECT patient_id, metric, value, timestamp
FROM patient_data
WHERE patient_id = 'patient1' AND metric = 'pulse'
ORDER BY timestamp DESC
LIMIT 5;
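
The same pattern works for historical summaries via the hourly_avg_pulse continuous aggregate created earlier. For example, the last 24 hourly averages for a patient:

-- Last 24 hourly average pulse readings for patient1
SELECT bucket, avg_pulse
FROM hourly_avg_pulse
WHERE patient_id = 'patient1'
ORDER BY bucket DESC
LIMIT 24;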

Python Code for Retrieving the Latest and Historical Values

Here’s a Python script that runs this query and retrieves the latest records:

import os
import psycopg2
from dotenv import load_dotenv
from prettytable import PrettyTable
import time

load_dotenv()

def get_latest_values(patient_id, metric, limit=5, interval=10):
    database_url = os.getenv("DATABASE_URL")
    if not database_url:
        print("Error: DATABASE_URL is not set in the environment.")
        raise ValueError("DATABASE_URL is not set in the environment.")

    try:
        conn = psycopg2.connect(database_url)
        print("Connected to the database successfully.")
    except psycopg2.OperationalError as e:
        print(f"Database connection error: {e}")
        return

    while True:
        with conn.cursor() as cursor:
            try:
                cursor.execute(
                    """
                    SELECT patient_id, metric, value, timestamp
                    FROM patient_data
                    WHERE patient_id = %s AND metric = %s
                    ORDER BY timestamp DESC
                    LIMIT %s;
                    """,
                    (patient_id, metric, limit),
                )
                records = cursor.fetchall()

                if records:
                    # Create and display the table using PrettyTable
                    table = PrettyTable(["Patient ID", "Metric", "Value", "Timestamp"])
                    for record in records:
                        table.add_row(record)

                    os.system("cls" if os.name == "nt" else "clear")
                    print("Latest Values:")
                    print(table)
                else:
                    print(f"No records found for patient_id={patient_id}, metric={metric}.")
            except psycopg2.Error as e:
                print(f"Database query error: {e}")

        # Wait for interval before refreshing
        print(f"Updating in {interval} seconds...\n")
        time.sleep(interval)

if __name__ == "__main__":
    print("Starting periodic latest values retrieval.")
    get_latest_values("patient1", "pulse", limit=5, interval=2)

Conclusion

Hopefully this post illustrates why I've found TimescaleDB to be a powerful solution for handling timestamp-sensitive data. It offers features like ordered real-time ingestion, continuous aggregates, and efficient querying.

While similar outcomes can be achieved with other solutions, TimescaleDB simplifies the process, sparing you from implementing complex workarounds to achieve the same scalability and reliability for real-time monitoring, insights, and analysis.

In the next post, I'll explain how integrating NATS JetStream can enhance this setup by providing reliable, ordered message delivery and robust real-time streaming capabilities. With NATS JetStream, we can efficiently manage message streams, ensure message persistence, and add replay capabilities to handle complex data flows seamlessly.

timescaledb architecture real-time