aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEvgeny Zinoviev <me@ch1p.io>2023-04-13 02:14:53 +0300
committerEvgeny Zinoviev <me@ch1p.io>2023-04-13 02:14:53 +0300
commitd0fa8d6f63003a6cb2f1897158bc587ab2344ea3 (patch)
tree5a6c165e928354a59eea48030c5de51067d381e4
parent5272c5f541a75b2a7824df4264ad7e69c6e346fa (diff)
1.6.0: support signals
-rw-r--r--.gitignore1
-rw-r--r--README.md4
-rw-r--r--composer.json6
-rw-r--r--composer.lock22
-rw-r--r--src/Client.php56
-rw-r--r--src/Exception.php5
-rw-r--r--src/MasterClient.php39
-rw-r--r--src/WorkerClient.php22
-rw-r--r--src/exceptions/JobInterruptedException.php11
-rw-r--r--src/exceptions/JobdException.php5
-rw-r--r--src/messages/Message.php (renamed from src/Message.php)2
-rw-r--r--src/messages/PingMessage.php (renamed from src/PingMessage.php)2
-rw-r--r--src/messages/PongMessage.php (renamed from src/PongMessage.php)2
-rw-r--r--src/messages/RequestMessage.php (renamed from src/RequestMessage.php)2
-rw-r--r--src/messages/ResponseMessage.php (renamed from src/ResponseMessage.php)2
15 files changed, 136 insertions, 45 deletions
diff --git a/.gitignore b/.gitignore
index 485dee6..45b1244 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1 +1,2 @@
.idea
+/vendor \ No newline at end of file
diff --git a/README.md b/README.md
index 97d8d02..9d6993b 100644
--- a/README.md
+++ b/README.md
@@ -19,7 +19,7 @@ Here's a small example.
```php
try {
$jobd = new jobd\MasterClient();
-} catch (jobd\Exception $e) {
+} catch (\jobd\exceptions\JobdException $e) {
die("Failed to connect.\n");
}
@@ -29,7 +29,7 @@ try {
// get status from master
$status = $jobd->status()->getData();
-} catch (jobd\Exception $e) {
+} catch (\jobd\exceptions\JobdException $e) {
die('jobd error: '.$e->getMessage()."\n");
}
diff --git a/composer.json b/composer.json
index f2dde7c..0fbb9f7 100644
--- a/composer.json
+++ b/composer.json
@@ -1,6 +1,6 @@
{
"name": "ch1p/jobd-client",
- "version": "1.5.2",
+ "version": "1.6.0",
"license": "BSD-2-Clause",
"keywords": ["queue", "job", "jobd"],
"repositories": [
@@ -11,7 +11,9 @@
],
"autoload": {
"psr-4": {
- "jobd\\": "src/"
+ "jobd\\": "src/",
+ "jobd\\messages\\": "src/messages",
+ "jobd\\exceptions\\": "src/exceptions"
}
},
"require": {
diff --git a/composer.lock b/composer.lock
new file mode 100644
index 0000000..768bcea
--- /dev/null
+++ b/composer.lock
@@ -0,0 +1,22 @@
+{
+ "_readme": [
+ "This file locks the dependencies of your project to a known state",
+ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies",
+ "This file is @generated automatically"
+ ],
+ "content-hash": "3ea2178ffa7d2dde56d1c2ac872d7c34",
+ "packages": [],
+ "packages-dev": [],
+ "aliases": [],
+ "minimum-stability": "stable",
+ "stability-flags": [],
+ "prefer-stable": false,
+ "prefer-lowest": false,
+ "platform": {
+ "php": ">=7.0",
+ "ext-json": "*",
+ "ext-sockets": "*"
+ },
+ "platform-dev": [],
+ "plugin-api-version": "2.3.0"
+}
diff --git a/src/Client.php b/src/Client.php
index b4628ca..b66b6e7 100644
--- a/src/Client.php
+++ b/src/Client.php
@@ -2,8 +2,14 @@
namespace jobd;
+use jobd\exceptions\JobdException;
+use jobd\messages\Message;
+use jobd\messages\PingMessage;
+use jobd\messages\PongMessage;
+use jobd\messages\RequestMessage;
+use jobd\messages\ResponseMessage;
-class Client {
+abstract class Client {
const WORKER_PORT = 7080;
const MASTER_PORT = 7081;
@@ -23,7 +29,7 @@ class Client {
* @param int $port
* @param string $host
* @param string $password
- * @throws Exception
+ * @throws JobdException
*/
public function __construct(int $port, string $host = '127.0.0.1', string $password = '')
{
@@ -32,12 +38,12 @@ class Client {
$this->password = $password;
if (($socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP)) === false)
- throw new Exception("socket_create() failed: ".$this->getSocketError());
+ throw new JobdException("socket_create() failed: ".$this->getSocketError());
$this->sock = $socket;
if ((socket_connect($socket, $host, $port)) === false)
- throw new Exception("socket_connect() failed: ".$this->getSocketError());
+ throw new JobdException("socket_connect() failed: ".$this->getSocketError());
$this->lastOutgoingRequestNo = mt_rand(1 /* 0 is reserved */, self::REQUEST_NO_LIMIT);
}
@@ -53,7 +59,7 @@ class Client {
/**
* @param string[] $targets
* @return ResponseMessage
- * @throws Exception
+ * @throws JobdException
*/
public function pause(array $targets = []): ResponseMessage
{
@@ -69,7 +75,7 @@ class Client {
/**
* @param string[] $targets
* @return ResponseMessage
- * @throws Exception
+ * @throws JobdException
*/
public function continue(array $targets = []): ResponseMessage
{
@@ -84,7 +90,7 @@ class Client {
/**
* @return PongMessage
- * @throws Exception
+ * @throws JobdException
*/
public function ping(): PongMessage
{
@@ -113,7 +119,7 @@ class Client {
/**
* @param Message $message
- * @throws Exception
+ * @throws JobdException
*/
public function send(Message $message)
{
@@ -123,7 +129,7 @@ class Client {
while ($remained > 0) {
$result = socket_write($this->sock, $data);
if ($result === false)
- throw new Exception(__METHOD__ . ": socket_write() failed: ".$this->getSocketError());
+ throw new JobdException(__METHOD__ . ": socket_write() failed: ".$this->getSocketError());
$remained -= $result;
if ($remained > 0)
@@ -134,7 +140,7 @@ class Client {
/**
* @param int $request_no
* @return RequestMessage|ResponseMessage|PingMessage|PongMessage
- * @throws Exception
+ * @throws JobdException
*/
public function recv(int $request_no = -1)
{
@@ -145,7 +151,7 @@ class Client {
while (true) {
$result = socket_recv($this->sock, $recv_buf, 1024, 0);
if ($result === false)
- throw new Exception(__METHOD__ . ": socket_recv() failed: " . $this->getSocketError());
+ throw new JobdException(__METHOD__ . ": socket_recv() failed: " . $this->getSocketError());
// peer disconnected
if ($result === 0)
@@ -172,7 +178,7 @@ class Client {
} while ($eot_pos !== false && $offset < $buflen-1);
if (empty($messages))
- throw new Exception("Malformed response: no messages found. Response: {$buf}");
+ throw new JobdException("Malformed response: no messages found. Response: {$buf}");
if (count($messages) > 1)
trigger_error(__METHOD__.": received more than one message");
@@ -196,7 +202,7 @@ class Client {
);
if (empty($messages))
- throw new Exception("Malformed response: response for {$request_no} not found.");
+ throw new JobdException("Malformed response: response for {$request_no} not found.");
if (count($messages) == 2) {
@@ -220,7 +226,7 @@ class Client {
if ($response instanceof ResponseMessage) {
if ($error = $response->getError())
- throw new Exception($response->getError());
+ throw new JobdException($response->getError());
}
return $response;
@@ -242,13 +248,13 @@ class Client {
/**
* @param string $raw_string
* @return RequestMessage|ResponseMessage|PingMessage|PongMessage
- * @throws Exception
+ * @throws JobdException
*/
protected static function parseMessage(string $raw_string)
{
$raw = json_decode($raw_string, true);
if (!is_array($raw) || count($raw) < 1)
- throw new Exception("Malformed message: {$raw_string}");
+ throw new JobdException("Malformed message: {$raw_string}");
list($type) = $raw;
@@ -263,8 +269,8 @@ class Client {
['password', 's', false],
['data', 'a', false]
]);
- } catch (Exception $e) {
- throw new Exception("Malformed REQUEST message: {$e->getMessage()}");
+ } catch (JobdException $e) {
+ throw new JobdException("Malformed REQUEST message: {$e->getMessage()}");
}
$message = new RequestMessage($data['type'], $data['data'] ?? null);
@@ -283,8 +289,8 @@ class Client {
['data', 'aifs', false],
['error', 's', false],
]);
- } catch (Exception $e) {
- throw new Exception("Malformed RESPONSE message: {$e->getMessage()}");
+ } catch (JobdException $e) {
+ throw new JobdException("Malformed RESPONSE message: {$e->getMessage()}");
}
return new ResponseMessage($data['no'], $data['error'] ?? null, $data['data'] ?? null);
@@ -296,25 +302,25 @@ class Client {
return new PongMessage();
default:
- throw new Exception("Malformed message: unexpected type {$type}");
+ throw new JobdException("Malformed message: unexpected type {$type}");
}
}
/**
* @param mixed $data
* @param array $schema
- * @throws Exception
+ * @throws JobdException
*/
protected static function validateData($data, array $schema)
{
if (!$data || !is_array($data))
- throw new Exception('data must be array');
+ throw new JobdException('data must be array');
foreach ($schema as $schema_item) {
list ($key_name, $key_types, $key_required) = $schema_item;
if (!isset($data[$key_name])) {
if ($key_required)
- throw new Exception("'{$key_name}' is missing");
+ throw new JobdException("'{$key_name}' is missing");
continue;
}
@@ -354,7 +360,7 @@ class Client {
}
if (!$passed)
- throw new Exception("{$key_name}: required type is '{$key_types}'");
+ throw new JobdException("{$key_name}: required type is '{$key_types}'");
}
}
diff --git a/src/Exception.php b/src/Exception.php
deleted file mode 100644
index be64d4c..0000000
--- a/src/Exception.php
+++ /dev/null
@@ -1,5 +0,0 @@
-<?php
-
-namespace jobd;
-
-class Exception extends \Exception {} \ No newline at end of file
diff --git a/src/MasterClient.php b/src/MasterClient.php
index d85e454..2fed338 100644
--- a/src/MasterClient.php
+++ b/src/MasterClient.php
@@ -2,6 +2,10 @@
namespace jobd;
+use jobd\exceptions\JobdException;
+use jobd\messages\RequestMessage;
+use jobd\messages\ResponseMessage;
+
class MasterClient extends Client {
public function __construct(int $port = Client::MASTER_PORT, ...$args)
@@ -12,7 +16,7 @@ class MasterClient extends Client {
/**
* @param array $targets
* @return ResponseMessage
- * @throws Exception
+ * @throws JobdException
*/
public function poke(array $targets): ResponseMessage
{
@@ -24,7 +28,7 @@ class MasterClient extends Client {
/**
* @param bool $poll_workers
* @return ResponseMessage
- * @throws Exception
+ * @throws JobdException
*/
public function status(bool $poll_workers = false): ResponseMessage
{
@@ -36,7 +40,7 @@ class MasterClient extends Client {
/**
* @param array[] $jobs
* @return ResponseMessage
- * @throws Exception
+ * @throws JobdException
*/
public function runManual(array $jobs): ResponseMessage
{
@@ -45,4 +49,33 @@ class MasterClient extends Client {
);
}
+ /**
+ * @param int $job_id
+ * @param int $signal
+ * @param string $target
+ * @return ResponseMessage
+ * @throws JobdException
+ */
+ public function sendSignal(int $job_id, int $signal, string $target): ResponseMessage
+ {
+ return $this->sendSignals([
+ [
+ 'id' => $job_id,
+ 'signal' => $signal,
+ 'target' => $target
+ ]
+ ]);
+ }
+
+ /**
+ * @param array $data
+ * @return ResponseMessage
+ * @throws JobdException
+ */
+ public function sendSignals(array $data): ResponseMessage {
+ return $this->recv(
+ $this->sendRequest(new RequestMessage('send-signal', ['jobs' => $data]))
+ );
+ }
+
} \ No newline at end of file
diff --git a/src/WorkerClient.php b/src/WorkerClient.php
index 2773de5..16f8963 100644
--- a/src/WorkerClient.php
+++ b/src/WorkerClient.php
@@ -2,6 +2,10 @@
namespace jobd;
+use jobd\exceptions\JobdException;
+use jobd\messages\RequestMessage;
+use jobd\messages\ResponseMessage;
+
class WorkerClient extends Client {
public function __construct(int $port = Client::WORKER_PORT, ...$args)
@@ -49,10 +53,22 @@ class WorkerClient extends Client {
}
/**
+ * @param array[] $jobs
+ * @return ResponseMessage
+ * @throws JobdException
+ */
+ public function sendSignal(array $jobs): ResponseMessage
+ {
+ return $this->recv(
+ $this->sendRequest(new RequestMessage('send-signal', ['jobs' => $jobs]))
+ );
+ }
+
+ /**
* @param string $target
* @param int $concurrency
* @return ResponseMessage
- * @throws Exception
+ * @throws JobdException
*/
public function addTarget(string $target, int $concurrency): ResponseMessage
{
@@ -67,7 +83,7 @@ class WorkerClient extends Client {
/**
* @param string $target
* @return ResponseMessage
- * @throws Exception
+ * @throws JobdException
*/
public function removeTarget(string $target): ResponseMessage
{
@@ -82,7 +98,7 @@ class WorkerClient extends Client {
* @param string $target
* @param int $concurrency
* @return ResponseMessage
- * @throws Exception
+ * @throws JobdException
*/
public function setTargetConcurrency(string $target, int $concurrency): ResponseMessage
{
diff --git a/src/exceptions/JobInterruptedException.php b/src/exceptions/JobInterruptedException.php
new file mode 100644
index 0000000..c645cc5
--- /dev/null
+++ b/src/exceptions/JobInterruptedException.php
@@ -0,0 +1,11 @@
+<?php
+
+namespace jobd\exceptions;
+
+class JobInterruptedException extends \Exception {
+
+ public function __construct(int $code = 1, string $message = "") {
+ parent::__construct($message, $code);
+ }
+
+} \ No newline at end of file
diff --git a/src/exceptions/JobdException.php b/src/exceptions/JobdException.php
new file mode 100644
index 0000000..59b218c
--- /dev/null
+++ b/src/exceptions/JobdException.php
@@ -0,0 +1,5 @@
+<?php
+
+namespace jobd\exceptions;
+
+class JobdException extends \Exception {} \ No newline at end of file
diff --git a/src/Message.php b/src/messages/Message.php
index d5df6ab..df7464e 100644
--- a/src/Message.php
+++ b/src/messages/Message.php
@@ -1,6 +1,6 @@
<?php
-namespace jobd;
+namespace jobd\messages;
abstract class Message {
diff --git a/src/PingMessage.php b/src/messages/PingMessage.php
index f3441c9..fa61379 100644
--- a/src/PingMessage.php
+++ b/src/messages/PingMessage.php
@@ -1,6 +1,6 @@
<?php
-namespace jobd;
+namespace jobd\messages;
class PingMessage extends Message {
diff --git a/src/PongMessage.php b/src/messages/PongMessage.php
index a0a38a0..00bb499 100644
--- a/src/PongMessage.php
+++ b/src/messages/PongMessage.php
@@ -1,6 +1,6 @@
<?php
-namespace jobd;
+namespace jobd\messages;
class PongMessage extends Message {
diff --git a/src/RequestMessage.php b/src/messages/RequestMessage.php
index a2e196d..12ab7ad 100644
--- a/src/RequestMessage.php
+++ b/src/messages/RequestMessage.php
@@ -1,6 +1,6 @@
<?php
-namespace jobd;
+namespace jobd\messages;
class RequestMessage extends Message {
diff --git a/src/ResponseMessage.php b/src/messages/ResponseMessage.php
index 1f91dd4..5101ed4 100644
--- a/src/ResponseMessage.php
+++ b/src/messages/ResponseMessage.php
@@ -1,6 +1,6 @@
<?php
-namespace jobd;
+namespace jobd\messages;
class ResponseMessage extends Message {