Mercurial > mamba
view mamba/forest.py @ 601:915de6c7d342 default tip
Add support for making the editor fullscreen too.
author | Simon Cross <hodgestar@gmail.com> |
---|---|
date | Sat, 14 Jan 2023 19:34:26 +0100 |
parents | f7c11fc2a3e7 |
children |
line wrap: on
line source
"""Where mamba's hang out with each other.""" from werkzeug.utils import secure_filename from flask import Flask, request, abort from datetime import datetime from configparser import ConfigParser from xmlrpc.client import ServerProxy import time import os import sys import socket import json from io import BytesIO import zipfile app = Flask(__name__) class LevelSet(object): def __init__(self, ctype): self.ctype = ctype self.folder = self._path(ctype) @classmethod def _path(cls, ctype): main = app.config.forest.main if ctype == "curated": return os.path.join(main.level_folder, 'curated') elif ctype == "uncurated": return os.path.join(main.level_folder, 'uncurated') abort(404, "Not found") def _level_path(self, levelname): filename = "%s.txt" % secure_filename(levelname) return os.path.join(self.folder, filename) def list_levels(self): endl = len(".txt") files = [x[:-endl] for x in os.listdir(self.folder) if not x.startswith('.') and x.endswith('.txt')] return files def read_level(self, levelname): level_path = self._level_path(levelname) if not os.path.isfile(level_path): abort(404, "Level not found. Hsss.") with open(level_path) as level: return level.read() def write_level(self, levelname, leveldata): level_path = self._level_path(levelname) if os.path.exists(level_path): abort(409, "Mamba already resident.") with open(level_path, 'w') as level: level.write(leveldata) def zip_levels(self): levels_raw = BytesIO() levels_zip = zipfile.ZipFile( levels_raw, "w", compression=zipfile.ZIP_DEFLATED) for levelname in self.list_levels(): levels_zip.writestr( "%s.txt" % levelname, self.read_level(levelname)) levels_zip.close() return levels_raw.getvalue() @app.route("/<ctype>/index") def index(ctype): ls = LevelSet(ctype) levels = ls.list_levels() return "\n".join(levels) @app.route("/<ctype>/level/<levelname>") def level(ctype, levelname): ls = LevelSet(ctype) return ls.read_level(levelname) @app.route("/<ctype>/levels.zip") def levels_zip(ctype): ls = LevelSet(ctype) return ls.zip_levels() @app.route("/save/<levelname>", methods=['GET', 'POST']) def save(levelname): ts = datetime.now().strftime("%Y%m%d.%H%M%S") levelname = "%s.%s" % (levelname, ts) ls = LevelSet("uncurated") if request.method == 'POST': leveldata = request.form['data'].encode('ascii') ls.write_level(levelname, leveldata) #inform_cia(levelname, "New level uploaded.", branch="uncurated") inform_irker(levelname, "New level uploaded.", branch="uncurated") return "Ssss." else: abort(405, "Post levels here. Hsss.") MAMBA_VERSION = "0.1" MAMBA_URL = "https://hg.ctpug.org.za/mamba" CIA_URL = "http://cia.navi.cx" CIA_MSG_TEMPLATE = """ <message xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="schema.xsd"> <generator> <name>Mutable Mamba Level Server</name> <version>%(mamba_version)s</version> <url>%(mamba_url)s</url> </generator> <source> <project>%(project)s</project> <module>%(module)s</module> <branch>%(branch)s</branch> </source> <timestamp> %(timestamp)d </timestamp> <body> <commit> <revision>%(revision)s</revision> <author>%(author)s</author> <files> <file>%(file)s</file> </files> <log> %(log)s </log> </commit> </body> </message> """ def inform_cia(filename, log, branch='uncurated'): if app.config.forest.cia is None: return cia = app.config.forest.cia msg = CIA_MSG_TEMPLATE % { 'mamba_version': MAMBA_VERSION, 'mamba_url': MAMBA_URL, 'project': cia.project, 'module': 'level-server', 'branch': branch, 'timestamp': int(time.time()), 'revision': '0', 'author': 'unknown', 'file': filename, 'log': log, } srv = ServerProxy(cia.url) srv.hub.deliver(msg) def format_irker_message(msg_template, project, filename, log, branch='uncurated', colours=None): msg_params = { "project": project, "filename": filename, "log": log, "branch": branch, } if colours: msg_params.update(colours) return msg_template % msg_params def irker_colours(colour_style): if colour_style == "mIRC": return { 'bold': '\x02', 'green': '\x0303', 'blue': '\x0302', 'red': '\x0305', 'yellow': '\x0307', 'brown': '\x0305', 'magenta': '\x0306', 'cyan': '\x0310', 'reset': '\x0F', } else: return { 'bold': '', 'green': '', 'blue': '', 'red': '', 'yellow': '', 'brown': '', 'magenta': '', 'cyan': '', 'reset': '', } def inform_irker(filename, log, branch='uncurated'): if app.config.forest.irker is None: return irker = app.config.forest.irker colours = irker_colours(irker.colour_style) privmsg = format_irker_message(irker.msg_template, irker.project, filename, log, branch, colours=colours) message = json.dumps({"to": irker.channels, "privmsg": privmsg}) try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.connect((irker.host, irker.port)) sock.sendall(message + "\n") finally: sock.close() class ForestConfig(object): IRKER_PORT = 6659 IRKER_MSG_TEMPLATE = ( "%(bold)s%(project)s:%(reset)s " "%(magenta)s%(branch)s%(reset)s * " "%(bold)s%(filename)s%(reset)s: " "%(log)s" ) class SubConfig(object): """Holder for sub-config.""" def __init__(self, filenames): self._config = ConfigParser() self._config.read(filenames) self.main = self._parse("main") self.irker = self._parse("irker") self.cia = self._parse("cia") def _parse(self, section): if self._config.has_section(section): conf = dict(self._config.items(section)) else: conf = {} return getattr(self, "_parse_%s" % section)(conf) def _parse_main(self, conf): main = self.SubConfig() main.host = conf.get('host', '0.0.0.0') main.port = int(conf['port']) main.level_folder = conf['level_folder'] return main def _parse_irker(self, conf): if 'project' not in conf: return None irker = self.SubConfig() irker.project = conf['project'] irker.host = conf.get('host', 'localhost') irker.port = int(conf.get('port', self.IRKER_PORT)) irker.channels = conf['channels'] irker.msg_template = conf.get('msg_template', self.IRKER_MSG_TEMPLATE) irker.colour_style = conf.get('colour_style', 'mIRC') return irker def _parse_cia(self, conf): if 'project' not in conf: return None cia = self.SubConfig() cia.project = conf['project'] cia.url = conf.get('url', CIA_URL) return cia USAGE = """Usage: python -m mamba.forest <config file> The config file should look like: [main] host=0.0.0.0 # optional port=8000 level_folder=./my/levels [irker] host=localhost # optional port=8001 project=mamba-levels channels=channel1,channel2 """ if __name__ == "__main__": if len(sys.argv) != 2: print(USAGE) sys.exit(1) app.config.forest = ForestConfig([sys.argv[1]]) main = app.config.forest.main for ctype in ("curated", "uncurated"): folder = os.path.join(main.level_folder, ctype) if not os.path.exists(folder): os.makedirs(folder) if not os.path.isdir(folder): print("Level folder must be a folder.") sys.exit(1) cia_project = sys.argv[3] if len(sys.argv) > 3 else None # app.debug = True app.run(host=main.host, port=main.port)