Deterministic Background Tasks for Django 6.0+ powered by Go.
Reproq is a production-grade tasks backend that combines the ease of Django with the performance and reliability of the Reproq Worker, a high-performance execution engine written in Go.
Reproq is split into two specialized components:
- SKIP LOCKED for high-performance claiming.
- spec_hash for deduplication.

| Component | Supported |
|---|---|
| Django | 6.x (Django>=6.0,<7.0) |
| Python | 3.12+ (dev/CI uses 3.12.x) |
| Reproq Worker | Latest release (install/upgrade via python manage.py reproq install / upgrade) |
Use python manage.py reproq doctor to verify schema, worker binary, and DSN alignment.
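Deduplication keys on spec_hash. One plausible way to compute such a hash is to digest a canonical encoding of the task spec; the sketch below is illustrative only — the exact fields and encoding Reproq uses may differ.

```python
# Illustrative spec_hash sketch: hash a canonical JSON encoding of the task spec.
# The field choice here is an assumption, not Reproq's actual scheme.
import hashlib
import json

def spec_hash(task_path: str, args: tuple, kwargs: dict) -> str:
    # Sorted keys + fixed separators make the encoding deterministic,
    # so identical specs always produce identical hashes.
    spec = json.dumps(
        {"task": task_path, "args": list(args), "kwargs": kwargs},
        sort_keys=True,
        separators=(",", ":"),
    )
    return hashlib.sha256(spec.encode()).hexdigest()

a = spec_hash("myapp.tasks.send_welcome_email", (123,), {})
b = spec_hash("myapp.tasks.send_welcome_email", (123,), {})
print(a == b)  # True — identical specs collapse to one hash, enabling dedup
```

With DEDUP_ACTIVE enabled, an enqueue whose hash matches a READY/RUNNING task can simply return the existing result instead of inserting a duplicate row.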
uv pip install reproq-django
# or: pip install reproq-django
Add reproq_django to your INSTALLED_APPS and configure the TASKS backend:
INSTALLED_APPS = [
    ...,
    "reproq_django",
]

TASKS = {
    "default": {
        "BACKEND": "reproq_django.backend.ReproqBackend",
        "OPTIONS": {
            "DEDUP_ACTIVE": True,
            "TIMEOUT_SECONDS": 900,
            "MAX_ATTEMPTS": 3,
        }
    }
}
Use the Django 6.0 Tasks API with a @task decorator:
from django.tasks import task

@task(queue_name="default", priority=0)
def send_welcome_email(user_id: int) -> str:
    # business logic here
    return f"Email sent to {user_id}"
Bootstrap writes a config file, installs the worker binary, and runs both migration steps:
python manage.py reproq init
Use --format toml, --skip-install, --skip-migrate, or --skip-worker-migrate if you need a lighter touch.
If you want only the worker binary:
python manage.py reproq install
This command detects your OS/Architecture and fetches the correct pre-built binary from GitHub. No Go installation required!
python manage.py reproq migrate-worker
python manage.py migrate
Note: migrate-worker applies necessary Postgres optimizations (indexes, extensions) that Django migrations cannot handle.
It also backfills task_path in batches and ensures the task_runs_task_path_not_empty check exists; new installs validate it immediately, while older installs can validate it later if desired.
python manage.py reproq worker
You have two options (choose one):
Option A: Run Beat (one process per database)
python manage.py reproq beat
Option B: Use Postgres-native scheduling (pg_cron)
python manage.py reproq pg-cron --install
pg_cron requires the Postgres extension to be enabled (see docs/deployment.md).
Run python manage.py reproq <subcommand> to manage the worker and day-to-day ops.
Full reference (examples + exit codes): docs/cli.md.
- init writes reproq.yaml/reproq.toml, installs the worker, and runs migrations.
- config --explain prints the effective config and its precedence.
- doctor --strict validates DSN, schema, worker binary, and allowlist.
- upgrade fetches the latest worker release and optionally runs migrate-worker.
- allowlist --write --config reproq.yaml populates allowed_task_modules.
- status/stats, logs --id <result_id>, cancel --id <result_id>.

Reproq is built for teams who want deterministic background tasks without adding Redis or RabbitMQ.
If you need complex routing, multi-broker support, or huge existing Celery ecosystems, Celery may still fit better. Reproq prioritizes clarity, determinism, and low operational overhead.
Reproq fully implements the Django 6.0 Tasks API.
Use the standard @task decorator. Reproq respects queue_name and priority.
from django.tasks import task

@task(queue_name="high-priority", priority=100)
def send_welcome_email(user_id):
    # logic here
    return f"Email sent to {user_id}"
You can also annotate a task with a concurrency limit:
from reproq_django.concurrency import limits_concurrency

@limits_concurrency(lambda user_id: f"user:{user_id}", to=1)
@task(queue_name="high-priority", priority=100)
def send_welcome_email(user_id):
    return f"Email sent to {user_id}"
Use .enqueue() to dispatch tasks. Reproq supports additional arguments via kwargs.
# Standard Enqueue
result = send_welcome_email.enqueue(123)
# Scheduled Execution (run_after)
from datetime import timedelta
result = send_welcome_email.using(run_after=timedelta(minutes=10)).enqueue(123)
# Concurrency Control (lock_key)
# Ensure only one task with this key runs at a time
result = send_welcome_email.enqueue(123, lock_key="user_123_sync")
# Concurrency Control (limit per key)
# Allow up to N concurrent tasks sharing a key
result = send_welcome_email.enqueue(
    123,
    concurrency_key="user:123",
    concurrency_limit=2,
)
Supported enqueue kwargs (Reproq extensions):
- run_after: datetime or timedelta. Delays execution.
- lock_key: str. Prevents multiple tasks with the same key from being in RUNNING state simultaneously.
- priority: int. Overrides the task's default priority for this enqueue only.
- concurrency_key: str or callable. Limits concurrent tasks sharing the key.
- concurrency_limit: int. Maximum concurrent tasks for the key (0 disables).

Reserved kwargs: run_after, lock_key, priority, concurrency_key, and concurrency_limit are treated as scheduling metadata and are removed from task kwargs. If your task needs parameters with these names, rename them.
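The reserved-kwargs stripping can be pictured like this (an illustrative sketch, not Reproq's actual internals):

```python
# Illustrative: scheduling metadata is separated from the kwargs that
# actually reach the task function. Names mirror the documented reserved set.
RESERVED_KWARGS = {"run_after", "lock_key", "priority", "concurrency_key", "concurrency_limit"}

def split_enqueue_kwargs(kwargs: dict) -> tuple[dict, dict]:
    scheduling = {k: v for k, v in kwargs.items() if k in RESERVED_KWARGS}
    task_kwargs = {k: v for k, v in kwargs.items() if k not in RESERVED_KWARGS}
    return scheduling, task_kwargs

meta, task_kwargs = split_enqueue_kwargs({"lock_key": "user_123", "user_id": 123})
print(meta)         # {'lock_key': 'user_123'}
print(task_kwargs)  # {'user_id': 123}
```

This is why a task parameter named, say, `priority` would silently vanish: rename it before enqueueing.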
Note on Priority: Task priority is set at definition time via @task(priority=...) and can be overridden per call via enqueue(priority=...).
If you are in an async view or task producer, use aenqueue() to avoid blocking:
result = await send_welcome_email.aenqueue(123)
In sync/WSGI code, continue to use enqueue().
If your task needs context (result id, attempt, metadata), use takes_context=True.
from django.tasks import task
@task(takes_context=True)
def process_upload(context, upload_id):
    context.metadata["stage"] = "starting"
    context.save_metadata()
    # work...
    context.metadata["stage"] = "done"
    context.save_metadata()
Metadata is stored in task_runs.metadata_json and surfaced in task results.
For high-throughput scenarios, use bulk_enqueue to insert thousands of tasks in a single query.
from django.tasks import tasks
from datetime import timedelta
backend = tasks["default"]
jobs = []
for i in range(1000):
    # (task_func, args, kwargs)
    jobs.append((
        send_welcome_email,
        (i,),
        {"lock_key": f"user_{i}", "run_after": timedelta(seconds=i)},
    ))

backend.bulk_enqueue(jobs)
Reproq stores schedules in the PeriodicTask model. You must run exactly one
scheduler per database:
- python manage.py reproq beat
- python manage.py reproq pg-cron --install (Postgres-native)

If you choose pg_cron, run reproq pg-cron --install after every deploy or migration to keep schedules in sync. On managed platforms, prefer --if-supported so deploys do not fail if pg_cron is unavailable.
If you cannot run a long-lived beat process (low-memory environments), run beat as a one-shot command from crontab every minute:
* * * * * /path/to/venv/bin/python manage.py reproq beat --once
or, equivalently, using the schedule alias:
* * * * * /path/to/venv/bin/python manage.py reproq schedule
You can manage schedules in the Django Admin under "Reproq Django" or via code.
from django.utils import timezone
from reproq_django.models import PeriodicTask
PeriodicTask.objects.update_or_create(
    name="Nightly cleanup",
    defaults={
        "cron_expr": "0 2 * * *",
        "task_path": "myapp.tasks.nightly_cleanup",
        "queue_name": "maintenance",
        "next_run_at": timezone.now(),
        "enabled": True,
    },
)
This pattern keeps schedules in sync across environments without migrations.
from django.apps import AppConfig
from django.db import connections
from django.db.models.signals import post_migrate
from django.utils import timezone
from reproq_django.models import PeriodicTask
def _setup_periodic_tasks(**kwargs):
    using = kwargs.get("using")
    connection = connections[using]
    if "periodic_tasks" not in connection.introspection.table_names():
        return
    PeriodicTask.objects.update_or_create(
        name="Nightly cleanup",
        defaults={
            "cron_expr": "0 2 * * *",
            "task_path": "myapp.tasks.nightly_cleanup",
            "queue_name": "maintenance",
            "next_run_at": timezone.now(),
            "enabled": True,
        },
    )

class MyAppConfig(AppConfig):
    name = "myapp"

    def ready(self) -> None:
        post_migrate.connect(_setup_periodic_tasks, sender=self)
Use the ORM to force a schedule to run, or enqueue the task directly.
from django.utils import timezone
from reproq_django.models import PeriodicTask
from myapp.tasks import nightly_cleanup
PeriodicTask.objects.filter(name="Nightly cleanup").update(
    next_run_at=timezone.now()
)
# Or bypass the schedule
nightly_cleanup.enqueue()
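For orientation, a cron_expr like "0 2 * * *" fires daily at 02:00. The next fire time for that simple daily case can be computed as below (an illustrative sketch only; Reproq's scheduler handles full 5-field cron syntax):

```python
# Illustrative: next fire time for a simple "minute hour * * *" daily schedule.
from datetime import datetime, timedelta

def next_daily_run(now: datetime, hour: int, minute: int = 0) -> datetime:
    # Today's slot, unless it has already passed — then tomorrow's.
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now:
        candidate += timedelta(days=1)
    return candidate

print(next_daily_run(datetime(2025, 1, 1, 3, 0), hour=2))  # 2025-01-02 02:00:00
```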
- cron_expr uses standard 5-field cron syntax: min hour day month weekday.
- task_path must be the full Python import path for the task.
- queue_name is optional; when set, the worker must listen on that queue.
- payload_json should use the {"args": [...], "kwargs": {...}} envelope, with values encoded the same way as task enqueues (e.g., timedeltas/models are supported).
- ALLOWED_TASK_MODULES is used.

Use the @recurring decorator to keep periodic schedules in source control.
from django.tasks import task
from reproq_django.recurring import recurring
@recurring(schedule="0 9 * * *", key="daily_report", args=(42,))
@task(queue_name="maintenance")
def send_report(account_id):
    ...
Sync code-defined schedules with:
python manage.py reproq sync-recurring
By default, recurring tasks auto-sync on post_migrate. Set
REPROQ_RECURRING_AUTOSYNC=False to disable.
Configure Reproq behavior via the TASKS setting in settings.py.
from datetime import timedelta

TASKS = {
    "default": {
        "BACKEND": "reproq_django.backend.ReproqBackend",
        "OPTIONS": {
            # Deduplication (Default: True)
            # If True, enqueuing a task with the exact same arguments as a
            # READY/RUNNING task will return the existing result_id.
            "DEDUP_ACTIVE": True,
            # Execution Timeout (Default: 900)
            # Max seconds a task can run before being killed by the worker.
            "TIMEOUT_SECONDS": 900,
            # Retry Limit (Default: 3)
            # Max number of attempts for a task.
            "MAX_ATTEMPTS": 3,
            # Expiry (Optional)
            # If set, tasks not picked up by this time will be marked expired.
            "EXPIRES_IN": timedelta(hours=24),
            # Provenance (Optional)
            # Metadata stored with the task for auditing.
            "CODE_REF": "git-sha-or-version",
            "PIP_LOCK_HASH": "hash-of-dependencies",
        }
    }
}
Worker config files (reproq.yaml/reproq.toml) are optional. Precedence is:
defaults < config file < env vars < CLI flags. --dsn overrides DATABASE_URL, and
DATABASE_URL is optional when flags or a config file are provided.
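That layering can be sketched as a simple dict merge, later layers winning (illustrative only; the real resolver is in the worker and `reproq config --explain` shows the actual result):

```python
# Illustrative precedence merge: defaults < config file < env vars < CLI flags.
def effective_config(defaults: dict, file_cfg: dict, env_cfg: dict, cli_cfg: dict) -> dict:
    merged = {}
    for layer in (defaults, file_cfg, env_cfg, cli_cfg):
        # Skip unset (None) values so a lower layer's setting survives.
        merged.update({k: v for k, v in layer.items() if v is not None})
    return merged

print(effective_config(
    {"concurrency": 10, "dsn": None},     # built-in defaults
    {"concurrency": 20},                  # reproq.yaml
    {"dsn": "postgres://from-env"},       # DATABASE_URL
    {"dsn": "postgres://from-flag"},      # --dsn
))  # {'concurrency': 20, 'dsn': 'postgres://from-flag'}
```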
Route specific queues to different Django database aliases.
DATABASES = {
    "default": {...},
    "queues": {...},
}

REPROQ_QUEUE_DATABASES = {
    "high-priority": "queues",
    "bulk-*": "queues",  # glob patterns supported
}
Use REPROQ_DEFAULT_DB_ALIAS to change the fallback alias (defaults to
"default"). When multiple queue databases are configured, task result IDs are
prefixed with the database alias (queues:123). Set
REPROQ_RESULT_ID_WITH_ALIAS=False to force legacy IDs.
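The glob matching can be pictured with the standard library's fnmatch (an illustrative sketch; Reproq's exact matching and tie-breaking rules may differ):

```python
# Illustrative queue -> database alias resolution with glob patterns.
from fnmatch import fnmatch

QUEUE_DATABASES = {"high-priority": "queues", "bulk-*": "queues"}

def db_alias_for_queue(queue_name: str, default: str = "default") -> str:
    for pattern, alias in QUEUE_DATABASES.items():
        if fnmatch(queue_name, pattern):
            return alias
    return default  # fallback alias (REPROQ_DEFAULT_DB_ALIAS)

print(db_alias_for_queue("bulk-7"))  # queues
print(db_alias_for_queue("emails"))  # default
```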
To route all Reproq tables to a non-default alias in ORM reads/writes (admin,
health checks, etc.), add the router:
DATABASE_ROUTERS = ["reproq_django.db_router.ReproqRouter"]
Run one worker/beat per database:
python manage.py reproq worker --database queues --queues high-priority,bulk-1
python manage.py reproq beat --database queues
Use REPROQ_STATS_DATABASES=["*"] to aggregate stats across all queue databases.
We standardize on Python 3.12.x for local development.
bash scripts/dev_bootstrap.sh
uv run pytest
Always run tests with uv run pytest so dependencies and settings stay consistent.
Reproq supports complex task dependencies.
Execute tasks one after another. If a task fails, the chain stops.
from reproq_django.workflows import chain
# task_a -> task_b -> task_c
c = chain(
    (task_a, (1,), {}),
    task_b,  # no args
    (task_c, (), {"param": "val"}),
)
results = c.enqueue()
# results[0] is READY, results[1..] are WAITING
Execute tasks in parallel.
from reproq_django.workflows import group
g = group(
    (resize_image, ("img1.jpg",), {}),
    (resize_image, ("img2.jpg",), {}),
)
results = g.enqueue()
Run a callback once a group finishes.
from reproq_django.workflows import chord
group_results, callback_result = chord(
    (resize_image, ("img1.jpg",), {}),
    (resize_image, ("img2.jpg",), {}),
    callback=notify_done,
).enqueue()
The callback runs only after all group tasks succeed. If any task fails, the workflow is marked as failed and the callback stays in WAITING and never runs.
Retries are managed by the Go worker. When a task fails and attempts remain, it is re-queued with an exponential backoff:
- Backoff delay: 2^attempt (attempt starts at 1).

The worker updates run_after on the failed task, and the backend will only claim it after that timestamp.
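Assuming the delay is in seconds, the retry schedule can be sketched as:

```python
# Illustrative retry backoff: delay = 2**attempt, attempt starting at 1.
# The "seconds" unit is an assumption for this sketch.
from datetime import datetime, timedelta, timezone

def next_run_after(failed_at: datetime, attempt: int) -> datetime:
    # attempt 1 -> 2s, attempt 2 -> 4s, attempt 3 -> 8s, ...
    return failed_at + timedelta(seconds=2 ** attempt)

t0 = datetime(2025, 1, 1, tzinfo=timezone.utc)
for attempt in range(1, 4):
    print(attempt, next_run_after(t0, attempt) - t0)
# 1 0:00:02
# 2 0:00:04
# 3 0:00:08
```

With MAX_ATTEMPTS = 3 (the default), a task is retried until attempts are exhausted, then left in its failed state for triage.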
Reproq Worker enforces token bucket limits stored in the rate_limits table. You can manage these from Django Admin or via the worker CLI.
Keys:
- queue:<queue_name> limits a specific queue.
- task:<task_path> limits a specific task (overrides queue/global).
- global is a fallback when no task/queue limit exists.

Example:
reproq limit set --key queue:default --rate 5 --burst 10
Defaults: global rate limiting is disabled until you set a positive rate.
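The rate/burst pair behaves like a classic token bucket; here is a minimal in-memory sketch (illustrative only — Reproq's real limits live in the rate_limits table and are enforced by the Go worker):

```python
# Illustrative token bucket: `rate` tokens/second refill, capacity `burst`.
import time

class TokenBucket:
    def __init__(self, rate: float, burst: int):
        self.rate, self.burst = rate, burst
        self.tokens = float(burst)       # start full
        self.last = time.monotonic()

    def allow(self) -> bool:
        now = time.monotonic()
        # Refill based on elapsed time, capped at burst.
        self.tokens = min(self.burst, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1             # spend one token per claim
            return True
        return False

bucket = TokenBucket(rate=5, burst=10)   # mirrors --rate 5 --burst 10
```

A burst of 10 immediate claims drains the bucket; after that, claims are admitted at roughly 5 per second.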
The Go worker binary includes operational commands you can run directly:
# Request cancellation of a running task
reproq cancel --dsn "..." --id 12345
# Inspect failed tasks
reproq triage list --dsn "..." --limit 50
The python manage.py reproq command is your Swiss Army knife.
| Subcommand | Description |
|---|---|
| init | Bootstraps Reproq in the current project. |
| worker | Starts the Go worker. Flags: --config, --concurrency (default 10), --queues, --allowed-task-modules, --logs-dir, --payload-mode, --metrics-port, --metrics-addr, --metrics-auth-token, --metrics-allow-cidrs, --metrics-tls-cert, --metrics-tls-key, --metrics-tls-client-ca, --database. Auto-configures allow-list when unset (unless config file is used). |
| beat | Starts the scheduler. Flags: --config, --interval (default 30s), --once, --database. |
| schedule | Enqueue due periodic tasks once and exit (alias for beat --once). |
| pg-cron | Syncs Postgres-native schedules. Flags: --install (default), --remove, --prefix, --dry-run, --database. |
| install | Downloads/builds the worker binary. |
| upgrade | Upgrades the worker binary and optionally runs migrate-worker. |
| migrate-worker | Applies essential SQL schema optimizations (indexes, extensions). |
| check | Validates binary path, DB connection, and schema health. |
| doctor | Validates DSN, schema, worker binary, and allowlist; --strict fails on warnings. |
| config | Prints effective worker/beat config; use --explain for precedence. |
| allowlist | Prints ALLOWED_TASK_MODULES or writes them to a config file with --write. |
| sync-recurring | Sync code-defined recurring schedules into periodic_tasks. |
| pause-queue | Pause a queue (prevents new claims). |
| resume-queue | Resume a paused queue. |
| logs | Prints logs for a task run using logs_uri (supports aliased result IDs). |
| cancel | Requests cancellation of a task run by result ID (supports aliased result IDs). |
| reclaim | Requeue or fail tasks with expired leases (--database / --all-databases). |
| prune-workers | Delete workers not seen recently (--database / --all-databases). |
| prune-successful | Delete successful task runs older than a cutoff (--database / --all-databases). |
| prune | Delete task runs by status/age (--database / --all-databases). |
| stats / status | Shows task counts by status and active workers (--database / --all-databases). |
| systemd | Generates systemd service files for production. |
| stress-test | Enqueues dummy tasks for benchmarking. |
When you include reproq_django.urls in your project, GET /stats/ returns
JSON task counts, per-queue task counts, worker records, and periodic task
schedules. Access is granted to staff sessions, a signed TUI JWT, or
METRICS_AUTH_TOKEN as a bearer token.
Additional fields include worker_health, queue_controls, scheduler, and
per-database rollups when REPROQ_STATS_DATABASES is set.
Set METRICS_AUTH_TOKEN to enable the TUI login flow and sign TUI JWTs. This
token is also accepted as a bearer token for /reproq/stats/ and is forwarded
to the worker metrics proxy endpoints. If you do not want to expose SSE, set
REPROQ_TUI_DISABLE_EVENTS=1 to omit the /reproq/tui/events/ stream from the
TUI config payload.
If METRICS_AUTH_TOKEN is unset, the TUI auth endpoints return 404.
Set LOW_MEMORY_MODE=1 to disable the worker proxy endpoints
(/reproq/tui/metrics/, /reproq/tui/healthz/, /reproq/tui/events/). The
endpoints return a 503 with a short hint while low-memory mode is enabled, and
events are omitted from the TUI config payload.
Set REPROQ_MEMORY_LOG_INTERVAL=60s to emit periodic memory logs from the
Django process (includes RSS on Linux).
Pass the TUI JWT in the Authorization: Bearer ... header when accessing
these endpoints programmatically.
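For programmatic access, a minimal standard-library sketch (the URL and token are placeholders, and the exact path depends on where you mount reproq_django.urls):

```python
# Illustrative: building an authenticated request to the stats endpoint.
import urllib.request

req = urllib.request.Request(
    "http://localhost:8000/reproq/stats/",          # placeholder URL
    headers={"Authorization": "Bearer <METRICS_AUTH_TOKEN>"},  # placeholder token
)
# Against a live instance you would then call:
# with urllib.request.urlopen(req) as resp:
#     stats = json.load(resp)
print(req.get_header("Authorization"))
```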
The Go worker/beat support YAML/TOML config files. manage.py reproq worker and manage.py reproq beat
will load a config file when --config or REPROQ_CONFIG is set. If no worker/beat flags are provided,
they also look for reproq.yaml, reproq.yml, reproq.toml, .reproq.yaml, .reproq.yml, or .reproq.toml
in the current working directory. CLI flags override config values; environment variables override config values too.
--dsn always overrides DATABASE_URL, and DATABASE_URL is optional when a config file or flags are provided.
See reproq.example.yaml and reproq.example.toml for full templates.
Queue selection uses --queues (comma-separated). The legacy --queue flag remains for compatibility but is deprecated.
- --concurrency: more goroutines in a single worker process; best for I/O-heavy tasks with minimal overhead.

Rule of thumb: start with 1-2 workers per host and tune --concurrency to available CPU cores and workload type.
Reproq integrates deeply with the Django Admin.
- Set next_run_at to run a job immediately.
- WAITING or CANCELLED map to PENDING/CANCELLED when supported by Django's TaskResultStatus. The original value is always available via raw_status.

Generate service files to run the worker and beat processes as background daemons.
python manage.py reproq systemd --user myuser --concurrency 20
This generates reproq-worker.service and reproq-beat.service. Copy them to /etc/systemd/system/ and enable them.
You can pass metrics flags (for example --metrics-addr 127.0.0.1:9090) or use --env-file to load METRICS_AUTH_TOKEN and METRICS_ALLOW_CIDRS.
To use cron-style scheduling instead of a persistent beat process, generate a timer:
python manage.py reproq systemd --schedule --schedule-on-calendar "*-*-* *:*:00"
Set REPROQ_SCHEDULER_MODE=cron in your EnvironmentFile and enable the timer to ensure
only one scheduler runs.
The Go worker relies on standard environment variables:
- DATABASE_URL: postgres://user:pass@host:5432/db (optional if you provide --dsn or a config file)
- WORKER_ID: (Optional) Unique name for the node.
- REPROQ_WORKER_BIN: (Optional) Path to the binary if not using manage.py reproq install.
- REPROQ_CONFIG: (Optional) Path to a YAML/TOML worker/beat config file.
- ALLOWED_TASK_MODULES: (Optional) Comma-separated task module allow-list for the worker. If unset, manage.py reproq worker auto-configures it from discovered task modules.
- REPROQ_LOGS_DIR: (Optional) Directory to persist worker stdout/stderr logs (updates task_runs.logs_uri).
- REPROQ_MEMORY_LOG_INTERVAL: (Optional) Log memory stats at the given interval (e.g., 60s).
- METRICS_AUTH_TOKEN: (Optional) Bearer token for /metrics, /healthz, and /events.
- METRICS_ALLOW_CIDRS: (Optional) Comma-separated IP/CIDR allow-list for metrics/health.
- METRICS_TLS_CERT: (Optional) TLS certificate path for health/metrics.
- METRICS_TLS_KEY: (Optional) TLS private key path for health/metrics.
- METRICS_TLS_CLIENT_CA: (Optional) Client CA bundle to require mTLS for health/metrics.

If DATABASE_URL is not set, manage.py reproq worker derives a DSN from settings.DATABASES["default"].
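That derivation can be pictured roughly as follows (an illustrative sketch; the real mapping handles more settings, such as missing hosts/ports and connection options):

```python
# Illustrative: turning a Django DATABASES entry into a postgres:// DSN.
def dsn_from_django(db: dict) -> str:
    return (
        f"postgres://{db['USER']}:{db['PASSWORD']}"
        f"@{db['HOST']}:{db['PORT']}/{db['NAME']}"
    )

print(dsn_from_django({
    "USER": "app", "PASSWORD": "secret",
    "HOST": "localhost", "PORT": 5432, "NAME": "mydb",
}))  # postgres://app:secret@localhost:5432/mydb
```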
Worker binary resolution order:
1. REPROQ_WORKER_BIN (setting or env)
2. ./.reproq/bin/reproq (installed by reproq install)
3. reproq_django/bin/reproq (packaged fallback)
4. PATH

Reproq is split into two repos:
Issues and PRs are welcome in both!
Apache License 2.0. See LICENSE.