diff options
author | Evgeny Zinoviev <me@ch1p.io> | 2021-05-07 23:39:20 +0300 |
---|---|---|
committer | Evgeny Zinoviev <me@ch1p.io> | 2021-05-07 23:39:30 +0300 |
commit | 9a98ac50ff50dda2f2eed1ea825352c50c64440e (patch) | |
tree | cc5c7d36eee1ec934f8728e7f9104a9d2d760856 /src/classes/jobs.php |
initial
Diffstat (limited to 'src/classes/jobs.php')
-rw-r--r-- | src/classes/jobs.php | 275 |
1 files changed, 275 insertions, 0 deletions
diff --git a/src/classes/jobs.php b/src/classes/jobs.php new file mode 100644 index 0000000..9acbc50 --- /dev/null +++ b/src/classes/jobs.php @@ -0,0 +1,275 @@ +<?php + +class jobs +{ + + /** + * @var jobs_destructor $destructor_instance + */ + private static $destructor_instance; + + /** + * @var array<int, array> $new_jobs + */ + private static $new_jobs = []; + + /** + * Automatically poke master on exit. + */ + public static function destruct() + { + if (!empty(self::$new_jobs)) { + $targets = []; + foreach (self::$new_jobs as $new_job) { + if ($new_job['status'] === Job::STATUS_WAITING) + $targets[$new_job['target']] = true; + } + + if (!empty($targets)) { + $targets = array_keys($targets); + self::poke($targets); + } + } + } + + /** + * Create job. + * + * @param int|string $target + * @param string $name + * @param array $data + * @param string $status + * @return int|string Job ID + */ + public static function add($target, string $name, array $data = [], string $status = Job::STATUS_WAITING): int + { + if (is_null(self::$destructor_instance)) + self::$destructor_instance = new jobs_destructor(); + + if (strpos($name, '\\') !== false) { + $pos = strrpos($name, '\\'); + $name = substr($name, $pos + 1); + } + + $db = getMySQL(); + $db->insert(JOBD_TABLE, [ + 'target' => $target, + 'name' => $name, + 'time_created' => time(), + 'input' => serialize($data), + 'status' => $status + ]); + $id = $db->insertId(); + + self::$new_jobs[$id] = [ + 'target' => $target, + 'status' => $status + ]; + + return $id; + } + + /** + * Create manual job. + * + * @param int|string $target + * @param string $name + * @param array $data + * @return int + */ + public static function manual($target, string $name, array $data = []): int + { + return self::add($target, $name, $data, Job::STATUS_MANUAL); + } + + /** + * Run jobs with given ids and status=Job::STATUS_MANUAL and wait for results. + * + * If only one job was given and it's failed, an Exception will be thrown! + * If multiple jobs were given and some of them failed, an array of JobResults will be returned. + * + * @param int|int[] $job_ids + * @return array<int, JobResult>|JobResult + * @throws Exception + */ + public static function run($job_ids) + { + if (!is_array($job_ids)) + $job_ids = [$job_ids]; + + $job_ids_orig = $job_ids; + $job_ids = array_flip($job_ids); + + $jobs = []; + + // look for the given jobs in self::$new_jobs + foreach (self::$new_jobs as $id => $new_job) { + if ($new_job['status'] == Job::STATUS_MANUAL && isset($job_ids[$id])) { + $jobs[] = ['id' => $id, 'target' => $new_job['target']]; + unset($job_ids[$id]); + } + } + + // if some (or all) jobs were not found in self::$new_jobs, get them from the database + if (!empty($job_ids)) { + $job_ids = array_keys($job_ids); + + $db = getMySQL(); + $q = $db->query("SELECT id, target, status AS target FROM ".JOBD_TABLE." WHERE id IN (".implode(',', $job_ids).")"); + $job_ids = array_flip($job_ids); + + while ($row = $db->fetch($q)) { + // only manual jobs are allowed + if ($row['status'] != Job::STATUS_MANUAL) + throw new Exception("job id=${row['id']} has status = {$row['status']} != manual"); + + $jobs[] = [ + 'id' => (int)$row['id'], + 'target' => $row['target'] + ]; + + unset($job_ids[$row['id']]); + } + + $q->free(); + + // we were given invalid ids, it seems. throw an exception and don't continue + if (!empty($job_ids)) + throw new Exception("jobs with id ".implode(', ', array_keys($job_ids))." not found"); + } + + // connect to master and send run-manual request + $client = getJobdMaster(); + $response = $client->runManual($jobs); + + // master request failed + if (($error = $response->getError()) !== null) + throw new Exception("jobd returned error: ".$error); + + // at this point, jobd-master request succeeded + // doesn't mean our jobs were successfully accepted and executed by workers, + // but at least we have some results + + /** + * @var array<int, JobResult> $results + */ + $results = []; + $data = $response->getData(); + + $client->close(); + + // collect results, successes and failures + if (!empty($data['jobs'])) { + foreach ($data['jobs'] as $job_id => $job_result_raw) { + $job_result = (new JobResult())->setResult( + $job_result_raw['result'], + $job_result_raw['code'], + $job_result_raw['stdout'], + $job_result_raw['stderr'], + $job_result_raw['signal'] + ); + $results[$job_id] = $job_result; + } + } + if (!empty($data['errors'])) { + foreach ($data['errors'] as $job_id => $job_result_raw) { + $job_result = (new JobResult())->setError($job_result_raw); + $results[$job_id] = $job_result; + } + } + + // remove jobs from self::$new_jobs + foreach ($job_ids_orig as $id) { + if (isset(self::$new_jobs[$id])) + unset(self::$new_jobs[$id]); + } + + // if the $job_ids arguments wasn't an array, return the JobResult instance + if (count($job_ids_orig) === 1 && count($results) === 1) { + $result = reset($results); + if ($result->isFailed()) + throw new Exception($result->getError()); + return $result; + } + + // otherwise, return array of JobResult instances + return $results; + } + + /** + * @param string|string[] $targets + */ + public static function poke($targets) + { + + $client = getJobdMaster(); + + if (!is_array($targets)) + $targets = [$targets]; + + $client->poke($targets); + $targets = array_flip(array_unique($targets)); + + // remove poked targets from self::$new_jobs to avoid meaninglessly duplicating this poke from the destructor + if (!empty(self::$new_jobs)) { + foreach (self::$new_jobs as $new_job_id => $new_job) { + if ($new_job['status'] == Job::STATUS_WAITING && isset($targets[$new_job['target']])) + unset(self::$new_jobs[$new_job_id]); + } + } + + $client->close(); + return true; + } + + /** + * @param int $id + * @return array + */ + public static function get(int $id) + { + $db = getMySQL(); + $q = $db->query("SELECT * FROM ".JOBD_TABLE." WHERE id=?", $id); + return $db->fetch($q); + } + + /** + * Delete old succeeded jobs. + */ + public static function cleanup() + { + $db = getMySQL(); + $db->query("DELETE FROM ".JOBD_TABLE." WHERE status='done' AND result='ok' AND time_finished < ?", + time() - 86400); + } + +} + + +class job_target +{ + + const any = "any"; + + public static function high(int $server): string + { + return "$server/high"; + } + + public static function low(int $server): string + { + return "$server/low"; + } + +} + + +class jobs_destructor +{ + + public function __destruct() + { + jobs::destruct(); + } + +} |