From 9a98ac50ff50dda2f2eed1ea825352c50c64440e Mon Sep 17 00:00:00 2001 From: Evgeny Zinoviev Date: Fri, 7 May 2021 23:39:20 +0300 Subject: initial --- src/classes/Job.php | 51 +++++++++ src/classes/JobResult.php | 96 ++++++++++++++++ src/classes/jobs.php | 275 ++++++++++++++++++++++++++++++++++++++++++++++ src/classes/model.php | 149 +++++++++++++++++++++++++ src/classes/mysql.php | 166 ++++++++++++++++++++++++++++ src/cron/jobs-cleanup.php | 5 + src/functions.php | 47 ++++++++ src/init.php | 32 ++++++ src/jobs/CreateFile.php | 15 +++ src/jobs/Hello.php | 15 +++ src/launcher.php | 31 ++++++ src/main.php | 93 ++++++++++++++++ 12 files changed, 975 insertions(+) create mode 100644 src/classes/Job.php create mode 100644 src/classes/JobResult.php create mode 100644 src/classes/jobs.php create mode 100644 src/classes/model.php create mode 100644 src/classes/mysql.php create mode 100644 src/cron/jobs-cleanup.php create mode 100644 src/functions.php create mode 100644 src/init.php create mode 100644 src/jobs/CreateFile.php create mode 100644 src/jobs/Hello.php create mode 100644 src/launcher.php create mode 100644 src/main.php (limited to 'src') diff --git a/src/classes/Job.php b/src/classes/Job.php new file mode 100644 index 0000000..4ecbf6c --- /dev/null +++ b/src/classes/Job.php @@ -0,0 +1,51 @@ + model::INTEGER, + 'target' => model::STRING, + 'name' => model::STRING, + 'time_created' => model::INTEGER, + 'time_started' => model::INTEGER, + 'time_finished' => model::INTEGER, + 'status' => model::STRING, // ENUM + 'result' => model::STRING, // ENUM + 'return_code' => model::INTEGER, + 'sig' => model::STRING, + 'stdout' => model::STRING, + 'stderr' => model::STRING, + 'input' => model::SERIALIZED, + ]; + + public $id; + public $target; + public $name; + public $timeCreated; + public $timeStarted; + public $timeFinished; + public $status; + public $result; + public $returnCode; + public $sig; + public $stdout; + public $stderr; + public $input; + + abstract public function run(); + +} \ No newline at end of file diff --git a/src/classes/JobResult.php b/src/classes/JobResult.php new file mode 100644 index 0000000..c42b6b0 --- /dev/null +++ b/src/classes/JobResult.php @@ -0,0 +1,96 @@ +result = $result; + $this->returnCode = $return_code; + $this->stdout = $stdout; + $this->stderr = $stderr; + $this->signal = $signal; + + return $this; + } + + /** + * @param string $error + * @return $this + */ + public function setError(string $error): JobResult + { + $this->result = Job::RESULT_FAIL; + $this->stderr = $error; + + return $this; + } + + /** + * @return bool + */ + public function isFailed(): bool + { + return $this->result == Job::RESULT_FAIL; + } + + /** + * @return string + */ + public function getStdout(): string + { + return $this->stdout; + } + + /** + * @return mixed|null + */ + public function getStdoutAsJSON() { + $json = jsonDecode($this->stdout); + return $json ? $json : null; + } + + /** + * @return string + */ + public function getError(): string { + return $this->stderr ?? ''; + } + +} \ No newline at end of file diff --git a/src/classes/jobs.php b/src/classes/jobs.php new file mode 100644 index 0000000..9acbc50 --- /dev/null +++ b/src/classes/jobs.php @@ -0,0 +1,275 @@ + $new_jobs + */ + private static $new_jobs = []; + + /** + * Automatically poke master on exit. + */ + public static function destruct() + { + if (!empty(self::$new_jobs)) { + $targets = []; + foreach (self::$new_jobs as $new_job) { + if ($new_job['status'] === Job::STATUS_WAITING) + $targets[$new_job['target']] = true; + } + + if (!empty($targets)) { + $targets = array_keys($targets); + self::poke($targets); + } + } + } + + /** + * Create job. + * + * @param int|string $target + * @param string $name + * @param array $data + * @param string $status + * @return int|string Job ID + */ + public static function add($target, string $name, array $data = [], string $status = Job::STATUS_WAITING): int + { + if (is_null(self::$destructor_instance)) + self::$destructor_instance = new jobs_destructor(); + + if (strpos($name, '\\') !== false) { + $pos = strrpos($name, '\\'); + $name = substr($name, $pos + 1); + } + + $db = getMySQL(); + $db->insert(JOBD_TABLE, [ + 'target' => $target, + 'name' => $name, + 'time_created' => time(), + 'input' => serialize($data), + 'status' => $status + ]); + $id = $db->insertId(); + + self::$new_jobs[$id] = [ + 'target' => $target, + 'status' => $status + ]; + + return $id; + } + + /** + * Create manual job. + * + * @param int|string $target + * @param string $name + * @param array $data + * @return int + */ + public static function manual($target, string $name, array $data = []): int + { + return self::add($target, $name, $data, Job::STATUS_MANUAL); + } + + /** + * Run jobs with given ids and status=Job::STATUS_MANUAL and wait for results. + * + * If only one job was given and it's failed, an Exception will be thrown! + * If multiple jobs were given and some of them failed, an array of JobResults will be returned. + * + * @param int|int[] $job_ids + * @return array|JobResult + * @throws Exception + */ + public static function run($job_ids) + { + if (!is_array($job_ids)) + $job_ids = [$job_ids]; + + $job_ids_orig = $job_ids; + $job_ids = array_flip($job_ids); + + $jobs = []; + + // look for the given jobs in self::$new_jobs + foreach (self::$new_jobs as $id => $new_job) { + if ($new_job['status'] == Job::STATUS_MANUAL && isset($job_ids[$id])) { + $jobs[] = ['id' => $id, 'target' => $new_job['target']]; + unset($job_ids[$id]); + } + } + + // if some (or all) jobs were not found in self::$new_jobs, get them from the database + if (!empty($job_ids)) { + $job_ids = array_keys($job_ids); + + $db = getMySQL(); + $q = $db->query("SELECT id, target, status AS target FROM ".JOBD_TABLE." WHERE id IN (".implode(',', $job_ids).")"); + $job_ids = array_flip($job_ids); + + while ($row = $db->fetch($q)) { + // only manual jobs are allowed + if ($row['status'] != Job::STATUS_MANUAL) + throw new Exception("job id=${row['id']} has status = {$row['status']} != manual"); + + $jobs[] = [ + 'id' => (int)$row['id'], + 'target' => $row['target'] + ]; + + unset($job_ids[$row['id']]); + } + + $q->free(); + + // we were given invalid ids, it seems. throw an exception and don't continue + if (!empty($job_ids)) + throw new Exception("jobs with id ".implode(', ', array_keys($job_ids))." not found"); + } + + // connect to master and send run-manual request + $client = getJobdMaster(); + $response = $client->runManual($jobs); + + // master request failed + if (($error = $response->getError()) !== null) + throw new Exception("jobd returned error: ".$error); + + // at this point, jobd-master request succeeded + // doesn't mean our jobs were successfully accepted and executed by workers, + // but at least we have some results + + /** + * @var array $results + */ + $results = []; + $data = $response->getData(); + + $client->close(); + + // collect results, successes and failures + if (!empty($data['jobs'])) { + foreach ($data['jobs'] as $job_id => $job_result_raw) { + $job_result = (new JobResult())->setResult( + $job_result_raw['result'], + $job_result_raw['code'], + $job_result_raw['stdout'], + $job_result_raw['stderr'], + $job_result_raw['signal'] + ); + $results[$job_id] = $job_result; + } + } + if (!empty($data['errors'])) { + foreach ($data['errors'] as $job_id => $job_result_raw) { + $job_result = (new JobResult())->setError($job_result_raw); + $results[$job_id] = $job_result; + } + } + + // remove jobs from self::$new_jobs + foreach ($job_ids_orig as $id) { + if (isset(self::$new_jobs[$id])) + unset(self::$new_jobs[$id]); + } + + // if the $job_ids arguments wasn't an array, return the JobResult instance + if (count($job_ids_orig) === 1 && count($results) === 1) { + $result = reset($results); + if ($result->isFailed()) + throw new Exception($result->getError()); + return $result; + } + + // otherwise, return array of JobResult instances + return $results; + } + + /** + * @param string|string[] $targets + */ + public static function poke($targets) + { + + $client = getJobdMaster(); + + if (!is_array($targets)) + $targets = [$targets]; + + $client->poke($targets); + $targets = array_flip(array_unique($targets)); + + // remove poked targets from self::$new_jobs to avoid meaninglessly duplicating this poke from the destructor + if (!empty(self::$new_jobs)) { + foreach (self::$new_jobs as $new_job_id => $new_job) { + if ($new_job['status'] == Job::STATUS_WAITING && isset($targets[$new_job['target']])) + unset(self::$new_jobs[$new_job_id]); + } + } + + $client->close(); + return true; + } + + /** + * @param int $id + * @return array + */ + public static function get(int $id) + { + $db = getMySQL(); + $q = $db->query("SELECT * FROM ".JOBD_TABLE." WHERE id=?", $id); + return $db->fetch($q); + } + + /** + * Delete old succeeded jobs. + */ + public static function cleanup() + { + $db = getMySQL(); + $db->query("DELETE FROM ".JOBD_TABLE." WHERE status='done' AND result='ok' AND time_finished < ?", + time() - 86400); + } + +} + + +class job_target +{ + + const any = "any"; + + public static function high(int $server): string + { + return "$server/high"; + } + + public static function low(int $server): string + { + return "$server/low"; + } + +} + + +class jobs_destructor +{ + + public function __destruct() + { + jobs::destruct(); + } + +} diff --git a/src/classes/model.php b/src/classes/model.php new file mode 100644 index 0000000..ff44361 --- /dev/null +++ b/src/classes/model.php @@ -0,0 +1,149 @@ + $type) + $this->{toCamelCase($name)} = self::cast_to_type($type, $raw[$name]); + + if (is_null(static::DB_TABLE)) + trigger_error('class '.get_class($this).' doesn\'t have DB_TABLE defined'); + } + + /** + * @param $fields + */ + public function edit($fields) { + $db = getMySQL(); + + $save = []; + foreach ($fields as $name => $value) { + switch (static::$Fields[$name]) { + case self::ARRAY: + if (is_array($value)) { + $fields[$name] = implode(',', $value); + $save[$name] = $value; + } + break; + + case self::INTEGER: + $value = (int)$value; + $fields[$name] = $value; + $save[$name] = $value; + break; + + case self::FLOAT: + $value = (float)$value; + $fields[$name] = $value; + $save[$name] = $value; + break; + + case self::BOOLEAN: + $fields[$name] = $value ? 1 : 0; + $save[$name] = $value; + break; + + case self::JSON: + $fields[$name] = jsonEncode($value); + $save[$name] = $value; + break; + + case self::SERIALIZED: + $fields[$name] = serialize($value); + $save[$name] = $value; + break; + + default: + $value = (string)$value; + $fields[$name] = $value; + $save[$name] = $value; + break; + } + } + + if (!$db->update(static::DB_TABLE, $fields, static::DB_KEY."=?", $this->get_id())) { + //debugError(__METHOD__.': failed to update database'); + return; + } + + foreach ($save as $name => $value) + $this->{toCamelCase($name)} = $value; + } + + /** + * @return int + */ + public function get_id() { + return $this->{toCamelCase(static::DB_KEY)}; + } + + /** + * @param array $fields + * @param array $custom_getters + * @return array + */ + public function as_array(array $fields = [], array $custom_getters = []) { + if (empty($fields)) + $fields = array_keys(static::$Fields); + + $array = []; + foreach ($fields as $field) { + if (isset($custom_getters[$field]) && is_callable($custom_getters[$field])) { + $array[$field] = $custom_getters[$field](); + } else { + $array[$field] = $this->{toCamelCase($field)}; + } + } + + return $array; + } + + /** + * @param $type + * @param $value + * @return array|bool|false|float|int|string + */ + protected static function cast_to_type($type, $value) { + switch ($type) { + case self::BOOLEAN: + return (bool)$value; + + case self::INTEGER: + return (int)$value; + + case self::FLOAT: + return (float)$value; + + case self::ARRAY: + return array_filter(explode(',', $value)); + + case self::JSON: + return jsonDecode($value); + + case self::SERIALIZED: + return unserialize($value); + + default: + return (string)$value; + } + } + +} diff --git a/src/classes/mysql.php b/src/classes/mysql.php new file mode 100644 index 0000000..710390f --- /dev/null +++ b/src/classes/mysql.php @@ -0,0 +1,166 @@ +link = new mysqli(); + if (!$this->link->real_connect($host, $user, $password, $name)) { + $this->link = null; + throw new Exception('Could not connect to MySQL'); + } + } + + public function __destruct() + { + if ($this->link) + $this->link->close(); + } + + public function __get($k) + { + if ($k == 'error') { + return $this->link->error; + } + return $this->$k; + } + + public function query(string $sql) + { + if (func_num_args() > 1) { + $mark_count = substr_count($sql, '?'); + $positions = array(); + $last_pos = -1; + for ($i = 0; $i < $mark_count; $i++) { + $last_pos = strpos($sql, '?', $last_pos + 1); + $positions[] = $last_pos; + } + for ($i = $mark_count - 1; $i >= 0; $i--) { + $arg_val = func_get_arg($i + 1); + if (is_null($arg_val)) { + $v = 'NULL'; + } else { + $v = '\'' . $this->escape($arg_val) . '\''; + } + $sql = substr_replace($sql, $v, $positions[$i], 1); + } + } + + $q = $this->link->query($sql); + if (!$q) { + $error = $this->link->error; + trigger_error($error, E_USER_WARNING); + return false; + } + + return $q; + } + + public function insert(string $table, array $fields) + { + return $this->performInsert('INSERT', $table, $fields); + } + + public function replace(string $table, array $fields) + { + return $this->performInsert('REPLACE', $table, $fields); + } + + protected function performInsert(string $command, string $table, array $fields) + { + $names = []; + $values = []; + $count = 0; + foreach ($fields as $k => $v) { + $names[] = $k; + $values[] = $v; + $count++; + } + + $sql = "{$command} INTO `{$table}` (`" . implode('`, `', $names) . "`) VALUES (" . implode(', ', array_fill(0, $count, '?')) . ")"; + array_unshift($values, $sql); + + return call_user_func_array([$this, 'query'], $values); + } + + public function multipleInsert(string $table, array $rows) + { + return $this->performMultipleInsert('INSERT', $table, $rows); + } + + public function multipleReplace(string $table, array $rows) + { + return $this->performMultipleInsert('REPLACE', $table, $rows); + } + + protected function performMultipleInsert(string $command, string $table, array $rows) + { + $names = []; + $sql_rows = []; + foreach ($rows as $i => $fields) { + $row_values = []; + foreach ($fields as $field_name => $field_val) { + if ($i == 0) { + $names[] = $field_name; + } + $row_values[] = $this->escape($field_val); + } + $sql_rows[] = "('" . implode("', '", $row_values) . "')"; + } + + $sql = "{$command} INTO `{$table}` (`" . implode('`, `', $names) . "`) VALUES " . implode(', ', $sql_rows); + return $this->query($sql); + } + + public function update(string $table, arrow $rows, string ...$cond) + { + $fields = []; + $args = []; + foreach ($rows as $row_name => $row_value) { + $fields[] = "`{$row_name}`=?"; + $args[] = $row_value; + } + $sql = "UPDATE `$table` SET " . implode(', ', $fields); + if (!empty($cond)) { + $sql .= " WHERE " . $cond[0]; + if (count($cond) > 1) + $args = array_merge($args, array_slice($cond, 1)); + } + return $this->query($sql, ...$args); + } + + public function fetch(mysqli_result $q) + { + $row = $q->fetch_assoc(); + if (!$row) { + $q->free(); + return false; + } + return $row; + } + + public function result(mysqli_result $q, int $field = 0) + { + return $q ? $q->fetch_row()[$field] : false; + } + + public function insertId() + { + return $this->link->insert_id; + } + + public function numRows(mysqli_result $query): int + { + return $query->num_rows; + } + + public function escape(string $s): string + { + return $this->link->real_escape_string($s); + } + +} diff --git a/src/cron/jobs-cleanup.php b/src/cron/jobs-cleanup.php new file mode 100644 index 0000000..3e01d34 --- /dev/null +++ b/src/cron/jobs-cleanup.php @@ -0,0 +1,5 @@ +input['file']; + if (!touch($file)) + throw new \Exception("failed to touch file '".$file."'"); + } + +} diff --git a/src/jobs/Hello.php b/src/jobs/Hello.php new file mode 100644 index 0000000..4fa2446 --- /dev/null +++ b/src/jobs/Hello.php @@ -0,0 +1,15 @@ +input['name'] ?? 'noname').".\n"; + $greetings .= "I'm writing you from ".__METHOD__.", my PID is ".getmypid()." and I'm executing job #".$this->id."."; + echo jsonEncode(['response' => $greetings]); + } + +} diff --git a/src/launcher.php b/src/launcher.php new file mode 100644 index 0000000..d1b9892 --- /dev/null +++ b/src/launcher.php @@ -0,0 +1,31 @@ +status != Job::STATUS_RUNNING) + throw new RuntimeException("job status is {$job->status}"); + +try { + if ($job->run() !== false) + $job = true; +} catch (Exception $e) { + fprintf(STDERR, $e.''); + exit(1); +} \ No newline at end of file diff --git a/src/main.php b/src/main.php new file mode 100644 index 0000000..60779e5 --- /dev/null +++ b/src/main.php @@ -0,0 +1,93 @@ +result($db->query("SELECT COUNT(*) FROM ".JOBD_TABLE)); + } catch (Exception $e) { + echo red("MySQL connection failed")."\n"; + exit(1); + } + echo green("MySQL OK")."\n"; + + // jobd + try { + $jobd = getJobdMaster(); + $status = $jobd->status(true); + $workers_count = count($status->getData()['workers']); + if ($workers_count == 2) { + echo green("jobd-master and jobd OK"); + } else { + $message = "jobd-master OK, but "; + $message .= $workers_count == 1 ? "only 1 worker is connected" : "no workers are connected"; + echo yellow($message); + } + echo "\n"; + } catch (Exception $e) { + echo red("jobd-master connection failed: ".$e->getMessage())."\n"; + exit(1); + } +} + +function cmd_hello() { + $myname = input('Enter your name: '); + try { + $job_ids = []; + $job_server_map = []; + + for ($server = 1; $server <= 2; $server++) { + $id = jobs::manual(job_target::high($server), jobs\Hello::class, ['name' => $myname]); + $job_server_map[$id] = $server; + $job_ids[] = $id; + } + + $results = jobs::run($job_ids); + foreach ($results as $job_id => $job_result) { + $server = $job_server_map[$job_id]; + echo "> server {$server}:\n"; + if ($job_result->isFailed()) { + echo red("failed")."\n"; + } else { + echo green($job_result->getStdoutAsJSON()['response'])."\n"; + } + echo "\n"; + } + + } catch (Exception $e) { + echo red("error: ".$e->getMessage())."\n"; + exit(1); + } +} + +function cmd_createfile() { + $file = input('Enter file name: '); + jobs::add(job_target::any, jobs\CreateFile::class, ['file' => $file]); +} \ No newline at end of file -- cgit v1.2.3