__init__.py 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204
  1. # location-guessing-game-telegram-bot - Telegram Bot Sending Random Wikimedia Commons Photos
  2. #
  3. # Copyright (C) 2021 Fabian Peter Hammerle <fabian@hammerle.me>
  4. #
  5. # This program is free software: you can redistribute it and/or modify
  6. # it under the terms of the GNU General Public License as published by
  7. # the Free Software Foundation, either version 3 of the License, or
  8. # any later version.
  9. #
  10. # This program is distributed in the hope that it will be useful,
  11. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. # GNU General Public License for more details.
  14. #
  15. # You should have received a copy of the GNU General Public License
  16. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  17. from __future__ import annotations
  18. import argparse
  19. import dataclasses
  20. import json
  21. import logging
  22. import os
  23. import pathlib
  24. import random
  25. import typing
  26. import urllib.request
  27. import telegram.ext
  28. import telegram.update
# Module-level logger, named after this module via __name__.
_LOGGER = logging.getLogger(__name__)
  30. @dataclasses.dataclass
  31. class _Photo:
  32. photo_url: str
  33. description_url: str
  34. latitude: float
  35. longitude: float
  36. def __str__(self) -> str:
  37. return "photo " + self.description_url
  38. @classmethod
  39. def from_wikimap_export(cls, data: dict) -> _Photo:
  40. if isinstance(data["coordinates"], list):
  41. coords = data["coordinates"][0]
  42. else:
  43. coords = data["coordinates"]["1"]
  44. assert len(data["imageinfo"]) == 1, data["imageinfo"]
  45. return cls(
  46. latitude=coords["lat"],
  47. longitude=coords["lon"],
  48. photo_url=data["imageinfo"][0]["url"],
  49. description_url=data["imageinfo"][0]["descriptionurl"],
  50. )
  51. def _photo_command(
  52. update: telegram.update.Update,
  53. context: telegram.ext.callbackcontext.CallbackContext,
  54. ):
  55. assert isinstance(context.chat_data, dict) # mypy
  56. assert update.effective_chat is not None # mypy
  57. if "last_photo_message_id" in context.chat_data:
  58. update.effective_chat.send_message(
  59. text=f"Lösung: {context.chat_data['last_photo'].description_url}",
  60. disable_web_page_preview=True,
  61. reply_to_message_id=context.chat_data["last_photo_message_id"],
  62. )
  63. # telegram.chat.Chat.send_location shortcut added in v13.0
  64. # https://github.com/python-telegram-bot/python-telegram-bot/commit/fc5844c13da3b3fb20bb2d0bfcdf1efb1a826ba6#diff-2590f2bde47ea3730442f14a3a029ef77d8f2c8f3186cf5edd7e18bcc7243c39R381
  65. context.bot.send_location(
  66. chat_id=update.effective_chat.id,
  67. latitude=context.chat_data["last_photo"].latitude,
  68. longitude=context.chat_data["last_photo"].longitude,
  69. disable_notification=True,
  70. )
  71. context.chat_data["last_photo_message_id"] = None
  72. update.effective_chat.send_message(
  73. text="Neues Photo wird ausgewählt und gesendet.", disable_notification=True
  74. )
  75. while True:
  76. photo = random.choice(context.bot_data["photos"])
  77. _LOGGER.info("sending %s", photo)
  78. try:
  79. with urllib.request.urlopen(photo.photo_url) as photo_response:
  80. photo_message = update.effective_chat.send_photo(
  81. photo=photo_response,
  82. caption="Wo wurde dieses Photo aufgenommen?",
  83. )
  84. except telegram.error.BadRequest:
  85. _LOGGER.warning("file size limit exceeded?", exc_info=True)
  86. except telegram.error.TimedOut:
  87. _LOGGER.warning("timeout", exc_info=True)
  88. else:
  89. break
  90. context.chat_data["last_photo"] = photo
  91. context.chat_data["last_photo_message_id"] = photo_message.message_id
  92. class _Persistence(telegram.ext.BasePersistence):
  93. """
  94. found no easier way to inject bot_data
  95. https://python-telegram-bot.readthedocs.io/en/latest/telegram.ext.basepersistence.html
  96. """
  97. def __init__(self, photos: typing.List[_Photo]) -> None:
  98. self._bot_data = {"photos": photos}
  99. super().__init__(
  100. store_bot_data=True, store_chat_data=False, store_user_data=False
  101. )
  102. def get_user_data(self) -> typing.DefaultDict[int, dict]:
  103. raise NotImplementedError() # pragma: no cover
  104. def get_chat_data(self) -> typing.DefaultDict[int, dict]:
  105. raise NotImplementedError() # pragma: no cover
  106. def get_bot_data(self) -> dict:
  107. return self._bot_data
  108. def get_conversations(self, name: str) -> dict:
  109. return {} # pragma: no cover
  110. def update_user_data(self, user_id: int, data: dict) -> None:
  111. pass # pragma: no cover
  112. def update_chat_data(self, chat_id: int, data: dict) -> None:
  113. pass # pragma: no cover
  114. def update_bot_data(self, data: dict) -> None:
  115. pass # pragma: no cover
  116. def update_conversation(
  117. self, name: str, key: tuple, new_state: typing.Optional[object]
  118. ) -> None:
  119. pass # pragma: no cover
  120. # https://git.hammerle.me/fphammerle/pyftpd-sink/src/5daf383bc238425cd37d011959a8eeffab0112c3/pyftpd-sink#L48
  121. class _EnvDefaultArgparser(argparse.ArgumentParser):
  122. def add_argument(self, *args, envvar=None, **kwargs):
  123. # pylint: disable=arguments-differ; using *args & **kwargs to catch all
  124. if envvar:
  125. envvar_value = os.environ.get(envvar, None)
  126. if envvar_value:
  127. kwargs["required"] = False
  128. kwargs["default"] = envvar_value
  129. super().add_argument(*args, **kwargs)
  130. def _run(
  131. *, telegram_token_path: pathlib.Path, wikimap_export_path: pathlib.Path
  132. ) -> None:
  133. photos = [
  134. _Photo.from_wikimap_export(attrs)
  135. for attrs in json.loads(wikimap_export_path.read_text())
  136. ]
  137. updater = telegram.ext.Updater(
  138. token=telegram_token_path.read_text().rstrip(),
  139. use_context=True,
  140. persistence=_Persistence(photos=photos),
  141. )
  142. updater.dispatcher.add_handler(telegram.ext.CommandHandler("photo", _photo_command))
  143. updater.start_polling()
  144. def _main() -> None:
  145. argparser = _EnvDefaultArgparser()
  146. argparser.add_argument(
  147. "--telegram-token-path",
  148. type=pathlib.Path,
  149. required=True,
  150. envvar="TELEGRAM_TOKEN_PATH",
  151. help="default: env var TELEGRAM_TOKEN_PATH",
  152. )
  153. argparser.add_argument(
  154. "--wikimap-export-path",
  155. type=pathlib.Path,
  156. required=True,
  157. envvar="WIKIMAP_EXPORT_PATH",
  158. help="https://wikimap.toolforge.org/api.php?[...] json, "
  159. "default: env var WIKIMAP_EXPORT_PATH",
  160. )
  161. argparser.add_argument("--debug", action="store_true")
  162. args = argparser.parse_args()
  163. # https://github.com/fphammerle/python-cc1101/blob/26d8122661fc4587ecc7c73df55b92d05cf98fe8/cc1101/_cli.py#L51
  164. logging.basicConfig(
  165. level=logging.DEBUG if args.debug else logging.INFO,
  166. format="%(asctime)s:%(levelname)s:%(name)s:%(funcName)s:%(message)s"
  167. if args.debug
  168. else "%(message)s",
  169. datefmt="%Y-%m-%dT%H:%M:%S%z",
  170. )
  171. _LOGGER.debug("args=%r", args)
  172. _run(
  173. telegram_token_path=args.telegram_token_path,
  174. wikimap_export_path=args.wikimap_export_path,
  175. )