Asynchronous Scheduling with APScheduler

[Figure: APScheduler interaction diagram. Image from pythonmana.com]

Ryan   |   Sept. 19, 2023, last edited Sept. 19, 2023   |   Johnson City, TN, US

Overview

 

Scheduling tasks is a necessity in system administration and application development. There are a variety of techniques and libraries for scheduling tasks, and depending on the use case, a particular technique or library may be preferred. Advanced Python Scheduler (APScheduler) offers thread-safe schedulers with configurable triggers for cron-like scheduling, delayed scheduling of single-run jobs, and interval-based scheduling. While the word "advanced" might suggest that the library is hard to use, APScheduler's syntax is succinct and straightforward. Its thread safety, configurability, simple syntax, and documentation make APScheduler a compelling library for scheduling tasks. Perhaps it is best described in the developer's own words:

Advanced Python Scheduler (APScheduler) is a light but powerful in-process task scheduler that lets you schedule jobs (functions or any python callables) to be executed at times of your choosing.

This can be a far better alternative to externally run cron scripts for long-running applications (e.g. web applications), as it is platform neutral and can directly access your application’s variables and functions.

The development of APScheduler was heavily influenced by the Quartz task scheduler written in Java. APScheduler provides most of the major features that Quartz does, but it also provides features not present in Quartz (such as multiple job stores). [https://pypi.org/project/APScheduler/2.1.2/]

APScheduler is not part of the Python standard library. It is maintained by developer Alex Grönholm (https://github.com/agronholm/apscheduler). To install APScheduler and its dependencies, first install pip if you have not yet done so, and make sure pip is on your system's PATH. Then run the following command in your system's command-line interpreter:

pip install apscheduler

 

Schedulers and Triggers

 

APScheduler offers four main components: triggers, job stores, executors, and schedulers. This lesson will introduce the two most fundamental components: schedulers and triggers.

 

Schedulers

 

The scheduler is the component used to add or remove a job. APScheduler has several predefined scheduler classes. These include:

  1. BlockingScheduler: As the name indicates, this scheduler is blocking, meaning it runs in the foreground and prevents further code from executing. It can be used to provide a standalone scheduler (e.g., to build a daemon).

  2. BackgroundScheduler: This scheduler runs inside your existing application on a separate thread. As such, it is considered to run in the background of an application.

  3. AsyncIOScheduler: This scheduler is meant for applications that use asyncio. It runs on the asyncio event loop and, by default, runs ordinary (non-coroutine) job functions in the event loop's default executor (a thread pool). This lesson will focus on using AsyncIOScheduler with a few different types of triggers.

APScheduler also provides GeventScheduler, TornadoScheduler, TwistedScheduler, and QtScheduler, which are designed to work with specific libraries.

 

 

Triggers

 

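Triggers contain the logic to calculate the date/time at which the scheduler should execute a job. APScheduler provides three built-in trigger types:

  1. DateTrigger: Runs a job once, at a specific date and time.

  2. IntervalTrigger: Runs a job repeatedly, at a fixed interval.

  3. CronTrigger: Runs a job on a cron-like schedule (e.g., at a given time on given days of the week).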
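To make the idea of a trigger more concrete, here is a minimal standard-library sketch of the kind of calculation a one-shot trigger and an interval trigger perform. It is not APScheduler's implementation; the function names next_date_fire and next_interval_fire are hypothetical and exist only for this illustration.

```python
from datetime import datetime, timedelta
from typing import Optional


def next_date_fire(run_date: datetime, now: datetime) -> Optional[datetime]:
    """One-shot trigger: fire at run_date, and never again once it has passed."""
    return run_date if run_date >= now else None


def next_interval_fire(start: datetime, interval: timedelta, now: datetime) -> datetime:
    """Interval trigger: first point on the start + k * interval grid that is not before now."""
    if now <= start:
        return start
    elapsed = now - start
    # Ceiling division: timedelta // timedelta yields an int (floored),
    # so negating twice rounds up to the next interval boundary.
    steps = -(-elapsed // interval)
    return start + steps * interval


start = datetime(2022, 4, 1, 9, 30)
print(next_date_fire(start, now=datetime(2022, 4, 1, 9, 0)))   # 2022-04-01 09:30:00
print(next_date_fire(start, now=datetime(2022, 4, 1, 10, 0)))  # None
print(next_interval_fire(start, timedelta(hours=2), now=datetime(2022, 4, 1, 14, 0)))  # 2022-04-01 15:30:00
```

A scheduler built on this idea only needs to ask each trigger "when next?" and sleep until the earliest answer.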

 

Using AsyncIOScheduler

 

AsyncIOScheduler with DateTrigger

 

One thing we may want to do is schedule a job to run at a particular time on a given date. This would be useful if we were writing an application (such as a Django or Flask web app) which provides a feature to schedule reminders or calendar events.

First, we will import AsyncIOScheduler, the DateTrigger class, and the datetime and timedelta classes (from the datetime module).

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.date import DateTrigger
from datetime import datetime as dt, timedelta

 

Executing Our First Job

 

Let's start with a simple example. We will schedule a function to run at a certain time on a particular date. We will use DateTrigger to tell the scheduler when our function should run.

sched = AsyncIOScheduler() # Initialize a new async scheduler
sched.start() # Start the scheduler
# Create a function to execute at the scheduled time
def exec_task():
    print("Task executed!")

timestamp = dt.now() + timedelta(seconds=2) # 2 seconds from now
dt_trigger = DateTrigger(run_date=timestamp) # Create a DateTrigger

sched.add_job(func=exec_task, trigger=dt_trigger)
<Job (id=d8a6d45b6d72413fbedd351772445bdb name=exec_task)>
 
Task executed!
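The snippets in this lesson appear to have been run in an interactive session where an asyncio event loop is already running (a notebook, for example); that is an inference from the output shown, not something the lesson states. In a standalone script, nothing scheduled on an event loop fires unless the loop is actually running. As a rough standard-library illustration of that point, the sketch below uses asyncio's own call_later rather than APScheduler, and the results list is only there so the effect can be checked.

```python
import asyncio

results = []


def exec_task():
    # Record the call and print the same message as the lesson's job function.
    results.append("Task executed!")
    print("Task executed!")


async def main():
    loop = asyncio.get_running_loop()
    # Ask the running event loop to call exec_task about 0.1 seconds from now.
    loop.call_later(0.1, exec_task)
    # Keep the loop alive long enough for the delayed callback to fire.
    await asyncio.sleep(0.3)


asyncio.run(main())
```

If the final sleep were removed, main would return immediately, the loop would stop, and the callback would never run. The same reasoning suggests that a script using AsyncIOScheduler should keep its event loop running for as long as jobs are expected to fire.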

 

Removing a Job

 

When we create a job, add_job returns a reference to the newly created Job object (apscheduler.job.Job). If we decide we want to cancel our job before it is scheduled to run, we pass the Job's unique id to remove_job.

timestamp = dt.now() + timedelta(seconds=2) # 2 seconds from now
dt_trigger = DateTrigger(run_date=timestamp) # Create a DateTrigger

job = sched.add_job(func=exec_task, trigger=dt_trigger)
print(f"Job id={job.id}")

# Delete the job:
sched.remove_job(job.id)
Job id=f6adce10664145bc9853c92a2719f56e

 

Using args

 

If our target function takes arguments, we pass them to add_job as a list through its args parameter.

def exec_task(task_no, n):
    print(f"Task {task_no} of {n} executed!")

for i in range(0, 3):
    timestamp = dt.now() + timedelta(seconds=1)
    dt_trigger = DateTrigger(run_date=timestamp)

    # We use an args list that our target function
    # will assign to its parameters.
    sched.add_job(func=exec_task, trigger=dt_trigger, args=[i, 3])
Task 0 of 3 executed!Task 1 of 3 executed!

Task 2 of 3 executed!

 

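Run the above code a few times. Notice that the tasks do not always print in the order in which they were added; the last task sometimes executes second. The three jobs are due at almost the same moment and are run concurrently, so their order is not guaranteed.

NOTE: If jobs must execute in a particular order, do not rely on the scheduler to order jobs that are due at the same time. One option is to run the steps sequentially inside a single job.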
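One scheduler-independent way to fix the order is to bundle the steps into one function and schedule that single function as the job. The following is a minimal sketch in plain Python; make_step and run_in_order are hypothetical helper names, and the log list is only there to make the order visible.

```python
from typing import Callable, List

log: List[str] = []


def make_step(task_no: int, n: int) -> Callable[[], None]:
    """Build one step that records which task ran."""
    def step() -> None:
        log.append(f"Task {task_no} of {n} executed!")
    return step


def run_in_order(steps: List[Callable[[], None]]) -> None:
    """Run every step one after another, in list order."""
    for step in steps:
        step()


steps = [make_step(i, 3) for i in range(3)]
run_in_order(steps)
print("\n".join(log))
```

Because run_in_order is an ordinary function, it could be handed to add_job as a single job (for example with args=[steps]), so that the scheduler decides when the batch starts while the function itself decides the order.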

 

Shutting Down

 

Finally, we can shut down our scheduler. Passing wait=True makes shutdown wait until any currently executing jobs have finished.

# Shut down the scheduler once all of our jobs have executed.
sched.shutdown(wait=True)

 

Writing a Custom Reminder System that uses DateTrigger

 

Now that we know how to use the async scheduler with a DateTrigger, we can create something a little bit more useful: a basic design for scheduling reminders.

 

First, we will create a reminder class that we can instantiate to represent reminders.

class Reminder:
    def __init__(self, timestamp, title, description):
        # The first two fields will be accessed by our
        # scheduler:
        self._job_id       = None
        self.scheduled     = False
        self.timestamp     = timestamp
        self.title         = title
        self.description   = description
        # Create a human-readable string to show when the reminder is scheduled:
        self.reminder_time = dt.strftime(self.timestamp, '%H:%M:%S on %m-%d-%y')
        
    
    def update(self, timestamp, title=None, description=None):
        # If the reminder is already scheduled, require
        # the reminder to be un-scheduled before the reminder
        # can be updated.
        if self.scheduled:
            return "Please remove the scheduled job before updating"
        else:
            self.timestamp = timestamp
            self.reminder_time = dt.strftime(self.timestamp, '%H:%M:%S on %m-%d-%y')
            # Set title and description if they were passed in as args:
            if title:
                self.title = title
            if description:
                self.description = description
            return "Updated"
    
    def __str__(self):
        return (
            f"Title:         {self.title}\n"
            f"Scheduled:     {self.scheduled}\n"
            f"Reminder time: {self.reminder_time}\n"
            f"Description:   {self.description}\n"
        )
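As a quick check of the format string used for reminder_time, the snippet below formats a fixed timestamp with the standard library alone. The timestamp is an arbitrary value chosen for illustration.

```python
from datetime import datetime

# An arbitrary, fixed timestamp so the result is reproducible.
timestamp = datetime(2022, 4, 1, 23, 12, 7)

# %H:%M:%S is the 24-hour time; %m-%d-%y is month-day-two-digit-year.
reminder_time = timestamp.strftime('%H:%M:%S on %m-%d-%y')
print(reminder_time)  # 23:12:07 on 04-01-22
```

Note that the two-digit year and month-first order can be ambiguous for some readers; a format such as '%Y-%m-%d' would be one alternative.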

 

Now we will write a custom scheduler class that extends the functionality of AsyncIOScheduler for our particular use case. In essence, we are creating a wrapper class that will start the scheduler when it is initialized. Our subclass will allow Reminder objects to be passed conveniently as arguments, which we will then use to schedule or remove jobs with the methods provided by our superclass (AsyncIOScheduler), namely, add_job and remove_job.

class AsyncReminderScheduler(AsyncIOScheduler):
    def show_reminder(self, reminder):
        reminder.scheduled=False # Set scheduled to false again -- event fired
        print("\n" + str(reminder))
    
    def __init__(self):
        super().__init__()
        self.start()
    
    def add_reminder(self, reminder):        
        # If the show_reminder function had no args, we would not include the args list
        dt_trigger = DateTrigger(run_date=reminder.timestamp)
        job = self.add_job(func=self.show_reminder, trigger=dt_trigger, args=[reminder])
        
        # Update the reminder object to show that it is scheduled
        # and store the job ID so the reminder can be removed if needed:
        reminder._job_id = job.id
        reminder.scheduled = True
        
    def remove_reminder(self, reminder):
        if reminder.scheduled:
            self.remove_job(reminder._job_id)
            reminder.scheduled = False

 

Now that we have all the code we need, let's go ahead and initialize the scheduler.

reminder_sched = AsyncReminderScheduler()
 

Now that our scheduler is running, let's create a new reminder.

# Create a datetime.datetime object for the current time, 1 day from now:
reminder_timestamp = dt.now() + timedelta(days=1)

py_reminder = Reminder(reminder_timestamp, 'Do Python work!', 'Lab due at midnight 😩')

 

Let's schedule our new reminder.

 
reminder_sched.add_reminder(py_reminder)
print(f'Reminder scheduled: {py_reminder.scheduled}')

# Since the reminder is scheduled, we are locked from updating it until
# the reminder has been un-scheduled:
attempt = py_reminder.update(dt.now())
print(attempt)

# Now, let's un-schedule the reminder so we can modify the time and description:
reminder_sched.remove_reminder(py_reminder)
print(f'\nReminder scheduled: {py_reminder.scheduled}')

new_timestamp = dt.now() + timedelta(seconds=4) # Run in 4 seconds...
attempt = py_reminder.update(new_timestamp, description="Lab due tomorrow 😮‍💨")
print(attempt)

reminder_sched.add_reminder(py_reminder)
print(f'\nReminder scheduled: {py_reminder.scheduled}')
Reminder scheduled: True
Please remove the scheduled job before updating

Reminder scheduled: False
Updated

Reminder scheduled: True

Title:         Do Python work!
Scheduled:     False
Reminder time: 23:12:07 on 04-01-22
Description:   Lab due tomorrow 😮‍💨

 

Enhancing the Reminder System: Adding CronTriggers

 

DateTrigger lets us set one-time reminders, but many scheduling apps also allow us to set repeating reminders. Using CronTrigger, we can too! Let's create a WeeklyReminder class, and then we will recreate AsyncReminderScheduler with a new add_weekly_reminder function (we will use the same remove_reminder function to un-schedule either type of reminder).

 

First, we will define WeeklyReminder. It accepts a string for the day, along with the hour and minute as ints, to specify the time at which the reminder will trigger on its day (or days) of the week.

 

CronTrigger accepts a few different kinds of values for the day of the week. The days can be specified as strings in the format:

"mon", "tue", "wed", "thu", "fri", "sat", "sun"

 

They can also be specified as integers from 0-6 with 0 representing Monday and 6 representing Sunday.

 

CronTrigger is even flexible enough to let us specify day ranges by linking two of the string day specifiers with a dash. Because Monday counts as the first day of the week, a range should start on the earlier day; in APScheduler 3.x, a backwards range such as "sun-mon" is rejected with a ValueError. E.g.,

"mon-fri", "sat-sun", "tue-wed"

 

By accepting a string argument to the day field of our WeeklyReminder class, we can give users the flexibility to schedule a job on a range of days or an individual day.

class WeeklyReminder:
    def __init__(self, day, hour, minute, title, description):
        day = day.lower() # Clean
        if not '-' in day: #If not day range
            day = day[0:3] # Ensure 1st 3 letters only in case full day name was passed
        
        self._job_id       = None
        self.scheduled     = False
        self.day           = day
        self.hour          = hour
        self.minute        = minute
        self.title         = title
        self.description   = description

        # Zero-pad the minute so that, e.g., 9:05 is not shown as 9:5
        self.reminder_time = f"{self.hour}:{self.minute:02d} on {self.day}"
    
    def update(self, day=None, hour=None, minute=None, title=None, description=None):
        # If the reminder is already scheduled, require
        # the reminder to be un-scheduled before the reminder
        # can be updated.
        if self.scheduled:
            return "Please remove the scheduled job before updating"
        else:
            if day:
                day = day.lower() # Clean (only when a day was actually passed in)
                if '-' not in day: # If not day range
                    day = day[0:3] # Ensure 1st 3 letters only in case full day name was passed
                self.day = day
            # Compare against None so that 0 (midnight / top of the hour) is accepted:
            if hour is not None:
                self.hour = hour
            if minute is not None:
                self.minute = minute
            if title:
                self.title = title
            if description:
                self.description = description

            # Zero-pad the minute so that, e.g., 9:05 is not shown as 9:5
            self.reminder_time = f"{self.hour}:{self.minute:02d} on {self.day}"
            
            return "Updated"
    
    def __str__(self):
        return (
            f"Title:         {self.title}\n"
            f"Scheduled:     {self.scheduled}\n"
            f"Reminder time: {self.reminder_time}\n"
            f"Description:   {self.description}\n"
        )
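The class above keeps only the first three letters of a single day name and passes anything containing a dash through unchanged, so a range written with full names (such as "Monday-Friday") would not be shortened. The following is a small standard-library sketch of a stricter normalizer; normalize_day and normalize_day_spec are hypothetical helpers that are not part of the lesson's class or of APScheduler, and the sketch handles day names only (not the integer form).

```python
DAY_ABBREVIATIONS = ("mon", "tue", "wed", "thu", "fri", "sat", "sun")


def normalize_day(name: str) -> str:
    """Reduce a day name such as 'Monday' or ' FRI ' to its three-letter form."""
    abbreviation = name.strip().lower()[:3]
    if abbreviation not in DAY_ABBREVIATIONS:
        raise ValueError(f"Unrecognized day name: {name!r}")
    return abbreviation


def normalize_day_spec(spec: str) -> str:
    """Normalize a single day or a dash-separated range of day names."""
    return "-".join(normalize_day(part) for part in spec.split("-"))


print(normalize_day_spec("Monday"))           # mon
print(normalize_day_spec("Monday-Friday"))    # mon-fri
print(normalize_day_spec("tuesday - WED"))    # tue-wed
```

Validating the names up front means a typo is reported when the reminder is created rather than when the trigger is built.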

 

We will redefine the AsyncReminderScheduler by adding the add_weekly_reminder function that uses a CronTrigger.

import tzlocal

from apscheduler.triggers.cron import CronTrigger

class AsyncReminderScheduler(AsyncIOScheduler):
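    def show_reminder(self, reminder):
        # Clear the flag once the reminder fires. Caveat: this is only accurate
        # for one-time reminders; a weekly (cron) job stays scheduled after it
        # fires, so the flag will read False even though the job is still active.
        reminder.scheduled=False
        print("\n" + str(reminder))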
    
    def __init__(self):
        super().__init__(timezone=str(tzlocal.get_localzone()))
        self.start()
    
    def add_reminder(self, reminder):        
        # If the show_reminder function had no args, we would not include the args list
        dt_trigger = DateTrigger(run_date=reminder.timestamp)
        job = self.add_job(func=self.show_reminder, trigger=dt_trigger, args=[reminder])
        
        # Update the reminder object to show that it is scheduled
        # and store the job ID so the reminder can be removed if needed:
        reminder._job_id = job.id
        reminder.scheduled = True
        
    def add_weekly_reminder(self, reminder):
        cr_trigger = CronTrigger(
                            day_of_week=reminder.day,
                            hour=reminder.hour,
                            minute=reminder.minute
                        )
        job = self.add_job(func=self.show_reminder, trigger=cr_trigger, args=[reminder])

        # Store the job ID so that remove_reminder can find the job later:
        reminder._job_id = job.id
        reminder.scheduled = True
        
    def remove_reminder(self, reminder):
        if reminder.scheduled:
            self.remove_job(reminder._job_id)
            reminder.scheduled = False

# Initialize a new scheduler
reminder_sched = AsyncReminderScheduler()
# Schedule 1 minute from the current time weekly:
now = dt.now() + timedelta(minutes=1)
hour = now.hour
minute = now.minute

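# Build a two-day range from today through tomorrow, e.g. "Fri-Sat".
# Caveat: on a Sunday this produces "Sun-Mon", a backwards range (Monday counts
# as the first day of the week) that APScheduler 3.x is expected to reject.
day1 = now.strftime('%A')[0:3] + "-"
tmrw = now + timedelta(days=1)
day2 = tmrw.strftime('%A')[0:3]

day = day1 + day2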

print(f"Day arg:    {day} {type(day)}")
print(f"Hour:       {hour} {type(hour)}")
print(f"Minute arg: {minute} {type(minute)}")

py_wk_reminder = WeeklyReminder(
                    day, hour, minute, 'Do Python work!',
                    '🐍 Time to write more Python!!'
                 )

reminder_sched.add_weekly_reminder(py_wk_reminder)
print("Reminders should print in ~60 seconds")
 
Day arg:    Fri-Sat <class 'str'>
Hour:       23 <class 'int'>
Minute arg: 17 <class 'int'>
Reminders should print in ~60 seconds

Title:         Do Python work!
Scheduled:     False
Reminder time: 23:17 on fri-sat
Description:   🐍 Time to write more Python!!
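To see how a day of the week, an hour, and a minute pin down a concrete moment, here is a standard-library sketch that computes the next occurrence of a single weekday at a given time. It is only an illustration of the idea, not how CronTrigger is implemented; next_weekly_fire is a hypothetical name, and the sketch handles single day abbreviations only (no ranges).

```python
from datetime import datetime, timedelta

WEEKDAYS = ("mon", "tue", "wed", "thu", "fri", "sat", "sun")


def next_weekly_fire(day: str, hour: int, minute: int, now: datetime) -> datetime:
    """Return the first datetime at or after `now` that falls on `day` at hour:minute."""
    target_weekday = WEEKDAYS.index(day)  # datetime.weekday() also uses Monday == 0
    days_ahead = (target_weekday - now.weekday()) % 7
    candidate = (now + timedelta(days=days_ahead)).replace(
        hour=hour, minute=minute, second=0, microsecond=0
    )
    # If today's slot has already passed, move to the same day next week.
    if candidate < now:
        candidate += timedelta(days=7)
    return candidate


now = datetime(2022, 4, 1, 23, 16, 30)  # a Friday
print(next_weekly_fire("fri", 23, 17, now))  # 2022-04-01 23:17:00
print(next_weekly_fire("sat", 23, 17, now))  # 2022-04-02 23:17:00
print(next_weekly_fire("fri", 23, 17, datetime(2022, 4, 1, 23, 18)))  # 2022-04-08 23:17:00
```

A range such as "fri-sat" can be thought of as taking the earliest of these results over every day in the range.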

 

PytzUsageWarning

 

You may have noticed one other slight change in the redefinition of AsyncReminderScheduler: tzlocal was imported, and the call to the superclass initializer was changed to super().__init__(timezone=str(tzlocal.get_localzone())). That was done to suppress a warning that looks something like this:

 

C:\ProgramData\Anaconda3\lib\site-packages\apscheduler\util.py:436: PytzUsageWarning: The localize method is no longer necessary, as this time zone supports the fold attribute (PEP 495). For more details on migrating to a PEP 495-compliant implementation, see https://pytz-deprecation-shim.readthedocs.io/en/latest/migration.html

return tzinfo.localize(dt)

 

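In short, PEP 495 added the fold attribute to datetime objects, which makes pytz's localize method unnecessary, and the standard-library zoneinfo module (added in Python 3.9) is now generally preferred over pytz. The warning shows that the installed APScheduler version still calls pytz's localize method. Passing the local timezone's name as a string, as in the redefined class above, suppresses the warning.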

 

AsyncIOScheduler with IntervalTrigger

 

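Some jobs need to be performed at regular intervals, for example, syncing a message inbox every 30 days, or perhaps running a custom script every few hours. IntervalTrigger fits such use cases. In the examples below, passing the string 'interval' to add_job is shorthand for creating an IntervalTrigger from the remaining keyword arguments.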

 

def exec_job():
    print("Executing job")

sched = AsyncIOScheduler()

sched.start()

# Schedule exec_job to be called every two hours
sched.add_job(exec_job, 'interval', hours=2)
<Job (id=5df83ac2d7314076bdb1b969859c63b9 name=exec_job)>

 

You can also specify a start date and an end date.

sched.add_job(exec_job, 'interval', hours=2, start_date='2022-04-01 09:30:00', end_date='2023-06-20 11:00:00')
 
<Job (id=5f9cfb57c1d74e4cb71fdff0ccb66d61 name=exec_job)>

 

Lastly, there is a jitter option, which shifts each execution time by a random offset of at most the given number of seconds (with jitter=120, an offset in a [-120,+120] second window). The documentation explains that this can be useful if you are using multiple servers and wish to avoid them running a job at the exact same moment, or to prevent multiple jobs with similar options from running concurrently.

 

# Run `exec_job` every hour with an extra-delay picked randomly in a [-120,+120] seconds window.
sched.add_job(exec_job, 'interval', hours=1, jitter=120)
<Job (id=3160c90f563f48ed8213ba24488a6604 name=exec_job)>
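The effect of jitter can be pictured with a few lines of standard-library code. This is only a sketch of the idea (a uniformly random offset added to the nominal run time), not APScheduler's code; apply_jitter is a hypothetical name, and the random generator is seeded purely so that the sketch is repeatable.

```python
import random
from datetime import datetime, timedelta


def apply_jitter(fire_time: datetime, jitter_seconds: float, rng: random.Random) -> datetime:
    """Shift fire_time by a random offset within [-jitter_seconds, +jitter_seconds]."""
    offset = rng.uniform(-jitter_seconds, jitter_seconds)
    return fire_time + timedelta(seconds=offset)


rng = random.Random(0)  # seeded only so repeated runs give the same values
nominal = datetime(2022, 4, 1, 10, 0, 0)

# Five jittered versions of the same nominal run time.
jittered = [apply_jitter(nominal, 120, rng) for _ in range(5)]
for fire_time in jittered:
    print(fire_time)
```

Each printed time lands within two minutes of 10:00:00, which is how several servers with the same schedule would end up starting the job at slightly different moments.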