Mercurial > rinkhals
view data/sounds/get-sources @ 419:d110d55c8449
Move hatching logic into chickens.
author | Jeremy Thurgood <firxen@gmail.com> |
---|---|
date | Sat, 21 Nov 2009 15:22:41 +0000 |
parents | 55105d3bdab1 |
children |
line wrap: on
line source
#!/usr/bin/env python
"""Script to fetch sources defined in sources.txt

For every section of sources.txt the script downloads the file named by the
section's ``url`` option (performing a web login first when the section has a
``weblogin`` option naming a section of web.ini), optionally extracts one
member from a downloaded zip archive, and converts the result to the target
audio format.
"""

import logging
import os
import subprocess
import urllib
import zipfile

try:
    # Python 2 module names.
    import ConfigParser
    import urllib2
    from urllib import urlencode
except ImportError:
    # Python 3 renamed these modules; alias them to the names used below.
    import configparser as ConfigParser
    import urllib.request as urllib2
    from urllib.parse import urlencode

try:
    import pymedia.audio.acodec as acodec
    import pymedia.muxer as muxer
except ImportError as e:
    logging.info("pymedia not installed, will use transcode to convert files: %s", e)
    acodec = None
    muxer = None


def iter_sources(filename=None):
    """Yield ``(section, options)`` for each source described in filename.

    filename defaults to ``sources.txt`` in the directory of this script.
    ``section`` is the section name and ``options`` is a dict of that
    section's option names and values. A file that cannot be read yields
    nothing, because ``RawConfigParser.read`` skips unreadable files.
    """
    if filename is None:
        script_dir = os.path.dirname(os.path.abspath(__file__))
        filename = os.path.join(script_dir, "sources.txt")
    source_config = ConfigParser.RawConfigParser()
    source_config.read(filename)
    for section in source_config.sections():
        yield section, dict(source_config.items(section))


def _run_converter(command):
    """Run an external converter command and log a non-zero exit status."""
    status = subprocess.call(command)
    if status != 0:
        logging.error("%s exited with status %s", command[0], status)
    return status


def _convert_with_transcode(source_filename, target_filename, source_format, target_format):
    """Convert an audio file by invoking the external ``transcode`` program."""
    if source_format == "aiff":
        # aiff input is handed to transcode under the name "mplayer".
        source_format = "mplayer"
    options = ["-y", "null,%s" % target_format, "-i", source_filename, "-o", target_filename]
    if source_format not in ["wav", "mp3", "ogg"]:
        # Any other source format is named explicitly through -x.
        options += ["-x", "null,%s" % source_format]
    _run_converter(["transcode"] + options)


def _convert_with_pymedia(source_filename, target_filename, source_format, target_format):
    """Convert an audio file in-process with pymedia's demuxer and codecs."""
    with open(source_filename, 'rb') as source_file:
        s = source_file.read(8192)
        dm = muxer.Demuxer(source_format)
        frames = dm.parse(s)
        logging.info("Demuxer header: %s info: %s", dm.hasHeader(), dm.getInfo())
        dec = acodec.Decoder(dm.streams[0])
        # Decode the first demuxed frame to learn the stream parameters.
        # (The original read ``r[0]`` here, before ``r`` was ever assigned.)
        frame = frames[0]
        r = dec.decode(frame[1])
        logging.info("Decoded stream: %s %s %s %s",
                     r.sample_rate, r.channels, r.bitrate, r.sample_length)
        params = {
            'id': acodec.getCodecId(target_format),
            'bitrate': r.bitrate,
            'sample_rate': r.sample_rate,
            'ext': target_format,
            'channels': r.channels,
        }
        enc = acodec.Encoder(params)
        enc.setInfo(dec.getInfo())
        with open(target_filename, 'wb') as target_file:
            while s:
                # NOTE(review): decode() presumably returns None when a chunk
                # holds no complete frame - confirm against pymedia docs.
                if r is not None:
                    frames = enc.encode(r.data)
                    target_file.write(enc.mux(frames))
                s = source_file.read(1024)
                r = dec.decode(s)
            target_file.write(enc.flush())


def convert_audio(source_filename, target_filename, source_format, target_format, converter=None):
    """Convert source_filename to target_filename in the given target format.

    Backend selection:
      * ``converter == "oggenc"`` runs the external ``oggenc`` program;
      * otherwise, when pymedia failed to import, the external ``transcode``
        program is run;
      * otherwise pymedia is used in-process.

    A non-zero exit status from an external program is logged as an error;
    it is not raised. Returns None.
    """
    logging.info("Converting %s (format %s) to %s (format %s)",
                 source_filename, source_format, target_filename, target_format)
    if converter == "oggenc":
        _run_converter(["oggenc", source_filename, "-o", target_filename])
        return
    if not acodec or not muxer:
        logging.debug("pymedia not present: will try use transcode")
        _convert_with_transcode(source_filename, target_filename, source_format, target_format)
        return
    _convert_with_pymedia(source_filename, target_filename, source_format, target_format)


def lazy_login(options, opener=None):
    """Perform the login described by options unless it has already happened.

    options is one of the dicts stored by handle_logins: it holds a ``url``
    to post to, a ``lazy`` flag, and the form fields to send. The flag is
    cleared in the caller's dict so that later calls return immediately.

    opener is the object whose ``open(url, data)`` performs the request; when
    omitted, ``urlopen`` is used, which goes through whatever opener was
    installed globally (handle_logins installs the one it builds).
    """
    if not options.get("lazy", False):
        # this login has already happened
        return
    options["lazy"] = False
    form_fields = options.copy()
    # "lazy" is our own bookkeeping flag, not a field of the login form.
    form_fields.pop("lazy", None)
    url = form_fields.pop("url")
    # urlencode output is pure ASCII; Python 3 requires POST data as bytes.
    params = urlencode(form_fields).encode("ascii")
    logging.info("Logging in to %s", url)
    open_url = urllib2.urlopen if opener is None else opener.open
    response = open_url(url, params)
    try:
        # Drain the response so the request completes before continuing.
        response.read()
    finally:
        response.close()


def handle_logins(config):
    """Build a cookie-aware opener and register the logins found in config.

    No request is made here. Each section of config is stored as a dict in
    ``opener.weblogin[section]`` with ``lazy`` set to True, so that
    lazy_login can perform the login the first time it is needed. The opener
    is also installed as the global default opener. Returns the opener.
    """
    opener = urllib2.build_opener(urllib2.HTTPCookieProcessor())
    opener.weblogin = {}
    urllib2.install_opener(opener)
    for section in config.sections():
        login_options = dict(config.items(section))
        login_options["lazy"] = True
        opener.weblogin[section] = login_options
    return opener


def extract_archive(archive_filename, archive_member, target_filename):
    """Write member archive_member of a zip archive to target_filename."""
    archive = zipfile.ZipFile(archive_filename)
    try:
        # ZipFile.read's second argument is a password, not a file mode,
        # so only the member name is passed.
        contents = archive.read(archive_member)
    finally:
        archive.close()
    with open(target_filename, "wb") as target_file:
        target_file.write(contents)


def _fetch_source(opener, target_dir, filename, source_options):
    """Download, extract and convert one entry of sources.txt into target_dir."""
    orig_ext = source_options.get("originalextension", None)
    archive_ext = source_options.get("archiveextension", None)
    is_archive = bool(archive_ext)
    download_ext = archive_ext if archive_ext else orig_ext
    url = source_options["url"]
    target_name, target_ext = os.path.splitext(filename)
    target_ext = target_ext.lstrip(".").lower()

    # Work out the local name of the downloaded file.
    if is_archive:
        download_filename = url[url.rfind("/") + 1:]
    elif download_ext:
        download_filename = target_name + "." + download_ext
    else:
        download_filename = filename
    download_filename = os.path.join(target_dir, download_filename)

    if not os.path.exists(download_filename):
        if "weblogin" in source_options:
            lazy_login(opener.weblogin[source_options["weblogin"]], opener)
        logging.info("Downloading %s to %s", url, download_filename)
        response = opener.open(url)
        try:
            contents = response.read()
        finally:
            response.close()
        # An HTML page here is treated as a failed download (error or login
        # page), so nothing is written and this source is skipped.
        if b"<html" in contents[:1024].lower():
            logging.error("%s returned HTML rather than file contents", url)
            return
        with open(download_filename, "wb") as download_file:
            download_file.write(contents)

    target_filename = os.path.join(target_dir, filename)
    if os.path.exists(target_filename):
        return

    if is_archive:
        archive_member = source_options.get("archivemember", filename)
        if not orig_ext:
            orig_ext = os.path.splitext(archive_member)[1].lstrip(".")
        source_filename = os.path.join(target_dir, target_name + "." + orig_ext)
        if not os.path.exists(source_filename):
            extract_archive(download_filename, archive_member, source_filename)
    else:
        source_filename = download_filename

    if source_filename == target_filename:
        # The extracted member already is the target; converting a file onto
        # itself would truncate the input while it is being read.
        return

    orig_format = source_options.get("originalformat", orig_ext)
    target_format = source_options.get("targetformat", target_ext)
    converter = source_options.get("converter", None)
    convert_audio(source_filename, target_filename, orig_format, target_format, converter)


def main():
    """Fetch every source listed in sources.txt next to this script."""
    logging.getLogger().setLevel(logging.INFO)
    target_dir = os.path.dirname(os.path.abspath(__file__))
    web_config = ConfigParser.RawConfigParser()
    web_config.read(os.path.join(target_dir, "web.ini"))
    opener = handle_logins(web_config)
    sources_path = os.path.join(target_dir, "sources.txt")
    for filename, source_options in iter_sources(sources_path):
        _fetch_source(opener, target_dir, filename, source_options)


if __name__ == "__main__":
    main()