location_guessing_game_telegram_bot.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. #!/usr/bin/env python3
  2. import argparse
  3. import json
  4. import logging
  5. import os
  6. import pathlib
  7. import random
  8. import typing
  9. import urllib.request
  10. import telegram.ext
  11. import telegram.update
  12. _LOGGER = logging.getLogger(__name__)
  13. class _Photo:
  14. def __init__(
  15. self, photo_url: str, description_url: str, latitude: float, longitude: float
  16. ) -> None:
  17. self.photo_url = photo_url
  18. self.description_url = description_url
  19. self.latitude = latitude
  20. self.longitude = longitude
  21. def __str__(self) -> str:
  22. return "Photo({})".format(self.description_url)
  23. @classmethod
  24. def from_wikimap_export(cls, data: dict) -> "_Photo":
  25. if isinstance(data["coordinates"], list):
  26. coords = data["coordinates"][0]
  27. else:
  28. coords = data["coordinates"]["1"]
  29. assert len(data["imageinfo"]) == 1, data["imageinfo"]
  30. return cls(
  31. latitude=coords["lat"],
  32. longitude=coords["lon"],
  33. photo_url=data["imageinfo"][0]["url"],
  34. description_url=data["imageinfo"][0]["descriptionurl"],
  35. )
  36. def _photo_command(
  37. update: telegram.update.Update,
  38. context: telegram.ext.callbackcontext.CallbackContext,
  39. ):
  40. if "last_photo_message_id" in context.chat_data:
  41. update.effective_chat.send_message(
  42. text="Lösung: {}".format(context.chat_data["last_photo"].description_url),
  43. disable_web_page_preview=True,
  44. reply_to_message_id=context.chat_data["last_photo_message_id"],
  45. )
  46. # https://github.com/python-telegram-bot/python-telegram-bot/pull/2043
  47. update.effective_chat.send_location(
  48. latitude=context.chat_data["last_photo"].latitude,
  49. longitude=context.chat_data["last_photo"].longitude,
  50. disable_notification=True,
  51. )
  52. context.chat_data["last_photo_message_id"] = None
  53. update.effective_chat.send_message(
  54. text="Neues Photo wird ausgewählt und gesendet.", disable_notification=True
  55. )
  56. while True:
  57. photo = random.choice(context.bot_data["photos"])
  58. _LOGGER.info("sending %s", photo)
  59. try:
  60. with urllib.request.urlopen(photo.photo_url) as photo_response:
  61. photo_message = update.effective_chat.send_photo(
  62. photo=photo_response, caption="Wo wurde dieses Photo aufgenommen?",
  63. )
  64. except telegram.error.BadRequest:
  65. _LOGGER.warning("file size limit exceeded?", exc_info=True)
  66. except telegram.error.TimedOut:
  67. _LOGGER.warning("timeout", exc_info=True)
  68. else:
  69. break
  70. context.chat_data["last_photo"] = photo
  71. context.chat_data["last_photo_message_id"] = photo_message.message_id
  72. class _Persistence(telegram.ext.BasePersistence):
  73. """
  74. found no easier way to inject bot_data
  75. https://python-telegram-bot.readthedocs.io/en/latest/telegram.ext.basepersistence.html
  76. """
  77. def __init__(self, photos: typing.List[_Photo]) -> None:
  78. self._bot_data = {"photos": photos}
  79. super().__init__(
  80. store_bot_data=True, store_chat_data=False, store_user_data=False
  81. )
  82. def get_user_data(self) -> dict:
  83. return {}
  84. def get_chat_data(self) -> dict:
  85. return {}
  86. def get_bot_data(self) -> dict:
  87. return self._bot_data
  88. def get_conversations(self, name: str) -> dict:
  89. return {}
  90. def update_user_data(self, user_id: int, data: dict) -> None:
  91. pass
  92. def update_chat_data(self, chat_id: int, data: dict) -> None:
  93. pass
  94. def update_bot_data(self, data: dict) -> None:
  95. pass
  96. def update_conversation(
  97. self, name: str, key: tuple, new_state: typing.Optional[object]
  98. ) -> None:
  99. pass
  100. # https://git.hammerle.me/fphammerle/pyftpd-sink/src/5daf383bc238425cd37d011959a8eeffab0112c3/pyftpd-sink#L48
  101. class _EnvDefaultArgparser(argparse.ArgumentParser):
  102. def add_argument(self, *args, envvar=None, **kwargs):
  103. # pylint: disable=arguments-differ; using *args & **kwargs to catch all
  104. if envvar:
  105. envvar_value = os.environ.get(envvar, None)
  106. if envvar_value:
  107. kwargs["required"] = False
  108. kwargs["default"] = envvar_value
  109. super().add_argument(*args, **kwargs)
  110. def _main():
  111. logging.basicConfig(
  112. level=logging.DEBUG,
  113. format="%(asctime)s:%(levelname)s:%(name)s:%(funcName)s:%(message)s",
  114. datefmt="%Y-%m-%dT%H:%M:%S%z",
  115. )
  116. argparser = _EnvDefaultArgparser()
  117. argparser.add_argument(
  118. "--telegram-token-path",
  119. type=pathlib.Path,
  120. required=True,
  121. envvar="TELEGRAM_TOKEN_PATH",
  122. help="default: env var TELEGRAM_TOKEN_PATH",
  123. )
  124. argparser.add_argument(
  125. "--wikimap-export-path",
  126. type=pathlib.Path,
  127. required=True,
  128. envvar="WIKIMAP_EXPORT_PATH",
  129. help="https://wikimap.toolforge.org/api.php?[...] json, "
  130. "default: env var WIKIMAP_EXPORT_PATH",
  131. )
  132. args = argparser.parse_args()
  133. _LOGGER.debug("args=%r", args)
  134. photos = [
  135. _Photo.from_wikimap_export(attrs)
  136. for attrs in json.loads(args.wikimap_export_path.read_text())
  137. ]
  138. updater = telegram.ext.Updater(
  139. token=args.telegram_token_path.read_text().rstrip(),
  140. use_context=True,
  141. persistence=_Persistence(photos=photos),
  142. )
  143. updater.dispatcher.add_handler(telegram.ext.CommandHandler("photo", _photo_command))
  144. updater.start_polling()
  145. if __name__ == "__main__":
  146. _main()