219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
|
(mime, cnt) = http_probe_get(url)
# Leave streaming server as is
if mime == "srv":
cnt = ""
return [url]
# Test URL path "extension" for ".pls" / ".m3u" etc.
ext = re.findall("\.(\w)$", url)
ext = ext[0] if ext else None
# Probe MIME type and content per regex
probe = None
for probe,rx in playlist_content_map:
if re.search(rx, cnt, re.X|re.S):
probe = listfmt(probe)
break # with `probe` set
# Check ambiguity (except pseudo extension)
if len(set([source, mime, probe])) > 1:
debug(dbg.ERR, "Possible playlist format mismatch:", "listformat={}, http_mime={}, rx_probe={}, ext={}".format(source, mime, probe, ext))
# Extract URLs from content
for fmt in [id[0] for id in extract_playlist.extr_urls]:
|
<
<
<
|
<
|
|
<
<
<
|
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
|
(mime, cnt) = http_probe_get(url)
# Leave streaming server as is
if mime == "srv":
cnt = ""
return [url]
# Deduce likely content format
ext = probe_playlist_fn_ext(url)
probe = probe_playlist_content(cnt)
# Check ambiguity (except pseudo extension)
if len(set([source, mime, probe])) > 1:
debug(dbg.ERR, "Possible playlist format mismatch:", "listformat={}, http_mime={}, rx_probe={}, ext={}".format(source, mime, probe, ext))
# Extract URLs from content
for fmt in [id[0] for id in extract_playlist.extr_urls]:
|
256
257
258
259
260
261
262
263
264
265
266
267
268
269
|
with open(fn, "w") as f:
debug(dbg.DATA, "exporting with format:", dest, " into filename:", fn)
f.write( save_playlist(source="srv", multiply=True).export(urls, row, dest) )
return [fn]
else:
return urls
# Tries to fetch a resource, aborts on ICY responses.
#
def http_probe_get(url):
# HTTP request, abort if streaming server hit (no HTTP/ header, but ICY/ response)
|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
|
with open(fn, "w") as f:
debug(dbg.DATA, "exporting with format:", dest, " into filename:", fn)
f.write( save_playlist(source="srv", multiply=True).export(urls, row, dest) )
return [fn]
else:
return urls
# Test URL/path "extension" for ".pls" / ".m3u" etc.
def probe_playlist_fn_ext(url):
e = re.findall("\.(pls|m3u|xspf|jspf|asx|wpl|wsf|smil|html|url|json)$", url)
if e: return e[0]
else: pass
# Probe MIME type and content per regex
def probe_playlist_content(cnt):
for probe,rx in playlist_content_map:
if re.search(rx, cnt, re.X|re.S):
return listfmt(probe)
return None
# Tries to fetch a resource, aborts on ICY responses.
#
def http_probe_get(url):
# HTTP request, abort if streaming server hit (no HTTP/ header, but ICY/ response)
|