__init__.py 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209
  1. # location-guessing-game-telegram-bot - Telegram Bot Sending Random Wikimedia Commons Photos
  2. #
  3. # Copyright (C) 2021 Fabian Peter Hammerle <fabian@hammerle.me>
  4. #
  5. # This program is free software: you can redistribute it and/or modify
  6. # it under the terms of the GNU General Public License as published by
  7. # the Free Software Foundation, either version 3 of the License, or
  8. # any later version.
  9. #
  10. # This program is distributed in the hope that it will be useful,
  11. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. # GNU General Public License for more details.
  14. #
  15. # You should have received a copy of the GNU General Public License
  16. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  17. from __future__ import annotations
  18. import argparse
  19. import dataclasses
  20. import json
  21. import logging
  22. import os
  23. import pathlib
  24. import random
  25. import typing
  26. import urllib.request
  27. import telegram.ext
  28. import telegram.update
  29. _LOGGER = logging.getLogger(__name__)
  30. @dataclasses.dataclass
  31. class _Photo:
  32. photo_url: str
  33. description_url: str
  34. latitude: float
  35. longitude: float
  36. def __str__(self) -> str:
  37. return "photo " + self.description_url
  38. @classmethod
  39. def from_wikimap_export(cls, data: dict) -> _Photo:
  40. # pylint: disable=consider-ternary-expression; easier to read
  41. if isinstance(data["coordinates"], list):
  42. coords = data["coordinates"][0]
  43. else:
  44. coords = data["coordinates"]["1"]
  45. assert len(data["imageinfo"]) == 1, data["imageinfo"]
  46. return cls(
  47. latitude=coords["lat"],
  48. longitude=coords["lon"],
  49. photo_url=data["imageinfo"][0]["url"],
  50. description_url=data["imageinfo"][0]["descriptionurl"],
  51. )
  52. def _photo_command(
  53. update: telegram.update.Update,
  54. context: telegram.ext.callbackcontext.CallbackContext,
  55. ):
  56. assert isinstance(context.chat_data, dict) # mypy
  57. assert update.effective_chat is not None # mypy
  58. if "last_photo_message_id" in context.chat_data:
  59. update.effective_chat.send_message(
  60. text=f"Lösung: {context.chat_data['last_photo'].description_url}",
  61. disable_web_page_preview=True,
  62. reply_to_message_id=context.chat_data["last_photo_message_id"],
  63. )
  64. # telegram.chat.Chat.send_location shortcut added in v13.0
  65. # https://github.com/python-telegram-bot/python-telegram-bot/commit/fc5844c13da3b3fb20bb2d0bfcdf1efb1a826ba6#diff-2590f2bde47ea3730442f14a3a029ef77d8f2c8f3186cf5edd7e18bcc7243c39R381
  66. context.bot.send_location(
  67. chat_id=update.effective_chat.id,
  68. latitude=context.chat_data["last_photo"].latitude,
  69. longitude=context.chat_data["last_photo"].longitude,
  70. disable_notification=True,
  71. )
  72. context.chat_data["last_photo_message_id"] = None
  73. update.effective_chat.send_message(
  74. text="Neues Photo wird ausgewählt und gesendet.", disable_notification=True
  75. )
  76. while True:
  77. photo = random.choice(context.bot_data["photos"])
  78. _LOGGER.info("sending %s", photo)
  79. try:
  80. with urllib.request.urlopen(photo.photo_url) as photo_response:
  81. photo_message = update.effective_chat.send_photo(
  82. photo=photo_response,
  83. caption="Wo wurde dieses Photo aufgenommen?",
  84. )
  85. except telegram.error.BadRequest:
  86. _LOGGER.warning("file size limit exceeded?", exc_info=True)
  87. except telegram.error.TimedOut:
  88. _LOGGER.warning("timeout", exc_info=True)
  89. else:
  90. break
  91. context.chat_data["last_photo"] = photo
  92. context.chat_data["last_photo_message_id"] = photo_message.message_id
  93. class _Persistence(telegram.ext.BasePersistence):
  94. """
  95. found no easier way to inject bot_data
  96. https://python-telegram-bot.readthedocs.io/en/latest/telegram.ext.basepersistence.html
  97. """
  98. def __init__(self, photos: typing.List[_Photo]) -> None:
  99. self._bot_data = {"photos": photos}
  100. super().__init__(
  101. store_bot_data=True, store_chat_data=False, store_user_data=False
  102. )
  103. def get_user_data(self) -> typing.DefaultDict[int, dict]:
  104. raise NotImplementedError() # pragma: no cover
  105. def get_chat_data(self) -> typing.DefaultDict[int, dict]:
  106. raise NotImplementedError() # pragma: no cover
  107. def get_bot_data(self) -> dict:
  108. return self._bot_data
  109. def get_conversations(self, name: str) -> dict:
  110. return {} # pragma: no cover
  111. def update_user_data(self, user_id: int, data: dict) -> None:
  112. pass # pragma: no cover
  113. def update_chat_data(self, chat_id: int, data: dict) -> None:
  114. pass # pragma: no cover
  115. def update_bot_data(self, data: dict) -> None:
  116. pass # pragma: no cover
  117. def update_conversation(
  118. self, name: str, key: tuple, new_state: typing.Optional[object]
  119. ) -> None:
  120. pass # pragma: no cover
  121. # https://git.hammerle.me/fphammerle/pyftpd-sink/src/5daf383bc238425cd37d011959a8eeffab0112c3/pyftpd-sink#L48
  122. class _EnvDefaultArgparser(argparse.ArgumentParser):
  123. def add_argument(self, *args, envvar=None, **kwargs):
  124. # pylint: disable=arguments-differ; using *args & **kwargs to catch all
  125. if envvar:
  126. envvar_value = os.environ.get(envvar, None)
  127. if envvar_value:
  128. kwargs["required"] = False
  129. kwargs["default"] = envvar_value
  130. super().add_argument(*args, **kwargs)
  131. def _run(
  132. *, telegram_token_path: pathlib.Path, wikimap_export_path: pathlib.Path
  133. ) -> None:
  134. photos = [
  135. _Photo.from_wikimap_export(attrs)
  136. for attrs in json.loads(wikimap_export_path.read_text())
  137. ]
  138. updater = telegram.ext.Updater(
  139. token=telegram_token_path.read_text().rstrip(),
  140. use_context=True,
  141. persistence=_Persistence(photos=photos),
  142. )
  143. # workaround for mypy reporting
  144. # > error: Cannot determine type of "dispatcher" [has-type]
  145. dispatcher: telegram.ext.dispatcher.Dispatcher = updater.dispatcher # type: ignore
  146. dispatcher.add_handler(telegram.ext.CommandHandler("photo", _photo_command))
  147. updater.start_polling()
  148. def _main() -> None:
  149. argparser = _EnvDefaultArgparser()
  150. argparser.add_argument(
  151. "--telegram-token-path",
  152. type=pathlib.Path,
  153. required=True,
  154. envvar="TELEGRAM_TOKEN_PATH",
  155. help="default: env var TELEGRAM_TOKEN_PATH",
  156. )
  157. argparser.add_argument(
  158. "--wikimap-export-path",
  159. type=pathlib.Path,
  160. required=True,
  161. envvar="WIKIMAP_EXPORT_PATH",
  162. help="https://wikimap.toolforge.org/api.php?[...] json, "
  163. "default: env var WIKIMAP_EXPORT_PATH",
  164. )
  165. argparser.add_argument("--debug", action="store_true")
  166. args = argparser.parse_args()
  167. # https://github.com/fphammerle/python-cc1101/blob/26d8122661fc4587ecc7c73df55b92d05cf98fe8/cc1101/_cli.py#L51
  168. logging.basicConfig(
  169. level=logging.DEBUG if args.debug else logging.INFO,
  170. format=(
  171. "%(asctime)s:%(levelname)s:%(name)s:%(funcName)s:%(message)s"
  172. if args.debug
  173. else "%(message)s"
  174. ),
  175. datefmt="%Y-%m-%dT%H:%M:%S%z",
  176. )
  177. _LOGGER.debug("args=%r", args)
  178. _run(
  179. telegram_token_path=args.telegram_token_path,
  180. wikimap_export_path=args.wikimap_export_path,
  181. )