123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- import HTMLParser
- import datetime
- import dateutil.parser
- import dateutil.tz
- import json
- import os
- import time
- import urllib2
- import yaml
- from OmegaExpansion import oledExp
# Candidate config file locations; the first one that exists is used.
DEFAULT_CONFIG_PATHS = [
    os.path.join(os.path.expanduser('~'), '.omegalines'),
    os.path.join(os.sep, 'etc', 'omegalines'),
]

# Text geometry of the OLED: number of rows and characters per row.
OLED_DISPLAY_HEIGHT = 8
OLED_DISPLAY_WIDTH = 21

# Polling intervals applied when the config file does not provide its own.
WIENER_LINIEN_DEFAULT_UPDATE_INTERVAL_SECONDS = 10
OEBB_DEFAULT_UPDATE_INTERVAL_SECONDS = 30

# Timezone used to interpret OEBB board times, given as a TZ rule string.
# For the string format see
# https://openwrt.org/docs/user-guide/system_configuration
OEBB_TIMEZONE = dateutil.tz.tzstr('CET-1CEST,M3.5.0,M10.5.0/3')

# Shared parser instance, used to unescape HTML entities in station names.
html_parser = HTMLParser.HTMLParser()
def datetime_now_local():
    """Return the current time as a datetime carrying the local timezone."""
    local_tz = dateutil.tz.tzlocal()
    return datetime.datetime.now(local_tz)
def format_timedelta(timedelta):
    """Render a ``datetime.timedelta`` as ``[-]M:SS``.

    Minutes are not folded into hours, fractional seconds are truncated and
    a single leading ``-`` marks negative durations.
    """
    total_seconds = timedelta.total_seconds()
    sign = '-' if total_seconds < 0 else ''
    magnitude = abs(total_seconds)
    minutes = int(magnitude / 60)
    seconds = magnitude % 60
    return '%s%d:%02d' % (sign, minutes, seconds)


# Import-time sanity checks for format_timedelta.
assert "0:20" == format_timedelta(datetime.timedelta(seconds=20))
assert "1:20" == format_timedelta(datetime.timedelta(seconds=80))
assert "2:00" == format_timedelta(datetime.timedelta(seconds=120))
assert "-0:20" == format_timedelta(datetime.timedelta(seconds=-20))
assert "-1:20" == format_timedelta(datetime.timedelta(seconds=-80))
assert "-2:00" == format_timedelta(datetime.timedelta(seconds=-120))
def oled_write_line(line):
    """Write ``line`` to the OLED as exactly one display row of text.

    The text is padded with spaces, or cut, to ``OLED_DISPLAY_WIDTH``
    characters before it is handed to ``oledExp.write``.
    """
    padded = line.ljust(OLED_DISPLAY_WIDTH, ' ')
    oledExp.write(padded[:OLED_DISPLAY_WIDTH])
# German special characters and their conventional ASCII transliterations.
_OLED_TRANSLITERATIONS = (
    (u'ä', u'ae'),
    (u'ö', u'oe'),
    (u'ü', u'ue'),
    (u'Ä', u'Ae'),
    (u'Ö', u'Oe'),
    (u'Ü', u'Ue'),
    (u'ß', u'ss'),
)


def oled_encode(text):
    """Transliterate German umlauts and sharp s in ``text`` to ASCII.

    :param text: text to prepare for the display
    :returns: ``text`` with each character listed in
        ``_OLED_TRANSLITERATIONS`` replaced by its ASCII digraph; all other
        characters are left untouched
    """
    for special_char, ascii_form in _OLED_TRANSLITERATIONS:
        text = text.replace(special_char, ascii_form)
    return text
class Departure:
    """One departure: line name, destination text and predicted time."""

    def __init__(self, line, towards, predicted_time):
        # predicted_time is expected to be a timezone-aware datetime so that
        # it can be compared with datetime_now_local().
        self.predicted_time = predicted_time
        self.towards = towards
        self.line = line

    @property
    def predicted_timedelta(self):
        """Time left until ``predicted_time``; negative once it has passed."""
        now = datetime_now_local()
        return self.predicted_time - now
def request_wiener_linien_departures(api_key, rbl):
    """Fetch the realtime departures of one Wiener Linien monitor.

    :param api_key: value sent as the ``sender`` query parameter
    :param rbl: value sent as the ``rbl`` query parameter
        (presumably the stop's RBL number)
    :returns: list of :class:`Departure`; each ``predicted_time`` is
        converted from the server clock to the local clock
    :raises urllib2.URLError: (including ``urllib2.HTTPError``) if the
        request fails
    :raises AssertionError: if the response does not have the expected
        shape (exactly one monitor, one entry per ``departures`` object)
    """
    req = urllib2.Request(
        "https://www.wienerlinien.at/ogd_realtime/monitor?sender=%s&rbl=%s"
        % (api_key, rbl),
    )
    req.add_header("Accept", "application/json")
    req.add_header("Content-Type", "application/json")
    req_time = datetime_now_local()
    resp = urllib2.urlopen(req)
    try:
        resp_data = json.loads(resp.read())
    finally:
        # urllib2 responses are not context managers; close explicitly so
        # that repeated polling does not leak connections.
        resp.close()
    # dateutil is used for parsing because datetime.datetime.strptime fails
    # on '%Y-%m-%dT%H:%M:%S.%f%z' with
    # "ValueError: 'z' is a bad directive in format".
    server_time = dateutil.parser.parse(resp_data['message']['serverTime'])
    # How far the local clock is ahead of the server clock. A server
    # timestamp T corresponds to T + offset on the local clock.
    local_clock_offset = req_time - server_time
    monitors_data = resp_data['data']['monitors']
    assert 1 == len(monitors_data)
    departures = []
    for line_data in monitors_data[0]['lines']:
        assert 1 == len(line_data['departures'])
        for departure_data in line_data['departures']['departure']:
            try:
                time_real = departure_data['departureTime']['timeReal']
            except KeyError as e:
                # No realtime prediction for this departure: report the
                # missing key and skip the entry.
                print(e)
                continue
            predicted_time_server = dateutil.parser.parse(time_real)
            # A per-departure 'vehicle' entry overrides the line's own
            # name and destination.
            source = departure_data.get('vehicle', line_data)
            departures.append(Departure(
                line=source['name'],
                towards=source['towards'],
                predicted_time=predicted_time_server + local_clock_offset,
            ))
    return departures
def request_oebb_departures(eva_id):
    """Fetch the departure board of one OEBB station.

    :param eva_id: integer station id sent as the ``evaId`` query parameter
    :returns: list of :class:`Departure` with ``predicted_time`` expressed
        in ``OEBB_TIMEZONE``
    :raises urllib2.URLError: (including ``urllib2.HTTPError``) if the
        request fails
    """
    req_time = datetime_now_local()
    req = urllib2.Request(
        'http://fahrplan.oebb.at/bin/stboard.exe/dn?' + '&'.join([
            'L=vs_scotty.vs_liveticker',
            'evaId=%d' % eva_id,
            'boardType=dep',
            'disableEquivs=yes',
            'outputMode=tickerDataOnly',
            'start=yes',
        ]),
    )
    print('request %s' % req.get_full_url())
    resp = urllib2.urlopen(req)
    try:
        resp_body = resp.read()
    finally:
        # urllib2 responses are not context managers; close explicitly so
        # that repeated polling does not leak connections.
        resp.close()
    # The payload is prefixed with a JavaScript assignment; strip it to
    # obtain plain JSON.
    resp_data = json.loads(resp_body.replace('journeysObj = ', ''))
    # Board times are bare 'HH:MM' values without a date. Anchor them on
    # the current date as seen in the OEBB timezone, not in whatever
    # timezone the device happens to be configured with.
    now_oebb = req_time.astimezone(OEBB_TIMEZONE)
    half_day = datetime.timedelta(hours=12)
    one_day = datetime.timedelta(days=1)
    departures = []
    for departure_data in resp_data['journey']:
        time_of_day = datetime.datetime.strptime(
            departure_data['ti'], '%H:%M',
        )
        departure_time = now_oebb.replace(
            hour=time_of_day.hour,
            minute=time_of_day.minute,
            second=0,
            microsecond=0,
        )
        # Resolve the missing date to the occurrence of HH:MM closest to
        # now, so that e.g. '00:05' seen at 23:55 means tomorrow and
        # '23:58' seen at 00:02 means yesterday.
        if departure_time < now_oebb - half_day:
            departure_time += one_day
        elif departure_time > now_oebb + half_day:
            departure_time -= one_day
        departures.append(Departure(
            line=departure_data['pr'],
            towards=html_parser.unescape(departure_data['lastStop']),
            predicted_time=departure_time,
        ))
    return departures
def _load_config(config_path):
    """Read the YAML config file and fill in default update intervals."""
    with open(config_path, 'r') as config_file:
        # NOTE(review): yaml.load can construct arbitrary Python objects.
        # Switch to yaml.safe_load if this file may ever be untrusted.
        config = yaml.load(config_file.read())
    config['wiener_linien'].setdefault(
        'update_interval_seconds',
        WIENER_LINIEN_DEFAULT_UPDATE_INTERVAL_SECONDS,
    )
    config.setdefault('oebb', {}).setdefault(
        'update_interval_seconds',
        OEBB_DEFAULT_UPDATE_INTERVAL_SECONDS,
    )
    return config


def _update_due(last_update_time, interval_seconds):
    """Tell whether a source was never fetched or its interval elapsed."""
    return last_update_time is None \
        or time.time() - last_update_time > interval_seconds


def _oled_draw(departures):
    """Draw the clock on row 0 and one departure per remaining row."""
    oledExp.setCursor(0, 0)
    oledExp.write(datetime_now_local().strftime("%Y-%m-%d %H:%M:%S"))
    departure_rows = OLED_DISPLAY_HEIGHT - 1
    visible = departures[:departure_rows]
    for row_idx in range(departure_rows):
        oledExp.setCursor(1 + row_idx, 0)
        if row_idx < len(visible):
            departure = visible[row_idx]
            oled_write_line("%s %s %s" % (
                format_timedelta(departure.predicted_timedelta),
                departure.line.replace(' ', ''),
                oled_encode(departure.towards),
            ))
        else:
            # Blank unused rows so that entries from an earlier, longer
            # list do not linger on the display.
            oled_write_line('')


def run(config_path):
    """Poll the configured departure sources and show them on the OLED.

    Loops forever; it only ends by raising.

    :param config_path: path of the YAML config file, or ``None`` to use
        the first existing entry of ``DEFAULT_CONFIG_PATHS``
    :raises Exception: if ``config_path`` is ``None`` and no default config
        file exists
    :raises RuntimeError: if the OLED cannot be initialised or powered on
    """
    if config_path is None:
        available_config_paths = [
            p for p in DEFAULT_CONFIG_PATHS if os.path.exists(p)
        ]
        if not available_config_paths:
            raise Exception('found no config file')
        config_path = available_config_paths[0]
    print('config path: %s' % config_path)
    config = _load_config(config_path)
    # The hardware calls must not live inside assert statements: those are
    # removed under `python -O`, which would skip the initialisation.
    if oledExp.driverInit():
        raise RuntimeError('OLED driver initialisation failed')
    if oledExp.setDisplayPower(1):
        raise RuntimeError('could not switch on OLED display')
    wiener_linien_departures = []
    wiener_linien_last_update_time = None
    oebb_departures = []
    oebb_last_update_time = None
    while True:
        if _update_due(
                wiener_linien_last_update_time,
                config['wiener_linien']['update_interval_seconds']):
            print('update wiener linien')
            try:
                wiener_linien_departures = request_wiener_linien_departures(
                    api_key=config['wiener_linien']['api_key'],
                    rbl=config['wiener_linien']['rbl'],
                )
                wiener_linien_last_update_time = time.time()
            except urllib2.HTTPError as e:
                # Keep showing the previous data; retried on the next pass.
                print(e)
        # OEBB is optional: it is only polled when a station id is set.
        if 'eva_id' in config['oebb'] and _update_due(
                oebb_last_update_time,
                config['oebb']['update_interval_seconds']):
            try:
                oebb_departures = request_oebb_departures(
                    eva_id=config['oebb']['eva_id'],
                )
                oebb_last_update_time = time.time()
            except urllib2.HTTPError as e:
                # Same best-effort policy as for Wiener Linien above.
                print(e)
        _oled_draw(wiener_linien_departures + oebb_departures)
        time.sleep(0.1)
def _init_argparser():
    """Build the command line parser; its only option is the config path."""
    from argparse import ArgumentParser

    parser = ArgumentParser()
    parser.add_argument(
        '-c',
        '--config-path',
        default=None,
        dest='config_path',
        help='default: %r' % DEFAULT_CONFIG_PATHS,
        type=str,
    )
    return parser
def main(argv):
    """Parse ``argv`` and hand the resulting options to ``run``.

    Returns 0 if ``run`` returns.
    """
    options = vars(_init_argparser().parse_args(argv))
    run(**options)
    return 0
# Script entry point: forward the command line (without the program name)
# to main() and use its return value as the process exit status.
if __name__ == "__main__":
    import sys

    exit_status = main(sys.argv[1:])
    sys.exit(exit_status)
|