summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEvgeny Zinoviev <me@ch1p.io>2021-05-07 23:39:20 +0300
committerEvgeny Zinoviev <me@ch1p.io>2021-05-07 23:39:30 +0300
commit9a98ac50ff50dda2f2eed1ea825352c50c64440e (patch)
treecc5c7d36eee1ec934f8728e7f9104a9d2d760856
initial
-rw-r--r--.gitignore2
-rw-r--r--LICENSE21
-rw-r--r--README.md81
-rw-r--r--composer.json17
-rw-r--r--composer.lock59
-rw-r--r--jobd-1.conf35
-rw-r--r--jobd-2.conf35
-rw-r--r--jobd-master.conf13
-rw-r--r--schema.sql17
-rw-r--r--src/classes/Job.php51
-rw-r--r--src/classes/JobResult.php96
-rw-r--r--src/classes/jobs.php275
-rw-r--r--src/classes/model.php149
-rw-r--r--src/classes/mysql.php166
-rw-r--r--src/cron/jobs-cleanup.php5
-rw-r--r--src/functions.php47
-rw-r--r--src/init.php32
-rw-r--r--src/jobs/CreateFile.php15
-rw-r--r--src/jobs/Hello.php15
-rw-r--r--src/launcher.php31
-rw-r--r--src/main.php93
21 files changed, 1255 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..45b1244
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
+.idea
+/vendor \ No newline at end of file
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..8362f72
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (C) 2021 Evgeny Zinoviev
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE. \ No newline at end of file
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..e1c64a5
--- /dev/null
+++ b/README.md
@@ -0,0 +1,81 @@
+# jobd-php-example
+
+This repository contains example of PHP integration with jobd. The code is
+mainly an excerpt from a real existent PHP application, with some changes.
+
+To launch this example and see how it works, you will need to set up a jobd-master
+instance along with two jobd worker instances.
+
+It was written and tested on my local machine with PHP 7.3, so I'm sharing all
+my configs as is. Don't forget to replace values, such as IP addresses,
+usernames, passwords and so on, with yours, and generally adjust it to your needs.
+
+## Configuration
+
+### jobd
+
+jobd configs are included in the repo: [`jobd-1.conf`](jobd-1.conf),
+[`jobd-2.conf`](jobd-2.conf), [`jobd-master.conf`](jobd-master.conf).
+
+### MySQL
+
+[`schema.sql`](schema.sql) contains schema of MySQL table used in the example.
+
+### Runtime
+
+For the sake of simplicity, runtime configuration (such as MySQL credentials)
+is stored in [`init.php`](src/init.php) as global constants. Adjust to your needs.
+
+## Usage
+
+1. Make sure **MySQL server** is running.
+
+2. Start **jobd-master** and two **jobd** instances:
+
+ ```
+ jobd-master --config jobd-master.conf
+ jobd --config jobd-1.conf
+ jobd --config jobd-2.conf
+ ```
+
+3. Install dependencies with composer:
+ ```
+ composer install
+ ```
+
+4. Test configuration:
+ ```
+ php src/main.php test
+ ```
+
+ This command will test MySQL and jobd connection.
+
+ You can also print the list of workers by executing:
+ ```
+ jobctl --master list-workers
+ ```
+
+5. Launch test jobs:
+ ```
+ php src/main.php hello
+ ```
+
+ This will launch two [`Hello`](src/jobs/Hello.php) jobs, wait for results
+ and print them.
+
+6. Launch another test job. [This one](src/jobs/CreateFile.php) will run in
+ background. It just creates a file with the name you give it. Not like
+ it's anything useful, but it's for the demo.
+ ```
+ php src/main.php createfile
+ ```
+
+ Note that if the path your specify is not absolute, it will be relative to
+ the jobd's working directory, specified `launcher.cwd` config option.
+
+ If it fails, just look into the MySQL table, there must be some error.
+
+
+## License
+
+MIT \ No newline at end of file
diff --git a/composer.json b/composer.json
new file mode 100644
index 0000000..eecd17d
--- /dev/null
+++ b/composer.json
@@ -0,0 +1,17 @@
+{
+ "name": "ch1p/jobd-php-example",
+ "description": "example usage of jobd and it's PHP client",
+ "minimum-stability": "stable",
+ "license": "MIT",
+ "authors": [
+ {
+ "name": "Evgeny Zinoviev",
+ "email": "me@ch1p.io"
+ }
+ ],
+ "require": {
+ "ch1p/jobd-client": "^1.5",
+ "ext-json": "*",
+ "ext-mysqli": "*"
+ }
+}
diff --git a/composer.lock b/composer.lock
new file mode 100644
index 0000000..e976166
--- /dev/null
+++ b/composer.lock
@@ -0,0 +1,59 @@
+{
+ "_readme": [
+ "This file locks the dependencies of your project to a known state",
+ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies",
+ "This file is @generated automatically"
+ ],
+ "content-hash": "96d78551c8c7b7a22cb20daf8025e024",
+ "packages": [
+ {
+ "name": "ch1p/jobd-client",
+ "version": "1.5.2",
+ "source": {
+ "type": "git",
+ "url": "https://github.com/gch1p/php-jobd-client.git",
+ "reference": "a3bb10feaea28fd54866dc9c4311a247a84a202c"
+ },
+ "dist": {
+ "type": "zip",
+ "url": "https://api.github.com/repos/gch1p/php-jobd-client/zipball/a3bb10feaea28fd54866dc9c4311a247a84a202c",
+ "reference": "a3bb10feaea28fd54866dc9c4311a247a84a202c",
+ "shasum": ""
+ },
+ "require": {
+ "ext-json": "*",
+ "ext-sockets": "*",
+ "php": ">=7.0"
+ },
+ "type": "library",
+ "autoload": {
+ "psr-4": {
+ "jobd\\": "src/"
+ }
+ },
+ "notification-url": "https://packagist.org/downloads/",
+ "license": [
+ "BSD-2-Clause"
+ ],
+ "keywords": [
+ "job",
+ "jobd",
+ "queue"
+ ],
+ "support": {
+ "issues": "https://github.com/gch1p/php-jobd-client/issues",
+ "source": "https://github.com/gch1p/php-jobd-client/tree/v1.5.2"
+ },
+ "time": "2021-03-15T22:02:40+00:00"
+ }
+ ],
+ "packages-dev": [],
+ "aliases": [],
+ "minimum-stability": "stable",
+ "stability-flags": [],
+ "prefer-stable": false,
+ "prefer-lowest": false,
+ "platform": [],
+ "platform-dev": [],
+ "plugin-api-version": "2.0.0"
+}
diff --git a/jobd-1.conf b/jobd-1.conf
new file mode 100644
index 0000000..88bac2e
--- /dev/null
+++ b/jobd-1.conf
@@ -0,0 +1,35 @@
+host = 0.0.0.0
+port = 7079
+; password =
+name = worker-1
+
+master_host = 127.0.0.1
+master_port = 7081
+master_reconnect_timeout = 10
+
+; Don't do this! Here i put it to /tmp only because it was
+; for a test. In a real world you should use something more
+; appropriate, like /var/log
+log_file = /tmp/jobd-1.log
+log_level_file = warn
+log_level_console = warn
+
+mysql_host = 10.211.55.6
+mysql_port = 3306
+mysql_user = jobd
+mysql_password = password
+mysql_database = jobd
+mysql_table = jobs2
+mysql_fetch_limit = 10
+
+launcher = php /Users/ch1p/dev/jobd-php-example/src/launcher.php {id}
+launcher.cwd = /Users/ch1p/dev/jobd-php-example/src
+launcher.env.LC_ALL = en_US.UTF-8
+launcher.env.LANGUAGE = en_US.UTF-8
+launcher.env.LANG = en_US.UTF-8
+max_output_buffer = 16777216
+
+[targets]
+1/high = 10
+1/low = 10
+any = 5
diff --git a/jobd-2.conf b/jobd-2.conf
new file mode 100644
index 0000000..27128df
--- /dev/null
+++ b/jobd-2.conf
@@ -0,0 +1,35 @@
+host = 0.0.0.0
+port = 7080
+; password =
+name = worker-2
+
+master_host = 127.0.0.1
+master_port = 7081
+master_reconnect_timeout = 10
+
+; Don't do this! Here i put it to /tmp only because it was
+; for a test. In a real world you should use something more
+; appropriate, like /var/log
+log_file = /tmp/jobd-2.log
+log_level_file = warn
+log_level_console = warn
+
+mysql_host = 10.211.55.6
+mysql_port = 3306
+mysql_user = jobd
+mysql_password = password
+mysql_database = jobd
+mysql_table = jobs2
+mysql_fetch_limit = 10
+
+launcher = php /Users/ch1p/dev/jobd-php-example/src/launcher.php {id}
+launcher.cwd = /Users/ch1p/dev/jobd-php-example/src
+launcher.env.LC_ALL = en_US.UTF-8
+launcher.env.LANGUAGE = en_US.UTF-8
+launcher.env.LANG = en_US.UTF-8
+max_output_buffer = 16777216
+
+[targets]
+2/high = 10
+2/low = 10
+any = 5 \ No newline at end of file
diff --git a/jobd-master.conf b/jobd-master.conf
new file mode 100644
index 0000000..8220162
--- /dev/null
+++ b/jobd-master.conf
@@ -0,0 +1,13 @@
+host = 0.0.0.0
+port = 7081
+; password =
+
+ping_interval = 30
+poke_throttle_interval = 0.5
+
+; Don't do this! Here i put it to /tmp only because it was
+; for a test. In a real world you should use something more
+; appropriate, like /var/log
+log_file = /tmp/jobd-master.log
+log_level_file = warn
+log_level_console = warn \ No newline at end of file
diff --git a/schema.sql b/schema.sql
new file mode 100644
index 0000000..b278a0d
--- /dev/null
+++ b/schema.sql
@@ -0,0 +1,17 @@
+CREATE TABLE `jobs2` (
+ `id` int(10) UNSIGNED NOT NULL AUTO_INCREMENT,
+ `target` char(32) NOT NULL,
+ `name` char(64) NOT NULL,
+ `time_created` int(10) UNSIGNED NOT NULL DEFAULT 0,
+ `time_started` int(10) UNSIGNED NOT NULL DEFAULT 0,
+ `time_finished` int(10) UNSIGNED NOT NULL DEFAULT 0,
+ `status` enum('waiting', 'manual', 'accepted', 'running', 'done', 'ignored') NOT NULL DEFAULT 'waiting',
+ `result` enum('ok', 'fail') DEFAULT NULL,
+ `return_code` tinyint(3) UNSIGNED DEFAULT NULL,
+ `sig` char(10) DEFAULT NULL,
+ `input` mediumtext NOT NULL,
+ `stdout` mediumtext NOT NULL DEFAULT '',
+ `stderr` mediumtext NOT NULL DEFAULT '',
+ PRIMARY KEY (`id`),
+ KEY `select_for_target_priority_idx` (`target`, `status`, `id`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8; \ No newline at end of file
diff --git a/src/classes/Job.php b/src/classes/Job.php
new file mode 100644
index 0000000..4ecbf6c
--- /dev/null
+++ b/src/classes/Job.php
@@ -0,0 +1,51 @@
+<?php
+
+abstract class Job extends model {
+
+ // ENUM status
+ const STATUS_WAITING = 'waiting';
+ const STATUS_MANUAL = 'manual';
+ const STATUS_ACCEPTED = 'accepted';
+ const STATUS_IGNORED = 'ignored';
+ const STATUS_RUNNING = 'running';
+ const STATUS_DONE = 'done';
+
+ // ENUM result
+ const RESULT_OK = 'ok';
+ const RESULT_FAIL = 'fail';
+
+ const DB_TABLE = 'jobs';
+
+ protected static $Fields = [
+ 'id' => model::INTEGER,
+ 'target' => model::STRING,
+ 'name' => model::STRING,
+ 'time_created' => model::INTEGER,
+ 'time_started' => model::INTEGER,
+ 'time_finished' => model::INTEGER,
+ 'status' => model::STRING, // ENUM
+ 'result' => model::STRING, // ENUM
+ 'return_code' => model::INTEGER,
+ 'sig' => model::STRING,
+ 'stdout' => model::STRING,
+ 'stderr' => model::STRING,
+ 'input' => model::SERIALIZED,
+ ];
+
+ public $id;
+ public $target;
+ public $name;
+ public $timeCreated;
+ public $timeStarted;
+ public $timeFinished;
+ public $status;
+ public $result;
+ public $returnCode;
+ public $sig;
+ public $stdout;
+ public $stderr;
+ public $input;
+
+ abstract public function run();
+
+} \ No newline at end of file
diff --git a/src/classes/JobResult.php b/src/classes/JobResult.php
new file mode 100644
index 0000000..c42b6b0
--- /dev/null
+++ b/src/classes/JobResult.php
@@ -0,0 +1,96 @@
+<?php
+
+class JobResult {
+
+ /**
+ * @var string $result
+ */
+ protected $result;
+
+ /**
+ * @var int $returnCode
+ */
+ protected $returnCode;
+
+ /**
+ * @var string|null $signal
+ */
+ protected $signal;
+
+ /**
+ * @var string $stdout
+ */
+ protected $stdout;
+
+ /**
+ * @var string $stderr
+ */
+ protected $stderr;
+
+ /**
+ * @param string $result
+ * @param int $return_code
+ * @param string $stdout
+ * @param string $stderr
+ * @param null $signal
+ * @return $this
+ */
+ public function setResult(string $result,
+ int $return_code,
+ string $stdout,
+ string $stderr,
+ $signal = null): JobResult
+ {
+ $this->result = $result;
+ $this->returnCode = $return_code;
+ $this->stdout = $stdout;
+ $this->stderr = $stderr;
+ $this->signal = $signal;
+
+ return $this;
+ }
+
+ /**
+ * @param string $error
+ * @return $this
+ */
+ public function setError(string $error): JobResult
+ {
+ $this->result = Job::RESULT_FAIL;
+ $this->stderr = $error;
+
+ return $this;
+ }
+
+ /**
+ * @return bool
+ */
+ public function isFailed(): bool
+ {
+ return $this->result == Job::RESULT_FAIL;
+ }
+
+ /**
+ * @return string
+ */
+ public function getStdout(): string
+ {
+ return $this->stdout;
+ }
+
+ /**
+ * @return mixed|null
+ */
+ public function getStdoutAsJSON() {
+ $json = jsonDecode($this->stdout);
+ return $json ? $json : null;
+ }
+
+ /**
+ * @return string
+ */
+ public function getError(): string {
+ return $this->stderr ?? '';
+ }
+
+} \ No newline at end of file
diff --git a/src/classes/jobs.php b/src/classes/jobs.php
new file mode 100644
index 0000000..9acbc50
--- /dev/null
+++ b/src/classes/jobs.php
@@ -0,0 +1,275 @@
+<?php
+
+class jobs
+{
+
+ /**
+ * @var jobs_destructor $destructor_instance
+ */
+ private static $destructor_instance;
+
+ /**
+ * @var array<int, array> $new_jobs
+ */
+ private static $new_jobs = [];
+
+ /**
+ * Automatically poke master on exit.
+ */
+ public static function destruct()
+ {
+ if (!empty(self::$new_jobs)) {
+ $targets = [];
+ foreach (self::$new_jobs as $new_job) {
+ if ($new_job['status'] === Job::STATUS_WAITING)
+ $targets[$new_job['target']] = true;
+ }
+
+ if (!empty($targets)) {
+ $targets = array_keys($targets);
+ self::poke($targets);
+ }
+ }
+ }
+
+ /**
+ * Create job.
+ *
+ * @param int|string $target
+ * @param string $name
+ * @param array $data
+ * @param string $status
+ * @return int|string Job ID
+ */
+ public static function add($target, string $name, array $data = [], string $status = Job::STATUS_WAITING): int
+ {
+ if (is_null(self::$destructor_instance))
+ self::$destructor_instance = new jobs_destructor();
+
+ if (strpos($name, '\\') !== false) {
+ $pos = strrpos($name, '\\');
+ $name = substr($name, $pos + 1);
+ }
+
+ $db = getMySQL();
+ $db->insert(JOBD_TABLE, [
+ 'target' => $target,
+ 'name' => $name,
+ 'time_created' => time(),
+ 'input' => serialize($data),
+ 'status' => $status
+ ]);
+ $id = $db->insertId();
+
+ self::$new_jobs[$id] = [
+ 'target' => $target,
+ 'status' => $status
+ ];
+
+ return $id;
+ }
+
+ /**
+ * Create manual job.
+ *
+ * @param int|string $target
+ * @param string $name
+ * @param array $data
+ * @return int
+ */
+ public static function manual($target, string $name, array $data = []): int
+ {
+ return self::add($target, $name, $data, Job::STATUS_MANUAL);
+ }
+
+ /**
+ * Run jobs with given ids and status=Job::STATUS_MANUAL and wait for results.
+ *
+ * If only one job was given and it's failed, an Exception will be thrown!
+ * If multiple jobs were given and some of them failed, an array of JobResults will be returned.
+ *
+ * @param int|int[] $job_ids
+ * @return array<int, JobResult>|JobResult
+ * @throws Exception
+ */
+ public static function run($job_ids)
+ {
+ if (!is_array($job_ids))
+ $job_ids = [$job_ids];
+
+ $job_ids_orig = $job_ids;
+ $job_ids = array_flip($job_ids);
+
+ $jobs = [];
+
+ // look for the given jobs in self::$new_jobs
+ foreach (self::$new_jobs as $id => $new_job) {
+ if ($new_job['status'] == Job::STATUS_MANUAL && isset($job_ids[$id])) {
+ $jobs[] = ['id' => $id, 'target' => $new_job['target']];
+ unset($job_ids[$id]);
+ }
+ }
+
+ // if some (or all) jobs were not found in self::$new_jobs, get them from the database
+ if (!empty($job_ids)) {
+ $job_ids = array_keys($job_ids);
+
+ $db = getMySQL();
+ $q = $db->query("SELECT id, target, status AS target FROM ".JOBD_TABLE." WHERE id IN (".implode(',', $job_ids).")");
+ $job_ids = array_flip($job_ids);
+
+ while ($row = $db->fetch($q)) {
+ // only manual jobs are allowed
+ if ($row['status'] != Job::STATUS_MANUAL)
+ throw new Exception("job id=${row['id']} has status = {$row['status']} != manual");
+
+ $jobs[] = [
+ 'id' => (int)$row['id'],
+ 'target' => $row['target']
+ ];
+
+ unset($job_ids[$row['id']]);
+ }
+
+ $q->free();
+
+ // we were given invalid ids, it seems. throw an exception and don't continue
+ if (!empty($job_ids))
+ throw new Exception("jobs with id ".implode(', ', array_keys($job_ids))." not found");
+ }
+
+ // connect to master and send run-manual request
+ $client = getJobdMaster();
+ $response = $client->runManual($jobs);
+
+ // master request failed
+ if (($error = $response->getError()) !== null)
+ throw new Exception("jobd returned error: ".$error);
+
+ // at this point, jobd-master request succeeded
+ // doesn't mean our jobs were successfully accepted and executed by workers,
+ // but at least we have some results
+
+ /**
+ * @var array<int, JobResult> $results
+ */
+ $results = [];
+ $data = $response->getData();
+
+ $client->close();
+
+ // collect results, successes and failures
+ if (!empty($data['jobs'])) {
+ foreach ($data['jobs'] as $job_id => $job_result_raw) {
+ $job_result = (new JobResult())->setResult(
+ $job_result_raw['result'],
+ $job_result_raw['code'],
+ $job_result_raw['stdout'],
+ $job_result_raw['stderr'],
+ $job_result_raw['signal']
+ );
+ $results[$job_id] = $job_result;
+ }
+ }
+ if (!empty($data['errors'])) {
+ foreach ($data['errors'] as $job_id => $job_result_raw) {
+ $job_result = (new JobResult())->setError($job_result_raw);
+ $results[$job_id] = $job_result;
+ }
+ }
+
+ // remove jobs from self::$new_jobs
+ foreach ($job_ids_orig as $id) {
+ if (isset(self::$new_jobs[$id]))
+ unset(self::$new_jobs[$id]);
+ }
+
+ // if the $job_ids arguments wasn't an array, return the JobResult instance
+ if (count($job_ids_orig) === 1 && count($results) === 1) {
+ $result = reset($results);
+ if ($result->isFailed())
+ throw new Exception($result->getError());
+ return $result;
+ }
+
+ // otherwise, return array of JobResult instances
+ return $results;
+ }
+
+ /**
+ * @param string|string[] $targets
+ */
+ public static function poke($targets)
+ {
+
+ $client = getJobdMaster();
+
+ if (!is_array($targets))
+ $targets = [$targets];
+
+ $client->poke($targets);
+ $targets = array_flip(array_unique($targets));
+
+ // remove poked targets from self::$new_jobs to avoid meaninglessly duplicating this poke from the destructor
+ if (!empty(self::$new_jobs)) {
+ foreach (self::$new_jobs as $new_job_id => $new_job) {
+ if ($new_job['status'] == Job::STATUS_WAITING && isset($targets[$new_job['target']]))
+ unset(self::$new_jobs[$new_job_id]);
+ }
+ }
+
+ $client->close();
+ return true;
+ }
+
+ /**
+ * @param int $id
+ * @return array
+ */
+ public static function get(int $id)
+ {
+ $db = getMySQL();
+ $q = $db->query("SELECT * FROM ".JOBD_TABLE." WHERE id=?", $id);
+ return $db->fetch($q);
+ }
+
+ /**
+ * Delete old succeeded jobs.
+ */
+ public static function cleanup()
+ {
+ $db = getMySQL();
+ $db->query("DELETE FROM ".JOBD_TABLE." WHERE status='done' AND result='ok' AND time_finished < ?",
+ time() - 86400);
+ }
+
+}
+
+
+class job_target
+{
+
+ const any = "any";
+
+ public static function high(int $server): string
+ {
+ return "$server/high";
+ }
+
+ public static function low(int $server): string
+ {
+ return "$server/low";
+ }
+
+}
+
+
+class jobs_destructor
+{
+
+ public function __destruct()
+ {
+ jobs::destruct();
+ }
+
+}
diff --git a/src/classes/model.php b/src/classes/model.php
new file mode 100644
index 0000000..ff44361
--- /dev/null
+++ b/src/classes/model.php
@@ -0,0 +1,149 @@
+<?php
+
+abstract class model {
+
+ const DB_TABLE = null;
+ const DB_KEY = 'id';
+
+ const STRING = 0;
+ const INTEGER = 1;
+ const FLOAT = 2;
+ const ARRAY = 3;
+ const BOOLEAN = 4;
+ const JSON = 5;
+ const SERIALIZED = 6;
+
+ protected static $Fields = [];
+
+ public static function create_instance(...$args) {
+ $cl = get_called_class();
+ return new $cl(...$args);
+ }
+
+ public function __construct($raw) {
+ foreach (static::$Fields as $name => $type)
+ $this->{toCamelCase($name)} = self::cast_to_type($type, $raw[$name]);
+
+ if (is_null(static::DB_TABLE))
+ trigger_error('class '.get_class($this).' doesn\'t have DB_TABLE defined');
+ }
+
+ /**
+ * @param $fields
+ */
+ public function edit($fields) {
+ $db = getMySQL();
+
+ $save = [];
+ foreach ($fields as $name => $value) {
+ switch (static::$Fields[$name]) {
+ case self::ARRAY:
+ if (is_array($value)) {
+ $fields[$name] = implode(',', $value);
+ $save[$name] = $value;
+ }
+ break;
+
+ case self::INTEGER:
+ $value = (int)$value;
+ $fields[$name] = $value;
+ $save[$name] = $value;
+ break;
+
+ case self::FLOAT:
+ $value = (float)$value;
+ $fields[$name] = $value;
+ $save[$name] = $value;
+ break;
+
+ case self::BOOLEAN:
+ $fields[$name] = $value ? 1 : 0;
+ $save[$name] = $value;
+ break;
+
+ case self::JSON:
+ $fields[$name] = jsonEncode($value);
+ $save[$name] = $value;
+ break;
+
+ case self::SERIALIZED:
+ $fields[$name] = serialize($value);
+ $save[$name] = $value;
+ break;
+
+ default:
+ $value = (string)$value;
+ $fields[$name] = $value;
+ $save[$name] = $value;
+ break;
+ }
+ }
+
+ if (!$db->update(static::DB_TABLE, $fields, static::DB_KEY."=?", $this->get_id())) {
+ //debugError(__METHOD__.': failed to update database');
+ return;
+ }
+
+ foreach ($save as $name => $value)
+ $this->{toCamelCase($name)} = $value;
+ }
+
+ /**
+ * @return int
+ */
+ public function get_id() {
+ return $this->{toCamelCase(static::DB_KEY)};
+ }
+
+ /**
+ * @param array $fields
+ * @param array $custom_getters
+ * @return array
+ */
+ public function as_array(array $fields = [], array $custom_getters = []) {
+ if (empty($fields))
+ $fields = array_keys(static::$Fields);
+
+ $array = [];
+ foreach ($fields as $field) {
+ if (isset($custom_getters[$field]) && is_callable($custom_getters[$field])) {
+ $array[$field] = $custom_getters[$field]();
+ } else {
+ $array[$field] = $this->{toCamelCase($field)};
+ }
+ }
+
+ return $array;
+ }
+
+ /**
+ * @param $type
+ * @param $value
+ * @return array|bool|false|float|int|string
+ */
+ protected static function cast_to_type($type, $value) {
+ switch ($type) {
+ case self::BOOLEAN:
+ return (bool)$value;
+
+ case self::INTEGER:
+ return (int)$value;
+
+ case self::FLOAT:
+ return (float)$value;
+
+ case self::ARRAY:
+ return array_filter(explode(',', $value));
+
+ case self::JSON:
+ return jsonDecode($value);
+
+ case self::SERIALIZED:
+ return unserialize($value);
+
+ default:
+ return (string)$value;
+ }
+ }
+
+}
diff --git a/src/classes/mysql.php b/src/classes/mysql.php
new file mode 100644
index 0000000..710390f
--- /dev/null
+++ b/src/classes/mysql.php
@@ -0,0 +1,166 @@
+<?php
+
+class mysql
+{
+
+ /** @var mysqli $link */
+ private $link = null;
+
+ public function __construct(string $host, string $user, string $password, string $name)
+ {
+ $this->link = new mysqli();
+ if (!$this->link->real_connect($host, $user, $password, $name)) {
+ $this->link = null;
+ throw new Exception('Could not connect to MySQL');
+ }
+ }
+
+ public function __destruct()
+ {
+ if ($this->link)
+ $this->link->close();
+ }
+
+ public function __get($k)
+ {
+ if ($k == 'error') {
+ return $this->link->error;
+ }
+ return $this->$k;
+ }
+
+ public function query(string $sql)
+ {
+ if (func_num_args() > 1) {
+ $mark_count = substr_count($sql, '?');
+ $positions = array();
+ $last_pos = -1;
+ for ($i = 0; $i < $mark_count; $i++) {
+ $last_pos = strpos($sql, '?', $last_pos + 1);
+ $positions[] = $last_pos;
+ }
+ for ($i = $mark_count - 1; $i >= 0; $i--) {
+ $arg_val = func_get_arg($i + 1);
+ if (is_null($arg_val)) {
+ $v = 'NULL';
+ } else {
+ $v = '\'' . $this->escape($arg_val) . '\'';
+ }
+ $sql = substr_replace($sql, $v, $positions[$i], 1);
+ }
+ }
+
+ $q = $this->link->query($sql);
+ if (!$q) {
+ $error = $this->link->error;
+ trigger_error($error, E_USER_WARNING);
+ return false;
+ }
+
+ return $q;
+ }
+
+ public function insert(string $table, array $fields)
+ {
+ return $this->performInsert('INSERT', $table, $fields);
+ }
+
+ public function replace(string $table, array $fields)
+ {
+ return $this->performInsert('REPLACE', $table, $fields);
+ }
+
+ protected function performInsert(string $command, string $table, array $fields)
+ {
+ $names = [];
+ $values = [];
+ $count = 0;
+ foreach ($fields as $k => $v) {
+ $names[] = $k;
+ $values[] = $v;
+ $count++;
+ }
+
+ $sql = "{$command} INTO `{$table}` (`" . implode('`, `', $names) . "`) VALUES (" . implode(', ', array_fill(0, $count, '?')) . ")";
+ array_unshift($values, $sql);
+
+ return call_user_func_array([$this, 'query'], $values);
+ }
+
+ public function multipleInsert(string $table, array $rows)
+ {
+ return $this->performMultipleInsert('INSERT', $table, $rows);
+ }
+
+ public function multipleReplace(string $table, array $rows)
+ {
+ return $this->performMultipleInsert('REPLACE', $table, $rows);
+ }
+
+ protected function performMultipleInsert(string $command, string $table, array $rows)
+ {
+ $names = [];
+ $sql_rows = [];
+ foreach ($rows as $i => $fields) {
+ $row_values = [];
+ foreach ($fields as $field_name => $field_val) {
+ if ($i == 0) {
+ $names[] = $field_name;
+ }
+ $row_values[] = $this->escape($field_val);
+ }
+ $sql_rows[] = "('" . implode("', '", $row_values) . "')";
+ }
+
+ $sql = "{$command} INTO `{$table}` (`" . implode('`, `', $names) . "`) VALUES " . implode(', ', $sql_rows);
+ return $this->query($sql);
+ }
+
+ public function update(string $table, arrow $rows, string ...$cond)
+ {
+ $fields = [];
+ $args = [];
+ foreach ($rows as $row_name => $row_value) {
+ $fields[] = "`{$row_name}`=?";
+ $args[] = $row_value;
+ }
+ $sql = "UPDATE `$table` SET " . implode(', ', $fields);
+ if (!empty($cond)) {
+ $sql .= " WHERE " . $cond[0];
+ if (count($cond) > 1)
+ $args = array_merge($args, array_slice($cond, 1));
+ }
+ return $this->query($sql, ...$args);
+ }
+
+ public function fetch(mysqli_result $q)
+ {
+ $row = $q->fetch_assoc();
+ if (!$row) {
+ $q->free();
+ return false;
+ }
+ return $row;
+ }
+
+ public function result(mysqli_result $q, int $field = 0)
+ {
+ return $q ? $q->fetch_row()[$field] : false;
+ }
+
+ public function insertId()
+ {
+ return $this->link->insert_id;
+ }
+
+ public function numRows(mysqli_result $query): int
+ {
+ return $query->num_rows;
+ }
+
+ public function escape(string $s): string
+ {
+ return $this->link->real_escape_string($s);
+ }
+
+}
diff --git a/src/cron/jobs-cleanup.php b/src/cron/jobs-cleanup.php
new file mode 100644
index 0000000..3e01d34
--- /dev/null
+++ b/src/cron/jobs-cleanup.php
@@ -0,0 +1,5 @@
+<?php
+
+require __DIR__.'/../init.php';
+
+jobs::cleanup(); \ No newline at end of file
diff --git a/src/functions.php b/src/functions.php
new file mode 100644
index 0000000..9558dd9
--- /dev/null
+++ b/src/functions.php
@@ -0,0 +1,47 @@
+<?php
+
+function jsonEncode($obj) {
+ return json_encode($obj, JSON_UNESCAPED_UNICODE);
+}
+
+function jsonDecode($json) {
+ return json_decode($json, true);
+}
+
+function toCamelCase(string $input, string $separator = '_'): string {
+ return lcfirst(str_replace($separator, '', ucwords($input, $separator)));
+}
+
+
+/* Connection helpers */
+
+function getMySQL(): mysql {
+ static $link = null;
+ if (is_null($link))
+ $link = new mysql(MYSQL_HOST, MYSQL_USER, MYSQL_PASSWORD, MYSQL_DB);
+ return $link;
+}
+
+function getJobdMaster(): jobd\MasterClient {
+ return new jobd\MasterClient(JOBD_PORT, JOBD_HOST, JOBD_PASSWORD);
+}
+
+
+/* Command line helpers */
+
+function green(string $s): string {
+ return "\033[32m$s\033[0m";
+}
+
+function yellow(string $s): string {
+ return "\033[33m$s\033[0m";
+}
+
+function red(string $s): string {
+ return "\033[31m$s\033[0m";
+}
+
+function input(string $prompt): string {
+ echo $prompt;
+ return substr(fgets(STDIN), 0, -1);
+}
diff --git a/src/init.php b/src/init.php
new file mode 100644
index 0000000..0c18408
--- /dev/null
+++ b/src/init.php
@@ -0,0 +1,32 @@
+<?php
+
+require __DIR__.'/../vendor/autoload.php';
+
+error_reporting(E_ALL);
+ini_set('display_errors', 1);
+
+define('MYSQL_HOST', '10.211.55.6');
+define('MYSQL_USER', 'jobd');
+define('MYSQL_PASSWORD', 'password');
+define('MYSQL_DB', 'jobd');
+
+define('JOBD_TABLE', 'jobs2');
+define('JOBD_HOST', '127.0.0.1');
+define('JOBD_PORT', jobd\Client::MASTER_PORT);
+define('JOBD_PASSWORD', '');
+
+spl_autoload_register(function($class) {
+ if (strpos($class, '\\') !== false) {
+ $class = str_replace('\\', '/', $class);
+ $root = __DIR__;
+ } else {
+ $root = __DIR__.'/classes';
+ }
+
+ $path = $root.'/'.$class.'.php';
+
+ if (is_file($path))
+ require_once $path;
+});
+
+include __DIR__.'/functions.php'; \ No newline at end of file
diff --git a/src/jobs/CreateFile.php b/src/jobs/CreateFile.php
new file mode 100644
index 0000000..439925f
--- /dev/null
+++ b/src/jobs/CreateFile.php
@@ -0,0 +1,15 @@
+<?php
+
+namespace jobs;
+
+class CreateFile extends \Job
+{
+
+ public function run()
+ {
+ $file = $this->input['file'];
+ if (!touch($file))
+ throw new \Exception("failed to touch file '".$file."'");
+ }
+
+}
diff --git a/src/jobs/Hello.php b/src/jobs/Hello.php
new file mode 100644
index 0000000..4fa2446
--- /dev/null
+++ b/src/jobs/Hello.php
@@ -0,0 +1,15 @@
+<?php
+
+namespace jobs;
+
+class Hello extends \Job
+{
+
+ public function run()
+ {
+ $greetings = "Hello, ".($this->input['name'] ?? 'noname').".\n";
+ $greetings .= "I'm writing you from ".__METHOD__.", my PID is ".getmypid()." and I'm executing job #".$this->id.".";
+ echo jsonEncode(['response' => $greetings]);
+ }
+
+}
diff --git a/src/launcher.php b/src/launcher.php
new file mode 100644
index 0000000..d1b9892
--- /dev/null
+++ b/src/launcher.php
@@ -0,0 +1,31 @@
+<?php
+
+require_once __DIR__.'/init.php';
+
+set_time_limit(0);
+$job = null;
+
+register_shutdown_function(function() {
+ global $job;
+ if ($job !== true)
+ exit(1);
+});
+
+$job_id = $argv[1] ?? null;
+
+$job_raw = jobs::get($job_id);
+if (!$job_raw)
+ throw new InvalidArgumentException("job $job_id not found");
+
+$class_name = "jobs\\{$job_raw['name']}";
+$job = new $class_name($job_raw);
+if ($job->status != Job::STATUS_RUNNING)
+ throw new RuntimeException("job status is {$job->status}");
+
+try {
+ if ($job->run() !== false)
+ $job = true;
+} catch (Exception $e) {
+ fprintf(STDERR, $e.'');
+ exit(1);
+} \ No newline at end of file
diff --git a/src/main.php b/src/main.php
new file mode 100644
index 0000000..60779e5
--- /dev/null
+++ b/src/main.php
@@ -0,0 +1,93 @@
+<?php
+
+require __DIR__.'/init.php';
+
+if ($argc < 2) {
+ echo <<<EOF
+Usage: {$argv[0]} COMMAND
+
+Commands:
+ test
+ hello
+ createfile
+
+EOF;
+ exit;
+}
+
+$cmd = $argv[1];
+$func = "cmd_{$cmd}";
+if (!function_exists($func)) {
+ echo red("command '".$cmd."' is not implement")."\n";
+ exit(1);
+}
+
+call_user_func($func);
+
+
+/** Commands */
+
+function cmd_test() {
+ // MySQL
+ try {
+ $db = getMySQL();
+ $jobs_count = $db->result($db->query("SELECT COUNT(*) FROM ".JOBD_TABLE));
+ } catch (Exception $e) {
+ echo red("MySQL connection failed")."\n";
+ exit(1);
+ }
+ echo green("MySQL OK")."\n";
+
+ // jobd
+ try {
+ $jobd = getJobdMaster();
+ $status = $jobd->status(true);
+ $workers_count = count($status->getData()['workers']);
+ if ($workers_count == 2) {
+ echo green("jobd-master and jobd OK");
+ } else {
+ $message = "jobd-master OK, but ";
+ $message .= $workers_count == 1 ? "only 1 worker is connected" : "no workers are connected";
+ echo yellow($message);
+ }
+ echo "\n";
+ } catch (Exception $e) {
+ echo red("jobd-master connection failed: ".$e->getMessage())."\n";
+ exit(1);
+ }
+}
+
+function cmd_hello() {
+ $myname = input('Enter your name: ');
+ try {
+ $job_ids = [];
+ $job_server_map = [];
+
+ for ($server = 1; $server <= 2; $server++) {
+ $id = jobs::manual(job_target::high($server), jobs\Hello::class, ['name' => $myname]);
+ $job_server_map[$id] = $server;
+ $job_ids[] = $id;
+ }
+
+ $results = jobs::run($job_ids);
+ foreach ($results as $job_id => $job_result) {
+ $server = $job_server_map[$job_id];
+ echo "> server {$server}:\n";
+ if ($job_result->isFailed()) {
+ echo red("failed")."\n";
+ } else {
+ echo green($job_result->getStdoutAsJSON()['response'])."\n";
+ }
+ echo "\n";
+ }
+
+ } catch (Exception $e) {
+ echo red("error: ".$e->getMessage())."\n";
+ exit(1);
+ }
+}
+
+function cmd_createfile() {
+ $file = input('Enter file name: ');
+ jobs::add(job_target::any, jobs\CreateFile::class, ['file' => $file]);
+} \ No newline at end of file