diff options
author | Evgeny Zinoviev <me@ch1p.io> | 2023-04-13 02:14:53 +0300 |
---|---|---|
committer | Evgeny Zinoviev <me@ch1p.io> | 2023-04-13 02:14:53 +0300 |
commit | d0fa8d6f63003a6cb2f1897158bc587ab2344ea3 (patch) | |
tree | 5a6c165e928354a59eea48030c5de51067d381e4 | |
parent | 5272c5f541a75b2a7824df4264ad7e69c6e346fa (diff) |
1.6.0: support signals
-rw-r--r-- | .gitignore | 1 | ||||
-rw-r--r-- | README.md | 4 | ||||
-rw-r--r-- | composer.json | 6 | ||||
-rw-r--r-- | composer.lock | 22 | ||||
-rw-r--r-- | src/Client.php | 56 | ||||
-rw-r--r-- | src/Exception.php | 5 | ||||
-rw-r--r-- | src/MasterClient.php | 39 | ||||
-rw-r--r-- | src/WorkerClient.php | 22 | ||||
-rw-r--r-- | src/exceptions/JobInterruptedException.php | 11 | ||||
-rw-r--r-- | src/exceptions/JobdException.php | 5 | ||||
-rw-r--r-- | src/messages/Message.php (renamed from src/Message.php) | 2 | ||||
-rw-r--r-- | src/messages/PingMessage.php (renamed from src/PingMessage.php) | 2 | ||||
-rw-r--r-- | src/messages/PongMessage.php (renamed from src/PongMessage.php) | 2 | ||||
-rw-r--r-- | src/messages/RequestMessage.php (renamed from src/RequestMessage.php) | 2 | ||||
-rw-r--r-- | src/messages/ResponseMessage.php (renamed from src/ResponseMessage.php) | 2 |
15 files changed, 136 insertions, 45 deletions
@@ -1 +1,2 @@ .idea +/vendor
\ No newline at end of file @@ -19,7 +19,7 @@ Here's a small example. ```php try { $jobd = new jobd\MasterClient(); -} catch (jobd\Exception $e) { +} catch (\jobd\exceptions\JobdException $e) { die("Failed to connect.\n"); } @@ -29,7 +29,7 @@ try { // get status from master $status = $jobd->status()->getData(); -} catch (jobd\Exception $e) { +} catch (\jobd\exceptions\JobdException $e) { die('jobd error: '.$e->getMessage()."\n"); } diff --git a/composer.json b/composer.json index f2dde7c..0fbb9f7 100644 --- a/composer.json +++ b/composer.json @@ -1,6 +1,6 @@ { "name": "ch1p/jobd-client", - "version": "1.5.2", + "version": "1.6.0", "license": "BSD-2-Clause", "keywords": ["queue", "job", "jobd"], "repositories": [ @@ -11,7 +11,9 @@ ], "autoload": { "psr-4": { - "jobd\\": "src/" + "jobd\\": "src/", + "jobd\\messages\\": "src/messages", + "jobd\\exceptions\\": "src/exceptions" } }, "require": { diff --git a/composer.lock b/composer.lock new file mode 100644 index 0000000..768bcea --- /dev/null +++ b/composer.lock @@ -0,0 +1,22 @@ +{ + "_readme": [ + "This file locks the dependencies of your project to a known state", + "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", + "This file is @generated automatically" + ], + "content-hash": "3ea2178ffa7d2dde56d1c2ac872d7c34", + "packages": [], + "packages-dev": [], + "aliases": [], + "minimum-stability": "stable", + "stability-flags": [], + "prefer-stable": false, + "prefer-lowest": false, + "platform": { + "php": ">=7.0", + "ext-json": "*", + "ext-sockets": "*" + }, + "platform-dev": [], + "plugin-api-version": "2.3.0" +} diff --git a/src/Client.php b/src/Client.php index b4628ca..b66b6e7 100644 --- a/src/Client.php +++ b/src/Client.php @@ -2,8 +2,14 @@ namespace jobd; +use jobd\exceptions\JobdException; +use jobd\messages\Message; +use jobd\messages\PingMessage; +use jobd\messages\PongMessage; +use jobd\messages\RequestMessage; +use jobd\messages\ResponseMessage; -class Client { +abstract class Client { const WORKER_PORT = 7080; const MASTER_PORT = 7081; @@ -23,7 +29,7 @@ class Client { * @param int $port * @param string $host * @param string $password - * @throws Exception + * @throws JobdException */ public function __construct(int $port, string $host = '127.0.0.1', string $password = '') { @@ -32,12 +38,12 @@ class Client { $this->password = $password; if (($socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP)) === false) - throw new Exception("socket_create() failed: ".$this->getSocketError()); + throw new JobdException("socket_create() failed: ".$this->getSocketError()); $this->sock = $socket; if ((socket_connect($socket, $host, $port)) === false) - throw new Exception("socket_connect() failed: ".$this->getSocketError()); + throw new JobdException("socket_connect() failed: ".$this->getSocketError()); $this->lastOutgoingRequestNo = mt_rand(1 /* 0 is reserved */, self::REQUEST_NO_LIMIT); } @@ -53,7 +59,7 @@ class Client { /** * @param string[] $targets * @return ResponseMessage - * @throws Exception + * @throws JobdException */ public function pause(array $targets = []): ResponseMessage { @@ -69,7 +75,7 @@ class Client { /** * @param string[] $targets * @return ResponseMessage - * @throws Exception + * @throws JobdException */ public function continue(array $targets = []): ResponseMessage { @@ -84,7 +90,7 @@ class Client { /** * @return PongMessage - * @throws Exception + * @throws JobdException */ public function ping(): PongMessage { @@ -113,7 +119,7 @@ class Client { /** * @param Message $message - * @throws Exception + * @throws JobdException */ public function send(Message $message) { @@ -123,7 +129,7 @@ class Client { while ($remained > 0) { $result = socket_write($this->sock, $data); if ($result === false) - throw new Exception(__METHOD__ . ": socket_write() failed: ".$this->getSocketError()); + throw new JobdException(__METHOD__ . ": socket_write() failed: ".$this->getSocketError()); $remained -= $result; if ($remained > 0) @@ -134,7 +140,7 @@ class Client { /** * @param int $request_no * @return RequestMessage|ResponseMessage|PingMessage|PongMessage - * @throws Exception + * @throws JobdException */ public function recv(int $request_no = -1) { @@ -145,7 +151,7 @@ class Client { while (true) { $result = socket_recv($this->sock, $recv_buf, 1024, 0); if ($result === false) - throw new Exception(__METHOD__ . ": socket_recv() failed: " . $this->getSocketError()); + throw new JobdException(__METHOD__ . ": socket_recv() failed: " . $this->getSocketError()); // peer disconnected if ($result === 0) @@ -172,7 +178,7 @@ class Client { } while ($eot_pos !== false && $offset < $buflen-1); if (empty($messages)) - throw new Exception("Malformed response: no messages found. Response: {$buf}"); + throw new JobdException("Malformed response: no messages found. Response: {$buf}"); if (count($messages) > 1) trigger_error(__METHOD__.": received more than one message"); @@ -196,7 +202,7 @@ class Client { ); if (empty($messages)) - throw new Exception("Malformed response: response for {$request_no} not found."); + throw new JobdException("Malformed response: response for {$request_no} not found."); if (count($messages) == 2) { @@ -220,7 +226,7 @@ class Client { if ($response instanceof ResponseMessage) { if ($error = $response->getError()) - throw new Exception($response->getError()); + throw new JobdException($response->getError()); } return $response; @@ -242,13 +248,13 @@ class Client { /** * @param string $raw_string * @return RequestMessage|ResponseMessage|PingMessage|PongMessage - * @throws Exception + * @throws JobdException */ protected static function parseMessage(string $raw_string) { $raw = json_decode($raw_string, true); if (!is_array($raw) || count($raw) < 1) - throw new Exception("Malformed message: {$raw_string}"); + throw new JobdException("Malformed message: {$raw_string}"); list($type) = $raw; @@ -263,8 +269,8 @@ class Client { ['password', 's', false], ['data', 'a', false] ]); - } catch (Exception $e) { - throw new Exception("Malformed REQUEST message: {$e->getMessage()}"); + } catch (JobdException $e) { + throw new JobdException("Malformed REQUEST message: {$e->getMessage()}"); } $message = new RequestMessage($data['type'], $data['data'] ?? null); @@ -283,8 +289,8 @@ class Client { ['data', 'aifs', false], ['error', 's', false], ]); - } catch (Exception $e) { - throw new Exception("Malformed RESPONSE message: {$e->getMessage()}"); + } catch (JobdException $e) { + throw new JobdException("Malformed RESPONSE message: {$e->getMessage()}"); } return new ResponseMessage($data['no'], $data['error'] ?? null, $data['data'] ?? null); @@ -296,25 +302,25 @@ class Client { return new PongMessage(); default: - throw new Exception("Malformed message: unexpected type {$type}"); + throw new JobdException("Malformed message: unexpected type {$type}"); } } /** * @param mixed $data * @param array $schema - * @throws Exception + * @throws JobdException */ protected static function validateData($data, array $schema) { if (!$data || !is_array($data)) - throw new Exception('data must be array'); + throw new JobdException('data must be array'); foreach ($schema as $schema_item) { list ($key_name, $key_types, $key_required) = $schema_item; if (!isset($data[$key_name])) { if ($key_required) - throw new Exception("'{$key_name}' is missing"); + throw new JobdException("'{$key_name}' is missing"); continue; } @@ -354,7 +360,7 @@ class Client { } if (!$passed) - throw new Exception("{$key_name}: required type is '{$key_types}'"); + throw new JobdException("{$key_name}: required type is '{$key_types}'"); } } diff --git a/src/Exception.php b/src/Exception.php deleted file mode 100644 index be64d4c..0000000 --- a/src/Exception.php +++ /dev/null @@ -1,5 +0,0 @@ -<?php - -namespace jobd; - -class Exception extends \Exception {}
\ No newline at end of file diff --git a/src/MasterClient.php b/src/MasterClient.php index d85e454..2fed338 100644 --- a/src/MasterClient.php +++ b/src/MasterClient.php @@ -2,6 +2,10 @@ namespace jobd; +use jobd\exceptions\JobdException; +use jobd\messages\RequestMessage; +use jobd\messages\ResponseMessage; + class MasterClient extends Client { public function __construct(int $port = Client::MASTER_PORT, ...$args) @@ -12,7 +16,7 @@ class MasterClient extends Client { /** * @param array $targets * @return ResponseMessage - * @throws Exception + * @throws JobdException */ public function poke(array $targets): ResponseMessage { @@ -24,7 +28,7 @@ class MasterClient extends Client { /** * @param bool $poll_workers * @return ResponseMessage - * @throws Exception + * @throws JobdException */ public function status(bool $poll_workers = false): ResponseMessage { @@ -36,7 +40,7 @@ class MasterClient extends Client { /** * @param array[] $jobs * @return ResponseMessage - * @throws Exception + * @throws JobdException */ public function runManual(array $jobs): ResponseMessage { @@ -45,4 +49,33 @@ class MasterClient extends Client { ); } + /** + * @param int $job_id + * @param int $signal + * @param string $target + * @return ResponseMessage + * @throws JobdException + */ + public function sendSignal(int $job_id, int $signal, string $target): ResponseMessage + { + return $this->sendSignals([ + [ + 'id' => $job_id, + 'signal' => $signal, + 'target' => $target + ] + ]); + } + + /** + * @param array $data + * @return ResponseMessage + * @throws JobdException + */ + public function sendSignals(array $data): ResponseMessage { + return $this->recv( + $this->sendRequest(new RequestMessage('send-signal', ['jobs' => $data])) + ); + } + }
\ No newline at end of file diff --git a/src/WorkerClient.php b/src/WorkerClient.php index 2773de5..16f8963 100644 --- a/src/WorkerClient.php +++ b/src/WorkerClient.php @@ -2,6 +2,10 @@ namespace jobd; +use jobd\exceptions\JobdException; +use jobd\messages\RequestMessage; +use jobd\messages\ResponseMessage; + class WorkerClient extends Client { public function __construct(int $port = Client::WORKER_PORT, ...$args) @@ -49,10 +53,22 @@ class WorkerClient extends Client { } /** + * @param array[] $jobs + * @return ResponseMessage + * @throws JobdException + */ + public function sendSignal(array $jobs): ResponseMessage + { + return $this->recv( + $this->sendRequest(new RequestMessage('send-signal', ['jobs' => $jobs])) + ); + } + + /** * @param string $target * @param int $concurrency * @return ResponseMessage - * @throws Exception + * @throws JobdException */ public function addTarget(string $target, int $concurrency): ResponseMessage { @@ -67,7 +83,7 @@ class WorkerClient extends Client { /** * @param string $target * @return ResponseMessage - * @throws Exception + * @throws JobdException */ public function removeTarget(string $target): ResponseMessage { @@ -82,7 +98,7 @@ class WorkerClient extends Client { * @param string $target * @param int $concurrency * @return ResponseMessage - * @throws Exception + * @throws JobdException */ public function setTargetConcurrency(string $target, int $concurrency): ResponseMessage { diff --git a/src/exceptions/JobInterruptedException.php b/src/exceptions/JobInterruptedException.php new file mode 100644 index 0000000..c645cc5 --- /dev/null +++ b/src/exceptions/JobInterruptedException.php @@ -0,0 +1,11 @@ +<?php + +namespace jobd\exceptions; + +class JobInterruptedException extends \Exception { + + public function __construct(int $code = 1, string $message = "") { + parent::__construct($message, $code); + } + +}
\ No newline at end of file diff --git a/src/exceptions/JobdException.php b/src/exceptions/JobdException.php new file mode 100644 index 0000000..59b218c --- /dev/null +++ b/src/exceptions/JobdException.php @@ -0,0 +1,5 @@ +<?php + +namespace jobd\exceptions; + +class JobdException extends \Exception {}
\ No newline at end of file diff --git a/src/Message.php b/src/messages/Message.php index d5df6ab..df7464e 100644 --- a/src/Message.php +++ b/src/messages/Message.php @@ -1,6 +1,6 @@ <?php -namespace jobd; +namespace jobd\messages; abstract class Message { diff --git a/src/PingMessage.php b/src/messages/PingMessage.php index f3441c9..fa61379 100644 --- a/src/PingMessage.php +++ b/src/messages/PingMessage.php @@ -1,6 +1,6 @@ <?php -namespace jobd; +namespace jobd\messages; class PingMessage extends Message { diff --git a/src/PongMessage.php b/src/messages/PongMessage.php index a0a38a0..00bb499 100644 --- a/src/PongMessage.php +++ b/src/messages/PongMessage.php @@ -1,6 +1,6 @@ <?php -namespace jobd; +namespace jobd\messages; class PongMessage extends Message { diff --git a/src/RequestMessage.php b/src/messages/RequestMessage.php index a2e196d..12ab7ad 100644 --- a/src/RequestMessage.php +++ b/src/messages/RequestMessage.php @@ -1,6 +1,6 @@ <?php -namespace jobd; +namespace jobd\messages; class RequestMessage extends Message { diff --git a/src/ResponseMessage.php b/src/messages/ResponseMessage.php index 1f91dd4..5101ed4 100644 --- a/src/ResponseMessage.php +++ b/src/messages/ResponseMessage.php @@ -1,6 +1,6 @@ <?php -namespace jobd; +namespace jobd\messages; class ResponseMessage extends Message { |