#!/usr/bin/env python # -*- coding: utf-8 -*- # PYTHON_ARGCOMPLETE_OK import dingguo import re import os import sys import yaml import email import pprint import random import locale import argparse import datetime import traceback import subprocess import HTMLParser import argcomplete import BeautifulSoup class Order(object): def __init__(self, platform, order_id, order_date, customer_id = None): assert type(platform) is unicode self.platform = platform assert type(order_id) is unicode self.order_id = order_id if type(order_date) is datetime.datetime: order_date = order_date.date() assert type(order_date) is datetime.date self.order_date = order_date assert customer_id is None or type(customer_id) is unicode self.customer_id = customer_id self.items = [] self.discounts = [] def dict_repr(self): return {k: v for (k, v) in { 'articles': self.items, 'customer_id': self.customer_id, 'discounts': self.discounts, 'order_date': self.order_date.strftime('%Y-%m-%d'), 'order_id': self.order_id, 'platform': self.platform, }.items() if v is not None} yaml.SafeDumper.add_representer(Order, lambda dumper, order: dumper.represent_dict(order.dict_repr())) class Distance(dingguo.Figure): def __init__(self, value, unit): assert type(value) is float super(Distance, self).__init__(value, unit) def metres(self): if self.unit == 'km': return self.value * 1000 else: raise Exception() class Sum(object): def __init__(self, value, currency): assert type(value) is float self.value = value if currency == u'€': currency = u'EUR' assert type(currency) is unicode assert currency in [u'EUR'] self.currency = currency class Discount(object): def __init__( self, name = None, amount = None, ): assert type(name) is unicode self.name = name assert type(amount) is Sum assert amount.value >= 0 self.amount = amount def dict_repr(self): return { 'name': self.name, 'value': self.amount.value, 'value_currency': self.amount.currency, } yaml.SafeDumper.add_representer(Discount, lambda dumper, discount: dumper.represent_dict(discount.dict_repr())) class Item(object): def __init__( self, name = None, price_brutto = None, ): assert type(name) is unicode self.name = name assert type(price_brutto) is Sum self.price_brutto = price_brutto def dict_repr(self): return { 'name': self.name, 'price_brutto': self.price_brutto.value, 'price_brutto_currency': self.price_brutto.currency, } yaml.SafeDumper.add_representer(Item, lambda dumper, item: dumper.represent_dict(item.dict_repr())) class Article(Item): def __init__( self, quantity = None, authors = [], state = None, reseller = None, shipper = None, **kwargs ): super(Article, self).__init__(**kwargs) assert type(quantity) is int self.quantity = quantity assert type(authors) is list self.authors = authors assert state is None or type(state) is unicode self.state = state assert reseller is None or type(reseller) is unicode self.reseller = reseller assert shipper is None or type(shipper) is unicode self.shipper = shipper self.delivery_date = None def dict_repr(self): attr = Item.dict_repr(self) attr.update({ 'delivery_date': self.delivery_date, 'quantity': self.quantity, 'reseller': self.reseller, 'shipper': self.shipper, 'state': self.state, }) if len(self.authors) > 0: attr['authors'] = self.authors return attr yaml.SafeDumper.add_representer(Article, lambda dumper, article: dumper.represent_dict(article.dict_repr())) class Transportation(Item): def __init__( self, departure_point = None, destination_point = None, distance = None, route_map = None, **kwargs ): super(Transportation, self).__init__(**kwargs) assert type(departure_point) is unicode self.departure_point = departure_point assert type(destination_point) is unicode self.destination_point = destination_point assert distance is None or type(distance) is Distance self.distance = distance assert route_map is None or type(route_map) is str self.route_map = route_map def dict_repr(self): attr = Item.dict_repr(self) attr.update({ 'departure_point': self.departure_point, 'destination_point': self.destination_point, 'distance_metres': self.distance.metres() if self.distance else None, 'route_map': self.route_map, }) return attr yaml.SafeDumper.add_representer(Transportation, lambda dumper, transportation: dumper.represent_dict(transportation.dict_repr())) class TaxiRide(Transportation): def __init__(self, name = None, driver = None, arrival_time = None, departure_time = None, **kwargs): if name is None: name = u'Taxi Ride' super(TaxiRide, self).__init__(name = name, **kwargs) assert type(driver) is unicode self.driver = driver assert arrival_time is None or type(arrival_time) is datetime.datetime self.arrival_time = arrival_time assert departure_time is None or type(departure_time) is datetime.datetime self.departure_time = departure_time def dict_repr(self): attr = Transportation.dict_repr(self) attr.update({ 'arrival_time': self.arrival_time.strftime('%Y-%m-%d %H:%M') if self.arrival_time else None, 'departure_time': self.departure_time.strftime('%Y-%m-%d %H:%M') if self.departure_time else None, 'driver': self.driver, }) return attr yaml.SafeDumper.add_representer(TaxiRide, lambda dumper, taxi_ride: dumper.represent_dict(taxi_ride.dict_repr())) def parse_amazon(msg): msg_text = msg.get_payload()[0].get_payload(decode = True).decode('utf-8') if not u'Amazon.de Bestellbestätigung' in msg_text: raise Exception('no amazon order confirmation') orders = [] for order_text in re.split(ur'={32,}', msg_text)[1:-1]: order_id = re.search(r'Bestellnummer #(.+)', order_text).group(1) order_date_formatted = re.search(ur'Aufgegeben am (.+)', order_text, re.UNICODE).group(1) locale.setlocale(locale.LC_ALL, 'de_DE.UTF-8') order_date = datetime.datetime.strptime(order_date_formatted.encode('utf-8'), '%d. %B %Y') order = Order( u'amazon.de', order_id, order_date ) articles_text = order_text.split('Bestellte(r) Artikel:')[1].split('_' * 10)[0].strip() for article_text in re.split(ur'\n\t*\n', articles_text): article_match = re.match( ur' *((?P\d+) x )?(?P.*)\n' + ur'( *von (?P.*)\n)?' + ur' *(?P[A-Z]+) (?P\d+,\d+)\n' + ur'( *Zustand: (?P.*)\n)?' + ur' *Verkauft von: (?P.*)' + ur'(\n *Versand durch (?P.*))?', article_text, re.MULTILINE | re.UNICODE ) if article_match is None: sys.stderr.write(repr(article_text) + '\n') raise Exception('could not match article') article = article_match.groupdict() order.items.append(Article( name = article['name'], price_brutto = Sum( float(article['price_brutto'].replace(',', '.')), article['price_brutto_currency'] ), quantity = int(article['quantity']) if article['quantity'] else 1, authors = article['authors'].split(',') if article['authors'] else [], state = article['state'], reseller = article['reseller'], shipper = article['shipper'], )) orders.append(order) return orders def parse_oebb(msg): msg_text = msg.get_payload()[0].get_payload(decode = True).decode('utf8') # msg_text = re.sub( # r'<[^>]+>', # '', # HTMLParser.HTMLParser().unescape(msg.get_payload(decode = True).decode('utf8')) # ) order_match = re.search( ur'Booking code:\s+(?P[\d ]+)\s+' + ur'Customer number:\s+(?PPV\d+)\s+' + ur'Booking date:\s+(?P.* \d{4})\s', msg_text, re.MULTILINE | re.UNICODE ) order_match_groups = order_match.groupdict() locale.setlocale(locale.LC_ALL, 'en_US.UTF-8') order_date = datetime.datetime.strptime( order_match_groups['order_date'], '%b %d, %Y' ) order = Order( u'oebb', order_match_groups['order_id'], order_date, customer_id = order_match_groups['customer_id'], ) item_match = re.search( ur'(?P.)(?P\d+\.\d+)' + ur'[\W\w]+' + ur'Your Booking\s+' + ur'(?P.*)\s+>\s+(?P.*)', msg_text, re.MULTILINE | re.UNICODE ) item = item_match.groupdict() order.items.append(Transportation( name = u'Train Ticket', price_brutto = Sum( float(item['price_brutto']), item['price_brutto_currency'], ), departure_point = item['departure_point'], destination_point = item['destination_point'], )) return [order] def parse_mytaxi(msg): if not 'mytaxi' in msg.get_payload()[0].get_payload()[0].get_payload(decode = True): raise Exception('no mytaxi mail') pdf_compressed = msg.get_payload()[1].get_payload(decode = True) pdftk = subprocess.Popen( ['pdftk - output - uncompress'], shell = True, stdin = subprocess.PIPE, stdout = subprocess.PIPE, ) pdf_uncompressed = pdftk.communicate( input = pdf_compressed, )[0].decode('latin-1') assert type(pdf_uncompressed) is unicode order_match = re.search( ur'Rechnungsnummer:[^\(]+\((?P\w+)\)', pdf_uncompressed, re.MULTILINE | re.UNICODE ) order_id = order_match.groupdict()['order_id'] ride_match_groups = re.search( ur'\(Bruttobetrag\)' + ur'[^\(]+' + ur'\((?P\d+,\d+) (?P.+)\)' + ur'[\w\W]+' + ur'\((?P[^\(]+)\)' + ur'[^\(]+' + ur'\(\d+,\d+ .\)' + ur'[^\(]+' + ur'\((?PTaxifahrt)' + ur'[^\(]+' + ur'\(von: (?P[^\)]+)' + ur'[^\(]+' + ur'\(nach: (?P[^\)]+)' + ur'[\w\W]+' + ur'Belegdatum \\\(Leistungszeitpunkt\\\):[^\(]+\((?P\d\d.\d\d.\d\d \d\d:\d\d)\)', pdf_uncompressed, re.MULTILINE | re.UNICODE ).groupdict() arrival_time = datetime.datetime.strptime( ride_match_groups['arrival_time'], '%d.%m.%y %H:%M' ) order = Order( u'mytaxi', order_id, arrival_time, ) locale.setlocale(locale.LC_ALL, 'en_US.UTF-8') order.items.append(TaxiRide( price_brutto = Sum( float(ride_match_groups['price_brutto'].replace(',', '.')), # why 0x80 ? u'EUR' if (ride_match_groups['price_brutto_currency'] == u'\x80') else ride_match_groups['price_brutto_currency'], ), departure_point = ride_match_groups['departure_point'], destination_point = ride_match_groups['destination_point'], driver = ride_match_groups['driver'], arrival_time = arrival_time, )) return [order] def parse_uber(msg): html = msg.get_payload()[0].get_payload(decode = True) """ document in html2 has the same structure as the one in html. only difference is that hyperlink urls in html2 have been replaced by 'email.uber.com/wf/click?upn=.*' urls. """ html2 = msg.get_payload()[1].get_payload()[0].get_payload(decode = True) route_map = msg.get_payload()[1].get_payload()[1].get_payload(decode = True) doc = BeautifulSoup.BeautifulSoup( html, convertEntities = BeautifulSoup.BeautifulSoup.HTML_ENTITIES, ) # strptime locale.setlocale(locale.LC_ALL, 'en_US.UTF-8') trip_id = re.search( ur'[\da-f\-]{36}', doc.find(text = 'Visit the trip page').parent['href'], ).group(0) order = Order( u'uber', trip_id, datetime.datetime.strptime( doc.find(attrs = {'class': 'date'}).text, '%B %d, %Y', ), ) departure_time_tag = doc.find(attrs = {'class': 'from time'}) departure_time = datetime.datetime.strptime( departure_time_tag.text, '%I:%M%p', ).time() arrival_time_tag = doc.find(attrs = {'class': 'to time'}) arrival_time = datetime.datetime.strptime( arrival_time_tag.text, '%I:%M%p', ).time() distance = Distance( float(doc.find(text = 'kilometers').parent.parent.find(attrs = {'class': 'data'}).text), u'km', ) fare = doc.find(attrs = {'class': 'header-price'}).find(attrs = {'class': 'header-fare text-pad'}).text order.items.append(TaxiRide( name = doc.find(text = 'CAR').parent.parent.find(attrs = {'class': 'data'}).text + ' Ride', price_brutto = Sum(float(fare[1:]), fare[0]), arrival_time = datetime.datetime.combine(order.order_date, arrival_time), departure_time = datetime.datetime.combine(order.order_date, departure_time), departure_point = departure_time_tag.parent.find(attrs = {'class': 'address'}).text, destination_point = arrival_time_tag.parent.find(attrs = {'class': 'address'}).text, distance = distance, driver = doc.find(attrs = {'class': 'driver-info'}).text[len('You rode with '):], route_map = route_map, )) return [order] def parse_yipbee(msg): text = msg.get_payload()[0].get_payload()[0].get_payload(decode = True).decode('utf-8') if not u'Vielen Dank für deine Bestellung bei yipbee' in text: raise Exception('no yipbee confirmation') order_match_groups = re.search( ur'[\W\w]+' + ur'BESTELLUNG: (?P\w+) vom (?P\d\d.\d\d.\d{4} \d\d:\d\d:\d\d)' + ur'[\W\w]+' + ur'GESAMTPREIS\s+' + ur'(?P[\W\w]+)' + ur'(?PARTIKEL [\W\w]+)', text, re.UNICODE ).groupdict() order = Order( u'yipbee', order_match_groups['order_id'], datetime.datetime.strptime(order_match_groups['order_time'], '%d.%m.%Y %H:%M:%S'), ) for article_match in re.finditer( ur'(?P[\w\-\.\:,%\(\) ]+ (Klasse \d|[\w\-\. ]+[^\d ]))' + ur'(?P\d+,\d\d) €(?P\d)(?P\d+,\d\d) €', order_match_groups['articles_and_discount_text'].replace('\n', ' '), re.UNICODE, ): article_match_groups = article_match.groupdict() total_price = float(article_match_groups['total_price'].replace(',', '.')) total_price_2 = float(article_match_groups['total_price_2'].replace(',', '.')) assert abs(total_price - total_price_2) < 0.01, 'expected %f, received %f' % (total_price, total_price_2) quantity = int(article_match_groups['quantity']) order.items.append(Article( name = article_match_groups['name'], price_brutto = Sum(round(total_price / quantity, 2), u'EUR'), quantity = quantity, reseller = u'yipbee', shipper = u'yipbee', )) articles_price = float(text.split('RABATTE')[0].split('ARTIKEL')[-1].strip().split(' ')[0].replace(',', '.')) assert abs(articles_price - sum([a.price_brutto.value * a.quantity for a in order.items])) < 0.01 discount_tag = BeautifulSoup.BeautifulSoup( order_match_groups['articles_and_discount_text'], convertEntities = BeautifulSoup.BeautifulSoup.HTML_ENTITIES, ).find('tr') if discount_tag: name_tag, value_tag = discount_tag.findAll('td', recursive = False) value, currency = value_tag.text.split(' ') order.discounts.append(Discount( name = name_tag.text, amount = Sum(float(value.replace(',', '.')) * -1, currency), )) delivery_price = order_match_groups['summary_text'].split('VERSAND')[1].split('STEUERN')[0].strip() delivery_price_value, delivery_price_currency = delivery_price.split(' ') order.items.append(Item( name = u'Delivery', price_brutto = Sum(float(delivery_price_value.replace(',', '.')), delivery_price_currency), )) return [order] def parse_yipbee_html(msg): html = msg.get_payload()[0].get_payload()[1].get_payload(decode = True) if not 'yipbee' in html: raise Exception('no yipbee confirmation') doc = BeautifulSoup.BeautifulSoup(html, convertEntities = BeautifulSoup.BeautifulSoup.HTML_ENTITIES) content_table = doc.find('table') order_match_groups = re.search( ur'Bestellung:(?P\w+) vom (?P\d\d.\d\d.\d{4} \d\d:\d\d:\d\d)', content_table.find('table').findAll('tr')[3].text, re.UNICODE ).groupdict() order = Order( u'yipbee', order_match_groups['order_id'], datetime.datetime.strptime(order_match_groups['order_time'], '%d.%m.%Y %H:%M:%S'), ) articles_table = content_table.find('table').find('tbody').findAll('tr', recursive = False)[4].find('table') for article_row in articles_table.find('tbody').findAll('tr', recursive = False)[1:]: article_columns = article_row.findAll('td', recursive = False) (price, currency) = re.sub(ur'\s+', ' ', article_columns[2].text.replace(u',', u'.')).split(' ') order.items.append(Article( name = article_columns[1].text, price_brutto = Sum(float(price), currency), quantity = int(article_columns[3].text), reseller = u'yipbee', shipper = u'yipbee', )) discount_row = content_table.find('table').find('tbody').findAll('tr', recursive = False)[6] (discount_name, discount_value_with_currency) = [c.text for c in discount_row.findAll('td', recursive = False)] (discount_value, discount_currency) = discount_value_with_currency.split(' ') order.discounts.append(Discount( name = discount_name, amount = Sum(float(discount_value.replace(',', '.')) * -1, discount_currency) )) shipping_costs_table = content_table.find('tbody').findAll('tr', recursive = False)[3].findAll('table')[1] (shipping_price, shipping_currency) = shipping_costs_table.text.replace(',', '.').split(' ') order.items.append(Item( name = u'Delivery', price_brutto = Sum(float(shipping_price), shipping_currency), )) return [order] def parse_lieferservice(msg): text = msg.get_payload()[0].get_payload(decode = True).decode('utf-8').replace('\r\n', '\n') assert type(text) is unicode if not 'Lieferservice.at' in text: raise Exception('no lieferservice.at confirmation') order_match = re.search( ur'Your order \(.+\) at (?P.*)\s+' + ur'Your order reference is: (?P.*)\s+' + ur'[\W\w]+' + ur'Your order\s+' + ur'(?P[\W\w]+)' + ur'Delivery costs:\s+(?P.*)\s+', text, re.UNICODE, ) order_match_groups = order_match.groupdict() import time import email.utils order_date = datetime.datetime.fromtimestamp( time.mktime(email.utils.parsedate(msg['Date'])) ) order = Order( u'lieferservice.at', order_match_groups['order_id'].strip(), order_date ) for article_match in re.finditer( ur'(?P\d+)x\s' + ur'(?P.*)\s' + ur'(?P.) (?P-?\d+,\d+)\s', order_match_groups['orders_text'], re.UNICODE, ): article_match_groups = article_match.groupdict() quantity = int(article_match_groups['quantity']) assert quantity == 1 name = re.sub(ur' +', ' ', article_match_groups['name']) price = Sum( float(article_match_groups['price'].replace(',', '.')), article_match_groups['currency'], ) if price.value < 0: price.value *= -1 order.discounts.append(Discount( name = name, amount = price, )) else: order.items.append(Article( name = name, quantity = 1, price_brutto = price, reseller = order_match_groups['restaurant'], shipper = order_match_groups['restaurant'], )) delivery_costs = order_match_groups['delivery_costs'].strip() assert delivery_costs == 'FREE' order.items.append(Item( name = u'Delivery', price_brutto = Sum(float('0'.replace(',', '.')), u'EUR'), )) return [order] def parse(msg): tracebacks = {} try: return parse_amazon(msg) except: tracebacks['amazon'] = traceback.format_exc() try: return parse_oebb(msg) except: tracebacks['oebb'] = traceback.format_exc() try: return parse_lieferservice(msg) except: tracebacks['lieferservice'] = traceback.format_exc() try: return parse_mytaxi(msg) except: tracebacks['mytaxi'] = traceback.format_exc() try: return parse_uber(msg) except: tracebacks['uber'] = traceback.format_exc() try: return parse_yipbee(msg) except: tracebacks['yipbee'] = traceback.format_exc() for parser_name in tracebacks: sys.stderr.write('%s parser: \n%s\n' % (parser_name, tracebacks[parser_name])) raise Exception('failed to parse') def compute(register_path): msg = email.message_from_string(sys.stdin.read()) orders = parse(msg) if register_path: with open(register_path, 'r') as register: registered_orders = yaml.load(register.read().decode('utf-8')) if not registered_orders: registered_orders = {} for order in orders: if order.platform not in registered_orders: registered_orders[order.platform] = {} if order.order_id in registered_orders[order.platform]: raise Exception('already registered') registered_orders[order.platform][order.order_id] = order with open(register_path, 'w') as register: register.write(yaml.safe_dump(registered_orders, default_flow_style = False)) else: print(yaml.safe_dump(orders, default_flow_style = False)) def _init_argparser(): argparser = argparse.ArgumentParser(description = None) argparser.add_argument('--register', metavar = 'path', dest = 'register_path') return argparser def main(argv): argparser = _init_argparser() argcomplete.autocomplete(argparser) args = argparser.parse_args(argv) compute(**vars(args)) return 0 if __name__ == "__main__": sys.exit(main(sys.argv[1:]))