# jobd

**jobd** is a simple job queue daemon that works with persistent queue storage.
It uses a MySQL table as a storage backend (for queue input and output).
Currently, MySQL is the only supported storage, though other backends could be
supported easily.

By design, jobd never adds jobs to storage and never deletes them. It only reads
them (when a certain request arrives) and updates them (during execution, when
job status changes). Succeeded or failed, your jobs are never lost.

jobd consists of 2 parts:

1. **jobd** is a "worker" daemon that reads jobs from the database, enqueues and
   launches them. There may be multiple instances of jobd running on multiple
   hosts. Each jobd instance may have an unlimited number of queues (called
   "targets"), each with its own concurrency limit.

2. **jobd-master** is a "master" or "central" daemon that simplifies control over
   many jobd instances. There should be only one instance of jobd-master running.
   jobd-master is not required for jobd workers to work (they can work without
   it), but it's very useful.

In addition, there is a command line utility called **jobctl**.

Originally, jobd was created as a saner alternative to Gearman. It has been used
in production with a large PHP web application on multiple servers for quite some
time, and has proven to be stable and efficient. We also monitored the memory
usage of all our jobd instances for two months, and can confirm there are no
leaks.

## Table of Contents

- [How it works](#how-it-works)
    - [Targets](#targets)
    - [Creating jobs](#creating-jobs)
    - [Launching jobs](#launching-jobs)
    - [Launching background jobs](#launching-background-jobs)
    - [Launching manual jobs](#launching-manual-jobs)
    - [Using jobd-master](#using-jobd-master)
- [Integration example](#integration-example)
- [Installation](#installation)
- [Usage](#usage)
    - [systemd](#systemd)
    - [supervisor](#supervisor)
    - [Other notes](#other-notes)
- [Configuration](#configuration)
    - [jobd](#jobd-1)
    - [jobd-master](#jobd-master)
    - [jobctl](#jobctl)
- [Clients](#clients)
    - [PHP](#php)
- [Protocol](#protocol)
    - [Request Message](#request-message)
        - [jobd requests](#jobd-requests)
            - [poll(targets: string[])](#polltargets-string)
            - [pause(targets: string[])](#pausetargets-string)
            - [continue(targets: string[])](#continuetargets-string)
            - [status()](#status)
            - [run-manual(ids: int[])](#run-manualids-int)
            - [add-target(target: string, concurrency: int)](#add-targettarget-string-concurrency-int)
            - [remove-target(target: string)](#remove-targettarget-string)
            - [set-target-concurrency(target: string, concurrency: int)](#set-target-concurrencytarget-string-concurrency-int)
        - [jobd-master requests](#jobd-master-requests)
            - [register-worker(targets: string[], name: string)](#register-workertargets-string-name-string)
            - [poke(targets: string[])](#poketargets-string)
            - [pause(targets: string[])](#pausetargets-string-1)
            - [continue(targets: string[])](#continuetargets-string-1)
            - [status(poll_workers=false: bool)](#statuspoll_workersfalse-bool)
            - [run-manual(jobs: {id: int, target: string}[])](#run-manualjobs-id-int-target-string)
    - [Response Message](#response-message)
    - [Ping and Pong Messages](#ping-and-pong-messages)
- [TODO](#todo)
- [License](#license)

## How it works

### Targets

Every jobd instance has its own set of queues, called **targets**. A target name
is an arbitrary string whose length is limited by the size of the `target` field
in the MySQL table. Each target has its own concurrency limit (the maximum number
of jobs that may be executed simultaneously).

Targets are loaded from the config at startup, and may also be added or removed
at runtime by
[`add-target(target: string, concurrency: int)`](#add-targettarget-string-concurrency-int)
and [`remove-target(target: string)`](#remove-targettarget-string) requests.

The purpose of targets is to logically separate jobs of different kinds by
putting them in different queues. For instance, targets can be used to simulate
job priorities:

```ini
[targets]
low = 5
normal = 5
high = 5
```

The config above defines three targets (or three queues), each with a concurrency
limit of `5`.

Or, let's imagine a scenario where you have two kinds of jobs: heavy,
resource-consuming, long-running jobs (like video processing) and light, quick
jobs (like sending emails). In this case, you could define two targets, like so:

```ini
[targets]
heavy = 3
quick = 20
```

This config would allow running at most 3 heavy and up to 20 quick jobs
simultaneously.

> :thought_balloon: In the author's opinion, the approach of having different
> targets (queues) for different kinds of jobs is better than having a single
> queue with each job having a "priority".
>
> Imagine you had a single queue with the maximum number of simultaneously
> running jobs set to, say, 20. What would happen if you added a new job, even
> with the highest priority possible, when there are already 20 slow jobs
> running? No matter how high the priority of the new job is, it would have to
> wait.
>
> By defining different targets, jobd allows you to create dedicated queues for
> such jobs, making sure there's always room for high-priority tasks to run as
> early as possible.

### Creating jobs

Each job is described by one record in the MySQL table.
Here is a table schema with the minimal required set of fields:

```mysql
CREATE TABLE `jobs` (
  `id` int(10) UNSIGNED NOT NULL AUTO_INCREMENT,
  `target` char(16) NOT NULL,
  `time_created` int(10) UNSIGNED NOT NULL,
  `time_started` int(10) UNSIGNED NOT NULL DEFAULT 0,
  `time_finished` int(10) UNSIGNED NOT NULL DEFAULT 0,
  `status` enum('waiting','manual','accepted','running','done','ignored') NOT NULL DEFAULT 'waiting',
  `result` enum('ok','fail') DEFAULT NULL,
  `return_code` tinyint(3) UNSIGNED DEFAULT NULL,
  `sig` char(10) DEFAULT NULL,
  `stdout` mediumtext DEFAULT NULL,
  `stderr` mediumtext DEFAULT NULL,
  PRIMARY KEY (`id`),
  KEY `status_target_idx` (`status`, `target`, `id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
```

As you can see:

1. Each job has a unique ID. You don't need to care about assigning IDs because
   `AUTO_INCREMENT` is used.
2. Each job is associated with some target, or, in other words, is put into some
   queue. More about targets in the [Targets](#targets) section.
3. There are `time_created`, `time_started` and `time_finished` fields, and it's
   not hard to guess what they mean. When creating a job, you should fill the
   `time_created` field with a UNIX timestamp. jobd will update the two other
   fields while executing the job.
4. Each job has a `status`.
    - A job must be created with status set to `waiting` or `manual`.
    - The status becomes `accepted` when jobd reads the job from the table and
      puts it into a queue, or it may become `ignored` in case of an error, such
      as an invalid `target`, or an invalid `status` when processing a
      [`run-manual(ids: int[])`](#run-manualids-int) request.
    - Right before a job is started, its status becomes `running`.
    - Finally, when the job is finished, its status is set to `done`.
5. The `result` field indicates whether a job completed successfully or not.
    - It is set to `ok` if the return code of the launched command was `0`.
    - Otherwise, it is set to `fail`.
6. The `return_code` field is filled with the actual return code.
7. If the job process was killed by a POSIX signal, the signal name is written to
   the `sig` field.
8. stdout and stderr of the process are written to the `stdout` and `stderr`
   fields, respectively.

> :warning: In the real world, you'll want to have a few more fields, like
> `job_name` or `job_data`.
> Check out the [integration example](#integration-example).

To create a new job, it must be added to the table. As mentioned earlier, adding
or removing rows from the table is by design outside jobd's area of
responsibility. A user must add jobs to the table manually.

There are two kinds of jobs, in terms of how they are executed: **background**
and **manual** (or foreground).

* Background jobs are created with `waiting` status. When jobd gets new jobs from
  the table (which happens upon receiving a
  [`poll(targets: string[])`](#polltargets-string) request; this process is
  described in detail in the
  [launching background jobs](#launching-background-jobs) section), such jobs are
  added to their queues and get executed at some point, depending on the current
  queue status and concurrency limit. A user does not have control of the
  execution flow; the only feedback available is the fields in the table that are
  updated before, during and after the execution. At some point, `status` will
  become `done`, `result` and other fields will have their values filled too, and
  that's it.

* Manual, or foreground, jobs are a different story. They must be created with
  `status` set to `manual`. These jobs are processed only upon a
  [`run-manual(ids: int[])`](#run-manualids-int) request. When jobd receives such
  a request, it reads and launches the specified jobs, waits for the results and
  sends them back to the client in a response. Learn more about it under the
  [launching manual jobs](#launching-manual-jobs) section.

### Launching jobs

**Launching** (or **executing**) a job means **running a command** specified in
the config as the `launcher`, replacing the `{id}` template with the current job
id.

For example, if you have this in the config:

```
launcher = php /home/user/job-launcher.php {id}
```

and jobd is currently executing a job with id 123, it will launch
`php /home/user/job-launcher.php 123`.

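The substitution described above can be illustrated with a few lines of Python. This is only a sketch of the `{id}` replacement itself, not jobd's actual code: the helper name `build_launcher_command` is hypothetical, and how jobd spawns the resulting command is not covered here.

```python
def build_launcher_command(launcher: str, job_id: int) -> str:
    """Return the launcher template with the {id} placeholder filled in.

    Hypothetical helper for illustration; it only mirrors the documented
    behaviour of replacing `{id}` with the current job id.
    """
    return launcher.replace("{id}", str(job_id))


# Value taken from the config example above.
launcher = "php /home/user/job-launcher.php {id}"

command = build_launcher_command(launcher, 123)
print(command)  # php /home/user/job-launcher.php 123
```

Keeping the job id as the only variable part of the command means everything else a job needs (its name, arguments, payload) has to be looked up by the launcher script itself, typically from the same table row.
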
### Launching background jobs

After jobs have been added to storage, jobd must be notified about it. This is
done by a [`poll(targets: string[])`](#polltargets-string) request that a user
(a client) sends to the jobd instance. The `targets` argument is an array (a
list) of `targets` to poll. It can be omitted; in that case jobd will query for
jobs for all targets it is serving.

When jobd receives a [`poll(targets: string[])`](#polltargets-string) request and
the specified targets are not full (haven't reached their concurrency limit), it
performs a `SELECT` query with a `status='waiting'` condition and `LIMIT` set
according to the `mysql_fetch_limit` config value.

For example, after receiving the
[`poll(['1/low', '1/normal'])`](#polltargets-string) request, assuming
`mysql_fetch_limit` is set to `100`, jobd will query jobs from the table roughly
like this:

```mysql
SELECT id, status, target FROM jobs
WHERE status='waiting' AND target IN ('1/low', '1/normal')
ORDER BY id
LIMIT 0, 100
FOR UPDATE
```

> However, if all specified targets are full at the time jobd receives the
> [`poll(targets: string[])`](#polltargets-string) request, the query will be
> delayed until at least one of the targets becomes available for new jobs.

Then it loops through the results, and either accepts a job (by setting its
status in the table to `accepted`) or ignores it (by setting its status to
`ignored`). Accepted jobs are then added to internal queues according to their
targets and executed.

### Launching manual jobs

"Manual" jobs are a way of launching jobs in a blocking way ("blocking" from a
client's point of view).

After jobs have been added to storage with `status` set to `manual`, a client has
to send a [`run-manual(ids: int[])`](#run-manualids-int) request to a jobd
instance that serves the targets the new jobs are assigned to. When jobd receives
such a request, it performs a `SELECT` query with an `id IN ({ids})` condition.

For example, while processing the
[`run-manual([5,6,7])`](#run-manualids-int) request, jobd will make a query that
looks roughly like this:

```mysql
SELECT id, status, target FROM jobs
WHERE id IN ('5', '6', '7')
FOR UPDATE
```

Then it loops through the results, and either accepts a job (by setting its
status in the table to `accepted`) or ignores it (by setting its status to
`ignored`). Accepted jobs are then added to internal queues according to their
targets and executed.

When all requested jobs are finished, one way or another (succeeded or failed),
jobd compiles and sends a response to the client. The response format is
described [here](#run-manualids-int).

### Using jobd-master

If you had only one worker instance (one server, one node), it would not be a
problem to use it directly. But what if you have tens or hundreds of servers,
each of them serving different targets? This is where **jobd-master** comes into
play: it was created to simplify usage and management of multiple workers.

There should be only one instance of **jobd-master** running. All jobd workers
are supposed to connect to it at startup. These connections between each worker
and jobd-master are persistent.

When a jobd worker connects to the master instance, it sends the list of targets
the worker is serving (see Fig. 1).

Let's imagine we have three servers (`srv-1`, `srv-2` and `srv-3`), each having a
jobd worker.
All of them serve a common target named `any`, and each is also configured to
serve its own `low`, `normal` and `high` targets, named `s/low`, `s/normal` and
`s/high` (where `s` is the server number):

```
                  Figure 1

┌────────────┐ ┌────────────┐  ┌────────────┐
│  jobd on   │ │  jobd on   │  │  jobd on   │
│   srv-1    │ │   srv-2    │  │   srv-3    │
├────────────┤ ├────────────┤  ├────────────┤
│ Targets:   │ │ Targets:   │  │ Targets:   │
│ - any      │ │ - any      │  │ - any      │
│ - 1/low    │ │ - 2/low    │  │ - 3/low    │
│ - 1/normal │ │ - 2/normal │  │ - 3/normal │
│ - 1/high   │ │ - 2/high   │  │ - 3/high   │
└──────┬─────┘ └─────┬──────┘  └────┬───────┘
       │             │              │
       │     ┌───────▼───────┐      │
       └─────►  jobd-master  ◄──────┘
             └───────────────┘
```

When targets are added or removed at runtime (by an
[`add-target()`](#add-targettarget-string-concurrency-int) or
[`remove-target()`](#remove-targettarget-string) request), workers notify the
master too. Thus, jobd-master always knows which workers serve which targets.

To launch jobs via jobd-master, a client needs to send a
[`poke(targets: string[])`](#poketargets-string) request to the jobd-master
instance, and jobd-master will send [`poll()`](#polltargets-string) requests to
all appropriate workers.

For example, if you created, say, 5 jobs:

- 3 for the `any` target,
- 1 for target `2/normal`, and
- 1 for target `3/low`,

you send a [`poke('any', '2/normal', '3/low')`](#poketargets-string) request to
jobd-master. As a result, it will send:

- a [`poll('any')`](#polltargets-string) request to a random worker serving the
  `any` target,
- a [`poll('2/normal')`](#polltargets-string) request to `srv-2`, and
- a [`poll('3/low')`](#polltargets-string) request to `srv-3`.

Also, you can launch manual (foreground) jobs in parallel on multiple workers via
jobd-master and synchronously (in a blocking way) get all results. To do that,
use the
[`run-manual(jobs: {id: int, target: string}[])`](#run-manualjobs-id-int-target-string)
request.

See the integration example for real code examples.

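To make the `poke` routing above more concrete, here is a small Python model of it. It is a sketch under assumptions, not jobd-master's implementation: the `WORKERS` registry and the `plan_polls` function are hypothetical names, the registry simply mirrors Figure 1, and what the real master does for a target that no worker serves is not described in this document, so the sketch just skips it.

```python
import random

# Hypothetical registry mirroring Figure 1: worker name -> targets it serves.
WORKERS = {
    "srv-1": {"any", "1/low", "1/normal", "1/high"},
    "srv-2": {"any", "2/low", "2/normal", "2/high"},
    "srv-3": {"any", "3/low", "3/normal", "3/high"},
}


def plan_polls(targets, workers=WORKERS, choose=random.choice):
    """Return a list of (worker, target) pairs, one poll() per poked target.

    For each target, one worker is picked at random among those serving it,
    which matches the "random worker serving the `any` target" behaviour
    described above. Targets nobody serves are skipped (an assumption).
    """
    plan = []
    for target in targets:
        candidates = sorted(name for name, served in workers.items() if target in served)
        if candidates:
            plan.append((choose(candidates), target))
    return plan


plan = plan_polls(["any", "2/normal", "3/low"])
for worker, target in plan:
    print(f"poll(['{target}']) -> {worker}")
```

In this model `2/normal` and `3/low` always end up on `srv-2` and `srv-3`, because each is served by exactly one worker, while `any` may land on any of the three.
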
## Integration example

PHP: [jobd-php-example](https://github.com/gch1p/jobd-php-example)

## Installation

First, you need Node.js 14 or newer. See
[here](https://nodejs.org/en/download/package-manager/) on how to install it
using a package manager.

Then install jobd using npm:

```
npm i -g jobd
```

## Usage

### systemd

One possible way of launching the jobd and jobd-master daemons is via systemd.
This repository contains basic examples of
[`jobd.service`](systemd/jobd.service) and
[`jobd-master.service`](systemd/jobd-master.service) unit files. Note that jobs
will be launched as the same user the jobd worker is running as, so you might
want to change that.

Copy the `.service` file(s) to `/etc/systemd/system`, then do:

```shell
systemctl daemon-reload
systemctl enable jobd
systemctl start jobd

# repeat last two steps for jobd-master, if needed
```

### supervisor

If you don't like systemd, [supervisor](http://supervisord.org/) might be an
option. Create a configuration file in `/etc/supervisor/conf.d` with the
following content:

```ini
[program:jobd]
command=/usr/bin/jobd --config /etc/jobd.conf
numprocs=1
directory=/
stdout_logfile=/var/log/jobd-stdout.log
autostart=true
autorestart=true
user=nobody
stopsignal=TERM
```

Then use `supervisorctl` to start or stop jobd.

### Other notes

:exclamation: Don't forget to filter access to jobd and jobd-master ports using a
firewall. See a note [here](#protocol) for more info.

## Configuration

Configuration files are written in ini format. All available options for both
daemons, as well as the command-line utility, are described below. You can copy
[`jobd.conf.example`](jobd.conf.example) and
[`jobd-master.conf.example`](jobd-master.conf.example) and use them as templates
instead of writing configs from scratch.

### jobd

Default config path is `/etc/jobd.conf`. Use the `--config` option to use a
different path.

Without section:

- `host` *(required, string)*: jobd server hostname
- `port` *(required, int)*: jobd server port
- `password` *(string)*: password for requests
- `always_allow_localhost` *(boolean, default: `false`)*: when set to `1` or
  `true`, allows accepting requests from clients connecting from localhost
  without password
- `name` *(string, default: `os.hostname()`)*: worker name
- `master_host` *(string)*: master hostname
- `master_port` *(int)*: master port. If hostname or port is omitted, jobd will
  not connect to master.
- `master_reconnect_timeout` *(int, default: `10`)*: if connection to master
  failed, jobd will be constantly trying to reconnect. This option specifies a
  delay between connection attempts, in seconds.
- `log_file` *(string)*: path to a log file
- `log_level_file` *(string, default: `warn`)*: minimum level of logs that are
  written to the file. Allowed values: `trace`, `debug`, `info`, `warn`, `error`
- `log_level_console` *(string, default: `warn`)*: minimum level of logs that go
  to stdout.
- `mysql_host` *(required, string)*: database host
- `mysql_port` *(required, int)*: database port
- `mysql_user` *(required, string)*: database user
- `mysql_password` *(required, string)*: database password
- `mysql_database` *(required, string)*: database name
- `mysql_table` *(required, string)*: table name
- `mysql_fetch_limit` *(int, default: `100`)*: a number of new jobs to fetch in
  every request
- `launcher` *(required, string)*: a template of shell command that will be
  launched for every job. `{id}` will be replaced with job id
- `launcher.cwd` *(string, default: `process.cwd()`)*: current working directory
  for spawned launcher processes
- `launcher.env.{any}` *(string)*: environment variable for spawned launcher
  processes
- `max_output_buffer` *(int, default: `1048576`)*

Under the `[targets]` section, targets are specified. Each target is specified on
a separate line in the following format:

```ini
{target_name} = {n}
```

where:

- `{target_name}` *(string)* is the target name
- `{n}` *(int)* is the maximum count of simultaneously executing jobs for this
  target

### jobd-master

Default config path is `/etc/jobd-master.conf`. Use the `--config` option to use
a different path.

- `host` *(required, string)*
- `port` *(required, int)*
- `password` *(string)*
- `always_allow_localhost` *(boolean, default: `false`)*
- `ping_interval` *(int, default: `30`)*: specifies the interval between worker
  pings.
- `poke_throttle_interval` *(int, default: `0.5`)*
- `log_file` *(string)*
- `log_level_file` *(string, default: `warn`)*
- `log_level_console` *(string, default: `warn`)*

### jobctl

Default config path is `~/.jobctl.conf`.

- `master` *(boolean)*: same as `--master`.
- `host` *(string)*: same as `--host`.
- `port` *(int)*: same as `--port`.
- `password` *(string)*
- `log_level` *(string, default: `warn`)*

## Clients

### PHP

[php-jobd-client](https://github.com/gch1p/php-jobd-client) (official)

## Protocol

By default, jobd and jobd-master listen on TCP ports 7080 and 7081 respectively;
the ports can be changed in the config.

> :exclamation: jobd has been created with an assumption that it'll be used in
> more-or-less trusted environments (like LAN or, at least, servers within one
> data center) so **no encryption nor authentication mechanisms have been
> implemented.** All traffic between jobd and clients flows **in plain text**.
>
> You can protect a jobd instance with a password though, so at least basic
> password-based authorization is supported.

Both daemons receive and send Messages. Each message is followed by an `EOT`
(`0x4`) byte, which indicates the end of a message. Clients may send and receive
multiple messages over a single connection. Usually, it's the client who must
close the connection when it's not needed anymore. A server, however, may close
the connection in some situations (invalid password, server error, etc).

Messages are encoded as JSON arrays with at least one item, representing the
message type:

```
[TYPE]
```

If a message of a specific type has some data, it's placed as a second item:

```
[TYPE, DATA]
```

`TYPE` is an integer. Supported types are:

- `0`: Request
- `1`: Response
- `2`: Ping
- `3`: Pong

### Request Message

`DATA` is a JSON object with the following keys:

- ##### `no` (required, int)

  Unique (per connection) request number. Clients can start counting request
  numbers from one (`1`) or from any other random number. Each subsequent request
  should increment this number by 1. Note that zero (`0`) is reserved.

- ##### `type` (required, string)

  Request type. Supported request types for [jobd](#jobd-requests) and
  [jobd-master](#jobd-master-requests) are listed below.

- ##### `data` (object)

  Request arguments (if needed): an object whose keys and values represent
  argument names and values.

- ##### `password` (string)

  A password, for password-protected instances. Only needed for the first
  request.

Example (w/o trailing `EOT`):

```
[0,{"no":1,"type":"poll","data":{"targets":["target_1","target_2"]}}]
```

Here is the list of supported requests, using `type(arguments)` notation.

#### jobd requests

* ##### `poll(targets: string[])`

  Get new tasks for specified `targets` from database. If the `targets` argument
  is not specified, get tasks for all serving targets.

  Response [data](#data-array--object--string--int) type: **string** ('ok').

* ##### `pause(targets: string[])`

  Pause execution of tasks of specified targets. If the `targets` argument is
  omitted, pauses all targets.

  Response [data](#data-array--object--string--int) type: **string** ('ok').

* ##### `continue(targets: string[])`

  Continue execution of tasks of specified targets. If the `targets` argument is
  omitted, continues all targets.

  Response [data](#data-array--object--string--int) type: **string** ('ok').

* ##### `status()`

  Returns status of internal queues and memory usage.

  Response [data](#data-array--object--string--int) type: **object** with the
  following keys:

  - `targets` (object)
  - `jobPromisesCount` (int)
  - `memoryUsage` (NodeJS.MemoryUsage)

* ##### `add-target(target: string, concurrency: int)`

  Add target.

  Response [data](#data-array--object--string--int) type: **string** ('ok').

* ##### `remove-target(target: string)`

  Remove target.

  Response [data](#data-array--object--string--int) type: **string** ('ok').

* ##### `set-target-concurrency(target: string, concurrency: int)`

  Set concurrency limit of target `target`.

  Response [data](#data-array--object--string--int) type: **string** ('ok').

* ##### `run-manual(ids: int[])`

  Enqueue and run jobs with specified IDs and `status` set to `manual`, and
  return results.
  Response [data](#data-array--object--string--int) type: **object** with the
  following keys:

  - `jobs` (object)

    An object whose keys represent succeeded job IDs and whose values are objects
    with the following keys:

    - `result` (string)
    - `code` (int)
    - `signal` (string|null)
    - `stdout` (string)
    - `stderr` (string)

  - `errors` (object)

    An object whose keys represent failed job IDs and whose values are error
    messages.

* ##### `send-signal(jobs: object)`

  Send signals to jobs which are still executing and return results.

  Response [data](#data-array--object--string--int) type: **object** with job IDs
  as keys and kill status (boolean, where true means that the signal was
  successfully delivered) as values.

#### jobd-master requests

* ##### `register-worker(targets: string[], name: string)`

  Used by jobd instances to register themselves with master. Clients don't need
  it.

* ##### `poke(targets: string[])`

  Send [`poll(targets)`](#polltargets-string) requests to all registered workers
  that serve specified `targets`.

* ##### `pause(targets: string[])`

  Send [`pause(targets)`](#pausetargets-string) requests to workers serving
  specified `targets`. If the `targets` argument is omitted, sends
  [`pause()`](#pausetargets-string) to all workers.

* ##### `continue(targets: string[])`

  Send [`continue(targets)`](#continuetargets-string) requests to workers serving
  specified `targets`. If the `targets` argument is omitted, sends
  [`continue()`](#continuetargets-string) to all workers.

* ##### `status(poll_workers=false: bool)`

  Returns list of registered workers and memory usage. If the `poll_workers`
  argument is true, sends a [`status()`](#status) request to all workers and
  includes their responses.

* ##### `run-manual(jobs: {id: int, target: string}[])`

  Send [`run-manual()`](#run-manualids-int) requests to registered jobd instances
  serving specified targets, aggregate and return results.

* ##### `send-signal(jobs: {id: int, signal: int, target: string}[])`

  Send [`send-signal()`](#send-signal-jobs) requests to registered jobd instances
  serving specified targets, aggregate and return results.

### Response Message

`DATA` is a JSON object with the following keys:

- ##### `no` (required, int)

  [`no`](#no-required-int) of the request this response is related to.

- ##### `data` (array | object | string | int)

  Data, if the request succeeded.

- ##### `error` (string)

  Error message, if the request failed.

Example (w/o trailing `EOT`):

```
[1,{"no":1,"data":"ok"}]
```

### Ping and Pong Messages

No `DATA`.

Example (w/o trailing `EOT`):

```
[2]
```

## TODO

- graceful shutdown of jobd
- support signals in jobctl

## License

MIT