Testing Pro Workers
This guide builds on Testing Workers, which you should read to acquaint yourself with the
basics of unit testing workers. With Oban.Pro.Testing
helpers, testing Pro workers is identical
to testing basic workers, but with a few powerful additions. To see those additions in action,
let's step through how to test each of the Pro specific workers.
Testing Batches
Batches link the execution of multiple jobs and optional callbacks after jobs are processed. To
test a batch exhaustively, you can exercise process/1
with Oban.Pro.Testing.perform_job/2,3
and use Oban.Pro.Testing.perform_callback/2,3
for any callback handlers.
To demonstrate, we'll define a basic batch worker with a handler for when the batch completes successfully:
defmodule MyApp.MyBatch do
use Oban.Pro.Workers.Batch
@impl true
def process(%{args: %{"email" => email}}) do
if MyApp.valid_email?(email) do
MyApp.deliver_welcome(email)
else
{:error, :invalid_email}
end
end
@impl true
def handle_completed(%{args: %{"admin_email" => email}, meta: %{"batch_id" => bid}}) do
MyApp.batch_complete(bid, email)
:ok
end
end
Testing the worker's process/1
function is straight forward with perform_job/2
:
test "delivering welcome emails to valid addresses" do
assert :ok = perform_job(MyBatch, %{email: "real@example.com"})
assert {:error, _} = perform_job(MyBatch, %{email: "fake-email"})
end
Similarly, there is a helper for testing callback functions. The helper produces a batch callback
job and verifies that the callback function is exported. Here we are verifying the
handle_completed/1
callback:
test "notifying admins that a batch completed" do
assert :ok = perform_callback(MyBatch, :completed, %{admin_email: "me@admin.com"})
end
Integration Testing Batches
Oban inserts callback jobs automatically based on the results of each job in the batch; e.g. if
each job is completed
then there will be a handle_completed/1
callback job. The
Oban.Pro.Testing.run_batch/2
helper handles inserting and executing all jobs in a batch,
including any appropriate callbacks.
test "running all jobs in a batch and the callbacks" do
batch =
["a@example.com", "b@example.com", "c@example.com"]
|> Enum.map(&MyBatch.new(%{email: &1}))
|> MyBatch.new_batch(batch_callback_args: %{admin_email: "me@admin.com"})
assert %{completed: 4} = run_batch(batch)
end
When you're application code inserts a batch on its own, outside the context of your test, you
can't call run_batch/1,2
. In that case, you can use Oban.Pro.Testing.drain_jobs/1
instead to
execute the jobs and the callback:
test "draining batches inserted by application code" do
:ok = MyApp.welcome_recent_users()
assert %{completed: 4} = drain_jobs(queue: :all)
end
Testing Chunks
Chunk workers process jobs in groups based on size or a timeout. They are an outlier amongst other
workers because the process/1
callback receives a list of jobs rather than a single job.
That difference prevents chunks from working with perform_job/2
, and instead you can use the
Oban.Pro.Testing.perform_chunk/3
helper.
To demonstrate testing chunks we'll define a worker that checks a batch of password hashes against a pwned database and notifies admins when a significant ratio of hashes are compromised.
defmodule MyApp.MyChunk do
use Oban.Pro.Workers.Chunk, size: 100, timeout: :timer.seconds(30)
@impl true
def process([_ | _] = jobs) do
pwned_count = Enum.count(jobs, fn %{args: args} -> MyApp.was_pwned?(args["hash"]) end)
pwned_ratio = pwned_count / length(jobs)
if pwned_ratio > 0.1, do: MyApp.deliver_pwned_alert()
{:ok, pwned_count}
end
end
Now, exercise process/1
with perform_chunk/3
:
test "calculating the ratio of pwned password hashes" do
clear_args = for _ <- 1..3, do: %{hash: MyApp.gen_hash(:clear)}
pwned_args = for _ <- 1..3, do: %{hash: MyApp.gen_hash(:pwned)}
assert {:ok, 3} = perform_chunk(MyChunk, clear_args ++ pwned_args)
end
Unit testing chunks is convenient for checking edge cases, but it lacks the depth and reality of real chunked execution. For that, we need integration testing.
Integration Testing Chunks
During normal execution, chunk size is limited to the configured size
. In our example above, the
size
is set to 100
, which means that a chunk may process up to 100 jobs at once. To verify
our chunking we'll insert and execute the chunk jobs with Oban.Pro.Testing.run_chunk/2
:
test "running up to 100 jobs at a time" do
jobs = Enum.map(1..150, fn _ -> MyChunk.new(%{hash: MyApp.gen_hash(:clear)}) end)
assert %{completed: 2} = run_chunk(jobs)
# Integration tests are about side effects; assert no alert email was delivered
refute email_delivered()
end
As with the other run_*
functions, if you need to execute jobs that were inserted within
application code, use drain_jobs
instead:
test "draining chunks inserted by application code" do
# Prepare the database with less than 100 recent users
:ok = MyApp.check_pwned_signups()
assert %{completed: 1} = drain_jobs(queue: :chunked)
end
Testing Chains
Chain workers link jobs together to ensure they run in a strict sequential order. Because they
lack callbacks like Batches or an alternative process/
signature like Chunks, individual jobs
are testable with the standard perform_job/3
helper.
To demonstrate testing a chain, we'll define an account management worker that receives external balance notifications and synchronizes them locally.
defmodule MyApp.MyChain do
use Oban.Pro.Workers.Chain, queue: :transactions, by: [args: :account_id]
@impl true
def process(%Job{args: %{"account_id" => account_id, "balance" => balance}}) do
account_id
|> MyApp.Account.find()
|> MyApp.Account.update_balance(balance)
end
end
Now, use perform_job/3
to verify that updating works as expected:
test "updating account balances" do
args = %{account_id: insert(:account).id, balance: 100}
assert {:ok, %{balance: 100}} = perform_job(MyChain, args)
end
The real power of Chains only shows when multiple jobs run in sequence. For that, we need integration testing.
Integration Testing Chains
Downstream execution halts when a Chain job encounters an error. Subsequent jobs snooze while
waiting for the upstream job to complete successfully. We can test this with run_chain/2
and by
disabling the with_recursion
option to prevent the failing job from retrying until exhaustion.
test "hold balance processing when an error occurs" do
jobs = [
MyChain.new(%{account_id: account.id, balance: 0.0}),
MyChain.new(%{account_id: account.id, balance: 100}),
MyChain.new(%{account_id: account.id, balance: 150})
]
assert %{retryable: 1, scheduled: 2} = run_chain(changesets, with_recursion: false)
end
Testing Workflows
Workflows jobs compose together with arbitrary dependencies that effect if and when jobs are
executed. Individual jobs in a workflow are easily tested with perform_job/3
, but the real
challenge is testing the interplay between jobs as they execute.
For this demonstration we'll build a workflow that applies various natural language processing to a text submission.
defmodule MyApp.MyWorkflow do
use Oban.Pro.Workers.Workflow, recorded: true
@impl true
def process(%{args: %{"text" => text, "mode" => mode}}) do
analysis_fun =
case mode do
"complexity" => :complexity_analysis
"sentiment" => :sentiment_analysis
"syntax" => :syntax_analysis
end
apply(MyApp, analysis_fun, [text])
end
def process(job) do
expressiveness =
job
|> all_workflow_jobs(only_deps: true)
|> Enum.map(fn job, acc -> {job.args["mode"], fetch_recorded(job)} end)
|> MyApp.expressiveness()
{:ok, expressiveness}
end
end
Our workflow is defined as a single worker with multiple process/1
clauses. Typically, workflows
are composed of multiple workers, but there isn't any practical difference.
Each of the clauses can be exercised with perform_job/3
:
test "analyzing text sentiment" do
assert {:ok, :positive} = perform_job(MyWorkflow, %{text: text(), mode: :sentiment})
end
Dependencies between jobs are what defines a workflow, and to test those dependencies we need integration tests.
Integration Testing Workflows
Downstream workflow jobs only run when their upstream dependencies have completed successfully. To
verify ordered execution between dependencies we'll insert and execute jobs using
Oban.Pro.Testing.run_workflow/2
.
test "running through a complete NLP analysis workflow" do
text = text_sample()
workflow =
MyWorkflow.new_workflow()
|> MyWorkflow.add(:com, MyWorkflow.new(%{text: text, mode: :complexity}))
|> MyWorkflow.add(:sen, MyWorkflow.new(%{text: text, mode: :sentiment}))
|> MyWorkflow.add(:syn, MyWorkflow.new(%{text: text, mode: :syntax}))
|> MyWorkflow.add(:exp, MyWorkflow.new(%{}), deps: [:com, :sen, :syn])
assert [_com, _sen, _syn, exp_job] = run_workflow(workflow, with_summary: false)
assert {:ok, 0.8} = MyWorkflow.fetch_recorded(exp_job)
end
The test executes all upstream jobs and then uses the results to compute a score in the final
downstream job. Because we want to verify the result of the final job, we use with_summary: false
to give us the completed jobs rather than a count summary.
Finally, you can use drain_jobs
directly when your application code inserts the workflow:
test "draining workflows inserted by application code" do
:ok = MyApp.analyze_text(text_sample())
assert %{completed: 4} = drain_jobs(queue: :all)
end