#!/usr/bin/env python3
"""Telegram bot that switches a relay on/off through a TCP relay daemon.

The bot shows a three-button keyboard (on / off / status).  Button presses
from whitelisted users are translated into line-based commands sent to the
relay daemon, and every handled request is reported to the solarmon API.
"""
import html
import logging
import socket

from argparse import ArgumentParser
from typing import Optional

import solarmon_api

from telegram import (
    Update,
    ParseMode,
    ReplyKeyboardMarkup
)
from telegram.ext import (
    Updater,
    Filters,
    CommandHandler,
    MessageHandler,
    CallbackContext
)
from telegram.error import TimedOut

# API client used to log bot requests; created in the __main__ section.
solarmon: Optional[solarmon_api.Client] = None

# Address of the relay daemon used by the message handlers.  The defaults
# mirror RelayClient's own defaults and are overridden from the
# --gpio-relayd-host / --gpio-relayd-port command-line options.
relay_host: str = '127.0.0.1'
relay_port: int = 8307


class RelayClient:
    """Minimal line-based TCP client for the relay daemon.

    Commands ('on', 'off', 'get') are sent as CRLF-terminated lines and the
    reply is everything received up to (and including) the first CRLF,
    stripped of surrounding whitespace.

    The client can be used as a context manager so that the socket is closed
    deterministically::

        with RelayClient() as relay:
            relay.connect()
            relay.on()
    """

    def __init__(self, port=8307, host='127.0.0.1'):
        self._host = host
        self._port = port
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    def __del__(self):
        # Last-resort cleanup; callers should prefer close() or `with`.
        self.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self):
        """Close the underlying socket; safe to call more than once."""
        # getattr guards against __init__ having failed before `sock` was set
        # (in which case __del__ still runs on the half-built object).
        sock = getattr(self, 'sock', None)
        if sock is not None:
            sock.close()

    def connect(self):
        """Connect to the daemon at the host/port given to the constructor."""
        self.sock.connect((self._host, self._port))

    def _write(self, line):
        """Send one command line, terminated with CRLF."""
        self.sock.sendall((line + '\r\n').encode())

    def _read(self):
        """Read until the first CRLF and return the stripped, decoded reply.

        Raises:
            ConnectionError: the peer closed the connection before a
                complete line was received.
        """
        buf = bytearray()
        while b'\r\n' not in buf:
            chunk = self.sock.recv(256)
            if not chunk:
                # recv() returning b'' means EOF; without this check the
                # loop would spin forever waiting for a CRLF that never comes.
                raise ConnectionError(
                    'relay daemon closed the connection before sending a complete reply'
                )
            buf.extend(chunk)
        return buf.decode().strip()

    def on(self):
        """Send 'on' and return the daemon's reply."""
        self._write('on')
        return self._read()

    def off(self):
        """Send 'off' and return the daemon's reply."""
        self._write('off')
        return self._read()

    def status(self):
        """Send 'get' and return the daemon's reply (handlers compare it to 'on')."""
        self._write('get')
        return self._read()


#
# helpers
#

def get_markup() -> ReplyKeyboardMarkup:
    """Build the persistent reply keyboard with the on/off/status buttons."""
    button = [
        [
            'Включить',
            'Выключить'
        ],
        [
            'Статус'
        ]
    ]
    return ReplyKeyboardMarkup(button, one_time_keyboard=False)


def reply(update: Update, text: str) -> None:
    """Reply to the incoming message with *text* (parsed as HTML) and the keyboard."""
    update.message.reply_text(text,
                              reply_markup=get_markup(),
                              parse_mode=ParseMode.HTML)


def handle_exc(update: Update, e) -> None:
    """Log exception *e* and, unless it is a Telegram timeout, report it to the user."""
    logging.exception(str(e))
    if not isinstance(e, TimedOut):
        # reply() sends with parse_mode=HTML, so the exception text must be
        # escaped or a '<' / '&' in it would make Telegram reject the message.
        reply(update, 'exception: ' + html.escape(str(e), quote=False))


def _open_relay() -> RelayClient:
    """Return a RelayClient connected to the configured relay daemon.

    The socket is closed again if the connection attempt fails.
    """
    relay = RelayClient(port=relay_port, host=relay_host)
    try:
        relay.connect()
    except Exception:
        relay.close()
        raise
    return relay


#
# command/message handlers
#

def start(update: Update, context: CallbackContext) -> None:
    """Handle /start: show the keyboard and ask the user to pick a command."""
    reply(update, 'Выберите команду на клавиатуре.')


def msg_status(update: Update, context: CallbackContext) -> None:
    """Handle the status button: query the relay and report its state."""
    try:
        with _open_relay() as relay:
            response = relay.status()

        status = 'Включен ✅' if response == 'on' else 'Выключен ❌'
        reply(update, status)

        solarmon.log_bot_request(solarmon_api.BotType.PUMP,
                                 update.message.chat_id,
                                 'Статус')
    except Exception as e:
        handle_exc(update, e)


def msg_on(update: Update, context: CallbackContext) -> None:
    """Handle the on button: switch the relay on and confirm."""
    try:
        with _open_relay() as relay:
            relay.on()

        reply(update, 'Готово')

        solarmon.log_bot_request(solarmon_api.BotType.PUMP,
                                 update.message.chat_id,
                                 'Включить')
    except Exception as e:
        handle_exc(update, e)


def msg_off(update: Update, context: CallbackContext) -> None:
    """Handle the off button: switch the relay off and confirm."""
    try:
        with _open_relay() as relay:
            relay.off()

        reply(update, 'Готово')

        solarmon.log_bot_request(solarmon_api.BotType.PUMP,
                                 update.message.chat_id,
                                 'Выключить')
    except Exception as e:
        handle_exc(update, e)


def msg_all(update: Update, context: CallbackContext) -> None:
    """Fallback for any other message from a whitelisted user."""
    reply(update, "Неизвестная команда.")


if __name__ == '__main__':
    # command-line arguments
    parser = ArgumentParser()
    parser.add_argument('--token', required=True, type=str,
                        help='Telegram bot token')
    # type=int/required=True let argparse report a missing or non-numeric
    # whitelist cleanly instead of crashing later with a TypeError/ValueError.
    parser.add_argument('--users-whitelist', nargs='+', type=int, required=True,
                        help='ID of users allowed to use the bot')
    parser.add_argument('--gpio-relayd-host', default='127.0.0.1', type=str)
    parser.add_argument('--gpio-relayd-port', default=8307, type=int)
    parser.add_argument('--solarmon-api-token', type=str, required=True)

    args = parser.parse_args()

    whitelist = list(args.users_whitelist)

    # make the handlers talk to the relay daemon selected on the command line
    relay_host = args.gpio_relayd_host
    relay_port = args.gpio_relayd_port

    # configure logging
    logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
                        level=logging.INFO)

    # configure bot
    updater = Updater(args.token, request_kwargs={'read_timeout': 6, 'connect_timeout': 7})
    dispatcher = updater.dispatcher

    user_filter = Filters.user(whitelist)

    # Filters.text() expects a list/tuple of strings; with a bare str the
    # filter's `text in strings` check degrades to a substring match, so a
    # partial message could trigger a relay command.
    dispatcher.add_handler(CommandHandler('start', start))
    dispatcher.add_handler(MessageHandler(Filters.text(['Включить']) & user_filter, msg_on))
    dispatcher.add_handler(MessageHandler(Filters.text(['Выключить']) & user_filter, msg_off))
    dispatcher.add_handler(MessageHandler(Filters.text(['Статус']) & user_filter, msg_status))
    dispatcher.add_handler(MessageHandler(Filters.all & user_filter, msg_all))

    # create api client instance
    solarmon = solarmon_api.Client(args.solarmon_api_token, timeout=3)
    solarmon.enable_async()

    # start the bot
    updater.start_polling()

    # run the bot until the user presses Ctrl-C or the process receives
    # SIGINT, SIGTERM or SIGABRT
    updater.idle()