@@ -13,295 +13,353 @@ # audio playlist format. It's audio/x-scpls mostly, seldomly m3u, # but sometimes url/direct if the entry[url] directly leads to the # streaming server. # # As fallback there is a regex which just looks for URLs in the -# given resource (works for m3u/pls/xspf/asx/...). There is no -# actual url "filename" extension guessing. +# given resource (works for m3u/pls/xspf/asx/...). import re import os -import ahttp as http -from config import conf, __print__, dbg +from ahttp import fix_url as http_fix_url, session +from config import conf, __print__ as debug, dbg import platform -# coupling to main window +# Coupling to main window +# main = None -#-- media actions -# -# implements "play" and "record" methods, -# but also "browser" for web URLs -# -class action: - - # streamlink map - lt = dict( - asx = "video/x-ms-asf", - pls = "audio/x-scpls", - m3u = "audio/x-mpegurl", - xspf = "application/xspf+xml", - href = "url/http", - src = "url/direct", - ram = "audio/x-pn-realaudio", - smil = "application/smil", - ) - # media map - mf = dict( - mp3 = "audio/mpeg", - ogg = "audio/ogg", - aac = "audio/aac", - ) - - - # web - @staticmethod - def browser(url): - bin = conf.play.get("url/http", "sensible-browser") - __print__( dbg.CONF, bin ) - action.run(bin + " " + action.quote(url)) - - - - # os shell cmd escaping - @staticmethod - def quote(s): - if conf.windows: - return str(s) # should actually be "\\\"%s\\\"" % s - else: - return "%r" % str(s) - - - # calls player for stream url and format - @staticmethod - def play(url, audioformat="audio/mpeg", listformat="text/x-href"): - if (url): - url = action.url(url, listformat) - if audioformat == "audio/mp3": - audioformat = "audio/mpeg" - cmd = action.mime_match(audioformat, conf.play) - try: - __print__( dbg.PROC, "play", url, cmd ) - action.run( action.interpol(cmd, url) ) - except: - pass - - - # exec wrapper - @staticmethod - def run(cmd): - if conf.windows: - os.system("start \"%s\"") - else: - 
os.system(cmd + " &") - - - # streamripper - @staticmethod - def record(url, audioformat="audio/mpeg", listformat="text/x-href", append="", row={}): - __print__( dbg.PROC, "record", url ) - cmd = action.mime_match(audioformat, conf.record) - try: action.run( action.interpol(cmd, url, row) + append ) - except: pass - - - # Convert MIME type into list of ["audio/xyz", "audio/*", "*/*"] for comparison against record/play association - @staticmethod - def mime_match(fmt, cmd_list): - for match in [ fmt, fmt[:fmt.find("/")] + "/*", "*/*" ]: - if cmd_list.get(match, None): - return cmd_list[match] - - - # save as .m3u - @staticmethod - def save(row, fn, listformat="audio/x-scpls"): - - # output format - format = re.findall("\.(m3u|pls|xspf|jspf|json|asx|smil)", fn) - - # modify stream url - row["url"] = action.url(row["url"], listformat) - stream_urls = action.extract_urls(row["url"], listformat) - - # M3U - if "m3u" in format: - txt = "#M3U\n" - for url in stream_urls: - txt += http.fix_url(url) + "\n" - - # PLS - elif "pls" in format: - txt = "[playlist]\n" + "numberofentries=1\n" - for i,u in enumerate(stream_urls): - i = str(i + 1) - txt += "File"+i + "=" + u + "\n" - txt += "Title"+i + "=" + row["title"] + "\n" - txt += "Length"+i + "=-1\n" - txt += "Version=2\n" - - # XSPF - elif "xspf" in format: - txt = '' + "\n" - txt += '' + "\n" - txt += '' + "\n" - for attr,tag in [("title","title"), ("homepage","info"), ("playing","annotation"), ("description","annotation")]: - if row.get(attr): - txt += " <"+tag+">" + xmlentities(row[attr]) + "\n" - txt += " \n" - for u in stream_urls: - txt += ' ' + xmlentities(u) + '' + "\n" - txt += " \n\n" - - # JSPF - elif "jspf" in format: - pass - - # JSON - elif "json" in format: - row["stream_urls"] = stream_urls - txt = str(row) # pseudo-json (python format) - - # ASX - elif "asx" in format: - txt = "\n" \ - + " " + xmlentities(row["title"]) + "\n" \ - + " \n" \ - + " " + xmlentities(row["title"]) + "\n" \ - + " \n" \ - + " \n" \ 
- + " \n\n" - - # SMIL - elif "smil" in format: - txt = "\n\n \n\n" \ - + "\n \n \n\n\n" - - # unknown - else: - return - - # write - if txt: - f = open(fn, "wb") - f.write(txt) - f.close() - pass - - - # replaces instances of %u, %l, %pls with urls / %g, %f, %s, %m, %m3u or local filenames - @staticmethod - def interpol(cmd, url, row={}): - # inject other meta fields - if row: - for field in row: - cmd = cmd.replace("%"+field, "%r" % row.get(field)) - # add default if cmd has no %url placeholder - if cmd.find("%") < 0: - cmd = cmd + " %m3u" - # standard placeholders - if (re.search("%(url|pls|[ulr])", cmd)): - cmd = re.sub("%(url|pls|[ulr])", action.quote(url), cmd) - if (re.search("%(m3u|[fgm])", cmd)): - cmd = re.sub("%(m3u|[fgm])", action.quote(action.m3u(url)), cmd) - if (re.search("%(srv|[ds])", cmd)): - cmd = re.sub("%(srv|[ds])", action.quote(action.srv(url)), cmd) - return cmd - - - # eventually transforms internal URN/IRI to URL - @staticmethod - def url(url, listformat): - if (listformat == "audio/x-scpls"): - url = url - elif (listformat == "text/x-urn-streamtuner2-script"): - url = main.special.stream_url(url) - else: - url = url - return url - - - # download a .pls resource and extract urls - @staticmethod - def pls(url): - text = http.get(url) - __print__( dbg.DATA, "pls_text=", text ) - return re.findall("\s*File\d*\s*=\s*(\w+://[^\s]+)", text, re.I) - # currently misses out on the titles - - # get a single direct ICY stream url (extract either from PLS or M3U) - @staticmethod - def srv(url): - return action.extract_urls(url)[0] - - - # retrieve real stream urls from .pls or .m3u links - @staticmethod - def extract_urls(pls, listformat="__not_used_yet__"): - # extract stream address from .pls URL - if (re.search("\.pls", pls)): #audio/x-scpls - return action.pls(pls) - elif (re.search("\.asx", pls)): #video/x-ms-asf - return re.findall(" 3 and stream_id != "XXXXXX") - - # check if there are any urls in a given file - @staticmethod - def 
has_urls(tmp_fn): - if os.path.exists(tmp_fn): - return open(tmp_fn, "r").read().find("http://") > 0 - - - # create a local .m3u file from it - @staticmethod - def m3u(pls): - - # temp filename - (tmp_fn, unique) = action.tmp_fn(pls) - # does it already exist? - if tmp_fn and unique and conf.reuse_m3u and action.has_urls(tmp_fn): - return tmp_fn - - # download PLS - __print__( dbg.DATA, "pls=",pls ) - url_list = action.extract_urls(pls) - __print__( dbg.DATA, "urls=", url_list ) - - # output URL list to temporary .m3u file - if (len(url_list)): - #tmp_fn = - f = open(tmp_fn, "w") - f.write("#M3U\n") - f.write("\n".join(url_list) + "\n") - f.close() - # return path/name of temporary file - return tmp_fn - else: - __print__( dbg.ERR, "error, there were no URLs in ", pls ) - raise "Empty PLS" - - - # open help browser - @staticmethod - def help(*args): - action.run("yelp /usr/share/doc/streamtuner2/help/") + +# Streamlink/listformat mapping +listfmt_t = { + "audio/x-scpls": "pls", + "audio/x-mpegurl": "m3u", + "video/x-ms-asf": "asx", + "application/xspf+xml": "xspf", + "*/*": "href", + "url/direct": "srv", + "url/youtube": "href", + "url/http": "href", + "audio/x-pn-realaudio": "ram", + "application/smil": "smil", + "application/vnd.ms-wpl":"smil", + "x-urn/st2-script": "script", # unused +} + +# Audio type MIME map +mediafmt_t = { + "audio/mpeg": "mp3", + "audio/ogg": "ogg", + "audio/aac" : "aac", + "audio/aacp" : "aac", + "audio/midi": "midi", + "audio/mod": "mod", + "audio/it+zip": "mod", + "audio/s3+zip": "mod", + "audio/xm+zip": "mod", +} + +# Player command placeholders for playlist formats +placeholder_map = dict( + pls = "%url | %pls | %u | %l | %r", + m3u = "%m3u | %f | %g | %m", + srv = "%srv | %d | %s", +) + +# Playlist format content probing (assert type) +playlist_content_map = { + "pls": r""" (?i)\[playlist\].*numberofentries""", + "xspf": r""" <\?xml .* ]*> .* """, + "wpl": r""" <\?wpl \s+ version="1\.0" \s* \?>""", + "jspf": r""" \{ \s* "playlist": 
# String types that quote() treats as one argument rather than a list.
# Python 2 additionally has `unicode`, which `basestring` covers.
try:
    _string_types = (basestring,)
except NameError:
    _string_types = (str,)


# Exec wrapper
#
# Hands `cmd` to the system shell: wrapped in `start "..."` when
# conf.windows is set, otherwise backgrounded with a trailing " &".
# An empty or missing command is reported and not executed.
#
def run(cmd):
    if not cmd:
        debug(dbg.ERR, "Command not found:", cmd)
        return
    debug(dbg.PROC, "Exec:", cmd)
    # NOTE(review): cmd.exe's `start` appears to treat a first quoted
    # argument as the window title - confirm this form really launches cmd.
    if conf.windows:
        shell_cmd = "start \"%s\"" % cmd
    else:
        shell_cmd = cmd + " &"
    try:
        os.system(shell_cmd)
    except Exception:
        debug(dbg.ERR, "Command not found:", cmd)


# Start web browser
#
# Uses the command configured for "url/http" in conf.play,
# defaulting to "sensible-browser".
#
def browser(url):
    browser_cmd = conf.play.get("url/http", "sensible-browser")
    run(browser_cmd + " " + quote(url))


# Open help browser, streamtuner2 pages
#
def help(*args):
    run("yelp /usr/share/doc/streamtuner2/help/")


# Calls player for stream url and format
#
# Looks up the command for `audioformat` in conf.play, lets interpol()
# fill in the URL placeholders, and executes the result.
#
def play(url, audioformat="audio/mpeg", listformat="href"):
    cmd = mime_app(audioformat, conf.play)
    if not cmd:
        debug(dbg.ERR, "No player configured for", audioformat)
        return
    cmd = interpol(cmd, url, listformat)
    run(cmd)


# Call streamripper
#
# Same as play(), but uses conf.record. `row` supplies meta fields for
# interpol(); `append` is attached verbatim to the finished command.
#
def record(url, audioformat="audio/mpeg", listformat="href", append="", row=None):
    cmd = mime_app(audioformat, conf.record)
    if not cmd:
        debug(dbg.ERR, "No recorder configured for", audioformat)
        return
    cmd = interpol(cmd, url, listformat, row or {})
    run(cmd + append)


# Wrap one value in single quotes for the shell. An embedded single
# quote is written as '\'' (close quote, escaped quote, reopen).
def _shell_quote(arg):
    return "'" + str(arg).replace("'", "'\\''") + "'"


# OS shell command escaping
#
# Accepts a single string, or a list of strings which get quoted
# individually and joined with spaces.
#
def quote(ins):
    if isinstance(ins, _string_types):
        return _shell_quote(ins)
    return " ".join([_shell_quote(s) for s in ins])


# Convert e.g. "text/x-scpls" MIME types to just "pls" monikers
#
# listfmt_t maps MIME type -> moniker, so this is a plain lookup.
# Unknown values (including monikers themselves) are returned unchanged.
#
def listfmt(t = "pls"):
    return listfmt_t.get(t, t)


# Convert MIME type into list of ["audio/xyz", "audio/*", "*/*"]
# for comparison against configured record/play association.
#
# Returns the first non-empty command found in `cmd_list`, or None.
#
def mime_app(fmt, cmd_list):
    fmt = fmt or "*/*"
    # split() keeps a slash-less type such as "audio" intact
    major = fmt.split("/")[0]
    for match in (fmt, major + "/*", "*/*"):
        if cmd_list.get(match):
            return cmd_list[match]
    return None



# Replaces instances of %m3u, %pls, %srv in a command string.
#  · Also understands short aliases %l, %f, %d.
#  · And can embed %title or %genre placeholders.
#  · Replace .pls URL with local .m3u file depending on map.
#
#  cmd:    command template, e.g. taken from conf.play / conf.record
#  url:    stream or playlist address
#  source: playlist format of `url` (MIME type or short moniker)
#  row:    optional dict of meta fields; each "%<key>" in cmd is
#          replaced by the repr() of its value
#
# Returns the finished command string, or "false" when there is no
# command or no known placeholder in it.
#
def interpol(cmd, url, source="pls", row=None):

    # nothing to interpolate into
    if not cmd:
        return "false"

    # inject other meta fields
    for field in (row or {}):
        cmd = cmd.replace("%" + field, "%r" % row.get(field))

    # add default if cmd has no %url placeholder
    if "%" not in cmd:
        cmd = cmd + " %m3u"

    # standard placeholders
    for dest, rx in placeholder_map.items():
        if re.search(rx, cmd, re.X):
            # from .pls to .m3u
            urls = convert_playlist(url, listfmt(source), listfmt(dest))
            quoted = quote(urls)
            # insert quoted URL/filepath; a callable replacement keeps
            # backslashes in the quoted text from being read as escapes
            return re.sub(rx, lambda m: quoted, cmd, count=2, flags=re.X)

    return "false"


# Substitute .pls URL with local .m3u,
# or direct srv address, or leave as-is.
#
#  url:    playlist or stream address
#  source: short moniker of the format `url` is expected to be in
#  dest:   short moniker of the format the player command asked for
#
# Returns a list: the original URL, the extracted stream URLs,
# or the name of a freshly written local playlist file.
#
def convert_playlist(url, source, dest):
    urls = []

    debug(dbg.PROC, "convert_playlist(", url, source, dest, ")")

    # Leave alone
    is_url = re.search(r"\w+://", url)
    if source == dest or source in ("srv", "href") or not is_url:
        return [url]

    # Retrieve from URL
    (mime, cnt) = http_probe_get(url)

    # Leave streaming server as is
    if mime == "srv":
        return [url]

    # Compare on short monikers; unknown types pass through unchanged
    mime = listfmt(mime)

    # Test URL path "extension" for ".pls" / ".m3u" etc.
    ext_match = re.search(r"\.(\w+)$", url)
    ext = ext_match.group(1) if ext_match else ""

    # Probe MIME type and content per regex
    probe = None
    for fmt, rx in playlist_content_map.items():
        if re.search(rx, cnt, re.X | re.S):
            probe = fmt
            break

    # Check ambiguity (except pseudo extension)
    if len(set([source, mime, probe])) > 1:
        debug(dbg.ERR, "Possible playlist format mismatch:", (source, mime, probe, ext))

    # Extract URLs from content
    candidates = (source, mime, probe, ext)
    extractors = [
        ("pls", extract_playlist.pls),
        ("asx", extract_playlist.asx),
        ("raw", extract_playlist.raw),
    ]
    for fmt, extractor in extractors:
        if not urls and fmt in candidates:
            urls = extractor(cnt)

    # Generic URL scan as last resort for any other format
    if not urls:
        urls = extract_playlist.raw(cnt)

    # Nothing usable found: hand the original address to the player
    if not urls:
        debug(dbg.ERR, "No URLs found in", url)
        return [url]

    # Return asis for srv targets
    if dest in ("srv", "href", "any"):
        return urls

    # Otherwise convert to local file; tmp_fn() returns (path, is_unique)
    fn, _unique = tmp_fn(cnt)
    # NOTE(review): save() is not fully visible here - confirm that it
    # accepts a plain URL as first argument and `dest` as the format.
    save(urls[0], fn, dest)
    return [fn]



# Tries to fetch a resource, aborts on ICY responses.
+# +def http_probe_get(url): + + # possible streaming request + r = session.get(url, stream=True) + if not len(r.headers): + return ("srv", r) + + # extract payload + mime = r.headers.get("content-type", "any") + if mediafmt_t.get(mime): + mime = mediafmt_t.get(mime) + content = "".join(r.iter_lines()) + return (mime, content) + + + +# Extract URLs from playlist formats: +# +class extract_playlist(object): + + @staticmethod + def pls(text): + return re.findall("\s*File\d*\s*=\s*(\w+://[^\s]+)", text, re.I) + + @staticmethod + def asx(text): + return re.findall("" + xmlentities(row[attr]) + "\n" + txt += " \n" + for u in stream_urls: + txt += ' ' + xmlentities(u) + '' + "\n" + txt += " \n\n" + + # JSPF + elif "jspf" in format: + pass + + # JSON + elif "json" in format: + row["stream_urls"] = stream_urls + txt = str(row) # pseudo-json (python format) + + # ASX + elif "asx" in format: + txt = "\n" \ + + " " + xmlentities(row["title"]) + "\n" \ + + " \n" \ + + " " + xmlentities(row["title"]) + "\n" \ + + " \n" \ + + " \n" \ + + " \n\n" + + # SMIL + elif "smil" in format: + txt = "\n\n \n\n" \ + + "\n \n \n\n\n" + + # unknown + else: + return + + # write + if txt: + with open(fn, "wb") as f: + f.write(txt) + pass + + + +# generate filename for temporary .m3u, if possible with unique id +def tmp_fn(pls): + # use shoutcast unique stream id if available + stream_id = re.search("http://.+?/.*?(\d+)", pls, re.M) + stream_id = stream_id and stream_id.group(1) or "XXXXXX" + try: + channelname = main.current_channel + except: + channelname = "unknown" + return (str(conf.tmp) + os.sep + "streamtuner2."+channelname+"."+stream_id+".m3u", len(stream_id) > 3 and stream_id != "XXXXXX") + + +# check if there are any urls in a given file +def has_urls(tmp_fn): + if os.path.exists(tmp_fn): + return open(tmp_fn, "r").read().find("http://") > 0 +