Writing Cronjobs in Rust


In this article we’re going to talk about how you can write your own cron jobs as a web service using Shuttle!

Cron jobs (or “scheduled tasks”) are useful for many things. They let you automate work such as:

  • Backing up data.
  • Sending daily reminders (for example, to customers who have signed up to a service you own but haven’t started using it yet).
  • Creating and writing reports.

If you’re interested in the final code and want to deploy quickly, you can find the repo here. You can deploy it with two simple steps:

  1. Run cargo shuttle init --from shuttle-hq/shuttle-examples --subfolder shuttle-cron and follow the prompt (requires cargo-shuttle installed)

  2. Run cargo shuttle deploy. That’s it!

Getting Started

If you don’t have cargo-shuttle installed, you can install it by running the command below (requires Rust + Cargo installed):

cargo install cargo-shuttle

To get started, you’ll want to initialise a project with cargo-shuttle:

cargo shuttle init 

You’ll want to make sure to follow the prompt and choose None when the framework options come up. This will spawn a custom service that you can use with Shuttle. For this article, we will be using the project name shuttle-example-cron.

Once done, we’ll want to install the following dependencies:

  • serde - Allows us to de/serialize tasks
  • chrono - Allows us to use timestamps
  • apalis - The task queue framework we’ll be using
  • sqlx - Allows you to interact with your provisioned database
  • shuttle-shared-db - Provisions a database for you through Shuttle (via Docker when running locally, via the Shuttle runtime in deployment)
  • tower - Used with apalis so that the cron job can be hosted.

We can install all of these with the following shell snippet:

cargo add apalis -F cron,postgres,extensions,retry
cargo add chrono -F serde,clock
cargo add serde -F derive
cargo add shuttle-shared-db -F postgres
cargo add sqlx -F runtime-tokio-native-tls,postgres
cargo add tower 

Now that everything is installed, it’s time to get coding!

Writing our first cron job

Adding a database

Before we do anything else, let’s provision a database using the Shuttle runtime. Add the shuttle_shared_db::Postgres annotation to an argument of your main function, then use the connection string it provides to set up a Postgres connection pool:

use sqlx::{PgPool, postgres::PgPoolOptions};

pub struct MyService {
    db: PgPool,
}

#[shuttle_runtime::main]
async fn shuttle_main(
    #[shuttle_shared_db::Postgres] conn_string: String,
) -> Result<MyService, shuttle_runtime::Error> {
    let db = PgPoolOptions::new()
        .min_connections(5)
        .max_connections(5)
        .connect(&conn_string)
        .await
        .unwrap();

    Ok(MyService { db })
}

As you can see, pretty simple! You would otherwise need to run a Docker command to spin this up. In production, you would also need to manually instantiate and manage your Postgres instance or rely on an IaC (infrastructure as code) tool like Terraform.

Then we need to implement a few new things:

  • A struct that implements apalis::prelude::Job
  • A struct that can be added as a shared data extension to our cron service with a function that will do the actual work we want
  • A function that gets called by apalis when work is required to be done (according to the cronjob schedule)

Let’s start with implementing our job struct. Note that when using apalis jobs in a cronjob context, they must also implement From<chrono::DateTime<chrono::Utc>> (essentially this means they can’t hold any other fields):

use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use apalis::prelude::Job;

#[derive(Default, Debug, Clone, Serialize, Deserialize)]
struct Reminder(DateTime<Utc>);

impl From<DateTime<Utc>> for Reminder {
    fn from(t: DateTime<Utc>) -> Self {
        Reminder(t)
    }
}

// set up an identifier for apalis
impl Job for Reminder {
    const NAME: &'static str = "reminder::DailyReminder";
}
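
To see why the From implementation matters, consider what a scheduler knows when a tick fires: only the time. It therefore has to build each job from that timestamp alone. The std-only sketch below mimics the idea, with std::time::SystemTime standing in for chrono::DateTime<Utc> so that no external crates are needed. ReminderSketch and job_from_tick are hypothetical names used for illustration; they are not part of apalis, and apalis may do this differently internally.

```rust
use std::time::SystemTime;

// Stand-in for the `Reminder` job. `SystemTime` replaces
// `chrono::DateTime<Utc>` purely to keep the sketch dependency-free.
#[derive(Debug, Clone, PartialEq)]
struct ReminderSketch(SystemTime);

impl From<SystemTime> for ReminderSketch {
    fn from(t: SystemTime) -> Self {
        ReminderSketch(t)
    }
}

// Hypothetical helper: roughly what a scheduler has to do on every tick.
// It only knows *when* the tick fired, so the job type must be
// constructible from the timestamp alone.
fn job_from_tick<J: From<SystemTime>>(fired_at: SystemTime) -> J {
    J::from(fired_at)
}
```

Calling job_from_tick::<ReminderSketch>(SystemTime::now()) yields a job that wraps the tick time, which is the only information the job carries.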

Now we want to implement the function that will do the actual work when called by apalis. We can set this up so that it just says “Hello world from say_hello_world()!”:

use apalis::prelude::JobContext;

async fn say_hello_world(job: Reminder, ctx: JobContext) {
    println!("Hello world from `say_hello_world()`!");
}

However, if you want to add extra variables (for example, a database connection pool) you need to add an extension layer in your service to be able to access them. Let’s have a look at what this struct might look like:

#[derive(Clone)]
struct CronjobData { 
    message: String 
}

impl CronjobData {
    fn execute(&self, item: Reminder) {
        println!("{} from CronjobData::execute()!", &self.message);
    }
}

You can then retrieve this struct inside the say_hello_world function and call its execute() method:

async fn say_hello_world(job: Reminder, ctx: JobContext) {
    println!("Hello world from say_hello_world()!");
    // this lets you use variables stored in the CronjobData struct
    let svc = ctx.data_opt::<CronjobData>().unwrap();
    // this executes CronjobData::execute()
    svc.execute(job);
}

The values in the CronjobData struct come from instantiating the struct and then adding it as a shared data extension to the Tower service that we use. This lets the job function retrieve the data by calling .data_opt() on the JobContext. Because .data_opt() returns an Option, the unwrap() above will panic if the extension layer was never added.
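
If looking data up by type feels opaque, the general pattern can be sketched with a map keyed by TypeId. This is not apalis’s implementation; ContextSketch and CronjobDataSketch are hypothetical stand-ins, and the real JobContext may work differently. The sketch only shows why a lookup by type returns an Option.

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;

// Hypothetical, simplified stand-in for a job context: values are stored
// and looked up by their type.
#[derive(Default)]
struct ContextSketch {
    data: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
}

impl ContextSketch {
    // Store one value per type; inserting the same type again replaces it.
    fn insert<T: Any + Send + Sync>(&mut self, value: T) {
        self.data.insert(TypeId::of::<T>(), Box::new(value));
    }

    // Returns `None` when nothing of type `T` was registered, which is why
    // calling `unwrap()` on the result panics if the data was never added.
    fn data_opt<T: Any + Send + Sync>(&self) -> Option<&T> {
        self.data
            .get(&TypeId::of::<T>())
            .and_then(|boxed| (**boxed).downcast_ref::<T>())
    }
}

// Stand-in for the shared data struct from the article.
#[derive(Clone, Debug, PartialEq)]
struct CronjobDataSketch {
    message: String,
}
```

With this sketch, a lookup before insert() returns None, and a lookup afterwards returns a reference to the stored value.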

Hooking it all up

Now we’re going to hook the structs we made previously into our main app!

To start with, we need to set up the PostgresStorage type so that we can use Postgres for durable job queues. Without durable job queues, our jobs would disappear if our web service has any outages! We can convert this directly from the PgPool type stored in our main struct and run the migrations for it like so:

use apalis::postgres::PostgresStorage;

#[shuttle_runtime::async_trait]
impl shuttle_runtime::Service for MyService {
    async fn bind(
        self,
        _addr: std::net::SocketAddr,
    ) -> Result<(), shuttle_runtime::Error> {
        // set up Postgres-backed storage from our connection pool
        let storage = PostgresStorage::new(self.db);
        // run the apalis migrations
        storage.setup().await.expect("Unable to run migrations :(");

        Ok(())
    }
}

The shuttle_runtime::Service trait is what allows your custom service to be packaged into a project that the Shuttle runtime can use. It also provides an address that you can optionally bind an HTTP service to (for example, a web server). Our cron service doesn’t need one, which is why the parameter is named _addr.

Now we can create a tower service that holds the job we want to run. We can use the DefaultRetryPolicy provided by apalis to add automatic retries, and layer the shared data on top as an Extension:

use tower::ServiceBuilder;
use apalis::layers::{DefaultRetryPolicy, Extension, RetryLayer};
use apalis::prelude::job_fn;

// .. your previous code

let cron_service_ext = CronjobData {
    message: "Hello world".to_string(),
};

// create a servicebuilder for the cronjob
let service = ServiceBuilder::new()
    .layer(RetryLayer::new(DefaultRetryPolicy))
    .layer(Extension(cron_service_ext))
    .service(job_fn(say_hello_world)); 
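
To make the idea behind a retry layer concrete, here is a std-only sketch of retrying a fallible task a bounded number of times. It is not how tower’s RetryLayer or DefaultRetryPolicy is implemented, and the retry function and its max_attempts parameter are hypothetical; the sketch only illustrates the general technique.

```rust
// Hypothetical helper: run `task` until it succeeds or `max_attempts` is
// reached, returning the last error if every attempt fails. The task is
// always run at least once, and receives the 1-based attempt number.
fn retry<T, E>(
    max_attempts: u32,
    mut task: impl FnMut(u32) -> Result<T, E>,
) -> Result<T, E> {
    let mut attempt = 1;
    loop {
        match task(attempt) {
            Ok(value) => return Ok(value),
            // Out of attempts: give up and surface the last error.
            Err(err) if attempt >= max_attempts => return Err(err),
            // Otherwise try again.
            Err(_) => attempt += 1,
        }
    }
}
```

A real retry policy would usually also decide which errors are worth retrying and how long to wait between attempts; this sketch leaves both out.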

Now that we’ve built the service, we need to build the worker and then finally create an apalis::Monitor process that will carry the work out:

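use std::str::FromStr; // brings Schedule::from_str into scope

use apalis::cron::{CronStream, Schedule};
use apalis::prelude::timer::TokioTimer;
use apalis::prelude::{Monitor, WorkerBuilder};

// .. your previous code

// six fields, seconds first: this schedule fires every second
let schedule = Schedule::from_str("* * * * * *").expect("Couldn't start the scheduler!");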

// create a worker that uses the service created from the cronjob
let worker = WorkerBuilder::new("morning-cereal")
    .with_storage(storage.clone())
    .stream(
        CronStream::new(schedule)
            .timer(TokioTimer)
            .to_stream(),
    )
    .build(service);

// start your worker up
Monitor::new().register(worker).run().await.expect("Unable to start worker");
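
The schedule string deserves a closer look, because it uses six fields with seconds first rather than the five fields of a classic crontab. The std-only sketch below labels each field of such an expression. label_fields and FIELD_NAMES are hypothetical helpers written for this article, not part of apalis or the cron crate, and the sketch does no validation beyond counting fields (it also ignores the optional seventh year field).

```rust
// Field names in the order used by six-field cron expressions
// (seconds first). This is a labelling aid only, not a validator.
const FIELD_NAMES: [&str; 6] = [
    "second",
    "minute",
    "hour",
    "day of month",
    "month",
    "day of week",
];

// Hypothetical helper: pair each field of a six-field expression with its
// name, or return `None` if the expression does not have exactly six fields.
fn label_fields(expr: &str) -> Option<Vec<(&'static str, &str)>> {
    let fields: Vec<&str> = expr.split_whitespace().collect();
    if fields.len() != FIELD_NAMES.len() {
        return None;
    }
    Some(FIELD_NAMES.iter().copied().zip(fields).collect())
}
```

For example, label_fields("0 0 9 * * *") pairs "hour" with "9": an expression that fires once a day at 09:00:00, which is probably closer to what you want for a daily reminder than firing every second.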


Here is the full Service implementation for our main application, combining the snippets above:

#[shuttle_runtime::async_trait]
impl shuttle_runtime::Service for MyService {
    async fn bind(
        self, 
        _addr: std::net::SocketAddr
    ) -> Result<(), shuttle_runtime::Error> {
        let storage = PostgresStorage::new(self.db.clone());
        // set up storage
        storage.setup().await.expect("Unable to run migrations :(");
    
        let cron_service_ext = CronjobData { 
            message: "Hello world".to_string() 
        };
    
        // create a servicebuilder for the cronjob
        let service = ServiceBuilder::new()
            .layer(RetryLayer::new(DefaultRetryPolicy))
            .layer(Extension(cron_service_ext))
            .service(job_fn(say_hello_world));
 
        let schedule = Schedule::from_str("* * * * * *")
            .expect("Couldn't start the scheduler!");
    
        // create a worker that uses the service created from the cronjob
        let worker = WorkerBuilder::new("morning-cereal")
            .with_storage(storage.clone())
            .stream(CronStream::new(schedule).timer(TokioTimer).to_stream())
            .build(service);
    
        // start your worker up
        Monitor::new()
            .register(worker)
            .run()
            .await
            .expect("Unable to start worker");
    
        Ok(())
    }
}

Deploying

Now that we’ve written everything, all you need to do is run cargo shuttle deploy (adding the --allow-dirty flag if your Git working tree has uncommitted changes). When the deployment is finished, you’ll get the deployment information as well as the connection string for the deployed database.

Extending

Now that we’ve finished our new cron service, here are a few ideas you can use to extend it:

  • Add more cron jobs! You will probably want the same defaults across many, if not all, of your jobs, so you can also move the creation of the schedule, worker and so on into a new struct (or enum) instead of instantiating everything by hand each time.
  • Augment the service to use Axum (or another web framework!) instead of purely tower. You can find out more about this from this GitHub example.
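
As a starting point for the first idea, the settings for each job could live in a small struct with shared defaults. The sketch below is std-only and entirely hypothetical: JobSpec, its fields and its default values are illustrative names and numbers, not part of apalis or Shuttle. It assumes six-field cron expressions with seconds first, so "0 0 9 * * *" means 09:00:00 every day.

```rust
// Hypothetical description of one cron job; every name here is illustrative.
#[derive(Debug, Clone, PartialEq)]
struct JobSpec {
    worker_name: String,
    schedule: String,
    max_retries: u32,
}

impl JobSpec {
    // Shared defaults: run once a day at 09:00:00 and retry up to three times.
    fn new(worker_name: &str) -> Self {
        JobSpec {
            worker_name: worker_name.to_string(),
            schedule: "0 0 9 * * *".to_string(),
            max_retries: 3,
        }
    }

    // Override the schedule for jobs that should not use the default.
    fn with_schedule(mut self, schedule: &str) -> Self {
        self.schedule = schedule.to_string();
        self
    }
}
```

In the real service, a helper could take a JobSpec and build the schedule and worker from it, so that adding a job becomes a matter of adding one more JobSpec.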

Finishing Up

Thanks for reading! I hope this cron job template helps you.

Further reading:

  • Find out more about the Axum framework here.
  • Find out more about using a database with SQLx here.