view mamba/habitats/userlevelmenu.py @ 597:16c690a7dc27

Fix bare except clauses.
author Simon Cross <hodgestar@gmail.com>
date Sat, 14 Jan 2023 19:00:34 +0100
parents ffb0134be578
children
line wrap: on
line source

"""Level menu."""

from urllib import request
import zipfile
from io import BytesIO

from mamba.habitats.levelmenu import LevelMenu
from mamba.level import Level
from mamba.constants import LEVEL_SERVER, LEVEL_SERVER_VERSION
from mamba.data import get_level_list, load_file


class UserLevelApi(object):
    """Client for the online level server with an in-memory level cache.

    Levels are fetched over HTTP from ``<url><ctype>/<route>`` and stored in
    ``self.cache`` keyed by level name.  All download and parse failures are
    reported with ``print`` and otherwise ignored (best effort).
    """

    def __init__(self, ctype, url=LEVEL_SERVER, timeout=10):
        """Set up the client; no network access happens here.

        ctype   -- "curated" or "uncurated"; selects the server sub-path and
                   is also used as the namespace handed to ``Level``.
        url     -- base URL of the level server; must end with "/".
        timeout -- timeout passed to ``urlopen`` for every request.
        """
        assert ctype in ("curated", "uncurated")
        assert url.endswith("/")
        self.ctype = ctype
        self.level_namespace = ctype
        self.url = url
        self.timeout = timeout
        self.cache = {}

    def _url_data(self, route):
        """Return the raw bytes served at ``route`` for this level type."""
        url = "%s%s/%s" % (self.url, self.ctype, route)
        # Use the response as a context manager so the connection is closed
        # even if read() raises.
        with request.urlopen(url, timeout=self.timeout) as response:
            return response.read()

    def _populate_level(self, name):
        """Download level ``name`` and cache it; print a message on failure."""
        try:
            # Decode so Level receives text, matching the zip code path.
            source = self._url_data("level/%s" % name).decode('utf-8')
            level = Level(name, self.level_namespace, source)
        except Exception:
            print("Failed to download online level %r" % name)
            return
        self.cache[name] = level

    def _populate_cache(self):
        """Fill the cache by fetching the index and then each listed level."""
        try:
            # urlopen yields bytes; decode so names are str.  Formatting a
            # bytes name with %s would produce a "level/b'...'" route.
            data = self._url_data("index").decode('utf-8')
        except Exception:
            print("Failed to download online level index.")
            return
        # Skip blank lines so we do not request an empty level name.
        names = [line.strip() for line in data.splitlines() if line.strip()]

        for name in names:
            self._populate_level(name)

    def _populate_cache_from_zip(self):
        """Fill the cache from a single ``levels.zip`` download.

        Every archive member is treated as a level whose name is the member
        name.  Members that fail to decode or parse are reported and skipped.
        """
        try:
            data = self._url_data("levels.zip")
        except Exception:
            print("Failed to download online level zip.")
            return
        try:
            level_zip = zipfile.ZipFile(BytesIO(data), "r")
        except zipfile.BadZipFile as e:
            # A corrupt download should not crash the menu; treat it like
            # any other failed fetch.
            print("Failed to read online level zip.")
            print('Error: ', e)
            return
        with level_zip:
            for name in level_zip.namelist():
                try:
                    source = level_zip.read(name).decode('utf-8')
                    self.cache[name] = Level(
                        name, self.level_namespace, source)
                except Exception as e:
                    print("Failed to parse online level %r" % name)
                    print('Error: ', e)

    def list_levels(self):
        """Return the sorted names of all cached levels.

        The cache is populated on first use (and again on later calls while
        it is still empty), via the zip download when
        ``LEVEL_SERVER_VERSION >= 2`` and via the index otherwise.
        """
        if not self.cache:
            if LEVEL_SERVER_VERSION >= 2:
                self._populate_cache_from_zip()
            else:
                self._populate_cache()
        return sorted(self.cache)

    def print_levels(self):
        """Print a titled, underlined listing of the available level names."""
        title = "%s levels:" % self.ctype.title()
        rule = "=" * len(title)
        print(title)
        print(rule)
        for name in self.list_levels():
            print(name)
        print(rule)

    def get_level(self, name):
        """Return the cached level ``name``, downloading it if necessary.

        Raises KeyError if the level is not cached and the download fails.
        """
        if name not in self.cache:
            self._populate_level(name)
        return self.cache[name]


class NetworkLevelMenu(LevelMenu):
    """Level menu whose levels come from the online "curated" level set."""

    # Created once at class-definition time and shared by every instance of
    # this menu, so the downloaded-level cache is shared too.
    API = UserLevelApi("curated")

    @property
    def level_namespace(self):
        """Namespace of the levels served by ``API``."""
        # The original omitted ``return`` and therefore always gave None.
        return self.API.level_namespace

    def list_levels(self):
        """Return the level names reported by ``API``."""
        return self.API.list_levels()

    def get_level(self, name):
        """Return the level ``name`` as provided by ``API``."""
        return self.API.get_level(name)


class UserLevelMenu(LevelMenu):
    """Level menu listing levels stored in the user's ``user_levels`` dir."""

    level_namespace = "user"

    def on_enter(self):
        """Reset the per-visit caches after the base class has entered."""
        super(UserLevelMenu, self).on_enter()
        # name -> Level for every level loaded since entering the menu.
        self._levels = {}
        # Names of the levels that loaded successfully.
        self._level_list = []

    def list_levels(self):
        """Return a copy of the names of all loadable user levels.

        Names come from ``get_level_list``; a level that raises while being
        loaded is reported with ``print`` and left out.  The scan is repeated
        on each call for as long as no valid level has been found.
        """
        if not self._level_list:
            for name in get_level_list('user_levels', is_user_dir=True):
                try:
                    self.get_level(name)
                except Exception:
                    print("Invalid user level:", name)
                else:
                    self._level_list.append(name)
        return self._level_list[:]

    def get_level(self, name):
        """Return the user level ``name``, loading and caching it on demand.

        Any exception raised while opening or parsing the level propagates.
        """
        if name not in self._levels:
            path = 'user_levels/%s.txt' % (name,)
            # Close the handle once read; the original left it open.
            # NOTE(review): assumes load_file returns a file-like object that
            # supports the context-manager protocol -- confirm in mamba.data.
            with load_file(path, is_user_dir=True) as src:
                self._levels[name] = Level(
                    name, self.level_namespace, src.read())
        return self._levels[name]