summaryrefslogtreecommitdiff
path: root/src/voltronic/device.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/voltronic/device.cc')
-rw-r--r--src/voltronic/device.cc219
1 files changed, 170 insertions, 49 deletions
diff --git a/src/voltronic/device.cc b/src/voltronic/device.cc
index 54ea668..5566fbe 100644
--- a/src/voltronic/device.cc
+++ b/src/voltronic/device.cc
@@ -4,7 +4,7 @@
#include <iostream>
#include <limits>
#include <cstring>
-#include <sstream>
+#include <chrono>
#include "crc.h"
#include "device.h"
@@ -13,11 +13,38 @@
#include "hexdump/hexdump.h"
#include "../logging.h"
+using namespace std::chrono_literals;
+
namespace voltronic {
-Device::Device() :
- flags_(FLAG_WRITE_CRC | FLAG_READ_CRC | FLAG_VERIFY_CRC),
- timeout_(TIMEOUT) {}
+Device::Device()
+ : flags_(FLAG_WRITE_CRC | FLAG_READ_CRC | FLAG_VERIFY_CRC)
+ , timeout_(TIMEOUT)
+ , timeStarted_(0)
+ , verbose_(false)
+ , workerType_(WorkerType::Normal)
+{
+ thread_ = std::thread(&Device::threadLoop, this);
+
+#ifdef __linux__
+ // try to set the highest priority for this thread (requires root privs)
+ struct sched_param sp;
+ sp.sched_priority = sched_get_priority_max(SCHED_FIFO);
+ if (sp.sched_priority == -1) {
+ myerr << "sched_get_priority_max: " << std::string(strerror(errno));
+ } else {
+ int res = pthread_setschedparam(thread_.native_handle(), SCHED_FIFO, &sp);
+ if (res)
+ myerr << "pthread_setschedparam: error " << std::to_string(res);
+ }
+#endif
+
+ thread_.detach();
+}
+
+Device::~Device() {
+ workerType_ = WorkerType::Dead;
+}
void Device::setFlags(int flags) {
flags_ = flags;
@@ -27,6 +54,10 @@ int Device::getFlags() const {
return flags_;
}
+void Device::setWorkerType(WorkerType wt) {
+ workerType_ = wt;
+}
+
void Device::setVerbose(bool verbose) {
verbose_ = verbose;
}
@@ -50,74 +81,148 @@ u64 Device::getTimeLeft() const {
return timeout_ - elapsed;
}
-size_t Device::run(const u8* inbuf, size_t inbufSize, u8* outbuf, size_t outbufSize) {
- timeStarted_ = timestamp();
+size_t Device::enqueue(const u8* inbuf, size_t inbufSize, u8* outbuf, size_t outbufSize) {
+ if (verbose_)
+ mylog << "waiting to accept new task...";
- send(inbuf, inbufSize);
+ mutex_.lock();
+ shio_.resetWith(inbuf, inbufSize, outbuf, outbufSize);
+ mutex_.unlock();
- if (!getTimeLeft()) {
- // FIXME
- // we should read incoming data from the device,
- // or clean the buffer in some other way.
- // otherwise we may get invalid response next time
- throw TimeoutError("sending already took " + std::to_string(getElapsedTime()) + " ms");
+ cv_.notify_all();
+
+ if (verbose_)
+ mylog << "notify the worker thread";
+
+ UniqueLock lock(mutex_);
+ cv_.wait(lock, [this]{
+ return shio_.state == SharedIOBufferState::Done;
+ });
+
+ if (verbose_)
+ mylog << "worker thread done it's job";
+
+ switch (shio_.errorType) {
+ case ErrorType::DeviceError: throw DeviceError(shio_.errorMessage);
+ case ErrorType::TimeoutError: throw TimeoutError(shio_.errorMessage);
+ case ErrorType::InvalidDataError: throw InvalidDataError(shio_.errorMessage);
+ case ErrorType::OverflowError: throw OverflowError(shio_.errorMessage);
+ default: break;
}
- return recv(outbuf, outbufSize);
+ return shio_.dataSize;
+}
+
+// ----------------------------------------
+// all code below runs in a separate thread
+// ----------------------------------------
+
+void Device::threadLoop() {
+ while (workerType_ != Dead) {
+ if (verbose_)
+ mylog << "waiting for new task...";
+
+ // wait for new task
+ UniqueLock lock(mutex_);
+ auto pred = [this]{
+ return shio_.state == SharedIOBufferState::Ready;
+ };
+
+ if (workerType_ == OneShot) {
+ cv_.wait(lock, pred);
+ } else {
+ cv_.wait_for(lock, 1s, pred);
+ if (!pred())
+ continue;
+ };
+
+ if (verbose_)
+ mylog << "got something";
+
+ shio_.state = SharedIOBufferState::InProgress;
+
+ try {
+ shio_.setResult(run(shio_.inputBuffer,
+ shio_.inputBufferSize,
+ shio_.outputBuffer,
+ shio_.outputBufferSize));
+ }
+ catch (DeviceError& e) {
+ shio_.setResult(ErrorType::DeviceError, e.what());
+ }
+ catch (TimeoutError& e) {
+ shio_.setResult(ErrorType::TimeoutError, e.what());
+ }
+ catch (InvalidDataError& e) {
+ shio_.setResult(ErrorType::InvalidDataError, e.what());
+ }
+ catch (OverflowError& e) {
+ shio_.setResult(ErrorType::OverflowError, e.what());
+ }
+
+ if (verbose_)
+ mylog << "unlocking";
+
+ lock.unlock();
+ cv_.notify_all();
+
+ if (workerType_ == OneShot)
+ break;
+ }
}
-void Device::send(const u8* buf, size_t bufSize) {
+size_t Device::run(const u8* inbuf, size_t inbufSize, u8* outbuf, size_t outbufSize) {
+ timeStarted_ = timestamp();
+
+ // ------------------------------
+ // add CRC and send to the device
+ // ------------------------------
+
size_t dataLen;
std::shared_ptr<u8> data;
if ((flags_ & FLAG_WRITE_CRC) == FLAG_WRITE_CRC) {
- const CRC crc = crc_calculate(buf, bufSize);
- dataLen = bufSize + sizeof(u16) + 1;
+ const CRC crc = crc_calculate(inbuf, inbufSize);
+ dataLen = inbufSize + sizeof(u16) + 1;
data = std::unique_ptr<u8>(new u8[dataLen]);
- crc_write(crc, &data.get()[bufSize]);
+ crc_write(crc, &data.get()[inbufSize]);
} else {
- dataLen = bufSize + 1;
+ dataLen = inbufSize + 1;
data = std::unique_ptr<u8>(new u8[dataLen]);
}
u8* dataPtr = data.get();
- memcpy((void*)dataPtr, buf, bufSize);
-
+ memcpy((void*)dataPtr, inbuf, inbufSize);
dataPtr[dataLen - 1] = '\r';
-
if (verbose_) {
myerr << "writing " << dataLen << (dataLen > 1 ? " bytes" : " byte");
std::cerr << hexdump(dataPtr, dataLen);
}
- writeLoop(dataPtr, dataLen);
-}
-
-void Device::writeLoop(const u8* data, size_t dataSize) {
- int bytesLeft = static_cast<int>(dataSize);
+ writeAll(dataPtr, dataLen);
- while (true) {
- size_t bytesWritten = write(data, bytesLeft);
- if (verbose_)
- myerr << "bytesWritten=" << bytesWritten;
-
- bytesLeft -= static_cast<int>(bytesWritten);
- if (bytesLeft <= 0)
- break;
- if (!getTimeLeft())
- throw TimeoutError("data writing already took " + std::to_string(getElapsedTime()) + " ms");
+ // -----------------
+ // check for timeout
+ // -----------------
- data = &data[bytesWritten];
+ if (!getTimeLeft()) {
+ // FIXME
+ // we should read incoming data from the device,
+ // or clean the buffer in some other way.
+ // otherwise we may get invalid response next time
+ throw TimeoutError("sending already took " + std::to_string(getElapsedTime()) + " ms");
}
-}
-size_t Device::recv(u8* buf, size_t bufSize) {
- size_t bytesRead = readLoop(buf, bufSize);
+ // ------------------------------
+ // read from device and check CRC
+ // ------------------------------
+
+ size_t bytesRead = readAll(outbuf, outbufSize);
if (verbose_) {
myerr << "got " << bytesRead << (bytesRead > 1 ? " bytes" : " byte");
- std::cerr << hexdump(buf, bytesRead);
+ std::cerr << hexdump(outbuf, bytesRead);
}
bool crcNeeded = (flags_ & FLAG_READ_CRC) == FLAG_READ_CRC;
@@ -129,10 +234,8 @@ size_t Device::recv(u8* buf, size_t bufSize) {
const size_t dataSize = bytesRead - minSize;
if (crcNeeded) {
- const CRC crcActual = crc_read(&buf[dataSize]);
- const CRC crcExpected = crc_calculate(buf, dataSize);
-
-// buf[dataSize] = 0;
+ const CRC crcActual = crc_read(&outbuf[dataSize]);
+ const CRC crcExpected = crc_calculate(outbuf, dataSize);
if ((flags_ & FLAG_VERIFY_CRC) == FLAG_VERIFY_CRC && crcActual == crcExpected)
return dataSize;
@@ -143,11 +246,29 @@ size_t Device::recv(u8* buf, size_t bufSize) {
throw InvalidDataError(error.str());
}
-// buf[dataSize] = 0;
return dataSize;
}
-size_t Device::readLoop(u8 *buf, size_t bufSize) {
+void Device::writeAll(const u8* data, size_t dataSize) {
+ int bytesLeft = static_cast<int>(dataSize);
+
+ while (true) {
+ size_t bytesWritten = write(data, bytesLeft);
+ if (verbose_)
+ myerr << "bytesWritten=" << bytesWritten;
+
+ bytesLeft -= static_cast<int>(bytesWritten);
+ if (bytesLeft <= 0)
+ break;
+
+ if (!getTimeLeft())
+ throw TimeoutError("data writing already took " + std::to_string(getElapsedTime()) + " ms");
+
+ data = &data[bytesWritten];
+ }
+}
+
+size_t Device::readAll(u8 *buf, size_t bufSize) {
size_t size = 0;
while(true) {
@@ -170,7 +291,7 @@ size_t Device::readLoop(u8 *buf, size_t bufSize) {
throw TimeoutError("data reading already took " + std::to_string(getElapsedTime()) + " ms");
if (bufSize <= 0)
- throw std::overflow_error("input buffer is not large enough");
+ throw OverflowError("input buffer is not large enough");
}
}