Index: action.py ================================================================== --- action.py +++ action.py @@ -2,11 +2,11 @@ # encoding: UTF-8 # api: streamtuner2 # type: functions # title: play/record actions # description: Starts audio applications, guesses MIME types for URLs -# version: 0.7 +# version: 0.8 # # Multimedia interface for starting audio players, recording app, # or web browser (listed as "url/http" association in players). # # Each channel plugin has a .listtype which describes the linked @@ -24,258 +24,284 @@ import ahttp as http from config import conf, __print__, dbg import platform +# coupling to main window main = None -#-- media actions --------------------------------------------- +#-- media actions # # implements "play" and "record" methods, # but also "browser" for web URLs # class action: - # streamlink formats - lt = {"asx":"video/x-ms-asf", "pls":"audio/x-scpls", "m3u":"audio/x-mpegurl", "xspf":"application/xspf+xml", "href":"url/http", "ram":"audio/x-pn-realaudio", "smil":"application/smil"} - # media formats - mf = {"mp3":"audio/mpeg", "ogg":"audio/ogg", "aac":"audio/aac"} - - - # web - @staticmethod - def browser(url): - bin = conf.play.get("url/http", "sensible-browser") - __print__( dbg.CONF, bin ) - action.run(bin + " " + action.quote(url)) - - - - # os shell cmd escaping - @staticmethod - def quote(s): - if conf.windows: - return str(s) # should actually be "\\\"%s\\\"" % s - else: - return "%r" % str(s) - - - # calls player for stream url and format - @staticmethod - def play(url, audioformat="audio/mpeg", listformat="text/x-href"): - if (url): - url = action.url(url, listformat) - if audioformat == "audio/mp3": - audioformat = "audio/mpeg" - cmd = action.mime_match(audioformat, conf.play) - try: - __print__( dbg.PROC, "play", url, cmd ) - action.run( action.interpol(cmd, url) ) - except: - pass - - - # exec wrapper - @staticmethod - def run(cmd): - if conf.windows: - os.system("start \"%s\"") - else: - os.system(cmd + " &") - - - # streamripper - @staticmethod - def record(url, audioformat="audio/mpeg", listformat="text/x-href", append="", row={}): - __print__( dbg.PROC, "record", url ) - cmd = action.mime_match(audioformat, conf.record) - try: action.run( action.interpol(cmd, url, row) + append ) - except: pass - - - # Convert MIME type into list of ["audio/xyz", "audio/*", "*/*"] for comparison against record/play association - @staticmethod - def mime_match(fmt, cmd_list): - for match in [ fmt, fmt[:fmt.find("/")] + "/*", "*/*" ]: - if cmd_list.get(match, None): - return cmd_list[match] - - - # save as .m3u - @staticmethod - def save(row, fn, listformat="audio/x-scpls"): - # modify stream url - row["url"] = action.url(row["url"], listformat) - stream_urls = action.extract_urls(row["url"], listformat) - # output format - if (re.search("\.m3u", fn)): - txt = "#M3U\n" - for url in stream_urls: - txt += http.fix_url(url) + "\n" - # output format - elif (re.search("\.pls", fn)): - txt = "[playlist]\n" + "numberofentries=1\n" - for i,u in enumerate(stream_urls): - i = str(i + 1) - txt += "File"+i + "=" + u + "\n" - txt += "Title"+i + "=" + row["title"] + "\n" - txt += "Length"+i + "=-1\n" - txt += "Version=2\n" - # output format - elif (re.search("\.xspf", fn)): - txt = '' + "\n" - txt += '' + "\n" - txt += '' + "\n" - for attr,tag in [("title","title"), ("homepage","info"), ("playing","annotation"), ("description","annotation")]: - if row.get(attr): - txt += " <"+tag+">" + xmlentities(row[attr]) + "\n" - txt += " \n" - for u in stream_urls: - txt += ' ' + xmlentities(u) + '' + "\n" - txt += " \n\n" - # output format - elif (re.search("\.json", fn)): - row["stream_urls"] = stream_urls - txt = str(row) # pseudo-json (python format) - # output format - elif (re.search("\.asx", fn)): - txt = "\n" \ - + " " + xmlentities(row["title"]) + "\n" \ - + " \n" \ - + " " + xmlentities(row["title"]) + "\n" \ - + " \n" \ - + " \n" \ - + " \n\n" - # output format - elif (re.search("\.smil", fn)): - txt = "\n\n \n\n" \ - + "\n \n \n\n\n" - # unknown - else: - txt = "" - # write - if txt: - f = open(fn, "wb") - f.write(txt) - f.close() - pass - - - # replaces instances of %u, %l, %pls with urls / %g, %f, %s, %m, %m3u or local filenames - @staticmethod - def interpol(cmd, url, row={}): - # inject other meta fields - if row: - for field in row: - cmd = cmd.replace("%"+field, "%r" % row.get(field)) - # add default if cmd has no %url placeholder - if cmd.find("%") < 0: - cmd = cmd + " %m3u" - # standard placeholders - if (re.search("%(url|pls|[ulr])", cmd)): - cmd = re.sub("%(url|pls|[ulr])", action.quote(url), cmd) - if (re.search("%(m3u|[fgm])", cmd)): - cmd = re.sub("%(m3u|[fgm])", action.quote(action.m3u(url)), cmd) - if (re.search("%(srv|[ds])", cmd)): - cmd = re.sub("%(srv|[ds])", action.quote(action.srv(url)), cmd) - return cmd - - - # eventually transforms internal URN/IRI to URL - @staticmethod - def url(url, listformat): - if (listformat == "audio/x-scpls"): - url = url - elif (listformat == "text/x-urn-streamtuner2-script"): - url = main.special.stream_url(url) - else: - url = url - return url - - - # download a .pls resource and extract urls - @staticmethod - def pls(url): - text = http.get(url) - __print__( dbg.DATA, "pls_text=", text ) - return re.findall("\s*File\d*\s*=\s*(\w+://[^\s]+)", text, re.I) - # currently misses out on the titles - - # get a single direct ICY stream url (extract either from PLS or M3U) - @staticmethod - def srv(url): - return action.extract_urls(url)[0] - - - # retrieve real stream urls from .pls or .m3u links - @staticmethod - def extract_urls(pls, listformat="__not_used_yet__"): - # extract stream address from .pls URL - if (re.search("\.pls", pls)): #audio/x-scpls - return action.pls(pls) - elif (re.search("\.asx", pls)): #video/x-ms-asf - return re.findall(" 3 and stream_id != "XXXXXX") - - # check if there are any urls in a given file - @staticmethod - def has_urls(tmp_fn): - if os.path.exists(tmp_fn): - return open(tmp_fn, "r").read().find("http://") > 0 - - - # create a local .m3u file from it - @staticmethod - def m3u(pls): - - # temp filename - (tmp_fn, unique) = action.tmp_fn(pls) - # does it already exist? - if tmp_fn and unique and conf.reuse_m3u and action.has_urls(tmp_fn): - return tmp_fn - - # download PLS - __print__( dbg.DATA, "pls=",pls ) - url_list = action.extract_urls(pls) - __print__( dbg.DATA, "urls=", url_list ) - - # output URL list to temporary .m3u file - if (len(url_list)): - #tmp_fn = - f = open(tmp_fn, "w") - f.write("#M3U\n") - f.write("\n".join(url_list) + "\n") - f.close() - # return path/name of temporary file - return tmp_fn - else: - __print__( dbg.ERR, "error, there were no URLs in ", pls ) - raise "Empty PLS" - - # open help browser - @staticmethod - def help(*args): - - action.run("yelp /usr/share/doc/streamtuner2/help/") - #or action.browser("/usr/share/doc/streamtuner2/") - -#class action - + # streamlink map + lt = dict( + asx = "video/x-ms-asf", + pls = "audio/x-scpls", + m3u = "audio/x-mpegurl", + xspf = "application/xspf+xml", + href = "url/http", + src = "url/direct", + ram = "audio/x-pn-realaudio", + smil = "application/smil", + ) + # media map + mf = dict( + mp3 = "audio/mpeg", + ogg = "audio/ogg", + aac = "audio/aac", + ) + + + # web + @staticmethod + def browser(url): + bin = conf.play.get("url/http", "sensible-browser") + __print__( dbg.CONF, bin ) + action.run(bin + " " + action.quote(url)) + + + + # os shell cmd escaping + @staticmethod + def quote(s): + if conf.windows: + return str(s) # should actually be "\\\"%s\\\"" % s + else: + return "%r" % str(s) + + + # calls player for stream url and format + @staticmethod + def play(url, audioformat="audio/mpeg", listformat="text/x-href"): + if (url): + url = action.url(url, listformat) + if audioformat == "audio/mp3": + audioformat = "audio/mpeg" + cmd = action.mime_match(audioformat, conf.play) + try: + __print__( dbg.PROC, "play", url, cmd ) + action.run( action.interpol(cmd, url) ) + except: + pass + + + # exec wrapper + @staticmethod + def run(cmd): + if conf.windows: + os.system("start \"%s\"") + else: + os.system(cmd + " &") + + + # streamripper + @staticmethod + def record(url, audioformat="audio/mpeg", listformat="text/x-href", append="", row={}): + __print__( dbg.PROC, "record", url ) + cmd = action.mime_match(audioformat, conf.record) + try: action.run( action.interpol(cmd, url, row) + append ) + except: pass + + + # Convert MIME type into list of ["audio/xyz", "audio/*", "*/*"] for comparison against record/play association + @staticmethod + def mime_match(fmt, cmd_list): + for match in [ fmt, fmt[:fmt.find("/")] + "/*", "*/*" ]: + if cmd_list.get(match, None): + return cmd_list[match] + + + # save as .m3u + @staticmethod + def save(row, fn, listformat="audio/x-scpls"): + + # output format + format = re.findall("\.(m3u|pls|xspf|jspf|json|asx|smil)", fn) + + # modify stream url + row["url"] = action.url(row["url"], listformat) + stream_urls = action.extract_urls(row["url"], listformat) + + # M3U + if "m3u" in format: + txt = "#M3U\n" + for url in stream_urls: + txt += http.fix_url(url) + "\n" + + # PLS + elif "pls" in format: + txt = "[playlist]\n" + "numberofentries=1\n" + for i,u in enumerate(stream_urls): + i = str(i + 1) + txt += "File"+i + "=" + u + "\n" + txt += "Title"+i + "=" + row["title"] + "\n" + txt += "Length"+i + "=-1\n" + txt += "Version=2\n" + + # XSPF + elif "xspf" in format: + txt = '' + "\n" + txt += '' + "\n" + txt += '' + "\n" + for attr,tag in [("title","title"), ("homepage","info"), ("playing","annotation"), ("description","annotation")]: + if row.get(attr): + txt += " <"+tag+">" + xmlentities(row[attr]) + "\n" + txt += " \n" + for u in stream_urls: + txt += ' ' + xmlentities(u) + '' + "\n" + txt += " \n\n" + + # JSPF + elif "jspf" in format: + pass + + # JSON + elif "json" in format: + row["stream_urls"] = stream_urls + txt = str(row) # pseudo-json (python format) + + # ASX + elif "asx" in format: + txt = "\n" \ + + " " + xmlentities(row["title"]) + "\n" \ + + " \n" \ + + " " + xmlentities(row["title"]) + "\n" \ + + " \n" \ + + " \n" \ + + " \n\n" + + # SMIL + elif "smil" in format: + txt = "\n\n \n\n" \ + + "\n \n \n\n\n" + + # unknown + else: + return + + # write + if txt: + f = open(fn, "wb") + f.write(txt) + f.close() + pass + + + # replaces instances of %u, %l, %pls with urls / %g, %f, %s, %m, %m3u or local filenames + @staticmethod + def interpol(cmd, url, row={}): + # inject other meta fields + if row: + for field in row: + cmd = cmd.replace("%"+field, "%r" % row.get(field)) + # add default if cmd has no %url placeholder + if cmd.find("%") < 0: + cmd = cmd + " %m3u" + # standard placeholders + if (re.search("%(url|pls|[ulr])", cmd)): + cmd = re.sub("%(url|pls|[ulr])", action.quote(url), cmd) + if (re.search("%(m3u|[fgm])", cmd)): + cmd = re.sub("%(m3u|[fgm])", action.quote(action.m3u(url)), cmd) + if (re.search("%(srv|[ds])", cmd)): + cmd = re.sub("%(srv|[ds])", action.quote(action.srv(url)), cmd) + return cmd + + + # eventually transforms internal URN/IRI to URL + @staticmethod + def url(url, listformat): + if (listformat == "audio/x-scpls"): + url = url + elif (listformat == "text/x-urn-streamtuner2-script"): + url = main.special.stream_url(url) + else: + url = url + return url + + + # download a .pls resource and extract urls + @staticmethod + def pls(url): + text = http.get(url) + __print__( dbg.DATA, "pls_text=", text ) + return re.findall("\s*File\d*\s*=\s*(\w+://[^\s]+)", text, re.I) + # currently misses out on the titles + + # get a single direct ICY stream url (extract either from PLS or M3U) + @staticmethod + def srv(url): + return action.extract_urls(url)[0] + + + # retrieve real stream urls from .pls or .m3u links + @staticmethod + def extract_urls(pls, listformat="__not_used_yet__"): + # extract stream address from .pls URL + if (re.search("\.pls", pls)): #audio/x-scpls + return action.pls(pls) + elif (re.search("\.asx", pls)): #video/x-ms-asf + return re.findall(" 3 and stream_id != "XXXXXX") + + # check if there are any urls in a given file + @staticmethod + def has_urls(tmp_fn): + if os.path.exists(tmp_fn): + return open(tmp_fn, "r").read().find("http://") > 0 + + + # create a local .m3u file from it + @staticmethod + def m3u(pls): + + # temp filename + (tmp_fn, unique) = action.tmp_fn(pls) + # does it already exist? + if tmp_fn and unique and conf.reuse_m3u and action.has_urls(tmp_fn): + return tmp_fn + + # download PLS + __print__( dbg.DATA, "pls=",pls ) + url_list = action.extract_urls(pls) + __print__( dbg.DATA, "urls=", url_list ) + + # output URL list to temporary .m3u file + if (len(url_list)): + #tmp_fn = + f = open(tmp_fn, "w") + f.write("#M3U\n") + f.write("\n".join(url_list) + "\n") + f.close() + # return path/name of temporary file + return tmp_fn + else: + __print__( dbg.ERR, "error, there were no URLs in ", pls ) + raise "Empty PLS" + + + # open help browser + @staticmethod + def help(*args): + action.run("yelp /usr/share/doc/streamtuner2/help/") Index: channels/bookmarks.py ================================================================== --- channels/bookmarks.py +++ channels/bookmarks.py @@ -118,11 +118,10 @@ self.urls.append(row["url"]) # simplified gtk TreeStore display logic (just one category for the moment, always rebuilt) def load(self, category, force=False): - __print__(dbg.UI, category, self.streams.keys()) self.streams[category] = self.update_streams(category) #self.liststore[category] = \ uikit.columns(self.gtk_list, self.datamap, self.prepare(self.streams[category]))