Internet radio browser GUI for music/video streams from various directory services.

⌈⌋ ⎇ branch:  streamtuner2


action.py at [ea628d6426]

File action.py artifact 693e44deeb part of check-in ea628d6426



# encoding: UTF-8
# api: streamtuner2
# type: functions
# title: play/record actions
# description: Starts audio applications, guesses MIME types for URLs
# version: 0.8
#
# Multimedia interface for starting audio players, recording app,
# or web browser (listed as "url/http" association in players).
#
# Each channel plugin has a .listtype which describes the linked
# audio playlist format. It's audio/x-scpls mostly, seldomly m3u,
# but sometimes url/direct if the entry[url] directly leads to the
# streaming server.
#
# As fallback there is a regex which just looks for URLs in the
# given resource (works for m3u/pls/xspf/asx/...).


import re
import os
from ahttp import fix_url as http_fix_url, get as http_get
from config import conf, __print__ as debug, dbg
import platform


# Coupling to main window
#
main = None



# Streamlink/listformat mapping
#
lt = dict(
    pls = "audio/x-scpls",
    m3u = "audio/x-mpegurl",
    asx = "video/x-ms-asf",
    xspf = "application/xspf+xml",
    href = "url/http",
    srv = "url/direct",
    ram = "audio/x-pn-realaudio",
    smil = "application/smil",
    script = "text/x-urn-streamtuner2-script", # unused
)


# Audio type MIME map
#
mf = dict(
    mp3 = "audio/mpeg",
    ogg = "audio/ogg",
    aac = "audio/aac",
    midi = "audio/midi",
    mod = "audio/mod",
)

# Player command placeholders for playlist formats
placeholder_map = dict(
    pls = "%url | %pls | %u | %l | %r",
    m3u = "%m3u | %f | %g | %m",
    pls = "%srv | %d | %s",
)



# Exec wrapper
#
def run(cmd):
    if cmd: debug(dbg.PROC, "Exec:", cmd)
    try:    os.system("start \"%s\"" % cmd if conf.windows else cmd + " &")
    except: debug(dbg.ERR, "Command not found:", cmd)


# Start web browser
#
def browser(url):
    bin = conf.play.get("url/http", "sensible-browser")
    run(bin + " " + quote(url))


# Open help browser, streamtuner2 pages
#
def help(*args):
    run("yelp /usr/share/doc/streamtuner2/help/")


# Calls player for stream url and format
#
def play(url, audioformat="audio/mpeg", listformat="href"):
    cmd = mime_app(audioformat, conf.play)
    cmd = interpol(cmd, url, listformat)
    run(cmd)


# Call streamripper
#
def record(url, audioformat="audio/mpeg", listformat="href", append="", row={}):
    cmd = mime_app(audioformat, conf.record)
    cmd = interpol(cmd, url, listformat, row)
    run(cmd)


# OS shell command escaping
#
def quote(s):
    return "%r" % str(s)


# Convert e.g. "text/x-scpls" MIME types to just "pls" monikers
#
def listfmt(t = "pls"):
    if t in lf.values():
       for short,mime in lf.items():
           if mime == t:
               return short
    return t # "pls"


# Convert MIME type into list of ["audio/xyz", "audio/*", "*/*"]
# for comparison against configured record/play association.
def mime_app(fmt, cmd_list):
    for match in [ fmt, fmt[:fmt.find("/")] + "/*", "*/*" ]:
        if cmd_list.get(match):
            return cmd_list[match]



# Replaces instances of %m3u, %pls, %srv in a command string.
#  · Also understands short aliases %l, %f, %d.
#  · And can embed %title or %genre placeholders.
#  · Replace .pls URL with local .m3u file depending on map.
#
def interpol(cmd, url, source="pls", row={}):

    # inject other meta fields
    if row:
        for field in row:
            cmd = cmd.replace("%"+field, "%r" % row.get(field))

    # add default if cmd has no %url placeholder
    if cmd.find("%") < 0:
        cmd = cmd + " %m3u"

    # standard placeholders
    for dest, rx in placeholder_map.items():
        if re.search(rx, cmd, re.X):
            # from .pls to .m3u
            url = convert_playlist(url, listfmt(source), listfmt(dest))
            # insert quoted URL/filepath
            return re.sub(rx, cmd, quote(url), 2, re.X)

    return "false"


# Substitute .pls URL with local .m3u,
# or direct srv address, or leave as-is.
#
def convert_playlist(url, source, dest):

    # Leave alone
    if source == dest or source in ("srv", "href"):
        return url

    # Else
    return url



# Save row(s) in one of the export formats,
# depending on file extension:
#
#  · m3u
#  · pls
#  · xspf
#  · asx
#  · json
#  · smil
#
def save(row, fn, listformat="audio/x-scpls"):

    # output format
    format = re.findall("\.(m3u|pls|xspf|jspf|json|asx|smil)", fn)

    # modify stream url
    stream_urls = extract_urls(row["url"], listformat)

    # M3U
    if "m3u" in format:
        txt = "#M3U\n"
        for url in stream_urls:
            txt += http_fix_url(url) + "\n"

    # PLS
    elif "pls" in format:
        txt = "[playlist]\n" + "numberofentries=1\n"
        for i,u in enumerate(stream_urls):
            i = str(i + 1)
            txt += "File"+i + "=" + u + "\n"
            txt += "Title"+i + "=" + row["title"] + "\n"
            txt += "Length"+i + "=-1\n"
        txt += "Version=2\n"

    # XSPF
    elif "xspf" in format:
        txt = '<?xml version="1.0" encoding="UTF-8"?>' + "\n"
        txt += '<?http header="Content-Type: application/xspf+xml" ?>' + "\n"
        txt += '<playlist version="1" xmlns="http://xspf.org/ns/0/">' + "\n"
        for attr,tag in [("title","title"), ("homepage","info"), ("playing","annotation"), ("description","annotation")]:
            if row.get(attr):
                txt += "  <"+tag+">" + xmlentities(row[attr]) + "</"+tag+">\n"
        txt += "  <trackList>\n"
        for u in stream_urls:
            txt += '	<track><location>' + xmlentities(u) + '</location></track>' + "\n"
        txt += "  </trackList>\n</playlist>\n"

    # JSPF
    elif "jspf" in format:
        pass

    # JSON
    elif "json" in format:
        row["stream_urls"] = stream_urls
        txt = str(row)   # pseudo-json (python format)
    
    # ASX
    elif "asx" in format:
        txt = "<ASX version=\"3.0\">\n"			\
            + " <Title>" + xmlentities(row["title"]) + "</Title>\n"	\
            + " <Entry>\n"				\
            + "  <Title>" + xmlentities(row["title"]) + "</Title>\n"	\
            + "  <MoreInfo href=\"" + row["homepage"] + "\"/>\n"	\
            + "  <Ref href=\"" + stream_urls[0] + "\"/>\n"		\
            + " </Entry>\n</ASX>\n"

    # SMIL
    elif "smil" in format:
            txt = "<smil>\n<head>\n  <meta name=\"title\" content=\"" + xmlentities(row["title"]) + "\"/>\n</head>\n"	\
                + "<body>\n  <seq>\n    <audio src=\"" + stream_urls[0] + "\"/>\n  </seq>\n</body>\n</smil>\n"

    # unknown
    else:
        return

    # write
    if txt:
        f = open(fn, "wb")
        f.write(txt)
        f.close()
    pass




# retrieve real stream urls from .pls or .m3u links
def extract_urls(pls, listformat="__not_used_yet__"):
    # extract stream address from .pls URL
    if (re.search("\.pls", pls)):       #audio/x-scpls
        return pls(pls)
    elif (re.search("\.asx", pls)):	#video/x-ms-asf
        return re.findall("<Ref\s+href=\"(http://.+?)\"", http_get(pls))
    elif (re.search("\.m3u|\.ram|\.smil", pls)):	#audio/x-mpegurl
        return re.findall("(http://[^\s]+)", http_get(pls), re.I)
    else:  # just assume it was a direct mpeg/ogg streamserver link
        return [ (pls if pls.startswith("/") else http_fix_url(pls)) ]
    pass


# generate filename for temporary .m3u, if possible with unique id
def tmp_fn(pls):
    # use shoutcast unique stream id if available
    stream_id = re.search("http://.+?/.*?(\d+)", pls, re.M)
    stream_id = stream_id and stream_id.group(1) or "XXXXXX"
    try:
        channelname = main.current_channel
    except:
        channelname = "unknown"
    return (str(conf.tmp) + os.sep + "streamtuner2."+channelname+"."+stream_id+".m3u", len(stream_id) > 3 and stream_id != "XXXXXX")

# check if there are any urls in a given file
def has_urls(tmp_fn):
    if os.path.exists(tmp_fn):
        return open(tmp_fn, "r").read().find("http://") > 0
    

# create a local .m3u file from it
def m3u(pls):

    # temp filename
    (tmp_fn, unique) = tmp_fn(pls)
    # does it already exist?
    if tmp_fn and unique and conf.reuse_m3u and has_urls(tmp_fn):
        return tmp_fn

    # download PLS
    debug( dbg.DATA, "pls=",pls )
    url_list = extract_urls(pls)
    debug( dbg.DATA, "urls=", url_list )

    # output URL list to temporary .m3u file
    if (len(url_list)):
        #tmp_fn = 
        f = open(tmp_fn, "w")
        f.write("#M3U\n")
        f.write("\n".join(url_list) + "\n")
        f.close()
        # return path/name of temporary file
        return tmp_fn
    else:
        debug( dbg.ERR, "error, there were no URLs in ", pls )
        raise "Empty PLS"

# Download a .pls resource and extract urls
def extract_from_pls(url):
    text = http_get(url)
    debug(dbg.DATA, "pls_text=", text)
    return re.findall("\s*File\d*\s*=\s*(\w+://[^\s]+)", text, re.I)
    # currently misses out on the titles            


# get a single direct ICY stream url (extract either from PLS or M3U)
def srv(url):
    return extract_urls(url)[0]