#!/usr/bin/env python
# omegalines
import contextlib
import datetime
import json
import os
import time
import urllib2

import dateutil.parser
import dateutil.tz
import yaml
from OmegaExpansion import oledExp
  11. def datetime_now_local():
  12. return datetime.datetime.now(dateutil.tz.tzlocal())
  13. class Departure:
  14. def __init__(self, line, towards, predicted_time):
  15. self.line = line
  16. self.towards = towards
  17. self.predicted_time = predicted_time
  18. @property
  19. def predicted_timedelta(self):
  20. return self.predicted_time - datetime_now_local()
  21. def request_wiener_linien_departures(api_key, rbl):
  22. req = urllib2.Request(
  23. "https://www.wienerlinien.at/ogd_realtime/monitor?sender=%s&rbl=%s"
  24. % (api_key, rbl),
  25. )
  26. req.add_header("Accept", "application/json")
  27. req.add_header("Content-Type", "application/json")
  28. req_time = datetime_now_local()
  29. resp = urllib2.urlopen(req)
  30. resp_data = json.loads(resp.read())
  31. # datetime.datetime.strptime:
  32. # ValueError: 'z' is a bad directive in format
  33. # '%Y-%m-%dT%H:%M:%S.%f%z'
  34. server_time_delta = req_time - \
  35. dateutil.parser.parse(resp_data['message']['serverTime'])
  36. monitors_data = resp_data['data']['monitors']
  37. assert 1 == len(monitors_data)
  38. departures = []
  39. for line_data in monitors_data[0]['lines']:
  40. assert 1 == len(line_data['departures'])
  41. for departure_data in line_data['departures']['departure']:
  42. try:
  43. predicted_time_server = dateutil.parser.parse(
  44. departure_data['departureTime']['timeReal'],
  45. )
  46. except KeyError as e:
  47. print(e)
  48. predicted_time_server = None
  49. if predicted_time_server:
  50. departures.append(Departure(
  51. line=departure_data['vehicle']['name']
  52. if 'vehicle' in departure_data else line_data['name'],
  53. towards=departure_data['vehicle']['towards']
  54. if 'vehicle' in departure_data else line_data['towards'],
  55. predicted_time=predicted_time_server - server_time_delta,
  56. ))
  57. return departures
  58. def run(config_path):
  59. with open(config_path, 'r') as config_file:
  60. config = yaml.load(config_file.read())
  61. wiener_linien_api_key = config['wiener_linien_api_key']
  62. assert not oledExp.driverInit()
  63. assert not oledExp.setDisplayPower(1)
  64. while True:
  65. oledExp.clear()
  66. oledExp.setCursor(0, 0)
  67. oledExp.write(datetime_now_local().strftime("%Y-%m-%d %H:%M:%S"))
  68. try:
  69. departures = request_wiener_linien_departures(
  70. api_key=wiener_linien_api_key,
  71. rbl=4648,
  72. )
  73. except urllib2.HTTPError:
  74. departures = None
  75. if departures:
  76. for departure_idx, departure in enumerate(departures):
  77. oledExp.setCursor(1 + departure_idx, 0)
  78. oledExp.write("%03d %s %s" % (
  79. departure.predicted_timedelta.total_seconds(),
  80. departure.line,
  81. departure.towards,
  82. ))
  83. time.sleep(9.9)
  84. def _init_argparser():
  85. import argparse
  86. argparser = argparse.ArgumentParser()
  87. argparser.add_argument(
  88. '-c', '--config-path',
  89. dest='config_path',
  90. type=str,
  91. default=os.path.join(os.path.expanduser('~'), '.omegalines'),
  92. help='default: %(default)s',
  93. )
  94. return argparser
  95. def main(argv):
  96. argparser = _init_argparser()
  97. args = argparser.parse_args(argv)
  98. run(**vars(args))
  99. return 0
  100. if __name__ == "__main__":
  101. import sys
  102. sys.exit(main(sys.argv[1:]))