__init__.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. # location-guessing-game-telegram-bot - Telegram Bot Sending Random Wikimedia Commons Photos
  2. #
  3. # Copyright (C) 2021 Fabian Peter Hammerle <fabian@hammerle.me>
  4. #
  5. # This program is free software: you can redistribute it and/or modify
  6. # it under the terms of the GNU General Public License as published by
  7. # the Free Software Foundation, either version 3 of the License, or
  8. # any later version.
  9. #
  10. # This program is distributed in the hope that it will be useful,
  11. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. # GNU General Public License for more details.
  14. #
  15. # You should have received a copy of the GNU General Public License
  16. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  17. import argparse
  18. import json
  19. import logging
  20. import os
  21. import pathlib
  22. import random
  23. import typing
  24. import urllib.request
  25. import telegram.ext
  26. import telegram.update
  27. _LOGGER = logging.getLogger(__name__)
  28. class _Photo:
  29. def __init__(
  30. self, *, photo_url: str, description_url: str, latitude: float, longitude: float
  31. ) -> None:
  32. self.photo_url = photo_url
  33. self.description_url = description_url
  34. self.latitude = latitude
  35. self.longitude = longitude
  36. def __str__(self) -> str:
  37. return "photo " + self.description_url
  38. @classmethod
  39. def from_wikimap_export(cls, data: dict) -> "_Photo":
  40. if isinstance(data["coordinates"], list):
  41. coords = data["coordinates"][0]
  42. else:
  43. coords = data["coordinates"]["1"]
  44. assert len(data["imageinfo"]) == 1, data["imageinfo"]
  45. return cls(
  46. latitude=coords["lat"],
  47. longitude=coords["lon"],
  48. photo_url=data["imageinfo"][0]["url"],
  49. description_url=data["imageinfo"][0]["descriptionurl"],
  50. )
  51. def _photo_command(
  52. update: telegram.update.Update,
  53. context: telegram.ext.callbackcontext.CallbackContext,
  54. ):
  55. assert isinstance(context.chat_data, dict) # mypy
  56. assert update.effective_chat is not None # mypy
  57. if "last_photo_message_id" in context.chat_data:
  58. update.effective_chat.send_message(
  59. text=f"Lösung: {context.chat_data['last_photo'].description_url}",
  60. disable_web_page_preview=True,
  61. reply_to_message_id=context.chat_data["last_photo_message_id"],
  62. )
  63. # telegram.chat.Chat.send_location shortcut added in v13.0
  64. # https://github.com/python-telegram-bot/python-telegram-bot/commit/fc5844c13da3b3fb20bb2d0bfcdf1efb1a826ba6#diff-2590f2bde47ea3730442f14a3a029ef77d8f2c8f3186cf5edd7e18bcc7243c39R381
  65. context.bot.send_location(
  66. chat_id=update.effective_chat.id,
  67. latitude=context.chat_data["last_photo"].latitude,
  68. longitude=context.chat_data["last_photo"].longitude,
  69. disable_notification=True,
  70. )
  71. context.chat_data["last_photo_message_id"] = None
  72. update.effective_chat.send_message(
  73. text="Neues Photo wird ausgewählt und gesendet.", disable_notification=True
  74. )
  75. while True:
  76. photo = random.choice(context.bot_data["photos"])
  77. _LOGGER.info("sending %s", photo)
  78. try:
  79. with urllib.request.urlopen(photo.photo_url) as photo_response:
  80. photo_message = update.effective_chat.send_photo(
  81. photo=photo_response,
  82. caption="Wo wurde dieses Photo aufgenommen?",
  83. )
  84. except telegram.error.BadRequest:
  85. _LOGGER.warning("file size limit exceeded?", exc_info=True)
  86. except telegram.error.TimedOut:
  87. _LOGGER.warning("timeout", exc_info=True)
  88. else:
  89. break
  90. context.chat_data["last_photo"] = photo
  91. context.chat_data["last_photo_message_id"] = photo_message.message_id
  92. class _Persistence(telegram.ext.BasePersistence):
  93. """
  94. found no easier way to inject bot_data
  95. https://python-telegram-bot.readthedocs.io/en/latest/telegram.ext.basepersistence.html
  96. """
  97. def __init__(self, photos: typing.List[_Photo]) -> None:
  98. self._bot_data = {"photos": photos}
  99. super().__init__(
  100. store_bot_data=True, store_chat_data=False, store_user_data=False
  101. )
  102. def get_user_data(self) -> typing.DefaultDict[int, dict]:
  103. raise NotImplementedError() # pragma: no cover
  104. def get_chat_data(self) -> typing.DefaultDict[int, dict]:
  105. raise NotImplementedError() # pragma: no cover
  106. def get_bot_data(self) -> dict:
  107. return self._bot_data
  108. def get_conversations(self, name: str) -> dict:
  109. return {} # pragma: no cover
  110. def update_user_data(self, user_id: int, data: dict) -> None:
  111. pass # pragma: no cover
  112. def update_chat_data(self, chat_id: int, data: dict) -> None:
  113. pass # pragma: no cover
  114. def update_bot_data(self, data: dict) -> None:
  115. pass # pragma: no cover
  116. def update_conversation(
  117. self, name: str, key: tuple, new_state: typing.Optional[object]
  118. ) -> None:
  119. pass # pragma: no cover
  120. # https://git.hammerle.me/fphammerle/pyftpd-sink/src/5daf383bc238425cd37d011959a8eeffab0112c3/pyftpd-sink#L48
  121. class _EnvDefaultArgparser(argparse.ArgumentParser):
  122. def add_argument(self, *args, envvar=None, **kwargs):
  123. # pylint: disable=arguments-differ; using *args & **kwargs to catch all
  124. if envvar:
  125. envvar_value = os.environ.get(envvar, None)
  126. if envvar_value:
  127. kwargs["required"] = False
  128. kwargs["default"] = envvar_value
  129. super().add_argument(*args, **kwargs)
  130. def _run(
  131. *, telegram_token_path: pathlib.Path, wikimap_export_path: pathlib.Path
  132. ) -> None:
  133. photos = [
  134. _Photo.from_wikimap_export(attrs)
  135. for attrs in json.loads(wikimap_export_path.read_text())
  136. ]
  137. updater = telegram.ext.Updater(
  138. token=telegram_token_path.read_text().rstrip(),
  139. use_context=True,
  140. persistence=_Persistence(photos=photos),
  141. )
  142. updater.dispatcher.add_handler(telegram.ext.CommandHandler("photo", _photo_command))
  143. updater.start_polling()
  144. def _main():
  145. argparser = _EnvDefaultArgparser()
  146. argparser.add_argument(
  147. "--telegram-token-path",
  148. type=pathlib.Path,
  149. required=True,
  150. envvar="TELEGRAM_TOKEN_PATH",
  151. help="default: env var TELEGRAM_TOKEN_PATH",
  152. )
  153. argparser.add_argument(
  154. "--wikimap-export-path",
  155. type=pathlib.Path,
  156. required=True,
  157. envvar="WIKIMAP_EXPORT_PATH",
  158. help="https://wikimap.toolforge.org/api.php?[...] json, "
  159. "default: env var WIKIMAP_EXPORT_PATH",
  160. )
  161. argparser.add_argument("--debug", action="store_true")
  162. args = argparser.parse_args()
  163. # https://github.com/fphammerle/python-cc1101/blob/26d8122661fc4587ecc7c73df55b92d05cf98fe8/cc1101/_cli.py#L51
  164. logging.basicConfig(
  165. level=logging.DEBUG if args.debug else logging.INFO,
  166. format="%(asctime)s:%(levelname)s:%(name)s:%(funcName)s:%(message)s"
  167. if args.debug
  168. else "%(message)s",
  169. datefmt="%Y-%m-%dT%H:%M:%S%z",
  170. )
  171. _LOGGER.debug("args=%r", args)
  172. _run(
  173. telegram_token_path=args.telegram_token_path,
  174. wikimap_export_path=args.wikimap_export_path,
  175. )