Internet radio browser GUI for music/video streams from various directory services.

⌈⌋ ⎇ branch:  streamtuner2


Artifact [2ff7501bb2]

Artifact 2ff7501bb2d5eb2c4f3b2283ee33d5d49d5511f9:



# encoding: UTF-8
# api: streamtuner2
# type: functions
# title: play/record actions
# description: Starts audio applications, guesses MIME types for URLs
# version: 0.8
#
# Multimedia interface for starting audio players, recording app,
# or web browser (listed as "url/http" association in players).
#
# Each channel plugin has a .listtype which describes the linked
# audio playlist format. It's audio/x-scpls mostly, seldomly m3u,
# but sometimes url/direct if the entry[url] directly leads to the
# streaming server.
#
# As fallback there is a regex which just looks for URLs in the
# given resource (works for m3u/pls/xspf/asx/...).


import re
import os
from ahttp import fix_url as http_fix_url, session
from config import conf, __print__ as debug, dbg
import platform


# Coupling to main window
#
main = None



# Streamlink/listformat mapping
listfmt_t = {
    "audio/x-scpls":        "pls",
    "audio/x-mpegurl":      "m3u",
    "video/x-ms-asf":       "asx",
    "application/xspf+xml": "xspf",
    "*/*":                  "href",
    "url/direct":           "srv",
    "url/youtube":          "href",
    "url/http":             "href",
    "audio/x-pn-realaudio": "ram",
    "application/smil":     "smil",
    "application/vnd.ms-wpl":"smil",
    "x-urn/st2-script":     "script", # unused
}

# Audio type MIME map
mediafmt_t = {
    "audio/mpeg":   "mp3",
    "audio/ogg":    "ogg",
    "audio/aac" :   "aac",
    "audio/aacp" :  "aac",
    "audio/midi":   "midi",
    "audio/mod":    "mod",
    "audio/it+zip": "mod",
    "audio/s3+zip": "mod",
    "audio/xm+zip": "mod",
}

# Player command placeholders for playlist formats
placeholder_map = dict(
    pls = "%url | %pls | %u | %l | %r",
    m3u = "%m3u | %f | %g | %m",
    srv = "%srv | %d | %s",
)

# Playlist format content probing (assert type)
playlist_content_map = {
   "pls":  r""" (?i)\[playlist\].*numberofentries""",
   "xspf": r""" <\?xml .* <playlist .* http://xspf\.org/ns/0/""",
   "m3u":  r""" #M3U""",
   "asx" : r""" <ASX\b""",
   "smil": r""" <smil[^>]*> .* <seq>""",
   "wpl":  r""" <\?wpl \s+ version="1\.0" \s* \?>""",
   "jspf": r""" \{ \s* "playlist": \s* \{ """,
   "json": r""" "url": \s* "\w+:// """,
   "href": r""" .* """,
}



# Exec wrapper
#
def run(cmd):
    if cmd: debug(dbg.PROC, "Exec:", cmd)
    try:    os.system("start \"%s\"" % cmd if conf.windows else cmd + " &")
    except: debug(dbg.ERR, "Command not found:", cmd)


# Start web browser
#
def browser(url):
    bin = conf.play.get("url/http", "sensible-browser")
    run(bin + " " + quote(url))


# Open help browser, streamtuner2 pages
#
def help(*args):
    run("yelp /usr/share/doc/streamtuner2/help/")


# Calls player for stream url and format
#
def play(url, audioformat="audio/mpeg", listformat="href"):
    cmd = mime_app(audioformat, conf.play)
    cmd = interpol(cmd, url, listformat)
    run(cmd)


# Call streamripper
#
def record(url, audioformat="audio/mpeg", listformat="href", append="", row={}):
    cmd = mime_app(audioformat, conf.record)
    cmd = interpol(cmd, url, listformat, row)
    run(cmd)


# OS shell command escaping
#
def quote(ins):
    if type(ins) is str:
        return "%r" % str(ins)
    else:
        return " ".join(["%r" % str(s) for s in ins])


# Convert e.g. "text/x-scpls" MIME types to just "pls" monikers
#
def listfmt(t = "pls"):
    if t in listfmt_t.values():
       for short,mime in listfmt_t.items():
           if mime == t:
               return short
    return t # "pls"


# Convert MIME type into list of ["audio/xyz", "audio/*", "*/*"]
# for comparison against configured record/play association.
def mime_app(fmt, cmd_list):
    major = fmt[:fmt.find("/")]
    for match in [ fmt, major + "/*", "*/*" ]:
        if cmd_list.get(match):
            return cmd_list[match]



# Replaces instances of %m3u, %pls, %srv in a command string.
#  · Also understands short aliases %l, %f, %d.
#  · And can embed %title or %genre placeholders.
#  · Replace .pls URL with local .m3u file depending on map.
#
def interpol(cmd, url, source="pls", row={}):

    # inject other meta fields
    if row:
        for field in row:
            cmd = cmd.replace("%"+field, "%r" % row.get(field))

    # add default if cmd has no %url placeholder
    if cmd.find("%") < 0:
        cmd = cmd + " %m3u"

    # standard placeholders
    for dest, rx in placeholder_map.items():
        if re.search(rx, cmd, re.X):
            # from .pls to .m3u
            urls = convert_playlist(url, listfmt(source), listfmt(dest))
            # insert quoted URL/filepath
            return re.sub(rx, cmd, quote(urls), 2, re.X)

    return "false"


# Substitute .pls URL with local .m3u,
# or direct srv address, or leave as-is.
#
def convert_playlist(url, source, dest):
    urls = []
    
    print(dbg.PROC, "convert_playlist(", url, source, dest, ")")

    # Leave alone
    is_url = re.search("\w+://", url)
    if source == dest or source in ("srv", "href") or not is_url:
        return [url]
        
    # Retrieve from URL
    (mime, cnt) = http_probe_get(url)
    
    # Leave streaming server as is
    if mime == "srv":
        cnt = ""
        return [url]

    # Test URL path "extension" for ".pls" / ".m3u" etc.
    ext = re.findall("\.(\w)$|($)", url)[0]

    # Probe MIME type and content per regex
    probe = None
    for probe,rx in playlist_content_map.items():
        if re.search(rx, cnt, re.X|re.S):
            break # with `probe` set

    # Check ambiguity (except pseudo extension)
    if len(set([source, mime, probe])) > 1:
        print(dbg.ERR, "Possible playlist format mismatch:", (source, mime, probe, ext))

    # Extract URLs from content
    for fmt,extractor in [ ("pls",extract_playlist.pls), ("asx",extract_playlist.asx), ("raw",extract_playlist.raw) ]:
        if not urls and fmt in (source, mime, probe, ext):
            urls = extractor(cnt)
            
    # Return asis for srv targets
    if dest in ("srv", "href", "any"):
        return urls
    print urls

    # Otherwise convert to local file
    fn = tmp_fn(cnt)
    save(urls[0], fn, dest)
    return [fn]



# Tries to fetch a resource, aborts on ICY responses.
#
def http_probe_get(url):

    # possible streaming request
    r = session.get(url, stream=True)
    if not len(r.headers):
        return ("srv", r)

    # extract payload
    mime = r.headers.get("content-type", "any")
    if mediafmt_t.get(mime):
        mime = mediafmt_t.get(mime)
    content = "".join(r.iter_lines())
    return (mime, content)



# Extract URLs from playlist formats:
#
class extract_playlist(object):

    @staticmethod
    def pls(text):
        return re.findall("\s*File\d*\s*=\s*(\w+://[^\s]+)", text, re.I)

    @staticmethod
    def asx(text):
        return re.findall("<Ref\s+href=\"(http://.+?)\"", text)

    @staticmethod
    def raw(text):
        print(dbg.WARN, "Raw playlist extraction")
        return re.findall("(https?://[^\s]+)", content, re.I)


# Save row(s) in one of the export formats,
# depending on file extension:
#
#  · m3u
#  · pls
#  · xspf
#  · asx
#  · json
#  · smil
#
def save(row, fn, listformat="audio/x-scpls"):

    # output format
    format = re.findall("\.(m3u|pls|xspf|jspf|json|asx|smil)", fn)

    # modify stream url
    stream_urls = extract_urls(row["url"], listformat)

    # M3U
    if "m3u" in format:
        txt = "#M3U\n"
        for url in stream_urls:
            txt += http_fix_url(url) + "\n"

    # PLS
    elif "pls" in format:
        txt = "[playlist]\n" + "numberofentries=1\n"
        for i,u in enumerate(stream_urls):
            i = str(i + 1)
            txt += "File"+i + "=" + u + "\n"
            txt += "Title"+i + "=" + row["title"] + "\n"
            txt += "Length"+i + "=-1\n"
        txt += "Version=2\n"

    # XSPF
    elif "xspf" in format:
        txt = '<?xml version="1.0" encoding="UTF-8"?>' + "\n"
        txt += '<?http header="Content-Type: application/xspf+xml" ?>' + "\n"
        txt += '<playlist version="1" xmlns="http://xspf.org/ns/0/">' + "\n"
        for attr,tag in [("title","title"), ("homepage","info"), ("playing","annotation"), ("description","annotation")]:
            if row.get(attr):
                txt += "  <"+tag+">" + xmlentities(row[attr]) + "</"+tag+">\n"
        txt += "  <trackList>\n"
        for u in stream_urls:
            txt += '	<track><location>' + xmlentities(u) + '</location></track>' + "\n"
        txt += "  </trackList>\n</playlist>\n"

    # JSPF
    elif "jspf" in format:
        pass

    # JSON
    elif "json" in format:
        row["stream_urls"] = stream_urls
        txt = str(row)   # pseudo-json (python format)
    
    # ASX
    elif "asx" in format:
        txt = "<ASX version=\"3.0\">\n"			\
            + " <Title>" + xmlentities(row["title"]) + "</Title>\n"	\
            + " <Entry>\n"				\
            + "  <Title>" + xmlentities(row["title"]) + "</Title>\n"	\
            + "  <MoreInfo href=\"" + row["homepage"] + "\"/>\n"	\
            + "  <Ref href=\"" + stream_urls[0] + "\"/>\n"		\
            + " </Entry>\n</ASX>\n"

    # SMIL
    elif "smil" in format:
            txt = "<smil>\n<head>\n  <meta name=\"title\" content=\"" + xmlentities(row["title"]) + "\"/>\n</head>\n"	\
                + "<body>\n  <seq>\n    <audio src=\"" + stream_urls[0] + "\"/>\n  </seq>\n</body>\n</smil>\n"

    # unknown
    else:
        return

    # write
    if txt:
        with open(fn, "wb") as f:
            f.write(txt)
    pass



# generate filename for temporary .m3u, if possible with unique id
def tmp_fn(pls):
    # use shoutcast unique stream id if available
    stream_id = re.search("http://.+?/.*?(\d+)", pls, re.M)
    stream_id = stream_id and stream_id.group(1) or "XXXXXX"
    try:
        channelname = main.current_channel
    except:
        channelname = "unknown"
    return (str(conf.tmp) + os.sep + "streamtuner2."+channelname+"."+stream_id+".m3u", len(stream_id) > 3 and stream_id != "XXXXXX")


# check if there are any urls in a given file
def has_urls(tmp_fn):
    if os.path.exists(tmp_fn):
        return open(tmp_fn, "r").read().find("http://") > 0