diff options
Diffstat (limited to 'src/voltronic/device.cc')
-rw-r--r-- | src/voltronic/device.cc | 219 |
1 files changed, 170 insertions, 49 deletions
diff --git a/src/voltronic/device.cc b/src/voltronic/device.cc index 54ea668..5566fbe 100644 --- a/src/voltronic/device.cc +++ b/src/voltronic/device.cc @@ -4,7 +4,7 @@ #include <iostream> #include <limits> #include <cstring> -#include <sstream> +#include <chrono> #include "crc.h" #include "device.h" @@ -13,11 +13,38 @@ #include "hexdump/hexdump.h" #include "../logging.h" +using namespace std::chrono_literals; + namespace voltronic { -Device::Device() : - flags_(FLAG_WRITE_CRC | FLAG_READ_CRC | FLAG_VERIFY_CRC), - timeout_(TIMEOUT) {} +Device::Device() + : flags_(FLAG_WRITE_CRC | FLAG_READ_CRC | FLAG_VERIFY_CRC) + , timeout_(TIMEOUT) + , timeStarted_(0) + , verbose_(false) + , workerType_(WorkerType::Normal) +{ + thread_ = std::thread(&Device::threadLoop, this); + +#ifdef __linux__ + // try to set the highest priority for this thread (requires root privs) + struct sched_param sp; + sp.sched_priority = sched_get_priority_max(SCHED_FIFO); + if (sp.sched_priority == -1) { + myerr << "sched_get_priority_max: " << std::string(strerror(errno)); + } else { + int res = pthread_setschedparam(thread_.native_handle(), SCHED_FIFO, &sp); + if (res) + myerr << "pthread_setschedparam: error " << std::to_string(res); + } +#endif + + thread_.detach(); +} + +Device::~Device() { + workerType_ = WorkerType::Dead; +} void Device::setFlags(int flags) { flags_ = flags; @@ -27,6 +54,10 @@ int Device::getFlags() const { return flags_; } +void Device::setWorkerType(WorkerType wt) { + workerType_ = wt; +} + void Device::setVerbose(bool verbose) { verbose_ = verbose; } @@ -50,74 +81,148 @@ u64 Device::getTimeLeft() const { return timeout_ - elapsed; } -size_t Device::run(const u8* inbuf, size_t inbufSize, u8* outbuf, size_t outbufSize) { - timeStarted_ = timestamp(); +size_t Device::enqueue(const u8* inbuf, size_t inbufSize, u8* outbuf, size_t outbufSize) { + if (verbose_) + mylog << "waiting to accept new task..."; - send(inbuf, inbufSize); + mutex_.lock(); + shio_.resetWith(inbuf, inbufSize, outbuf, outbufSize); + mutex_.unlock(); - if (!getTimeLeft()) { - // FIXME - // we should read incoming data from the device, - // or clean the buffer in some other way. - // otherwise we may get invalid response next time - throw TimeoutError("sending already took " + std::to_string(getElapsedTime()) + " ms"); + cv_.notify_all(); + + if (verbose_) + mylog << "notify the worker thread"; + + UniqueLock lock(mutex_); + cv_.wait(lock, [this]{ + return shio_.state == SharedIOBufferState::Done; + }); + + if (verbose_) + mylog << "worker thread done it's job"; + + switch (shio_.errorType) { + case ErrorType::DeviceError: throw DeviceError(shio_.errorMessage); + case ErrorType::TimeoutError: throw TimeoutError(shio_.errorMessage); + case ErrorType::InvalidDataError: throw InvalidDataError(shio_.errorMessage); + case ErrorType::OverflowError: throw OverflowError(shio_.errorMessage); + default: break; } - return recv(outbuf, outbufSize); + return shio_.dataSize; +} + +// ---------------------------------------- +// all code below runs in a separate thread +// ---------------------------------------- + +void Device::threadLoop() { + while (workerType_ != Dead) { + if (verbose_) + mylog << "waiting for new task..."; + + // wait for new task + UniqueLock lock(mutex_); + auto pred = [this]{ + return shio_.state == SharedIOBufferState::Ready; + }; + + if (workerType_ == OneShot) { + cv_.wait(lock, pred); + } else { + cv_.wait_for(lock, 1s, pred); + if (!pred()) + continue; + }; + + if (verbose_) + mylog << "got something"; + + shio_.state = SharedIOBufferState::InProgress; + + try { + shio_.setResult(run(shio_.inputBuffer, + shio_.inputBufferSize, + shio_.outputBuffer, + shio_.outputBufferSize)); + } + catch (DeviceError& e) { + shio_.setResult(ErrorType::DeviceError, e.what()); + } + catch (TimeoutError& e) { + shio_.setResult(ErrorType::TimeoutError, e.what()); + } + catch (InvalidDataError& e) { + shio_.setResult(ErrorType::InvalidDataError, e.what()); + } + catch (OverflowError& e) { + shio_.setResult(ErrorType::OverflowError, e.what()); + } + + if (verbose_) + mylog << "unlocking"; + + lock.unlock(); + cv_.notify_all(); + + if (workerType_ == OneShot) + break; + } } -void Device::send(const u8* buf, size_t bufSize) { +size_t Device::run(const u8* inbuf, size_t inbufSize, u8* outbuf, size_t outbufSize) { + timeStarted_ = timestamp(); + + // ------------------------------ + // add CRC and send to the device + // ------------------------------ + size_t dataLen; std::shared_ptr<u8> data; if ((flags_ & FLAG_WRITE_CRC) == FLAG_WRITE_CRC) { - const CRC crc = crc_calculate(buf, bufSize); - dataLen = bufSize + sizeof(u16) + 1; + const CRC crc = crc_calculate(inbuf, inbufSize); + dataLen = inbufSize + sizeof(u16) + 1; data = std::unique_ptr<u8>(new u8[dataLen]); - crc_write(crc, &data.get()[bufSize]); + crc_write(crc, &data.get()[inbufSize]); } else { - dataLen = bufSize + 1; + dataLen = inbufSize + 1; data = std::unique_ptr<u8>(new u8[dataLen]); } u8* dataPtr = data.get(); - memcpy((void*)dataPtr, buf, bufSize); - + memcpy((void*)dataPtr, inbuf, inbufSize); dataPtr[dataLen - 1] = '\r'; - if (verbose_) { myerr << "writing " << dataLen << (dataLen > 1 ? " bytes" : " byte"); std::cerr << hexdump(dataPtr, dataLen); } - writeLoop(dataPtr, dataLen); -} - -void Device::writeLoop(const u8* data, size_t dataSize) { - int bytesLeft = static_cast<int>(dataSize); + writeAll(dataPtr, dataLen); - while (true) { - size_t bytesWritten = write(data, bytesLeft); - if (verbose_) - myerr << "bytesWritten=" << bytesWritten; - - bytesLeft -= static_cast<int>(bytesWritten); - if (bytesLeft <= 0) - break; - if (!getTimeLeft()) - throw TimeoutError("data writing already took " + std::to_string(getElapsedTime()) + " ms"); + // ----------------- + // check for timeout + // ----------------- - data = &data[bytesWritten]; + if (!getTimeLeft()) { + // FIXME + // we should read incoming data from the device, + // or clean the buffer in some other way. + // otherwise we may get invalid response next time + throw TimeoutError("sending already took " + std::to_string(getElapsedTime()) + " ms"); } -} -size_t Device::recv(u8* buf, size_t bufSize) { - size_t bytesRead = readLoop(buf, bufSize); + // ------------------------------ + // read from device and check CRC + // ------------------------------ + + size_t bytesRead = readAll(outbuf, outbufSize); if (verbose_) { myerr << "got " << bytesRead << (bytesRead > 1 ? " bytes" : " byte"); - std::cerr << hexdump(buf, bytesRead); + std::cerr << hexdump(outbuf, bytesRead); } bool crcNeeded = (flags_ & FLAG_READ_CRC) == FLAG_READ_CRC; @@ -129,10 +234,8 @@ size_t Device::recv(u8* buf, size_t bufSize) { const size_t dataSize = bytesRead - minSize; if (crcNeeded) { - const CRC crcActual = crc_read(&buf[dataSize]); - const CRC crcExpected = crc_calculate(buf, dataSize); - -// buf[dataSize] = 0; + const CRC crcActual = crc_read(&outbuf[dataSize]); + const CRC crcExpected = crc_calculate(outbuf, dataSize); if ((flags_ & FLAG_VERIFY_CRC) == FLAG_VERIFY_CRC && crcActual == crcExpected) return dataSize; @@ -143,11 +246,29 @@ size_t Device::recv(u8* buf, size_t bufSize) { throw InvalidDataError(error.str()); } -// buf[dataSize] = 0; return dataSize; } -size_t Device::readLoop(u8 *buf, size_t bufSize) { +void Device::writeAll(const u8* data, size_t dataSize) { + int bytesLeft = static_cast<int>(dataSize); + + while (true) { + size_t bytesWritten = write(data, bytesLeft); + if (verbose_) + myerr << "bytesWritten=" << bytesWritten; + + bytesLeft -= static_cast<int>(bytesWritten); + if (bytesLeft <= 0) + break; + + if (!getTimeLeft()) + throw TimeoutError("data writing already took " + std::to_string(getElapsedTime()) + " ms"); + + data = &data[bytesWritten]; + } +} + +size_t Device::readAll(u8 *buf, size_t bufSize) { size_t size = 0; while(true) { @@ -170,7 +291,7 @@ size_t Device::readLoop(u8 *buf, size_t bufSize) { throw TimeoutError("data reading already took " + std::to_string(getElapsedTime()) + " ms"); if (bufSize <= 0) - throw std::overflow_error("input buffer is not large enough"); + throw OverflowError("input buffer is not large enough"); } } |