path: root/src
diff options
Diffstat (limited to 'src')
12 files changed, 975 insertions, 0 deletions
diff --git a/src/classes/Job.php b/src/classes/Job.php
new file mode 100644
index 0000000..4ecbf6c
--- /dev/null
+++ b/src/classes/Job.php
@@ -0,0 +1,51 @@
+abstract class Job extends model {
+ // ENUM status
+ const STATUS_WAITING = 'waiting';
+ const STATUS_MANUAL = 'manual';
+ const STATUS_ACCEPTED = 'accepted';
+ const STATUS_IGNORED = 'ignored';
+ const STATUS_RUNNING = 'running';
+ const STATUS_DONE = 'done';
+ // ENUM result
+ const RESULT_OK = 'ok';
+ const RESULT_FAIL = 'fail';
+ const DB_TABLE = 'jobs';
+ protected static $Fields = [
+ 'id' => model::INTEGER,
+ 'target' => model::STRING,
+ 'name' => model::STRING,
+ 'time_created' => model::INTEGER,
+ 'time_started' => model::INTEGER,
+ 'time_finished' => model::INTEGER,
+ 'status' => model::STRING, // ENUM
+ 'result' => model::STRING, // ENUM
+ 'return_code' => model::INTEGER,
+ 'sig' => model::STRING,
+ 'stdout' => model::STRING,
+ 'stderr' => model::STRING,
+ 'input' => model::SERIALIZED,
+ ];
+ public $id;
+ public $target;
+ public $name;
+ public $timeCreated;
+ public $timeStarted;
+ public $timeFinished;
+ public $status;
+ public $result;
+ public $returnCode;
+ public $sig;
+ public $stdout;
+ public $stderr;
+ public $input;
+ abstract public function run();
+} \ No newline at end of file
diff --git a/src/classes/JobResult.php b/src/classes/JobResult.php
new file mode 100644
index 0000000..c42b6b0
--- /dev/null
+++ b/src/classes/JobResult.php
@@ -0,0 +1,96 @@
+class JobResult {
+ /**
+ * @var string $result
+ */
+ protected $result;
+ /**
+ * @var int $returnCode
+ */
+ protected $returnCode;
+ /**
+ * @var string|null $signal
+ */
+ protected $signal;
+ /**
+ * @var string $stdout
+ */
+ protected $stdout;
+ /**
+ * @var string $stderr
+ */
+ protected $stderr;
+ /**
+ * @param string $result
+ * @param int $return_code
+ * @param string $stdout
+ * @param string $stderr
+ * @param null $signal
+ * @return $this
+ */
+ public function setResult(string $result,
+ int $return_code,
+ string $stdout,
+ string $stderr,
+ $signal = null): JobResult
+ {
+ $this->result = $result;
+ $this->returnCode = $return_code;
+ $this->stdout = $stdout;
+ $this->stderr = $stderr;
+ $this->signal = $signal;
+ return $this;
+ }
+ /**
+ * @param string $error
+ * @return $this
+ */
+ public function setError(string $error): JobResult
+ {
+ $this->result = Job::RESULT_FAIL;
+ $this->stderr = $error;
+ return $this;
+ }
+ /**
+ * @return bool
+ */
+ public function isFailed(): bool
+ {
+ return $this->result == Job::RESULT_FAIL;
+ }
+ /**
+ * @return string
+ */
+ public function getStdout(): string
+ {
+ return $this->stdout;
+ }
+ /**
+ * @return mixed|null
+ */
+ public function getStdoutAsJSON() {
+ $json = jsonDecode($this->stdout);
+ return $json ? $json : null;
+ }
+ /**
+ * @return string
+ */
+ public function getError(): string {
+ return $this->stderr ?? '';
+ }
+} \ No newline at end of file
diff --git a/src/classes/jobs.php b/src/classes/jobs.php
new file mode 100644
index 0000000..9acbc50
--- /dev/null
+++ b/src/classes/jobs.php
@@ -0,0 +1,275 @@
+class jobs
+ /**
+ * @var jobs_destructor $destructor_instance
+ */
+ private static $destructor_instance;
+ /**
+ * @var array<int, array> $new_jobs
+ */
+ private static $new_jobs = [];
+ /**
+ * Automatically poke master on exit.
+ */
+ public static function destruct()
+ {
+ if (!empty(self::$new_jobs)) {
+ $targets = [];
+ foreach (self::$new_jobs as $new_job) {
+ if ($new_job['status'] === Job::STATUS_WAITING)
+ $targets[$new_job['target']] = true;
+ }
+ if (!empty($targets)) {
+ $targets = array_keys($targets);
+ self::poke($targets);
+ }
+ }
+ }
+ /**
+ * Create job.
+ *
+ * @param int|string $target
+ * @param string $name
+ * @param array $data
+ * @param string $status
+ * @return int|string Job ID
+ */
+ public static function add($target, string $name, array $data = [], string $status = Job::STATUS_WAITING): int
+ {
+ if (is_null(self::$destructor_instance))
+ self::$destructor_instance = new jobs_destructor();
+ if (strpos($name, '\\') !== false) {
+ $pos = strrpos($name, '\\');
+ $name = substr($name, $pos + 1);
+ }
+ $db = getMySQL();
+ $db->insert(JOBD_TABLE, [
+ 'target' => $target,
+ 'name' => $name,
+ 'time_created' => time(),
+ 'input' => serialize($data),
+ 'status' => $status
+ ]);
+ $id = $db->insertId();
+ self::$new_jobs[$id] = [
+ 'target' => $target,
+ 'status' => $status
+ ];
+ return $id;
+ }
+ /**
+ * Create manual job.
+ *
+ * @param int|string $target
+ * @param string $name
+ * @param array $data
+ * @return int
+ */
+ public static function manual($target, string $name, array $data = []): int
+ {
+ return self::add($target, $name, $data, Job::STATUS_MANUAL);
+ }
+ /**
+ * Run jobs with given ids and status=Job::STATUS_MANUAL and wait for results.
+ *
+ * If only one job was given and it's failed, an Exception will be thrown!
+ * If multiple jobs were given and some of them failed, an array of JobResults will be returned.
+ *
+ * @param int|int[] $job_ids
+ * @return array<int, JobResult>|JobResult
+ * @throws Exception
+ */
+ public static function run($job_ids)
+ {
+ if (!is_array($job_ids))
+ $job_ids = [$job_ids];
+ $job_ids_orig = $job_ids;
+ $job_ids = array_flip($job_ids);
+ $jobs = [];
+ // look for the given jobs in self::$new_jobs
+ foreach (self::$new_jobs as $id => $new_job) {
+ if ($new_job['status'] == Job::STATUS_MANUAL && isset($job_ids[$id])) {
+ $jobs[] = ['id' => $id, 'target' => $new_job['target']];
+ unset($job_ids[$id]);
+ }
+ }
+ // if some (or all) jobs were not found in self::$new_jobs, get them from the database
+ if (!empty($job_ids)) {
+ $job_ids = array_keys($job_ids);
+ $db = getMySQL();
+ $q = $db->query("SELECT id, target, status AS target FROM ".JOBD_TABLE." WHERE id IN (".implode(',', $job_ids).")");
+ $job_ids = array_flip($job_ids);
+ while ($row = $db->fetch($q)) {
+ // only manual jobs are allowed
+ if ($row['status'] != Job::STATUS_MANUAL)
+ throw new Exception("job id=${row['id']} has status = {$row['status']} != manual");
+ $jobs[] = [
+ 'id' => (int)$row['id'],
+ 'target' => $row['target']
+ ];
+ unset($job_ids[$row['id']]);
+ }
+ $q->free();
+ // we were given invalid ids, it seems. throw an exception and don't continue
+ if (!empty($job_ids))
+ throw new Exception("jobs with id ".implode(', ', array_keys($job_ids))." not found");
+ }
+ // connect to master and send run-manual request
+ $client = getJobdMaster();
+ $response = $client->runManual($jobs);
+ // master request failed
+ if (($error = $response->getError()) !== null)
+ throw new Exception("jobd returned error: ".$error);
+ // at this point, jobd-master request succeeded
+ // doesn't mean our jobs were successfully accepted and executed by workers,
+ // but at least we have some results
+ /**
+ * @var array<int, JobResult> $results
+ */
+ $results = [];
+ $data = $response->getData();
+ $client->close();
+ // collect results, successes and failures
+ if (!empty($data['jobs'])) {
+ foreach ($data['jobs'] as $job_id => $job_result_raw) {
+ $job_result = (new JobResult())->setResult(
+ $job_result_raw['result'],
+ $job_result_raw['code'],
+ $job_result_raw['stdout'],
+ $job_result_raw['stderr'],
+ $job_result_raw['signal']
+ );
+ $results[$job_id] = $job_result;
+ }
+ }
+ if (!empty($data['errors'])) {
+ foreach ($data['errors'] as $job_id => $job_result_raw) {
+ $job_result = (new JobResult())->setError($job_result_raw);
+ $results[$job_id] = $job_result;
+ }
+ }
+ // remove jobs from self::$new_jobs
+ foreach ($job_ids_orig as $id) {
+ if (isset(self::$new_jobs[$id]))
+ unset(self::$new_jobs[$id]);
+ }
+ // if the $job_ids arguments wasn't an array, return the JobResult instance
+ if (count($job_ids_orig) === 1 && count($results) === 1) {
+ $result = reset($results);
+ if ($result->isFailed())
+ throw new Exception($result->getError());
+ return $result;
+ }
+ // otherwise, return array of JobResult instances
+ return $results;
+ }
+ /**
+ * @param string|string[] $targets
+ */
+ public static function poke($targets)
+ {
+ $client = getJobdMaster();
+ if (!is_array($targets))
+ $targets = [$targets];
+ $client->poke($targets);
+ $targets = array_flip(array_unique($targets));
+ // remove poked targets from self::$new_jobs to avoid meaninglessly duplicating this poke from the destructor
+ if (!empty(self::$new_jobs)) {
+ foreach (self::$new_jobs as $new_job_id => $new_job) {
+ if ($new_job['status'] == Job::STATUS_WAITING && isset($targets[$new_job['target']]))
+ unset(self::$new_jobs[$new_job_id]);
+ }
+ }
+ $client->close();
+ return true;
+ }
+ /**
+ * @param int $id
+ * @return array
+ */
+ public static function get(int $id)
+ {
+ $db = getMySQL();
+ $q = $db->query("SELECT * FROM ".JOBD_TABLE." WHERE id=?", $id);
+ return $db->fetch($q);
+ }
+ /**
+ * Delete old succeeded jobs.
+ */
+ public static function cleanup()
+ {
+ $db = getMySQL();
+ $db->query("DELETE FROM ".JOBD_TABLE." WHERE status='done' AND result='ok' AND time_finished < ?",
+ time() - 86400);
+ }
+class job_target
+ const any = "any";
+ public static function high(int $server): string
+ {
+ return "$server/high";
+ }
+ public static function low(int $server): string
+ {
+ return "$server/low";
+ }
+class jobs_destructor
+ public function __destruct()
+ {
+ jobs::destruct();
+ }
diff --git a/src/classes/model.php b/src/classes/model.php
new file mode 100644
index 0000000..ff44361
--- /dev/null
+++ b/src/classes/model.php
@@ -0,0 +1,149 @@
+abstract class model {
+ const DB_TABLE = null;
+ const DB_KEY = 'id';
+ const STRING = 0;
+ const INTEGER = 1;
+ const FLOAT = 2;
+ const ARRAY = 3;
+ const BOOLEAN = 4;
+ const JSON = 5;
+ const SERIALIZED = 6;
+ protected static $Fields = [];
+ public static function create_instance(...$args) {
+ $cl = get_called_class();
+ return new $cl(...$args);
+ }
+ public function __construct($raw) {
+ foreach (static::$Fields as $name => $type)
+ $this->{toCamelCase($name)} = self::cast_to_type($type, $raw[$name]);
+ if (is_null(static::DB_TABLE))
+ trigger_error('class '.get_class($this).' doesn\'t have DB_TABLE defined');
+ }
+ /**
+ * @param $fields
+ */
+ public function edit($fields) {
+ $db = getMySQL();
+ $save = [];
+ foreach ($fields as $name => $value) {
+ switch (static::$Fields[$name]) {
+ case self::ARRAY:
+ if (is_array($value)) {
+ $fields[$name] = implode(',', $value);
+ $save[$name] = $value;
+ }
+ break;
+ case self::INTEGER:
+ $value = (int)$value;
+ $fields[$name] = $value;
+ $save[$name] = $value;
+ break;
+ case self::FLOAT:
+ $value = (float)$value;
+ $fields[$name] = $value;
+ $save[$name] = $value;
+ break;
+ case self::BOOLEAN:
+ $fields[$name] = $value ? 1 : 0;
+ $save[$name] = $value;
+ break;
+ case self::JSON:
+ $fields[$name] = jsonEncode($value);
+ $save[$name] = $value;
+ break;
+ case self::SERIALIZED:
+ $fields[$name] = serialize($value);
+ $save[$name] = $value;
+ break;
+ default:
+ $value = (string)$value;
+ $fields[$name] = $value;
+ $save[$name] = $value;
+ break;
+ }
+ }
+ if (!$db->update(static::DB_TABLE, $fields, static::DB_KEY."=?", $this->get_id())) {
+ //debugError(__METHOD__.': failed to update database');
+ return;
+ }
+ foreach ($save as $name => $value)
+ $this->{toCamelCase($name)} = $value;
+ }
+ /**
+ * @return int
+ */
+ public function get_id() {
+ return $this->{toCamelCase(static::DB_KEY)};
+ }
+ /**
+ * @param array $fields
+ * @param array $custom_getters
+ * @return array
+ */
+ public function as_array(array $fields = [], array $custom_getters = []) {
+ if (empty($fields))
+ $fields = array_keys(static::$Fields);
+ $array = [];
+ foreach ($fields as $field) {
+ if (isset($custom_getters[$field]) && is_callable($custom_getters[$field])) {
+ $array[$field] = $custom_getters[$field]();
+ } else {
+ $array[$field] = $this->{toCamelCase($field)};
+ }
+ }
+ return $array;
+ }
+ /**
+ * @param $type
+ * @param $value
+ * @return array|bool|false|float|int|string
+ */
+ protected static function cast_to_type($type, $value) {
+ switch ($type) {
+ case self::BOOLEAN:
+ return (bool)$value;
+ case self::INTEGER:
+ return (int)$value;
+ case self::FLOAT:
+ return (float)$value;
+ case self::ARRAY:
+ return array_filter(explode(',', $value));
+ case self::JSON:
+ return jsonDecode($value);
+ case self::SERIALIZED:
+ return unserialize($value);
+ default:
+ return (string)$value;
+ }
+ }
diff --git a/src/classes/mysql.php b/src/classes/mysql.php
new file mode 100644
index 0000000..710390f
--- /dev/null
+++ b/src/classes/mysql.php
@@ -0,0 +1,166 @@
+class mysql
+ /** @var mysqli $link */
+ private $link = null;
+ public function __construct(string $host, string $user, string $password, string $name)
+ {
+ $this->link = new mysqli();
+ if (!$this->link->real_connect($host, $user, $password, $name)) {
+ $this->link = null;
+ throw new Exception('Could not connect to MySQL');
+ }
+ }
+ public function __destruct()
+ {
+ if ($this->link)
+ $this->link->close();
+ }
+ public function __get($k)
+ {
+ if ($k == 'error') {
+ return $this->link->error;
+ }
+ return $this->$k;
+ }
+ public function query(string $sql)
+ {
+ if (func_num_args() > 1) {
+ $mark_count = substr_count($sql, '?');
+ $positions = array();
+ $last_pos = -1;
+ for ($i = 0; $i < $mark_count; $i++) {
+ $last_pos = strpos($sql, '?', $last_pos + 1);
+ $positions[] = $last_pos;
+ }
+ for ($i = $mark_count - 1; $i >= 0; $i--) {
+ $arg_val = func_get_arg($i + 1);
+ if (is_null($arg_val)) {
+ $v = 'NULL';
+ } else {
+ $v = '\'' . $this->escape($arg_val) . '\'';
+ }
+ $sql = substr_replace($sql, $v, $positions[$i], 1);
+ }
+ }
+ $q = $this->link->query($sql);
+ if (!$q) {
+ $error = $this->link->error;
+ trigger_error($error, E_USER_WARNING);
+ return false;
+ }
+ return $q;
+ }
+ public function insert(string $table, array $fields)
+ {
+ return $this->performInsert('INSERT', $table, $fields);
+ }
+ public function replace(string $table, array $fields)
+ {
+ return $this->performInsert('REPLACE', $table, $fields);
+ }
+ protected function performInsert(string $command, string $table, array $fields)
+ {
+ $names = [];
+ $values = [];
+ $count = 0;
+ foreach ($fields as $k => $v) {
+ $names[] = $k;
+ $values[] = $v;
+ $count++;
+ }
+ $sql = "{$command} INTO `{$table}` (`" . implode('`, `', $names) . "`) VALUES (" . implode(', ', array_fill(0, $count, '?')) . ")";
+ array_unshift($values, $sql);
+ return call_user_func_array([$this, 'query'], $values);
+ }
+ public function multipleInsert(string $table, array $rows)
+ {
+ return $this->performMultipleInsert('INSERT', $table, $rows);
+ }
+ public function multipleReplace(string $table, array $rows)
+ {
+ return $this->performMultipleInsert('REPLACE', $table, $rows);
+ }
+ protected function performMultipleInsert(string $command, string $table, array $rows)
+ {
+ $names = [];
+ $sql_rows = [];
+ foreach ($rows as $i => $fields) {
+ $row_values = [];
+ foreach ($fields as $field_name => $field_val) {
+ if ($i == 0) {
+ $names[] = $field_name;
+ }
+ $row_values[] = $this->escape($field_val);
+ }
+ $sql_rows[] = "('" . implode("', '", $row_values) . "')";
+ }
+ $sql = "{$command} INTO `{$table}` (`" . implode('`, `', $names) . "`) VALUES " . implode(', ', $sql_rows);
+ return $this->query($sql);
+ }
+ public function update(string $table, arrow $rows, string ...$cond)
+ {
+ $fields = [];
+ $args = [];
+ foreach ($rows as $row_name => $row_value) {
+ $fields[] = "`{$row_name}`=?";
+ $args[] = $row_value;
+ }
+ $sql = "UPDATE `$table` SET " . implode(', ', $fields);
+ if (!empty($cond)) {
+ $sql .= " WHERE " . $cond[0];
+ if (count($cond) > 1)
+ $args = array_merge($args, array_slice($cond, 1));
+ }
+ return $this->query($sql, ...$args);
+ }
+ public function fetch(mysqli_result $q)
+ {
+ $row = $q->fetch_assoc();
+ if (!$row) {
+ $q->free();
+ return false;
+ }
+ return $row;
+ }
+ public function result(mysqli_result $q, int $field = 0)
+ {
+ return $q ? $q->fetch_row()[$field] : false;
+ }
+ public function insertId()
+ {
+ return $this->link->insert_id;
+ }
+ public function numRows(mysqli_result $query): int
+ {
+ return $query->num_rows;
+ }
+ public function escape(string $s): string
+ {
+ return $this->link->real_escape_string($s);
+ }
diff --git a/src/cron/jobs-cleanup.php b/src/cron/jobs-cleanup.php
new file mode 100644
index 0000000..3e01d34
--- /dev/null
+++ b/src/cron/jobs-cleanup.php
@@ -0,0 +1,5 @@
+require __DIR__.'/../init.php';
+jobs::cleanup(); \ No newline at end of file
diff --git a/src/functions.php b/src/functions.php
new file mode 100644
index 0000000..9558dd9
--- /dev/null
+++ b/src/functions.php
@@ -0,0 +1,47 @@
+function jsonEncode($obj) {
+ return json_encode($obj, JSON_UNESCAPED_UNICODE);
+function jsonDecode($json) {
+ return json_decode($json, true);
+function toCamelCase(string $input, string $separator = '_'): string {
+ return lcfirst(str_replace($separator, '', ucwords($input, $separator)));
+/* Connection helpers */
+function getMySQL(): mysql {
+ static $link = null;
+ if (is_null($link))
+ return $link;
+function getJobdMaster(): jobd\MasterClient {
+ return new jobd\MasterClient(JOBD_PORT, JOBD_HOST, JOBD_PASSWORD);
+/* Command line helpers */
+function green(string $s): string {
+ return "\033[32m$s\033[0m";
+function yellow(string $s): string {
+ return "\033[33m$s\033[0m";
+function red(string $s): string {
+ return "\033[31m$s\033[0m";
+function input(string $prompt): string {
+ echo $prompt;
+ return substr(fgets(STDIN), 0, -1);
diff --git a/src/init.php b/src/init.php
new file mode 100644
index 0000000..0c18408
--- /dev/null
+++ b/src/init.php
@@ -0,0 +1,32 @@
+require __DIR__.'/../vendor/autoload.php';
+ini_set('display_errors', 1);
+define('MYSQL_HOST', '');
+define('MYSQL_USER', 'jobd');
+define('MYSQL_PASSWORD', 'password');
+define('MYSQL_DB', 'jobd');
+define('JOBD_TABLE', 'jobs2');
+define('JOBD_HOST', '');
+define('JOBD_PORT', jobd\Client::MASTER_PORT);
+define('JOBD_PASSWORD', '');
+spl_autoload_register(function($class) {
+ if (strpos($class, '\\') !== false) {
+ $class = str_replace('\\', '/', $class);
+ $root = __DIR__;
+ } else {
+ $root = __DIR__.'/classes';
+ }
+ $path = $root.'/'.$class.'.php';
+ if (is_file($path))
+ require_once $path;
+include __DIR__.'/functions.php'; \ No newline at end of file
diff --git a/src/jobs/CreateFile.php b/src/jobs/CreateFile.php
new file mode 100644
index 0000000..439925f
--- /dev/null
+++ b/src/jobs/CreateFile.php
@@ -0,0 +1,15 @@
+namespace jobs;
+class CreateFile extends \Job
+ public function run()
+ {
+ $file = $this->input['file'];
+ if (!touch($file))
+ throw new \Exception("failed to touch file '".$file."'");
+ }
diff --git a/src/jobs/Hello.php b/src/jobs/Hello.php
new file mode 100644
index 0000000..4fa2446
--- /dev/null
+++ b/src/jobs/Hello.php
@@ -0,0 +1,15 @@
+namespace jobs;
+class Hello extends \Job
+ public function run()
+ {
+ $greetings = "Hello, ".($this->input['name'] ?? 'noname').".\n";
+ $greetings .= "I'm writing you from ".__METHOD__.", my PID is ".getmypid()." and I'm executing job #".$this->id.".";
+ echo jsonEncode(['response' => $greetings]);
+ }
diff --git a/src/launcher.php b/src/launcher.php
new file mode 100644
index 0000000..d1b9892
--- /dev/null
+++ b/src/launcher.php
@@ -0,0 +1,31 @@
+require_once __DIR__.'/init.php';
+$job = null;
+register_shutdown_function(function() {
+ global $job;
+ if ($job !== true)
+ exit(1);
+$job_id = $argv[1] ?? null;
+$job_raw = jobs::get($job_id);
+if (!$job_raw)
+ throw new InvalidArgumentException("job $job_id not found");
+$class_name = "jobs\\{$job_raw['name']}";
+$job = new $class_name($job_raw);
+if ($job->status != Job::STATUS_RUNNING)
+ throw new RuntimeException("job status is {$job->status}");
+try {
+ if ($job->run() !== false)
+ $job = true;
+} catch (Exception $e) {
+ fprintf(STDERR, $e.'');
+ exit(1);
+} \ No newline at end of file
diff --git a/src/main.php b/src/main.php
new file mode 100644
index 0000000..60779e5
--- /dev/null
+++ b/src/main.php
@@ -0,0 +1,93 @@
+require __DIR__.'/init.php';
+if ($argc < 2) {
+ echo <<<EOF
+Usage: {$argv[0]} COMMAND
+ test
+ hello
+ createfile
+ exit;
+$cmd = $argv[1];
+$func = "cmd_{$cmd}";
+if (!function_exists($func)) {
+ echo red("command '".$cmd."' is not implement")."\n";
+ exit(1);
+/** Commands */
+function cmd_test() {
+ // MySQL
+ try {
+ $db = getMySQL();
+ $jobs_count = $db->result($db->query("SELECT COUNT(*) FROM ".JOBD_TABLE));
+ } catch (Exception $e) {
+ echo red("MySQL connection failed")."\n";
+ exit(1);
+ }
+ echo green("MySQL OK")."\n";
+ // jobd
+ try {
+ $jobd = getJobdMaster();
+ $status = $jobd->status(true);
+ $workers_count = count($status->getData()['workers']);
+ if ($workers_count == 2) {
+ echo green("jobd-master and jobd OK");
+ } else {
+ $message = "jobd-master OK, but ";
+ $message .= $workers_count == 1 ? "only 1 worker is connected" : "no workers are connected";
+ echo yellow($message);
+ }
+ echo "\n";
+ } catch (Exception $e) {
+ echo red("jobd-master connection failed: ".$e->getMessage())."\n";
+ exit(1);
+ }
+function cmd_hello() {
+ $myname = input('Enter your name: ');
+ try {
+ $job_ids = [];
+ $job_server_map = [];
+ for ($server = 1; $server <= 2; $server++) {
+ $id = jobs::manual(job_target::high($server), jobs\Hello::class, ['name' => $myname]);
+ $job_server_map[$id] = $server;
+ $job_ids[] = $id;
+ }
+ $results = jobs::run($job_ids);
+ foreach ($results as $job_id => $job_result) {
+ $server = $job_server_map[$job_id];
+ echo "> server {$server}:\n";
+ if ($job_result->isFailed()) {
+ echo red("failed")."\n";
+ } else {
+ echo green($job_result->getStdoutAsJSON()['response'])."\n";
+ }
+ echo "\n";
+ }
+ } catch (Exception $e) {
+ echo red("error: ".$e->getMessage())."\n";
+ exit(1);
+ }
+function cmd_createfile() {
+ $file = input('Enter file name: ');
+ jobs::add(job_target::any, jobs\CreateFile::class, ['file' => $file]);
+} \ No newline at end of file