123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- import HTMLParser
- import datetime as dt
- import dateutil.parser
- import dateutil.tz
- import json
- import os
- import socket
- import sys
- import time
- import urllib2
- import yaml
- from OmegaExpansion import oledExp
# Config file candidates; run() picks the first one that exists when no
# explicit path is given on the command line.
DEFAULT_CONFIG_PATHS = [
    os.path.join(os.path.expanduser('~'), '.omegalines'),
    os.path.join(os.sep, 'etc', 'omegalines'),
]

# Text geometry of the OLED: number of rows and characters per row.
OLED_DISPLAY_WIDTH = 21
OLED_DISPLAY_HEIGHT = 8

# Network settings: socket timeout per HTTP request and the default polling
# intervals (overridable via `update_interval_seconds` in the config).
REQUEST_TIMEOUT_SECONDS = 10
WIENER_LINIEN_DEFAULT_UPDATE_INTERVAL_SECONDS = 10
OEBB_DEFAULT_UPDATE_INTERVAL_SECONDS = 30

# Timezone attached to the date/time strings of the OEBB board, given as a
# POSIX TZ string (CET with CEST daylight saving rules).
# https://openwrt.org/docs/user-guide/system_configuration
OEBB_TIMEZONE = dateutil.tz.tzstr('CET-1CEST,M3.5.0,M10.5.0/3')

# Shared parser instance, used to unescape HTML entities in OEBB stop names.
html_parser = HTMLParser.HTMLParser()
def datetime_now_local():
    """Return the current time as a datetime carrying dateutil's local tz."""
    local_zone = dateutil.tz.tzlocal()
    return dt.datetime.now(local_zone)
def format_timedelta(timedelta):
    """Format a ``datetime.timedelta`` as ``[-]M:SS``.

    Minutes are not folded into hours (e.g. 3665 seconds gives ``61:05``);
    fractional seconds are truncated and negative durations get a leading
    minus sign.
    """
    seconds = timedelta.total_seconds()
    sign = ''
    if seconds < 0:
        sign = '-'
    magnitude = abs(seconds)
    whole_minutes = int(magnitude / 60)
    leftover_seconds = magnitude % 60
    return '%s%d:%02d' % (sign, whole_minutes, leftover_seconds)


# Import-time sanity checks for format_timedelta.
assert format_timedelta(dt.timedelta(seconds=20)) == "0:20"
assert format_timedelta(dt.timedelta(seconds=80)) == "1:20"
assert format_timedelta(dt.timedelta(seconds=120)) == "2:00"
assert format_timedelta(dt.timedelta(seconds=-20)) == "-0:20"
assert format_timedelta(dt.timedelta(seconds=-80)) == "-1:20"
assert format_timedelta(dt.timedelta(seconds=-120)) == "-2:00"
def parse_oebb_datetime(date_str, time_str):
    """Combine an OEBB date and time string into an aware datetime.

    date_str is expected as ``DD.MM.YYYY`` and time_str as ``HH:MM``; the
    result is tagged with OEBB_TIMEZONE.  Raises ValueError (from strptime)
    when either string does not match its format.
    """
    day = dt.datetime.strptime(date_str, '%d.%m.%Y').date()
    clock = dt.datetime.strptime(time_str, '%H:%M').time().replace(second=0)
    naive = dt.datetime.combine(day, clock)
    return naive.replace(tzinfo=OEBB_TIMEZONE)


# Import-time sanity check for parse_oebb_datetime.
assert parse_oebb_datetime(u'22.02.2018', u'09:46').isoformat() \
    == '2018-02-22T09:46:00+01:00'
def oled_write_line(line):
    """Pass ``line`` to oledExp.write as exactly OLED_DISPLAY_WIDTH characters.

    Shorter text is right-padded with spaces, longer text is cut off.
    """
    padded = line.ljust(OLED_DISPLAY_WIDTH, ' ')
    oledExp.write(padded[:OLED_DISPLAY_WIDTH])
def oled_encode(text):
    """Transliterate German umlauts and sharp s to ASCII digraphs.

    Covers lower and upper case umlauts as well as the sharp s, so names
    such as u'Österreich' or u'Straße' are converted too (previously only
    lower-case umlauts were replaced).  Presumably needed because the OLED
    font has no glyphs for these characters -- confirm against the display.

    :param text: unicode text to convert
    :return: text with every listed character replaced; all other
        characters are passed through unchanged
    """
    replacements = (
        (u'ä', u'ae'),
        (u'ö', u'oe'),
        (u'ü', u'ue'),
        (u'Ä', u'Ae'),
        (u'Ö', u'Oe'),
        (u'Ü', u'Ue'),
        (u'ß', u'ss'),
    )
    for special, ascii_form in replacements:
        text = text.replace(special, ascii_form)
    return text
class Departure(object):
    """A single upcoming departure shown on the board.

    Derives from ``object`` so that the class is new-style under Python 2
    and the read-only ``predicted_timedelta`` property is actually enforced.
    """

    def __init__(self, line, towards, predicted_time):
        # line: name of the line / train as reported by the data source
        self.line = line
        # towards: destination text
        self.towards = towards
        # predicted_time: datetime of the predicted departure; it is
        # subtracted from an aware "now", so it has to be aware as well
        self.predicted_time = predicted_time

    @property
    def predicted_timedelta(self):
        """Time left until the predicted departure (negative once passed)."""
        return self.predicted_time - datetime_now_local()

    def __repr__(self):
        return '%s(line=%r, towards=%r, predicted_time=%r)' % (
            type(self).__name__,
            self.line,
            self.towards,
            self.predicted_time,
        )
def request_wiener_linien_departures(api_key, rbl):
    """Fetch the realtime departures of one Wiener Linien stop.

    :param api_key: value sent as the ``sender`` query parameter
    :param rbl: value sent as the ``rbl`` query parameter (stop identifier)
    :return: list of Departure objects; empty if the response contains no
        monitor
    :raises urllib2.URLError, socket.timeout: on network failure
    :raises AssertionError: if more than one monitor is returned

    NOTE(review): api_key and rbl are interpolated into the URL without
    quoting; fine for plain alphanumeric values, confirm for anything else.
    """
    req = urllib2.Request(
        "https://www.wienerlinien.at/ogd_realtime/monitor?sender=%s&rbl=%s"
        % (api_key, rbl),
    )
    req.add_header("Accept", "application/json")
    req.add_header("Content-Type", "application/json")
    req_time = datetime_now_local()
    resp = urllib2.urlopen(req, timeout=REQUEST_TIMEOUT_SECONDS)
    try:
        resp_data = json.loads(resp.read())
    finally:
        # always release the connection, also when reading or parsing fails
        resp.close()
    # dateutil is used because dt.datetime.strptime fails here with:
    # ValueError: 'z' is a bad directive in format
    # '%Y-%m-%dT%H:%M:%S.%f%z'
    server_time = dateutil.parser.parse(resp_data['message']['serverTime'])
    # local clock minus server clock, measured at request time
    server_time_delta = req_time - server_time
    monitors_data = resp_data['data']['monitors']
    if not monitors_data:
        return []
    assert 1 == len(monitors_data), monitors_data
    departures = []
    for line_data in monitors_data[0]['lines']:
        departures.extend(departures_from_wiener_linien_line_data(
            line_data=line_data,
            server_time_delta=server_time_delta,
        ))
    return departures
def departures_from_wiener_linien_line_data(line_data, server_time_delta):
    """Convert one ``lines`` entry of the monitor response into Departures.

    :param line_data: dict with the keys ``name``, ``towards`` and
        ``departures`` (the latter holding exactly one key, ``departure``)
    :param server_time_delta: timedelta, local clock minus server clock
    :return: list of Departure objects whose predicted_time is expressed on
        the local clock
    :raises AssertionError: if ``departures`` has more than one key
    """
    assert 1 == len(line_data['departures']), line_data
    departures = []
    for departure_data in line_data['departures']['departure']:
        departure_time = departure_data['departureTime']
        if 'timeReal' in departure_time:
            predicted_time_server = dateutil.parser.parse(
                departure_time['timeReal'],
            )
        else:
            # NOTE(review): adds the countdown (minutes) on top of the
            # planned time; if countdown already means "minutes from now"
            # this counts the wait twice -- confirm against the API docs.
            predicted_time_server = dateutil.parser.parse(
                departure_time['timePlanned'],
            ) + dt.timedelta(
                minutes=int(departure_time['countdown']),
            )
        # a departure may override the line-level name / destination
        if 'vehicle' in departure_data:
            source = departure_data['vehicle']
        else:
            source = line_data
        departures.append(Departure(
            line=source['name'],
            towards=source['towards'],
            # Server timestamp -> local clock: local = server + (local -
            # server).  Subtracting the delta (as done before) doubled any
            # skew between the two clocks in the displayed countdown.
            predicted_time=predicted_time_server + server_time_delta,
        ))
    return departures
def _oebb_predicted_time(departure_data):
    """Return the predicted departure datetime of one journey, or None.

    None is returned when realtime data exists but carries an empty
    departure time, which is treated as a cancelled journey.
    """
    realtime = departure_data['rt']
    if not realtime:  # on time
        return parse_oebb_datetime(
            departure_data['da'],
            departure_data['ti'],
        )
    if realtime['dlt'] == '':  # canceled?
        return None
    # delayed
    return parse_oebb_datetime(realtime['dld'], realtime['dlt'])


def request_oebb_departures(eva_id):
    """Fetch the departure board of one OEBB station.

    :param eva_id: integer station id, sent as ``evaId``
    :return: list of Departure objects; cancelled journeys are left out
    :raises urllib2.URLError, socket.timeout: on network failure

    Fields of a journey entry used here:
      pr: line (u'R 2323')
      lastStop: destination (u'Wr.Neustadt Hbf'), HTML-escaped
      da: planned departure date (u'22.02.2018')
      ti: planned departure time (u'09:42')
      rt: dict if delayed, otherwise False
      rt.status: e.g. u'Ausfall'
      rt.dld: estimated departure date
      rt.dlt: estimated departure time
    """
    req = urllib2.Request(
        'http://fahrplan.oebb.at/bin/stboard.exe/dn?' + '&'.join([
            'L=vs_scotty.vs_liveticker',
            'evaId=%d' % eva_id,
            'boardType=dep',
            'disableEquivs=yes',
            'outputMode=tickerDataOnly',
            'start=yes',
        ]),
    )
    print('request %s' % req.get_full_url())
    resp = urllib2.urlopen(req, timeout=REQUEST_TIMEOUT_SECONDS)
    try:
        body = resp.read()
    finally:
        # always release the connection, also when reading fails
        resp.close()
    # the payload is a JavaScript assignment; strip it to get plain JSON
    resp_data = json.loads(body.replace('journeysObj = ', ''))
    departures = []
    for departure_data in resp_data.get('journey', []):
        predicted_time = _oebb_predicted_time(departure_data)
        if predicted_time is None:
            continue
        departures.append(Departure(
            line=departure_data['pr'],
            towards=html_parser.unescape(departure_data['lastStop']),
            predicted_time=predicted_time,
        ))
    return departures
def draw_departures(departures, indicate_error=False):
    """Render the headline and the soonest departures on the OLED.

    Row 0 shows the current local date and time, followed by 'E' when
    indicate_error is set.  The rows below show the departures ordered by
    predicted time, one per row, as ``<M:SS> <line> <destination>``.

    :param departures: iterable of Departure objects; it is not modified
    :param indicate_error: append the error marker to the headline
    """
    oledExp.setCursor(0, 0)
    headline = datetime_now_local().strftime("%Y-%m-%d %H:%M:%S")
    if indicate_error:
        headline += 'E'
    oled_write_line(headline)
    # sorted() instead of list.sort(): leave the caller's list untouched
    # and accept any iterable
    upcoming = sorted(departures, key=lambda d: d.predicted_time)
    shown = upcoming[:OLED_DISPLAY_HEIGHT - 1]
    for row, departure in enumerate(shown, 1):
        oledExp.setCursor(row, 0)
        oled_write_line("%s %s %s" % (
            format_timedelta(departure.predicted_timedelta),
            departure.line.replace(' ', ''),
            oled_encode(departure.towards),
        ))
    # blank the rows that are left, otherwise departures drawn by an earlier
    # call stay visible once the list gets shorter
    for row in range(len(shown) + 1, OLED_DISPLAY_HEIGHT):
        oledExp.setCursor(row, 0)
        oled_write_line('')
def run(config_path):
    """Load the configuration and keep the OLED departure board up to date.

    Never returns normally: after setup it loops forever, polling Wiener
    Linien and (if ``oebb.eva_ids`` is configured) OEBB at their configured
    intervals and redrawing the display roughly every 0.1 seconds.

    :param config_path: path of the YAML config file, or None to use the
        first existing entry of DEFAULT_CONFIG_PATHS
    :raises Exception: if no config file can be found
    :raises RuntimeError: if the OLED driver cannot be initialised or the
        display cannot be powered on
    """
    if config_path is None:
        available_config_paths = [
            p for p in DEFAULT_CONFIG_PATHS if os.path.exists(p)
        ]
        if not available_config_paths:
            raise Exception('found no config file')
        config_path = available_config_paths[0]
    print('config path: %s' % config_path)
    with open(config_path, 'r') as config_file:
        # safe_load instead of load: the config only needs plain YAML types,
        # and yaml.load without an explicit Loader is deprecated / rejected
        # by newer PyYAML releases.
        config = yaml.safe_load(config_file.read())
    config['wiener_linien'].setdefault(
        'update_interval_seconds',
        WIENER_LINIEN_DEFAULT_UPDATE_INTERVAL_SECONDS,
    )
    config.setdefault('oebb', {})
    config['oebb'].setdefault(
        'update_interval_seconds',
        OEBB_DEFAULT_UPDATE_INTERVAL_SECONDS,
    )
    # Explicit checks instead of `assert`: asserts are stripped under -O,
    # which would silently skip the hardware initialisation altogether.
    # A truthy return value signals failure.
    if oledExp.driverInit():
        raise RuntimeError('failed to initialise the OLED driver')
    if oledExp.setDisplayPower(1):
        raise RuntimeError('failed to power on the OLED display')
    wiener_linien_departures = []
    wiener_linien_last_update_time = None
    # Both flags must exist before the first draw; oebb_error used to be
    # unbound (NameError) whenever no eva_ids were configured.
    wiener_linien_error = False
    oebb_departures = []
    oebb_last_update_time = None
    oebb_error = False
    while True:
        if wiener_linien_last_update_time is None \
                or time.time() - wiener_linien_last_update_time \
                > config['wiener_linien']['update_interval_seconds']:
            print('update wiener linien')
            wiener_linien_error = False
            try:
                wiener_linien_departures = request_wiener_linien_departures(
                    api_key=config['wiener_linien']['api_key'],
                    rbl=config['wiener_linien']['rbl'],
                )
            except (urllib2.URLError, socket.timeout) as e:
                wiener_linien_departures = []
                wiener_linien_error = True
                print(e)
            wiener_linien_last_update_time = time.time()
        if 'eva_ids' in config['oebb'] \
                and (oebb_last_update_time is None
                     or time.time() - oebb_last_update_time
                     > config['oebb']['update_interval_seconds']):
            oebb_departures = []
            oebb_error = False
            for eva_id in config['oebb']['eva_ids']:
                try:
                    oebb_departures.extend(request_oebb_departures(eva_id))
                except (urllib2.URLError, socket.timeout) as e:
                    # keep the results of the other stations
                    oebb_error = True
                    print(e)
            oebb_last_update_time = time.time()
        departures = wiener_linien_departures + oebb_departures
        if 'offset_seconds' in config:
            # hide departures that are less than offset_seconds away
            current_time = datetime_now_local()
            departures = [
                d for d in departures
                if (d.predicted_time - current_time).total_seconds()
                >= config['offset_seconds']
            ]
        draw_departures(
            departures=departures,
            indicate_error=wiener_linien_error or oebb_error,
        )
        time.sleep(0.1)
def _init_argparser():
    """Build the command-line parser; its only option is -c/--config-path."""
    import argparse
    parser = argparse.ArgumentParser()
    config_path_options = {
        'dest': 'config_path',
        'type': str,
        'default': None,
        'help': 'default: %r' % DEFAULT_CONFIG_PATHS,
    }
    parser.add_argument('-c', '--config-path', **config_path_options)
    return parser
def main(argv):
    """Parse ``argv`` (without the program name) and hand over to run().

    Returns 0 as the process exit status, should run() ever return.
    """
    options = _init_argparser().parse_args(argv)
    run(**vars(options))
    return 0
# Script entry point: pass the command-line arguments (minus the program
# name) to main() and use its return value as the exit status.
if __name__ == '__main__':
    import sys
    raise SystemExit(main(sys.argv[1:]))
|