How Scheduling Scheduled Us Weeks Behind Our Schedule?
Praveen Kumar
Engineering @ SuprSend | Ex - Razorpay | Ex Co-Founder DailyPe (YC W23) | IIT KGP
Initially, we believed the problem was straightforward. Our previous setup used goroutines to schedule database queries, which let us run the entire system with minimal infrastructure: just SQLite and a Go service. It all seemed quite uncomplicated. However, when we decided to bring this feature into our SaaS platform, we didn't realize at the outset that we would run into a fresh set of challenges around dynamic scheduling and concurrent task execution.
We needed a way to sync data in a scheduled manner from the client's data warehouse to our data store.
Challenges with replicating the previous setup:
To understand the issue, let's look at the previous architecture more closely. It allowed users to connect to their data warehouses, run database queries, and sync subscribers on a preset schedule (e.g., hourly, daily).
This looked straightforward at first, since we used an embedded SQLite database within the service and expected few simultaneous executions, as most customers chose to update individual tables.
Also, since we used Golang, we didn't need a separate process to handle scheduling; it was done efficiently with goroutines and Asynq, a lightweight library built around this concept.
However, the complexity surfaced when we moved this functionality to our SaaS platform. We faced two challenges: dynamic scheduling from different processes, and concurrent execution of those schedules, each of which would execute the query, process the results, and sync the subscribers.
How did we arrive at the solution?
Before going through how we arrived at the solution, let’s understand the problems more deeply.
Dynamic Scheduling
Imagine a scenario where anyone can schedule a task at any time, and it can be set to run via a cron schedule or at fixed intervals. The scheduler should be capable of prioritizing tasks at all times and managing them efficiently. Since the schedule can change dynamically, the heartbeat process should adapt to every change.
Conceptually, this can be achieved with the help of Redis’s sorted set data structure.
Redis's Sorted Set is a powerful data structure that significantly aids in scheduling tasks by enabling efficient storage and retrieval of tasks based on their execution time or priority.
The Sorted Set stores elements as unique strings (members) along with their associated scores, which are floating-point numbers. Internally, Redis implements sorted sets using a combination of a hash table and a skip list.
The hash table provides fast access to elements based on their value, while the skip list maintains the sorted order of the elements based on their scores. This dual structure allows Redis to perform operations on sorted sets efficiently.
In the context of task scheduling, scores typically represent the execution time or priority of tasks. Redis maintains the Sorted Set in ascending order based on the scores. The priority of a task is determined by its score, with lower scores having higher priority. This allows for fast lookup and retrieval of tasks that are due for execution. If two tasks have the same score, they are ordered lexicographically by member.
Redis-based schedulers typically use the ZADD command (to add a task's representation to the sorted set) and ZRANGEBYSCORE (to retrieve the highest-priority tasks from it).
Let’s understand with an example:
Suppose we have a task scheduling system with different priorities (low, medium, high) and execution times. We want to schedule tasks such that high-priority tasks are executed before low-priority tasks, even if a low-priority task has an earlier execution time. To achieve this, we can use a scoring algorithm that combines the priority and execution time into a single score.
Example scoring algorithm:
def calculate_score(priority, execution_time):
    # Convert execution_time (a datetime) to a UNIX timestamp
    unix_timestamp = execution_time.timestamp()
    # Assign numeric values to priorities (lower value means higher priority)
    priority_values = {'low': 3, 'medium': 2, 'high': 1}
    # Calculate the score by combining the priority value and UNIX timestamp
    score = unix_timestamp + (10**9 * priority_values[priority])
    return score
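As a quick sanity check (a small illustrative snippet, not part of the original setup): the 10**9 priority offset dominates any realistic timestamp difference, so a high-priority task scheduled later still gets a lower, i.e., more urgent, score than a low-priority task scheduled earlier.
from datetime import datetime
# High-priority task due later vs. low-priority task due earlier
high_later = calculate_score('high', datetime(2023, 3, 15, 12, 0, 0))
low_earlier = calculate_score('low', datetime(2023, 3, 15, 9, 0, 0))
# Lower score = higher priority, so the high-priority task sorts first
assert high_later < low_earlier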
Now, let's add tasks to the Redis Sorted Set using the ZADD command:
import redis
from datetime import datetime
# Connect to Redis
r = redis.Redis()
# Add tasks with their calculated scores
r.zadd('scheduled_tasks', {
    'Task A (low)': calculate_score('low', datetime(2023, 3, 15, 10, 0, 0)),
    'Task B (medium)': calculate_score('medium', datetime(2023, 3, 15, 10, 15, 0)),
    'Task C (high)': calculate_score('high', datetime(2023, 3, 15, 10, 30, 0)),
    'Task D (low)': calculate_score('low', datetime(2023, 3, 15, 10, 45, 0)),
})
To retrieve pending tasks, we can use the ZRANGEBYSCORE command with the current UNIX timestamp as the minimum score and +inf as the maximum. Since every score carries the priority offset, this returns all pending tasks, ordered by priority first and execution time second:
import datetime
# Get the current UNIX timestamp (epoch seconds)
current_timestamp = datetime.datetime.now().timestamp()
# Retrieve pending tasks in score order (priority, then execution time)
due_tasks = r.zrangebyscore('scheduled_tasks', current_timestamp, '+inf')
This approach ensures that tasks with higher priority are executed before tasks with lower priority, even if they have later execution times.
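For completeness, here is what the consuming side can look like in the simpler case where the score is just the execution timestamp (no priority offset). This is a minimal illustrative polling loop, not the implementation we run in production; the sorted-set name and handler are made up:
import time
import redis

r = redis.Redis()

def handle_task(member):
    # Hypothetical task handler
    print('executing', member.decode())

def heartbeat(poll_interval=1.0):
    # Every tick, claim and run tasks whose score (execution time) <= now
    while True:
        now = time.time()
        for member in r.zrangebyscore('scheduled_tasks', '-inf', now):
            # ZREM returns 1 only for the process that removed the member,
            # so two concurrent heartbeats never run the same task twice
            if r.zrem('scheduled_tasks', member):
                handle_task(member)
        time.sleep(poll_interval)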
Now that the scoring and scheduling part is clear, let's see how we can leverage this to build a robust system that schedules tasks from a separate producer process, with the scheduler, workers, and Redis working in sync.
In our case, the API server was our producer.
Implementation:
We evaluated various libraries built around this functionality of Redis, along with a few alternatives, and found that rq_scheduler in Python ticks all the boxes; a rough sketch of how its pieces fit together is shown below.
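Roughly, the pattern looks like this (an illustrative sketch, not our exact production code; the queue name, job function, and schedules are made up): the producer registers jobs with rq_scheduler, a separate rqscheduler process moves due jobs onto an RQ queue, and RQ workers execute them.
from datetime import datetime
from redis import Redis
from rq_scheduler import Scheduler

def sync_subscribers(sync_id):
    # Placeholder for the real job: run the warehouse query,
    # process results in batches, and sync subscribers
    ...

scheduler = Scheduler(queue_name='warehouse_sync', connection=Redis())

# Producer side (our API server): register a recurring sync via cron
scheduler.cron(
    '0 * * * *',               # top of every hour
    func=sync_subscribers,
    args=['sync-123'],
    queue_name='warehouse_sync',
)

# ...or an interval-based job starting now
scheduler.schedule(
    scheduled_time=datetime.utcnow(),
    func=sync_subscribers,
    args=['sync-456'],
    interval=86400,            # repeat every 24 hours
    repeat=None,               # repeat indefinitely
)

# A separate `rqscheduler` process polls Redis and enqueues due jobs;
# `rq worker warehouse_sync` processes pick them up and run them concurrently.
Note that the job function has to be importable by the worker processes, since RQ stores only its dotted path and arguments in Redis.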
This is how the final architecture looked:
For Concurrent DB Writes
Our previous setup, SQLite, wouldn't work for a distributed application like ours: it is an embedded, single-file database that allows only one writer at a time, and it isn't designed for concurrent access from multiple processes, let alone multiple hosts.
We had to handle distributed writes from multiple processes to the same database, so we narrowed the choice down to Redis or Postgres. Since each query execution involved tracking multiple states and processing results in batches to reduce server load, we opted for Postgres as our database.
Postgres solves all the above issues around distributed and concurrent writes and scales well, which made it ideal for our use case. The only drawback is a little extra compute cost on the cloud side for running Postgres, but the cost of a bad customer experience is far larger and potentially catastrophic.
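To give a flavour of the kind of concurrent-safe state handling this enables (a hypothetical table and query, not our actual schema), Postgres lets multiple worker processes claim pending sync runs without stepping on each other, for example with FOR UPDATE SKIP LOCKED:
import psycopg2

# Hypothetical table: sync_runs(id, status, ...)
conn = psycopg2.connect('dbname=scheduler')

def claim_next_run():
    # Each worker claims one pending run; SKIP LOCKED guarantees that two
    # workers querying concurrently never pick up the same row
    with conn, conn.cursor() as cur:
        cur.execute(
            """
            UPDATE sync_runs
               SET status = 'running'
             WHERE id = (
                 SELECT id FROM sync_runs
                  WHERE status = 'pending'
                  ORDER BY id
                  LIMIT 1
                  FOR UPDATE SKIP LOCKED
             )
            RETURNING id
            """
        )
        row = cur.fetchone()
        return row[0] if row else None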
After architecting the solution, there was still another critical problem to solve: processing the queries, which can sometimes fetch a billion rows (or more). As shown in the architecture diagram, we handled this with a separate service that processes the tasks and sends events to SuprSend internally for subscriber addition.
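At that scale, materializing the full result set in memory isn't an option. The general pattern (a simplified sketch with illustrative names, not our production service) is to stream rows through a server-side cursor and hand them off in fixed-size batches:
import psycopg2

def send_batch_to_suprsend(rows):
    # Placeholder: the real service emits subscriber events here
    ...

def process_query_in_batches(dsn, query, batch_size=10000):
    # A named (server-side) cursor streams rows from the source database
    # instead of loading the entire result set into memory
    conn = psycopg2.connect(dsn)
    try:
        with conn.cursor(name='subscriber_sync') as cur:
            cur.itersize = batch_size
            cur.execute(query)
            while True:
                batch = cur.fetchmany(batch_size)
                if not batch:
                    break
                send_batch_to_suprsend(batch)
    finally:
        conn.close()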
SuprSend is a powerful product that helps you build your notification system with ease. This is how it works: