# encoding: UTF-8
# api: streamtuner2
# type: functions
# title: play/record actions
# description: Starts audio applications, guesses MIME types for URLs
# version: 0.8
#
# Multimedia interface for starting audio players, recording app,
# or web browser (listed as "url/http" association in players).
#
# Each channel plugin has a .listtype which describes the linked
# audio playlist format. It's mostly audio/x-scpls, seldom m3u,
# and sometimes url/direct if the entry[url] leads directly to the
# streaming server.
#
# As a fallback there is a regex which just looks for URLs in the
# given resource (works for m3u/pls/xspf/asx/...). There is no
# actual URL "filename" extension guessing.
import re
import os
import ahttp as http
from config import conf, __print__, dbg
import platform
# Coupling to the main window: starts out as None and is presumably
# assigned by the application after import (TODO confirm against caller).
# Read by action.url() via main.special.stream_url() and by
# action.tmp_fn() via main.current_channel.
main = None
#-- media actions
#
# implements "play" and "record" methods,
# but also "browser" for web URLs
#
class action:
    """
    Namespace of static helpers that start external applications
    (player, recorder, browser, help viewer) for a station entry and
    convert playlist links into stream URLs or playlist files.

    Relies on the module-level names `conf`, `http`, `__print__`, `dbg`
    and `main`.
    """

    # streamlink map: short link/playlist kind -> MIME type
    # (not referenced by the methods of this class)
    lt = dict(
        asx = "video/x-ms-asf",
        pls = "audio/x-scpls",
        m3u = "audio/x-mpegurl",
        xspf = "application/xspf+xml",
        href = "url/http",
        src = "url/direct",
        ram = "audio/x-pn-realaudio",
        smil = "application/smil",
    )

    # media map: audio file extension -> MIME type
    # (not referenced by the methods of this class)
    mf = dict(
        mp3 = "audio/mpeg",
        ogg = "audio/ogg",
        aac = "audio/aac",
    )

    # web
    @staticmethod
    def browser(url):
        """Open `url` with the command configured as "url/http" player
        (falls back to "sensible-browser")."""
        browser_cmd = conf.play.get("url/http", "sensible-browser")
        __print__( dbg.CONF, browser_cmd )
        action.run(browser_cmd + " " + action.quote(url))

    # os shell cmd escaping
    @staticmethod
    def quote(s):
        """
        Return `s` as a single shell argument.

        On Windows the value is passed through unquoted (unchanged
        behaviour). Elsewhere it is wrapped in single quotes, with
        embedded single quotes rewritten as '\\'' so that the shell can
        neither split nor expand the value.
        """
        if conf.windows:
            return str(s) # should actually be "\\\"%s\\\"" % s
        return "'" + str(s).replace("'", "'\\''") + "'"

    # calls player for stream url and format
    @staticmethod
    def play(url, audioformat="audio/mpeg", listformat="text/x-href"):
        """
        Start the player associated with `audioformat` for `url`.

        Does nothing for an empty url. Failures while building or
        starting the command are logged instead of raised.
        """
        if not url:
            return
        url = action.url(url, listformat)
        # "audio/mp3" is a common misnomer for the registered type
        if audioformat == "audio/mp3":
            audioformat = "audio/mpeg"
        cmd = action.mime_match(audioformat, conf.play)
        __print__( dbg.PROC, "play", url, cmd )
        if not cmd:
            __print__( dbg.ERR, "no player configured for", audioformat )
            return
        try:
            action.run( action.interpol(cmd, url) )
        except Exception as exc:
            __print__( dbg.ERR, "play failed:", exc )

    # exec wrapper
    @staticmethod
    def run(cmd):
        """Run the shell command line `cmd` detached from this process."""
        if conf.windows:
            # `start` takes its first quoted argument as the window
            # title, hence the empty "" in front of the real command.
            os.system("start \"\" " + cmd)
        else:
            os.system(cmd + " &")

    # streamripper
    @staticmethod
    def record(url, audioformat="audio/mpeg", listformat="text/x-href", append="", row=None):
        """
        Start the recording application associated with `audioformat`.

        `row` supplies extra %field placeholders for the command;
        `append` is added verbatim (unquoted) to the end of the command
        line. Failures are logged instead of raised.
        """
        __print__( dbg.PROC, "record", url )
        if not url:
            return
        # same URN -> URL resolution that play() applies
        url = action.url(url, listformat)
        cmd = action.mime_match(audioformat, conf.record)
        if not cmd:
            __print__( dbg.ERR, "no recorder configured for", audioformat )
            return
        try:
            action.run( action.interpol(cmd, url, row) + append )
        except Exception as exc:
            __print__( dbg.ERR, "record failed:", exc )

    # Convert MIME type into list of ["audio/xyz", "audio/*", "*/*"] for comparison against record/play association
    @staticmethod
    def mime_match(fmt, cmd_list):
        """
        Look up the command for MIME type `fmt` in the mapping `cmd_list`.

        Tries the exact type, then "major/*", then "*/*"; entries with
        an empty value are skipped. Returns None if nothing matches.
        """
        fmt = fmt or ""
        # split() keeps the whole string when there is no "/" at all
        major = fmt.split("/", 1)[0]
        for pattern in (fmt, major + "/*", "*/*"):
            cmd = cmd_list.get(pattern)
            if cmd:
                return cmd
        return None

    # save as .m3u
    @staticmethod
    def save(row, fn, listformat="audio/x-scpls"):
        """
        Export the station `row` as a playlist file `fn`.

        The output format is chosen from the extension found in `fn`
        (m3u, pls, xspf, jspf, json, asx, smil). File names without one
        of these extensions are ignored, and JSPF is not implemented, so
        nothing is written in those cases. `row` itself is not modified.

        Raises KeyError if `row` has no "url" entry.
        """
        from xml.sax.saxutils import escape

        def xml(value):
            # escape for element content and double-quoted attributes
            if not hasattr(value, "replace"):
                value = str(value)
            return escape(value, {'"': "&quot;"})

        # output format
        formats = re.findall(r"\.(m3u|pls|xspf|jspf|json|asx|smil)", fn)
        if not formats:
            return

        # modify stream url (on a copy, the caller's entry stays intact)
        row = dict(row)
        row["url"] = action.url(row["url"], listformat)
        stream_urls = action.extract_urls(row["url"], listformat)
        title = row.get("title", "")
        # ASX/SMIL reference a single address only
        first_url = stream_urls[0] if stream_urls else row["url"]
        txt = None

        # M3U
        if "m3u" in formats:
            txt = "#M3U\n" + "".join(http.fix_url(u) + "\n" for u in stream_urls)

        # PLS
        elif "pls" in formats:
            lines = ["[playlist]", "numberofentries=%d" % len(stream_urls)]
            for num, u in enumerate(stream_urls):
                num = str(num + 1)
                lines.append("File" + num + "=" + u)
                lines.append("Title" + num + "=" + title)
                lines.append("Length" + num + "=-1")
            lines.append("Version=2")
            txt = "\n".join(lines) + "\n"

        # XSPF
        elif "xspf" in formats:
            txt = '<?xml version="1.0" encoding="UTF-8"?>' + "\n"
            txt += '<?http header="Content-Type: application/xspf+xml" ?>' + "\n"
            txt += '<playlist version="1" xmlns="http://xspf.org/ns/0/">' + "\n"
            for attr, tag in [("title","title"), ("homepage","info"), ("playing","annotation"), ("description","annotation")]:
                if row.get(attr):
                    txt += " <"+tag+">" + xml(row[attr]) + "</"+tag+">\n"
            txt += " <trackList>\n"
            for u in stream_urls:
                txt += ' <track><location>' + xml(u) + '</location></track>' + "\n"
            txt += " </trackList>\n</playlist>\n"

        # JSPF (not implemented, nothing gets written)
        elif "jspf" in formats:
            pass

        # JSON
        elif "json" in formats:
            import json
            row["stream_urls"] = stream_urls
            try:
                txt = json.dumps(row, indent=4)
            except (TypeError, ValueError):
                # entry holds values JSON can't express
                txt = str(row)

        # ASX
        elif "asx" in formats:
            txt = "<ASX version=\"3.0\">\n" \
                + " <Title>" + xml(title) + "</Title>\n" \
                + " <Entry>\n" \
                + " <Title>" + xml(title) + "</Title>\n" \
                + " <MoreInfo href=\"" + xml(row.get("homepage", "")) + "\"/>\n" \
                + " <Ref href=\"" + xml(first_url) + "\"/>\n" \
                + " </Entry>\n</ASX>\n"

        # SMIL
        elif "smil" in formats:
            txt = "<smil>\n<head>\n <meta name=\"title\" content=\"" + xml(title) + "\"/>\n</head>\n" \
                + "<body>\n <seq>\n <audio src=\"" + xml(first_url) + "\"/>\n </seq>\n</body>\n</smil>\n"

        # write (binary mode needs bytes, so encode text first)
        if txt:
            data = txt if isinstance(txt, bytes) else txt.encode("utf-8")
            with open(fn, "wb") as f:
                f.write(data)

    # replaces instances of %u, %l, %pls with urls / %g, %f, %s, %m, %m3u or local filenames
    @staticmethod
    def interpol(cmd, url, row=None):
        """
        Fill the placeholders of the command template `cmd`.

          %url %pls %u %l %r  ->  the stream/playlist url
          %m3u %f %g %m       ->  local temporary .m3u file (action.m3u)
          %srv %d %s          ->  first direct stream address (action.srv)
          %<field>            ->  row[<field>] for every key in `row`

        A template without any "%" gets " %m3u" appended. Every inserted
        value is passed through action.quote(). The template is scanned
        once only, so "%" sequences inside inserted values (as in
        URL-encoded addresses) are never expanded again.
        """
        meta = dict((str(key), val) for key, val in (row or {}).items() if key)

        # add default if cmd has no %url placeholder
        if "%" not in cmd:
            cmd = cmd + " %m3u"

        # row fields first (longest name wins), then standard placeholders
        names = [re.escape(name) for name in sorted(meta, key=len, reverse=True)]
        names += ["url", "pls", "m3u", "srv", "[ulrfgmds]"]
        pattern = "%(" + "|".join(names) + ")"

        # m3u()/srv() download the playlist: run on demand, at most once
        cache = {}
        def lazy(key, resolve):
            if key not in cache:
                cache[key] = action.quote(resolve(url))
            return cache[key]

        def substitute(match):
            name = match.group(1)
            if name in meta:
                return action.quote(meta[name])
            if name in ("url", "pls", "u", "l", "r"):
                return action.quote(url)
            if name in ("m3u", "f", "g", "m"):
                return lazy("m3u", action.m3u)
            return lazy("srv", action.srv)

        # a function replacement is inserted literally (no backslash
        # processing, which matters for Windows file paths)
        return re.sub(pattern, substitute, cmd)

    # eventually transforms internal URN/IRI to URL
    @staticmethod
    def url(url, listformat):
        """Resolve "text/x-urn-streamtuner2-script" entries through
        main.special.stream_url(); any other url is returned as is."""
        if listformat == "text/x-urn-streamtuner2-script":
            return main.special.stream_url(url)
        return url

    # download a .pls resource and extract urls
    @staticmethod
    def pls(url):
        """Fetch the playlist at `url` and return the addresses of all
        its File*= entries (titles are not extracted)."""
        text = http.get(url) or ""
        __print__( dbg.DATA, "pls_text=", text )
        return re.findall(r"\s*File\d*\s*=\s*(\w+://[^\s]+)", text, re.I)

    # get a single direct ICY stream url (extract either from PLS or M3U)
    @staticmethod
    def srv(url):
        """Return the first stream address found for `url`.

        Raises IndexError if no address could be extracted."""
        return action.extract_urls(url)[0]

    # retrieve real stream urls from .pls or .m3u links
    @staticmethod
    def extract_urls(pls, listformat="__not_used_yet__"):
        """
        Return the list of stream addresses behind the link `pls`.

        The playlist type is guessed from the extension contained in the
        link; the resource is downloaded for .pls, .asx, .m3u, .ram and
        .smil. Anything else is assumed to be a direct stream address
        and returned as one-element list. `listformat` is unused.
        """
        if re.search(r"\.pls", pls): #audio/x-scpls
            return action.pls(pls)
        elif re.search(r"\.asx", pls): #video/x-ms-asf
            return re.findall(r"<Ref\s+href=\"(https?://.+?)\"", http.get(pls), re.I)
        elif re.search(r"\.m3u|\.ram|\.smil", pls): #audio/x-mpegurl
            return re.findall(r"(https?://[^\s]+)", http.get(pls), re.I)
        else: # just assume it was a direct mpeg/ogg streamserver link
            return [ (pls if pls.startswith("/") else http.fix_url(pls)) ]

    # generate filename for temporary .m3u, if possible with unique id
    @staticmethod
    def tmp_fn(pls):
        """
        Return (filename, unique) for the temporary .m3u of link `pls`.

        The first number in the path of the link serves as stream id;
        `unique` is True only if such an id exists and has more than
        three digits.
        """
        # use shoutcast unique stream id if available
        found = re.search(r"https?://.+?/.*?(\d+)", pls)
        stream_id = found.group(1) if found else "XXXXXX"
        try:
            channelname = main.current_channel or "unknown"
        except Exception:
            # main window not attached (yet)
            channelname = "unknown"
        fn = str(conf.tmp) + os.sep + "streamtuner2."+channelname+"."+stream_id+".m3u"
        return (fn, found is not None and len(stream_id) > 3)

    # check if there are any urls in a given file
    @staticmethod
    def has_urls(tmp_fn):
        """Return True if the file `tmp_fn` exists and contains at least
        one http(s) address, else False."""
        if not os.path.exists(tmp_fn):
            return False
        with open(tmp_fn, "r") as f:
            return re.search(r"https?://", f.read()) is not None

    # create a local .m3u file from it
    @staticmethod
    def m3u(pls):
        """
        Write the stream addresses behind `pls` into a temporary .m3u
        file and return its name.

        An existing file is reused if conf.reuse_m3u is set, the name is
        unique for the stream and the file already lists addresses.

        Raises ValueError if no address could be extracted.
        """
        # temp filename
        (tmp_fn, unique) = action.tmp_fn(pls)
        # does it already exist?
        if tmp_fn and unique and conf.reuse_m3u and action.has_urls(tmp_fn):
            return tmp_fn

        # download PLS
        __print__( dbg.DATA, "pls=",pls )
        url_list = action.extract_urls(pls)
        __print__( dbg.DATA, "urls=", url_list )
        if not url_list:
            __print__( dbg.ERR, "error, there were no URLs in ", pls )
            raise ValueError("Empty PLS")

        # output URL list to temporary .m3u file
        with open(tmp_fn, "w") as f:
            f.write("#M3U\n" + "\n".join(url_list) + "\n")
        # return path/name of temporary file
        return tmp_fn

    # open help browser
    @staticmethod
    def help(*args):
        """Open the installed documentation in yelp; arguments are ignored."""
        action.run("yelp /usr/share/doc/streamtuner2/help/")