summaryrefslogtreecommitdiff
path: root/src/Client.php
diff options
context:
space:
mode:
authorEvgeny Zinoviev <me@ch1p.io>2021-03-01 02:03:07 +0300
committerEvgeny Zinoviev <me@ch1p.io>2021-03-01 02:03:07 +0300
commit7b44cbe272e6adf24109a9232a48008a8818f318 (patch)
tree7725bca8517082fe79fe31af1edfca95dfc76987 /src/Client.php
parentc66fc2f691424023b107dfee1295fa165b223f86 (diff)
support new protocol
Diffstat (limited to 'src/Client.php')
-rw-r--r--src/Client.php217
1 files changed, 188 insertions, 29 deletions
diff --git a/src/Client.php b/src/Client.php
index 5d0048b..2d4b6e1 100644
--- a/src/Client.php
+++ b/src/Client.php
@@ -9,11 +9,14 @@ class Client {
const MASTER_PORT = 7081;
const EOT = "\4";
+ const REQUEST_NO_LIMIT = 999999;
protected $host;
protected $port;
protected $password;
protected $sock;
+ protected $passwordSent = false;
+ protected $lastOutgoingRequestNo = null;
/**
* JobdClient constructor.
@@ -30,6 +33,9 @@ class Client {
$this->sock = fsockopen($this->host, $this->port);
if (!$this->sock)
throw new \Exception("Failed to connect to {$this->host}:{$this->port}");
+
+ // 0 is reserved
+ $this->lastOutgoingRequestNo = mt_rand(1, self::REQUEST_NO_LIMIT);
}
/**
@@ -37,7 +43,7 @@ class Client {
* @throws \Exception
*/
public function ping() {
- $this->send(new RequestMessage('ping'));
+ $this->send(new PingMessage());
return $this->recv();
}
@@ -47,8 +53,9 @@ class Client {
* @throws \Exception
*/
public function poke(array $targets) {
- $this->send(new RequestMessage('poke', ['targets' => $targets]));
- return $this->recv();
+ return $this->recv(
+ $this->sendRequest(new RequestMessage('poke', ['targets' => $targets]))
+ );
}
/**
@@ -56,8 +63,9 @@ class Client {
* @throws \Exception
*/
public function status() {
- $this->send(new RequestMessage('status'));
- return $this->recv();
+ return $this->recv(
+ $this->sendRequest(new RequestMessage('status'))
+ );
}
/**
@@ -66,8 +74,9 @@ class Client {
* @throws \Exception
*/
public function poll(array $targets) {
- $this->send(new RequestMessage('poll', ['targets' => $targets]));
- return $this->recv();
+ return $this->recv(
+ $this->sendRequest(new RequestMessage('poll', ['targets' => $targets]))
+ );
}
/**
@@ -75,27 +84,43 @@ class Client {
* @return ResponseMessage
*/
public function runManual(int $id) {
- $this->send(new RequestMessage('run-manual', ['id' => $id]));
- return $this->recv();
+ return $this->recv(
+ $this->sendRequest(new RequestMessage('run-manual', ['id' => $id]))
+ );
}
/**
* @param RequestMessage $request
+ * @return int
*/
- public function send(RequestMessage $request) {
- if ($this->password)
+ public function sendRequest(RequestMessage $request) {
+ if ($this->password && !$this->passwordSent) {
$request->setPassword($this->password);
+ $this->passwordSent = true;
+ }
+
+ $no = $this->getNextOutgoingRequestNo();
+ $request->setRequestNo($no);
- $serialized = $request->serialize();
+ $this->send($request);
+ return $no;
+ }
+
+ /**
+ * @param Message $message
+ */
+ protected function send(Message $message) {
+ $serialized = $message->serialize();
fwrite($this->sock, $serialized . self::EOT);
}
/**
+ * @param int $request_no
* @return ResponseMessage
* @throws \Exception
*/
- public function recv() {
+ public function recv(int $request_no = -1) {
$messages = [];
$buf = '';
while (!feof($this->sock)) {
@@ -117,51 +142,128 @@ class Client {
}
} while ($eot_pos !== false && $offset < $buflen-1);
- if (empty($message))
+ if (empty($messages))
throw new \Exception("Malformed response: no messages found. Response: {$buf}");
if (count($messages) > 1)
trigger_error(__METHOD__.": received more than one message");
- $response = self::parseMessage($messages[0]);
- if (!($response instanceof ResponseMessage))
- throw new \Exception('Unexpected message type');
+ $response = null;
+ $messages = array_map('self::parseMessage', $messages);
+ if ($request_no != -1) {
+ /**
+ * @var ResponseMessage[] $messages
+ */
+ $messages = array_filter(
+ $messages,
+
+ /**
+ * @param ResponseMessage|RequestMessage $message
+ */
+ function(Message $message) use ($request_no) {
+ return $message instanceof ResponseMessage
+ && ($message->getRequestNo() === $request_no || $message->getRequestNo() === 0);
+ }
+ );
+
+ if (empty($messages))
+ throw new \Exception("Malformed response: response for {$request_no} not found.");
+
+
+ if (count($messages) == 2) {
+ // weird, we caught response for our $request_no AND a message with reserved zero no
+ // but anyway
+
+ for ($i = 0; $i < count($messages); $i++) {
+ $message = $messages[$i];
+
+ if ($message->getRequestNo() == $request_no)
+ $response = $message;
+
+ else if ($message->getRequestNo() == 0)
+ trigger_error(__METHOD__.': received an error with reqno=0: '.($message->getError() ?? null));
+ }
+ }
+ }
- if ($error = $response->getError())
- throw new \Exception('jobd error: '.$response->getError());
+ if (is_null($response))
+ $response = $messages[0];
+
+ if ($response instanceof ResponseMessage) {
+ if ($error = $response->getError())
+ throw new \Exception($response->getError());
+ }
return $response;
}
/**
+ * @return int
+ */
+ protected function getNextOutgoingRequestNo() {
+ $this->lastOutgoingRequestNo++;
+
+ if ($this->lastOutgoingRequestNo >= self::REQUEST_NO_LIMIT)
+ $this->lastOutgoingRequestNo = 1; // 0 is reserved
+
+ return $this->lastOutgoingRequestNo;
+ }
+
+ /**
* @param string $raw_string
* @return RequestMessage|ResponseMessage
* @throws \Exception
*/
protected static function parseMessage(string $raw_string) {
$raw = json_decode($raw_string, true);
- if (!is_array($raw) || count($raw) != 2)
+ if (!is_array($raw) || count($raw) < 1)
throw new \Exception("Malformed message: {$raw_string}");
- list($type, $data) = $raw;
+ list($type) = $raw;
switch ($type) {
case Message::REQUEST:
- if (!$data || !is_array($data) || !isset($data['type']) || !is_string($data['type']))
- throw new \Exception('Malformed REQUEST message');
-
- $message = new RequestMessage($data['type'], $data['data'] ?? null);
+ $data = $raw[1];
+ try {
+ self::validateData($data, [
+ // name type required
+ ['type', 's', true],
+ ['no', 'i', true],
+ ['password', 's', false],
+ ['data', 'aifs', false]
+ ]);
+ } catch (\Exception $e) {
+ throw new \Exception("Malformed REQUEST message: {$e->getMessage()}");
+ }
+
+ $message = new RequestMessage($data['no'], $data['type'], $data['data'] ?? null);
if (isset($data['password']))
$message->setPassword($data['password']);
return $message;
case Message::RESPONSE:
- if (!is_array($data) || count($data) < 2)
- throw new \Exception('Malformed RESPONSE message');
+ $data = $raw[1];
+ try {
+ self::validateData($data, [
+ // name type required
+ ['no', 'i', true],
+ ['data', 'aifs', false],
+ ['error', 's', false],
+ ]);
+ } catch (\Exception $e) {
+ throw new \Exception("Malformed RESPONSE message: {$e->getMessage()}");
+ }
+
+ return new ResponseMessage($data['no'], $data['error'] ?? null, $data['data'] ?? null);
+
+ case Message::PING:
+ return new PingMessage();
+ break;
- $message = new ResponseMessage(...$data);
- return $message;
+ case Message::PONG:
+ return new PongMessage();
+ break;
default:
throw new \Exception("Malformed message: unexpected type {$type}");
@@ -169,6 +271,63 @@ class Client {
}
/**
+ * @param mixed $data
+ * @param array $schema
+ * @return bool
+ */
+ protected static function validateData($data, array $schema) {
+ if (!$data || !is_array($data))
+ throw new \Exception('data must be array');
+
+ foreach ($schema as $schema_item) {
+ list ($key_name, $key_types, $key_required) = $schema_item;
+ if (!isset($data[$key_name])) {
+ if ($key_required)
+ throw new \Exception("'{$key_name}' is missing");
+
+ continue;
+ }
+
+ $passed = false;
+ for ($i = 0; $i < strlen($key_types); $i++) {
+ $type = $key_types[$i];
+
+ switch ($type) {
+ case 'i':
+ if (is_int($data[$key_name]))
+ $passed = true;
+ break;
+
+ case 'f':
+ if (is_float($data[$key_name]))
+ $passed = true;
+ break;
+
+ case 's':
+ if (is_string($data[$key_name]))
+ $passed = true;
+ break;
+
+ case 'a':
+ if (is_array($data[$key_name]))
+ $passed = true;
+ break;
+
+ default:
+ trigger_error(__METHOD__.': unexpected type '.$type);
+ break;
+ }
+
+ if ($passed)
+ break;
+ }
+
+ if (!$passed)
+ throw new \Exception("{$key_name}: required type is '{$key_types}'");
+ }
+ }
+
+ /**
* @return bool
*/
public function close() {