diff options
Diffstat (limited to 'src/voltronic')
-rw-r--r-- | src/voltronic/crc.h | 2 | ||||
-rw-r--r-- | src/voltronic/device.cc | 219 | ||||
-rw-r--r-- | src/voltronic/device.h | 95 | ||||
-rw-r--r-- | src/voltronic/exceptions.h | 22 | ||||
-rw-r--r-- | src/voltronic/pseudo_device.cc | 5 | ||||
-rw-r--r-- | src/voltronic/shared_buf.cc | 38 | ||||
-rw-r--r-- | src/voltronic/time.h | 2 |
7 files changed, 305 insertions, 78 deletions
diff --git a/src/voltronic/crc.h b/src/voltronic/crc.h index 0f34f38..87166a5 100644 --- a/src/voltronic/crc.h +++ b/src/voltronic/crc.h @@ -5,7 +5,7 @@ #include <cstdint> #include <cstdlib> -#include "../numeric_types.h" +#include "../types.h" namespace voltronic { diff --git a/src/voltronic/device.cc b/src/voltronic/device.cc index 54ea668..5566fbe 100644 --- a/src/voltronic/device.cc +++ b/src/voltronic/device.cc @@ -4,7 +4,7 @@ #include <iostream> #include <limits> #include <cstring> -#include <sstream> +#include <chrono> #include "crc.h" #include "device.h" @@ -13,11 +13,38 @@ #include "hexdump/hexdump.h" #include "../logging.h" +using namespace std::chrono_literals; + namespace voltronic { -Device::Device() : - flags_(FLAG_WRITE_CRC | FLAG_READ_CRC | FLAG_VERIFY_CRC), - timeout_(TIMEOUT) {} +Device::Device() + : flags_(FLAG_WRITE_CRC | FLAG_READ_CRC | FLAG_VERIFY_CRC) + , timeout_(TIMEOUT) + , timeStarted_(0) + , verbose_(false) + , workerType_(WorkerType::Normal) +{ + thread_ = std::thread(&Device::threadLoop, this); + +#ifdef __linux__ + // try to set the highest priority for this thread (requires root privs) + struct sched_param sp; + sp.sched_priority = sched_get_priority_max(SCHED_FIFO); + if (sp.sched_priority == -1) { + myerr << "sched_get_priority_max: " << std::string(strerror(errno)); + } else { + int res = pthread_setschedparam(thread_.native_handle(), SCHED_FIFO, &sp); + if (res) + myerr << "pthread_setschedparam: error " << std::to_string(res); + } +#endif + + thread_.detach(); +} + +Device::~Device() { + workerType_ = WorkerType::Dead; +} void Device::setFlags(int flags) { flags_ = flags; @@ -27,6 +54,10 @@ int Device::getFlags() const { return flags_; } +void Device::setWorkerType(WorkerType wt) { + workerType_ = wt; +} + void Device::setVerbose(bool verbose) { verbose_ = verbose; } @@ -50,74 +81,148 @@ u64 Device::getTimeLeft() const { return timeout_ - elapsed; } -size_t Device::run(const u8* inbuf, size_t inbufSize, u8* outbuf, size_t outbufSize) { - timeStarted_ = timestamp(); +size_t Device::enqueue(const u8* inbuf, size_t inbufSize, u8* outbuf, size_t outbufSize) { + if (verbose_) + mylog << "waiting to accept new task..."; - send(inbuf, inbufSize); + mutex_.lock(); + shio_.resetWith(inbuf, inbufSize, outbuf, outbufSize); + mutex_.unlock(); - if (!getTimeLeft()) { - // FIXME - // we should read incoming data from the device, - // or clean the buffer in some other way. - // otherwise we may get invalid response next time - throw TimeoutError("sending already took " + std::to_string(getElapsedTime()) + " ms"); + cv_.notify_all(); + + if (verbose_) + mylog << "notify the worker thread"; + + UniqueLock lock(mutex_); + cv_.wait(lock, [this]{ + return shio_.state == SharedIOBufferState::Done; + }); + + if (verbose_) + mylog << "worker thread done it's job"; + + switch (shio_.errorType) { + case ErrorType::DeviceError: throw DeviceError(shio_.errorMessage); + case ErrorType::TimeoutError: throw TimeoutError(shio_.errorMessage); + case ErrorType::InvalidDataError: throw InvalidDataError(shio_.errorMessage); + case ErrorType::OverflowError: throw OverflowError(shio_.errorMessage); + default: break; } - return recv(outbuf, outbufSize); + return shio_.dataSize; +} + +// ---------------------------------------- +// all code below runs in a separate thread +// ---------------------------------------- + +void Device::threadLoop() { + while (workerType_ != Dead) { + if (verbose_) + mylog << "waiting for new task..."; + + // wait for new task + UniqueLock lock(mutex_); + auto pred = [this]{ + return shio_.state == SharedIOBufferState::Ready; + }; + + if (workerType_ == OneShot) { + cv_.wait(lock, pred); + } else { + cv_.wait_for(lock, 1s, pred); + if (!pred()) + continue; + }; + + if (verbose_) + mylog << "got something"; + + shio_.state = SharedIOBufferState::InProgress; + + try { + shio_.setResult(run(shio_.inputBuffer, + shio_.inputBufferSize, + shio_.outputBuffer, + shio_.outputBufferSize)); + } + catch (DeviceError& e) { + shio_.setResult(ErrorType::DeviceError, e.what()); + } + catch (TimeoutError& e) { + shio_.setResult(ErrorType::TimeoutError, e.what()); + } + catch (InvalidDataError& e) { + shio_.setResult(ErrorType::InvalidDataError, e.what()); + } + catch (OverflowError& e) { + shio_.setResult(ErrorType::OverflowError, e.what()); + } + + if (verbose_) + mylog << "unlocking"; + + lock.unlock(); + cv_.notify_all(); + + if (workerType_ == OneShot) + break; + } } -void Device::send(const u8* buf, size_t bufSize) { +size_t Device::run(const u8* inbuf, size_t inbufSize, u8* outbuf, size_t outbufSize) { + timeStarted_ = timestamp(); + + // ------------------------------ + // add CRC and send to the device + // ------------------------------ + size_t dataLen; std::shared_ptr<u8> data; if ((flags_ & FLAG_WRITE_CRC) == FLAG_WRITE_CRC) { - const CRC crc = crc_calculate(buf, bufSize); - dataLen = bufSize + sizeof(u16) + 1; + const CRC crc = crc_calculate(inbuf, inbufSize); + dataLen = inbufSize + sizeof(u16) + 1; data = std::unique_ptr<u8>(new u8[dataLen]); - crc_write(crc, &data.get()[bufSize]); + crc_write(crc, &data.get()[inbufSize]); } else { - dataLen = bufSize + 1; + dataLen = inbufSize + 1; data = std::unique_ptr<u8>(new u8[dataLen]); } u8* dataPtr = data.get(); - memcpy((void*)dataPtr, buf, bufSize); - + memcpy((void*)dataPtr, inbuf, inbufSize); dataPtr[dataLen - 1] = '\r'; - if (verbose_) { myerr << "writing " << dataLen << (dataLen > 1 ? " bytes" : " byte"); std::cerr << hexdump(dataPtr, dataLen); } - writeLoop(dataPtr, dataLen); -} - -void Device::writeLoop(const u8* data, size_t dataSize) { - int bytesLeft = static_cast<int>(dataSize); + writeAll(dataPtr, dataLen); - while (true) { - size_t bytesWritten = write(data, bytesLeft); - if (verbose_) - myerr << "bytesWritten=" << bytesWritten; - - bytesLeft -= static_cast<int>(bytesWritten); - if (bytesLeft <= 0) - break; - if (!getTimeLeft()) - throw TimeoutError("data writing already took " + std::to_string(getElapsedTime()) + " ms"); + // ----------------- + // check for timeout + // ----------------- - data = &data[bytesWritten]; + if (!getTimeLeft()) { + // FIXME + // we should read incoming data from the device, + // or clean the buffer in some other way. + // otherwise we may get invalid response next time + throw TimeoutError("sending already took " + std::to_string(getElapsedTime()) + " ms"); } -} -size_t Device::recv(u8* buf, size_t bufSize) { - size_t bytesRead = readLoop(buf, bufSize); + // ------------------------------ + // read from device and check CRC + // ------------------------------ + + size_t bytesRead = readAll(outbuf, outbufSize); if (verbose_) { myerr << "got " << bytesRead << (bytesRead > 1 ? " bytes" : " byte"); - std::cerr << hexdump(buf, bytesRead); + std::cerr << hexdump(outbuf, bytesRead); } bool crcNeeded = (flags_ & FLAG_READ_CRC) == FLAG_READ_CRC; @@ -129,10 +234,8 @@ size_t Device::recv(u8* buf, size_t bufSize) { const size_t dataSize = bytesRead - minSize; if (crcNeeded) { - const CRC crcActual = crc_read(&buf[dataSize]); - const CRC crcExpected = crc_calculate(buf, dataSize); - -// buf[dataSize] = 0; + const CRC crcActual = crc_read(&outbuf[dataSize]); + const CRC crcExpected = crc_calculate(outbuf, dataSize); if ((flags_ & FLAG_VERIFY_CRC) == FLAG_VERIFY_CRC && crcActual == crcExpected) return dataSize; @@ -143,11 +246,29 @@ size_t Device::recv(u8* buf, size_t bufSize) { throw InvalidDataError(error.str()); } -// buf[dataSize] = 0; return dataSize; } -size_t Device::readLoop(u8 *buf, size_t bufSize) { +void Device::writeAll(const u8* data, size_t dataSize) { + int bytesLeft = static_cast<int>(dataSize); + + while (true) { + size_t bytesWritten = write(data, bytesLeft); + if (verbose_) + myerr << "bytesWritten=" << bytesWritten; + + bytesLeft -= static_cast<int>(bytesWritten); + if (bytesLeft <= 0) + break; + + if (!getTimeLeft()) + throw TimeoutError("data writing already took " + std::to_string(getElapsedTime()) + " ms"); + + data = &data[bytesWritten]; + } +} + +size_t Device::readAll(u8 *buf, size_t bufSize) { size_t size = 0; while(true) { @@ -170,7 +291,7 @@ size_t Device::readLoop(u8 *buf, size_t bufSize) { throw TimeoutError("data reading already took " + std::to_string(getElapsedTime()) + " ms"); if (bufSize <= 0) - throw std::overflow_error("input buffer is not large enough"); + throw OverflowError("input buffer is not large enough"); } } diff --git a/src/voltronic/device.h b/src/voltronic/device.h index 6584585..23be22e 100644 --- a/src/voltronic/device.h +++ b/src/voltronic/device.h @@ -5,10 +5,15 @@ #include <string> #include <memory> +#include <atomic> +#include <thread> +#include <mutex> +#include <condition_variable> #include <hidapi/hidapi.h> #include <libserialport.h> -#include "../numeric_types.h" +#include "exceptions.h" +#include "../types.h" namespace voltronic { @@ -18,10 +23,40 @@ enum { FLAG_VERIFY_CRC = 4, }; +enum class SharedIOBufferState { + Ready, + InProgress, + Done, +}; + +struct SharedIOBuffer { + // helper methods + void resetWith(const u8* inbuf, size_t inbufSize, u8* outbuf, size_t outbufSize); + void setResult(size_t dataSize); + void setResult(ErrorType type, std::string message); + + // input + const u8* inputBuffer; + u8* outputBuffer; + size_t inputBufferSize; + size_t outputBufferSize; + + // output + SharedIOBufferState state = SharedIOBufferState::Done; + size_t dataSize; + ErrorType errorType = ErrorType::None; + std::string errorMessage; +}; + +enum WorkerType { + OneShot, + Normal, + Dead, +}; -/** - * Common device - */ +// ------------- +// Common device +// ------------- class Device { protected: @@ -30,11 +65,15 @@ protected: u64 timeStarted_; bool verbose_; - void send(const u8* buf, size_t bufSize); - size_t recv(u8* buf, size_t bufSize); + SharedIOBuffer shio_; + std::thread thread_; + // std::mutex enqueueMutex_; + std::mutex mutex_; + std::condition_variable cv_; + WorkerType workerType_; - void writeLoop(const u8* data, size_t dataSize); - size_t readLoop(u8* buf, size_t bufSize); + void writeAll(const u8* data, size_t dataSize); + size_t readAll(u8* buf, size_t bufSize); u64 getElapsedTime() const; u64 getTimeLeft() const; @@ -43,23 +82,26 @@ public: static const u64 TIMEOUT = 1000; Device(); - - virtual size_t read(u8* buf, size_t bufSize) = 0; - virtual size_t write(const u8* data, size_t dataSize) = 0; - - void setTimeout(u64 timeout); - size_t run(const u8* inbuf, size_t inbufSize, u8* outbuf, size_t outbufSize); + ~Device(); void setFlags(int flags); int getFlags() const; - + void setWorkerType(WorkerType wt); void setVerbose(bool verbose); + void setTimeout(u64 timeout); + + size_t enqueue(const u8* inbuf, size_t inbufSize, u8* outbuf, size_t outbufSize); + size_t run(const u8* inbuf, size_t inbufSize, u8* outbuf, size_t outbufSize); + void threadLoop(); + + virtual size_t read(u8* buf, size_t bufSize) = 0; + virtual size_t write(const u8* data, size_t dataSize) = 0; }; -/** - * USB device - */ +// ---------- +// USB device +// ---------- class USBDevice : public Device { private: @@ -79,9 +121,9 @@ public: }; -/** - * Serial device - */ +// ------------- +// Serial device +// ------------- typedef unsigned SerialBaudRate; @@ -149,7 +191,10 @@ public: explicit SerialPortConfiguration(SerialDevice& device); ~SerialPortConfiguration(); - void setConfiguration(SerialBaudRate baudRate, SerialDataBits dataBits, SerialStopBits stopBits, SerialParity parity); + void setConfiguration(SerialBaudRate baudRate, + SerialDataBits dataBits, + SerialStopBits stopBits, + SerialParity parity); }; bool is_serial_baud_rate_valid(SerialBaudRate baudRate); @@ -158,9 +203,9 @@ bool is_serial_stop_bits_valid(SerialStopBits stopBits); bool is_serial_parity_valid(SerialParity parity); -/** - * Pseudo device - */ +// ------------- +// Pseudo device +// ------------- class PseudoDevice : public Device { public: diff --git a/src/voltronic/exceptions.h b/src/voltronic/exceptions.h index 6ae9c32..bae4426 100644 --- a/src/voltronic/exceptions.h +++ b/src/voltronic/exceptions.h @@ -7,6 +7,10 @@ namespace voltronic { +// ---------- +// exceptions +// ---------- + class DeviceError : public std::runtime_error { public: using std::runtime_error::runtime_error; @@ -22,6 +26,24 @@ public: using std::runtime_error::runtime_error; }; +class OverflowError : public std::overflow_error { +public: + using std::overflow_error::overflow_error; +}; + + +// --------------- +// exception types +// --------------- + +enum class ErrorType { + None = 0, + DeviceError, + TimeoutError, + InvalidDataError, + OverflowError, +}; + } #endif //INVERTER_TOOLS_VOLTRONIC_EXCEPTIONS_H diff --git a/src/voltronic/pseudo_device.cc b/src/voltronic/pseudo_device.cc index 58cd95c..2c0015e 100644 --- a/src/voltronic/pseudo_device.cc +++ b/src/voltronic/pseudo_device.cc @@ -8,6 +8,7 @@ #include "crc.h" #include "hexdump/hexdump.h" #include "../logging.h" +#include "exceptions.h" namespace voltronic { @@ -26,7 +27,7 @@ static const char* response = "^D1060000,000,2300,500,0115,0018,002,500,000,000, // set response //static const char* response = "^1"; -// TODO: maybe move size and crc stuff to readLoop()? +// TODO: maybe move size and crc stuff to readAll()? size_t PseudoDevice::read(u8* buf, size_t bufSize) { size_t pseudoResponseSize = strlen(response); @@ -37,7 +38,7 @@ size_t PseudoDevice::read(u8* buf, size_t bufSize) { if (responseSize + 1 > bufSize) { std::ostringstream error; error << "buffer is not large enough (" << (responseSize + 1) << " > " << bufSize << ")"; - throw std::overflow_error(error.str()); + throw OverflowError(error.str()); } memcpy(buf, response, responseSize); diff --git a/src/voltronic/shared_buf.cc b/src/voltronic/shared_buf.cc new file mode 100644 index 0000000..143a6f0 --- /dev/null +++ b/src/voltronic/shared_buf.cc @@ -0,0 +1,38 @@ +// SPDX-License-Identifier: BSD-3-Clause + +#include "device.h" + +namespace voltronic { + +void SharedIOBuffer::resetWith(const u8* inbuf, + size_t inbufSize, + u8* outbuf, + size_t outbufSize) { + // set input + inputBuffer = inbuf; + inputBufferSize = inbufSize; + + outputBuffer = outbuf; + outputBufferSize = outbufSize; + + // clean output + errorType = ErrorType::None; + errorMessage.erase(); + dataSize = 0; + + // mark as ready + state = SharedIOBufferState::Ready; +} + +void SharedIOBuffer::setResult(ErrorType type, std::string message) { + errorType = type; + errorMessage = std::move(message); + state = SharedIOBufferState::Done; +} + +void SharedIOBuffer::setResult(size_t _dataSize) { + dataSize = _dataSize; + state = SharedIOBufferState::Done; +} + +}
\ No newline at end of file diff --git a/src/voltronic/time.h b/src/voltronic/time.h index d456461..e2ad5b8 100644 --- a/src/voltronic/time.h +++ b/src/voltronic/time.h @@ -3,7 +3,7 @@ #ifndef INVERTER_TOOLS_VOLTRONIC_TIME_H #define INVERTER_TOOLS_VOLTRONIC_TIME_H -#include "../numeric_types.h" +#include "../types.h" namespace voltronic { |