Oban.Pro.Plugins.DynamicCron (Oban Pro v0.13.1)

The DynamicCron plugin enhances Oban's cron scheduler by making it configurable at runtime, globally, across your entire cluster. DynamicCron supports adding, updating, deleting, and pausing cron entries at boot time or at runtime. It suits applications that must start and manage scheduled tasks dynamically.

Installation

Before running the DynamicCron plugin you must run a migration to add the oban_cron table to your database.

mix ecto.gen.migration add_oban_cron

Open the generated migration in your editor and delegate the migration's change/0 function to Oban.Pro.Migrations.DynamicCron:

defmodule MyApp.Repo.Migrations.AddObanCron do
  use Ecto.Migration

  defdelegate change, to: Oban.Pro.Migrations.DynamicCron
end

As with the base Oban tables you can optionally provide a prefix to "namespace" the table within your database. Here we specify a "private" prefix:

defmodule MyApp.Repo.Migrations.AddObanCron do
  use Ecto.Migration

  def change, do: Oban.Pro.Migrations.DynamicCron.change(prefix: "private")
end

Run the migration to create the table:

mix ecto.migrate

Now you can use the DynamicCron plugin and start scheduling periodic jobs!

Using and Configuring

To begin using DynamicCron, add the module to your list of Oban plugins in config.exs:

config :my_app, Oban,
  plugins: [Oban.Pro.Plugins.DynamicCron]
  ...

By itself, without a crontab or dynamically inserted cron entries, the plugin doesn't have anything to schedule. To get scheduling started, provide a list of {cron, worker} or {cron, worker, options} tuples to the plugin. The syntax matches Oban's built-in :crontab option, which means you can copy an existing standard :crontab list into the plugin's :crontab.

plugins: [{
  Oban.Pro.Plugins.DynamicCron,
  timezone: "America/Chicago",
  crontab: [
    {"* * * * *", MyApp.MinuteJob},
    {"0 * * * *", MyApp.HourlyJob, queue: :scheduled},
    {"0 0 * * *", MyApp.DailyJob, max_attempts: 1},
    {"0 12 * * MON", MyApp.MondayWorker, tags: ["scheduled"]},
    {"@daily", MyApp.AnotherDailyWorker}
  ]
}]

Now, when DynamicCron initializes, it will persist those cron entries to the database and start scheduling them according to their cron expressions. The plugin's crontab format is nearly identical to Oban's standard crontab, with a few important enhancements we'll look at soon.

Each crontab entry is persisted to the database and referenced globally by all other connected Oban instances. That allows us to insert, update, or delete cron entries at any time. In fact, changing the schedule or options of an entry in the crontab provided to the plugin will automatically update the persisted entry. To demonstrate, let's modify the MinuteJob we specified so that it runs every other minute in the :scheduled queue:

crontab: [
  {"*/2 * * * *", MyApp.MinuteJob, queue: :scheduled},
  ...
]

Now it isn't really a "minute job" any more, and the name is no longer suitable. However, we didn't provide a name for the entry, so it uses the module name instead. For more flexibility we can add a :name override, which lets us rename the worker as well:

crontab: [
  {"*/2 * * * *", MyApp.FrequentJob, name: "frequent", queue: :scheduled},
  ...
]

All entries are referenced by name, which defaults to the worker's name and must be unique. You may define the same worker multiple times as long as you provide a name override:

crontab: [
  {"*/3 * * * *", MyApp.BasicJob, name: "client-1", args: %{client_id: 1}},
  {"*/3 * * * *", MyApp.BasicJob, name: "client-2", args: %{client_id: 2}},
  ...
]
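When the set of clients is known in code, entries like these can be generated rather than written out by hand. The following is a minimal sketch and not part of Oban Pro: the MyApp.ClientCrontab module and its entries/2 function are hypothetical names, and the client IDs are only illustrative.

```elixir
defmodule MyApp.ClientCrontab do
  # Hypothetical helper (not an Oban Pro API): builds one uniquely named
  # crontab entry per client ID, all pointing at the same worker.
  def entries(client_ids, expression \\ "*/3 * * * *") do
    for id <- client_ids do
      # The :name override keeps each entry unique even though the worker repeats.
      {expression, MyApp.BasicJob, name: "client-#{id}", args: %{client_id: id}}
    end
  end
end
```

Calling MyApp.ClientCrontab.entries([1, 2]) produces the same two tuples shown above, which could then be passed to insert/1 at runtime.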

To temporarily disable scheduling jobs you can set the paused flag:

crontab: [
  {"* * * * *", MyApp.BasicJob, paused: true},
  ...
]

To resume the job you must supply paused: false (or use update/2 to resume it manually). Simply removing the paused option will have no effect.

crontab: [
  {"* * * * *", MyApp.BasicJob, paused: false},
  ...
]
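Pausing and resuming can also be done at runtime without touching the crontab. The sketch below wraps update/2 in two small functions; MyApp.CronControl is a hypothetical module name, and the code assumes Oban Pro is installed and the plugin is running under the default Oban instance.

```elixir
defmodule MyApp.CronControl do
  # Hypothetical convenience wrapper; only DynamicCron.update/2 comes from Oban Pro.
  alias Oban.Pro.Plugins.DynamicCron

  # Pause the entry identified by a worker module or a custom name.
  def pause(name), do: DynamicCron.update(name, paused: true)

  # Resume a previously paused entry by explicitly setting paused: false.
  def resume(name), do: DynamicCron.update(name, paused: false)
end
```

For example, MyApp.CronControl.pause(MyApp.BasicJob) would pause the entry above, and MyApp.CronControl.resume(MyApp.BasicJob) would resume it.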

It is also possible to delete a persisted entry during initialization by passing the :delete option:

crontab: [
  {"* * * * *", MyApp.MinuteJob, delete: true},
  ...
]

One or more entries can be deleted this way. Deleting entries is idempotent: nothing will happen if no matching entry can be found.

The Runtime Updates section below covers how to list, insert, update, and delete entries dynamically at runtime.

Overriding the Timezone

Without any configuration the default timezone is Etc/UTC. You can override that for all cron entries by passing a timezone option to the plugin:

plugins: [{
  Oban.Pro.Plugins.DynamicCron,
  timezone: "America/Chicago",
  # ...
}]

You can also override the timezone for individual entries by passing a :timezone option with the entry, either in the crontab list or via DynamicCron.insert/1:

DynamicCron.insert([
  {"0 0 * * *", MyApp.Pinger, name: "oslo", timezone: "Europe/Oslo"},
  {"0 0 * * *", MyApp.Pinger, name: "chicago", timezone: "America/Chicago"},
  {"0 0 * * *", MyApp.Pinger, name: "zagreb", timezone: "Europe/Zagreb"}
])

Runtime Updates

Dynamic cron entries are persisted to the database, making it easy to manipulate them through typical CRUD operations. The DynamicCron plugin provides convenience functions to simplify working with those operations.

The insert/1 function takes a list of one or more tuples with the same {expression, worker} or {expression, worker, options} format as the plugin's crontab option:

DynamicCron.insert([
  {"0 0 * * *", MyApp.GenericWorker},
  {"* * * * *", MyApp.ClientWorker, name: "client-1", args: %{client_id: 1}},
  {"* * * * *", MyApp.ClientWorker, name: "client-2", args: %{client_id: 2}},
  {"* * * * *", MyApp.ClientWorker, name: "client-3", args: %{client_id: 3}}
])

Be aware that insert/1 acts like an "upsert", making it possible to modify existing entries if the worker or name matches. Still, it is better to use update/2 to make targeted updates.
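As an illustration of runtime management, the following sketch schedules and unschedules a per-client entry in response to application events. MyApp.ClientSchedules and its function names are hypothetical; only insert/1 and delete/1 come from the plugin, and the code assumes Oban Pro is installed and running under the default Oban instance.

```elixir
defmodule MyApp.ClientSchedules do
  # Hypothetical wrapper around DynamicCron.insert/1 and DynamicCron.delete/1.
  alias Oban.Pro.Plugins.DynamicCron

  # Insert (or, because insert/1 behaves like an upsert, overwrite) the
  # entry for a single client.
  def schedule(client_id, expression \\ "* * * * *") do
    DynamicCron.insert([
      {expression, MyApp.ClientWorker, name: entry_name(client_id), args: %{client_id: client_id}}
    ])
  end

  # Remove the client's entry by its custom name.
  def unschedule(client_id), do: DynamicCron.delete(entry_name(client_id))

  # Derive a unique entry name from the client ID.
  def entry_name(client_id), do: "client-#{client_id}"
end
```

Deriving the entry name in one place keeps the name passed to insert/1 and the name passed to delete/1 in sync.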

Isolation and Namespacing

All DynamicCron functions have an alternate clause that accepts an Oban instance name as the first argument. This is in line with base Oban functions such as Oban.insert/2, which allow you to seamlessly work with multiple Oban instances and across multiple database prefixes. For example, you can use all/1 to list all cron entries for the instance named ObanPrivate:

entries = DynamicCron.all(ObanPrivate)

Likewise, to insert a new entry using the configuration associated with the ObanPrivate instance:

{:ok, _} = DynamicCron.insert(ObanPrivate, [{"* * * * *", PrivateWorker}])
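For context, an instance such as ObanPrivate might be configured roughly as follows. This is a sketch under assumptions rather than configuration taken from this page: MyApp.Repo and the :my_app application name are placeholders, and the "private" prefix presumes the oban_cron migration was run with the same prefix, as shown in the Installation section.

```elixir
# config/config.exs
import Config

# A second Oban instance, registered under the name ObanPrivate and
# isolated in the "private" database prefix.
config :my_app, ObanPrivate,
  name: ObanPrivate,
  repo: MyApp.Repo,
  prefix: "private",
  plugins: [Oban.Pro.Plugins.DynamicCron]

# In the application's supervision tree, the instance would then be started
# with a child spec along the lines of:
#
#   {Oban, Application.fetch_env!(:my_app, ObanPrivate)}
```

With an instance started this way, passing ObanPrivate as the first argument directs DynamicCron functions to that instance's configuration and prefix.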

Instrumenting with Telemetry

The DynamicCron plugin adds the following metadata to the [:oban, :plugin, :stop] event:

  • :jobs - a list of jobs that were inserted into the database

See the docs on Plugin Events for details.
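A handler for this event could look like the sketch below. MyApp.CronTelemetry and the handler ID are hypothetical names, and the code assumes the :telemetry library is available (it is a dependency of Oban). The handler matches only on the :jobs key, so it does not distinguish DynamicCron from any other plugin that reports a :jobs list in the same event.

```elixir
defmodule MyApp.CronTelemetry do
  # Hypothetical handler module; not part of Oban or Oban Pro.
  require Logger

  # Attach once at application start, e.g. from Application.start/2.
  def attach do
    :telemetry.attach(
      "my-app-dynamic-cron",
      [:oban, :plugin, :stop],
      &__MODULE__.handle_event/4,
      nil
    )
  end

  # Log how many jobs the plugin reported inserting in this run.
  def handle_event([:oban, :plugin, :stop], _measurements, %{jobs: jobs}, _config)
      when is_list(jobs) do
    Logger.info("plugin run inserted #{length(jobs)} job(s)")
  end

  # Ignore plugin events that carry no :jobs list.
  def handle_event(_event, _measurements, _metadata, _config), do: :ok
end
```

Using a captured module function rather than an anonymous function follows the :telemetry recommendation for handlers.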

Summary

Functions

all(oban_name \\ Oban)
Used to retrieve all persisted cron entries.

child_spec(init_arg)
Returns a specification to start this module under a supervisor.

delete(oban_name \\ Oban, name)
Delete individual entries, by worker or name.

insert(oban_name \\ Oban, crontab)
Insert cron entries into the database to start scheduling new jobs.

update(oban_name \\ Oban, name, opts)
Update a single cron entry, as identified by worker or name.

Types

@type cron_expr() :: String.t()
@type cron_input() :: {cron_expr(), module()} | {cron_expr(), module(), [cron_opt()]}
@type cron_name() :: String.t() | atom()
@type cron_opt() ::
  {:args, Oban.Job.args()}
  | {:expression, cron_expr()}
  | {:max_attempts, pos_integer()}
  | {:paused, boolean()}
  | {:priority, 0..3}
  | {:name, cron_name()}
  | {:queue, atom() | String.t()}
  | {:tags, Oban.Job.tags()}
  | {:timezone, String.t()}

Functions

all(oban_name \\ Oban)

@spec all(term()) :: [Ecto.Schema.t()]

Used to retrieve all persisted cron entries.

The all/0 function is provided as a convenience to inspect persisted entries.

Examples

Return a list of cron schemas with raw attributes:

entries = DynamicCron.all()

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

delete(oban_name \\ Oban, name)

@spec delete(term(), cron_name()) ::
  {:ok, Ecto.Schema.t()} | {:error, Ecto.Changeset.t()}

Delete individual entries, by worker or name.

Use delete/1 to remove entries at runtime, rather than hard-coding the :delete flag into the crontab list at compile time.

Examples

With the worker as the entry name:

{:ok, _} = DynamicCron.delete(Worker)

With a custom name:

{:ok, _} = DynamicCron.delete("cron-1")

insert(oban_name \\ Oban, crontab)

@spec insert(term(), [cron_input()]) ::
  {:ok, [Ecto.Schema.t()]} | {:error, Ecto.Changeset.t()}

Insert cron entries into the database to start scheduling new jobs.

Be aware that insert/1 acts like an "upsert", making it possible to modify existing entries if the worker or name matches. Still, it is better to use update/2 to make targeted updates.

Examples

Insert a list of tuples with the same {expression, worker} or {expression, worker, options} format as the plugin's crontab option.

DynamicCron.insert([
  {"0 0 * * *", MyApp.GenericWorker},
  {"* * * * *", MyApp.ClientWorker, name: "client-1", args: %{client_id: 1}},
  {"* * * * *", MyApp.ClientWorker, name: "client-2", args: %{client_id: 2}},
  {"* * * * *", MyApp.ClientWorker, name: "client-3", args: %{client_id: 3}}
])

update(oban_name \\ Oban, name, opts)

@spec update(term(), cron_name(), [cron_opt()]) ::
  {:ok, Ecto.Schema.t()} | {:error, Ecto.Changeset.t()}

Update a single cron entry, as identified by worker or name.

Any option available when specifying an entry in the crontab list or when calling insert/2 can be updated, including the cron expression and the worker.

Examples

The following call demonstrates updating most of the available options at once:

{:ok, _} =
  DynamicCron.update(
    "cron-1",
    expression: "1 * * * *",
    max_attempts: 10,
    name: "special-cron",
    paused: false,
    priority: 0,
    queue: "dedicated",
    tags: ["client", "scheduled"],
    timezone: "Europe/Amsterdam",
    worker: Other.Worker
  )

Naturally, individual options may be updated instead. For example, set paused: true to pause an entry:

{:ok, _} = DynamicCron.update(MyApp.ClientWorker, paused: true)

Since update/2 operates on a single entry at a time, it is possible to rename an entry without doing a delete/insert dance:

{:ok, _} = DynamicCron.update(MyApp.ClientWorker, name: "client-worker")

Or, update an entry with a custom entry name already set:

{:ok, _} = DynamicCron.update("cron-1", name: "special-cron")