Bug Fixes
- [BatchManager] Consider all states for batch callback uniqueness. Previously, if a callback failed enough it could be discarded and not considered for subsequent uniqueness checks.
- [DynamicPruner] Allow `:infinity` as a duration in dynamic pruning so users don't have to specify ludicrous values like `{999, :years}`.
- [Relay] Attach the Relay telemetry handler using module function capture syntax to prevent warnings.
- [DynamicCron] Include `:expression` as an available update option to prevent dialyzer errors.
- [SmartEngine] Preserve existing rate limit fields when scaling or otherwise changing a producer's meta values.
- [SmartEngine] Ensure that the total fetch demand is never negative. When running queues are converted to global mode there may be a deficit between the total jobs and the global limit. In that case we must fetch 0 jobs rather than passing a negative number (see the sketch after this list).
- [SmartEngine] Verify the presence of a rate-limit period before calculating window time. This fixes situations where a producer record for the same queue existed, but lacked a rate limit structure.
- [Producer] Improve legacy Ecto support by lazily calculating the local limit, without assuming the params are coerced into a map.
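The negative-demand fix above amounts to a clamp along these lines (an illustrative sketch with made-up variable names, not the engine's internals):

```elixir
# Fetch nothing, rather than a negative count, when running jobs
# already meet or exceed the global limit
demand = max(global_limit - running_count, 0)
```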
Rate Limit Partitions

Previously, for queues with a `global_limit` or `rate_limit` set, limits applied across the entire queue. Driven by popular demand, v0.9 brings partitions to the Smart Engine's rate limiter. With partitions, rate limits are applied per-worker, on args, or on a subset of args fields rather than across the entire queue. This enables your application to enforce limits per-customer or respect external throttling, without splitting jobs into multiple queues. For example, to partition by worker:
```elixir
rate_limit: [allowed: 100, period: 5, partition: [fields: [:worker]]]
```
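In context, the option belongs in a queue's configuration. Here is a minimal sketch, assuming an app named `my_app` and a queue named `api` (both illustrative):

```elixir
# In config/config.exs: a queue whose rate limit is partitioned per-worker
config :my_app, Oban,
  engine: Oban.Pro.Queue.SmartEngine,
  queues: [
    api: [
      local_limit: 10,
      rate_limit: [allowed: 100, period: 5, partition: [fields: [:worker]]]
    ]
  ]
```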
Alternatively, you can partition by the job's `account_id` field:
```elixir
rate_limit: [allowed: 100, period: 5, partition: [fields: [:args], keys: [:account_id]]]
```
Naturally, a combination of `worker`, `args`, and any number of `keys` works, as shown below.
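For instance, a sketch combining worker and args partitioning (the `account_id` key is illustrative):

```elixir
# Limit each worker/account pair to 100 jobs per 5 seconds
rate_limit: [
  allowed: 100,
  period: 5,
  partition: [fields: [:worker, :args], keys: [:account_id]]
]
```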
Check out the SmartEngine Guide for options and details.
Batches Built on Meta

Batches are the oldest worker in Pro, and as such, they existed prior to the `meta` field. Finally, that difference is rectified. Building batches on `meta` enables a handful of ergonomic improvements and new functionality:
- Pass custom callback `args` through the `batch_callback_args` option
- Use a dedicated callback worker via the `batch_callback_worker` option
- Stream a batch's jobs with `stream_batch_jobs/2` for map/reduce processing
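As a hedged sketch of how these pieces fit together (module names and args are illustrative; `batch_callback_worker` would instead route callback jobs to a different worker module):

```elixir
defmodule MyApp.BatchWorker do
  use Oban.Pro.Workers.Batch, queue: :default

  # Per-job work
  def process(%Job{args: %{"id" => id}}), do: {:ok, id}

  # Batch callback: map/reduce over every job in the batch with the new
  # stream_batch_jobs/2 (the stream is lazy, so enumerate it in a transaction)
  def handle_completed(%Job{} = job) do
    {:ok, ids} =
      MyApp.Repo.transaction(fn ->
        job
        |> stream_batch_jobs()
        |> Enum.map(& &1.args["id"])
      end)

    {:ok, ids}
  end
end

# Enqueue a batch that passes custom args to its callback jobs
1..100
|> Enum.map(&%{id: &1})
|> MyApp.BatchWorker.new_batch(batch_callback_args: %{reply_to: "ops"})
|> Oban.insert_all()
```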
IMPORTANT: For in-flight batch callback jobs to run after an upgrade you'll need to migrate `batch_id` and `callback` into `meta`. Run the following SQL in a migration immediately prior to, or following, the upgrade.
```sql
UPDATE oban_jobs
SET meta = jsonb_set_lax(
  jsonb_set(meta, '{batch_id}', args->'batch_id'),
  '{callback}',
  args->'callback',
  true,
  'return_target'
)
WHERE state NOT IN ('cancelled', 'completed', 'discarded')
  AND args ? 'batch_id'
```
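For example, a minimal sketch of running that statement from an Ecto migration (the module name is illustrative):

```elixir
defmodule MyApp.Repo.Migrations.MoveBatchFieldsToMeta do
  use Ecto.Migration

  def up do
    execute """
    UPDATE oban_jobs
    SET meta = jsonb_set_lax(
      jsonb_set(meta, '{batch_id}', args->'batch_id'),
      '{callback}',
      args->'callback',
      true,
      'return_target'
    )
    WHERE state NOT IN ('cancelled', 'completed', 'discarded')
      AND args ? 'batch_id'
    """
  end

  # The update is additive, so rolling back is a no-op
  def down, do: :ok
end
```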
Workflows got a little ergonomic love, too. Now you can dynamically extend workflows at runtime with the new `append_workflow` function:
```elixir
def process(%Job{} = job) do
  jobs =
    job
    |> append_workflow(check_deps: false)
    |> add(:d, WorkerD.new(%{}), deps: [:a])
    |> add(:e, WorkerE.new(%{}), deps: [:b])
    |> add(:f, WorkerF.new(%{}), deps: [:c])
    |> Oban.insert_all()

  {:ok, jobs}
end
```
Note the use of `check_deps: false` to prevent dependency validation. To be safe and check jobs while appending, we'll use the new `stream_workflow_jobs/1` function to load all of the previous jobs and feed them in (like `Repo.stream/2`, the stream must be enumerated inside a transaction):
```elixir
def process(%Job{} = job) do
  {:ok, jobs} =
    MyApp.Repo.transaction(fn ->
      job
      |> stream_workflow_jobs()
      |> Enum.to_list()
    end)

  jobs
  |> append_workflow()
  |> add(:d, WorkerD.new(%{}), deps: [:a])
  |> add(:e, WorkerE.new(%{}), deps: [:b])
  |> add(:f, WorkerF.new(%{}), deps: [:c])
  |> Oban.insert_all()

  :ok
end
```
Changed

- [Oban] Require Oban `~> v2.9` to support the new `cancel_all_jobs` engine callback (see the sketch after this list).
- [Oban.Pro.Queue.SmartEngine] Make operations like `refresh` more efficient (less data transport) and more failure tolerant to prevent producer crashes.
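As a sketch of what that callback enables through Oban's public `Oban.cancel_all_jobs` helper (queue and state values are illustrative):

```elixir
import Ecto.Query

# Cancel every scheduled job in one queue; available with Oban v2.9+
Oban.Job
|> where(queue: "default", state: "scheduled")
|> Oban.cancel_all_jobs()
```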