Add oe1archive.py
authorStefan Huber <shuber@sthu.org>
Wed, 7 Jun 2017 18:09:58 +0000 (20:09 +0200)
committerStefan Huber <shuber@sthu.org>
Wed, 7 Jun 2017 18:20:25 +0000 (20:20 +0200)
oe1archive.py [new file with mode: 0755]

diff --git a/oe1archive.py b/oe1archive.py
new file mode 100755 (executable)
index 0000000..1d4bfbf
--- /dev/null
@@ -0,0 +1,143 @@
+#!/usr/bin/env python3
+"""A simple tool to query the Ö1 7 Tage archive."""
+
+__version__ = "2.0"
+__author__ = "Stefan Huber"
+
+
+import urllib.request
+import simplejson
+import dateutil.parser
+import sys
+import getopt
+import re
+
+
+class Archive:
+
+    def __init__(self):
+        self.json = read_json("http://audioapi.orf.at/oe1/json/2.0/broadcasts/")
+
+    def get_days(self):
+        return map(_json_to_day, self.json)
+
+    def get_broadcasts(self, day):
+        bjson = self.json[day]['broadcasts']
+        return map(_json_to_broadcast, bjson)
+
+    def get_broadcast(self, day, broadcast):
+        return _json_to_broadcast(self.json[day]['broadcasts'][broadcast])
+
+    def get_broadcast_subtitle(self, day, broadcast):
+        return self.json[day]['broadcasts'][broadcast]['subtitle']
+
+    def get_broadcast_url(self, day, broadcast):
+        date = self.json[day]['day']
+        pk = self.json[day]['broadcasts'][broadcast]['programKey']
+
+        burl = 'https://audioapi.orf.at/oe1/api/json/current/broadcast/%s/%d'
+        bjson = read_json(burl % (pk, date))
+
+        sjson = bjson['streams']
+        if len(sjson) == 0:
+            return None
+
+        sid = sjson[0]['loopStreamId']
+        surl = 'http://loopstream01.apa.at/?channel=oe1&shoutcast=0&id=%s'
+        return surl % sid
+
+    def get_broadcasts_by_regex(self, key):
+        rex = re.compile(key, re.IGNORECASE)
+
+        res = []
+        for d, djson in enumerate(self.json):
+            for b, bjson in enumerate(djson['broadcasts']):
+                if rex.search(bjson['title']) is not None:
+                    res.append((d, b))
+                elif bjson['subtitle'] is not None and rex.search(bjson['subtitle']) is not None:
+                    res.append((d, b))
+        return res
+
+def _json_to_day(djson):
+    return dateutil.parser.parse(djson['dateISO'])
+
+def _json_to_broadcast(bjson):
+    dt = dateutil.parser.parse(bjson['startISO'])
+    return (dt, bjson['title'])
+
+
+def read_json(url):
+    with urllib.request.urlopen(url) as f:
+        dec = simplejson.JSONDecoder()
+        return dec.decode(f.read())
+
+def input_index(prompt, li):
+    while True:
+        try:
+            idx = int(input(prompt))
+            if idx < 0 or idx >= len(li):
+                print("Out out range!")
+            else:
+                return idx
+
+        except ValueError:
+            print("Unknown input.")
+        except EOFError:
+            sys.exit(1)
+
+def screen_help():
+    print("""Usage:
+    {0} -h, --help
+    {0} -c, --choose
+    {0} -s, --search TITLE""".format(sys.argv[0]))
+
+def screen_choose():
+    a = Archive()
+
+    print("Choose a date:")
+    days = list(a.get_days())
+    for i, date in enumerate(days):
+        print("  [%d]  %s" % (i, date.strftime("%a %d. %b %Y")))
+    day = input_index("Date: ", days)
+    print()
+
+    print("Choose a broadcast:")
+    broadcasts = list(a.get_broadcasts(day))
+    for i, b in enumerate(broadcasts):
+        date, title = b
+        print("  [%2d]  %s  %s" % (i, date.strftime("%H:%M:%S"), title))
+    broadcast = input_index("Broadcast: ", broadcasts)
+    print()
+
+    url = a.get_broadcast_url(day, broadcast)
+    if url is None:
+        print("No stream found.")
+        sys.exit(1)
+    print(url)
+
+def screen_search(key):
+    a = Archive()
+
+    for d, b in a.get_broadcasts_by_regex(key):
+        date, title = a.get_broadcast(d, b)
+        print("%s   %s" % (date.strftime("%a %d.%m.%Y  %H:%M:%S"), title))
+        print("  %s" % a.get_broadcast_url(d, b))
+        print()
+
+if __name__ == "__main__":
+
+    try:
+        opts, args = getopt.getopt(sys.argv[1:], "hcs:",
+                ["help", "choose", "search="])
+    except getopt.GetoptError as err:
+        print(err)
+        screen_help()
+        sys.exit(2)
+
+    for o, a in opts:
+        if o in ["-h", "--help"]:
+            screen_help()
+        if o in ["-c", "--choose"]:
+            screen_choose()
+        if o in ["-s", "--search"]:
+            screen_search(a)