From 7b44cbe272e6adf24109a9232a48008a8818f318 Mon Sep 17 00:00:00 2001 From: Evgeny Zinoviev Date: Mon, 1 Mar 2021 02:03:07 +0300 Subject: support new protocol --- src/Client.php | 217 +++++++++++++++++++++++++++++++++++++++++------- src/Message.php | 13 ++- src/PingMessage.php | 15 ++++ src/PongMessage.php | 15 ++++ src/RequestMessage.php | 18 +++- src/ResponseMessage.php | 25 +++++- 6 files changed, 267 insertions(+), 36 deletions(-) create mode 100644 src/PingMessage.php create mode 100644 src/PongMessage.php diff --git a/src/Client.php b/src/Client.php index 5d0048b..2d4b6e1 100644 --- a/src/Client.php +++ b/src/Client.php @@ -9,11 +9,14 @@ class Client { const MASTER_PORT = 7081; const EOT = "\4"; + const REQUEST_NO_LIMIT = 999999; protected $host; protected $port; protected $password; protected $sock; + protected $passwordSent = false; + protected $lastOutgoingRequestNo = null; /** * JobdClient constructor. @@ -30,6 +33,9 @@ class Client { $this->sock = fsockopen($this->host, $this->port); if (!$this->sock) throw new \Exception("Failed to connect to {$this->host}:{$this->port}"); + + // 0 is reserved + $this->lastOutgoingRequestNo = mt_rand(1, self::REQUEST_NO_LIMIT); } /** @@ -37,7 +43,7 @@ class Client { * @throws \Exception */ public function ping() { - $this->send(new RequestMessage('ping')); + $this->send(new PingMessage()); return $this->recv(); } @@ -47,8 +53,9 @@ class Client { * @throws \Exception */ public function poke(array $targets) { - $this->send(new RequestMessage('poke', ['targets' => $targets])); - return $this->recv(); + return $this->recv( + $this->sendRequest(new RequestMessage('poke', ['targets' => $targets])) + ); } /** @@ -56,8 +63,9 @@ class Client { * @throws \Exception */ public function status() { - $this->send(new RequestMessage('status')); - return $this->recv(); + return $this->recv( + $this->sendRequest(new RequestMessage('status')) + ); } /** @@ -66,8 +74,9 @@ class Client { * @throws \Exception */ public function poll(array $targets) { - $this->send(new RequestMessage('poll', ['targets' => $targets])); - return $this->recv(); + return $this->recv( + $this->sendRequest(new RequestMessage('poll', ['targets' => $targets])) + ); } /** @@ -75,27 +84,43 @@ class Client { * @return ResponseMessage */ public function runManual(int $id) { - $this->send(new RequestMessage('run-manual', ['id' => $id])); - return $this->recv(); + return $this->recv( + $this->sendRequest(new RequestMessage('run-manual', ['id' => $id])) + ); } /** * @param RequestMessage $request + * @return int */ - public function send(RequestMessage $request) { - if ($this->password) + public function sendRequest(RequestMessage $request) { + if ($this->password && !$this->passwordSent) { $request->setPassword($this->password); + $this->passwordSent = true; + } + + $no = $this->getNextOutgoingRequestNo(); + $request->setRequestNo($no); - $serialized = $request->serialize(); + $this->send($request); + return $no; + } + + /** + * @param Message $message + */ + protected function send(Message $message) { + $serialized = $message->serialize(); fwrite($this->sock, $serialized . self::EOT); } /** + * @param int $request_no * @return ResponseMessage * @throws \Exception */ - public function recv() { + public function recv(int $request_no = -1) { $messages = []; $buf = ''; while (!feof($this->sock)) { @@ -117,22 +142,73 @@ class Client { } } while ($eot_pos !== false && $offset < $buflen-1); - if (empty($message)) + if (empty($messages)) throw new \Exception("Malformed response: no messages found. Response: {$buf}"); if (count($messages) > 1) trigger_error(__METHOD__.": received more than one message"); - $response = self::parseMessage($messages[0]); - if (!($response instanceof ResponseMessage)) - throw new \Exception('Unexpected message type'); + $response = null; + $messages = array_map('self::parseMessage', $messages); + if ($request_no != -1) { + /** + * @var ResponseMessage[] $messages + */ + $messages = array_filter( + $messages, + + /** + * @param ResponseMessage|RequestMessage $message + */ + function(Message $message) use ($request_no) { + return $message instanceof ResponseMessage + && ($message->getRequestNo() === $request_no || $message->getRequestNo() === 0); + } + ); + + if (empty($messages)) + throw new \Exception("Malformed response: response for {$request_no} not found."); + + + if (count($messages) == 2) { + // weird, we caught response for our $request_no AND a message with reserved zero no + // but anyway + + for ($i = 0; $i < count($messages); $i++) { + $message = $messages[$i]; + + if ($message->getRequestNo() == $request_no) + $response = $message; + + else if ($message->getRequestNo() == 0) + trigger_error(__METHOD__.': received an error with reqno=0: '.($message->getError() ?? null)); + } + } + } - if ($error = $response->getError()) - throw new \Exception('jobd error: '.$response->getError()); + if (is_null($response)) + $response = $messages[0]; + + if ($response instanceof ResponseMessage) { + if ($error = $response->getError()) + throw new \Exception($response->getError()); + } return $response; } + /** + * @return int + */ + protected function getNextOutgoingRequestNo() { + $this->lastOutgoingRequestNo++; + + if ($this->lastOutgoingRequestNo >= self::REQUEST_NO_LIMIT) + $this->lastOutgoingRequestNo = 1; // 0 is reserved + + return $this->lastOutgoingRequestNo; + } + /** * @param string $raw_string * @return RequestMessage|ResponseMessage @@ -140,34 +216,117 @@ class Client { */ protected static function parseMessage(string $raw_string) { $raw = json_decode($raw_string, true); - if (!is_array($raw) || count($raw) != 2) + if (!is_array($raw) || count($raw) < 1) throw new \Exception("Malformed message: {$raw_string}"); - list($type, $data) = $raw; + list($type) = $raw; switch ($type) { case Message::REQUEST: - if (!$data || !is_array($data) || !isset($data['type']) || !is_string($data['type'])) - throw new \Exception('Malformed REQUEST message'); - - $message = new RequestMessage($data['type'], $data['data'] ?? null); + $data = $raw[1]; + try { + self::validateData($data, [ + // name type required + ['type', 's', true], + ['no', 'i', true], + ['password', 's', false], + ['data', 'aifs', false] + ]); + } catch (\Exception $e) { + throw new \Exception("Malformed REQUEST message: {$e->getMessage()}"); + } + + $message = new RequestMessage($data['no'], $data['type'], $data['data'] ?? null); if (isset($data['password'])) $message->setPassword($data['password']); return $message; case Message::RESPONSE: - if (!is_array($data) || count($data) < 2) - throw new \Exception('Malformed RESPONSE message'); + $data = $raw[1]; + try { + self::validateData($data, [ + // name type required + ['no', 'i', true], + ['data', 'aifs', false], + ['error', 's', false], + ]); + } catch (\Exception $e) { + throw new \Exception("Malformed RESPONSE message: {$e->getMessage()}"); + } + + return new ResponseMessage($data['no'], $data['error'] ?? null, $data['data'] ?? null); + + case Message::PING: + return new PingMessage(); + break; - $message = new ResponseMessage(...$data); - return $message; + case Message::PONG: + return new PongMessage(); + break; default: throw new \Exception("Malformed message: unexpected type {$type}"); } } + /** + * @param mixed $data + * @param array $schema + * @return bool + */ + protected static function validateData($data, array $schema) { + if (!$data || !is_array($data)) + throw new \Exception('data must be array'); + + foreach ($schema as $schema_item) { + list ($key_name, $key_types, $key_required) = $schema_item; + if (!isset($data[$key_name])) { + if ($key_required) + throw new \Exception("'{$key_name}' is missing"); + + continue; + } + + $passed = false; + for ($i = 0; $i < strlen($key_types); $i++) { + $type = $key_types[$i]; + + switch ($type) { + case 'i': + if (is_int($data[$key_name])) + $passed = true; + break; + + case 'f': + if (is_float($data[$key_name])) + $passed = true; + break; + + case 's': + if (is_string($data[$key_name])) + $passed = true; + break; + + case 'a': + if (is_array($data[$key_name])) + $passed = true; + break; + + default: + trigger_error(__METHOD__.': unexpected type '.$type); + break; + } + + if ($passed) + break; + } + + if (!$passed) + throw new \Exception("{$key_name}: required type is '{$key_types}'"); + } + } + /** * @return bool */ diff --git a/src/Message.php b/src/Message.php index 647afad..92c6005 100644 --- a/src/Message.php +++ b/src/Message.php @@ -7,6 +7,8 @@ abstract class Message { const REQUEST = 0; const RESPONSE = 1; + const PING = 2; + const PONG = 3; protected $type; @@ -27,10 +29,13 @@ abstract class Message { * @return string */ public function serialize(): string { - return json_encode([ - $this->type, - $this->getContent() - ]); + $data = [$this->type]; + $content = $this->getContent(); + + if (!empty($content)) + $data[] = $content; + + return json_encode($data); } } \ No newline at end of file diff --git a/src/PingMessage.php b/src/PingMessage.php new file mode 100644 index 0000000..2b751a2 --- /dev/null +++ b/src/PingMessage.php @@ -0,0 +1,15 @@ +password = $password; } + /** + * @param int $no + */ + public function setRequestNo(int $no) { + $this->requestNo = $no; + } + /** * @return string[] */ protected function getContent(): array { - $request = ['type' => $this->requestType]; + $request = [ + 'type' => $this->requestType, + 'no' => $this->requestNo, + ]; + if (!is_null($this->requestData)) $request['data'] = $this->requestData; + + if (!is_null($this->password)) + $request['password'] = $this->password; + return $request; } diff --git a/src/ResponseMessage.php b/src/ResponseMessage.php index 9352823..8e687e2 100644 --- a/src/ResponseMessage.php +++ b/src/ResponseMessage.php @@ -4,17 +4,21 @@ namespace jobd; class ResponseMessage extends Message { + protected $requestNo; protected $error; protected $data; /** * Response constructor. + * + * @param int $requestNo * @param null $error * @param null $data */ - public function __construct($error = null, $data = null) { + public function __construct($request_no, $error = null, $data = null) { parent::__construct(Message::RESPONSE); + $this->requestNo = $request_no; $this->error = $error; $this->data = $data; } @@ -23,7 +27,17 @@ class ResponseMessage extends Message { * @return array */ protected function getContent(): array { - return [$this->error, $this->data]; + $response = [ + 'no' => $this->requestNo + ]; + + if (!is_null($this->error)) + $response['error'] = $this->error; + + if (!is_null(!$this->data)) + $response['data'] = $this->data; + + return $response; } /** @@ -40,4 +54,11 @@ class ResponseMessage extends Message { return $this->data; } + /** + * @return int + */ + public function getRequestNo() { + return $this->requestNo; + } + } \ No newline at end of file -- cgit v1.2.3