diff options
author | Evgeny Zinoviev <me@ch1p.io> | 2021-05-07 02:18:07 +0300 |
---|---|---|
committer | Evgeny Zinoviev <me@ch1p.io> | 2021-05-07 02:18:07 +0300 |
commit | 7e743b73433475df086fcec81be7b10c1d695a42 (patch) | |
tree | 1737c5f9bdad2a40f740e9a655e510641331b9e2 /src/server |
initial
Diffstat (limited to 'src/server')
-rw-r--r-- | src/server/connection.cc | 258 | ||||
-rw-r--r-- | src/server/connection.h | 70 | ||||
-rw-r--r-- | src/server/server.cc | 143 | ||||
-rw-r--r-- | src/server/server.h | 79 | ||||
-rw-r--r-- | src/server/signal.cc | 20 | ||||
-rw-r--r-- | src/server/signal.h | 16 |
6 files changed, 586 insertions, 0 deletions
diff --git a/src/server/connection.cc b/src/server/connection.cc new file mode 100644 index 0000000..c662d6d --- /dev/null +++ b/src/server/connection.cc @@ -0,0 +1,258 @@ +// SPDX-License-Identifier: BSD-3-Clause + +#include <stdexcept> +#include <unistd.h> +#include <ios> +#include <arpa/inet.h> +#include <cerrno> + +#include "connection.h" +#include "../p18/commands.h" +#include "../p18/response.h" +#include "../logging.h" +#include "../common.h" +#include "hexdump/hexdump.h" +#include "signal.h" + +#define CHECK_ARGUMENTS_LENGTH(__size__) \ + if (arguments.size() != (__size__)) { \ + std::ostringstream error; \ + error << "invalid arguments count: expected " << (__size__) << ", got " << arguments.size(); \ + throw std::invalid_argument(error.str()); \ + } + +#define CHECK_ARGUMENTS_MIN_LENGTH(__size__) \ + if (arguments.size() < (__size__)) { \ + std::ostringstream error; \ + error << "invalid arguments count: expected " << (__size__) << ", got " << arguments.size(); \ + throw std::invalid_argument(error.str()); \ + } + + +namespace server { + +Connection::Connection(int sock, struct sockaddr_in addr, Server* server) + : sock_(sock), addr_(addr), server_(server) +{ + if (server_->verbose()) + mylog << "new connection from " << ipv4(); + + thread_ = std::thread(&Connection::run, this); + thread_.detach(); +} + +Connection::~Connection() { + if (server_->verbose()) + mylog << "closing socket.."; + + if (close(sock_) == -1) + myerr << ipv4() << ": close: " << strerror(errno); + + server_->removeConnection(this); +} + +void Connection::run() { + static int bufSize = 2048; + char buf[bufSize]; + + while (true) { + long rcvd = readLoop(buf, bufSize - 1); + if (rcvd == -1) { + if (errno != EINTR && server_->verbose()) + myerr << ipv4() << ": recv: " << std::string(strerror(errno)); + break; + } + if (rcvd == 0) + break; + + buf[rcvd] = '\0'; + if (*buf == '\4') + break; + + Response resp = processRequest(buf); + if (!sendResponse(resp)) + break; + } + + delete this; +} + +int Connection::readLoop(char* buf, size_t bufSize) const { + char* bufptr = buf; + int left = static_cast<int>(bufSize); + int readed = 0; + + while (left > 0) { + size_t rcvd = recv(sock_, bufptr, left, 0); + if (rcvd == -1) + return -1; + if (rcvd == 0) + break; + + readed += static_cast<int>(rcvd); + if (*bufptr == '\4') + break; + + left -= static_cast<int>(rcvd); + bufptr += rcvd; + + bufptr[rcvd] = '\0'; + char* ptr = strstr(buf, "\r\n"); + if (ptr) + break; + } + + return readed; +} + +bool Connection::writeLoop(const char* buf, size_t bufSize) const { + const char* bufptr = buf; + int left = static_cast<int>(bufSize); + + while (left > 0) { + size_t bytesSent = send(sock_, bufptr, left, 0); + if (bytesSent == -1) { + if (errno != EINTR && server_->verbose()) + myerr << ipv4() << ": send: " << std::string(strerror(errno)); + return false; + } + + left -= static_cast<int>(bytesSent); + bufptr += bytesSent; + } + + return true; +} + +bool Connection::sendResponse(Response& resp) const { + std::ostringstream sbuf; + sbuf << resp; + + std::string s = sbuf.str(); + const char* buf = s.c_str(); + size_t bufSize = s.size(); + + return writeLoop(buf, bufSize); +} + +std::string Connection::ipv4() const { + char ip[INET_ADDRSTRLEN] = {0}; + const char* result = inet_ntop(AF_INET, (const void*)&addr_.sin_addr, ip, sizeof(ip)); + if (result == nullptr) + return "?"; + + std::ostringstream buf; + buf << ip << ":" << htons(addr_.sin_port); + return buf.str(); +} + +Response Connection::processRequest(char* buf) { + std::stringstream sbuf; + int n = 0; + std::vector<std::string> arguments; + RequestType type; + + Response resp; + resp.type = ResponseType::OK; + + try { + char* last = nullptr; + const char* delim = " "; + for (char* token = strtok_r(buf, delim, &last); + token != nullptr; + token = strtok_r(nullptr, delim, &last)) { + + char* ptr = strstr(token, "\r\n"); + if (ptr) + *ptr = '\0'; + + if (!n++) { + std::string s = std::string(token); + + if (s == "format") + type = RequestType::Format; + + else if (s == "v") + type = RequestType::Version; + + else if (s == "exec") + type = RequestType::Execute; + + else if (s == "raw") + type = RequestType::Raw; + + else + throw std::invalid_argument("invalid token: " + s); + + } else if (strlen(token) > 0) + arguments.emplace_back(token); + } + + switch (type) { + case RequestType::Version: { + CHECK_ARGUMENTS_LENGTH(1) + auto v = static_cast<unsigned>(std::stoul(arguments[0])); + if (v != 1) + throw std::invalid_argument("invalid protocol version"); + options_.version = v; + break; + } + + case RequestType::Format: + CHECK_ARGUMENTS_LENGTH(1) + options_.format = format_from_string(arguments[0]); + break; + + case RequestType::Execute: { + CHECK_ARGUMENTS_MIN_LENGTH(1) + + std::string& command = arguments[0]; + auto commandArguments = std::vector<std::string>(); + + auto argumentsSlice = std::vector<std::string>(arguments.begin()+1, arguments.end()); + + p18::CommandInput input{&argumentsSlice}; + p18::CommandType commandType = p18::validate_input(command, commandArguments, (void*)&input); + + auto response = server_->executeCommand(commandType, commandArguments); + resp.buf << *(response->format(options_.format).get()); + + break; + } + + case RequestType::Raw: { + throw std::runtime_error("not implemented"); +// CHECK_ARGUMENTS_LENGTH(1) +// std::string& raw = arguments[0]; +// +// resp.type = ResponseType::Error; +// resp.buf << "not implemented"; + break; + } + } + } + // we except std::invalid_argument and std::runtime_error + catch (std::exception& e) { + resp.type = ResponseType::Error; + + auto err = p18::response_type::ErrorResponse(e.what()); + resp.buf << *(err.format(options_.format)); + } + + return resp; +} + +std::ostream& operator<<(std::ostream& os, Response& resp) { + os << (resp.type == ResponseType::OK ? "ok" : "err"); + + resp.buf.seekp(0, std::ios::end); + size_t size = resp.buf.tellp(); + if (size) { + resp.buf.seekp(0); + os << "\r\n" << resp.buf.str(); + } + + return os << "\r\n\r\n"; +} + +}
\ No newline at end of file diff --git a/src/server/connection.h b/src/server/connection.h new file mode 100644 index 0000000..eb28d51 --- /dev/null +++ b/src/server/connection.h @@ -0,0 +1,70 @@ +// SPDX-License-Identifier: BSD-3-Clause + +#ifndef INVERTER_TOOLS_CONNECTION_H +#define INVERTER_TOOLS_CONNECTION_H + +#include <thread> +#include <netinet/in.h> +#include <sstream> + +#include "server.h" +#include "../formatter/formatter.h" + +namespace server { + +class Server; +struct Response; + +struct ConnectionOptions { + ConnectionOptions() + : version(1), format(formatter::Format::JSON) + {} + + unsigned version; + formatter::Format format; +}; + + +class Connection { +private: + int sock_; + std::thread thread_; + struct sockaddr_in addr_; + Server* server_; + ConnectionOptions options_; + +public: + explicit Connection(int sock, struct sockaddr_in addr, Server* server); + ~Connection(); + void run(); + std::string ipv4() const; + bool sendResponse(Response& resp) const; + int readLoop(char* buf, size_t bufSize) const; + bool writeLoop(const char* buf, size_t bufSize) const; + Response processRequest(char* buf); +}; + + +enum class RequestType { + Version, + Format, + Execute, + Raw, +}; + + +enum class ResponseType { + OK, + Error +}; + + +struct Response { + ResponseType type; + std::ostringstream buf; +}; +std::ostream& operator<<(std::ostream& os, Response& resp); + +} + +#endif //INVERTER_TOOLS_CONNECTION_H diff --git a/src/server/server.cc b/src/server/server.cc new file mode 100644 index 0000000..981104d --- /dev/null +++ b/src/server/server.cc @@ -0,0 +1,143 @@ +// SPDX-License-Identifier: BSD-3-Clause + +#include <cstring> +#include <string> +#include <cerrno> +#include <algorithm> +#include <memory> +#include <utility> +#include <arpa/inet.h> +#include <sys/socket.h> +#include <unistd.h> + +#include "../voltronic/exceptions.h" +#include "../p18/exceptions.h" +#include "../voltronic/time.h" +#include "../logging.h" +//#include "hexdump/hexdump.h" +#include "server.h" +#include "connection.h" +#include "signal.h" + +namespace server { + +Server::Server(std::shared_ptr<voltronic::Device> device) + : sock_(0) + , port_(0) + , cacheTimeout_(CACHE_TIMEOUT) + , verbose_(false) + , device_(std::move(device)) { + client_.setDevice(device_); +} + +void Server::setVerbose(bool verbose) { + verbose_ = verbose; + device_->setVerbose(verbose); +} + +void Server::setCacheTimeout(u64 timeout) { + cacheTimeout_ = timeout; +} + +Server::~Server() { + if (sock_ > 0) + close(sock_); +} + +void Server::start(std::string& host, int port) { + host_ = host; + port_ = port; + + sock_ = socket(AF_INET, SOCK_STREAM, 0); + if (sock_ == -1) + throw ServerError("failed to create socket"); + + struct linger sl = {0}; + sl.l_onoff = 1; + sl.l_linger = 0; + if (setsockopt(sock_, SOL_SOCKET, SO_LINGER, &sl, sizeof(sl)) == -1) + throw ServerError("setsockopt(linger): " + std::string(strerror(errno))); + + int flag = 1; + if (setsockopt(sock_, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof(flag)) == -1) + throw ServerError("setsockopt(reuseaddr): " + std::string(strerror(errno))); + + struct sockaddr_in serv_addr = {0}; + serv_addr.sin_family = AF_INET; + serv_addr.sin_addr.s_addr = inet_addr(host_.c_str()); + serv_addr.sin_port = htons(port_); + memset(serv_addr.sin_zero, 0, sizeof(serv_addr.sin_zero)); + + if (bind(sock_, (struct sockaddr*)&serv_addr, sizeof(serv_addr))) + throw ServerError("bind: " + std::string(strerror(errno))); + + if (listen(sock_, 50)) + throw ServerError("start: " + std::string(strerror(errno))); + + while (!shutdownCaught) { + struct sockaddr_in addr = {0}; + socklen_t addr_size = sizeof(addr); + + if (verbose_) + mylog << "waiting for client.."; + + int sock = accept(sock_, (struct sockaddr*)&addr, &addr_size); + if (sock == -1) + continue; + + auto conn = new Connection(sock, addr, this); + addConnection(conn); + } +} + +void Server::addConnection(Connection *conn) { + if (verbose_) + myerr << "adding " << conn->ipv4(); + LockGuard lock(threads_mutex_); + connections_.emplace_back(conn); +} + +void Server::removeConnection(Connection *conn) { + if (verbose_) + myerr << "removing " << conn->ipv4(); + LockGuard lock(threads_mutex_); + connections_.erase(std::remove(connections_.begin(), connections_.end(), conn), connections_.end()); +} + +size_t Server::getConnectionsCount() const { + return connections_.size(); +} + +std::shared_ptr<p18::response_type::BaseResponse> Server::executeCommand(p18::CommandType commandType, std::vector<std::string>& arguments) { + LockGuard lock(client_mutex_); + + auto it = cache_.find(commandType); + if (it != cache_.end()) { + auto cr = it->second; + if (voltronic::timestamp() - cr.time <= cacheTimeout_) { + return cr.response; + } + } + + try { + auto response = client_.execute(commandType, arguments); + CachedResponse cr{voltronic::timestamp(), response}; + cache_[commandType] = cr; + return response; + } + catch (voltronic::DeviceError& e) { + throw std::runtime_error("device error: " + std::string(e.what())); + } + catch (voltronic::TimeoutError& e) { + throw std::runtime_error("timeout: " + std::string(e.what())); + } + catch (voltronic::InvalidDataError& e) { + throw std::runtime_error("data is invalid: " + std::string(e.what())); + } + catch (p18::InvalidResponseError& e) { + throw std::runtime_error("response is invalid: " + std::string(e.what())); + } +} + + +}
\ No newline at end of file diff --git a/src/server/server.h b/src/server/server.h new file mode 100644 index 0000000..76b2a1b --- /dev/null +++ b/src/server/server.h @@ -0,0 +1,79 @@ +// SPDX-License-Identifier: BSD-3-Clause + +#ifndef INVERTER_TOOLS_SERVER_TCP_SERVER_H +#define INVERTER_TOOLS_SERVER_TCP_SERVER_H + +#include <memory> +#include <string> +#include <vector> +#include <thread> +#include <mutex> +#include <csignal> +#include <atomic> +#include <netinet/in.h> + +#include "connection.h" +#include "../numeric_types.h" +#include "../formatter/formatter.h" +#include "../p18/client.h" +#include "../p18/types.h" +#include "../voltronic/device.h" +#include "../voltronic/time.h" + +namespace server { + +typedef std::lock_guard<std::mutex> LockGuard; + +class Connection; + +struct CachedResponse { + u64 time; + std::shared_ptr<p18::response_type::BaseResponse> response; +}; + +class Server { +private: + int sock_; + std::string host_; + int port_; + bool verbose_; + p18::Client client_; + std::shared_ptr<voltronic::Device> device_; + + u64 cacheTimeout_; + std::map<p18::CommandType, CachedResponse> cache_; + + std::mutex threads_mutex_; + std::mutex client_mutex_; + + std::vector<Connection*> connections_; + +public: + static const u64 CACHE_TIMEOUT = 1000; + + volatile std::atomic<bool> sigCaught = 0; + + explicit Server(std::shared_ptr<voltronic::Device> device); + ~Server(); + + void setVerbose(bool verbose); + void setCacheTimeout(u64 timeout); + void start(std::string& host, int port); + + bool verbose() const { return verbose_; } + void addConnection(Connection* conn); + void removeConnection(Connection* conn); + size_t getConnectionsCount() const; + + std::shared_ptr<p18::response_type::BaseResponse> executeCommand(p18::CommandType commandType, std::vector<std::string>& arguments); +}; + + +class ServerError : public std::runtime_error { +public: + using std::runtime_error::runtime_error; +}; + +} + +#endif //INVERTER_TOOLS_SERVER_TCP_SERVER_H diff --git a/src/server/signal.cc b/src/server/signal.cc new file mode 100644 index 0000000..ea7ae3e --- /dev/null +++ b/src/server/signal.cc @@ -0,0 +1,20 @@ +// SPDX-License-Identifier: BSD-3-Clause + +#include "signal.h" + +namespace server { + +volatile sig_atomic_t shutdownCaught = 0; + +static void sighandler(int) { + shutdownCaught = 1; +} + +void set_signal_handlers() { + struct sigaction sa = {0}; + sa.sa_handler = sighandler; + sigaction(SIGTERM, &sa, nullptr); + sigaction(SIGINT, &sa, nullptr); +} + +}
\ No newline at end of file diff --git a/src/server/signal.h b/src/server/signal.h new file mode 100644 index 0000000..0a21715 --- /dev/null +++ b/src/server/signal.h @@ -0,0 +1,16 @@ +// SPDX-License-Identifier: BSD-3-Clause + +#ifndef INVERTER_TOOLS_SIGNAL_H +#define INVERTER_TOOLS_SIGNAL_H + +#include <csignal> + +namespace server { + +extern volatile sig_atomic_t shutdownCaught; + +void set_signal_handlers(); + +} + +#endif //INVERTER_TOOLS_SIGNAL_H |