Mercurial > rinkhals
view data/sounds/get-sources @ 498:62b9a4e21f1a
chickens in closed buildings deselected. building opens if you put in chickens and it's not full. opening building with move or select tool allows rearrangement of chickens in building. fixed multiselect in buildings.
author | Adrianna Pińska <adrianna.pinska@gmail.com> |
---|---|
date | Wed, 25 Nov 2009 23:51:33 +0000 |
parents | 55105d3bdab1 |
children |
line wrap: on
line source
#!/usr/bin/env python """Script to fetch sources defined in sources.txt""" import ConfigParser import logging import os import subprocess import urllib import urllib2 import zipfile try: import pymedia.audio.acodec as acodec import pymedia.muxer as muxer except ImportError, e: logging.info("pymedia not installed, will use transcode to convert files: %s", e) acodec = None muxer = None def iter_sources(filename=None): """fetches a bunch of sound sources from the descriptions in the given filename""" if filename is None: filename = os.path.join(os.path.dirname(os.path.abspath(__file__)), "sources.txt") source_config = ConfigParser.RawConfigParser() source_config.read(filename) for section in source_config.sections(): yield section, dict(source_config.items(section)) def convert_audio(source_filename, target_filename, source_format, target_format, converter=None): """converts audio between files""" logging.info("Converting %s (format %s) to %s (format %s)", source_filename, source_format, target_filename, target_format) if converter == "oggenc": subprocess.call(["oggenc", source_filename, "-o", target_filename]) return if not acodec or not muxer: logging.debug("pymedia not present: will try use transcode") if source_format == "aiff": source_format = "mplayer" options = ["-y", "null,%s" % target_format, "-i", source_filename, "-o", target_filename] if source_format not in ["wav", "mp3", "ogg"]: options += ["-x", "null,%s" % source_format] subprocess.call(["transcode"] + options) return source_file = open(source_filename, 'rb') s = source_file.read(8192) dm = muxer.Demuxer(source_format) frames = dm.parse(s) print dm.hasHeader(), dm.getInfo() dec = acodec.Decoder(dm.streams[0]) frame = r[0] r = dec.decode(frame[1]) print r.sample_rate, r.channels, r.bitrate, r.sample_length params = { 'id': acodec.getCodecId(target_format), 'bitrate': r.bitrate, 'sample_rate': r.sample_rate, 'ext': target_format, 'channels': r.channels } enc = acodec.Encoder(params) enc.setInfo(dec.getInfo()) target_file = open(target_filename, 'wb') while len(s): frames = enc.encode(r.data) target_file.write(enc.mux(frames)) s = source_file.read(1024) r = dec.decode(s) target_file.write(enc.flush()) target_file.close() def lazy_login(options): """performs a lazy login for the given options""" if not options.get("lazy", False): # this login has already happened return options["lazy"] = False options = options.copy() url = options.pop("url") params = urllib.urlencode(options) logging.info("Logging in to %s", url) f = opener.open(url, params) contents = f.read() f.close() def handle_logins(config): """logs in to necessary sites and returns urllib2 opener with cookies set up""" opener = urllib2.build_opener(urllib2.HTTPCookieProcessor()) opener.weblogin = {} urllib2.install_opener(opener) for section in config.sections(): options = dict(config.items(section)) opener.weblogin[section] = options opener.weblogin[section]["lazy"] = True return opener def extract_archive(archive_filename, archive_member, target_filename): """extracts file from an archive""" archive = zipfile.ZipFile(archive_filename) contents = archive.read(archive_member, "rb") target_file = open(target_filename, "wb") target_file.write(contents) target_file.close() if __name__ == "__main__": logging.getLogger().setLevel(logging.INFO) target_dir = os.path.dirname(os.path.abspath(__file__)) web_config = ConfigParser.RawConfigParser() web_config.read(os.path.join(target_dir, "web.ini")) opener = handle_logins(web_config) for filename, source_options in iter_sources(os.path.join(target_dir, "sources.txt")): download_filename = filename orig_ext = source_options.get("originalextension", None) archive_ext = source_options.get("archiveextension", None) is_archive = archive_ext if archive_ext: download_ext = archive_ext else: download_ext = orig_ext url = source_options["url"] target_name, target_ext = os.path.splitext(filename) target_ext = target_ext.lstrip(".").lower() if is_archive: download_filename = url[url.rfind("/")+1:] elif download_ext: download_filename = target_name + "." + download_ext download_filename = os.path.join(target_dir, download_filename) if not os.path.exists(download_filename): if "weblogin" in source_options: lazy_login(opener.weblogin[source_options["weblogin"]]) logging.info("Downloading %s to %s", url, download_filename) contents = opener.open(url).read() if "<html" in contents[:1024].lower(): logging.error("%s returned HTML rather than file contents", url) continue f = open(download_filename, "wb") f.write(contents) f.close() target_filename = os.path.join(target_dir, filename) if not os.path.exists(target_filename): if is_archive: if not orig_ext: archive_filename = source_options.get("archivemember", filename) orig_ext = os.path.splitext(archive_filename)[1].lstrip(".") archive_filename = target_name + "." + orig_ext source_filename = os.path.join(target_dir, archive_filename) if not os.path.exists(source_filename): extract_archive(download_filename, source_options.get("archivemember", filename), source_filename) else: source_filename = download_filename orig_format = source_options.get("originalformat", orig_ext) target_format = source_options.get("targetformat", target_ext) converter = source_options.get("converter", None) convert_audio(source_filename, target_filename, orig_format, target_format, converter)