Clojure background job queue on top of PostgreSQL 9.5.
It allows creating background jobs, placing those jobs on multiple queues, and processing them later.
Background jobs can be any named Clojure function.
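A job is referred to by name rather than by value, which is why it has to be a named function. A minimal sketch of that idea, assuming (not taken from Byplay's source) that a job is identified by the fully qualified symbol of its var; the job-symbol helper and the byplay-sketch.jobs namespace are hypothetical:

```clojure
(ns byplay-sketch.jobs)

(defn send-email
  "Hypothetical job function: the first argument is the job context."
  [ctx to subject]
  [to subject])

(defn job-symbol
  "Hypothetical helper: returns the fully qualified symbol naming a job var.
  Such a symbol can be stored in a database row and resolved back later."
  [job-var]
  (let [{var-ns :ns var-name :name} (meta job-var)]
    (symbol (str (ns-name var-ns)) (str var-name))))

;; A var has a name, so the job can be identified later:
(job-symbol #'send-email)
;; => byplay-sketch.jobs/send-email

;; An anonymous function has no var, so there is nothing to store or look up:
(var? (fn [ctx] :anonymous))
;; => false
```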
The project is mostly inspired by Que, Resque and Celery.
It hasn’t been confirmed yet, but Byplay may suffer from the problem described in the Que docs:
Que’s job table undergoes a lot of churn when it is under high load, and like any heavily-written table,
is susceptible to bloat and slowness if Postgres isn’t able to clean it up. The most common cause of this is
long-running transactions, so it’s recommended to try to keep all transactions against the database housing
Que’s job table as short as possible. This is good advice to remember for any high-activity database,
but bears emphasizing when using tables that undergo a lot of writes.
This PostgreSQL issue is explained in more detail in the article “Postgres Job Queues & Failure By MVCC”.
Alpha version. The library hasn’t been used in production yet, but it has a good test suite.
Add the dependency to your project:
[byplay "0.4.0"]
Require a namespace:
(ns my-app.core
  (:require
    [byplay.core :as b]
    ,,,))
On app start, set up the Byplay table and the accompanying indexes in the database (it’s safe to call this function more than once):
(b/migrate jdbc-conn)
Here jdbc-conn is a “raw” JDBC connection. There are different ways to obtain such an instance:
1) Via the funcool/clojure.jdbc wrapper:
(with-open [conn (jdbc.core/connection dbspec)]
  (let [jdbc-conn (jdbc.proto/connection conn)]
    ,,,))
2) Via the clojure/java.jdbc wrapper:
(clojure.java.jdbc/with-db-connection [conn db-spec]
  (let [jdbc-conn (clojure.java.jdbc/db-connection conn)]
    ,,,))
3) Via a JDBC data source (e.g. HikariCP):
(with-open [jdbc-conn (.getConnection datasource)]
  ,,,)
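In variants 1 and 3 the connection is only valid inside the with-open block: with-open calls .close on it when the body exits, even if the body throws, and for a pooled data source closing returns the connection to the pool. A self-contained sketch of that guarantee, using a hypothetical fake-connection in place of a real java.sql.Connection:

```clojure
(ns byplay-sketch.with-open)

(def closed? (atom false))

(defn fake-connection
  "Hypothetical stand-in for a java.sql.Connection: any AutoCloseable
  works with with-open."
  []
  (reify java.lang.AutoCloseable
    (close [_] (reset! closed? true))))

(defn use-connection-and-fail []
  (try
    (with-open [conn (fake-connection)]
      ;; the body throws, but with-open still calls (.close conn) in a finally block
      (throw (ex-info "boom" {})))
    (catch clojure.lang.ExceptionInfo e
      (.getMessage e))))

(use-connection-and-fail)
;; => "boom"
@closed?
;; => true
```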
Define a job function:
(defn my-job
  [ctx x y z]
  (do-something-in-job-transaction1 (:jdbc-conn ctx))
  ; or if you use the funcool/clojure.jdbc wrapper:
  (do-something-in-job-transaction2 (:conn ctx))
  ,,,)
Here (:jdbc-conn ctx) is a JDBC connection with the current transaction in progress, and (:conn ctx) is the same connection wrapped in a funcool/clojure.jdbc connection instance.
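The calling convention can be pictured as applying the job var to the context map followed by the scheduled arguments. A sketch under that assumption; run-job and the placeholder context value are hypothetical, and a real worker presumably also opens the transaction and commits or rolls it back around the call:

```clojure
(ns byplay-sketch.invoke)

(defn my-job
  [ctx x y z]
  ;; a real job would do its work through (:jdbc-conn ctx) or (:conn ctx)
  {:conn (:jdbc-conn ctx) :sum (+ x y z)})

(defn run-job
  "Hypothetical sketch: calls the job var with the context map first,
  then the arguments that were given at scheduling time."
  [job-var ctx args]
  (apply job-var ctx args))

(run-job #'my-job {:jdbc-conn :placeholder-connection} [1 2 3])
;; => {:conn :placeholder-connection, :sum 6}
```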
Put the job into the :default queue:
(b/schedule jdbc-conn #'my-job 1 2 3)
Explicitly specify another queue using schedule-to:
(b/schedule-to jdbc-conn :my-queue #'my-job 1 2 3)
Or specify the queue in the job metadata under the :byplay.core/queue key:
(defn ^{::b/queue :my-queue} my-job
  [ctx x y z]
  ,,,)
(b/schedule jdbc-conn #'my-job 1 2 3)
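The metadata rule can be sketched as a plain var-metadata lookup with :default as the fallback. This illustrates the behaviour described above and is not Byplay's code; queue-for is a hypothetical helper, and the keyword is written out as :byplay.core/queue (what ::b/queue expands to) so the sketch runs without Byplay on the classpath:

```clojure
(ns byplay-sketch.queues)

;; Metadata on the defn name ends up on the var.
(defn ^{:byplay.core/queue :my-queue} my-job
  [ctx x y z]
  nil)

(defn plain-job
  [ctx x]
  nil)

(defn queue-for
  "Hypothetical helper: the queue a job goes to when scheduled without an
  explicit queue, i.e. its :byplay.core/queue metadata or :default."
  [job-var]
  (get (meta job-var) :byplay.core/queue :default))

(queue-for #'my-job)    ;; => :my-queue
(queue-for #'plain-job) ;; => :default
```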
Define a funcool/clojure.jdbc database specification, e.g.:
(def dbspec {:classname "org.postgresql.Driver"
             :subprotocol "postgresql"
             :subname "//localhost:5432/myapp"})
Start a background worker with 2 concurrent worker threads, each polling the specified queue for a new job every 5 seconds:
(def worker
  (b/new-worker dbspec {:threads-num 2
                        :queues [:my-queue]
                        :polling-interval 5000
                        :on-fail (fn on-fail
                                   [worker exc {:keys [id job args queue state] :as _job}]
                                   ,,,)}))
(b/start worker)
The on-fail function will be called if an exception is thrown from the job.
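A sketch of an on-fail handler that keeps a record of failures. The argument list is the one from the worker example; the failures atom and the sample job map below are hypothetical, and a real application would more likely log or report the error:

```clojure
(ns byplay-sketch.on-fail)

(def failures (atom []))

(defn on-fail
  "Records which job failed, with which arguments, and why."
  [worker exc {:keys [id job args queue state] :as _job}]
  (swap! failures conj {:id      id
                        :job     job
                        :args    args
                        :queue   queue
                        :message (.getMessage ^Throwable exc)}))

;; Simulate a failure with hypothetical values:
(on-fail nil
         (RuntimeException. "db timeout")
         {:id 42 :job 'my-app.core/my-job :args [1 2 3] :queue :my-queue})

@failures
;; => [{:id 42, :job my-app.core/my-job, :args [1 2 3], :queue :my-queue, :message "db timeout"}]
```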
You can ask a worker to finish all currently running jobs and stop polling the database with the interrupt function.
For example, this is how a worker can be gracefully stopped in the application shutdown hook:
(.addShutdownHook (Runtime/getRuntime)
  (Thread. #(do
              ; stop the worker before other services (to not break jobs in progress)
              (doto worker b/interrupt b/join)
              ; stop other services
              ,,,)))
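The interrupt-then-join sequence is the usual cooperative shutdown pattern on the JVM. A self-contained sketch of that pattern with a plain java.lang.Thread, assuming (not confirmed from Byplay's source) that a worker checks for interruption only between jobs; poll-loop and the counters are hypothetical:

```clojure
(ns byplay-sketch.shutdown)

(def started (atom 0))
(def finished (atom 0))

(defn poll-loop
  "Runs fake jobs until the thread is interrupted. Interruption is only
  checked between jobs, so a job that has started always finishes."
  [polling-interval-ms]
  (try
    (while (not (.isInterrupted (Thread/currentThread)))
      ;; a fake job: nothing inside it reacts to interruption
      (swap! started inc)
      (swap! finished inc)
      ;; wait for the next poll; an interrupt here throws InterruptedException
      (Thread/sleep (long polling-interval-ms)))
    (catch java.lang.InterruptedException _
      ;; interrupted while idle: leave the loop
      nil)))

(def worker-thread (Thread. #(poll-loop 10)))
(.start worker-thread)
(Thread/sleep 50)
;; like (doto worker b/interrupt b/join): request a stop, then wait for it
(doto worker-thread .interrupt .join)
```

Because the interruption flag is only consulted between jobs and during the sleep, started and finished end up equal: no job is cut off halfway.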
Jobs should be idempotent, because in rare cases a job may be started more than once.
E.g. a worker may die in the middle of a job execution, leaving the job in the new state.
Thanks to transactional guarantees, if a job only updates the database, you don’t have to worry about this problem.
Just don’t forget to use the connection from the job context.
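If a job has effects outside the database, such as sending an email, a second run repeats them. A sketch of the difference, with atoms standing in for external state (all names are hypothetical):

```clojure
(ns byplay-sketch.idempotency)

(def emails-sent (atom 0))
(def shipped-orders (atom {}))

(defn notify-job
  "Not idempotent: every run has an extra side effect."
  [ctx order-id]
  (swap! emails-sent inc))

(defn mark-shipped-job
  "Idempotent: the result depends only on the arguments, not on how many
  times the job has run (like an upsert keyed by order id)."
  [ctx order-id]
  (swap! shipped-orders assoc order-id :shipped))

;; Simulate a job being started twice:
(dotimes [_ 2]
  (notify-job {} 7)
  (mark-shipped-job {} 7))

@emails-sent    ;; => 2 (a duplicate email went out)
@shipped-orders ;; => {7 :shipped} (same as after a single run)
```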
Use a connection pool in the worker’s database specification; see the funcool/clojure.jdbc docs for how to set one up.
Otherwise Byplay will create a new connection to the database on every poll.
If you schedule a job and then rename its namespace or function, the worker won’t find the job var and will fail the job.
Also be careful with changing job arguments.
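A sketch of both failure modes, assuming (as the paragraph above suggests) that a scheduled job is stored as the symbol of its var together with its arguments; ns-unmap simulates the rename, and all names are hypothetical:

```clojure
(ns byplay-sketch.renames)

(defn old-job [ctx x] x)

;; Suppose a job was scheduled while the function was still called old-job:
(def stored-job 'byplay-sketch.renames/old-job)
(def stored-args [1])

;; Later the function is renamed (and gains a parameter); simulate the rename
;; by removing the old var.
(ns-unmap 'byplay-sketch.renames 'old-job)
(defn new-job [ctx x y] (+ x y))

(resolve stored-job)
;; => nil, so there is nothing to call and the job fails

;; Changing the arity breaks already-scheduled arguments in a similar way:
(try
  (apply new-job {} stored-args)
  (catch clojure.lang.ArityException _
    :arity-mismatch))
;; => :arity-mismatch
```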
An exception may also occur in the worker thread outside of a job function.
By default such exceptions silently kill the background thread, so it’s good practice to detect them explicitly with
Thread/setDefaultUncaughtExceptionHandler.
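A minimal sketch of installing such a handler. Note that it is JVM-wide, not specific to Byplay. The handler body here only records the failure in a hypothetical last-uncaught atom; a real application would log it and probably alert or restart the worker:

```clojure
(ns byplay-sketch.uncaught)

(def last-uncaught (atom nil))

;; Install a JVM-wide handler for exceptions that escape any thread.
(Thread/setDefaultUncaughtExceptionHandler
  (reify java.lang.Thread$UncaughtExceptionHandler
    (uncaughtException [_ thread exc]
      (reset! last-uncaught {:thread  (.getName ^Thread thread)
                             :message (.getMessage ^Throwable exc)}))))

;; A background thread that dies from an exception thrown outside of any job:
(def poller
  (Thread. #(throw (RuntimeException. "poller crashed"))
           "byplay-sketch-poller"))
(.start poller)
(.join poller)

@last-uncaught
;; => {:thread "byplay-sketch-poller", :message "poller crashed"}
```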
More information can be found at the project site:
Copyright © 2016 Yuri Govorushchenko.
Released under an MIT license.