From 9a98ac50ff50dda2f2eed1ea825352c50c64440e Mon Sep 17 00:00:00 2001 From: Evgeny Zinoviev Date: Fri, 7 May 2021 23:39:20 +0300 Subject: initial --- .gitignore | 2 + LICENSE | 21 ++++ README.md | 81 ++++++++++++++ composer.json | 17 +++ composer.lock | 59 ++++++++++ jobd-1.conf | 35 ++++++ jobd-2.conf | 35 ++++++ jobd-master.conf | 13 +++ schema.sql | 17 +++ src/classes/Job.php | 51 +++++++++ src/classes/JobResult.php | 96 ++++++++++++++++ src/classes/jobs.php | 275 ++++++++++++++++++++++++++++++++++++++++++++++ src/classes/model.php | 149 +++++++++++++++++++++++++ src/classes/mysql.php | 166 ++++++++++++++++++++++++++++ src/cron/jobs-cleanup.php | 5 + src/functions.php | 47 ++++++++ src/init.php | 32 ++++++ src/jobs/CreateFile.php | 15 +++ src/jobs/Hello.php | 15 +++ src/launcher.php | 31 ++++++ src/main.php | 93 ++++++++++++++++ 21 files changed, 1255 insertions(+) create mode 100644 .gitignore create mode 100644 LICENSE create mode 100644 README.md create mode 100644 composer.json create mode 100644 composer.lock create mode 100644 jobd-1.conf create mode 100644 jobd-2.conf create mode 100644 jobd-master.conf create mode 100644 schema.sql create mode 100644 src/classes/Job.php create mode 100644 src/classes/JobResult.php create mode 100644 src/classes/jobs.php create mode 100644 src/classes/model.php create mode 100644 src/classes/mysql.php create mode 100644 src/cron/jobs-cleanup.php create mode 100644 src/functions.php create mode 100644 src/init.php create mode 100644 src/jobs/CreateFile.php create mode 100644 src/jobs/Hello.php create mode 100644 src/launcher.php create mode 100644 src/main.php diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..45b1244 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +.idea +/vendor \ No newline at end of file diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..8362f72 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (C) 2021 Evgeny Zinoviev + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..e1c64a5 --- /dev/null +++ b/README.md @@ -0,0 +1,81 @@ +# jobd-php-example + +This repository contains example of PHP integration with jobd. The code is +mainly an excerpt from a real existent PHP application, with some changes. + +To launch this example and see how it works, you will need to set up a jobd-master +instance along with two jobd worker instances. + +It was written and tested on my local machine with PHP 7.3, so I'm sharing all +my configs as is. Don't forget to replace values, such as IP addresses, +usernames, passwords and so on, with yours, and generally adjust it to your needs. + +## Configuration + +### jobd + +jobd configs are included in the repo: [`jobd-1.conf`](jobd-1.conf), +[`jobd-2.conf`](jobd-2.conf), [`jobd-master.conf`](jobd-master.conf). + +### MySQL + +[`schema.sql`](schema.sql) contains schema of MySQL table used in the example. + +### Runtime + +For the sake of simplicity, runtime configuration (such as MySQL credentials) +is stored in [`init.php`](src/init.php) as global constants. Adjust to your needs. + +## Usage + +1. Make sure **MySQL server** is running. + +2. Start **jobd-master** and two **jobd** instances: + + ``` + jobd-master --config jobd-master.conf + jobd --config jobd-1.conf + jobd --config jobd-2.conf + ``` + +3. Install dependencies with composer: + ``` + composer install + ``` + +4. Test configuration: + ``` + php src/main.php test + ``` + + This command will test MySQL and jobd connection. + + You can also print the list of workers by executing: + ``` + jobctl --master list-workers + ``` + +5. Launch test jobs: + ``` + php src/main.php hello + ``` + + This will launch two [`Hello`](src/jobs/Hello.php) jobs, wait for results + and print them. + +6. Launch another test job. [This one](src/jobs/CreateFile.php) will run in + background. It just creates a file with the name you give it. Not like + it's anything useful, but it's for the demo. + ``` + php src/main.php createfile + ``` + + Note that if the path your specify is not absolute, it will be relative to + the jobd's working directory, specified `launcher.cwd` config option. + + If it fails, just look into the MySQL table, there must be some error. + + +## License + +MIT \ No newline at end of file diff --git a/composer.json b/composer.json new file mode 100644 index 0000000..eecd17d --- /dev/null +++ b/composer.json @@ -0,0 +1,17 @@ +{ + "name": "ch1p/jobd-php-example", + "description": "example usage of jobd and it's PHP client", + "minimum-stability": "stable", + "license": "MIT", + "authors": [ + { + "name": "Evgeny Zinoviev", + "email": "me@ch1p.io" + } + ], + "require": { + "ch1p/jobd-client": "^1.5", + "ext-json": "*", + "ext-mysqli": "*" + } +} diff --git a/composer.lock b/composer.lock new file mode 100644 index 0000000..e976166 --- /dev/null +++ b/composer.lock @@ -0,0 +1,59 @@ +{ + "_readme": [ + "This file locks the dependencies of your project to a known state", + "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", + "This file is @generated automatically" + ], + "content-hash": "96d78551c8c7b7a22cb20daf8025e024", + "packages": [ + { + "name": "ch1p/jobd-client", + "version": "1.5.2", + "source": { + "type": "git", + "url": "https://github.com/gch1p/php-jobd-client.git", + "reference": "a3bb10feaea28fd54866dc9c4311a247a84a202c" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/gch1p/php-jobd-client/zipball/a3bb10feaea28fd54866dc9c4311a247a84a202c", + "reference": "a3bb10feaea28fd54866dc9c4311a247a84a202c", + "shasum": "" + }, + "require": { + "ext-json": "*", + "ext-sockets": "*", + "php": ">=7.0" + }, + "type": "library", + "autoload": { + "psr-4": { + "jobd\\": "src/" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "BSD-2-Clause" + ], + "keywords": [ + "job", + "jobd", + "queue" + ], + "support": { + "issues": "https://github.com/gch1p/php-jobd-client/issues", + "source": "https://github.com/gch1p/php-jobd-client/tree/v1.5.2" + }, + "time": "2021-03-15T22:02:40+00:00" + } + ], + "packages-dev": [], + "aliases": [], + "minimum-stability": "stable", + "stability-flags": [], + "prefer-stable": false, + "prefer-lowest": false, + "platform": [], + "platform-dev": [], + "plugin-api-version": "2.0.0" +} diff --git a/jobd-1.conf b/jobd-1.conf new file mode 100644 index 0000000..88bac2e --- /dev/null +++ b/jobd-1.conf @@ -0,0 +1,35 @@ +host = 0.0.0.0 +port = 7079 +; password = +name = worker-1 + +master_host = 127.0.0.1 +master_port = 7081 +master_reconnect_timeout = 10 + +; Don't do this! Here i put it to /tmp only because it was +; for a test. In a real world you should use something more +; appropriate, like /var/log +log_file = /tmp/jobd-1.log +log_level_file = warn +log_level_console = warn + +mysql_host = 10.211.55.6 +mysql_port = 3306 +mysql_user = jobd +mysql_password = password +mysql_database = jobd +mysql_table = jobs2 +mysql_fetch_limit = 10 + +launcher = php /Users/ch1p/dev/jobd-php-example/src/launcher.php {id} +launcher.cwd = /Users/ch1p/dev/jobd-php-example/src +launcher.env.LC_ALL = en_US.UTF-8 +launcher.env.LANGUAGE = en_US.UTF-8 +launcher.env.LANG = en_US.UTF-8 +max_output_buffer = 16777216 + +[targets] +1/high = 10 +1/low = 10 +any = 5 diff --git a/jobd-2.conf b/jobd-2.conf new file mode 100644 index 0000000..27128df --- /dev/null +++ b/jobd-2.conf @@ -0,0 +1,35 @@ +host = 0.0.0.0 +port = 7080 +; password = +name = worker-2 + +master_host = 127.0.0.1 +master_port = 7081 +master_reconnect_timeout = 10 + +; Don't do this! Here i put it to /tmp only because it was +; for a test. In a real world you should use something more +; appropriate, like /var/log +log_file = /tmp/jobd-2.log +log_level_file = warn +log_level_console = warn + +mysql_host = 10.211.55.6 +mysql_port = 3306 +mysql_user = jobd +mysql_password = password +mysql_database = jobd +mysql_table = jobs2 +mysql_fetch_limit = 10 + +launcher = php /Users/ch1p/dev/jobd-php-example/src/launcher.php {id} +launcher.cwd = /Users/ch1p/dev/jobd-php-example/src +launcher.env.LC_ALL = en_US.UTF-8 +launcher.env.LANGUAGE = en_US.UTF-8 +launcher.env.LANG = en_US.UTF-8 +max_output_buffer = 16777216 + +[targets] +2/high = 10 +2/low = 10 +any = 5 \ No newline at end of file diff --git a/jobd-master.conf b/jobd-master.conf new file mode 100644 index 0000000..8220162 --- /dev/null +++ b/jobd-master.conf @@ -0,0 +1,13 @@ +host = 0.0.0.0 +port = 7081 +; password = + +ping_interval = 30 +poke_throttle_interval = 0.5 + +; Don't do this! Here i put it to /tmp only because it was +; for a test. In a real world you should use something more +; appropriate, like /var/log +log_file = /tmp/jobd-master.log +log_level_file = warn +log_level_console = warn \ No newline at end of file diff --git a/schema.sql b/schema.sql new file mode 100644 index 0000000..b278a0d --- /dev/null +++ b/schema.sql @@ -0,0 +1,17 @@ +CREATE TABLE `jobs2` ( + `id` int(10) UNSIGNED NOT NULL AUTO_INCREMENT, + `target` char(32) NOT NULL, + `name` char(64) NOT NULL, + `time_created` int(10) UNSIGNED NOT NULL DEFAULT 0, + `time_started` int(10) UNSIGNED NOT NULL DEFAULT 0, + `time_finished` int(10) UNSIGNED NOT NULL DEFAULT 0, + `status` enum('waiting', 'manual', 'accepted', 'running', 'done', 'ignored') NOT NULL DEFAULT 'waiting', + `result` enum('ok', 'fail') DEFAULT NULL, + `return_code` tinyint(3) UNSIGNED DEFAULT NULL, + `sig` char(10) DEFAULT NULL, + `input` mediumtext NOT NULL, + `stdout` mediumtext NOT NULL DEFAULT '', + `stderr` mediumtext NOT NULL DEFAULT '', + PRIMARY KEY (`id`), + KEY `select_for_target_priority_idx` (`target`, `status`, `id`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8; \ No newline at end of file diff --git a/src/classes/Job.php b/src/classes/Job.php new file mode 100644 index 0000000..4ecbf6c --- /dev/null +++ b/src/classes/Job.php @@ -0,0 +1,51 @@ + model::INTEGER, + 'target' => model::STRING, + 'name' => model::STRING, + 'time_created' => model::INTEGER, + 'time_started' => model::INTEGER, + 'time_finished' => model::INTEGER, + 'status' => model::STRING, // ENUM + 'result' => model::STRING, // ENUM + 'return_code' => model::INTEGER, + 'sig' => model::STRING, + 'stdout' => model::STRING, + 'stderr' => model::STRING, + 'input' => model::SERIALIZED, + ]; + + public $id; + public $target; + public $name; + public $timeCreated; + public $timeStarted; + public $timeFinished; + public $status; + public $result; + public $returnCode; + public $sig; + public $stdout; + public $stderr; + public $input; + + abstract public function run(); + +} \ No newline at end of file diff --git a/src/classes/JobResult.php b/src/classes/JobResult.php new file mode 100644 index 0000000..c42b6b0 --- /dev/null +++ b/src/classes/JobResult.php @@ -0,0 +1,96 @@ +result = $result; + $this->returnCode = $return_code; + $this->stdout = $stdout; + $this->stderr = $stderr; + $this->signal = $signal; + + return $this; + } + + /** + * @param string $error + * @return $this + */ + public function setError(string $error): JobResult + { + $this->result = Job::RESULT_FAIL; + $this->stderr = $error; + + return $this; + } + + /** + * @return bool + */ + public function isFailed(): bool + { + return $this->result == Job::RESULT_FAIL; + } + + /** + * @return string + */ + public function getStdout(): string + { + return $this->stdout; + } + + /** + * @return mixed|null + */ + public function getStdoutAsJSON() { + $json = jsonDecode($this->stdout); + return $json ? $json : null; + } + + /** + * @return string + */ + public function getError(): string { + return $this->stderr ?? ''; + } + +} \ No newline at end of file diff --git a/src/classes/jobs.php b/src/classes/jobs.php new file mode 100644 index 0000000..9acbc50 --- /dev/null +++ b/src/classes/jobs.php @@ -0,0 +1,275 @@ + $new_jobs + */ + private static $new_jobs = []; + + /** + * Automatically poke master on exit. + */ + public static function destruct() + { + if (!empty(self::$new_jobs)) { + $targets = []; + foreach (self::$new_jobs as $new_job) { + if ($new_job['status'] === Job::STATUS_WAITING) + $targets[$new_job['target']] = true; + } + + if (!empty($targets)) { + $targets = array_keys($targets); + self::poke($targets); + } + } + } + + /** + * Create job. + * + * @param int|string $target + * @param string $name + * @param array $data + * @param string $status + * @return int|string Job ID + */ + public static function add($target, string $name, array $data = [], string $status = Job::STATUS_WAITING): int + { + if (is_null(self::$destructor_instance)) + self::$destructor_instance = new jobs_destructor(); + + if (strpos($name, '\\') !== false) { + $pos = strrpos($name, '\\'); + $name = substr($name, $pos + 1); + } + + $db = getMySQL(); + $db->insert(JOBD_TABLE, [ + 'target' => $target, + 'name' => $name, + 'time_created' => time(), + 'input' => serialize($data), + 'status' => $status + ]); + $id = $db->insertId(); + + self::$new_jobs[$id] = [ + 'target' => $target, + 'status' => $status + ]; + + return $id; + } + + /** + * Create manual job. + * + * @param int|string $target + * @param string $name + * @param array $data + * @return int + */ + public static function manual($target, string $name, array $data = []): int + { + return self::add($target, $name, $data, Job::STATUS_MANUAL); + } + + /** + * Run jobs with given ids and status=Job::STATUS_MANUAL and wait for results. + * + * If only one job was given and it's failed, an Exception will be thrown! + * If multiple jobs were given and some of them failed, an array of JobResults will be returned. + * + * @param int|int[] $job_ids + * @return array|JobResult + * @throws Exception + */ + public static function run($job_ids) + { + if (!is_array($job_ids)) + $job_ids = [$job_ids]; + + $job_ids_orig = $job_ids; + $job_ids = array_flip($job_ids); + + $jobs = []; + + // look for the given jobs in self::$new_jobs + foreach (self::$new_jobs as $id => $new_job) { + if ($new_job['status'] == Job::STATUS_MANUAL && isset($job_ids[$id])) { + $jobs[] = ['id' => $id, 'target' => $new_job['target']]; + unset($job_ids[$id]); + } + } + + // if some (or all) jobs were not found in self::$new_jobs, get them from the database + if (!empty($job_ids)) { + $job_ids = array_keys($job_ids); + + $db = getMySQL(); + $q = $db->query("SELECT id, target, status AS target FROM ".JOBD_TABLE." WHERE id IN (".implode(',', $job_ids).")"); + $job_ids = array_flip($job_ids); + + while ($row = $db->fetch($q)) { + // only manual jobs are allowed + if ($row['status'] != Job::STATUS_MANUAL) + throw new Exception("job id=${row['id']} has status = {$row['status']} != manual"); + + $jobs[] = [ + 'id' => (int)$row['id'], + 'target' => $row['target'] + ]; + + unset($job_ids[$row['id']]); + } + + $q->free(); + + // we were given invalid ids, it seems. throw an exception and don't continue + if (!empty($job_ids)) + throw new Exception("jobs with id ".implode(', ', array_keys($job_ids))." not found"); + } + + // connect to master and send run-manual request + $client = getJobdMaster(); + $response = $client->runManual($jobs); + + // master request failed + if (($error = $response->getError()) !== null) + throw new Exception("jobd returned error: ".$error); + + // at this point, jobd-master request succeeded + // doesn't mean our jobs were successfully accepted and executed by workers, + // but at least we have some results + + /** + * @var array $results + */ + $results = []; + $data = $response->getData(); + + $client->close(); + + // collect results, successes and failures + if (!empty($data['jobs'])) { + foreach ($data['jobs'] as $job_id => $job_result_raw) { + $job_result = (new JobResult())->setResult( + $job_result_raw['result'], + $job_result_raw['code'], + $job_result_raw['stdout'], + $job_result_raw['stderr'], + $job_result_raw['signal'] + ); + $results[$job_id] = $job_result; + } + } + if (!empty($data['errors'])) { + foreach ($data['errors'] as $job_id => $job_result_raw) { + $job_result = (new JobResult())->setError($job_result_raw); + $results[$job_id] = $job_result; + } + } + + // remove jobs from self::$new_jobs + foreach ($job_ids_orig as $id) { + if (isset(self::$new_jobs[$id])) + unset(self::$new_jobs[$id]); + } + + // if the $job_ids arguments wasn't an array, return the JobResult instance + if (count($job_ids_orig) === 1 && count($results) === 1) { + $result = reset($results); + if ($result->isFailed()) + throw new Exception($result->getError()); + return $result; + } + + // otherwise, return array of JobResult instances + return $results; + } + + /** + * @param string|string[] $targets + */ + public static function poke($targets) + { + + $client = getJobdMaster(); + + if (!is_array($targets)) + $targets = [$targets]; + + $client->poke($targets); + $targets = array_flip(array_unique($targets)); + + // remove poked targets from self::$new_jobs to avoid meaninglessly duplicating this poke from the destructor + if (!empty(self::$new_jobs)) { + foreach (self::$new_jobs as $new_job_id => $new_job) { + if ($new_job['status'] == Job::STATUS_WAITING && isset($targets[$new_job['target']])) + unset(self::$new_jobs[$new_job_id]); + } + } + + $client->close(); + return true; + } + + /** + * @param int $id + * @return array + */ + public static function get(int $id) + { + $db = getMySQL(); + $q = $db->query("SELECT * FROM ".JOBD_TABLE." WHERE id=?", $id); + return $db->fetch($q); + } + + /** + * Delete old succeeded jobs. + */ + public static function cleanup() + { + $db = getMySQL(); + $db->query("DELETE FROM ".JOBD_TABLE." WHERE status='done' AND result='ok' AND time_finished < ?", + time() - 86400); + } + +} + + +class job_target +{ + + const any = "any"; + + public static function high(int $server): string + { + return "$server/high"; + } + + public static function low(int $server): string + { + return "$server/low"; + } + +} + + +class jobs_destructor +{ + + public function __destruct() + { + jobs::destruct(); + } + +} diff --git a/src/classes/model.php b/src/classes/model.php new file mode 100644 index 0000000..ff44361 --- /dev/null +++ b/src/classes/model.php @@ -0,0 +1,149 @@ + $type) + $this->{toCamelCase($name)} = self::cast_to_type($type, $raw[$name]); + + if (is_null(static::DB_TABLE)) + trigger_error('class '.get_class($this).' doesn\'t have DB_TABLE defined'); + } + + /** + * @param $fields + */ + public function edit($fields) { + $db = getMySQL(); + + $save = []; + foreach ($fields as $name => $value) { + switch (static::$Fields[$name]) { + case self::ARRAY: + if (is_array($value)) { + $fields[$name] = implode(',', $value); + $save[$name] = $value; + } + break; + + case self::INTEGER: + $value = (int)$value; + $fields[$name] = $value; + $save[$name] = $value; + break; + + case self::FLOAT: + $value = (float)$value; + $fields[$name] = $value; + $save[$name] = $value; + break; + + case self::BOOLEAN: + $fields[$name] = $value ? 1 : 0; + $save[$name] = $value; + break; + + case self::JSON: + $fields[$name] = jsonEncode($value); + $save[$name] = $value; + break; + + case self::SERIALIZED: + $fields[$name] = serialize($value); + $save[$name] = $value; + break; + + default: + $value = (string)$value; + $fields[$name] = $value; + $save[$name] = $value; + break; + } + } + + if (!$db->update(static::DB_TABLE, $fields, static::DB_KEY."=?", $this->get_id())) { + //debugError(__METHOD__.': failed to update database'); + return; + } + + foreach ($save as $name => $value) + $this->{toCamelCase($name)} = $value; + } + + /** + * @return int + */ + public function get_id() { + return $this->{toCamelCase(static::DB_KEY)}; + } + + /** + * @param array $fields + * @param array $custom_getters + * @return array + */ + public function as_array(array $fields = [], array $custom_getters = []) { + if (empty($fields)) + $fields = array_keys(static::$Fields); + + $array = []; + foreach ($fields as $field) { + if (isset($custom_getters[$field]) && is_callable($custom_getters[$field])) { + $array[$field] = $custom_getters[$field](); + } else { + $array[$field] = $this->{toCamelCase($field)}; + } + } + + return $array; + } + + /** + * @param $type + * @param $value + * @return array|bool|false|float|int|string + */ + protected static function cast_to_type($type, $value) { + switch ($type) { + case self::BOOLEAN: + return (bool)$value; + + case self::INTEGER: + return (int)$value; + + case self::FLOAT: + return (float)$value; + + case self::ARRAY: + return array_filter(explode(',', $value)); + + case self::JSON: + return jsonDecode($value); + + case self::SERIALIZED: + return unserialize($value); + + default: + return (string)$value; + } + } + +} diff --git a/src/classes/mysql.php b/src/classes/mysql.php new file mode 100644 index 0000000..710390f --- /dev/null +++ b/src/classes/mysql.php @@ -0,0 +1,166 @@ +link = new mysqli(); + if (!$this->link->real_connect($host, $user, $password, $name)) { + $this->link = null; + throw new Exception('Could not connect to MySQL'); + } + } + + public function __destruct() + { + if ($this->link) + $this->link->close(); + } + + public function __get($k) + { + if ($k == 'error') { + return $this->link->error; + } + return $this->$k; + } + + public function query(string $sql) + { + if (func_num_args() > 1) { + $mark_count = substr_count($sql, '?'); + $positions = array(); + $last_pos = -1; + for ($i = 0; $i < $mark_count; $i++) { + $last_pos = strpos($sql, '?', $last_pos + 1); + $positions[] = $last_pos; + } + for ($i = $mark_count - 1; $i >= 0; $i--) { + $arg_val = func_get_arg($i + 1); + if (is_null($arg_val)) { + $v = 'NULL'; + } else { + $v = '\'' . $this->escape($arg_val) . '\''; + } + $sql = substr_replace($sql, $v, $positions[$i], 1); + } + } + + $q = $this->link->query($sql); + if (!$q) { + $error = $this->link->error; + trigger_error($error, E_USER_WARNING); + return false; + } + + return $q; + } + + public function insert(string $table, array $fields) + { + return $this->performInsert('INSERT', $table, $fields); + } + + public function replace(string $table, array $fields) + { + return $this->performInsert('REPLACE', $table, $fields); + } + + protected function performInsert(string $command, string $table, array $fields) + { + $names = []; + $values = []; + $count = 0; + foreach ($fields as $k => $v) { + $names[] = $k; + $values[] = $v; + $count++; + } + + $sql = "{$command} INTO `{$table}` (`" . implode('`, `', $names) . "`) VALUES (" . implode(', ', array_fill(0, $count, '?')) . ")"; + array_unshift($values, $sql); + + return call_user_func_array([$this, 'query'], $values); + } + + public function multipleInsert(string $table, array $rows) + { + return $this->performMultipleInsert('INSERT', $table, $rows); + } + + public function multipleReplace(string $table, array $rows) + { + return $this->performMultipleInsert('REPLACE', $table, $rows); + } + + protected function performMultipleInsert(string $command, string $table, array $rows) + { + $names = []; + $sql_rows = []; + foreach ($rows as $i => $fields) { + $row_values = []; + foreach ($fields as $field_name => $field_val) { + if ($i == 0) { + $names[] = $field_name; + } + $row_values[] = $this->escape($field_val); + } + $sql_rows[] = "('" . implode("', '", $row_values) . "')"; + } + + $sql = "{$command} INTO `{$table}` (`" . implode('`, `', $names) . "`) VALUES " . implode(', ', $sql_rows); + return $this->query($sql); + } + + public function update(string $table, arrow $rows, string ...$cond) + { + $fields = []; + $args = []; + foreach ($rows as $row_name => $row_value) { + $fields[] = "`{$row_name}`=?"; + $args[] = $row_value; + } + $sql = "UPDATE `$table` SET " . implode(', ', $fields); + if (!empty($cond)) { + $sql .= " WHERE " . $cond[0]; + if (count($cond) > 1) + $args = array_merge($args, array_slice($cond, 1)); + } + return $this->query($sql, ...$args); + } + + public function fetch(mysqli_result $q) + { + $row = $q->fetch_assoc(); + if (!$row) { + $q->free(); + return false; + } + return $row; + } + + public function result(mysqli_result $q, int $field = 0) + { + return $q ? $q->fetch_row()[$field] : false; + } + + public function insertId() + { + return $this->link->insert_id; + } + + public function numRows(mysqli_result $query): int + { + return $query->num_rows; + } + + public function escape(string $s): string + { + return $this->link->real_escape_string($s); + } + +} diff --git a/src/cron/jobs-cleanup.php b/src/cron/jobs-cleanup.php new file mode 100644 index 0000000..3e01d34 --- /dev/null +++ b/src/cron/jobs-cleanup.php @@ -0,0 +1,5 @@ +input['file']; + if (!touch($file)) + throw new \Exception("failed to touch file '".$file."'"); + } + +} diff --git a/src/jobs/Hello.php b/src/jobs/Hello.php new file mode 100644 index 0000000..4fa2446 --- /dev/null +++ b/src/jobs/Hello.php @@ -0,0 +1,15 @@ +input['name'] ?? 'noname').".\n"; + $greetings .= "I'm writing you from ".__METHOD__.", my PID is ".getmypid()." and I'm executing job #".$this->id."."; + echo jsonEncode(['response' => $greetings]); + } + +} diff --git a/src/launcher.php b/src/launcher.php new file mode 100644 index 0000000..d1b9892 --- /dev/null +++ b/src/launcher.php @@ -0,0 +1,31 @@ +status != Job::STATUS_RUNNING) + throw new RuntimeException("job status is {$job->status}"); + +try { + if ($job->run() !== false) + $job = true; +} catch (Exception $e) { + fprintf(STDERR, $e.''); + exit(1); +} \ No newline at end of file diff --git a/src/main.php b/src/main.php new file mode 100644 index 0000000..60779e5 --- /dev/null +++ b/src/main.php @@ -0,0 +1,93 @@ +result($db->query("SELECT COUNT(*) FROM ".JOBD_TABLE)); + } catch (Exception $e) { + echo red("MySQL connection failed")."\n"; + exit(1); + } + echo green("MySQL OK")."\n"; + + // jobd + try { + $jobd = getJobdMaster(); + $status = $jobd->status(true); + $workers_count = count($status->getData()['workers']); + if ($workers_count == 2) { + echo green("jobd-master and jobd OK"); + } else { + $message = "jobd-master OK, but "; + $message .= $workers_count == 1 ? "only 1 worker is connected" : "no workers are connected"; + echo yellow($message); + } + echo "\n"; + } catch (Exception $e) { + echo red("jobd-master connection failed: ".$e->getMessage())."\n"; + exit(1); + } +} + +function cmd_hello() { + $myname = input('Enter your name: '); + try { + $job_ids = []; + $job_server_map = []; + + for ($server = 1; $server <= 2; $server++) { + $id = jobs::manual(job_target::high($server), jobs\Hello::class, ['name' => $myname]); + $job_server_map[$id] = $server; + $job_ids[] = $id; + } + + $results = jobs::run($job_ids); + foreach ($results as $job_id => $job_result) { + $server = $job_server_map[$job_id]; + echo "> server {$server}:\n"; + if ($job_result->isFailed()) { + echo red("failed")."\n"; + } else { + echo green($job_result->getStdoutAsJSON()['response'])."\n"; + } + echo "\n"; + } + + } catch (Exception $e) { + echo red("error: ".$e->getMessage())."\n"; + exit(1); + } +} + +function cmd_createfile() { + $file = input('Enter file name: '); + jobs::add(job_target::any, jobs\CreateFile::class, ['file' => $file]); +} \ No newline at end of file -- cgit v1.2.3