Working with Real-Time Data? TimescaleDB Might Be the Simple, Scalable Solution You Need
12 Nov 2024
Throughout my career, I've worked on many projects that required real-time processing, from examination scripts to device telemetry data. Those projects used a variety of technologies and architectural patterns that were the best options available at the time.
When I catch up with colleagues who worked with me on these projects, we often discuss how we would approach those same challenges today.
Traditional methods for handling real-time data, such as load balancers, web sockets, and message queues, each have specific limitations.
In this post, I'll explain why TimescaleDB is now my choice for architectures built around time-series data, where accuracy and scalability are essential to keeping information correct and up to date.
All the code used in this post, including instructions for running it locally, is available in this GitHub repository.
Why Choose TimescaleDB for Real-Time Data?
Let's start by considering how real-time data might typically be managed in a traditional PostgreSQL database.
You’d often need to set up indexes for efficient querying of the latest values, as shown here:
-- Create an index to help PostgreSQL efficiently retrieve the latest values by patient and metric
CREATE INDEX ON patient_data (patient_id, metric, timestamp DESC);
-- Query to retrieve the latest values per patient and metric
SELECT DISTINCT ON (patient_id, metric)
    patient_id, metric, value, timestamp
FROM patient_data
ORDER BY patient_id, metric, timestamp DESC;
While this approach works, it becomes increasingly inefficient as the dataset grows. Even with appropriate indexing, PostgreSQL typically has to read through the index entries for every patient and metric to find the latest values, so the cost grows with the total volume of history.
TimescaleDB takes a fundamentally different approach, offering several key advantages:
- Automatic Time-Based Partitioning: Data is automatically split into time-based chunks. This allows latest-value queries to only scan recent chunks, with older chunks automatically excluded from queries, making it efficient for real-time monitoring.
- Continuous Aggregates vs. Traditional Materialised Views: Traditional PostgreSQL materialised views require a complete refresh, whereas TimescaleDB’s continuous aggregates update incrementally. This approach allows real-time updates without the overhead of full recalculations, making it efficient for both real-time and historical data.
- Time-Based Chunking Benefits: Time-based chunking supports efficient data retention policies and better query performance through automatic chunk exclusion. This enables optimal management of hot (recent) and cold (historical) data.
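To make chunk exclusion concrete, here is a small pure-Python sketch of the idea. It is an illustration only, not how TimescaleDB is implemented: the `ChunkedStore` class, its one-day chunk size, and its method names are all hypothetical. It also shows that chunks are only skipped when the query carries a time bound.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone


class ChunkedStore:
    """Toy model of time-based chunking (hypothetical, for illustration only)."""

    def __init__(self, chunk_interval=timedelta(days=1)):
        self.chunk_interval = chunk_interval
        # Maps chunk index -> list of (timestamp, key, value) rows.
        self.chunks = defaultdict(list)

    def _chunk_index(self, ts):
        # Rows whose timestamps fall in the same interval share a chunk.
        return int(ts.timestamp() // self.chunk_interval.total_seconds())

    def insert(self, ts, key, value):
        self.chunks[self._chunk_index(ts)].append((ts, key, value))

    def query_since(self, since):
        """Return rows with ts >= since and the number of chunks scanned."""
        first = self._chunk_index(since)
        # Chunks entirely older than `since` are skipped without being read.
        relevant = [idx for idx in self.chunks if idx >= first]
        rows = [row for idx in relevant for row in self.chunks[idx] if row[0] >= since]
        return sorted(rows), len(relevant)


now = datetime(2024, 11, 12, 12, 0, tzinfo=timezone.utc)
store = ChunkedStore()
for days_ago in range(30):
    store.insert(now - timedelta(days=days_ago), "sensor-1", float(days_ago))

# Only the chunk covering the last six hours needs to be scanned.
rows, scanned = store.query_since(now - timedelta(hours=6))
print(len(rows), "row(s) from", scanned, "of", len(store.chunks), "chunks")
```

The point of the sketch is the `relevant` list: the work done by a time-bounded query depends on how many chunks overlap the requested window, not on how much history the store holds.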
These technical advantages make TimescaleDB particularly well-suited for several common use cases:
- IoT Device Monitoring: For managing thousands of smart factory sensors, TimescaleDB’s chunk-based storage allows you to efficiently query the latest readings without scanning historical data, ideal for real-time access and historical trend analysis.
- Real-Time Analytics: For an e-commerce platform, TimescaleDB supports both real-time dashboards and historical analysis. Its continuous aggregates maintain both current and historical views, allowing easy identification of trends and anomalies.
- Financial Trading Systems: In trading, the precise order of events and millisecond timestamp accuracy are crucial. TimescaleDB’s guaranteed timestamp-based ordering allows accurate tracking of positions and price movements, while continuous aggregates efficiently calculate time-based metrics like moving averages.
Challenges with Ordering in Real-Time Systems
Ensuring accurate, time-ordered data processing can be complex with traditional tools. Here’s a look at common issues in real-time systems using message queues and streaming platforms:
Message Queues
I've encountered issues when using message queues like SQS, which, at best, process messages in the order they arrive, not necessarily by their true event time. This can lead to inaccurate or inconsistent metrics for IoT health and device data, where order matters for meaningful analysis. This limitation risks producing misleading data, especially in systems where the correct sequence of events is crucial.
Caching Solutions (e.g., Redis)
Although Redis is more commonly used to cache data or as a message broker, I've also used it for real-time processing. It works well in many cases but isn't optimised to preserve strict event order. Without built-in time ordering, events can be processed out of sequence under high load or distributed conditions, compromising accuracy in scenarios that rely on precise event timing, such as monitoring health metrics or device telemetry.
Streaming Platforms
I've also used streaming platforms like Kafka, which preserve order within a partition but lack a global, time-based order across partitions. This can lead to issues when precise event sequencing is required.
For applications like order management or health monitoring, these limitations can lead to unpredictable behaviour. Here’s a diagram showing how out-of-order processing can occur:
sequenceDiagram
    participant User
    participant MessageQueue as Message Queue (e.g., SQS)
    participant Redis as Caching Solution (e.g., Redis)
    participant StreamingPlatform as Streaming Platform (e.g., Kafka)
    participant AppServer as App Server
    participant Database as Database
    User->>MessageQueue: Send Order (Event Time: T1)
    User->>MessageQueue: Send Cancel Order (Event Time: T2)
    Note over MessageQueue: Messages may arrive out of order
    MessageQueue->>AppServer: Cancel Order (T2)
    MessageQueue->>AppServer: Order (T1)
    AppServer->>Database: Apply Cancel Before Order, causes inconsistency
    User->>Redis: Send Health Metric (Event Time: T1)
    User->>Redis: Send Health Update (Event Time: T2)
    Note over Redis: High load may process events out of order
    Redis->>AppServer: Health Update (T2)
    Redis->>AppServer: Health Metric (T1)
    User->>StreamingPlatform: Send Trade Event (Event Time: T1)
    User->>StreamingPlatform: Send Price Update (Event Time: T2)
    Note over StreamingPlatform: Order maintained in partitions only
    StreamingPlatform->>AppServer: Price Update (T2)
    StreamingPlatform->>AppServer: Trade Event (T1)
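The order and cancel scenario from the diagram can be reproduced in a few lines of Python. This is a minimal sketch under assumed names: the `Event` class, the `"place"` and `"cancel"` actions, and the `apply_events` function are hypothetical and exist only to show how arrival order and event-time order can produce different final states.

```python
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass(frozen=True)
class Event:
    event_time: datetime  # when the event actually happened
    action: str           # "place" or "cancel" (hypothetical action names)
    order_id: str


def apply_events(events):
    """Apply events in the given order and return the final state per order."""
    state = {}
    for event in events:
        if event.action == "place":
            state[event.order_id] = "open"
        elif event.action == "cancel" and event.order_id in state:
            state[event.order_id] = "cancelled"
        # A cancel for an unknown order is dropped, which is the failure
        # mode when it arrives before the matching "place" event.
    return state


t1 = datetime(2024, 11, 12, 9, 0, 0, tzinfo=timezone.utc)
t2 = datetime(2024, 11, 12, 9, 0, 1, tzinfo=timezone.utc)

# Arrival order differs from event order: the cancel (T2) shows up first.
arrived = [Event(t2, "cancel", "order-1"), Event(t1, "place", "order-1")]

by_arrival = apply_events(arrived)
by_event_time = apply_events(sorted(arrived, key=lambda e: e.event_time))
print("arrival order:   ", by_arrival)
print("event-time order:", by_event_time)
```

Processing by arrival leaves the order open even though the user cancelled it, while sorting by event time first gives the intended result. Storing events with their own timestamps is what makes that second pass possible.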
How TimescaleDB Solves the Ordering Challenge
TimescaleDB, built on PostgreSQL, is designed for high-frequency, timestamped data, ensuring that data is stored and queried in true event order. This enables precise, reliable real-time updates and state management.
Here’s how TimescaleDB supports critical real-time requirements:
- Consistent State Management: Events are processed in chronological order, ensuring state accuracy.
- Efficient Real-Time Access: TimescaleDB’s time-based ordering allows immediate access to the latest data without reordering.
- Automatic Partitioning and Retention Policies: TimescaleDB partitions data by time and automatically manages storage, making it efficient and scalable for large datasets.
Example: Real-Time Data Processing with Upserts in TimescaleDB
Let’s walk through a healthcare example where we track patient vitals such as pulse and temperature. We’ll use an upsert-style insert (INSERT ... ON CONFLICT) so that duplicate readings are ignored, and then query the most recent data for each patient and metric.
Table Setup with Time-Series Optimisation
We create a patient_data table with a primary key on (patient_id, metric, timestamp). This setup allows multiple records per patient and metric with different timestamps, and we then convert the table into a TimescaleDB hypertable for time-series performance.
-- Create a table with a primary key for patient ID, metric, and timestamp
CREATE TABLE IF NOT EXISTS patient_data (
    patient_id TEXT NOT NULL,
    metric TEXT NOT NULL,
    value DOUBLE PRECISION,
    timestamp TIMESTAMPTZ NOT NULL,
    PRIMARY KEY (patient_id, metric, timestamp)
);
-- Convert the table into a TimescaleDB hypertable, optimised for time-series data
SELECT create_hypertable('patient_data', 'timestamp', if_not_exists => TRUE);
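Once the hypertable exists, the retention policies mentioned earlier can be attached to it. The sketch below is one possible way to do that from Python, not code from the repository: the `add_retention` helper and its 30-day default are assumptions, and it expects an already-open psycopg2 connection. It relies on TimescaleDB's `add_retention_policy` function.

```python
def add_retention(conn, hypertable="patient_data", keep="30 days"):
    """Attach a TimescaleDB retention policy via an open DB-API connection.

    `conn` is assumed to be a psycopg2 connection; the table name and the
    30-day window are illustrative defaults, not values from this post.
    """
    with conn.cursor() as cursor:
        # Chunks whose data is entirely older than `keep` become eligible
        # for removal by TimescaleDB's background job.
        cursor.execute(
            "SELECT add_retention_policy(%s, %s::interval, if_not_exists => TRUE);",
            (hypertable, keep),
        )
    conn.commit()
```

Because retention works on whole chunks, old data is dropped chunk by chunk rather than deleted row by row, which is what keeps it cheap on large tables.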
Materialised View for the Latest Value
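To query the latest pulse value for each patient, we use a materialised view. A standard PostgreSQL materialised view does not update itself, so it has to be refreshed periodically with REFRESH MATERIALIZED VIEW: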
-- Create a materialised view for real-time access patterns
-- Optimises read performance by pre-computing latest values
-- Trade-off: Additional storage for improved query performance
CREATE MATERIALIZED VIEW IF NOT EXISTS latest_pulse AS
SELECT DISTINCT ON (patient_id)
    patient_id,
    metric,
    timestamp AS latest_timestamp,
    value AS latest_value
FROM patient_data
WHERE metric = 'pulse'
ORDER BY patient_id, timestamp DESC;
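The periodic refresh has to be triggered by something outside the view itself. As a minimal sketch, assuming an open psycopg2 connection and a hypothetical helper named `refresh_latest_pulse`, it could be issued from application code like this:

```python
def refresh_latest_pulse(conn, view_name="latest_pulse"):
    """Recompute the materialised view using an open DB-API connection.

    `conn` is assumed to be a psycopg2 connection. This helper is a sketch
    for illustration and is not part of the post's repository.
    """
    # View names cannot be passed as query parameters, so this sketch only
    # accepts a plain identifier to avoid building unsafe SQL.
    if not view_name.isidentifier():
        raise ValueError(f"Unexpected view name: {view_name!r}")
    with conn.cursor() as cursor:
        cursor.execute(f"REFRESH MATERIALIZED VIEW {view_name};")
    conn.commit()
```

Each call re-runs the view's full query, so the refresh interval is a trade-off between freshness and load. That cost is the motivation for the incremental approach in the next section.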
Continuous Aggregate for Historical Summaries
In addition to the latest values, it’s useful to track historical trends, such as hourly or daily averages. TimescaleDB’s continuous aggregates make this easy by automatically updating as new data is ingested.
For example, to track the hourly average pulse for each patient, we can create the following continuous aggregate:
-- Create a continuous aggregate to calculate hourly average pulse for each patient
CREATE MATERIALIZED VIEW IF NOT EXISTS hourly_avg_pulse
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 hour', timestamp) AS bucket,
    patient_id,
    AVG(value) AS avg_pulse
FROM patient_data
WHERE metric = 'pulse'
GROUP BY bucket, patient_id;
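This setup aggregates data into hourly buckets, giving us an efficient view of historical trends over time. Continuous aggregates improve query performance by summarising data incrementally instead of recomputing it on every read. To keep the aggregate refreshed on a schedule, attach a refresh policy with add_continuous_aggregate_policy.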
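To make the bucketing concrete, the following pure-Python sketch computes the same kind of result as the query above for a handful of in-memory rows. It only mirrors the logic of `time_bucket('1 hour', ...)` plus `AVG` for UTC timestamps; the `hourly_average` function is a hypothetical name, and none of this reflects how TimescaleDB maintains the aggregate internally.

```python
from collections import defaultdict
from datetime import datetime, timezone


def hourly_average(rows):
    """Group (patient_id, timestamp, value) rows into hourly buckets and average them."""
    sums = defaultdict(lambda: [0.0, 0])  # (bucket, patient_id) -> [total, count]
    for patient_id, ts, value in rows:
        # Truncate to the start of the hour, like time_bucket('1 hour', timestamp).
        bucket = ts.replace(minute=0, second=0, microsecond=0)
        entry = sums[(bucket, patient_id)]
        entry[0] += value
        entry[1] += 1
    return {key: total / count for key, (total, count) in sums.items()}


rows = [
    ("patient1", datetime(2024, 11, 12, 9, 5, tzinfo=timezone.utc), 70.0),
    ("patient1", datetime(2024, 11, 12, 9, 45, tzinfo=timezone.utc), 80.0),
    ("patient1", datetime(2024, 11, 12, 10, 10, tzinfo=timezone.utc), 90.0),
]

averages = hourly_average(rows)
for (bucket, patient_id), avg in sorted(averages.items()):
    print(bucket.isoformat(), patient_id, avg)
```

The two readings in the 09:00 bucket collapse into one averaged row, and the 10:10 reading lands in its own bucket. A continuous aggregate stores rows of this shape so that dashboards read a few summary rows instead of every raw reading.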
Scalable Data Ingestion with Batch Processing
Handling high-frequency data in real time often requires inserting large volumes of records quickly. Using psycopg2's execute_batch, we can perform batch data ingestion, which is more efficient than inserting each record individually.
In the ingest_data_batch function below, we gather data into batches and insert each batch in a single call:
import os
import psycopg2
from psycopg2.extras import execute_batch
from datetime import datetime, timezone
import random
import time
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()


def ingest_data_batch(batch_size=10, interval=5):
    database_url = os.getenv("DATABASE_URL")
    if not database_url:
        print("Error: DATABASE_URL is not set in the environment.")
        raise ValueError("DATABASE_URL is not set in the environment.")
    try:
        conn = psycopg2.connect(database_url)
        print("Connected to the database successfully.")
    except psycopg2.OperationalError as e:
        print(f"Database connection error: {e}")
        return
    with conn.cursor() as cursor:
        patients = ["patient1", "patient2", "patient3"]
        metrics = ["pulse", "temperature"]
        while True:
            data_batch = []
            # Generate random data for each patient and metric
            for _ in range(batch_size):
                patient = random.choice(patients)
                metric = random.choice(metrics)
                value = random.uniform(60, 100) if metric == "pulse" else random.uniform(36, 38)
                # Timezone-aware UTC timestamp to match the TIMESTAMPTZ column
                timestamp = datetime.now(timezone.utc)
                data_batch.append((patient, metric, value, timestamp))
            # Execute batch insert for efficiency
            try:
                execute_batch(
                    cursor,
                    """
                    INSERT INTO patient_data (patient_id, metric, value, timestamp)
                    VALUES (%s, %s, %s, %s)
                    ON CONFLICT (patient_id, metric, timestamp) DO NOTHING;
                    """,
                    data_batch,
                )
                conn.commit()
                print(f"Batch of {batch_size} records inserted (duplicates skipped).")
            except psycopg2.Error as e:
                print(f"Database error during batch insert: {e}")
                conn.rollback()
            # Wait before the next batch insert
            print(f"Next update in {interval} seconds...\n")
            time.sleep(interval)


if __name__ == "__main__":
    print("Starting batch data ingestion process.")
    ingest_data_batch(batch_size=10, interval=5)
This batch ingestion function:
- Builds a batch of records in each loop.
- Sends the entire batch with execute_batch, which groups statements to cut server round trips and is more efficient than inserting records one at a time.
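When readings come from a real source rather than a random generator, they usually arrive as a stream that has to be cut into batches first. The helper below is a small sketch of that step; the name `batched` and the sample readings are assumptions for illustration. Each list it yields could be passed to `execute_batch` as the argument list.

```python
from itertools import islice


def batched(records, batch_size):
    """Yield successive lists of at most batch_size records from any iterable."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(records)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


# 25 illustrative readings, split into batches of 10.
readings = [("patient1", "pulse", 60.0 + i) for i in range(25)]
sizes = [len(batch) for batch in batched(readings, 10)]
print(sizes)
```

Because the helper works on any iterable, it never needs the whole stream in memory. `execute_batch` also accepts a `page_size` argument (100 by default) that controls how many statements are sent per round trip, so batch size and page size can be tuned separately.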
Querying the Last X Values
For flexible querying, TimescaleDB allows easy retrieval of the latest and historical values. Here’s a query to get the last 5 values for a specific patient:
SELECT patient_id, metric, value, timestamp
FROM patient_data
WHERE patient_id = 'patient1' AND metric = 'pulse'
ORDER BY timestamp DESC
LIMIT 5;
Python Code for Retrieving the Latest and Historical Values
Here’s a Python script that runs this query and retrieves the latest records:
import os
import psycopg2
from dotenv import load_dotenv
from prettytable import PrettyTable
import time

load_dotenv()


def get_latest_values(patient_id, metric, limit=5, interval=10):
    database_url = os.getenv("DATABASE_URL")
    if not database_url:
        print("Error: DATABASE_URL is not set in the environment.")
        raise ValueError("DATABASE_URL is not set in the environment.")
    try:
        conn = psycopg2.connect(database_url)
        print("Connected to the database successfully.")
    except psycopg2.OperationalError as e:
        print(f"Database connection error: {e}")
        return
    while True:
        with conn.cursor() as cursor:
            try:
                cursor.execute(
                    """
                    SELECT patient_id, metric, value, timestamp
                    FROM patient_data
                    WHERE patient_id = %s AND metric = %s
                    ORDER BY timestamp DESC
                    LIMIT %s;
                    """,
                    (patient_id, metric, limit),
                )
                records = cursor.fetchall()
                if records:
                    # Create and display the table using PrettyTable
                    table = PrettyTable(["Patient ID", "Metric", "Value", "Timestamp"])
                    for record in records:
                        table.add_row(record)
                    os.system("cls" if os.name == "nt" else "clear")
                    print("Latest Values:")
                    print(table)
                else:
                    print(f"No records found for patient_id={patient_id}, metric={metric}.")
            except psycopg2.Error as e:
                print(f"Database query error: {e}")
                # Reset the aborted transaction so the next poll can run
                conn.rollback()
        # Wait for interval before refreshing
        print(f"Updating in {interval} seconds...\n")
        time.sleep(interval)


if __name__ == "__main__":
    print("Starting periodic latest values retrieval.")
    get_latest_values("patient1", "pulse", limit=5, interval=2)
Conclusion
Hopefully this post illustrates why I've found TimescaleDB to be a powerful solution for handling timestamp-sensitive data. It offers features like ordered real-time ingestion, continuous aggregates, and efficient querying.
While similar outcomes can be achieved with other solutions, TimescaleDB simplifies the process, sparing you from implementing complex workarounds to achieve the same scalability and reliability for real-time monitoring, insights, and analysis.
In the next post, I'll explain how integrating NATS JetStream can enhance this setup by providing reliable, ordered message delivery and robust real-time streaming capabilities. With NATS JetStream, we can efficiently manage message streams, ensure message persistence, and add replay capabilities to handle complex data flows seamlessly.