Scheduling tasks is a necessity in system administration and application development. There are a variety of techniques and libraries for scheduling tasks, and depending on the use case, a particular technique or library may be preferred. Advanced Python Scheduler (APScheduler) offers thread-safe schedulers with configurable triggers for cron-like scheduling, delayed scheduling of single-run jobs, and interval-based scheduling. While the descriptor "advanced" might suggest "hard to use", APScheduler's syntax is succinct and straightforward. Its thread safety, configurability, simple syntax, and documentation make APScheduler a compelling library for scheduling tasks. Perhaps it is best described in the developer's own words:
Advanced Python Scheduler (APScheduler) is a light but powerful in-process task scheduler that lets you schedule jobs (functions or any python callables) to be executed at times of your choosing.
This can be a far better alternative to externally run cron scripts for long-running applications (e.g. web applications), as it is platform neutral and can directly access your application’s variables and functions.
The development of APScheduler was heavily influenced by the Quartz task scheduler written in Java. APScheduler provides most of the major features that Quartz does, but it also provides features not present in Quartz (such as multiple job stores). [https://pypi.org/project/APScheduler/2.1.2/]
APScheduler is not included in the standard Python installation. It is maintained by developer Alex Grönholm (https://github.com/agronholm/apscheduler). To install APScheduler and its dependencies, first install pip, if you have not yet done so. Then make sure pip is on your system's PATH. Finally, run the following command in your system's command line interpreter:
pip install apscheduler
APScheduler offers four main components: triggers, job stores, executors, and schedulers. This lesson will introduce the two most fundamental components: schedulers and triggers.
The scheduler is the component used to add or remove a job. APScheduler has several predefined scheduler classes. These include:
BlockingScheduler: As the name indicates, this scheduler is blocking, meaning it prevents the execution of further code by running in the foreground. This could be used to provide a standalone scheduler (e.g., to build a daemon).
BackgroundScheduler: This scheduler runs inside your existing application on a separate thread. As such, it is considered to run in the background of an application.
AsyncIOScheduler: This scheduler is designed for asyncio applications and will run jobs in the asyncio event loop's thread pool.
This lesson will focus on using AsyncIOScheduler with a few different types of triggers. APScheduler also provides GeventScheduler, TornadoScheduler, TwistedScheduler, and QtScheduler, which are designed to work with specific libraries.
Triggers contain the logic to calculate the date/time for the scheduler to execute a job. APScheduler provides three trigger types:
date: use when you want to run the job just once at a certain point of time
interval: use when you want to run the job at fixed intervals of time
cron: use when you want to run the job periodically at certain time(s) of day [https://apscheduler.readthedocs.io/en/3.x/userguide.html]
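To make "the logic to calculate the date/time" more concrete, here is a minimal plain-Python sketch of the arithmetic behind a one-shot date trigger and a fixed-interval trigger. It is an illustration only and not APScheduler's implementation; the function names date_next_fire and interval_next_fire are hypothetical.

```python
from datetime import datetime, timedelta


def date_next_fire(run_date, already_fired):
    """One-shot trigger: fire once at run_date, then never again."""
    return None if already_fired else run_date


def interval_next_fire(start, interval, now):
    """Fixed-interval trigger: first tick of start + k * interval at or after now."""
    if now <= start:
        return start
    # (start - now) is negative, so floor division followed by negation
    # gives the ceiling of the number of whole intervals needed to reach now.
    steps = -((start - now) // interval)
    return start + steps * interval


start = datetime(2022, 4, 1, 9, 30)
# Ticks fall at 09:30, 11:30, 13:30, ... so the first tick at or after noon is 13:30.
print(interval_next_fire(start, timedelta(hours=2), datetime(2022, 4, 1, 12, 0)))
```

A cron-style trigger does the same kind of calculation, but it matches calendar fields (day of week, hour, minute) instead of adding a fixed step.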
One thing we may want to do is schedule a job to run at a particular time on a given date. This would be useful if we were writing an application (such as a Django or Flask web app) which provides a feature to schedule reminders or calendar events.
First, we will import AsyncIOScheduler, the DateTrigger class, and the datetime and timedelta classes (from the datetime module). The datetime class is imported under the alias dt.
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.date import DateTrigger
from datetime import datetime as dt, timedelta
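First, let's see a simple example. We will schedule a function to run at a certain time on a particular date. We will use DateTrigger to calculate for the scheduler when our function should run. Note that AsyncIOScheduler needs a running asyncio event loop in order to fire its jobs; the snippets in this lesson appear to assume an environment that already provides one, such as a Jupyter notebook.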
sched = AsyncIOScheduler() # Initialize a new async scheduler
sched.start() # Start the scheduler
# Create a function to execute at the scheduled time
def exec_task():
    print("Task executed!")
timestamp = dt.now() + timedelta(seconds=2) # 2 seconds from now
dt_trigger = DateTrigger(run_date=timestamp) # Create a DateTrigger
sched.add_job(func=exec_task, trigger=dt_trigger)
When we create a job, add_job returns a reference to the newly created Job object (apscheduler.job.Job). If we decide we want to cancel our job before it is scheduled to run, we must access the Job's unique id so that we can pass it to remove_job.
timestamp = dt.now() + timedelta(seconds=2) # 2 seconds from now
dt_trigger = DateTrigger(run_date=timestamp) # Create a DateTrigger
job = sched.add_job(func=exec_task, trigger=dt_trigger)
print(f"Job id={job.id}")
# Delete the job:
sched.remove_job(job.id)
If our target function needs to accept arguments, we pass them to add_job as a list through its args parameter.
def exec_task(task_no, n):
    print(f"Task {task_no} of {n} executed!")

for i in range(0, 3):
    timestamp = dt.now() + timedelta(seconds=1)
    dt_trigger = DateTrigger(run_date=timestamp)
    # We use an args list that our target function
    # will assign to its parameters.
    sched.add_job(func=exec_task, trigger=dt_trigger, args=[i, 3])
Task 0 of 3 executed!
Task 1 of 3 executed!
Task 2 of 3 executed!
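Run the above code a few times. Notice that the last task sometimes executes second. The three jobs have nearly identical run times, and the scheduler hands each one to its executor independently, so their relative order is not guaranteed.
NOTE: If you need to guarantee that jobs execute in a particular order, do not rely on the order in which they were added. Give each job a clearly distinct run time, or perform the steps in sequence inside a single job. Switching to a synchronous scheduler may not be enough on its own, because those schedulers also hand jobs to an executor.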
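Whichever scheduler is used, one simple way to keep one-time jobs in a known order is to give each job a clearly distinct run time. The sketch below only computes the staggered timestamps with the standard library; staggered_run_dates is a hypothetical helper, and each resulting value is meant to be passed to DateTrigger(run_date=...) as in the example above.

```python
from datetime import datetime, timedelta


def staggered_run_dates(first_run, count, gap_seconds=1):
    """Return `count` run dates, each `gap_seconds` after the previous one."""
    return [first_run + timedelta(seconds=i * gap_seconds) for i in range(count)]


run_dates = staggered_run_dates(datetime.now() + timedelta(seconds=1), 3)
for task_no, run_date in enumerate(run_dates):
    # Each run_date would become the run_date of the trigger for task task_no.
    print(task_no, run_date.isoformat(timespec="seconds"))
```

This only orders the start times. If a job can run longer than the gap, the steps should instead be called one after another from a single job.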
Finally, we can shut down our asynchronous scheduler's executor.
# Shut down the scheduler once all of our jobs have executed.
sched.shutdown(wait=True)
Now that we know how to use the async scheduler with a DateTrigger, we can create something a little bit more useful: a basic design for scheduling reminders.
First, we will create a reminder class that we can instantiate to represent reminders.
class Reminder:
    def __init__(self, timestamp, title, description):
        # The first two fields will be accessed by our
        # scheduler:
        self._job_id = None
        self.scheduled = False
        self.timestamp = timestamp
        self.title = title
        self.description = description
        # Create a human-readable string to show when the reminder is scheduled:
        self.reminder_time = self.timestamp.strftime('%H:%M:%S on %m-%d-%y')

    def update(self, timestamp, title=None, description=None):
        # If the reminder is already scheduled, require
        # the reminder to be un-scheduled before the reminder
        # can be updated.
        if self.scheduled:
            return "Please remove the scheduled job before updating"
        else:
            self.timestamp = timestamp
            self.reminder_time = self.timestamp.strftime('%H:%M:%S on %m-%d-%y')
            # Set title and description if they were passed in as args:
            if title:
                self.title = title
            if description:
                self.description = description
            return "Updated"

    def __str__(self):
        return (
            f"Title: {self.title}\n"
            f"Scheduled: {self.scheduled}\n"
            f"Reminder time: {self.reminder_time}\n"
            f"Description: {self.description}\n"
        )
Now we will write a custom scheduler class that extends the functionality of AsyncIOScheduler for our particular use case. In essence, we are creating a wrapper class that will start the scheduler when it is initialized. Our subclass will allow Reminder objects to conveniently be passed as arguments that we will then use to schedule or remove jobs with the methods provided by our superclass (AsyncIOScheduler), namely, add_job and remove_job.
class AsyncReminderScheduler(AsyncIOScheduler):
    def show_reminder(self, reminder):
        reminder.scheduled = False  # Set scheduled to false again -- event fired
        print("\n" + str(reminder))

    def __init__(self):
        super().__init__()
        self.start()

    def add_reminder(self, reminder):
        # If the show_reminder function had no args, we would not include the args list
        dt_trigger = DateTrigger(run_date=reminder.timestamp)
        job = self.add_job(func=self.show_reminder, trigger=dt_trigger, args=[reminder])
        # Update the reminder object to show that it is scheduled
        # and store the job ID so the reminder can be removed if needed:
        reminder._job_id = job.id
        reminder.scheduled = True

    def remove_reminder(self, reminder):
        if reminder.scheduled:
            self.remove_job(reminder._job_id)
            reminder.scheduled = False
Now that we have all the code we need, let's go ahead and initialize the scheduler.
reminder_sched = AsyncReminderScheduler()
Now that our scheduler is running, let's create a new reminder.
# Create a datetime.datetime object 1 day from now:
reminder_timestamp = dt.now() + timedelta(days=1)
py_reminder = Reminder(reminder_timestamp, 'Do Python work!', 'Lab due at midnight 😩')
Let's schedule our new reminder.
reminder_sched.add_reminder(py_reminder)
print(f'Reminder scheduled: {py_reminder.scheduled}')
# Since the reminder is scheduled, we are locked from updating it until
# the reminder has been un-scheduled:
attempt = py_reminder.update(dt.now())
print(attempt)
# Now, let's un-schedule the reminder so we can modify the time and description:
reminder_sched.remove_reminder(py_reminder)
print(f'\nReminder scheduled: {py_reminder.scheduled}')
new_timestamp = dt.now() + timedelta(seconds=4) # Run in 4 seconds...
attempt = py_reminder.update(new_timestamp, description="Lab due tomorrow 😮💨")
print(attempt)
reminder_sched.add_reminder(py_reminder)
print(f'\nReminder scheduled: {py_reminder.scheduled}')
Reminder scheduled: True
Please remove the scheduled job before updating

Reminder scheduled: False
Updated

Reminder scheduled: True

Title: Do Python work!
Scheduled: False
Reminder time: 23:12:07 on 04-01-22
Description: Lab due tomorrow 😮💨
Using DateTrigger lets us set one-time reminders, but many scheduling apps allow us to set repeating reminders. Using the CronTrigger, we can too! Let's create a WeeklyReminder class, and then we will recreate AsyncReminderScheduler with a new add_weekly_reminder function (we will use the same remove_reminder function to un-schedule either type of reminder).
First, we will define WeeklyReminder. It will accept a string for the day, along with the hour and minute as ints, to specify the time at which the reminder will trigger on its day of the week.
CronTrigger accepts a few different arguments for the day. The days can be specified as strings in the format:
"mon", "tue", "wed", "thu", "fri", "sat", "sun"
They can also be specified as integers from 0-6 with 0 representing Monday and 6 representing Sunday.
CronTrigger is even flexible enough to let us specify day ranges by linking two of the string day specifiers with a dash. E.g.,
"mon-fri", "sat-sun", "tue-wed"
Note that a range should run forward in Monday-to-Sunday order; as far as we can tell, APScheduler 3.x rejects a range whose first day comes after its second, such as "sun-mon".
By accepting a string argument to the day field of our WeeklyReminder class, we can give users the flexibility to schedule a job on a range of days or an individual day.
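Because the day arrives as free-form text, it can help to validate it before it reaches the trigger. The following is a minimal sketch using only the standard library; normalize_day_spec is a hypothetical helper, and its rule that a range must run forward from Monday to Sunday is our assumption about what APScheduler 3.x accepts.

```python
WEEKDAYS = ["mon", "tue", "wed", "thu", "fri", "sat", "sun"]


def normalize_day_spec(day):
    """Return a lowercase three-letter day or day range, e.g. 'mon' or 'mon-fri'."""
    # Truncate each part so that full names such as "Monday" are accepted.
    parts = [part.strip().lower()[:3] for part in day.split("-")]
    if len(parts) > 2 or any(part not in WEEKDAYS for part in parts):
        raise ValueError(f"Unrecognized day specifier: {day!r}")
    # Assumption: the first day of a range may not come after the second.
    if len(parts) == 2 and WEEKDAYS.index(parts[0]) > WEEKDAYS.index(parts[1]):
        raise ValueError(f"Day range must run forward from Monday to Sunday: {day!r}")
    return "-".join(parts)


print(normalize_day_spec("Monday"))
print(normalize_day_spec("Mon-Friday"))
```

Unlike simple truncation of the whole string, this also shortens both halves of a range written with full day names.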
class WeeklyReminder:
    def __init__(self, day, hour, minute, title, description):
        day = day.lower()  # Clean
        if '-' not in day:  # If not a day range
            day = day[0:3]  # Keep the first 3 letters in case a full day name was passed
        # Ranges are passed through as-is, so they must already use
        # three-letter names (e.g. "mon-fri").
        self._job_id = None
        self.scheduled = False
        self.day = day
        self.hour = hour
        self.minute = minute
        self.title = title
        self.description = description
        self.reminder_time = f"{self.hour:02d}:{self.minute:02d} on {self.day}"

    def update(self, day=None, hour=None, minute=None, title=None, description=None):
        # If the reminder is already scheduled, require
        # the reminder to be un-scheduled before the reminder
        # can be updated.
        if self.scheduled:
            return "Please remove the scheduled job before updating"
        else:
            if day:
                day = day.lower()  # Clean (only when a day was passed in)
                if '-' not in day:  # If not a day range
                    day = day[0:3]  # Keep the first 3 letters in case a full day name was passed
                self.day = day
            # Compare against None so that 0 (midnight, or the top of the hour) is accepted:
            if hour is not None:
                self.hour = hour
            if minute is not None:
                self.minute = minute
            if title:
                self.title = title
            if description:
                self.description = description
            self.reminder_time = f"{self.hour:02d}:{self.minute:02d} on {self.day}"
            return "Updated"

    def __str__(self):
        return (
            f"Title: {self.title}\n"
            f"Scheduled: {self.scheduled}\n"
            f"Reminder time: {self.reminder_time}\n"
            f"Description: {self.description}\n"
        )
We will redefine the AsyncReminderScheduler by adding the add_weekly_reminder function that uses a CronTrigger.
import tzlocal
from apscheduler.triggers.cron import CronTrigger

class AsyncReminderScheduler(AsyncIOScheduler):
    def show_reminder(self, reminder):
        reminder.scheduled = False  # Set scheduled to false again -- event fired
        print("\n" + str(reminder))

    def show_weekly_reminder(self, reminder):
        # A weekly job stays in the scheduler after it fires, so leave
        # reminder.scheduled set to True.
        print("\n" + str(reminder))

    def __init__(self):
        super().__init__(timezone=str(tzlocal.get_localzone()))
        self.start()

    def add_reminder(self, reminder):
        # If the show_reminder function had no args, we would not include the args list
        dt_trigger = DateTrigger(run_date=reminder.timestamp)
        job = self.add_job(func=self.show_reminder, trigger=dt_trigger, args=[reminder])
        # Update the reminder object to show that it is scheduled
        # and store the job ID so the reminder can be removed if needed:
        reminder._job_id = job.id
        reminder.scheduled = True

    def add_weekly_reminder(self, reminder):
        cr_trigger = CronTrigger(
            day_of_week=reminder.day,
            hour=reminder.hour,
            minute=reminder.minute
        )
        job = self.add_job(func=self.show_weekly_reminder, trigger=cr_trigger, args=[reminder])
        # Store the job ID so that remove_reminder can find the job later:
        reminder._job_id = job.id
        reminder.scheduled = True

    def remove_reminder(self, reminder):
        if reminder.scheduled:
            self.remove_job(reminder._job_id)
            reminder.scheduled = False
# Initialize a new scheduler
reminder_sched = AsyncReminderScheduler()
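# Schedule 1 minute from the current time weekly:
now = dt.now() + timedelta(minutes=1)
hour = now.hour
minute = now.minute
day1 = now.strftime('%A')[0:3]
tmrw = now + timedelta(days=1)
day2 = tmrw.strftime('%A')[0:3]
# Build a two-day range such as "Mon-Tue". A range has to run forward in
# Monday-to-Sunday order, so on a Sunday fall back to the single day.
day = day1 if now.weekday() == 6 else day1 + "-" + day2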
print(f"Day arg: {day} {type(day)}")
print(f"Hour: {hour} {type(hour)}")
print(f"Minute arg: {minute} {type(minute)}")
py_wk_reminder = WeeklyReminder(
day, hour, minute, 'Do Python work!',
'🐍 Time to write more Python!!'
)
reminder_sched.add_weekly_reminder(py_wk_reminder)
print("Reminders should print in ~60 seconds")
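To preview when a weekly reminder would first fire, we can reproduce the calendar arithmetic for a single day of the week in plain Python. This is a simplified sketch and not what CronTrigger does internally: next_weekly_fire is a hypothetical helper, it handles one day rather than a range, and it works on naive datetimes, so it ignores time zones and daylight saving changes.

```python
from datetime import datetime, timedelta

WEEKDAYS = ["mon", "tue", "wed", "thu", "fri", "sat", "sun"]


def next_weekly_fire(now, day, hour, minute):
    """Return the first datetime after `now` that falls on `day` at hour:minute."""
    target = WEEKDAYS.index(day)  # 0 = Monday, matching datetime.weekday()
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    candidate += timedelta(days=(target - now.weekday()) % 7)
    if candidate <= now:
        # Today's slot has already passed, so move to next week.
        candidate += timedelta(days=7)
    return candidate


now = datetime(2022, 4, 1, 23, 11)  # a Friday evening
print(next_weekly_fire(now, "fri", 23, 12))
print(next_weekly_fire(now, "mon", 9, 0))
```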
You may have noticed one other slight change in the redefinition of AsyncReminderScheduler: tzlocal was imported and the superclass init call changed to super().__init__(timezone=str(tzlocal.get_localzone())). That was done to suppress a warning that looks something like this:
C:\ProgramData\Anaconda3\lib\site-packages\apscheduler\util.py:436: PytzUsageWarning: The localize method is no longer necessary, as this time zone supports the fold attribute (PEP 495). For more details on migrating to a PEP 495-compliant implementation, see https://pytz-deprecation-shim.readthedocs.io/en/latest/migration.html
return tzinfo.localize(dt)
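In short, PEP 495 added the fold attribute to datetime objects, which makes pytz's localize method unnecessary; the standard library's zoneinfo module (added in Python 3.9) relies on that attribute. APScheduler 3.x is clearly still calling pytz's localize. Passing the local timezone from tzlocal to the scheduler as a string will suppress the ugly warning.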
Some jobs need to be performed at intervals, for example, syncing a message inbox every 30 days, or perhaps running a custom script every few hours. IntervalTrigger, selected below through the 'interval' alias, fits such use cases.
def exec_job():
    print("Executing job")
sched = AsyncIOScheduler()
sched.start()
# Schedule exec_job to be called every two hours
sched.add_job(exec_job, 'interval', hours=2)
You can also specify a start date and an end date.
sched.add_job(exec_job, 'interval', hours=2, start_date='2022-04-01 09:30:00', end_date='2023-06-20 11:00:00')
Lastly, there is a jitter option, which adds an element of randomness to the precise execution time (with jitter=120, a window of [-120,+120] seconds). The documentation explains that this can be useful if you are using multiple servers and wish to avoid them running a job at the exact same moment, or to prevent multiple jobs with similar options from running concurrently.
# Run `exec_job` every hour with an extra-delay picked randomly in a [-120,+120] seconds window.
sched.add_job(exec_job, 'interval', hours=1, jitter=120)
<Job (id=3160c90f563f48ed8213ba24488a6604 name=exec_job)>
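The effect of jitter on a single execution time can be sketched with the standard library alone. This follows the [-120,+120] window described above and is not APScheduler's own code; apply_jitter is a hypothetical helper, and the exact window used by a given APScheduler release may differ.

```python
import random
from datetime import datetime, timedelta


def apply_jitter(fire_time, jitter_seconds, rng=random):
    """Shift fire_time by a random offset in [-jitter_seconds, +jitter_seconds]."""
    offset = rng.uniform(-jitter_seconds, jitter_seconds)
    return fire_time + timedelta(seconds=offset)


nominal = datetime(2022, 4, 1, 10, 0, 0)
# A seeded generator makes the sketch repeatable; omit it for real randomness.
jittered = apply_jitter(nominal, 120, random.Random(0))
print(jittered)
```

Two servers that share the same nominal schedule would each draw their own offset, which is what spreads their executions apart.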