recovered py files from woody

This commit is contained in:
Jonas Forsberg 2023-01-04 11:31:36 +01:00
parent af7127d05c
commit fc8e788341
5 changed files with 516 additions and 0 deletions

5
src/voucher/__init__.py Normal file
View File

@ -0,0 +1,5 @@
import paho.mqtt.client as mqttc
__version__ = "0.1.2"
BUTTON_PIN_NUMBER = 3
mqtt_client = mqttc.Client()

241
src/voucher/cli.py Normal file
View File

@ -0,0 +1,241 @@
import os
import sys
import time
import signal
import logging
import RPi.GPIO as GPIO
from argparse import ArgumentParser
from distutils.util import strtobool
from voucher import __version__ as version
from voucher import mqtt, mqtt_client, unifi, BUTTON_PIN_NUMBER
def signal_handler(sig, frame):
"""Catch interuption signal and gracefully shut down"""
global mqtt_client
if mqtt_client:
logging.info("Shuttin down...")
mqtt_client.disconnect()
mqtt_client.loop_stop()
logging.info("... program exit")
sys.exit(0)
def create_parser(args=sys.argv[1:]):
"""Create a argparse.ArgumentParser
Creates a ArgumentParser based on command line
Parameters
----------
args: list, optional
a list that represents how it was called from the command line
if not specified the list is build wid sys.argv[1:]
Returns
-------
argparse.ArgumentParser
"""
parser = ArgumentParser()
parser.add_argument(
"--version", "-v", action="version", version=f"%(prog)s {version}"
)
parser.add_argument(
"--log-level",
"-l",
choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
type=str.upper,
help="Log level for application",
default=os.getenv("LOG_LEVEL", "WARNING").upper(),
)
parser.add_argument(
"--mqtt-host",
help="mqtt host to connect to. Defaults to localhost.",
default=os.getenv("MQTT_HOST", "localhost"),
)
parser.add_argument(
"--mqtt-client-id",
help="id to use for this client. Defaults to wifi-voucher_ appended with the process id",
default=os.getenv("MQTT_CLIENT_ID", f"wifi-voucher_{os.getpid()}"),
)
parser.add_argument(
"--capath",
help="path to a directory containing trusted CA certificates to enable encrypted communication, defaults to /etc/ssl/certsc/ca-certificates.crt",
default=os.getenv("CAPATH", "/etc/ssl/certs/ca-certificates.crt"),
)
parser.add_argument(
"--mqtt-tls-version",
choices=["tlsv1.1", "tlsv1.2", "tlsv1.3"],
help="TLS protocol version, can be one of tlsv1.3 tlsv1.2 or tlsv1.1. Defaults to tlsv1.2 if available.",
default=os.getenv("MQTT_TLS_VERSION", "tlsv1.2"),
)
parser.add_argument(
"--mqtt-insecure",
help="do not check that the server certificate hostname matches the remote hostname.",
action="store_true",
)
parser.add_argument(
"--mqtt-user",
help="provide a username for mqtt broker connection",
default=os.getenv("MQTT_USER", None),
)
parser.add_argument(
"--mqtt-password",
help="provide a password for mqtt broker connection",
default=os.getenv("MQTT_PASSWORD", None),
)
parser.add_argument(
"--mqtt-topic",
help="mqtt topic to subscribe to. defautls to wifi-voucher",
default=os.getenv("MQTT_TOPIC", "wifi-voucher"),
)
parser.add_argument(
"--mqtt-protocol-version",
help="specify the version of the MQTT protocol to use when connecting. Defaults to mqttv311.",
choices=["mqttv31", "mqttv311", "mqttv5"],
default=os.getenv("MQTT_PROTOCOL_VERSION", "mqttv311"),
)
parser.add_argument(
"--mqtt-tls",
action="store_true",
help="Use TLS when connecting to mqtt broker",
default=bool(strtobool(os.getenv("MQTT_TLS", "False"))),
)
parser.add_argument(
"--mqtt-port",
type=int,
help="network port to connect to. Defaults to 1883 for plain MQTT and 8883 for MQTT over TLS",
default=os.getenv("MQTT_PORT", None),
)
parser.add_argument(
"--unifi-host",
help="host where Unifi Controller is located. Default is localhost",
default=os.getenv("UNIFI_HOST", "localhost"),
)
parser.add_argument(
"--unifi-port",
help="port that Unifi controller is listing on. Default 8443",
default=os.getenv("UNIFI_PORT", "8443"),
type=int,
)
parser.add_argument(
"--unifi-insecure",
help="don't validate tls certificates",
action="store_true",
default=bool(strtobool(os.getenv("UNIFI_INSECURE", "False"))),
)
parser.add_argument(
"--unifi-user",
help="User to login to Unifi controller",
default=os.getenv("UNIFI_USER", None),
)
parser.add_argument(
"--unifi-password",
help="Password for logging in to unifi controller",
default=os.getenv("UNIFI_PASSWORD", None),
)
parser.add_argument(
"--unifi-voucher-expire-unit",
choices=["minute", "hour", "day", "week", "year"],
help="Set the unit that --unifi-voucher-expire-number is using, default day",
default=os.getenv("UNIFI_VOUCHER_EXPIRE_UNIT", "day"),
)
parser.add_argument(
"--unifi-voucher-expire-number",
type=int,
help="Set the number on how long the voucher is valid, default 1",
default=os.getenv("UNIFI_VOUCHER_EXPIRE_NUMBER", "1"),
)
parser.add_argument(
"--unifi-voucher-number",
type=int,
help="Number of vouchers to create. Default 1",
default=os.getenv("UNIFI_VOUCHER_NUMBER", 1),
)
parser.add_argument(
"--unifi-voucher-quota",
type=int,
help="Number of times the voucher can be used. Default 1",
default=os.getenv("UNIFI_VOUCHER_QUOTA", 1),
)
parser.add_argument(
"--unifi-voucher-limit-up",
type=int,
help="Limit the bandwidth for uploads, in Kbps. Default unlimited",
default=os.getenv("UNIFI_VOUCHER_LIMIT_UP", None),
)
parser.add_argument(
"--unifi-voucher-limit-down",
type=int,
help="Limit the bandwidth for downloads, in Kbps. Default unlimited",
default=os.getenv("UNIFI_VOUCHER_LIMIT_DOWN", None),
)
parser.add_argument(
"--unifi-voucher-limit-byte",
type=int,
help="Limit the total MB transfer for the voucher, in MB. Default unlimited",
default=os.getenv("UNIFI_VOUCHER_LIMIT_BYTE", None),
)
parser.add_argument(
"--unifi-site",
help="Site in the Unifi controller to create vouchers for, default is 'default'",
default=os.getenv("UNIFI_SITE", "default"),
)
parser.add_argument(
"--printer", help="CUPS printer name", default=os.getenv("PRINTER", None)
)
return parser
def main():
"""The main program logic"""
global mqtt_client
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
parser = create_parser()
args = parser.parse_args()
# set up the logging
logging.basicConfig(
level=args.log_level, format="%(asctime)s - %(levelname)s - %(message)s"
)
logging.info(f"starting version {version}")
logging.debug(args)
if bool(args.mqtt_user) != bool(args.mqtt_password):
logging.error(
"Both MQTT Username and Password needs to be set, or none of them"
)
sys.exit(1)
if args.unifi_user == None or args.unifi_password == None:
logging.error("Both Unifi user and Unifi password needs to be set")
sys.exit(1)
if args.printer == None:
logging.error("You need to specify a printer")
sys.exit(2)
mqtt_client = mqtt.create_client(args)
if mqtt_client:
mqtt_client.loop_start()
GPIO.setwarnings(False)
GPIO.setmode(GPIO.BCM)
GPIO.setup(BUTTON_PIN_NUMBER, GPIO.IN, pull_up_down=GPIO.PUD_UP)
GPIO.add_event_detect(BUTTON_PIN_NUMBER, GPIO.FALLING, bouncetime=300)
while True:
if GPIO.event_detected(BUTTON_PIN_NUMBER):
logging.debug("Button was pressed")
for voucher in unifi.create_voucher(args):
unifi.print_voucher(args, voucher)
time.sleep(0.3)

34
src/voucher/gotify.py Normal file
View File

@ -0,0 +1,34 @@
import requests
from logging import StreamHandler
class Gotify:
def __init__(self, url, token):
self.url = url
self.token = token
def send(self, title, message, priority=5):
files = {
"title": (None, title),
"message": (None, message),
"priority": (None, priority),
}
response = requests.post(f"{self.url}/message?token={self.token}", files=files)
class GotifyWebhookHandler(StreamHandler):
def __init__(self, url, token):
StreamHandler.__init__(self)
self.gotify = Gotify(url, token)
def emit(self, record):
msg = self.format(record)
self.gotify.send("wifi-voucher", msg)
# try:
#
# response = requests.post(f"{self.url}/message?token={self.token}", files=files)
# except requests.exceptions.RequestException as err:
# print(f"Gotify logging: {err}")
#

99
src/voucher/mqtt.py Normal file
View File

@ -0,0 +1,99 @@
import ssl
import sys
import json
import logging
from voucher import unifi
from argparse import Namespace
import paho.mqtt.client as mqtt
def on_connect(client, userdata, flags, rc):
if rc == 0:
logging.info(f"Connected to MQTT with result code {rc}")
client.subscribe(client.args.mqtt_topic)
else:
logging.warning("Connection to MQTT failed, result code {rc}")
client.reconnect()
def on_disconnect(client, userdata, rc):
if rc != 0:
logging.warning(
f"Unexpected disconnected from MQTT, result code {rc}. Reconnecting..."
)
client.reconnect()
else:
logging.info("Disconnected from MQTT successfully")
def on_message(client, userdata, message):
logging.debug(
f"MQTT Message received ({message.topic}): {message.payload.decode('UTF-8')}"
)
args = vars(client.args)
try:
recived = json.loads(message.payload.decode("UTF-8"))
except json.decoder.JSONDecodeError as err:
logging.warning(f"Couldn't parse json response {err}")
return
try:
args.update(recived)
except ValueError as err:
logging.error(f"The message could be parsed as json, but was not a dict")
return
vouchers = unifi.create_voucher(Namespace(**args))
for voucher in vouchers:
unifi.print_voucher(Namespace(**args), voucher)
def create_client(args):
if args.mqtt_tls_version == "tlsv1.1":
tls_version = ssl.PROTOCOL_TLSv1_1
elif args.mqtt_tls_version == "tlsv1.2":
tls_version = ssl.PROTOCOL_TLSv1_2
elif args.mqtt_tls_version == "tlsv1.3":
tls_version = ssl.PROTOCOL_TLSv1_3
if args.mqtt_port == None:
if args.mqtt_tls:
mqtt_port = 8883
else:
mqtt_port = 1883
client = mqtt.Client(args.mqtt_client_id)
client.args = args
client.enable_logger()
if args.mqtt_user:
client.username_pw_set(args.mqtt_user, password=args.mqtt_password)
client.on_connect = on_connect
client.on_message = on_message
client.on_disconnect = on_disconnect
if args.mqtt_tls:
if args.mqtt_insecure:
client.tls_set(
ca_certs=args.capath,
certfile=None,
keyfile=None,
cert_reqs=ssl.CERT_NONE,
tls_version=tls_version,
ciphers=None,
)
else:
client.tls_set(
ca_certs=args.capath,
certfile=None,
keyfile=None,
cert_reqs=ssl.CERT_REQUIRED,
tls_version=tls_version,
ciphers=None,
)
try:
client.connect(args.mqtt_host, port=int(mqtt_port))
except Exception as err:
logging.error(
f"Couln't connecto to broker on {args.mqtt_host}:{mqtt_port}: {err}"
)
return None
return client

137
src/voucher/unifi.py Normal file
View File

@ -0,0 +1,137 @@
import json
import subprocess
import requests
import logging
from voucher import __version__ as version
headers = {
"Accept": "application/json, text/plain, */*",
"User-Agent": f"wifi-voucher/{version} https://git.rre.nu/jonas/wifi-voucher",
}
def print_voucher(args, voucher):
logging.debug(f"Printing voucher: {voucher[0:5]}-{voucher[5:]}")
validstr = (
f"Valid for {args.unifi_voucher_expire_number} {args.unifi_voucher_expire_unit}"
)
if args.unifi_voucher_expire_number != 1:
validstr = validstr + "s"
lpr = subprocess.Popen(
["/usr/bin/lpr", "-P", args.printer],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
lpr.stdin.write(
f"GoHome Guest Wifi \n\n{voucher[0:5]}-{voucher[5:]}\n\n{validstr}\n".encode()
)
output, error = lpr.communicate()
if error:
logging.error(error.decode("utf-8").strip())
def create_voucher(args):
"""
Creates a wifi-voucher
"""
logging.debug("Creating voucher")
session = requests.Session()
session.verify = not args.unifi_insecure
# Login
payload = {
"username": args.unifi_user,
"password": args.unifi_password,
"for_hotspot": True,
"remember": False,
"site_name": args.unifi_site,
}
try:
response = session.post(
f"https://{args.unifi_host}:{args.unifi_port}/api/login",
data=json.dumps(payload),
headers=headers,
)
except requests.exceptions.RequestException as err:
logging.error(err)
return []
if response.status_code != 200:
logging.error(
f"Got response code '{response.status_code}' from https://{args.unifi_host}:{args.unifi_port}"
)
return []
try:
if response.json()["meta"]["rc"] != "ok":
logging.error(f"Error in response from unifi API: {response.json()}")
return []
except KeyError:
logging.error("Got invalid json response from Unifi controller during login")
return []
# create voucher
payload = {}
payload["cmd"] = "create-voucher"
payload["note"] = f"wifi-voucher: {version}"
payload["quota"] = args.unifi_voucher_quota
payload["expire_unit"] = 1440
if args.unifi_voucher_expire_unit in ["minute", "hours"]:
payload["expire_number"] = 1
if args.unifi_voucher_expire_unit == "minute":
payload["expire"] = args.unifi_voucher_expire_number
else:
payload["expire"] = args.unifi_voucher_expire_number * 60
else:
payload["expire"] = "custom"
if args.unifi_voucher_expire_unit == "day":
payload["expire_number"] = args.unifi_voucher_expire_number
elif args.unifi_voucher_expire_unit == "week":
payload["expire_number"] = args.unifi_voucher_expire_number * 7
elif args.unifi_voucher_expire_unit == "year":
payload["expire_number"] = args.unifi_voucher_expire_number * 364
payload["n"] = args.unifi_voucher_number
if args.unifi_voucher_limit_byte:
payload["byte"] = args.unifi_voucher_limit_byte
if args.unifi_voucher_limit_up:
payload["up"] = args.unifi_voucher_limit_up
if args.unifi_voucher_limit_down:
payload["down"] = args.unifi_voucher_limit_down
try:
response = session.post(
f"https://{args.unifi_host}:{args.unifi_port}/api/s/{args.unifi_site}/cmd/hotspot",
data=json.dumps(payload),
headers=headers,
)
except requests.exceptions.RequestException as err:
logging.error(err)
return []
if response.status_code != 200:
logging.error(
f"Got response code '{response.status_code}' from https://{args.unifi_host}:{args.unifi_port}"
)
return []
try:
if response.json()["meta"]["rc"] != "ok":
logging.error(f"Error in response from unifi API: {response.json()}")
return []
except KeyError:
logging.error(
"Got invalid json response from Unifi controller during voucher creation"
)
return []
creation_time_list = response.json()["data"]
# Get created voucher.
response = session.get(
f"https://{args.unifi_host}:{args.unifi_port}/api/s/{args.unifi_site}/stat/voucher",
headers=headers,
)
voucher_codes = []
for voucher in response.json()["data"]:
for item in creation_time_list:
if voucher["create_time"] == item["create_time"]:
voucher_codes.append(voucher["code"])
return voucher_codes