41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95 | "url/direct": "srv",
"url/youtube": "href",
"url/http": "href",
"audio/x-pn-realaudio": "ram",
"application/smil": "smil",
"application/vnd.ms-wpl":"smil",
"x-urn/st2-script": "script", # unused
}
# Audio type MIME map
mediafmt_t = {
"audio/mpeg": "mp3",
"audio/ogg": "ogg",
"audio/aac" : "aac",
"audio/aacp" : "aac",
"audio/midi": "midi",
"audio/mod": "mod",
"audio/it+zip": "mod",
"audio/s3+zip": "mod",
"audio/xm+zip": "mod",
}
# Player command placeholders for playlist formats
placeholder_map = dict(
pls = "%url | %pls | %u | %l | %r",
m3u = "%m3u | %f | %g | %m",
srv = "%srv | %d | %s",
)
# Playlist format content probing (assert type)
playlist_content_map = {
"pls": r""" (?i)\[playlist\].*numberofentries""",
"xspf": r""" <\?xml .* <playlist .* http://xspf\.org/ns/0/""",
"m3u": r""" #M3U""",
"asx" : r""" <ASX\b""",
"smil": r""" <smil[^>]*> .* <seq>""",
"wpl": r""" <\?wpl \s+ version="1\.0" \s* \?>""",
"jspf": r""" \{ \s* "playlist": \s* \{ """,
"json": r""" "url": \s* "\w+:// """,
"href": r""" .* """,
}
# Exec wrapper
#
def run(cmd):
if cmd: debug(dbg.PROC, "Exec:", cmd)
try: os.system("start \"%s\"" % cmd if conf.windows else cmd + " &")
except: debug(dbg.ERR, "Command not found:", cmd)
# Start web browser
#
def browser(url): |
>
|
|
|
|
|
|
|
|
|
>
|
>
|
|
|
<
>
|
| 41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98 | "url/direct": "srv",
"url/youtube": "href",
"url/http": "href",
"audio/x-pn-realaudio": "ram",
"application/smil": "smil",
"application/vnd.ms-wpl":"smil",
"x-urn/st2-script": "script", # unused
"application/x-shockwave-flash": "href", # fallback
}
# Audio type MIME map
mediafmt_t = {
"audio/mpeg": "mp3",
"audio/ogg": "ogg",
"audio/aac" : "aac",
"audio/aacp" : "aac",
"audio/midi": "midi",
"audio/mod": "mod",
"audio/it+zip": "mod",
"audio/s3+zip": "mod",
"audio/xm+zip": "mod",
}
# Player command placeholders for playlist formats
placeholder_map = dict(
pls = "(%url | %pls | %u | %l | %r) \\b",
m3u = "(%m3u | %f | %g | %m) \\b",
srv = "(%srv | %d | %s) \\b",
)
# Playlist format content probing (assert type)
playlist_content_map = [
("pls", r""" (?i)\[playlist\].*numberofentries """),
("xspf", r""" <\?xml .* <playlist .* http://xspf\.org/ns/0/ """),
("m3u", r""" ^ \s* #(EXT)?M3U """),
("asx" , r""" <ASX\b """),
("smil", r""" <smil[^>]*> .* <seq> """),
("html", r""" <(audio|video)\b[^>]+\bsrc\s*=\s*["']?https?:// """),
("wpl", r""" <\?wpl \s+ version="1\.0" \s* \?> """),
("b4s", r""" <WinampXML> """), # http://gonze.com/playlists/playlist-format-survey.html
("jspf", r""" ^ \s* \{ \s* "playlist": \s* \{ """),
("json", r""" "url": \s* "\w+:// """),
("href", r""" .* """),
]
# Exec wrapper
#
def run(cmd):
debug(dbg.PROC, "Exec:", cmd)
try: os.system("start \"%s\"" % cmd if conf.windows else cmd + " &")
except: debug(dbg.ERR, "Command not found:", cmd)
# Start web browser
#
def browser(url): |
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269 |
# standard placeholders
for dest, rx in placeholder_map.items():
if re.search(rx, cmd, re.X):
# from .pls to .m3u
urls = convert_playlist(url, listfmt(source), listfmt(dest))
# insert quoted URL/filepath
return re.sub(rx, cmd, quote(urls), 2, re.X)
return "false"
# Substitute .pls URL with local .m3u,
# or direct srv address, or leave as-is.
#
def convert_playlist(url, source, dest):
urls = []
print(dbg.PROC, "convert_playlist(", url, source, dest, ")")
# Leave alone
is_url = re.search("\w+://", url)
if source == dest or source in ("srv", "href") or not is_url:
return [url]
# Retrieve from URL
(mime, cnt) = http_probe_get(url)
# Leave streaming server as is
if mime == "srv":
cnt = ""
return [url]
# Test URL path "extension" for ".pls" / ".m3u" etc.
ext = re.findall("\.(\w)$|($)", url)[0]
# Probe MIME type and content per regex
probe = None
for probe,rx in playlist_content_map.items():
if re.search(rx, cnt, re.X|re.S):
break # with `probe` set
# Check ambiguity (except pseudo extension)
if len(set([source, mime, probe])) > 1:
print(dbg.ERR, "Possible playlist format mismatch:", (source, mime, probe, ext))
# Extract URLs from content
for fmt,extractor in [ ("pls",extract_playlist.pls), ("asx",extract_playlist.asx), ("raw",extract_playlist.raw) ]:
if not urls and fmt in (source, mime, probe, ext):
urls = extractor(cnt)
# Return asis for srv targets
if dest in ("srv", "href", "any"):
return urls
print urls
# Otherwise convert to local file
fn = tmp_fn(cnt)
save(urls[0], fn, dest)
return [fn]
# Tries to fetch a resource, aborts on ICY responses.
#
def http_probe_get(url):
# possible streaming request
r = session.get(url, stream=True)
if not len(r.headers):
return ("srv", r)
# extract payload
mime = r.headers.get("content-type", "any")
if mediafmt_t.get(mime):
mime = mediafmt_t.get(mime)
content = "".join(r.iter_lines())
return (mime, content)
# Extract URLs from playlist formats:
#
class extract_playlist(object):
@staticmethod
def pls(text):
return re.findall("\s*File\d*\s*=\s*(\w+://[^\s]+)", text, re.I)
@staticmethod
def asx(text):
return re.findall("<Ref\s+href=\"(http://.+?)\"", text)
@staticmethod
def raw(text):
print(dbg.WARN, "Raw playlist extraction")
return re.findall("(https?://[^\s]+)", content, re.I)
# Save row(s) in one of the export formats,
# depending on file extension:
#
# · m3u
# · pls |
|
<
|
|
<
|
|
|
>
>
|
>
|
>
|
>
>
|
|
|
|
>
>
>
|
|
|
| 165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275 |
# standard placeholders
for dest, rx in placeholder_map.items():
if re.search(rx, cmd, re.X):
# from .pls to .m3u
urls = convert_playlist(url, listfmt(source), listfmt(dest))
# insert quoted URL/filepath
return re.sub(rx, quote(urls), cmd, 2, re.X)
return "false"
# Substitute .pls URL with local .m3u,
# or direct srv address, or leave as-is.
#
def convert_playlist(url, source, dest):
urls = []
debug(dbg.PROC, "convert_playlist(", url, source, dest, ")")
# Leave alone if format matches, or if "srv" URL class, or if it's a local path already
if source == dest or source in ("srv", "href") or not re.search("\w+://", url):
return [url]
# Retrieve from URL
(mime, cnt) = http_probe_get(url)
# Leave streaming server as is
if mime == "srv":
cnt = ""
return [url]
# Test URL path "extension" for ".pls" / ".m3u" etc.
ext = re.findall("\.(\w)$", url)
ext = ext[0] if ext else ""
# Probe MIME type and content per regex
probe = None
print cnt
for probe,rx in playlist_content_map:
if re.search(rx, cnt, re.X|re.S):
probe = listfmt(probe)
break # with `probe` set
# Check ambiguity (except pseudo extension)
if len(set([source, mime, probe])) > 1:
debug(dbg.ERR, "Possible playlist format mismatch:", (source, mime, probe, ext))
# Extract URLs from content
for fmt,extractor in [ ("pls",extract_playlist.pls), ("asx",extract_playlist.asx), ("raw",extract_playlist.raw) ]:
if not urls and fmt in (source, mime, probe, ext):
urls = extractor(cnt)
debug(fmt, extractor, urls)
# Return original, or asis for srv targets
if not urls:
return [url]
elif dest in ("srv", "href", "any"):
return urls
debug( urls )
# Otherwise convert to local file
fn = tmp_fn(cnt)
save(urls[0], fn, dest)
return [fn]
# Tries to fetch a resource, aborts on ICY responses.
#
def http_probe_get(url):
# possible streaming request
r = session.get(url, stream=True)
if not len(r.headers):
return ("srv", r)
# extract payload
mime = r.headers.get("content-type", "any")
if listfmt_t.get(mime):
mime = listfmt_t.get(mime)
elif mimefmt_t.get(mime):
mime = mimefmt_t.get(mime)
return (mime, url)
content = "\n".join(r.iter_lines())
return (mime, content)
# Extract URLs from playlist formats:
#
class extract_playlist(object):
@staticmethod
def pls(text):
return re.findall("\s*File\d*\s*=\s*(\w+://[^\s]+)", text, re.I)
@staticmethod
def asx(text):
return re.findall("<Ref\s+href=\"(http://.+?)\"", text)
@staticmethod
def raw(text):
debug(dbg.WARN, "Raw playlist extraction")
return re.findall("([\w+]+://[^\s\"\'\>\#]+)", content)
# Save row(s) in one of the export formats,
# depending on file extension:
#
# · m3u
# · pls |