@@ -13,295 +13,353 @@
# audio playlist format. It's audio/x-scpls mostly, seldomly m3u,
# but sometimes url/direct if the entry[url] directly leads to the
# streaming server.
#
# As fallback there is a regex which just looks for URLs in the
-# given resource (works for m3u/pls/xspf/asx/...). There is no
-# actual url "filename" extension guessing.
+# given resource (works for m3u/pls/xspf/asx/...).
import re
import os
-import ahttp as http
-from config import conf, __print__, dbg
+from ahttp import fix_url as http_fix_url, session
+from config import conf, __print__ as debug, dbg
import platform
-# coupling to main window
+# Coupling to main window
+#
main = None
-#-- media actions
-#
-# implements "play" and "record" methods,
-# but also "browser" for web URLs
-#
-class action:
-
- # streamlink map
- lt = dict(
- asx = "video/x-ms-asf",
- pls = "audio/x-scpls",
- m3u = "audio/x-mpegurl",
- xspf = "application/xspf+xml",
- href = "url/http",
- src = "url/direct",
- ram = "audio/x-pn-realaudio",
- smil = "application/smil",
- )
- # media map
- mf = dict(
- mp3 = "audio/mpeg",
- ogg = "audio/ogg",
- aac = "audio/aac",
- )
-
-
- # web
- @staticmethod
- def browser(url):
- bin = conf.play.get("url/http", "sensible-browser")
- __print__( dbg.CONF, bin )
- action.run(bin + " " + action.quote(url))
-
-
-
- # os shell cmd escaping
- @staticmethod
- def quote(s):
- if conf.windows:
- return str(s) # should actually be "\\\"%s\\\"" % s
- else:
- return "%r" % str(s)
-
-
- # calls player for stream url and format
- @staticmethod
- def play(url, audioformat="audio/mpeg", listformat="text/x-href"):
- if (url):
- url = action.url(url, listformat)
- if audioformat == "audio/mp3":
- audioformat = "audio/mpeg"
- cmd = action.mime_match(audioformat, conf.play)
- try:
- __print__( dbg.PROC, "play", url, cmd )
- action.run( action.interpol(cmd, url) )
- except:
- pass
-
-
- # exec wrapper
- @staticmethod
- def run(cmd):
- if conf.windows:
- os.system("start \"%s\"")
- else:
- os.system(cmd + " &")
-
-
- # streamripper
- @staticmethod
- def record(url, audioformat="audio/mpeg", listformat="text/x-href", append="", row={}):
- __print__( dbg.PROC, "record", url )
- cmd = action.mime_match(audioformat, conf.record)
- try: action.run( action.interpol(cmd, url, row) + append )
- except: pass
-
-
- # Convert MIME type into list of ["audio/xyz", "audio/*", "*/*"] for comparison against record/play association
- @staticmethod
- def mime_match(fmt, cmd_list):
- for match in [ fmt, fmt[:fmt.find("/")] + "/*", "*/*" ]:
- if cmd_list.get(match, None):
- return cmd_list[match]
-
-
- # save as .m3u
- @staticmethod
- def save(row, fn, listformat="audio/x-scpls"):
-
- # output format
- format = re.findall("\.(m3u|pls|xspf|jspf|json|asx|smil)", fn)
-
- # modify stream url
- row["url"] = action.url(row["url"], listformat)
- stream_urls = action.extract_urls(row["url"], listformat)
-
- # M3U
- if "m3u" in format:
- txt = "#M3U\n"
- for url in stream_urls:
- txt += http.fix_url(url) + "\n"
-
- # PLS
- elif "pls" in format:
- txt = "[playlist]\n" + "numberofentries=1\n"
- for i,u in enumerate(stream_urls):
- i = str(i + 1)
- txt += "File"+i + "=" + u + "\n"
- txt += "Title"+i + "=" + row["title"] + "\n"
- txt += "Length"+i + "=-1\n"
- txt += "Version=2\n"
-
- # XSPF
- elif "xspf" in format:
- txt = '' + "\n"
- txt += '' + "\n"
- txt += '' + "\n"
- for attr,tag in [("title","title"), ("homepage","info"), ("playing","annotation"), ("description","annotation")]:
- if row.get(attr):
- txt += " <"+tag+">" + xmlentities(row[attr]) + ""+tag+">\n"
- txt += " \n"
- for u in stream_urls:
- txt += ' ' + "\n"
- txt += " \n\n"
-
- # JSPF
- elif "jspf" in format:
- pass
-
- # JSON
- elif "json" in format:
- row["stream_urls"] = stream_urls
- txt = str(row) # pseudo-json (python format)
-
- # ASX
- elif "asx" in format:
- txt = "\n" \
- + " " + xmlentities(row["title"]) + "\n" \
- + " \n" \
- + " " + xmlentities(row["title"]) + "\n" \
- + " \n" \
- + " \n" \
- + " \n\n"
-
- # SMIL
- elif "smil" in format:
- txt = "\n\n \n\n" \
- + "\n \n \n \n\n\n"
-
- # unknown
- else:
- return
-
- # write
- if txt:
- f = open(fn, "wb")
- f.write(txt)
- f.close()
- pass
-
-
- # replaces instances of %u, %l, %pls with urls / %g, %f, %s, %m, %m3u or local filenames
- @staticmethod
- def interpol(cmd, url, row={}):
- # inject other meta fields
- if row:
- for field in row:
- cmd = cmd.replace("%"+field, "%r" % row.get(field))
- # add default if cmd has no %url placeholder
- if cmd.find("%") < 0:
- cmd = cmd + " %m3u"
- # standard placeholders
- if (re.search("%(url|pls|[ulr])", cmd)):
- cmd = re.sub("%(url|pls|[ulr])", action.quote(url), cmd)
- if (re.search("%(m3u|[fgm])", cmd)):
- cmd = re.sub("%(m3u|[fgm])", action.quote(action.m3u(url)), cmd)
- if (re.search("%(srv|[ds])", cmd)):
- cmd = re.sub("%(srv|[ds])", action.quote(action.srv(url)), cmd)
- return cmd
-
-
- # eventually transforms internal URN/IRI to URL
- @staticmethod
- def url(url, listformat):
- if (listformat == "audio/x-scpls"):
- url = url
- elif (listformat == "text/x-urn-streamtuner2-script"):
- url = main.special.stream_url(url)
- else:
- url = url
- return url
-
-
- # download a .pls resource and extract urls
- @staticmethod
- def pls(url):
- text = http.get(url)
- __print__( dbg.DATA, "pls_text=", text )
- return re.findall("\s*File\d*\s*=\s*(\w+://[^\s]+)", text, re.I)
- # currently misses out on the titles
-
- # get a single direct ICY stream url (extract either from PLS or M3U)
- @staticmethod
- def srv(url):
- return action.extract_urls(url)[0]
-
-
- # retrieve real stream urls from .pls or .m3u links
- @staticmethod
- def extract_urls(pls, listformat="__not_used_yet__"):
- # extract stream address from .pls URL
- if (re.search("\.pls", pls)): #audio/x-scpls
- return action.pls(pls)
- elif (re.search("\.asx", pls)): #video/x-ms-asf
- return re.findall("[ 3 and stream_id != "XXXXXX")
-
- # check if there are any urls in a given file
- @staticmethod
- def has_urls(tmp_fn):
- if os.path.exists(tmp_fn):
- return open(tmp_fn, "r").read().find("http://") > 0
-
-
- # create a local .m3u file from it
- @staticmethod
- def m3u(pls):
-
- # temp filename
- (tmp_fn, unique) = action.tmp_fn(pls)
- # does it already exist?
- if tmp_fn and unique and conf.reuse_m3u and action.has_urls(tmp_fn):
- return tmp_fn
-
- # download PLS
- __print__( dbg.DATA, "pls=",pls )
- url_list = action.extract_urls(pls)
- __print__( dbg.DATA, "urls=", url_list )
-
- # output URL list to temporary .m3u file
- if (len(url_list)):
- #tmp_fn =
- f = open(tmp_fn, "w")
- f.write("#M3U\n")
- f.write("\n".join(url_list) + "\n")
- f.close()
- # return path/name of temporary file
- return tmp_fn
- else:
- __print__( dbg.ERR, "error, there were no URLs in ", pls )
- raise "Empty PLS"
-
-
- # open help browser
- @staticmethod
- def help(*args):
- action.run("yelp /usr/share/doc/streamtuner2/help/")
+
+# Streamlink/listformat mapping
+# Keys are playlist/stream MIME types, values are the short format
+# monikers used throughout this module ("pls", "m3u", "srv", "href", ...).
+# Several MIME types may share one moniker.
+listfmt_t = {
+ "audio/x-scpls": "pls",
+ "audio/x-mpegurl": "m3u",
+ "video/x-ms-asf": "asx",
+ "application/xspf+xml": "xspf",
+ "*/*": "href",
+ "url/direct": "srv",
+ "url/youtube": "href",
+ "url/http": "href",
+ "audio/x-pn-realaudio": "ram",
+ "application/smil": "smil",
+ "application/vnd.ms-wpl":"smil",
+ "x-urn/st2-script": "script", # unused
+}
+
+# Audio type MIME map
+# Keys are audio MIME types, values are short media format names.
+# http_probe_get() looks up a response Content-Type in this table.
+mediafmt_t = {
+ "audio/mpeg": "mp3",
+ "audio/ogg": "ogg",
+ "audio/aac" : "aac",
+ "audio/aacp" : "aac",
+ "audio/midi": "midi",
+ "audio/mod": "mod",
+ "audio/it+zip": "mod",
+ "audio/s3+zip": "mod",
+ "audio/xm+zip": "mod",
+}
+
+# Player command placeholders for playlist formats
+# Maps a target playlist format to a regex alternation of the command
+# placeholders that request it. interpol() applies these with re.X, so
+# the spaces around "|" are ignored by the regex engine.
+placeholder_map = dict(
+ pls = "%url | %pls | %u | %l | %r",
+ m3u = "%m3u | %f | %g | %m",
+ srv = "%srv | %d | %s",
+)
+
+# Playlist format content probing (assert type)
+playlist_content_map = {
+ "pls": r""" (?i)\[playlist\].*numberofentries""",
+ "xspf": r""" <\?xml .* ]*> .* """,
+ "wpl": r""" <\?wpl \s+ version="1\.0" \s* \?>""",
+ "jspf": r""" \{ \s* "playlist": \s* \{ """,
+ "json": r""" "url": \s* "\w+:// """,
+ "href": r""" .* """,
+}
+
+
+
+# Exec wrapper
+#
+def run(cmd):
+    """Start `cmd` through the system shell without waiting for it.
+
+    On Windows (conf.windows) the command is handed to `start`; elsewhere
+    it is backgrounded by appending " &".
+
+    cmd -- complete shell command line. An empty or None value is logged
+           as an error and nothing is executed.
+    """
+    # Guard explicitly instead of relying on a TypeError from None + " &"
+    if not cmd:
+        debug(dbg.ERR, "Command not found:", cmd)
+        return
+    debug(dbg.PROC, "Exec:", cmd)
+    try:
+        os.system(("start \"%s\"" % cmd) if conf.windows else (cmd + " &"))
+    except Exception:
+        debug(dbg.ERR, "Command not found:", cmd)
+
+
+# Start web browser
+#
+def browser(url):
+    """Open `url` with the application configured for "url/http".
+
+    Falls back to "sensible-browser" when conf.play has no such entry.
+    """
+    launcher = conf.play.get("url/http", "sensible-browser")
+    run(" ".join([launcher, quote(url)]))
+
+
+# Open help browser, streamtuner2 pages
+#
+def help(*args):
+ """Open the installed streamtuner2 help directory in yelp; any arguments are ignored."""
+ run("yelp /usr/share/doc/streamtuner2/help/")
+
+
+# Calls player for stream url and format
+#
+def play(url, audioformat="audio/mpeg", listformat="href"):
+    """Run the player configured for `audioformat` on a stream.
+
+    url         -- stream or playlist address
+    audioformat -- MIME type used to select the command from conf.play
+    listformat  -- playlist format of `url`, passed on to interpol()
+    """
+    player = mime_app(audioformat, conf.play)
+    run(interpol(player, url, listformat))
+
+
+# Call streamripper
+#
+def record(url, audioformat="audio/mpeg", listformat="href", append="", row={}):
+    """Run the recording tool configured for `audioformat` on a stream.
+
+    url         -- stream or playlist address
+    audioformat -- MIME type used to select the command from conf.record
+    listformat  -- playlist format of `url`, passed on to interpol()
+    append      -- extra text added verbatim to the end of the command
+                   line (no separator is inserted)
+    row         -- station fields for %field placeholders; only read here
+    """
+    cmd = mime_app(audioformat, conf.record)
+    cmd = interpol(cmd, url, listformat, row)
+    # `append` used to be silently dropped; honour it again
+    if append:
+        cmd = cmd + append
+    run(cmd)
+
+
+# OS shell command escaping
+#
+def quote(ins):
+    """Quote one argument, or a sequence of arguments, for a POSIX shell.
+
+    ins -- a single string, or an iterable of values (each converted
+           with str()).
+
+    Returns one string. Each argument is wrapped in single quotes, and
+    multiple arguments are separated by a space.
+    """
+    def _sh(arg):
+        # Inside single quotes the shell expands nothing; an embedded
+        # single quote is written as '"'"' (close, quoted quote, reopen).
+        # repr() is not safe here: it switches to double quotes when the
+        # value contains a single quote, which lets $() and backticks run.
+        return "'" + str(arg).replace("'", "'\"'\"'") + "'"
+
+    # type(u"") is `unicode` on Python 2 and `str` on Python 3, so text
+    # strings of either kind are treated as one argument, not iterated.
+    if isinstance(ins, (str, type(u""))):
+        return _sh(ins)
+    return " ".join([_sh(s) for s in ins])
+
+
+# Convert e.g. "audio/x-scpls" MIME types to just "pls" monikers
+#
+def listfmt(t = "pls"):
+    """Return the short playlist moniker for a MIME type.
+
+    t -- a MIME type listed in listfmt_t (e.g. "audio/x-scpls"), or a
+         value that is already a short moniker or otherwise unknown.
+
+    Known MIME types are translated ("audio/x-scpls" gives "pls");
+    anything else is returned unchanged.
+    """
+    # listfmt_t is keyed by MIME type, so a plain lookup is the conversion.
+    return listfmt_t.get(t, t)
+
+
+# Convert MIME type into list of ["audio/xyz", "audio/*", "*/*"]
+# for comparison against configured record/play association.
+def mime_app(fmt, cmd_list):
+    """Pick the configured command for a MIME type.
+
+    fmt      -- MIME type such as "audio/mpeg"; None is treated as ""
+    cmd_list -- dict mapping MIME patterns to command strings
+
+    Tries the exact type, then "major/*", then "*/*", and returns the
+    first non-empty entry. Returns None when nothing is configured.
+    """
+    fmt = fmt or ""
+    # split() keeps the whole string when there is no "/"; find() would
+    # return -1 there and chop off the last character.
+    major = fmt.split("/", 1)[0]
+    for match in (fmt, major + "/*", "*/*"):
+        if cmd_list.get(match):
+            return cmd_list[match]
+    return None
+
+
+
+# Replaces instances of %m3u, %pls, %srv in a command string.
+# · Also understands short aliases %l, %f, %d.
+# · And can embed %title or %genre placeholders.
+# · Replace .pls URL with local .m3u file depending on map.
+#
+def interpol(cmd, url, source="pls", row={}):
+    """Fill the placeholders of a player/recorder command line.
+
+    cmd    -- command template, e.g. "audacious %m3u"; may be None or
+              empty when no application is configured
+    url    -- stream or playlist address
+    source -- playlist format of `url` (MIME type or short moniker)
+    row    -- station fields; every "%<field>" in cmd is replaced by the
+              repr() of that field's value. Only read, never modified.
+
+    Returns the finished command line, or the shell command "false" when
+    there is no command or no known placeholder to fill in.
+    """
+    # Nothing configured for this MIME type
+    if not cmd:
+        return "false"
+
+    # inject other meta fields
+    if row:
+        for field in row:
+            cmd = cmd.replace("%" + field, "%r" % row.get(field))
+
+    # add default if cmd has no %url placeholder
+    if cmd.find("%") < 0:
+        cmd = cmd + " %m3u"
+
+    # standard placeholders
+    for dest, rx in placeholder_map.items():
+        if re.search(rx, cmd, re.X):
+            # from .pls to .m3u
+            urls = convert_playlist(url, listfmt(source), listfmt(dest))
+            quoted = quote(urls)
+            # re.sub takes (pattern, repl, string). A callable repl keeps
+            # backslashes in the quoted text from being read as escapes.
+            return re.sub(rx, lambda m: quoted, cmd, count=2, flags=re.X)
+
+    return "false"
+
+
+# Substitute .pls URL with local .m3u,
+# or direct srv address, or leave as-is.
+#
+def convert_playlist(url, source, dest):
+    """Turn a playlist URL into what the target application expects.
+
+    url    -- stream/playlist address, or a local path
+    source -- short moniker of the format `url` is believed to be in
+    dest   -- short moniker of the format the command placeholder asks for
+
+    Returns a list: [url] unchanged when no conversion applies or nothing
+    could be extracted, the extracted stream URLs for "srv"/"href"/"any"
+    targets, or [filename] of a newly written local playlist otherwise.
+    """
+    urls = []
+
+    debug(dbg.PROC, "convert_playlist(", url, source, dest, ")")
+
+    # Leave alone
+    is_url = re.search(r"\w+://", url)
+    if source == dest or source in ("srv", "href") or not is_url:
+        return [url]
+
+    # Retrieve from URL
+    (mime, cnt) = http_probe_get(url)
+
+    # Leave streaming server as is
+    if mime == "srv":
+        return [url]
+
+    # Reduce "audio/x-scpls; charset=..." to a short moniker like "pls",
+    # so it can be compared with `source` and the extractor names below
+    mime = mime.split(";")[0].strip()
+    mime = listfmt_t.get(mime, mime)
+
+    # Test URL path "extension" for ".pls" / ".m3u" etc.
+    ext_match = re.search(r"\.(\w+)$", url)
+    ext = ext_match.group(1).lower() if ext_match else ""
+
+    # Probe content per regex; `probe` stays None if nothing matches
+    probe = None
+    for name, rx in playlist_content_map.items():
+        if re.search(rx, cnt, re.X | re.S):
+            probe = name
+            break
+
+    # Check ambiguity (except pseudo extension)
+    if len(set([source, mime, probe])) > 1:
+        debug(dbg.ERR, "Possible playlist format mismatch:", (source, mime, probe, ext))
+
+    # Extract URLs from content; "raw" is always eligible, so it acts as
+    # the fallback when no format-specific extractor found anything
+    for fmt, extractor in [("pls", extract_playlist.pls), ("asx", extract_playlist.asx), ("raw", extract_playlist.raw)]:
+        if not urls and fmt in (source, mime, probe, ext, "raw"):
+            urls = extractor(cnt)
+
+    # Nothing usable found: hand the original address to the application
+    if not urls:
+        debug(dbg.ERR, "No stream URLs found in", url)
+        return [url]
+
+    # Return asis for srv targets
+    if dest in ("srv", "href", "any"):
+        return urls
+
+    # Otherwise convert to local file; tmp_fn() returns (path, is_unique)
+    fn, _is_unique = tmp_fn(cnt)
+    # NOTE(review): save() is not fully visible here; confirm it accepts a
+    # bare URL string as its first argument rather than a station row.
+    save(urls[0], fn, dest)
+    return [fn]
+
+
+
+# Tries to fetch a resource, aborts on ICY responses.
+#
+def http_probe_get(url):
+    """Fetch `url` and report what kind of resource it is.
+
+    Returns a tuple (kind, payload):
+      ("srv", response)  -- the reply carried no HTTP headers, or its
+                            Content-Type is an audio type from mediafmt_t;
+                            the body is not read
+      (moniker, text)    -- otherwise: the Content-Type mapped through
+                            listfmt_t where known (else the bare MIME
+                            type, "any" if absent) and the body text
+    """
+    # possible streaming request
+    r = session.get(url, stream=True)
+    if not len(r.headers):
+        return ("srv", r)
+
+    # Content-Type without parameters such as "; charset=utf-8"
+    mime = r.headers.get("content-type", "any")
+    mime = mime.split(";")[0].strip().lower()
+
+    # An audio body is the stream itself and may never end, so do not
+    # read it; release the connection and report a streaming server.
+    if mediafmt_t.get(mime):
+        r.close()
+        return ("srv", r)
+
+    mime = listfmt_t.get(mime, mime)
+
+    # iter_lines() strips the line endings; rejoin with "\n" so entries
+    # on separate playlist lines do not run into each other
+    content = "\n".join(r.iter_lines())
+    return (mime, content)
+
+
+
+# Extract URLs from playlist formats:
+#
+class extract_playlist(object):
+
+ @staticmethod
+ def pls(text):
+ return re.findall("\s*File\d*\s*=\s*(\w+://[^\s]+)", text, re.I)
+
+ @staticmethod
+ def asx(text):
+ return re.findall("][' + "\n"
+ txt += '' + "\n"
+ for attr,tag in [("title","title"), ("homepage","info"), ("playing","annotation"), ("description","annotation")]:
+ if row.get(attr):
+ txt += " <"+tag+">" + xmlentities(row[attr]) + ""+tag+">\n"
+ txt += " \n"
+ for u in stream_urls:
+ txt += ' ' + "\n"
+ txt += " \n\n"
+
+ # JSPF
+ elif "jspf" in format:
+ pass
+
+ # JSON
+ elif "json" in format:
+ row["stream_urls"] = stream_urls
+ txt = str(row) # pseudo-json (python format)
+
+ # ASX
+ elif "asx" in format:
+ txt = "\n" \
+ + " " + xmlentities(row["title"]) + "\n" \
+ + " \n" \
+ + " " + xmlentities(row["title"]) + "\n" \
+ + " \n" \
+ + " ]\n" \
+ + " \n\n"
+
+ # SMIL
+ elif "smil" in format:
+ txt = "\n\n \n\n" \
+ + "\n \n \n \n\n\n"
+
+ # unknown
+ else:
+ return
+
+ # write
+ if txt:
+ with open(fn, "wb") as f:
+ f.write(txt)
+ pass
+
+
+
+# generate filename for temporary .m3u, if possible with unique id
+def tmp_fn(pls):
+    """Build the path of the temporary .m3u file for a playlist.
+
+    pls -- text searched for an "http://host/...digits" address; the
+           first run of digits after the host part becomes the stream id
+
+    Returns (path, unique). `path` lies under conf.tmp and embeds the
+    current channel name and the stream id. `unique` is true only when a
+    stream id longer than three characters was found.
+    """
+    # use shoutcast unique stream id if available
+    found = re.search("http://.+?/.*?(\d+)", pls, re.M)
+    if found and found.group(1):
+        stream_id = found.group(1)
+    else:
+        stream_id = "XXXXXX"
+
+    # `main` may not be coupled to the main window yet
+    try:
+        channelname = main.current_channel
+    except:
+        channelname = "unknown"
+
+    path = str(conf.tmp) + os.sep + "streamtuner2." + channelname + "." + stream_id + ".m3u"
+    is_unique = len(stream_id) > 3 and stream_id != "XXXXXX"
+    return (path, is_unique)
+
+
+# check if there are any urls in a given file
+def has_urls(tmp_fn):
+    """Tell whether the file `tmp_fn` exists and contains an HTTP(S) URL.
+
+    tmp_fn -- path of a playlist file
+
+    Returns True if the file contains "http://" or "https://" anywhere,
+    False if it does not or the file is missing.
+    """
+    if not os.path.exists(tmp_fn):
+        return False
+    # `with` closes the handle; the old code left it open
+    with open(tmp_fn, "r") as f:
+        text = f.read()
+    # Substring test instead of find() > 0, which missed a URL at offset 0
+    return "http://" in text or "https://" in text
+