Oban.Pro.Worker behaviour (Oban Pro v0.13.2)

The Oban.Pro.Worker is a replacement for Oban.Worker with expanded capabilities such as encryption, enforced structure, output recording, and execution hooks.

In addition, because Batch, Chunk, and Workflow workers are based on the Pro worker, you can use all of the advanced options there as well. The one exception is that recording doesn't function with the Chunk worker.

Usage

Using Oban.Pro.Worker is identical to using Oban.Worker, with a few additional options. All of the basic options such as queue, priority, and unique are still available along with more advanced options.

To create a basic Pro worker, use Oban.Pro.Worker and define a process/1 callback:

defmodule MyApp.Worker do
  use Oban.Pro.Worker

  @impl Oban.Pro.Worker
  def process(%Job{} = job) do
    # Do stuff with the job
  end
end

If you have existing workers that you'd like to convert, you only need to change the use definition and replace perform/1 with process/1.

Without any of the advanced Pro features there isn't any difference between the basic and pro workers.
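As a sketch of the conversion described above, the following shows a worker after switching from Oban.Worker, with the previous lines kept as comments. The MyApp.MailWorker module, the :mailers queue, and MyApp.Mailer.deliver/1 are hypothetical names used only for illustration:

```elixir
defmodule MyApp.MailWorker do
  # Before: use Oban.Worker, queue: :mailers
  use Oban.Pro.Worker, queue: :mailers

  # Before: @impl Oban.Worker and def perform(%Oban.Job{args: args})
  @impl Oban.Pro.Worker
  def process(%Oban.Job{args: args}) do
    # The body is untouched by the conversion. MyApp.Mailer.deliver/1 is a
    # hypothetical function standing in for the worker's existing logic.
    MyApp.Mailer.deliver(args)

    :ok
  end
end
```

Standard options such as queue carry over unchanged, since only the use line and the callback name differ.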

Structured Jobs

Structured workers help you catch invalid data within your jobs by validating args on insert and casting args before execution. They also automatically generate a struct for compile-time checks and friendly dot access.

Defining a Structured Worker

A structured worker's options determine which keys are allowed, required, and their expected types. Another notable benefit is that the args passed to process/1 are converted into a struct. Here's an example worker that demonstrates all of the possible options:

defmodule MyApp.StructuredWorker do
  use Oban.Pro.Worker, structured: [
    id: {:*, :id},
    name: {:*, :string},
    mode: ~w(enabled disabled paused)a,
    data: [office_id: {:*, Ecto.UUID}, notes: :boolean]
  ]

  @impl Oban.Pro.Worker
  def process(%Job{args: %__MODULE__{id: id, name: name, mode: mode, data: data}}) do
    %{office_id: office_id, notes: notes} = data

    # Use the matched, cast values
  end
end

The example's structure defines four top level keys, :id, :name, :mode, and :data. Of those, :id and :name are marked required with a {:*, type} tuple. The :mode is treated as an Enum, where it is validated as a member of the list and cast to an atom. Finally, the :data field declares a nested map with its own type coercion and validation, including a custom Ecto.UUID type.

Job args are validated on new/1 and errors bubble up to prevent insertion:

StructuredWorker.new(%{id: "not-an-id", mode: "unknown"}).valid?
# => false (invalid id, invalid mode, missing name)

StructuredWorker.new(%{id: "123", mode: "enabled"}).valid?
# => false (missing name)

StructuredWorker.new(%{id: "123", name: "NewBiz", mode: "enabled"}).valid?
# => true

This shows how args, stored as JSON, are cast before passing to process/1:

# {"id":123,"name":"NewBiz","mode":"enabled","data":{"parent_id":456}}

%MyApp.StructuredWorker{
  id: 123,
  name: "NewBiz",
  mode: :enabled,
  data: %{parent_id: 456}
}

Structured Types and Casting

Type casting and validation are handled by changesets. Any types supported in Ecto schemas are allowed, e.g. :id, :integer, :string, :float, and Ecto.UUID.

See the Ecto documentation for a complete list of Ecto types and their Elixir counterparts.
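To illustrate what changeset-based casting does, here is a minimal sketch using a schemaless Ecto changeset. It is not Oban Pro's internal implementation. The CastSketch module, its @types map, and the params are assumptions made for the example, and running it requires the ecto package:

```elixir
defmodule CastSketch do
  import Ecto.Changeset

  # Hypothetical field types, loosely mirroring
  # `structured: [id: {:*, :id}, name: {:*, :string}]`
  @types %{id: :id, name: :string}

  # Casts string-keyed params (as decoded from JSON) and requires both fields.
  def cast_args(params) do
    {%{}, @types}
    |> cast(params, Map.keys(@types))
    |> validate_required([:id, :name])
    |> apply_action(:insert)
  end
end

CastSketch.cast_args(%{"id" => "123", "name" => "NewBiz"})
# returns {:ok, %{id: 123, name: "NewBiz"}}

CastSketch.cast_args(%{"id" => "123"})
# returns {:error, changeset} because :name is missing
```

The string "123" is cast to the integer 123 by the :id type, which is the same kind of coercion the structured option applies to stored args.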

Structured Extensions

Structured workers support some custom extensions beyond Ecto's standard type casting.

  • required: specify the value with a {:*, type} tuple to indicate that the field is required.

  • enum: a list of atoms, e.g. ~w(foo bar baz)a, declares that the field is an enum, which both validates that values are included in the list and casts them to an atom.

  • nested: a keyword list, e.g. [sub_id: :id, total: :float], declares a nested map and supports all the same casting and validation as top level options.

Defining Typespecs for Structured Workers

Typespecs aren't generated automatically. If desired, you must define a structured worker's type manually:

defmodule MyApp.StructuredWorker do
  use Oban.Pro.Worker, structured: [id: :integer, active: :boolean]

  @type t :: %__MODULE__{id: integer(), active: boolean()}

  ...

Recorded Jobs

Sometimes the output of a job is just as important as any side effects. When that's the case, you can use the recorded option to stash a job's output back into the job itself. Results are compressed and safely encoded for retrieval later, either manually, in a batch callback, or in a downstream workflow job.

Defining a Recorded Worker

defmodule MyApp.RecordedWorker do
  use Oban.Pro.Worker, recorded: true

  @impl true
  def process(%Job{args: args}) do
    # Do your typical work here.
  end
end

If your process function returns an {:ok, value} tuple, it is recorded. Any other value, i.e. a plain :ok, error, or snooze, is ignored.

The example above uses recorded: true to opt into recording with the defaults. That means an output limit of 32kb after compression and encoding—anything larger than the configured limit will return an error tuple. If you expect larger results (and you want them stored in the database) you can override the limit. For example, to set the limit to 64kb instead:

use Oban.Pro.Worker, recorded: [limit: 64_000]
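Putting those pieces together, a recorded worker that actually returns a value might look like the following sketch. The MyApp.TotalsWorker module and the "amounts" arg are hypothetical:

```elixir
defmodule MyApp.TotalsWorker do
  # Opt into recording with the raised limit from the example above
  use Oban.Pro.Worker, recorded: [limit: 64_000]

  @impl Oban.Pro.Worker
  def process(%Oban.Job{args: %{"amounts" => amounts}}) do
    # Returning {:ok, value} is what causes the value to be recorded.
    # A plain :ok here would leave nothing to fetch later.
    {:ok, Enum.sum(amounts)}
  end
end
```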

Retrieving Results

The fetch_recorded/1 function is your ticket to extracting recorded results. If a job has run and recorded a value, it will return an {:ok, result} tuple:

job = MyApp.Repo.get(Oban.Job, job_id)

case MyApp.RecordedWorker.fetch_recorded(job) do
  {:ok, result} ->
    # Use the result

  {:error, :missing} ->
    # Nothing recorded yet
end

Encrypted Jobs

Some applications have strong regulations around the storage of personal information. For example, medical records, financial details, social security numbers, or other data that should never leak. The encrypted option lets you store all job data at rest with encryption so sensitive data can't be seen.

Defining an Encrypted Worker

Encryption is handled transparently as jobs are inserted and executed. All you need to do is flag the worker as encrypted and configure it to fetch a secret key:

defmodule MyApp.SensitiveWorker do
  use Oban.Pro.Worker, encrypted: [key: {module, fun, args}]

  @impl true
  def process(%Job{args: args}) do
    # Args are decrypted, use them as you normally would
  end
end

Now job args are encrypted before insertion into the database and decrypted when the job runs.

Generating Encryption Keys

Encryption requires a 32 byte, Base64 encoded key. You can generate one with the :crypto and Base modules:

key = 32 |> :crypto.strong_rand_bytes() |> Base.encode64()

The result will look something like this "w7xGJClzEh1pbWuq6zsZfKfwdINu2VIkgCe3IO0hpsA=".
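As a quick sanity check, a generated key can be decoded again to confirm it carries 32 raw bytes. This sketch uses only the standard :crypto and Base modules:

```elixir
key = 32 |> :crypto.strong_rand_bytes() |> Base.encode64()

# Decoding recovers the original 32 raw bytes
{:ok, raw} = Base.decode64(key)
32 = byte_size(raw)

# 32 bytes always encode to 44 Base64 characters, ending in one "=" of padding
44 = byte_size(key)
```

A match error on either assertion line would indicate a key that was truncated or encoded differently.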

While it's possible to use the generated key in your worker directly, that defeats the purpose of encrypting sensitive data because anybody with access to the codebase can read the encryption key. That's why it is highly recommended that you use an MFA to retrieve the key dynamically at runtime. For example, here's how you could pull the key from the application environment with Application.fetch_env!/2, where :my_app stands in for your OTP app name:

use Oban.Pro.Worker, encrypted: [key: {Application, :fetch_env!, [:my_app, :enc_key]}]
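One way to get the key into the application environment without committing it is to read it from an OS environment variable at boot. This is a minimal sketch, assuming an OTP app named :my_app and a variable named OBAN_ENC_KEY (both hypothetical), in a project that evaluates config/runtime.exs:

```elixir
# config/runtime.exs
import Config

# Read the Base64 encoded key from the environment at boot rather than
# committing it to the codebase. OBAN_ENC_KEY is a hypothetical variable name.
config :my_app, enc_key: System.fetch_env!("OBAN_ENC_KEY")
```

With that in place, Application.fetch_env!(:my_app, :enc_key) returns the key at runtime, and booting fails fast if the variable is missing.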

Encryption Implementation Details

  • Erlang's crypto module is used with the aes_256_ctr cipher for encryption.

  • Encoding and decoding stacktraces are pruned to prevent leaking the private key or initialization vector.

  • Only args are encrypted; meta is kept as plaintext. You can use that to your advantage for uniqueness, but be careful not to put anything sensitive in meta.

  • Error messages and stacktraces aren't encrypted and are stored as plaintext. Be careful not to expose sensitive data when raising errors.

  • Args are encrypted at rest as well as in Oban Web. You won't be able to view or search encrypted args in the Web dashboard.

  • Uniqueness works for encrypted jobs, but not for arguments because the same args are encrypted differently every time. Favor meta over args to enforce uniqueness for encrypted jobs.
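The last point can be sketched as follows, assuming the standard Oban meta and unique job options are passed through new/2. The args, the account_id key, and the 60 second period are hypothetical values:

```elixir
%{ssn: "000-00-0000"}
|> MyApp.SensitiveWorker.new(
  # meta stays plaintext, so only put non-sensitive identifiers here
  meta: %{account_id: 123},
  # Compare on worker and meta rather than on the encrypted args
  unique: [fields: [:worker, :meta], keys: [:account_id], period: 60]
)
|> Oban.insert()
```

Because the comparison never touches args, it is unaffected by the same args encrypting differently on each insert.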

Worker Hooks

Worker hooks are called after a job finishes executing. They can be defined as callback functions on the worker, or in a separate module for reuse across workers.

Hooks are called synchronously, from within the job's process, with safety applied. Any exceptions or crashes are caught and logged; they won't cause the job to fail or the queue to crash.

Hooks do not modify the job or execution results. Think of them as a convenient alternative to globally attached telemetry handlers. They are purely for side-effects such as cleanup, logging, recording metrics, broadcasting notifications, updating other records, error notifications, etc.
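The behavior described above can be pictured with a small self-contained sketch. This is not Oban Pro's implementation; SafeHookSketch, run_hooks/3, and CrashingHook are hypothetical names that only illustrate calling hooks with failures caught and return values discarded:

```elixir
defmodule SafeHookSketch do
  require Logger

  # Runs each hook module's after_process/2, logging failures instead of raising.
  # Hook return values are discarded, so a hook can't alter the job's result.
  def run_hooks(hooks, state, job) do
    Enum.each(hooks, fn hook ->
      try do
        hook.after_process(state, job)
      rescue
        exception ->
          Logger.error("hook #{inspect(hook)} raised: " <> Exception.message(exception))
      catch
        kind, value ->
          Logger.error("hook #{inspect(hook)} failed: #{inspect({kind, value})}")
      end
    end)
  end
end

defmodule CrashingHook do
  def after_process(_state, _job), do: raise("boom")
end

SafeHookSketch.run_hooks([CrashingHook], :complete, %{id: 1})
# returns :ok, and the error is logged rather than propagated
```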

Defining Hooks

There are three mechanisms for defining and attaching an after_process/2 hook:

  1. Implicitly—hooks are defined directly on the worker and they only run for that worker
  2. Explicitly—hooks are listed when defining a worker and they run anywhere they are listed
  3. Globally—hooks are executed for all Pro workers

It's possible to combine each type of hook on a single worker. When multiple hooks are stacked they're executed in the order: implicit, explicit, and then global.

An after_process/2 hook is called with the job and an execution state corresponding to the result from process/1:

  • complete—when process/1 returns :ok or {:ok, result}
  • cancel—when process/1 returns {:cancel, reason}
  • discard—when a job errors and exhausts retries, or returns {:discard, reason}
  • error—when a job crashes, raises an exception, or returns {:error, value}
  • snooze—when a job returns {:snooze, seconds}
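The mapping in the list above can be summarized as a small function. HookStateSketch is a hypothetical module written for illustration, not part of Oban Pro, and it treats "exhausts retries" as the attempt count reaching max_attempts, which is an assumption. Crashes and raised exceptions are not modeled:

```elixir
defmodule HookStateSketch do
  # Maps a process/1 return value, plus attempt counts, to a hook state.
  def state_for(:ok, _attempt, _max), do: :complete
  def state_for({:ok, _value}, _attempt, _max), do: :complete
  def state_for({:cancel, _reason}, _attempt, _max), do: :cancel
  def state_for({:discard, _reason}, _attempt, _max), do: :discard
  def state_for({:snooze, _seconds}, _attempt, _max), do: :snooze

  # An error on the final attempt exhausts retries and counts as a discard
  def state_for({:error, _value}, attempt, max) when attempt >= max, do: :discard
  def state_for({:error, _value}, _attempt, _max), do: :error
end

HookStateSketch.state_for({:ok, 42}, 1, 3)
# returns :complete

HookStateSketch.state_for({:error, :timeout}, 1, 3)
# returns :error

HookStateSketch.state_for({:error, :timeout}, 3, 3)
# returns :discard
```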

First, here's how to define a single implicit local hook on the worker using after_process/2:

defmodule MyApp.HookWorker do
  use Oban.Pro.Worker

  @impl Oban.Pro.Worker
  def process(_job) do
    # ...
  end

  @impl Oban.Pro.Worker
  def after_process(state, %Job{} = job) do
    MyApp.Notifier.broadcast("oban-jobs", {state, %{id: job.id}})
  end
end

Any module that exports after_process/2 can be used as a hook. For example, here we'll define a shared error notification hook:

defmodule MyApp.ErrorHook do
  def after_process(state, job) when state in [:discard, :error] do
    error = job.unsaved_error
    extra = Map.take(job, [:attempt, :id, :args, :max_attempts, :meta, :queue, :worker])

    Sentry.capture_exception(error.reason, stacktrace: error.stacktrace, extra: extra)
  end

  def after_process(_state, _job), do: :ok
end

defmodule MyApp.HookWorker do
  use Oban.Pro.Worker, hooks: [MyApp.ErrorHook]

  @impl Oban.Pro.Worker
  def process(_job) do
    # ...
  end
end

The same module can be attached globally, for all Oban.Pro.Worker modules, using attach_hook/1:

:ok = Oban.Pro.Worker.attach_hook(MyApp.ErrorHook)

Attaching hooks in your application's start/2 function is an easy way to ensure hooks are registered before your application starts processing jobs.

def start(_type, _args) do
  :ok = Oban.Pro.Worker.attach_hook(MyApp.ErrorHook)

  children = [
    ...

Summary

Types

  • encrypted(): Options to enable and configure encrypted mode.
  • hook_state(): All possible hook states.
  • recorded(): Options to enable and configure recorded mode.
  • structured(): Options to enable and configure structured mode.

Callbacks

  • after_process(hook_state, job): Called after a job finishes processing regardless of status (complete, failure, etc).
  • fetch_recorded(job): Extract the results of a previously executed job.
  • process(job): Called when executing a job.

Functions

  • attach_hook(module): Register a worker hook to be run after any Pro worker executes.
  • detach_hook(module): Unregister a worker hook.

Types

encrypted()

@type encrypted() :: [{:key, mfa()}]

Options to enable and configure encrypted mode.

hook_state()

@type hook_state() :: :cancel | :complete | :discard | :error | :snooze

All possible hook states.

recorded()

@type recorded() :: true | [to: atom(), limit: pos_integer(), safe_decode: boolean()]

Options to enable and configure recorded mode.

structured()

@type structured() :: [
  changeset: {module(), atom()},
  fields: keyword() | map(),
  keys: [atom()],
  required: [atom()]
]

Options to enable and configure structured mode.

Callbacks

after_process(hook_state, job)

(optional)
@callback after_process(hook_state(), job :: Oban.Job.t()) :: :ok

Called after a job finishes processing regardless of status (complete, failure, etc).

See the shared "Worker Hooks" section for more details.

fetch_recorded(job)

(optional)
@callback fetch_recorded(job :: Oban.Job.t()) :: {:ok, term()} | {:error, :missing}

Extract the results of a previously executed job.

If a job has run and recorded a value, it will return an {:ok, result} tuple. Otherwise, you'll get {:error, :missing}.

process(job)

@callback process(job :: Oban.Job.t() | [Oban.Job.t()]) :: Oban.Worker.result()

Called when executing a job.

The process/1 callback behaves identically to Oban.Worker.perform/1, except that it may have pre-processing and post-processing applied.

Functions

attach_hook(module)

(since 0.12.0)
@spec attach_hook(module()) :: :ok | {:error, term()}

Register a worker hook to be run after any Pro worker executes.

The module must define a function that matches the hook. For example, a module that handles the after_process hook must export an after_process/2 function.

Example

Attach a hook handler globally:

defmodule MyApp.Hook do
  def after_process(_state, %Oban.Job{} = job) do
    # Do something with the job

    :ok
  end
end

:ok = Oban.Pro.Worker.attach_hook(MyApp.Hook)

detach_hook(module)

(since 0.12.0)
@spec detach_hook(module()) :: :ok

Unregister a worker hook.

Example

Detach a previously registered global hook:

:ok = Oban.Pro.Worker.detach_hook(MyApp.Hook)