__init__.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. import argparse
  2. import json
  3. import logging
  4. import os
  5. import pathlib
  6. import random
  7. import typing
  8. import urllib.request
  9. import telegram.ext
  10. import telegram.update
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)
  12. class _Photo:
  13. def __init__(
  14. self, photo_url: str, description_url: str, latitude: float, longitude: float
  15. ) -> None:
  16. self.photo_url = photo_url
  17. self.description_url = description_url
  18. self.latitude = latitude
  19. self.longitude = longitude
  20. def __str__(self) -> str:
  21. return "photo " + self.description_url
  22. @classmethod
  23. def from_wikimap_export(cls, data: dict) -> "_Photo":
  24. if isinstance(data["coordinates"], list):
  25. coords = data["coordinates"][0]
  26. else:
  27. coords = data["coordinates"]["1"]
  28. assert len(data["imageinfo"]) == 1, data["imageinfo"]
  29. return cls(
  30. latitude=coords["lat"],
  31. longitude=coords["lon"],
  32. photo_url=data["imageinfo"][0]["url"],
  33. description_url=data["imageinfo"][0]["descriptionurl"],
  34. )
  35. def _photo_command(
  36. update: telegram.update.Update,
  37. context: telegram.ext.callbackcontext.CallbackContext,
  38. ):
  39. if "last_photo_message_id" in context.chat_data:
  40. update.effective_chat.send_message(
  41. text="Lösung: {}".format(context.chat_data["last_photo"].description_url),
  42. disable_web_page_preview=True,
  43. reply_to_message_id=context.chat_data["last_photo_message_id"],
  44. )
  45. # https://github.com/python-telegram-bot/python-telegram-bot/pull/2043
  46. update.effective_chat.send_location(
  47. latitude=context.chat_data["last_photo"].latitude,
  48. longitude=context.chat_data["last_photo"].longitude,
  49. disable_notification=True,
  50. )
  51. context.chat_data["last_photo_message_id"] = None
  52. update.effective_chat.send_message(
  53. text="Neues Photo wird ausgewählt und gesendet.", disable_notification=True
  54. )
  55. while True:
  56. photo = random.choice(context.bot_data["photos"])
  57. _LOGGER.info("sending %s", photo)
  58. try:
  59. with urllib.request.urlopen(photo.photo_url) as photo_response:
  60. photo_message = update.effective_chat.send_photo(
  61. photo=photo_response, caption="Wo wurde dieses Photo aufgenommen?",
  62. )
  63. except telegram.error.BadRequest:
  64. _LOGGER.warning("file size limit exceeded?", exc_info=True)
  65. except telegram.error.TimedOut:
  66. _LOGGER.warning("timeout", exc_info=True)
  67. else:
  68. break
  69. context.chat_data["last_photo"] = photo
  70. context.chat_data["last_photo_message_id"] = photo_message.message_id
  71. class _Persistence(telegram.ext.BasePersistence):
  72. """
  73. found no easier way to inject bot_data
  74. https://python-telegram-bot.readthedocs.io/en/latest/telegram.ext.basepersistence.html
  75. """
  76. def __init__(self, photos: typing.List[_Photo]) -> None:
  77. self._bot_data = {"photos": photos}
  78. super().__init__(
  79. store_bot_data=True, store_chat_data=False, store_user_data=False
  80. )
  81. def get_user_data(self) -> dict:
  82. return {}
  83. def get_chat_data(self) -> dict:
  84. return {}
  85. def get_bot_data(self) -> dict:
  86. return self._bot_data
  87. def get_conversations(self, name: str) -> dict:
  88. return {}
  89. def update_user_data(self, user_id: int, data: dict) -> None:
  90. pass
  91. def update_chat_data(self, chat_id: int, data: dict) -> None:
  92. pass
  93. def update_bot_data(self, data: dict) -> None:
  94. pass
  95. def update_conversation(
  96. self, name: str, key: tuple, new_state: typing.Optional[object]
  97. ) -> None:
  98. pass
  99. # https://git.hammerle.me/fphammerle/pyftpd-sink/src/5daf383bc238425cd37d011959a8eeffab0112c3/pyftpd-sink#L48
  100. class _EnvDefaultArgparser(argparse.ArgumentParser):
  101. def add_argument(self, *args, envvar=None, **kwargs):
  102. # pylint: disable=arguments-differ; using *args & **kwargs to catch all
  103. if envvar:
  104. envvar_value = os.environ.get(envvar, None)
  105. if envvar_value:
  106. kwargs["required"] = False
  107. kwargs["default"] = envvar_value
  108. super().add_argument(*args, **kwargs)
  109. def _main():
  110. argparser = _EnvDefaultArgparser()
  111. argparser.add_argument(
  112. "--telegram-token-path",
  113. type=pathlib.Path,
  114. required=True,
  115. envvar="TELEGRAM_TOKEN_PATH",
  116. help="default: env var TELEGRAM_TOKEN_PATH",
  117. )
  118. argparser.add_argument(
  119. "--wikimap-export-path",
  120. type=pathlib.Path,
  121. required=True,
  122. envvar="WIKIMAP_EXPORT_PATH",
  123. help="https://wikimap.toolforge.org/api.php?[...] json, "
  124. "default: env var WIKIMAP_EXPORT_PATH",
  125. )
  126. argparser.add_argument("--debug", action="store_true")
  127. args = argparser.parse_args()
  128. # https://github.com/fphammerle/python-cc1101/blob/26d8122661fc4587ecc7c73df55b92d05cf98fe8/cc1101/_cli.py#L51
  129. logging.basicConfig(
  130. level=logging.DEBUG if args.debug else logging.INFO,
  131. format="%(asctime)s:%(levelname)s:%(name)s:%(funcName)s:%(message)s"
  132. if args.debug
  133. else "%(message)s",
  134. datefmt="%Y-%m-%dT%H:%M:%S%z",
  135. )
  136. _LOGGER.debug("args=%r", args)
  137. photos = [
  138. _Photo.from_wikimap_export(attrs)
  139. for attrs in json.loads(args.wikimap_export_path.read_text())
  140. ]
  141. updater = telegram.ext.Updater(
  142. token=args.telegram_token_path.read_text().rstrip(),
  143. use_context=True,
  144. persistence=_Persistence(photos=photos),
  145. )
  146. updater.dispatcher.add_handler(telegram.ext.CommandHandler("photo", _photo_command))
  147. updater.start_polling()