location_guessing_game_telegram_bot.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. #!/usr/bin/env python3
  2. import argparse
  3. import json
  4. import logging
  5. import os
  6. import pathlib
  7. import random
  8. import typing
  9. import urllib.request
  10. import telegram.ext
  11. import telegram.update
  12. _LOGGER = logging.getLogger(__name__)
  13. class _Photo:
  14. def __init__(
  15. self, photo_url: str, description_url: str, latitude: float, longitude: float
  16. ) -> None:
  17. self.photo_url = photo_url
  18. self.description_url = description_url
  19. self.latitude = latitude
  20. self.longitude = longitude
  21. def __str__(self) -> str:
  22. return "Photo({})".format(self.description_url)
  23. @classmethod
  24. def from_wikimap_export(cls, data: dict) -> "_Photo":
  25. if isinstance(data["coordinates"], list):
  26. coords = data["coordinates"][0]
  27. else:
  28. coords = data["coordinates"]["1"]
  29. assert len(data["imageinfo"]) == 1, data["imageinfo"]
  30. return cls(
  31. latitude=coords["lat"],
  32. longitude=coords["lon"],
  33. photo_url=data["imageinfo"][0]["url"],
  34. description_url=data["imageinfo"][0]["descriptionurl"],
  35. )
  36. def _photo_command(
  37. update: telegram.update.Update,
  38. context: telegram.ext.callbackcontext.CallbackContext,
  39. ):
  40. if "last_photo_message_id" in context.chat_data:
  41. update.effective_chat.send_message(
  42. text="Lösung: {}".format(context.chat_data["last_photo"].description_url),
  43. disable_web_page_preview=True,
  44. reply_to_message_id=context.chat_data["last_photo_message_id"],
  45. )
  46. # https://github.com/python-telegram-bot/python-telegram-bot/pull/2043
  47. context.bot.send_location(
  48. chat_id=update.effective_chat.id,
  49. latitude=context.chat_data["last_photo"].latitude,
  50. longitude=context.chat_data["last_photo"].longitude,
  51. disable_notification=True,
  52. )
  53. context.chat_data["last_photo_message_id"] = None
  54. update.effective_chat.send_message(
  55. text="Neues Photo wird ausgewählt und gesendet.", disable_notification=True
  56. )
  57. while True:
  58. photo = random.choice(context.bot_data["photos"])
  59. _LOGGER.info("sending %s", photo)
  60. try:
  61. with urllib.request.urlopen(photo.photo_url) as photo_response:
  62. photo_message = update.effective_chat.send_photo(
  63. photo=photo_response, caption="Wo wurde dieses Photo aufgenommen?",
  64. )
  65. except telegram.error.BadRequest:
  66. _LOGGER.warning("file size limit exceeded?", exc_info=True)
  67. except telegram.error.TimedOut:
  68. _LOGGER.warning("timeout", exc_info=True)
  69. else:
  70. break
  71. context.chat_data["last_photo"] = photo
  72. context.chat_data["last_photo_message_id"] = photo_message.message_id
  73. class _Persistence(telegram.ext.BasePersistence):
  74. """
  75. found no easier way to inject bot_data
  76. https://python-telegram-bot.readthedocs.io/en/latest/telegram.ext.basepersistence.html
  77. """
  78. def __init__(self, photos: typing.List[_Photo]) -> None:
  79. self._bot_data = {"photos": photos}
  80. super().__init__(
  81. store_bot_data=True, store_chat_data=False, store_user_data=False
  82. )
  83. def get_user_data(self) -> dict:
  84. return {}
  85. def get_chat_data(self) -> dict:
  86. return {}
  87. def get_bot_data(self) -> dict:
  88. return self._bot_data
  89. def get_conversations(self, name: str) -> dict:
  90. return {}
  91. def update_user_data(self, user_id: int, data: dict) -> None:
  92. pass
  93. def update_chat_data(self, chat_id: int, data: dict) -> None:
  94. pass
  95. def update_bot_data(self, data: dict) -> None:
  96. pass
  97. def update_conversation(
  98. self, name: str, key: tuple, new_state: typing.Optional[object]
  99. ) -> None:
  100. pass
  101. # https://git.hammerle.me/fphammerle/pyftpd-sink/src/5daf383bc238425cd37d011959a8eeffab0112c3/pyftpd-sink#L48
  102. class _EnvDefaultArgparser(argparse.ArgumentParser):
  103. def add_argument(self, *args, envvar=None, **kwargs):
  104. # pylint: disable=arguments-differ; using *args & **kwargs to catch all
  105. if envvar:
  106. envvar_value = os.environ.get(envvar, None)
  107. if envvar_value:
  108. kwargs["required"] = False
  109. kwargs["default"] = envvar_value
  110. super().add_argument(*args, **kwargs)
  111. def _main():
  112. logging.basicConfig(
  113. level=logging.DEBUG,
  114. format="%(asctime)s:%(levelname)s:%(name)s:%(funcName)s:%(message)s",
  115. datefmt="%Y-%m-%dT%H:%M:%S%z",
  116. )
  117. argparser = _EnvDefaultArgparser()
  118. argparser.add_argument(
  119. "--telegram-token-path",
  120. type=pathlib.Path,
  121. required=True,
  122. envvar="TELEGRAM_TOKEN_PATH",
  123. help="default: env var TELEGRAM_TOKEN_PATH",
  124. )
  125. argparser.add_argument(
  126. "--wikimap-export-path",
  127. type=pathlib.Path,
  128. required=True,
  129. envvar="WIKIMAP_EXPORT_PATH",
  130. help="https://wikimap.toolforge.org/api.php?[...] json, "
  131. "default: env var WIKIMAP_EXPORT_PATH",
  132. )
  133. args = argparser.parse_args()
  134. _LOGGER.debug("args=%r", args)
  135. photos = [
  136. _Photo.from_wikimap_export(attrs)
  137. for attrs in json.loads(args.wikimap_export_path.read_text())
  138. ]
  139. updater = telegram.ext.Updater(
  140. token=args.telegram_token_path.read_text().rstrip(),
  141. use_context=True,
  142. persistence=_Persistence(photos=photos),
  143. )
  144. updater.dispatcher.add_handler(telegram.ext.CommandHandler("photo", _photo_command))
  145. updater.start_polling()
  146. if __name__ == "__main__":
  147. _main()