From d0fa8d6f63003a6cb2f1897158bc587ab2344ea3 Mon Sep 17 00:00:00 2001 From: Evgeny Zinoviev Date: Thu, 13 Apr 2023 02:14:53 +0300 Subject: 1.6.0: support signals --- .gitignore | 1 + README.md | 4 +- composer.json | 6 ++- composer.lock | 22 ++++++++++ src/Client.php | 56 +++++++++++++----------- src/Exception.php | 5 --- src/MasterClient.php | 39 +++++++++++++++-- src/Message.php | 43 ------------------- src/PingMessage.php | 17 -------- src/PongMessage.php | 17 -------- src/RequestMessage.php | 60 -------------------------- src/ResponseMessage.php | 69 ------------------------------ src/WorkerClient.php | 22 ++++++++-- src/exceptions/JobInterruptedException.php | 11 +++++ src/exceptions/JobdException.php | 5 +++ src/messages/Message.php | 43 +++++++++++++++++++ src/messages/PingMessage.php | 17 ++++++++ src/messages/PongMessage.php | 17 ++++++++ src/messages/RequestMessage.php | 60 ++++++++++++++++++++++++++ src/messages/ResponseMessage.php | 69 ++++++++++++++++++++++++++++++ 20 files changed, 337 insertions(+), 246 deletions(-) create mode 100644 composer.lock delete mode 100644 src/Exception.php delete mode 100644 src/Message.php delete mode 100644 src/PingMessage.php delete mode 100644 src/PongMessage.php delete mode 100644 src/RequestMessage.php delete mode 100644 src/ResponseMessage.php create mode 100644 src/exceptions/JobInterruptedException.php create mode 100644 src/exceptions/JobdException.php create mode 100644 src/messages/Message.php create mode 100644 src/messages/PingMessage.php create mode 100644 src/messages/PongMessage.php create mode 100644 src/messages/RequestMessage.php create mode 100644 src/messages/ResponseMessage.php diff --git a/.gitignore b/.gitignore index 485dee6..45b1244 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ .idea +/vendor \ No newline at end of file diff --git a/README.md b/README.md index 97d8d02..9d6993b 100644 --- a/README.md +++ b/README.md @@ -19,7 +19,7 @@ Here's a small example. ```php try { $jobd = new jobd\MasterClient(); -} catch (jobd\Exception $e) { +} catch (\jobd\exceptions\JobdException $e) { die("Failed to connect.\n"); } @@ -29,7 +29,7 @@ try { // get status from master $status = $jobd->status()->getData(); -} catch (jobd\Exception $e) { +} catch (\jobd\exceptions\JobdException $e) { die('jobd error: '.$e->getMessage()."\n"); } diff --git a/composer.json b/composer.json index f2dde7c..0fbb9f7 100644 --- a/composer.json +++ b/composer.json @@ -1,6 +1,6 @@ { "name": "ch1p/jobd-client", - "version": "1.5.2", + "version": "1.6.0", "license": "BSD-2-Clause", "keywords": ["queue", "job", "jobd"], "repositories": [ @@ -11,7 +11,9 @@ ], "autoload": { "psr-4": { - "jobd\\": "src/" + "jobd\\": "src/", + "jobd\\messages\\": "src/messages", + "jobd\\exceptions\\": "src/exceptions" } }, "require": { diff --git a/composer.lock b/composer.lock new file mode 100644 index 0000000..768bcea --- /dev/null +++ b/composer.lock @@ -0,0 +1,22 @@ +{ + "_readme": [ + "This file locks the dependencies of your project to a known state", + "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", + "This file is @generated automatically" + ], + "content-hash": "3ea2178ffa7d2dde56d1c2ac872d7c34", + "packages": [], + "packages-dev": [], + "aliases": [], + "minimum-stability": "stable", + "stability-flags": [], + "prefer-stable": false, + "prefer-lowest": false, + "platform": { + "php": ">=7.0", + "ext-json": "*", + "ext-sockets": "*" + }, + "platform-dev": [], + "plugin-api-version": "2.3.0" +} diff --git a/src/Client.php b/src/Client.php index b4628ca..b66b6e7 100644 --- a/src/Client.php +++ b/src/Client.php @@ -2,8 +2,14 @@ namespace jobd; +use jobd\exceptions\JobdException; +use jobd\messages\Message; +use jobd\messages\PingMessage; +use jobd\messages\PongMessage; +use jobd\messages\RequestMessage; +use jobd\messages\ResponseMessage; -class Client { +abstract class Client { const WORKER_PORT = 7080; const MASTER_PORT = 7081; @@ -23,7 +29,7 @@ class Client { * @param int $port * @param string $host * @param string $password - * @throws Exception + * @throws JobdException */ public function __construct(int $port, string $host = '127.0.0.1', string $password = '') { @@ -32,12 +38,12 @@ class Client { $this->password = $password; if (($socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP)) === false) - throw new Exception("socket_create() failed: ".$this->getSocketError()); + throw new JobdException("socket_create() failed: ".$this->getSocketError()); $this->sock = $socket; if ((socket_connect($socket, $host, $port)) === false) - throw new Exception("socket_connect() failed: ".$this->getSocketError()); + throw new JobdException("socket_connect() failed: ".$this->getSocketError()); $this->lastOutgoingRequestNo = mt_rand(1 /* 0 is reserved */, self::REQUEST_NO_LIMIT); } @@ -53,7 +59,7 @@ class Client { /** * @param string[] $targets * @return ResponseMessage - * @throws Exception + * @throws JobdException */ public function pause(array $targets = []): ResponseMessage { @@ -69,7 +75,7 @@ class Client { /** * @param string[] $targets * @return ResponseMessage - * @throws Exception + * @throws JobdException */ public function continue(array $targets = []): ResponseMessage { @@ -84,7 +90,7 @@ class Client { /** * @return PongMessage - * @throws Exception + * @throws JobdException */ public function ping(): PongMessage { @@ -113,7 +119,7 @@ class Client { /** * @param Message $message - * @throws Exception + * @throws JobdException */ public function send(Message $message) { @@ -123,7 +129,7 @@ class Client { while ($remained > 0) { $result = socket_write($this->sock, $data); if ($result === false) - throw new Exception(__METHOD__ . ": socket_write() failed: ".$this->getSocketError()); + throw new JobdException(__METHOD__ . ": socket_write() failed: ".$this->getSocketError()); $remained -= $result; if ($remained > 0) @@ -134,7 +140,7 @@ class Client { /** * @param int $request_no * @return RequestMessage|ResponseMessage|PingMessage|PongMessage - * @throws Exception + * @throws JobdException */ public function recv(int $request_no = -1) { @@ -145,7 +151,7 @@ class Client { while (true) { $result = socket_recv($this->sock, $recv_buf, 1024, 0); if ($result === false) - throw new Exception(__METHOD__ . ": socket_recv() failed: " . $this->getSocketError()); + throw new JobdException(__METHOD__ . ": socket_recv() failed: " . $this->getSocketError()); // peer disconnected if ($result === 0) @@ -172,7 +178,7 @@ class Client { } while ($eot_pos !== false && $offset < $buflen-1); if (empty($messages)) - throw new Exception("Malformed response: no messages found. Response: {$buf}"); + throw new JobdException("Malformed response: no messages found. Response: {$buf}"); if (count($messages) > 1) trigger_error(__METHOD__.": received more than one message"); @@ -196,7 +202,7 @@ class Client { ); if (empty($messages)) - throw new Exception("Malformed response: response for {$request_no} not found."); + throw new JobdException("Malformed response: response for {$request_no} not found."); if (count($messages) == 2) { @@ -220,7 +226,7 @@ class Client { if ($response instanceof ResponseMessage) { if ($error = $response->getError()) - throw new Exception($response->getError()); + throw new JobdException($response->getError()); } return $response; @@ -242,13 +248,13 @@ class Client { /** * @param string $raw_string * @return RequestMessage|ResponseMessage|PingMessage|PongMessage - * @throws Exception + * @throws JobdException */ protected static function parseMessage(string $raw_string) { $raw = json_decode($raw_string, true); if (!is_array($raw) || count($raw) < 1) - throw new Exception("Malformed message: {$raw_string}"); + throw new JobdException("Malformed message: {$raw_string}"); list($type) = $raw; @@ -263,8 +269,8 @@ class Client { ['password', 's', false], ['data', 'a', false] ]); - } catch (Exception $e) { - throw new Exception("Malformed REQUEST message: {$e->getMessage()}"); + } catch (JobdException $e) { + throw new JobdException("Malformed REQUEST message: {$e->getMessage()}"); } $message = new RequestMessage($data['type'], $data['data'] ?? null); @@ -283,8 +289,8 @@ class Client { ['data', 'aifs', false], ['error', 's', false], ]); - } catch (Exception $e) { - throw new Exception("Malformed RESPONSE message: {$e->getMessage()}"); + } catch (JobdException $e) { + throw new JobdException("Malformed RESPONSE message: {$e->getMessage()}"); } return new ResponseMessage($data['no'], $data['error'] ?? null, $data['data'] ?? null); @@ -296,25 +302,25 @@ class Client { return new PongMessage(); default: - throw new Exception("Malformed message: unexpected type {$type}"); + throw new JobdException("Malformed message: unexpected type {$type}"); } } /** * @param mixed $data * @param array $schema - * @throws Exception + * @throws JobdException */ protected static function validateData($data, array $schema) { if (!$data || !is_array($data)) - throw new Exception('data must be array'); + throw new JobdException('data must be array'); foreach ($schema as $schema_item) { list ($key_name, $key_types, $key_required) = $schema_item; if (!isset($data[$key_name])) { if ($key_required) - throw new Exception("'{$key_name}' is missing"); + throw new JobdException("'{$key_name}' is missing"); continue; } @@ -354,7 +360,7 @@ class Client { } if (!$passed) - throw new Exception("{$key_name}: required type is '{$key_types}'"); + throw new JobdException("{$key_name}: required type is '{$key_types}'"); } } diff --git a/src/Exception.php b/src/Exception.php deleted file mode 100644 index be64d4c..0000000 --- a/src/Exception.php +++ /dev/null @@ -1,5 +0,0 @@ -sendSignals([ + [ + 'id' => $job_id, + 'signal' => $signal, + 'target' => $target + ] + ]); + } + + /** + * @param array $data + * @return ResponseMessage + * @throws JobdException + */ + public function sendSignals(array $data): ResponseMessage { + return $this->recv( + $this->sendRequest(new RequestMessage('send-signal', ['jobs' => $data])) + ); + } + } \ No newline at end of file diff --git a/src/Message.php b/src/Message.php deleted file mode 100644 index d5df6ab..0000000 --- a/src/Message.php +++ /dev/null @@ -1,43 +0,0 @@ -type = $type; - } - - /** - * @return array - */ - abstract protected function getContent(): array; - - /** - * @return string - */ - public function serialize(): string - { - $data = [$this->type]; - $content = $this->getContent(); - - if (!empty($content)) - $data[] = $content; - - return json_encode($data); - } - -} \ No newline at end of file diff --git a/src/PingMessage.php b/src/PingMessage.php deleted file mode 100644 index f3441c9..0000000 --- a/src/PingMessage.php +++ /dev/null @@ -1,17 +0,0 @@ -requestData = $request_data; - $this->requestType = $request_type; - } - - /** - * @param string $password - */ - public function setPassword(string $password) - { - $this->password = $password; - } - - /** - * @param int $no - */ - public function setRequestNo(int $no) - { - $this->requestNo = $no; - } - - /** - * @return string[] - */ - protected function getContent(): array - { - $request = [ - 'type' => $this->requestType, - 'no' => $this->requestNo, - ]; - - if (!is_null($this->requestData)) - $request['data'] = (object)$this->requestData; - - if (!is_null($this->password)) - $request['password'] = $this->password; - - return $request; - } - -} \ No newline at end of file diff --git a/src/ResponseMessage.php b/src/ResponseMessage.php deleted file mode 100644 index 1f91dd4..0000000 --- a/src/ResponseMessage.php +++ /dev/null @@ -1,69 +0,0 @@ -requestNo = $request_no; - $this->error = $error; - $this->data = $data; - } - - /** - * @return array - */ - protected function getContent(): array - { - $response = [ - 'no' => $this->requestNo - ]; - - if (!is_null($this->error)) - $response['error'] = $this->error; - - if (!is_null(!$this->data)) - $response['data'] = $this->data; - - return $response; - } - - /** - * @return mixed - */ - public function getError() - { - return $this->error; - } - - /** - * @return mixed - */ - public function getData() - { - return $this->data; - } - - /** - * @return int - */ - public function getRequestNo(): int - { - return $this->requestNo; - } - -} \ No newline at end of file diff --git a/src/WorkerClient.php b/src/WorkerClient.php index 2773de5..16f8963 100644 --- a/src/WorkerClient.php +++ b/src/WorkerClient.php @@ -2,6 +2,10 @@ namespace jobd; +use jobd\exceptions\JobdException; +use jobd\messages\RequestMessage; +use jobd\messages\ResponseMessage; + class WorkerClient extends Client { public function __construct(int $port = Client::WORKER_PORT, ...$args) @@ -48,11 +52,23 @@ class WorkerClient extends Client { ); } + /** + * @param array[] $jobs + * @return ResponseMessage + * @throws JobdException + */ + public function sendSignal(array $jobs): ResponseMessage + { + return $this->recv( + $this->sendRequest(new RequestMessage('send-signal', ['jobs' => $jobs])) + ); + } + /** * @param string $target * @param int $concurrency * @return ResponseMessage - * @throws Exception + * @throws JobdException */ public function addTarget(string $target, int $concurrency): ResponseMessage { @@ -67,7 +83,7 @@ class WorkerClient extends Client { /** * @param string $target * @return ResponseMessage - * @throws Exception + * @throws JobdException */ public function removeTarget(string $target): ResponseMessage { @@ -82,7 +98,7 @@ class WorkerClient extends Client { * @param string $target * @param int $concurrency * @return ResponseMessage - * @throws Exception + * @throws JobdException */ public function setTargetConcurrency(string $target, int $concurrency): ResponseMessage { diff --git a/src/exceptions/JobInterruptedException.php b/src/exceptions/JobInterruptedException.php new file mode 100644 index 0000000..c645cc5 --- /dev/null +++ b/src/exceptions/JobInterruptedException.php @@ -0,0 +1,11 @@ +type = $type; + } + + /** + * @return array + */ + abstract protected function getContent(): array; + + /** + * @return string + */ + public function serialize(): string + { + $data = [$this->type]; + $content = $this->getContent(); + + if (!empty($content)) + $data[] = $content; + + return json_encode($data); + } + +} \ No newline at end of file diff --git a/src/messages/PingMessage.php b/src/messages/PingMessage.php new file mode 100644 index 0000000..fa61379 --- /dev/null +++ b/src/messages/PingMessage.php @@ -0,0 +1,17 @@ +requestData = $request_data; + $this->requestType = $request_type; + } + + /** + * @param string $password + */ + public function setPassword(string $password) + { + $this->password = $password; + } + + /** + * @param int $no + */ + public function setRequestNo(int $no) + { + $this->requestNo = $no; + } + + /** + * @return string[] + */ + protected function getContent(): array + { + $request = [ + 'type' => $this->requestType, + 'no' => $this->requestNo, + ]; + + if (!is_null($this->requestData)) + $request['data'] = (object)$this->requestData; + + if (!is_null($this->password)) + $request['password'] = $this->password; + + return $request; + } + +} \ No newline at end of file diff --git a/src/messages/ResponseMessage.php b/src/messages/ResponseMessage.php new file mode 100644 index 0000000..5101ed4 --- /dev/null +++ b/src/messages/ResponseMessage.php @@ -0,0 +1,69 @@ +requestNo = $request_no; + $this->error = $error; + $this->data = $data; + } + + /** + * @return array + */ + protected function getContent(): array + { + $response = [ + 'no' => $this->requestNo + ]; + + if (!is_null($this->error)) + $response['error'] = $this->error; + + if (!is_null(!$this->data)) + $response['data'] = $this->data; + + return $response; + } + + /** + * @return mixed + */ + public function getError() + { + return $this->error; + } + + /** + * @return mixed + */ + public function getData() + { + return $this->data; + } + + /** + * @return int + */ + public function getRequestNo(): int + { + return $this->requestNo; + } + +} \ No newline at end of file -- cgit v1.2.3