Index: action.py
==================================================================
--- action.py
+++ action.py
@@ -2,11 +2,11 @@
# encoding: UTF-8
# api: streamtuner2
# type: functions
# title: play/record actions
# description: Starts audio applications, guesses MIME types for URLs
-# version: 0.7
+# version: 0.8
#
# Multimedia interface for starting audio players, recording app,
# or web browser (listed as "url/http" association in players).
#
# Each channel plugin has a .listtype which describes the linked
@@ -24,258 +24,284 @@
import ahttp as http
from config import conf, __print__, dbg
import platform
+# coupling to main window
main = None
-#-- media actions ---------------------------------------------
+#-- media actions
#
# implements "play" and "record" methods,
# but also "browser" for web URLs
#
class action:
- # streamlink formats
- lt = {"asx":"video/x-ms-asf", "pls":"audio/x-scpls", "m3u":"audio/x-mpegurl", "xspf":"application/xspf+xml", "href":"url/http", "ram":"audio/x-pn-realaudio", "smil":"application/smil"}
- # media formats
- mf = {"mp3":"audio/mpeg", "ogg":"audio/ogg", "aac":"audio/aac"}
-
-
- # web
- @staticmethod
- def browser(url):
- bin = conf.play.get("url/http", "sensible-browser")
- __print__( dbg.CONF, bin )
- action.run(bin + " " + action.quote(url))
-
-
-
- # os shell cmd escaping
- @staticmethod
- def quote(s):
- if conf.windows:
- return str(s) # should actually be "\\\"%s\\\"" % s
- else:
- return "%r" % str(s)
-
-
- # calls player for stream url and format
- @staticmethod
- def play(url, audioformat="audio/mpeg", listformat="text/x-href"):
- if (url):
- url = action.url(url, listformat)
- if audioformat == "audio/mp3":
- audioformat = "audio/mpeg"
- cmd = action.mime_match(audioformat, conf.play)
- try:
- __print__( dbg.PROC, "play", url, cmd )
- action.run( action.interpol(cmd, url) )
- except:
- pass
-
-
- # exec wrapper
- @staticmethod
- def run(cmd):
- if conf.windows:
- os.system("start \"%s\"")
- else:
- os.system(cmd + " &")
-
-
- # streamripper
- @staticmethod
- def record(url, audioformat="audio/mpeg", listformat="text/x-href", append="", row={}):
- __print__( dbg.PROC, "record", url )
- cmd = action.mime_match(audioformat, conf.record)
- try: action.run( action.interpol(cmd, url, row) + append )
- except: pass
-
-
- # Convert MIME type into list of ["audio/xyz", "audio/*", "*/*"] for comparison against record/play association
- @staticmethod
- def mime_match(fmt, cmd_list):
- for match in [ fmt, fmt[:fmt.find("/")] + "/*", "*/*" ]:
- if cmd_list.get(match, None):
- return cmd_list[match]
-
-
- # save as .m3u
- @staticmethod
- def save(row, fn, listformat="audio/x-scpls"):
- # modify stream url
- row["url"] = action.url(row["url"], listformat)
- stream_urls = action.extract_urls(row["url"], listformat)
- # output format
- if (re.search("\.m3u", fn)):
- txt = "#M3U\n"
- for url in stream_urls:
- txt += http.fix_url(url) + "\n"
- # output format
- elif (re.search("\.pls", fn)):
- txt = "[playlist]\n" + "numberofentries=1\n"
- for i,u in enumerate(stream_urls):
- i = str(i + 1)
- txt += "File"+i + "=" + u + "\n"
- txt += "Title"+i + "=" + row["title"] + "\n"
- txt += "Length"+i + "=-1\n"
- txt += "Version=2\n"
- # output format
- elif (re.search("\.xspf", fn)):
- txt = '' + "\n"
- txt += '' + "\n"
- txt += '' + "\n"
- for attr,tag in [("title","title"), ("homepage","info"), ("playing","annotation"), ("description","annotation")]:
- if row.get(attr):
- txt += " <"+tag+">" + xmlentities(row[attr]) + ""+tag+">\n"
- txt += " \n"
- for u in stream_urls:
- txt += ' ' + "\n"
- txt += " \n\n"
- # output format
- elif (re.search("\.json", fn)):
- row["stream_urls"] = stream_urls
- txt = str(row) # pseudo-json (python format)
- # output format
- elif (re.search("\.asx", fn)):
- txt = "\n" \
- + " " + xmlentities(row["title"]) + "\n" \
- + " \n" \
- + " " + xmlentities(row["title"]) + "\n" \
- + " \n" \
- + " \n" \
- + " \n\n"
- # output format
- elif (re.search("\.smil", fn)):
- txt = "\n\n \n\n" \
- + "\n \n \n \n\n\n"
- # unknown
- else:
- txt = ""
- # write
- if txt:
- f = open(fn, "wb")
- f.write(txt)
- f.close()
- pass
-
-
- # replaces instances of %u, %l, %pls with urls / %g, %f, %s, %m, %m3u or local filenames
- @staticmethod
- def interpol(cmd, url, row={}):
- # inject other meta fields
- if row:
- for field in row:
- cmd = cmd.replace("%"+field, "%r" % row.get(field))
- # add default if cmd has no %url placeholder
- if cmd.find("%") < 0:
- cmd = cmd + " %m3u"
- # standard placeholders
- if (re.search("%(url|pls|[ulr])", cmd)):
- cmd = re.sub("%(url|pls|[ulr])", action.quote(url), cmd)
- if (re.search("%(m3u|[fgm])", cmd)):
- cmd = re.sub("%(m3u|[fgm])", action.quote(action.m3u(url)), cmd)
- if (re.search("%(srv|[ds])", cmd)):
- cmd = re.sub("%(srv|[ds])", action.quote(action.srv(url)), cmd)
- return cmd
-
-
- # eventually transforms internal URN/IRI to URL
- @staticmethod
- def url(url, listformat):
- if (listformat == "audio/x-scpls"):
- url = url
- elif (listformat == "text/x-urn-streamtuner2-script"):
- url = main.special.stream_url(url)
- else:
- url = url
- return url
-
-
- # download a .pls resource and extract urls
- @staticmethod
- def pls(url):
- text = http.get(url)
- __print__( dbg.DATA, "pls_text=", text )
- return re.findall("\s*File\d*\s*=\s*(\w+://[^\s]+)", text, re.I)
- # currently misses out on the titles
-
- # get a single direct ICY stream url (extract either from PLS or M3U)
- @staticmethod
- def srv(url):
- return action.extract_urls(url)[0]
-
-
- # retrieve real stream urls from .pls or .m3u links
- @staticmethod
- def extract_urls(pls, listformat="__not_used_yet__"):
- # extract stream address from .pls URL
- if (re.search("\.pls", pls)): #audio/x-scpls
- return action.pls(pls)
- elif (re.search("\.asx", pls)): #video/x-ms-asf
- return re.findall("[ 3 and stream_id != "XXXXXX")
-
- # check if there are any urls in a given file
- @staticmethod
- def has_urls(tmp_fn):
- if os.path.exists(tmp_fn):
- return open(tmp_fn, "r").read().find("http://") > 0
-
-
- # create a local .m3u file from it
- @staticmethod
- def m3u(pls):
-
- # temp filename
- (tmp_fn, unique) = action.tmp_fn(pls)
- # does it already exist?
- if tmp_fn and unique and conf.reuse_m3u and action.has_urls(tmp_fn):
- return tmp_fn
-
- # download PLS
- __print__( dbg.DATA, "pls=",pls )
- url_list = action.extract_urls(pls)
- __print__( dbg.DATA, "urls=", url_list )
-
- # output URL list to temporary .m3u file
- if (len(url_list)):
- #tmp_fn =
- f = open(tmp_fn, "w")
- f.write("#M3U\n")
- f.write("\n".join(url_list) + "\n")
- f.close()
- # return path/name of temporary file
- return tmp_fn
- else:
- __print__( dbg.ERR, "error, there were no URLs in ", pls )
- raise "Empty PLS"
-
- # open help browser
- @staticmethod
- def help(*args):
-
- action.run("yelp /usr/share/doc/streamtuner2/help/")
- #or action.browser("/usr/share/doc/streamtuner2/")
-
-#class action
-
+ # streamlink map
+ lt = dict(
+ asx = "video/x-ms-asf",
+ pls = "audio/x-scpls",
+ m3u = "audio/x-mpegurl",
+ xspf = "application/xspf+xml",
+ href = "url/http",
+ src = "url/direct",
+ ram = "audio/x-pn-realaudio",
+ smil = "application/smil",
+ )
+ # media map
+ mf = dict(
+ mp3 = "audio/mpeg",
+ ogg = "audio/ogg",
+ aac = "audio/aac",
+ )
+
+
+ # web
+ @staticmethod
+ def browser(url):
+ bin = conf.play.get("url/http", "sensible-browser")
+ __print__( dbg.CONF, bin )
+ action.run(bin + " " + action.quote(url))
+
+
+
+ # os shell cmd escaping
+ @staticmethod
+ def quote(s):
+ if conf.windows:
+ return str(s) # should actually be "\\\"%s\\\"" % s
+ else:
+ return "%r" % str(s)
+
+
+ # calls player for stream url and format
+ @staticmethod
+ def play(url, audioformat="audio/mpeg", listformat="text/x-href"):
+ if (url):
+ url = action.url(url, listformat)
+ if audioformat == "audio/mp3":
+ audioformat = "audio/mpeg"
+ cmd = action.mime_match(audioformat, conf.play)
+ try:
+ __print__( dbg.PROC, "play", url, cmd )
+ action.run( action.interpol(cmd, url) )
+ except:
+ pass
+
+
+ # exec wrapper
+ @staticmethod
+ def run(cmd):
+ if conf.windows:
+ os.system("start \"%s\"")
+ else:
+ os.system(cmd + " &")
+
+
+ # streamripper
+ @staticmethod
+ def record(url, audioformat="audio/mpeg", listformat="text/x-href", append="", row={}):
+ __print__( dbg.PROC, "record", url )
+ cmd = action.mime_match(audioformat, conf.record)
+ try: action.run( action.interpol(cmd, url, row) + append )
+ except: pass
+
+
+ # Convert MIME type into list of ["audio/xyz", "audio/*", "*/*"] for comparison against record/play association
+ @staticmethod
+ def mime_match(fmt, cmd_list):
+ for match in [ fmt, fmt[:fmt.find("/")] + "/*", "*/*" ]:
+ if cmd_list.get(match, None):
+ return cmd_list[match]
+
+
+ # save as .m3u
+ @staticmethod
+ def save(row, fn, listformat="audio/x-scpls"):
+
+ # output format
+ format = re.findall("\.(m3u|pls|xspf|jspf|json|asx|smil)", fn)
+
+ # modify stream url
+ row["url"] = action.url(row["url"], listformat)
+ stream_urls = action.extract_urls(row["url"], listformat)
+
+ # M3U
+ if "m3u" in format:
+ txt = "#M3U\n"
+ for url in stream_urls:
+ txt += http.fix_url(url) + "\n"
+
+ # PLS
+ elif "pls" in format:
+ txt = "[playlist]\n" + "numberofentries=1\n"
+ for i,u in enumerate(stream_urls):
+ i = str(i + 1)
+ txt += "File"+i + "=" + u + "\n"
+ txt += "Title"+i + "=" + row["title"] + "\n"
+ txt += "Length"+i + "=-1\n"
+ txt += "Version=2\n"
+
+ # XSPF
+ elif "xspf" in format:
+ txt = '' + "\n"
+ txt += '' + "\n"
+ txt += '' + "\n"
+ for attr,tag in [("title","title"), ("homepage","info"), ("playing","annotation"), ("description","annotation")]:
+ if row.get(attr):
+ txt += " <"+tag+">" + xmlentities(row[attr]) + ""+tag+">\n"
+ txt += " \n"
+ for u in stream_urls:
+ txt += ' ' + "\n"
+ txt += " \n\n"
+
+ # JSPF
+ elif "jspf" in format:
+ pass
+
+ # JSON
+ elif "json" in format:
+ row["stream_urls"] = stream_urls
+ txt = str(row) # pseudo-json (python format)
+
+ # ASX
+ elif "asx" in format:
+ txt = "\n" \
+ + " " + xmlentities(row["title"]) + "\n" \
+ + " \n" \
+ + " " + xmlentities(row["title"]) + "\n" \
+ + " \n" \
+ + " ]\n" \
+ + " \n\n"
+
+ # SMIL
+ elif "smil" in format:
+ txt = "\n\n \n\n" \
+ + "\n \n \n \n\n\n"
+
+ # unknown
+ else:
+ return
+
+ # write
+ if txt:
+ f = open(fn, "wb")
+ f.write(txt)
+ f.close()
+ pass
+
+
+ # replaces instances of %u, %l, %pls with urls / %g, %f, %s, %m, %m3u or local filenames
+ @staticmethod
+ def interpol(cmd, url, row={}):
+ # inject other meta fields
+ if row:
+ for field in row:
+ cmd = cmd.replace("%"+field, "%r" % row.get(field))
+ # add default if cmd has no %url placeholder
+ if cmd.find("%") < 0:
+ cmd = cmd + " %m3u"
+ # standard placeholders
+ if (re.search("%(url|pls|[ulr])", cmd)):
+ cmd = re.sub("%(url|pls|[ulr])", action.quote(url), cmd)
+ if (re.search("%(m3u|[fgm])", cmd)):
+ cmd = re.sub("%(m3u|[fgm])", action.quote(action.m3u(url)), cmd)
+ if (re.search("%(srv|[ds])", cmd)):
+ cmd = re.sub("%(srv|[ds])", action.quote(action.srv(url)), cmd)
+ return cmd
+
+
+ # eventually transforms internal URN/IRI to URL
+ @staticmethod
+ def url(url, listformat):
+ if (listformat == "audio/x-scpls"):
+ url = url
+ elif (listformat == "text/x-urn-streamtuner2-script"):
+ url = main.special.stream_url(url)
+ else:
+ url = url
+ return url
+
+
+ # download a .pls resource and extract urls
+ @staticmethod
+ def pls(url):
+ text = http.get(url)
+ __print__( dbg.DATA, "pls_text=", text )
+ return re.findall("\s*File\d*\s*=\s*(\w+://[^\s]+)", text, re.I)
+ # currently misses out on the titles
+
+ # get a single direct ICY stream url (extract either from PLS or M3U)
+ @staticmethod
+ def srv(url):
+ return action.extract_urls(url)[0]
+
+
+ # retrieve real stream urls from .pls or .m3u links
+ @staticmethod
+ def extract_urls(pls, listformat="__not_used_yet__"):
+ # extract stream address from .pls URL
+ if (re.search("\.pls", pls)): #audio/x-scpls
+ return action.pls(pls)
+ elif (re.search("\.asx", pls)): #video/x-ms-asf
+ return re.findall("[ 3 and stream_id != "XXXXXX")
+
+ # check if there are any urls in a given file
+ @staticmethod
+ def has_urls(tmp_fn):
+ if os.path.exists(tmp_fn):
+ return open(tmp_fn, "r").read().find("http://") > 0
+
+
+ # create a local .m3u file from it
+ @staticmethod
+ def m3u(pls):
+
+ # temp filename
+ (tmp_fn, unique) = action.tmp_fn(pls)
+ # does it already exist?
+ if tmp_fn and unique and conf.reuse_m3u and action.has_urls(tmp_fn):
+ return tmp_fn
+
+ # download PLS
+ __print__( dbg.DATA, "pls=",pls )
+ url_list = action.extract_urls(pls)
+ __print__( dbg.DATA, "urls=", url_list )
+
+ # output URL list to temporary .m3u file
+ if (len(url_list)):
+ #tmp_fn =
+ f = open(tmp_fn, "w")
+ f.write("#M3U\n")
+ f.write("\n".join(url_list) + "\n")
+ f.close()
+ # return path/name of temporary file
+ return tmp_fn
+ else:
+ __print__( dbg.ERR, "error, there were no URLs in ", pls )
+ raise "Empty PLS"
+
+
+ # open help browser
+ @staticmethod
+ def help(*args):
+ action.run("yelp /usr/share/doc/streamtuner2/help/")
Index: channels/bookmarks.py
==================================================================
--- channels/bookmarks.py
+++ channels/bookmarks.py
@@ -118,11 +118,10 @@
self.urls.append(row["url"])
# simplified gtk TreeStore display logic (just one category for the moment, always rebuilt)
def load(self, category, force=False):
- __print__(dbg.UI, category, self.streams.keys())
self.streams[category] = self.update_streams(category)
#self.liststore[category] = \
uikit.columns(self.gtk_list, self.datamap, self.prepare(self.streams[category]))
]