#!/usr/bin/env python3
"""TCP server that switches a relay wired to a GPIO pin through pyA20.

Clients send short text commands and receive one reply per command,
terminated by ``\\r\\n``:

* ``on``   -> drives the pin to ``ON``, replies ``ok``
* ``off``  -> drives the pin to ``OFF``, replies ``ok``
* ``get``  -> replies ``on`` or ``off`` according to the pin level
* ``quit`` -> replies ``unknown`` and the connection is closed
* anything else -> replies ``unknown``

A single EOT byte (``0x04``) or end-of-stream also closes the connection.
"""
import argparse
import asyncio
import logging
import os
import sys

from pyA20.gpio import gpio
from pyA20.gpio import port

logger = logging.getLogger(__name__)

# Not used anywhere in this file; kept so the module's globals are unchanged.
bus = None

# Serializes GPIO reads/writes issued by concurrent client handlers.
lock = asyncio.Lock()

PIN = port.PA10

# Pin levels for the two relay states: the relay is "on" at level 0.
OFF = 1
ON = 0


async def relay_set(value: int) -> None:
    """Write *value* (``ON`` or ``OFF``) to the relay pin."""
    async with lock:
        gpio.output(PIN, value)


async def relay_get() -> bool:
    """Return ``True`` when the relay pin currently reads ``ON``."""
    async with lock:
        return int(gpio.input(PIN)) == ON


async def handle_client(reader: asyncio.StreamReader,
                        writer: asyncio.StreamWriter) -> None:
    """Serve one client connection until it quits or disconnects.

    Each chunk read from *reader* (up to 255 bytes) is treated as one
    command; see the module docstring for the command set.  The writer is
    always closed on exit, including when a command handler raises.
    """
    try:
        request = None
        while request != 'quit':
            try:
                raw = await reader.read(255)
            except OSError:
                # Connection reset or similar transport failure.
                break

            # read() returns b'' at end-of-stream; without this check the
            # loop would keep answering 'unknown' to a peer that has gone.
            if not raw or raw == b'\x04':
                break

            try:
                request = raw.decode('utf-8').strip()
            except UnicodeDecodeError:
                # Not a text command; drop the connection.
                break

            data = 'unknown'
            if request == 'on':
                await relay_set(ON)
                logger.info('set on')
                data = 'ok'
            elif request == 'off':
                await relay_set(OFF)
                logger.info('set off')
                data = 'ok'
            elif request == 'get':
                data = 'on' if await relay_get() else 'off'

            writer.write((data + '\r\n').encode('utf-8'))
            try:
                await writer.drain()
            except ConnectionError:
                break
    finally:
        # Best-effort close: the peer may already have dropped the socket.
        try:
            writer.close()
            await writer.wait_closed()
        except OSError:
            pass


async def run_server(host: str, port: int) -> None:
    """Listen on *host*:*port* and serve clients until cancelled."""
    server = await asyncio.start_server(handle_client, host, port)
    async with server:
        logger.info('Server started.')
        await server.serve_forever()


def main() -> None:
    """Parse arguments, configure the relay pin as output and run the server."""
    # The message demands root, which is a user-id property; the effective
    # *group* id (getegid) is not a reliable indicator of that.
    if os.geteuid() != 0:
        sys.exit('Must be run as root.')

    parser = argparse.ArgumentParser()
    parser.add_argument('--host', type=str, default='127.0.0.1')
    parser.add_argument('--port', type=int, default=8307)
    args = parser.parse_args()

    logging.basicConfig(level=logging.INFO)

    gpio.init()
    gpio.setcfg(PIN, gpio.OUTPUT)

    try:
        asyncio.run(run_server(args.host, args.port))
    except KeyboardInterrupt:
        logger.info('Exiting...')


if __name__ == '__main__':
    main()