diff options
author | Evgeny Zinoviev <me@ch1p.io> | 2021-03-01 02:03:07 +0300 |
---|---|---|
committer | Evgeny Zinoviev <me@ch1p.io> | 2021-03-01 02:03:07 +0300 |
commit | 7b44cbe272e6adf24109a9232a48008a8818f318 (patch) | |
tree | 7725bca8517082fe79fe31af1edfca95dfc76987 /src/Client.php | |
parent | c66fc2f691424023b107dfee1295fa165b223f86 (diff) |
support new protocol
Diffstat (limited to 'src/Client.php')
-rw-r--r-- | src/Client.php | 217 |
1 files changed, 188 insertions, 29 deletions
diff --git a/src/Client.php b/src/Client.php index 5d0048b..2d4b6e1 100644 --- a/src/Client.php +++ b/src/Client.php @@ -9,11 +9,14 @@ class Client { const MASTER_PORT = 7081; const EOT = "\4"; + const REQUEST_NO_LIMIT = 999999; protected $host; protected $port; protected $password; protected $sock; + protected $passwordSent = false; + protected $lastOutgoingRequestNo = null; /** * JobdClient constructor. @@ -30,6 +33,9 @@ class Client { $this->sock = fsockopen($this->host, $this->port); if (!$this->sock) throw new \Exception("Failed to connect to {$this->host}:{$this->port}"); + + // 0 is reserved + $this->lastOutgoingRequestNo = mt_rand(1, self::REQUEST_NO_LIMIT); } /** @@ -37,7 +43,7 @@ class Client { * @throws \Exception */ public function ping() { - $this->send(new RequestMessage('ping')); + $this->send(new PingMessage()); return $this->recv(); } @@ -47,8 +53,9 @@ class Client { * @throws \Exception */ public function poke(array $targets) { - $this->send(new RequestMessage('poke', ['targets' => $targets])); - return $this->recv(); + return $this->recv( + $this->sendRequest(new RequestMessage('poke', ['targets' => $targets])) + ); } /** @@ -56,8 +63,9 @@ class Client { * @throws \Exception */ public function status() { - $this->send(new RequestMessage('status')); - return $this->recv(); + return $this->recv( + $this->sendRequest(new RequestMessage('status')) + ); } /** @@ -66,8 +74,9 @@ class Client { * @throws \Exception */ public function poll(array $targets) { - $this->send(new RequestMessage('poll', ['targets' => $targets])); - return $this->recv(); + return $this->recv( + $this->sendRequest(new RequestMessage('poll', ['targets' => $targets])) + ); } /** @@ -75,27 +84,43 @@ class Client { * @return ResponseMessage */ public function runManual(int $id) { - $this->send(new RequestMessage('run-manual', ['id' => $id])); - return $this->recv(); + return $this->recv( + $this->sendRequest(new RequestMessage('run-manual', ['id' => $id])) + ); } /** * @param RequestMessage $request + * @return int */ - public function send(RequestMessage $request) { - if ($this->password) + public function sendRequest(RequestMessage $request) { + if ($this->password && !$this->passwordSent) { $request->setPassword($this->password); + $this->passwordSent = true; + } + + $no = $this->getNextOutgoingRequestNo(); + $request->setRequestNo($no); - $serialized = $request->serialize(); + $this->send($request); + return $no; + } + + /** + * @param Message $message + */ + protected function send(Message $message) { + $serialized = $message->serialize(); fwrite($this->sock, $serialized . self::EOT); } /** + * @param int $request_no * @return ResponseMessage * @throws \Exception */ - public function recv() { + public function recv(int $request_no = -1) { $messages = []; $buf = ''; while (!feof($this->sock)) { @@ -117,51 +142,128 @@ class Client { } } while ($eot_pos !== false && $offset < $buflen-1); - if (empty($message)) + if (empty($messages)) throw new \Exception("Malformed response: no messages found. Response: {$buf}"); if (count($messages) > 1) trigger_error(__METHOD__.": received more than one message"); - $response = self::parseMessage($messages[0]); - if (!($response instanceof ResponseMessage)) - throw new \Exception('Unexpected message type'); + $response = null; + $messages = array_map('self::parseMessage', $messages); + if ($request_no != -1) { + /** + * @var ResponseMessage[] $messages + */ + $messages = array_filter( + $messages, + + /** + * @param ResponseMessage|RequestMessage $message + */ + function(Message $message) use ($request_no) { + return $message instanceof ResponseMessage + && ($message->getRequestNo() === $request_no || $message->getRequestNo() === 0); + } + ); + + if (empty($messages)) + throw new \Exception("Malformed response: response for {$request_no} not found."); + + + if (count($messages) == 2) { + // weird, we caught response for our $request_no AND a message with reserved zero no + // but anyway + + for ($i = 0; $i < count($messages); $i++) { + $message = $messages[$i]; + + if ($message->getRequestNo() == $request_no) + $response = $message; + + else if ($message->getRequestNo() == 0) + trigger_error(__METHOD__.': received an error with reqno=0: '.($message->getError() ?? null)); + } + } + } - if ($error = $response->getError()) - throw new \Exception('jobd error: '.$response->getError()); + if (is_null($response)) + $response = $messages[0]; + + if ($response instanceof ResponseMessage) { + if ($error = $response->getError()) + throw new \Exception($response->getError()); + } return $response; } /** + * @return int + */ + protected function getNextOutgoingRequestNo() { + $this->lastOutgoingRequestNo++; + + if ($this->lastOutgoingRequestNo >= self::REQUEST_NO_LIMIT) + $this->lastOutgoingRequestNo = 1; // 0 is reserved + + return $this->lastOutgoingRequestNo; + } + + /** * @param string $raw_string * @return RequestMessage|ResponseMessage * @throws \Exception */ protected static function parseMessage(string $raw_string) { $raw = json_decode($raw_string, true); - if (!is_array($raw) || count($raw) != 2) + if (!is_array($raw) || count($raw) < 1) throw new \Exception("Malformed message: {$raw_string}"); - list($type, $data) = $raw; + list($type) = $raw; switch ($type) { case Message::REQUEST: - if (!$data || !is_array($data) || !isset($data['type']) || !is_string($data['type'])) - throw new \Exception('Malformed REQUEST message'); - - $message = new RequestMessage($data['type'], $data['data'] ?? null); + $data = $raw[1]; + try { + self::validateData($data, [ + // name type required + ['type', 's', true], + ['no', 'i', true], + ['password', 's', false], + ['data', 'aifs', false] + ]); + } catch (\Exception $e) { + throw new \Exception("Malformed REQUEST message: {$e->getMessage()}"); + } + + $message = new RequestMessage($data['no'], $data['type'], $data['data'] ?? null); if (isset($data['password'])) $message->setPassword($data['password']); return $message; case Message::RESPONSE: - if (!is_array($data) || count($data) < 2) - throw new \Exception('Malformed RESPONSE message'); + $data = $raw[1]; + try { + self::validateData($data, [ + // name type required + ['no', 'i', true], + ['data', 'aifs', false], + ['error', 's', false], + ]); + } catch (\Exception $e) { + throw new \Exception("Malformed RESPONSE message: {$e->getMessage()}"); + } + + return new ResponseMessage($data['no'], $data['error'] ?? null, $data['data'] ?? null); + + case Message::PING: + return new PingMessage(); + break; - $message = new ResponseMessage(...$data); - return $message; + case Message::PONG: + return new PongMessage(); + break; default: throw new \Exception("Malformed message: unexpected type {$type}"); @@ -169,6 +271,63 @@ class Client { } /** + * @param mixed $data + * @param array $schema + * @return bool + */ + protected static function validateData($data, array $schema) { + if (!$data || !is_array($data)) + throw new \Exception('data must be array'); + + foreach ($schema as $schema_item) { + list ($key_name, $key_types, $key_required) = $schema_item; + if (!isset($data[$key_name])) { + if ($key_required) + throw new \Exception("'{$key_name}' is missing"); + + continue; + } + + $passed = false; + for ($i = 0; $i < strlen($key_types); $i++) { + $type = $key_types[$i]; + + switch ($type) { + case 'i': + if (is_int($data[$key_name])) + $passed = true; + break; + + case 'f': + if (is_float($data[$key_name])) + $passed = true; + break; + + case 's': + if (is_string($data[$key_name])) + $passed = true; + break; + + case 'a': + if (is_array($data[$key_name])) + $passed = true; + break; + + default: + trigger_error(__METHOD__.': unexpected type '.$type); + break; + } + + if ($passed) + break; + } + + if (!$passed) + throw new \Exception("{$key_name}: required type is '{$key_types}'"); + } + } + + /** * @return bool */ public function close() { |