123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- # PYTHON_ARGCOMPLETE_OK
- import re
- import os
- import sys
- import yaml
- import email
- import pprint
- import random
- import locale
- import argparse
- import datetime
- import traceback
- import subprocess
- import HTMLParser
- import argcomplete
- import BeautifulSoup
class Order(object):
    """One order placed on a platform, with its items and discounts.

    ``items`` and ``discounts`` start out empty and are filled in by the
    caller after construction.
    """

    def __init__(self, platform, order_id, order_date, customer_id = None):
        # Type checks first, in parameter order; attributes are stored only
        # once every argument has been accepted.
        assert type(platform) is unicode
        assert type(order_id) is unicode
        assert type(order_date) is datetime.datetime
        assert customer_id is None or type(customer_id) is unicode
        self.platform = platform
        self.order_id = order_id
        self.order_date = order_date
        self.customer_id = customer_id
        self.items = []
        self.discounts = []

    def dict_repr(self):
        """Return the order as a plain dict, leaving out entries that are None.

        The items are exported under the key 'articles' and the order date
        is formatted as YYYY-MM-DD.
        """
        fields = (
            ('articles', self.items),
            ('customer_id', self.customer_id),
            ('discounts', self.discounts),
            ('order_date', self.order_date.strftime('%Y-%m-%d')),
            ('order_id', self.order_id),
            ('platform', self.platform),
        )
        return dict((key, value) for (key, value) in fields if value is not None)
def _represent_order(dumper, order):
    """Represent an Order as the mapping returned by its dict_repr()."""
    return dumper.represent_dict(order.dict_repr())


yaml.SafeDumper.add_representer(Order, _represent_order)
class Sum(object):
    """An amount of money: a float value together with a currency code."""

    def __init__(self, value, currency):
        assert type(value) is float
        self.value = value
        # The euro sign is accepted as an alias for the code EUR.
        normalized = u'EUR' if currency == u'€' else currency
        assert type(normalized) is unicode
        # EUR is the only currency accepted.
        assert normalized in [u'EUR']
        self.currency = normalized
class Discount(object):
    """A named price reduction whose amount is a non-negative Sum."""

    def __init__(self, name = None, amount = None):
        # Both arguments are effectively required: the None defaults do not
        # pass the type checks below.
        assert type(name) is unicode
        assert type(amount) is Sum
        assert amount.value >= 0
        self.name = name
        self.amount = amount

    def dict_repr(self):
        """Return the discount as a plain dict (name, value, value_currency)."""
        amount = self.amount
        return dict(
            name = self.name,
            value = amount.value,
            value_currency = amount.currency,
        )
def _represent_discount(dumper, discount):
    """Represent a Discount as the mapping returned by its dict_repr()."""
    return dumper.represent_dict(discount.dict_repr())


yaml.SafeDumper.add_representer(Discount, _represent_discount)
class Item(object):
    """A named order position with a gross price (a Sum)."""

    def __init__(self, name = None, price_brutto = None):
        # Both arguments are effectively required: the None defaults do not
        # pass the type checks below.
        assert type(name) is unicode
        assert type(price_brutto) is Sum
        self.name = name
        self.price_brutto = price_brutto

    def dict_repr(self):
        """Return the item as a plain dict (name, price and price currency)."""
        price = self.price_brutto
        return dict(
            name = self.name,
            price_brutto = price.value,
            price_brutto_currency = price.currency,
        )
def _represent_item(dumper, item):
    """Represent an Item as the mapping returned by its dict_repr()."""
    return dumper.represent_dict(item.dict_repr())


yaml.SafeDumper.add_representer(Item, _represent_item)
class Article(Item):
    """A purchased article: an Item plus quantity and optional seller details.

    Keyword arguments not listed here (``name``, ``price_brutto``) are
    forwarded to Item. ``delivery_date`` starts out as None.
    """

    def __init__(
        self,
        quantity = None,
        authors = None,
        state = None,
        reseller = None,
        shipper = None,
        **kwargs
    ):
        super(Article, self).__init__(**kwargs)
        assert type(quantity) is int
        self.quantity = quantity
        # Create a fresh list per instance. A `[]` default in the signature
        # is evaluated once, so every article created without authors would
        # share (and could mutate) the same list object.
        if authors is None:
            authors = []
        assert type(authors) is list
        self.authors = authors
        assert state is None or type(state) is unicode
        self.state = state
        assert reseller is None or type(reseller) is unicode
        self.reseller = reseller
        assert shipper is None or type(shipper) is unicode
        self.shipper = shipper
        self.delivery_date = None

    def dict_repr(self):
        """Return the article as a plain dict.

        Extends Item.dict_repr() with delivery_date, quantity, reseller,
        shipper and state; 'authors' is only included when non-empty.
        """
        attr = Item.dict_repr(self)
        attr.update({
            'delivery_date': self.delivery_date,
            'quantity': self.quantity,
            'reseller': self.reseller,
            'shipper': self.shipper,
            'state': self.state,
        })
        if self.authors:
            attr['authors'] = self.authors
        return attr
def _represent_article(dumper, article):
    """Represent an Article as the mapping returned by its dict_repr()."""
    return dumper.represent_dict(article.dict_repr())


yaml.SafeDumper.add_representer(Article, _represent_article)
class Transportation(Item):
    """An Item describing a trip from a departure point to a destination."""

    def __init__(self, departure_point = None, destination_point = None, **kwargs):
        # Remaining keyword arguments (name, price_brutto) go to Item.
        super(Transportation, self).__init__(**kwargs)
        assert type(departure_point) is unicode
        assert type(destination_point) is unicode
        self.departure_point = departure_point
        self.destination_point = destination_point

    def dict_repr(self):
        """Return Item.dict_repr() extended by both end points of the trip."""
        attr = Item.dict_repr(self)
        attr['departure_point'] = self.departure_point
        attr['destination_point'] = self.destination_point
        return attr
def _represent_transportation(dumper, transportation):
    """Represent a Transportation as the mapping returned by its dict_repr()."""
    return dumper.represent_dict(transportation.dict_repr())


yaml.SafeDumper.add_representer(Transportation, _represent_transportation)
class TaxiRide(Transportation):
    """A Transportation named 'Taxi Ride' with a driver and an arrival time."""

    def __init__(self, driver = None, arrival_time = None, **kwargs):
        # The item name is fixed; passing `name` in kwargs as well would
        # raise a TypeError for the duplicated keyword.
        super(TaxiRide, self).__init__(name = u'Taxi Ride', **kwargs)
        assert type(driver) is unicode
        assert type(arrival_time) is datetime.datetime
        self.driver = driver
        self.arrival_time = arrival_time

    def dict_repr(self):
        """Return Transportation.dict_repr() plus driver and arrival time.

        The arrival time is formatted as 'YYYY-MM-DD HH:MM'.
        """
        attr = Transportation.dict_repr(self)
        attr['driver'] = self.driver
        attr['arrival_time'] = self.arrival_time.strftime('%Y-%m-%d %H:%M')
        return attr
def _represent_taxi_ride(dumper, taxi_ride):
    """Represent a TaxiRide as the mapping returned by its dict_repr()."""
    return dumper.represent_dict(taxi_ride.dict_repr())


yaml.SafeDumper.add_representer(TaxiRide, _represent_taxi_ride)
- def parse_amazon(msg):
- msg_text = msg.get_payload()[0].get_payload(decode = True).decode('utf-8')
- order_id = re.search(r'Bestellnummer #(.+)', msg_text).group(1)
- order_date_formatted = re.search(ur'Aufgegeben am (.+)', msg_text, re.UNICODE).group(1)
- locale.setlocale(locale.LC_ALL, 'de_DE.UTF-8')
- order_date = datetime.datetime.strptime(order_date_formatted.encode('utf-8'), '%d. %B %Y')
- order = Order(
- u'amazon.de',
- order_id,
- order_date
- )
- articles_text = msg_text.split('Bestellte(r) Artikel:')[1].split('_' * 10)[0].strip()
- for article_text in re.split(ur'\n\t*\n', articles_text):
- article_match = re.match(
- ur' *((?P<quantity>\d+) x )?(?P<name>.*)\n'
- + ur'( *von (?P<authors>.*)\n)?'
- + ur' *(?P<price_brutto_currency>[A-Z]+) (?P<price_brutto>\d+,\d+)\n'
- + ur'( *Zustand: (?P<state>.*)\n)?'
- + ur' *Verkauft von: (?P<reseller>.*)'
- + ur'(\n *Versand durch (?P<shipper>.*))?',
- article_text,
- re.MULTILINE | re.UNICODE
- )
- if article_match is None:
- sys.stderr.write(repr(article_text) + '\n')
- raise Exception('could not match article')
- article = article_match.groupdict()
- order.items.append(Article(
- name = article['name'],
- price_brutto = Sum(
- float(article['price_brutto'].replace(',', '.')),
- article['price_brutto_currency']
- ),
- quantity = int(article['quantity']) if article['quantity'] else 1,
- authors = article['authors'].split(',') if article['authors'] else [],
- state = article['state'],
- reseller = article['reseller'],
- shipper = article['shipper'],
- ))
- return order
- def parse_oebb(msg):
- msg_text = msg.get_payload()[0].get_payload(decode = True).decode('utf8')
- # msg_text = re.sub(
- # r'<[^>]+>',
- # '',
- # HTMLParser.HTMLParser().unescape(msg.get_payload(decode = True).decode('utf8'))
- # )
- order_match = re.search(
- ur'Booking code:\s+(?P<order_id>[\d ]+)\s+'
- + ur'Customer number:\s+(?P<customer_id>PV\d+)\s+'
- + ur'Booking date:\s+(?P<order_date>.* \d{4})\s',
- msg_text,
- re.MULTILINE | re.UNICODE
- )
- order_match_groups = order_match.groupdict()
- locale.setlocale(locale.LC_ALL, 'en_US.UTF-8')
- order_date = datetime.datetime.strptime(
- order_match_groups['order_date'],
- '%b %d, %Y'
- )
- order = Order(
- u'oebb',
- order_match_groups['order_id'],
- order_date,
- customer_id = order_match_groups['customer_id'],
- )
- item_match = re.search(
- ur'(?P<price_brutto_currency>.)(?P<price_brutto>\d+\.\d+)'
- + ur'[\W\w]+'
- + ur'Your Booking\s+'
- + ur'(?P<departure_point>.*)\s+>\s+(?P<destination_point>.*)',
- msg_text,
- re.MULTILINE | re.UNICODE
- )
- item = item_match.groupdict()
- order.items.append(Transportation(
- name = u'Train Ticket',
- price_brutto = Sum(
- float(item['price_brutto']),
- item['price_brutto_currency'],
- ),
- departure_point = item['departure_point'],
- destination_point = item['destination_point'],
- ))
- return order
- def parse_mytaxi(msg):
- if not 'mytaxi' in msg.get_payload()[0].get_payload()[0].get_payload(decode = True):
- raise Exception('no mytaxi mail')
- pdf_compressed = msg.get_payload()[1].get_payload(decode = True)
- pdftk = subprocess.Popen(
- ['pdftk - output - uncompress'],
- shell = True,
- stdin = subprocess.PIPE,
- stdout = subprocess.PIPE,
- )
- pdf_uncompressed = pdftk.communicate(
- input = pdf_compressed,
- )[0].decode('latin-1')
- assert type(pdf_uncompressed) is unicode
- order_match = re.search(
- ur'Rechnungsnummer:[^\(]+\((?P<order_id>\w+)\)',
- pdf_uncompressed,
- re.MULTILINE | re.UNICODE
- )
- order_id = order_match.groupdict()['order_id']
- ride_match_groups = re.search(
- ur'\(Bruttobetrag\)'
- + ur'[^\(]+'
- + ur'\((?P<price_brutto>\d+,\d+) (?P<price_brutto_currency>.+)\)'
- + ur'[\w\W]+'
- + ur'\((?P<driver>[^\(]+)\)'
- + ur'[^\(]+'
- + ur'\(\d+,\d+ .\)'
- + ur'[^\(]+'
- + ur'\((?P<name>Taxifahrt)'
- + ur'[^\(]+'
- + ur'\(von: (?P<departure_point>[^\)]+)'
- + ur'[^\(]+'
- + ur'\(nach: (?P<destination_point>[^\)]+)'
- + ur'[\w\W]+'
- + ur'Belegdatum \\\(Leistungszeitpunkt\\\):[^\(]+\((?P<arrival_time>\d\d.\d\d.\d\d \d\d:\d\d)\)',
- pdf_uncompressed,
- re.MULTILINE | re.UNICODE
- ).groupdict()
- arrival_time = datetime.datetime.strptime(
- ride_match_groups['arrival_time'],
- '%d.%m.%y %H:%M'
- )
- order = Order(
- u'mytaxi',
- order_id,
- arrival_time,
- )
- locale.setlocale(locale.LC_ALL, 'en_US.UTF-8')
- order.items.append(TaxiRide(
- price_brutto = Sum(
- float(ride_match_groups['price_brutto'].replace(',', '.')),
- # why 0x80 ?
- u'EUR' if (ride_match_groups['price_brutto_currency'] == u'\x80')
- else ride_match_groups['price_brutto_currency'],
- ),
- departure_point = ride_match_groups['departure_point'],
- destination_point = ride_match_groups['destination_point'],
- driver = ride_match_groups['driver'],
- arrival_time = arrival_time,
- ))
- return order
- def parse_yipbee(msg):
- html = msg.get_payload()[0].get_payload()[1].get_payload(decode = True)
- if not 'yipbee' in html:
- raise Exception('no yipbee confirmation')
- doc = BeautifulSoup.BeautifulSoup(html, convertEntities = BeautifulSoup.BeautifulSoup.HTML_ENTITIES)
- content_table = doc.find('table')
- order_match_groups = re.search(
- ur'Bestellung:(?P<order_id>\w+) vom (?P<order_time>\d\d.\d\d.\d{4} \d\d:\d\d:\d\d)',
- content_table.find('table').findAll('tr')[3].text,
- re.UNICODE
- ).groupdict()
- order = Order(
- u'yipbee',
- order_match_groups['order_id'],
- datetime.datetime.strptime(order_match_groups['order_time'], '%d.%m.%Y %H:%M:%S'),
- )
- articles_table = content_table.find('table').find('tbody').findAll('tr', recursive = False)[4].find('table')
- for article_row in articles_table.find('tbody').findAll('tr', recursive = False)[1:]:
- article_columns = article_row.findAll('td', recursive = False)
- (price, currency) = re.sub(ur'\s+', ' ', article_columns[2].text.replace(u',', u'.')).split(' ')
- order.items.append(Article(
- name = article_columns[1].text,
- price_brutto = Sum(float(price), currency),
- quantity = int(article_columns[3].text),
- reseller = u'yipbee',
- shipper = u'yipbee',
- ))
- discount_row = content_table.find('table').find('tbody').findAll('tr', recursive = False)[6]
- (discount_name, discount_value_with_currency) = [c.text for c in discount_row.findAll('td', recursive = False)]
- (discount_value, discount_currency) = discount_value_with_currency.split(' ')
- order.discounts.append(Discount(
- name = discount_name,
- amount = Sum(float(discount_value.replace(',', '.')) * -1, discount_currency)
- ))
- shipping_costs_table = content_table.find('tbody').findAll('tr', recursive = False)[3].findAll('table')[1]
- (shipping_price, shipping_currency) = shipping_costs_table.text.replace(',', '.').split(' ')
- order.items.append(Item(
- name = u'Delivery',
- price_brutto = Sum(float(shipping_price), shipping_currency),
- ))
- return order
def parse(msg):
    """Return the Order produced by the first platform parser that succeeds.

    The parsers are tried in the order amazon, oebb, mytaxi, yipbee. A
    parser signals "not my kind of mail" by raising, in which case its
    traceback is kept and the next parser is tried.

    Args:
        msg: the email message object handed to every parser.

    Raises:
        Exception: no parser succeeded; all collected tracebacks are
            written to stderr (in the order the parsers were tried) first.
    """
    parsers = (
        ('amazon', parse_amazon),
        ('oebb', parse_oebb),
        ('mytaxi', parse_mytaxi),
        ('yipbee', parse_yipbee),
    )
    tracebacks = []
    for (parser_name, parser) in parsers:
        try:
            return parser(msg)
        except Exception:
            # Deliberately broad: any failure means this parser does not
            # apply. KeyboardInterrupt and SystemExit still propagate,
            # which a bare "except:" would have swallowed.
            tracebacks.append((parser_name, traceback.format_exc()))
    for (parser_name, formatted_traceback) in tracebacks:
        sys.stderr.write('%s parser: \n%s\n' % (parser_name, formatted_traceback))
    raise Exception('failed to parse')
def compute(register_path):
    """Parse one email from stdin and print or register the resulting order.

    Args:
        register_path: path of an existing YAML register file, or a false
            value. Without a path the order is printed to stdout as YAML.
            With a path the order is stored in the register under
            orders[platform][order_id] and the file is rewritten.

    Raises:
        Exception: the order id is already present in the register for
            that platform ('already registered'), or the mail could not be
            parsed.
    """
    msg = email.message_from_string(sys.stdin.read())
    order = parse(msg)

    if not register_path:
        print(yaml.safe_dump(order, default_flow_style = False))
        return

    with open(register_path, 'r') as register:
        # The register is written with safe_dump below, so safe_load reads
        # everything this script produces. NOTE(review): this replaces
        # plain yaml.load, which can construct arbitrary Python objects
        # from tagged input.
        orders = yaml.safe_load(register.read().decode('utf-8'))
    if not orders:
        # An empty register file loads as None.
        orders = {}

    platform_orders = orders.setdefault(order.platform, {})
    if order.order_id in platform_orders:
        raise Exception('already registered')
    platform_orders[order.order_id] = order

    # Serialize before opening the file for writing: opening with 'w'
    # truncates it, so a failure while dumping would otherwise wipe the
    # existing register.
    serialized = yaml.safe_dump(orders, default_flow_style = False)
    with open(register_path, 'w') as register:
        register.write(serialized)
def _init_argparser():
    """Build the command-line parser.

    The only option is ``--register <path>``, stored as ``register_path``
    (None when the option is not given).
    """
    parser = argparse.ArgumentParser(description = None)
    parser.add_argument(
        '--register',
        dest = 'register_path',
        metavar = 'path',
    )
    return parser
def main(argv):
    """Command-line entry point: parse ``argv`` and run compute().

    Args:
        argv: the argument list without the program name.

    Returns:
        0 once compute() has returned.
    """
    parser = _init_argparser()
    # Must run before parse_args() so shell completion requests are served.
    argcomplete.autocomplete(parser)
    options = vars(parser.parse_args(argv))
    compute(**options)
    return 0
if __name__ == "__main__":
    # Hand main()'s return value to the shell as the exit status.
    cli_arguments = sys.argv[1:]
    sys.exit(main(cli_arguments))
|