aboutsummaryrefslogtreecommitdiff
path: root/src/server
diff options
context:
space:
mode:
authorEvgeny Zinoviev <me@ch1p.io>2021-05-07 02:18:07 +0300
committerEvgeny Zinoviev <me@ch1p.io>2021-05-07 02:18:07 +0300
commit7e743b73433475df086fcec81be7b10c1d695a42 (patch)
tree1737c5f9bdad2a40f740e9a655e510641331b9e2 /src/server
initial
Diffstat (limited to 'src/server')
-rw-r--r--src/server/connection.cc258
-rw-r--r--src/server/connection.h70
-rw-r--r--src/server/server.cc143
-rw-r--r--src/server/server.h79
-rw-r--r--src/server/signal.cc20
-rw-r--r--src/server/signal.h16
6 files changed, 586 insertions, 0 deletions
diff --git a/src/server/connection.cc b/src/server/connection.cc
new file mode 100644
index 0000000..c662d6d
--- /dev/null
+++ b/src/server/connection.cc
@@ -0,0 +1,258 @@
+// SPDX-License-Identifier: BSD-3-Clause
+
+#include <stdexcept>
+#include <unistd.h>
+#include <ios>
+#include <arpa/inet.h>
+#include <cerrno>
+
+#include "connection.h"
+#include "../p18/commands.h"
+#include "../p18/response.h"
+#include "../logging.h"
+#include "../common.h"
+#include "hexdump/hexdump.h"
+#include "signal.h"
+
+#define CHECK_ARGUMENTS_LENGTH(__size__) \
+ if (arguments.size() != (__size__)) { \
+ std::ostringstream error; \
+ error << "invalid arguments count: expected " << (__size__) << ", got " << arguments.size(); \
+ throw std::invalid_argument(error.str()); \
+ }
+
+#define CHECK_ARGUMENTS_MIN_LENGTH(__size__) \
+ if (arguments.size() < (__size__)) { \
+ std::ostringstream error; \
+ error << "invalid arguments count: expected " << (__size__) << ", got " << arguments.size(); \
+ throw std::invalid_argument(error.str()); \
+ }
+
+
+namespace server {
+
+Connection::Connection(int sock, struct sockaddr_in addr, Server* server)
+ : sock_(sock), addr_(addr), server_(server)
+{
+ if (server_->verbose())
+ mylog << "new connection from " << ipv4();
+
+ thread_ = std::thread(&Connection::run, this);
+ thread_.detach();
+}
+
+Connection::~Connection() {
+ if (server_->verbose())
+ mylog << "closing socket..";
+
+ if (close(sock_) == -1)
+ myerr << ipv4() << ": close: " << strerror(errno);
+
+ server_->removeConnection(this);
+}
+
+void Connection::run() {
+ static int bufSize = 2048;
+ char buf[bufSize];
+
+ while (true) {
+ long rcvd = readLoop(buf, bufSize - 1);
+ if (rcvd == -1) {
+ if (errno != EINTR && server_->verbose())
+ myerr << ipv4() << ": recv: " << std::string(strerror(errno));
+ break;
+ }
+ if (rcvd == 0)
+ break;
+
+ buf[rcvd] = '\0';
+ if (*buf == '\4')
+ break;
+
+ Response resp = processRequest(buf);
+ if (!sendResponse(resp))
+ break;
+ }
+
+ delete this;
+}
+
+int Connection::readLoop(char* buf, size_t bufSize) const {
+ char* bufptr = buf;
+ int left = static_cast<int>(bufSize);
+ int readed = 0;
+
+ while (left > 0) {
+ size_t rcvd = recv(sock_, bufptr, left, 0);
+ if (rcvd == -1)
+ return -1;
+ if (rcvd == 0)
+ break;
+
+ readed += static_cast<int>(rcvd);
+ if (*bufptr == '\4')
+ break;
+
+ left -= static_cast<int>(rcvd);
+ bufptr += rcvd;
+
+ bufptr[rcvd] = '\0';
+ char* ptr = strstr(buf, "\r\n");
+ if (ptr)
+ break;
+ }
+
+ return readed;
+}
+
+bool Connection::writeLoop(const char* buf, size_t bufSize) const {
+ const char* bufptr = buf;
+ int left = static_cast<int>(bufSize);
+
+ while (left > 0) {
+ size_t bytesSent = send(sock_, bufptr, left, 0);
+ if (bytesSent == -1) {
+ if (errno != EINTR && server_->verbose())
+ myerr << ipv4() << ": send: " << std::string(strerror(errno));
+ return false;
+ }
+
+ left -= static_cast<int>(bytesSent);
+ bufptr += bytesSent;
+ }
+
+ return true;
+}
+
+bool Connection::sendResponse(Response& resp) const {
+ std::ostringstream sbuf;
+ sbuf << resp;
+
+ std::string s = sbuf.str();
+ const char* buf = s.c_str();
+ size_t bufSize = s.size();
+
+ return writeLoop(buf, bufSize);
+}
+
+std::string Connection::ipv4() const {
+ char ip[INET_ADDRSTRLEN] = {0};
+ const char* result = inet_ntop(AF_INET, (const void*)&addr_.sin_addr, ip, sizeof(ip));
+ if (result == nullptr)
+ return "?";
+
+ std::ostringstream buf;
+ buf << ip << ":" << htons(addr_.sin_port);
+ return buf.str();
+}
+
+Response Connection::processRequest(char* buf) {
+ std::stringstream sbuf;
+ int n = 0;
+ std::vector<std::string> arguments;
+ RequestType type;
+
+ Response resp;
+ resp.type = ResponseType::OK;
+
+ try {
+ char* last = nullptr;
+ const char* delim = " ";
+ for (char* token = strtok_r(buf, delim, &last);
+ token != nullptr;
+ token = strtok_r(nullptr, delim, &last)) {
+
+ char* ptr = strstr(token, "\r\n");
+ if (ptr)
+ *ptr = '\0';
+
+ if (!n++) {
+ std::string s = std::string(token);
+
+ if (s == "format")
+ type = RequestType::Format;
+
+ else if (s == "v")
+ type = RequestType::Version;
+
+ else if (s == "exec")
+ type = RequestType::Execute;
+
+ else if (s == "raw")
+ type = RequestType::Raw;
+
+ else
+ throw std::invalid_argument("invalid token: " + s);
+
+ } else if (strlen(token) > 0)
+ arguments.emplace_back(token);
+ }
+
+ switch (type) {
+ case RequestType::Version: {
+ CHECK_ARGUMENTS_LENGTH(1)
+ auto v = static_cast<unsigned>(std::stoul(arguments[0]));
+ if (v != 1)
+ throw std::invalid_argument("invalid protocol version");
+ options_.version = v;
+ break;
+ }
+
+ case RequestType::Format:
+ CHECK_ARGUMENTS_LENGTH(1)
+ options_.format = format_from_string(arguments[0]);
+ break;
+
+ case RequestType::Execute: {
+ CHECK_ARGUMENTS_MIN_LENGTH(1)
+
+ std::string& command = arguments[0];
+ auto commandArguments = std::vector<std::string>();
+
+ auto argumentsSlice = std::vector<std::string>(arguments.begin()+1, arguments.end());
+
+ p18::CommandInput input{&argumentsSlice};
+ p18::CommandType commandType = p18::validate_input(command, commandArguments, (void*)&input);
+
+ auto response = server_->executeCommand(commandType, commandArguments);
+ resp.buf << *(response->format(options_.format).get());
+
+ break;
+ }
+
+ case RequestType::Raw: {
+ throw std::runtime_error("not implemented");
+// CHECK_ARGUMENTS_LENGTH(1)
+// std::string& raw = arguments[0];
+//
+// resp.type = ResponseType::Error;
+// resp.buf << "not implemented";
+ break;
+ }
+ }
+ }
+ // we except std::invalid_argument and std::runtime_error
+ catch (std::exception& e) {
+ resp.type = ResponseType::Error;
+
+ auto err = p18::response_type::ErrorResponse(e.what());
+ resp.buf << *(err.format(options_.format));
+ }
+
+ return resp;
+}
+
+std::ostream& operator<<(std::ostream& os, Response& resp) {
+ os << (resp.type == ResponseType::OK ? "ok" : "err");
+
+ resp.buf.seekp(0, std::ios::end);
+ size_t size = resp.buf.tellp();
+ if (size) {
+ resp.buf.seekp(0);
+ os << "\r\n" << resp.buf.str();
+ }
+
+ return os << "\r\n\r\n";
+}
+
+} \ No newline at end of file
diff --git a/src/server/connection.h b/src/server/connection.h
new file mode 100644
index 0000000..eb28d51
--- /dev/null
+++ b/src/server/connection.h
@@ -0,0 +1,70 @@
+// SPDX-License-Identifier: BSD-3-Clause
+
+#ifndef INVERTER_TOOLS_CONNECTION_H
+#define INVERTER_TOOLS_CONNECTION_H
+
+#include <thread>
+#include <netinet/in.h>
+#include <sstream>
+
+#include "server.h"
+#include "../formatter/formatter.h"
+
+namespace server {
+
+class Server;
+struct Response;
+
+struct ConnectionOptions {
+ ConnectionOptions()
+ : version(1), format(formatter::Format::JSON)
+ {}
+
+ unsigned version;
+ formatter::Format format;
+};
+
+
+class Connection {
+private:
+ int sock_;
+ std::thread thread_;
+ struct sockaddr_in addr_;
+ Server* server_;
+ ConnectionOptions options_;
+
+public:
+ explicit Connection(int sock, struct sockaddr_in addr, Server* server);
+ ~Connection();
+ void run();
+ std::string ipv4() const;
+ bool sendResponse(Response& resp) const;
+ int readLoop(char* buf, size_t bufSize) const;
+ bool writeLoop(const char* buf, size_t bufSize) const;
+ Response processRequest(char* buf);
+};
+
+
+enum class RequestType {
+ Version,
+ Format,
+ Execute,
+ Raw,
+};
+
+
+enum class ResponseType {
+ OK,
+ Error
+};
+
+
+struct Response {
+ ResponseType type;
+ std::ostringstream buf;
+};
+std::ostream& operator<<(std::ostream& os, Response& resp);
+
+}
+
+#endif //INVERTER_TOOLS_CONNECTION_H
diff --git a/src/server/server.cc b/src/server/server.cc
new file mode 100644
index 0000000..981104d
--- /dev/null
+++ b/src/server/server.cc
@@ -0,0 +1,143 @@
+// SPDX-License-Identifier: BSD-3-Clause
+
+#include <cstring>
+#include <string>
+#include <cerrno>
+#include <algorithm>
+#include <memory>
+#include <utility>
+#include <arpa/inet.h>
+#include <sys/socket.h>
+#include <unistd.h>
+
+#include "../voltronic/exceptions.h"
+#include "../p18/exceptions.h"
+#include "../voltronic/time.h"
+#include "../logging.h"
+//#include "hexdump/hexdump.h"
+#include "server.h"
+#include "connection.h"
+#include "signal.h"
+
+namespace server {
+
+Server::Server(std::shared_ptr<voltronic::Device> device)
+ : sock_(0)
+ , port_(0)
+ , cacheTimeout_(CACHE_TIMEOUT)
+ , verbose_(false)
+ , device_(std::move(device)) {
+ client_.setDevice(device_);
+}
+
+void Server::setVerbose(bool verbose) {
+ verbose_ = verbose;
+ device_->setVerbose(verbose);
+}
+
+void Server::setCacheTimeout(u64 timeout) {
+ cacheTimeout_ = timeout;
+}
+
+Server::~Server() {
+ if (sock_ > 0)
+ close(sock_);
+}
+
+void Server::start(std::string& host, int port) {
+ host_ = host;
+ port_ = port;
+
+ sock_ = socket(AF_INET, SOCK_STREAM, 0);
+ if (sock_ == -1)
+ throw ServerError("failed to create socket");
+
+ struct linger sl = {0};
+ sl.l_onoff = 1;
+ sl.l_linger = 0;
+ if (setsockopt(sock_, SOL_SOCKET, SO_LINGER, &sl, sizeof(sl)) == -1)
+ throw ServerError("setsockopt(linger): " + std::string(strerror(errno)));
+
+ int flag = 1;
+ if (setsockopt(sock_, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof(flag)) == -1)
+ throw ServerError("setsockopt(reuseaddr): " + std::string(strerror(errno)));
+
+ struct sockaddr_in serv_addr = {0};
+ serv_addr.sin_family = AF_INET;
+ serv_addr.sin_addr.s_addr = inet_addr(host_.c_str());
+ serv_addr.sin_port = htons(port_);
+ memset(serv_addr.sin_zero, 0, sizeof(serv_addr.sin_zero));
+
+ if (bind(sock_, (struct sockaddr*)&serv_addr, sizeof(serv_addr)))
+ throw ServerError("bind: " + std::string(strerror(errno)));
+
+ if (listen(sock_, 50))
+ throw ServerError("start: " + std::string(strerror(errno)));
+
+ while (!shutdownCaught) {
+ struct sockaddr_in addr = {0};
+ socklen_t addr_size = sizeof(addr);
+
+ if (verbose_)
+ mylog << "waiting for client..";
+
+ int sock = accept(sock_, (struct sockaddr*)&addr, &addr_size);
+ if (sock == -1)
+ continue;
+
+ auto conn = new Connection(sock, addr, this);
+ addConnection(conn);
+ }
+}
+
+void Server::addConnection(Connection *conn) {
+ if (verbose_)
+ myerr << "adding " << conn->ipv4();
+ LockGuard lock(threads_mutex_);
+ connections_.emplace_back(conn);
+}
+
+void Server::removeConnection(Connection *conn) {
+ if (verbose_)
+ myerr << "removing " << conn->ipv4();
+ LockGuard lock(threads_mutex_);
+ connections_.erase(std::remove(connections_.begin(), connections_.end(), conn), connections_.end());
+}
+
+size_t Server::getConnectionsCount() const {
+ return connections_.size();
+}
+
+std::shared_ptr<p18::response_type::BaseResponse> Server::executeCommand(p18::CommandType commandType, std::vector<std::string>& arguments) {
+ LockGuard lock(client_mutex_);
+
+ auto it = cache_.find(commandType);
+ if (it != cache_.end()) {
+ auto cr = it->second;
+ if (voltronic::timestamp() - cr.time <= cacheTimeout_) {
+ return cr.response;
+ }
+ }
+
+ try {
+ auto response = client_.execute(commandType, arguments);
+ CachedResponse cr{voltronic::timestamp(), response};
+ cache_[commandType] = cr;
+ return response;
+ }
+ catch (voltronic::DeviceError& e) {
+ throw std::runtime_error("device error: " + std::string(e.what()));
+ }
+ catch (voltronic::TimeoutError& e) {
+ throw std::runtime_error("timeout: " + std::string(e.what()));
+ }
+ catch (voltronic::InvalidDataError& e) {
+ throw std::runtime_error("data is invalid: " + std::string(e.what()));
+ }
+ catch (p18::InvalidResponseError& e) {
+ throw std::runtime_error("response is invalid: " + std::string(e.what()));
+ }
+}
+
+
+} \ No newline at end of file
diff --git a/src/server/server.h b/src/server/server.h
new file mode 100644
index 0000000..76b2a1b
--- /dev/null
+++ b/src/server/server.h
@@ -0,0 +1,79 @@
+// SPDX-License-Identifier: BSD-3-Clause
+
+#ifndef INVERTER_TOOLS_SERVER_TCP_SERVER_H
+#define INVERTER_TOOLS_SERVER_TCP_SERVER_H
+
+#include <memory>
+#include <string>
+#include <vector>
+#include <thread>
+#include <mutex>
+#include <csignal>
+#include <atomic>
+#include <netinet/in.h>
+
+#include "connection.h"
+#include "../numeric_types.h"
+#include "../formatter/formatter.h"
+#include "../p18/client.h"
+#include "../p18/types.h"
+#include "../voltronic/device.h"
+#include "../voltronic/time.h"
+
+namespace server {
+
+typedef std::lock_guard<std::mutex> LockGuard;
+
+class Connection;
+
+struct CachedResponse {
+ u64 time;
+ std::shared_ptr<p18::response_type::BaseResponse> response;
+};
+
+class Server {
+private:
+ int sock_;
+ std::string host_;
+ int port_;
+ bool verbose_;
+ p18::Client client_;
+ std::shared_ptr<voltronic::Device> device_;
+
+ u64 cacheTimeout_;
+ std::map<p18::CommandType, CachedResponse> cache_;
+
+ std::mutex threads_mutex_;
+ std::mutex client_mutex_;
+
+ std::vector<Connection*> connections_;
+
+public:
+ static const u64 CACHE_TIMEOUT = 1000;
+
+ volatile std::atomic<bool> sigCaught = 0;
+
+ explicit Server(std::shared_ptr<voltronic::Device> device);
+ ~Server();
+
+ void setVerbose(bool verbose);
+ void setCacheTimeout(u64 timeout);
+ void start(std::string& host, int port);
+
+ bool verbose() const { return verbose_; }
+ void addConnection(Connection* conn);
+ void removeConnection(Connection* conn);
+ size_t getConnectionsCount() const;
+
+ std::shared_ptr<p18::response_type::BaseResponse> executeCommand(p18::CommandType commandType, std::vector<std::string>& arguments);
+};
+
+
+class ServerError : public std::runtime_error {
+public:
+ using std::runtime_error::runtime_error;
+};
+
+}
+
+#endif //INVERTER_TOOLS_SERVER_TCP_SERVER_H
diff --git a/src/server/signal.cc b/src/server/signal.cc
new file mode 100644
index 0000000..ea7ae3e
--- /dev/null
+++ b/src/server/signal.cc
@@ -0,0 +1,20 @@
+// SPDX-License-Identifier: BSD-3-Clause
+
+#include "signal.h"
+
+namespace server {
+
+volatile sig_atomic_t shutdownCaught = 0;
+
+static void sighandler(int) {
+ shutdownCaught = 1;
+}
+
+void set_signal_handlers() {
+ struct sigaction sa = {0};
+ sa.sa_handler = sighandler;
+ sigaction(SIGTERM, &sa, nullptr);
+ sigaction(SIGINT, &sa, nullptr);
+}
+
+} \ No newline at end of file
diff --git a/src/server/signal.h b/src/server/signal.h
new file mode 100644
index 0000000..0a21715
--- /dev/null
+++ b/src/server/signal.h
@@ -0,0 +1,16 @@
+// SPDX-License-Identifier: BSD-3-Clause
+
+#ifndef INVERTER_TOOLS_SIGNAL_H
+#define INVERTER_TOOLS_SIGNAL_H
+
+#include <csignal>
+
+namespace server {
+
+extern volatile sig_atomic_t shutdownCaught;
+
+void set_signal_handlers();
+
+}
+
+#endif //INVERTER_TOOLS_SIGNAL_H