Mercurial > mamba
comparison mamba/forest.py @ 578:1306f7d8ed35
Add support for serving zip file containing all levels.
author | Simon Cross <hodgestar@gmail.com> |
---|---|
date | Sun, 06 Oct 2013 22:47:45 +0200 |
parents | e6344f57886e |
children | 60985ca71af5 |
comparison
equal
deleted
inserted
replaced
577:486c7ae141ea | 578:1306f7d8ed35 |
---|---|
9 import time | 9 import time |
10 import os | 10 import os |
11 import sys | 11 import sys |
12 import socket | 12 import socket |
13 import json | 13 import json |
14 import StringIO | |
15 import zipfile | |
14 | 16 |
15 app = Flask(__name__) | 17 app = Flask(__name__) |
16 | 18 |
17 | 19 |
18 def path(ctype): | 20 class LevelSet(object): |
19 main = app.config.forest.main | 21 def __init__(self, ctype): |
20 if ctype == "curated": | 22 self.ctype = ctype |
21 return os.path.join(main.level_folder, 'curated') | 23 self.folder = self._path(ctype) |
22 elif ctype == "uncurated": | 24 |
23 return os.path.join(main.level_folder, 'uncurated') | 25 @classmethod |
24 abort(404, "Not found") | 26 def _path(cls, ctype): |
25 | 27 main = app.config.forest.main |
26 | 28 if ctype == "curated": |
27 def list_levels(folder): | 29 return os.path.join(main.level_folder, 'curated') |
28 endl = len(".txt") | 30 elif ctype == "uncurated": |
29 files = [x[:-endl] for x in os.listdir(folder) | 31 return os.path.join(main.level_folder, 'uncurated') |
30 if not x.startswith('.') and x.endswith('.txt')] | 32 abort(404, "Not found") |
31 return "\n".join(files) | 33 |
34 def _level_path(self, levelname): | |
35 filename = "%s.txt" % secure_filename(levelname) | |
36 return os.path.join(self.folder, filename) | |
37 | |
38 def list_levels(self): | |
39 endl = len(".txt") | |
40 files = [x[:-endl] for x in os.listdir(self.folder) | |
41 if not x.startswith('.') and x.endswith('.txt')] | |
42 return files | |
43 | |
44 def read_level(self, levelname): | |
45 level_path = self._level_path(levelname) | |
46 if not os.path.isfile(level_path): | |
47 abort(404, "Level not found. Hsss.") | |
48 with open(level_path) as level: | |
49 return level.read() | |
50 | |
51 def write_level(self, levelname, leveldata): | |
52 level_path = self._level_path(levelname) | |
53 if os.path.exists(level_path): | |
54 abort(409, "Mamba already resident.") | |
55 with open(level_path, 'w') as level: | |
56 level.write(leveldata) | |
57 | |
58 def zip_levels(self): | |
59 levels_raw = StringIO.StringIO() | |
60 levels_zip = zipfile.ZipFile( | |
61 levels_raw, "w", compression=zipfile.ZIP_DEFLATED) | |
62 for levelname in self.list_levels(): | |
63 levels_zip.writestr( | |
64 "%s.txt" % levelname, self.read_level(levelname)) | |
65 levels_zip.close() | |
66 return levels_raw.getvalue() | |
32 | 67 |
33 | 68 |
34 @app.route("/<ctype>/index") | 69 @app.route("/<ctype>/index") |
35 def index(ctype): | 70 def index(ctype): |
36 ctype = path(ctype) | 71 ls = LevelSet(ctype) |
37 return list_levels(ctype) | 72 levels = ls.list_levels() |
73 return "\n".join(levels) | |
38 | 74 |
39 | 75 |
40 @app.route("/<ctype>/level/<levelname>") | 76 @app.route("/<ctype>/level/<levelname>") |
41 def level(ctype, levelname): | 77 def level(ctype, levelname): |
42 ctype = path(ctype) | 78 ls = LevelSet(ctype) |
43 levelname = "%s.txt" % secure_filename(levelname) | 79 return ls.read_level(levelname) |
44 levelpath = os.path.join(ctype, levelname) | 80 |
45 if not os.path.isfile(levelpath): | 81 |
46 abort(404, "Level not found. Hsss.") | 82 @app.route("/<ctype>/levels.zip") |
47 with open(levelpath) as level: | 83 def levels_zip(ctype): |
48 return level.read() | 84 ls = LevelSet(ctype) |
85 return ls.zip_levels() | |
49 | 86 |
50 | 87 |
51 @app.route("/save/<levelname>", methods=['GET', 'POST']) | 88 @app.route("/save/<levelname>", methods=['GET', 'POST']) |
52 def save(levelname): | 89 def save(levelname): |
53 ts = datetime.now().strftime("%Y%m%d.%H%M%S") | 90 ts = datetime.now().strftime("%Y%m%d.%H%M%S") |
54 levelname = "%s.%s.txt" % (secure_filename(levelname), ts) | 91 levelname = "%s.%s" % (levelname, ts) |
55 levelpath = os.path.join(path("uncurated"), levelname) | 92 ls = LevelSet("uncurated") |
56 if request.method == 'POST': | 93 if request.method == 'POST': |
57 if os.path.exists(levelpath): | |
58 abort(409, "Mamba already resident.") | |
59 leveldata = request.form['data'].encode('ascii') | 94 leveldata = request.form['data'].encode('ascii') |
60 with open(levelpath, 'w') as level: | 95 ls.write_level(levelname, leveldata) |
61 level.write(leveldata) | |
62 inform_cia(levelname, "New level uploaded.", branch="uncurated") | 96 inform_cia(levelname, "New level uploaded.", branch="uncurated") |
63 inform_irker(levelname, "New level uploaded.", branch="uncurated") | 97 inform_irker(levelname, "New level uploaded.", branch="uncurated") |
64 return "Ssss." | 98 return "Ssss." |
65 else: | 99 else: |
66 abort(405, "Post levels here. Hsss.") | 100 abort(405, "Post levels here. Hsss.") |
115 'timestamp': int(time.time()), | 149 'timestamp': int(time.time()), |
116 'revision': '0', | 150 'revision': '0', |
117 'author': 'unknown', | 151 'author': 'unknown', |
118 'file': filename, | 152 'file': filename, |
119 'log': log, | 153 'log': log, |
120 } | 154 } |
121 srv = xmlrpclib.Server(cia.url) | 155 srv = xmlrpclib.Server(cia.url) |
122 srv.hub.deliver(msg) | 156 srv.hub.deliver(msg) |
123 | 157 |
124 | 158 |
125 def format_irker_message(msg_template, project, filename, log, | 159 def format_irker_message(msg_template, project, filename, log, |