omegalines 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. import HTMLParser
  4. import datetime
  5. import dateutil.parser
  6. import dateutil.tz
  7. import json
  8. import os
  9. import time
  10. import urllib2
  11. import yaml
  12. from OmegaExpansion import oledExp
  13. DEFAULT_CONFIG_PATHS = [
  14. os.path.join(os.path.expanduser('~'), '.omegalines'),
  15. os.path.join(os.sep, 'etc', 'omegalines'),
  16. ]
  17. OLED_DISPLAY_HEIGHT = 8
  18. OLED_DISPLAY_WIDTH = 21
  19. WIENER_LINIEN_DEFAULT_UPDATE_INTERVAL_SECONDS = 10
  20. OEBB_DEFAULT_UPDATE_INTERVAL_SECONDS = 30
  21. # https://openwrt.org/docs/user-guide/system_configuration
  22. OEBB_TIMEZONE = dateutil.tz.tzstr('CET-1CEST,M3.5.0,M10.5.0/3')
  23. html_parser = HTMLParser.HTMLParser()
  24. def datetime_now_local():
  25. return datetime.datetime.now(dateutil.tz.tzlocal())
  26. def format_timedelta(timedelta):
  27. total_seconds = timedelta.total_seconds()
  28. return '%s%d:%02d' % (
  29. '-' if total_seconds < 0 else '',
  30. int(abs(total_seconds) / 60),
  31. abs(total_seconds) % 60,
  32. )
  33. assert "0:20" == format_timedelta(datetime.timedelta(seconds=20))
  34. assert "1:20" == format_timedelta(datetime.timedelta(seconds=80))
  35. assert "2:00" == format_timedelta(datetime.timedelta(seconds=120))
  36. assert "-0:20" == format_timedelta(datetime.timedelta(seconds=-20))
  37. assert "-1:20" == format_timedelta(datetime.timedelta(seconds=-80))
  38. assert "-2:00" == format_timedelta(datetime.timedelta(seconds=-120))
  39. def oled_write_line(line):
  40. oledExp.write(
  41. line.ljust(OLED_DISPLAY_WIDTH, ' ')[:OLED_DISPLAY_WIDTH],
  42. )
  43. def oled_encode(text):
  44. return text.replace(u'ä', u'ue') \
  45. .replace(u'ö', u'ue') \
  46. .replace(u'ü', u'ue')
  47. class Departure:
  48. def __init__(self, line, towards, predicted_time):
  49. self.line = line
  50. self.towards = towards
  51. self.predicted_time = predicted_time
  52. @property
  53. def predicted_timedelta(self):
  54. return self.predicted_time - datetime_now_local()
  55. def request_wiener_linien_departures(api_key, rbl):
  56. req = urllib2.Request(
  57. "https://www.wienerlinien.at/ogd_realtime/monitor?sender=%s&rbl=%s"
  58. % (api_key, rbl),
  59. )
  60. req.add_header("Accept", "application/json")
  61. req.add_header("Content-Type", "application/json")
  62. req_time = datetime_now_local()
  63. resp = urllib2.urlopen(req)
  64. resp_data = json.loads(resp.read())
  65. # datetime.datetime.strptime:
  66. # ValueError: 'z' is a bad directive in format
  67. # '%Y-%m-%dT%H:%M:%S.%f%z'
  68. server_time_delta = req_time - \
  69. dateutil.parser.parse(resp_data['message']['serverTime'])
  70. monitors_data = resp_data['data']['monitors']
  71. assert 1 == len(monitors_data)
  72. departures = []
  73. for line_data in monitors_data[0]['lines']:
  74. assert 1 == len(line_data['departures'])
  75. for departure_data in line_data['departures']['departure']:
  76. try:
  77. predicted_time_server = dateutil.parser.parse(
  78. departure_data['departureTime']['timeReal'],
  79. )
  80. except KeyError as e:
  81. print(e)
  82. predicted_time_server = None
  83. if predicted_time_server:
  84. departures.append(Departure(
  85. line=departure_data['vehicle']['name']
  86. if 'vehicle' in departure_data else line_data['name'],
  87. towards=departure_data['vehicle']['towards']
  88. if 'vehicle' in departure_data else line_data['towards'],
  89. predicted_time=predicted_time_server - server_time_delta,
  90. ))
  91. return departures
  92. def request_oebb_departures(eva_id):
  93. req_time = datetime_now_local()
  94. req = urllib2.Request(
  95. 'http://fahrplan.oebb.at/bin/stboard.exe/dn?' + '&'.join([
  96. 'L=vs_scotty.vs_liveticker',
  97. 'evaId=%d' % eva_id,
  98. 'boardType=dep',
  99. 'disableEquivs=yes',
  100. 'outputMode=tickerDataOnly',
  101. 'start=yes',
  102. ]),
  103. )
  104. print('request %s' % req.get_full_url())
  105. resp = urllib2.urlopen(req)
  106. resp_data = json.loads(
  107. resp.read().replace('journeysObj = ', ''),
  108. )
  109. departures = []
  110. for departure_data in resp_data['journey']:
  111. departure_time = datetime.datetime.strptime(departure_data['ti'], '%H:%M').replace(
  112. year=req_time.year,
  113. month=req_time.month,
  114. day=req_time.day,
  115. second=0,
  116. tzinfo=OEBB_TIMEZONE,
  117. )
  118. departures.append(Departure(
  119. line=departure_data['pr'],
  120. towards=html_parser.unescape(departure_data['lastStop']),
  121. predicted_time=departure_time,
  122. ))
  123. return departures
  124. def run(config_path):
  125. if config_path is None:
  126. available_config_paths = [
  127. p for p in DEFAULT_CONFIG_PATHS if os.path.exists(p)
  128. ]
  129. if len(available_config_paths) == 0:
  130. raise Exception('found no config file')
  131. config_path = available_config_paths[0]
  132. print('config path: %s' % config_path)
  133. with open(config_path, 'r') as config_file:
  134. config = yaml.load(config_file.read())
  135. if not 'update_interval_seconds' in config['wiener_linien']:
  136. config['wiener_linien']['update_interval_seconds'] = \
  137. WIENER_LINIEN_DEFAULT_UPDATE_INTERVAL_SECONDS
  138. if not 'oebb' in config:
  139. config['oebb'] = {}
  140. if not 'update_interval_seconds' in config['oebb']:
  141. config['oebb']['update_interval_seconds'] = \
  142. OEBB_DEFAULT_UPDATE_INTERVAL_SECONDS
  143. assert not oledExp.driverInit()
  144. assert not oledExp.setDisplayPower(1)
  145. wiener_linien_departures = []
  146. wiener_linien_last_update_time = None
  147. oebb_departures = []
  148. oebb_last_update_time = None
  149. while True:
  150. if wiener_linien_last_update_time is None \
  151. or time.time() - wiener_linien_last_update_time \
  152. > config['wiener_linien']['update_interval_seconds']:
  153. print('update wiener linien')
  154. try:
  155. wiener_linien_departures = request_wiener_linien_departures(
  156. api_key=config['wiener_linien']['api_key'],
  157. rbl=config['wiener_linien']['rbl'],
  158. )
  159. wiener_linien_last_update_time = time.time()
  160. except urllib2.HTTPError as e:
  161. print(e)
  162. if 'eva_id' in config['oebb'] \
  163. and (oebb_last_update_time is None
  164. or time.time() - oebb_last_update_time
  165. > config['oebb']['update_interval_seconds']):
  166. oebb_departures = request_oebb_departures(
  167. eva_id=config['oebb']['eva_id'],
  168. )
  169. oebb_last_update_time = time.time()
  170. oledExp.setCursor(0, 0)
  171. oledExp.write(datetime_now_local().strftime("%Y-%m-%d %H:%M:%S"))
  172. departures = wiener_linien_departures + oebb_departures
  173. for departure_idx, departure in enumerate(departures[:OLED_DISPLAY_HEIGHT - 1]):
  174. oledExp.setCursor(1 + departure_idx, 0)
  175. oled_write_line("%s %s %s" % (
  176. format_timedelta(departure.predicted_timedelta),
  177. departure.line.replace(' ', ''),
  178. oled_encode(departure.towards),
  179. ))
  180. time.sleep(0.1)
  181. def _init_argparser():
  182. import argparse
  183. argparser = argparse.ArgumentParser()
  184. argparser.add_argument(
  185. '-c', '--config-path',
  186. dest='config_path',
  187. type=str,
  188. default=None,
  189. help='default: %r' % DEFAULT_CONFIG_PATHS,
  190. )
  191. return argparser
  192. def main(argv):
  193. argparser = _init_argparser()
  194. args = argparser.parse_args(argv)
  195. run(**vars(args))
  196. return 0
  197. if __name__ == "__main__":
  198. import sys
  199. sys.exit(main(sys.argv[1:]))