aboutsummaryrefslogtreecommitdiff

jobd

jobd is a simple job queue daemon that works with persistent queue storage. It uses a MySQL table as a storage backend (for queue input and output).

Currently, MySQL is the only supported storage. Other backends may be easily supported though.

It is by design that jobd never adds nor deletes jobs from storage. It only reads (when a certain request arrives) and updates them (during execution, when job status changes). Succeeded or failed, your jobs are never lost.

jobd consists of 2 parts:

  1. jobd is a "worker" daemon that reads jobs from the database, enqueues and launches them. There may be multiple instances of jobd running on multiple hosts. Each jobd instance may have unlimited number of queues (called "targets"), each with its own concurrency limit.

  2. jobd-master is a "master" or "central" daemon that simplifies control over many jobd instances. There should be only one instance of jobd-master running. jobd-master is not required for jobd workers to work (they can work without it), but it's very very useful.

In addition, there is a command line utility called jobctl.

Originally, jobd was created as a saner alternative to Gearman. It's been used in production with a large PHP web application on multiple servers for quite some time already, and proven to be stable and efficient. We were also monitoring the memory usage of all our jobd instances for two months, and can confirm there are no leaks.

Table of Contents

How it works

Targets

Every jobd instance has its own set of queues, called targets. A name of a target is an arbitrary string, the length of which should be limited by the size of target field in the MySQL table.

Each target has its own concurrency limit (the maximum number of jobs that may be executed simultaneously). Targets are loaded from the config at startup, and also may be added or removed at runtime, by add-target(target: string, concurrency: int) and remove-target(target: string) requests.

The purpose of targets is to logically separate jobs of different kinds by putting them in different queues. For instance, targets can be used to simulate jobs priorities:

[targets]
low = 5
normal = 5
high = 5 

The config above defines three targets (or three queues), each with a concurrency limit of 5.

Or, let's imagine a scenario when you have two kinds of jobs: heavy, resource-consuming, long-running jobs (like video processing) and light, fast and quick jobs (like sending emails). In this case, you could define two targets, like so:

[targets]
heavy = 3
quick = 20

This config would allow running at most 3 heavy and up to 20 quick jobs simultaneously.

:thought_balloon: In the author's opinion, the approach of having different targets (queues) for different kinds of jobs is better than having a single queue with each job having a "priority".

Imagine you had a single queue with maximum number of simultaneously running jobs set to, say, 20. What would happen if you'd add a new job, even with the highest priority possible, when there's already 20 slow jobs running? No matter how high the priority of new job is, it would have to wait.

By defining different targets, jobd allows you to create dedicated queues for such jobs, making sure there's always a room for high-priority tasks to run as early as possible.

Creating jobs

Each job is described by one record in the MySQL table. Here is a table scheme with a minimal required set of fields:

CREATE TABLE `jobs` (
  `id` int(10) UNSIGNED NOT NULL AUTO_INCREMENT,
  `target` char(16) NOT NULL,
  `time_created` int(10) UNSIGNED NOT NULL,
  `time_started` int(10) UNSIGNED NOT NULL DEFAULT 0,
  `time_finished` int(10) UNSIGNED NOT NULL DEFAULT 0,
  `status` enum('waiting','manual','accepted','running','done','ignored') NOT NULL DEFAULT 'waiting',
  `result` enum('ok','fail') DEFAULT NULL,
  `return_code` tinyint(3) UNSIGNED DEFAULT NULL,
  `sig` char(10) DEFAULT NULL,
  `stdout` mediumtext DEFAULT NULL,
  `stderr` mediumtext DEFAULT NULL,
  PRIMARY KEY (`id`),
  KEY `status_target_idx` (`status`, `target`, `id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

As you can see: 1. Each job has a unique ID. You don't need to care about assigning IDs because AUTO_INCREMENT is used. 2. Each job is associated with some target, or, in other words, is put to some queue. More about targets in the Targets section. 3. There are time_created, time_started and time_finished fields, and it's not hard to guess what their meaning. When creating a job, you should fill the time_created field with a UNIX timestamp. jobd will update the two other fields while executing the job. 4. Each job has a status. - A job must be created with status set to waiting or manual. - A status becomes accepted when jobd reads the job from the table and puts it to a queue, or it might become ignored in case of some error, like invalid target, or invalid status when processing a run-manual(ids: int) request. - Right before a job is getting started, its status becomes running. - Finally, when it's done, it is set to done. 5. The result field indicates whether a job completed successfully or not. - It is set to ok if the return code of launched command was 0. - Otherwise, it is set to fail. 6. The return_code field is filled with the actual return code. 7. If the job process was killed by a POSIX signal, the signal name is written to the sig field. 8. stdout and stderr of the process are written to stdout and stderr fields, accordingly.

:warning: In a real world, you'll want to have a few more additional fields, like job_name or job_data.
Check out the integration example.

To create a new job, it must be added to the table. As mentioned earlier, adding or removing rows from the table is by design outside the jobd's area of responsibility. A user must add jobs to the table manually.

There are two kinds of jobs, in terms of how they are executed: background and manual (or foreground).

  • Background jobs are created with waiting status. When jobd gets new jobs from the table (which happens upon receiving a poll(target: strings[]); this process is described in detail in the launching background jobs section), such jobs are added to their queues and get executed at some point, depending on the current queue status and concurrency limit. A user does not have control of the execution flow, the only feedback it has is the fields in the table that are going to be updated before, during and after the execution. At some point, status will become done, result and other fields will have their values filled too, and that's it.
  • Manual, or foreground jobs, is a different story. They must be created with status set to manual. These jobs are processed only upon a run-manual(ids: int[]) request. When jobd receives such request, it reads and launches the specified jobs, waits for the results and sends them back to the client in a response. Learn more about it under the launching manual jobs section.

Launching jobs

Launching (or executing) a job means running a command specified in the config as the launcher, replacing the {id} template with current job id.

For example, if you have this in the config:

launcher = php /home/user/job-launcher.php {id}

and jobd is currently executing a job with id 123, it will launch php /home/user/job-launcher.php 123.

Launching background jobs

After jobs have been added to storage, jobd must be notified about it. This is done by a poll(targets: string[]) request that a user (a client) sends to the jobd instance. The targets argument is an array (a list) of targets to poll. It can be omitted; in that case jobd will query for jobs for all targets it is serving.

When jobd receives a poll(targets: string[]) request and specified targets are not full (haven't reached their concurrency limit), it performs a SELECT query with status='waiting' condition and LIMIT set according to the mysql_fetch_limitconfig value.

For example, after receiving the poll(['1/low', '1/normal']) request, assuming mysql_fetch_limit is set to 100, jobd will query jobs from a table roughly like this:

SELECT id, status, target FROM jobs WHERE status='waiting' AND target IN ('1/low', '1/normal') ORDER BY id LIMIT 0, 100 FOR UPDATE

However, if all specified targets are full at the time of jobd receiving the poll(targets: string[]) request, the query will be delayed until at least one of the targets becomes available for new jobs.

Then it loops through results, and either accepts a job (by setting its status in the table to accepted) or ignores it (by setting a status to ignored). Accepted jobs are then added to internal queues according to their targets and executed.

Launching manual jobs

"Manual" jobs is a way of launching jobs in a blocking way ("blocking" from a client's point of view).

After jobs have been added to a storage with status set to manual, a client has to send a run-manual(ids: int[]) request to a jobd instance that serves targets the new jobs are assigned to. When jobd receives such request, it performs a SELECT query with id IN ({ids}) condition.

For example, while processing the run-manual([5,6,7]) request, jobd will make a query that looks roughly something like this:

SELECT id, status, target FROM jobs WHERE id IN ('5', '6', '7') FOR UPDATE

Then it loops through results, and either accepts a job (by setting its status in the table to accepted) or ignores it (by setting its status to ignored). Accepted jobs are then added to internal queues according to their targets and executed.

When all requested jobs are finished, one way or another (succeeded or failed), jobd compiles and sends a response to the client. The response format is described here.

Using jobd-master

If you had only one worker instance (one server, one node), it would not be a problem to use it directly. But what if you have tens or hundreds of servers, each of them serving different targets? This is where jobd-master comes in play: it's been created to simplify usage and management of multiple workers.

There should be only one instance of jobd-master running. All jobd workers are supposed to connect to it at startup. These connections between each worker and jobd-master are persistent.

When jobd worker connects to the master instance, it sends it the list of targets the worker is serving (see Fig. 1).

Let's imagine we have three servers (srv-1, srv-2 and srv-3), each having a jobd worker. All of them are serving common target named any, but they're also configured to serve their own low, normal and high targets s/low, s/normal and s/high respectively (where s is the server number):

Figure 1

┌────────────┐ ┌────────────┐  ┌────────────┐
│ jobd on    │ │ jobd on    │  │ jobd on    │
│ srv-1      │ │ srv-2      │  │ srv-3      │
├────────────┤ ├────────────┤  ├────────────┤
│ Targets:   │ │ Targets:   │  │ Targets:   │
│ - any      │ │ - any      │  │ - any      │
│ - 1/low    │ │ - 2/low    │  │ - 3/low    │
│ - 1/normal │ │ - 2/normal │  │ - 3/normal │
│ - 1/high   │ │ - 2/high   │  │ - 3/high   │
└──────┬─────┘ └─────┬──────┘  └────┬───────┘
       │             │              │
       │     ┌───────▼───────┐      │
       └─────►  jobd-master  ◄──────┘
             └───────────────┘

When targets are added or removed at runtime (by add-target() or remove-target() request), workers notify the master too. Thus, jobd-master always know which workers serve which targets.

To launch jobd via jobd-master, client needs to send a poke(targets: string[]) request to jobd-master instance, and jobd-master will send poll() requests to all appropriate workers.

For example, if you created, say, 5 jobs:

  • 3 for the any target,
  • 1 for target 2/normal, and
  • 1 for target 3/low,

you send poke('any', '2/normal', '3/low') request to jobd-master. As a result, it will send:

Also, you can launch manual (foreground) jobs in parallel on multiple workers via jobd-master and synchronously (in a blocking way) get all results. To do that, you can use the run-manual(jobs: {id: int, target: string}[]) request.

See the integration example for real code examples.

Integration example

PHP: jobd-php-example

Installation

First, you need Node.JS 14 or newer. See here now to install it using package manager.

Then install jobd using npm:

npm i -g jobd 

Usage

systemd

One of possible ways of launching jobd and jobd-master daemons is via systemd. This repository contains basic examples of jobd.service and jobd-master.service unit files. Note that jobs will be launched as the same user the jobd worker is running, so you might want to change that.

Copy .service file(s) to /etc/systemd/system, then do:

systemctl daemon-reload
systemctl enable jobd
systemctl start jobd
# repeat last two steps for jobd-master, if needed

supervisor

If you don't like systemd, supervisor might be an option. Create a configuration file in /etc/supervisor/conf.d with following content:

[program:jobd]
command=/usr/bin/jobd --config /etc/jobd.conf
numprocs=1
directory=/
stdout_logfile=/var/log/jobd-stdout.log
autostart=true
autorestart=true
user=nobody
stopsignal=TERM

Then use supervisorctl to start or stop jobd.

Other notes

:exclamation: Don't forget to filter access to jobd and jobd-master ports using a firewall. See a note here for more info.

Configuration

Configuration files are written in ini format. All available options for both daemons, as well as a command-line utility, are described below. You can copy jobd.conf.example and jobd-master.conf.example and use them as a template instead of writing configs from scratch.

jobd

Default config path is /etc/jobd.conf. Use the --config option to use a different path.

Without section:

  • host (required, string) — jobd server hostname
  • port (required, int) — jobd server port
  • password (string) — password for requests
  • always_allow_localhost (boolean, default: false) — when set to 1 or true, allows accepting requests from clients connecting from localhost without password
  • name (string, default: os.hostname()) — worker name
  • master_host (string) — master hostname
  • master_port (int) — master port. If hostname or port is omitted, jobd will not connect to master.
  • master_reconnect_timeout (int, default: 10) — if connection to master failed, jobd will be constantly trying to reconnect. This option specifies a delay between connection attempts, in seconds.
  • log_file (string) — path to a log file
  • log_level_file (string, default: warn) — minimum level of logs that are written to the file.
    Allowed values: trace, debug, info, warn, error
  • log_level_console (string, default: warn) — minimum level of logs that go to stdout.
  • mysql_host (required, string) — database host
  • mysql_port (required, int) — database port
  • mysql_user (required, string) — database user
  • mysql_password (required, string) — database password
  • mysql_database (required, string) — database name
  • mysql_table (required, string) — table name
  • mysql_fetch_limit (int, default: 100) — a number of new jobs to fetch in every request
  • launcher (required, string) — a template of shell command that will be launched for every job. {id} will be replaced with job id
  • launcher.cwd (string, default: process.cwd()) — current working directory for spawned launcher processes
  • launcher.env.{any} (string) — environment variable for spawned launcher processes
  • max_output_buffer (int, default: 1048576)

Under the [targets] section, targets are specified. Each target is specified on a separate line in the following format:

{target_name} = {n}

where: - {target_name} (string) is target name - {n} (int) is maximum count of simultaneously executing jobs for this target

jobd-master

Default config path is /etc/jobd-master.conf. Use the --config option to use a different path.

  • host (required, string)
  • port (required, int)
  • password (string)
  • always_allow_localhost (boolean, default: false)
  • ping_interval (int, default: 30) — specifies interval between workers pings.
  • poke_throttle_interval (int, default: 0.5)
  • log_file (string)
  • log_level_file (string, default: warn)
  • log_level_console (string, default: warn)

jobctl

Default config path is ~/.jobctl.conf.

  • master (boolean) — same as --master.
  • host (string) — same as --host.
  • port (int) — same as --port.
  • password (string)
  • log_level (string, default: warn)

Clients

PHP

php-jobd-client (official)

Protocol

By default, jobd and jobd-master listen on TCP ports 7080 and 7081 respectively, ports can be changed in a config.

:exclamation: jobd has been created with an assumption that it'll be used in more-or-less trusted environments (like LAN or, at least, servers within one data center) so no encryption nor authentication mechanisms have been implemented. All traffic between jobd and clients flow in plain text.

You can protect a jobd instance with a password though, so at least basic password-based authorization is supported.

Both daemons receive and send Messages. Each message is followed by EOT (0x4) byte which indicates an end of a message. Clients may send and receive multiple messages over a single connection. Usually, it's the client who must close the connection, when it's not needed anymore. A server, however, may close the connection in some situations (invalid password, server error, etc).

Messages are encoded as JSON arrays with at least one item, representing the message type:

[TYPE]

If a message of specific type has some data, it's placed as a second item:

[TYPE, DATA]

Type of TYPE is integer. Supported types are:

  • 0: Request
  • 1: Response
  • 2: Ping
  • 3: Pong

Request Message

DATA is a JSON object with following keys:

  • no (required, int)

    Unique (per connection) request number. Clients can start counting request numbers from one (1) or from any other random number. Each subsequent request should increment this number by 1. Note that zero (0) is reserved.

  • type (required, string)

    Request type. Supported request types for jobd and jobd-master are listed below.

  • data (object)

    Request arguments (if needed): an object, whose keys and values represent argument names and values.

  • password (string)

    A password, for password-protected instances. Only needed for first request.

Example (w/o trailing EOT):

[0,{no:1,type:'poll',data:{'targets':['target_1','target_2']}}]

Here is the list of supported requests, using type(arguments) notation.

jobd requests

  • poll(targets: string[])
    Get new tasks for specified targets from database. If targets argument is no specified, get tasks for all serving targets.

Response data type: string ('ok').

  • pause(targets: string[])
    Pause execution of tasks of specified targets. If targets argument is omitted, pauses all targets.

Response data type: string ('ok').

Response data type: string ('ok').

  • status()
    Returns status of internal queues and memory usage.

Response data type: object with following keys: - targets (object) - jobPromisesCount (int) - memoryUsage (NodeJS.MemoryUsage)

Response data type: string ('ok').

Response data type: string ('ok').

Response data type: string ('ok').

  • run-manual(ids: int[])
    Enqueue and run jobs with specified IDs and status set to manual, and return results.

Response data type: object with following keys: - jobs (object)

An object whose keys represent succeeded job IDs and whose values are objects
with following keys:
- `result` (string)
- `code` (int)
- `signal` (string|null)
- `stdout` (string)
- `stderr` (string)

Response data type: object with job IDs as keys and kill status (boolean where true means that signal is successfully delivered) as values.

jobd-master requests

Response Message

DATA is a JSON object with following keys:

Example (w/o trailing EOT):

[1,{no:1,data:'ok'}]

Ping and Pong Messages

No DATA.

Example (w/o trailing EOT):

[2]

TODO

  • graceful shutdown of jobd
  • support signals in jobctl

License

MIT