Changelog for Oban Pro v0.12


Worker Hooks

Worker hooks are guaranteed to trigger after a job finishes executing. They're defined as callback functions on the worker, or in separate modules for reuse across workers.

Think of hooks as a convenient alternative to globally attached telemetry handlers, but with clearer scoping and much more safety. They are purely for side-effects such as cleanup, logging, broadcasting notifications, updating other records, error notifications, etc.

There are three mechanisms for defining and attaching hooks:

  1. Implicitly—hooks are defined directly on the worker and they only run for that worker
  2. Explicitly—hooks are listed when defining a worker and they run anywhere they are listed
  3. Globally—hooks are executed for all Pro workers (only Pro workers)

It's possible to combine each type of hook on a single worker. When multiple hooks are stacked, they're executed in this order: implicit, explicit, and then global.

Here's a simple example of defining an implicit hook directly on a worker to let us know when a job is discarded:

defmodule MyApp.HookWorker do
  use Oban.Pro.Worker

  alias Oban.Job

  @impl Oban.Pro.Worker
  def process(_job) do
    # ...
  end

  # Called after execution; this clause only matches discarded jobs.
  @impl Oban.Pro.Worker
  def after_process(:discard, %Job{} = job) do
    IO.puts "This job will never complete: #{job.id}"
  end

  # Ignore every other final state.
  def after_process(_state, _job), do: :ok
end
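
The explicit and global mechanisms can be sketched the same way. This is a minimal sketch rather than the canonical usage: it assumes the `hooks:` option on `use Oban.Pro.Worker` and the `Oban.Pro.Worker.attach_hook/1` function as described in the Worker Hooks moduledocs, so check both against your installed version. `MyApp.ErrorHook` and `MyApp.ExplicitHookWorker` are hypothetical names.

```elixir
# A reusable hook module (hypothetical name) that exports after_process/2.
defmodule MyApp.ErrorHook do
  require Logger

  # Only react to jobs that errored or were discarded.
  def after_process(state, %Oban.Job{} = job) when state in [:discard, :error] do
    Logger.error("job #{job.id} (#{job.worker}) finished as #{state}")
  end

  def after_process(_state, _job), do: :ok
end

defmodule MyApp.ExplicitHookWorker do
  # Explicit: the hook runs for this worker because it is listed here
  # (assumes the `hooks:` option from the Worker Hooks docs).
  use Oban.Pro.Worker, hooks: [MyApp.ErrorHook]

  @impl Oban.Pro.Worker
  def process(_job), do: :ok
end

# Global: attach once, for example during application start, so the hook
# runs for every Pro worker (assumes attach_hook/1 from the same docs).
Oban.Pro.Worker.attach_hook(MyApp.ErrorHook)
```

Keeping the hook in its own module lets the same side effect be listed explicitly on a few workers or attached globally without touching any worker code.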

Look at the Worker Hooks section for a complete usage guide and examples. Note that, as part of an ongoing documentation enhancement, we've replaced the old guide with proper moduledocs.


Global Partitioning

Rate limits aren't the only SmartEngine-powered limiters that can be partitioned! Now, with partitioned global limiting, you can enforce concurrency limits by the worker, args, and sub-keys within a queue, across all nodes (without clustering).

Here's an example of partitioning an exporters queue by the worker to minimize resource contention:

queues: [
  exporters: [
    local_limit: 6,
    global_limit: [allowed: 3, partition: [fields: [:worker]]]
  ]
]

In this example, we're partitioning a scrapers queue by the service_id value so that only one job per service runs at a time across all nodes:

queues: [
  scrapers: [
    local_limit: 10,
    global_limit: [allowed: 1, partition: [fields: [:args], keys: [:service_id]]]
  ]
]

To achieve this style of global limiting before, you'd need one queue for every partition, which is challenging to manage and puts additional strain on the database.
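
To make the partitioning semantics concrete, here is a conceptual model in plain Elixir. It is only a sketch of the idea (at most `allowed` jobs per partition key) and makes no claim about how the SmartEngine implements it. `PartitionSketch` and its functions are hypothetical, and the jobs are plain maps with atom-keyed args for brevity.

```elixir
defmodule PartitionSketch do
  @moduledoc "Conceptual model only; not the SmartEngine implementation."

  # Build a partition key from the configured fields and optional args keys.
  def partition_key(job, fields, keys \\ []) do
    Enum.map(fields, fn
      :worker -> job.worker
      :args when keys == [] -> job.args
      :args -> Map.take(job.args, keys)
    end)
  end

  # Select the jobs that may run: at most `allowed` per partition.
  def runnable(jobs, allowed, fields, keys \\ []) do
    jobs
    |> Enum.group_by(&partition_key(&1, fields, keys))
    |> Enum.flat_map(fn {_key, group} -> Enum.take(group, allowed) end)
  end
end

jobs = [
  %{id: 1, worker: "MyApp.Scraper", args: %{service_id: 1, url: "a"}},
  %{id: 2, worker: "MyApp.Scraper", args: %{service_id: 1, url: "b"}},
  %{id: 3, worker: "MyApp.Scraper", args: %{service_id: 2, url: "c"}}
]

# With allowed: 1 and a partition on service_id, jobs 1 and 3 may run
# while job 2 waits behind job 1.
jobs
|> PartitionSketch.runnable(1, [:args], [:service_id])
|> Enum.map(& &1.id)
|> Enum.sort()
```

A single queue handles any number of services this way, which is the case that previously needed one queue per partition.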

Learn more in the revamped Partitioning section of the SmartEngine guide.


Unique Jobs and Batching for Insert All

A long-standing "gotcha" of bulk inserting jobs with insert_all was the lack of unique support. To apply uniqueness, you had to insert jobs individually within a transaction (which is as slow as it sounds and led to frequent timeouts). Now, unique support is baked into the SmartEngine's insert_all implementation and works without any configuration:

- Repo.transaction(fn ->
-   Enum.each(lots_of_unique_jobs, &Oban.insert/1)
- end)

+ Oban.insert_all(lots_of_unique_jobs)

There's also automatic batching for insert_all. No more manually chunking massive job inserts to avoid PostgreSQL's parameter limits; the engine handles it all for you and enforces uniqueness the whole time:

- Repo.transaction(fn ->
-   lots_of_jobs
-   |> Enum.chunk_every(500)
-   |> Enum.each(&Oban.insert_all/1)
- end)

+ Oban.insert_all(lots_of_jobs, batch_size: 500)

Automatic batching, combined with insert placeholders, optimized locks, and streamlined replace, means bulk inserts with the SmartEngine are faster and more efficient.
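
For a sense of why chunking was ever necessary: PostgreSQL accepts at most 65,535 bind parameters per statement, so the number of rows that fit in one INSERT depends on how many values each row binds. The arithmetic below is a rough sketch, not the engine's batching code. `BatchSketch` is a hypothetical module, and the column count of 10 is an illustrative assumption rather than the real width of a job row.

```elixir
defmodule BatchSketch do
  # PostgreSQL's per-statement bind parameter limit.
  @max_params 65_535

  # Largest number of rows that fit in one INSERT for a given column count.
  def max_rows(columns_per_row) when columns_per_row > 0 do
    div(@max_params, columns_per_row)
  end

  # Split rows into batches no larger than the requested size or the limit.
  def batches(rows, columns_per_row, batch_size) do
    Enum.chunk_every(rows, min(batch_size, max_rows(columns_per_row)))
  end
end

# 1,200 rows with 10 bound values each and a batch size of 500
# split into batches of 500, 500, and 200 rows.
1..1_200
|> Enum.to_list()
|> BatchSketch.batches(10, 500)
|> Enum.map(&length/1)
```

Placeholders reduce the number of values bound per row, which is one reason they help with large inserts.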

Explore a little more (and we do mean little, there isn't much more to it) in the SmartEngine's Unique Bulk Insert section.


v0.12.0 — 2022-07-22

Enhancements

  • [DynamicPruner] Add a before_delete option to operate on jobs before they're deleted, e.g. for logging, reporting, cold-storage, etc.

  • [Batch] Accept batch_callback_queue option to override where callback jobs are enqueued.


Bug Fixes

  • [Workflow] Safely handle deleted upstream dependencies. Workflow jobs with a missing dependency no longer crash, and their status can be controlled with the new ignore_deleted workflow option.

  • [Workflow] Verify that jobs implement the workflow behaviour as they are added to a workflow.

  • [DynamicQueues] Default the local_limit to global_limit to match the behaviour of standard queues.

  • [SmartEngine] Restore accepting refresh_interval to control down-time record refreshing. As a virtual field, it wasn't included in the list of allowed keys.

  • [SmartEngine] Use the new shutdown callback for safer pausing during queue shutdown. During tests, or other situations where a database connection was no longer available, pausing could raise an unexpected error.

  • [SmartEngine] Prevent running jobs that exceed the rate limit when there are large gaps between insertion and execution.

For changes prior to v0.12 see the v0.11 docs.