__init__.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. # location-guessing-game-telegram-bot - Telegram Bot Sending Random Wikimedia Commons Photos
  2. #
  3. # Copyright (C) 2021 Fabian Peter Hammerle <fabian@hammerle.me>
  4. #
  5. # This program is free software: you can redistribute it and/or modify
  6. # it under the terms of the GNU General Public License as published by
  7. # the Free Software Foundation, either version 3 of the License, or
  8. # any later version.
  9. #
  10. # This program is distributed in the hope that it will be useful,
  11. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. # GNU General Public License for more details.
  14. #
  15. # You should have received a copy of the GNU General Public License
  16. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  17. import argparse
  18. import json
  19. import logging
  20. import os
  21. import pathlib
  22. import random
  23. import typing
  24. import urllib.request
  25. import telegram.ext
  26. import telegram.update
  27. _LOGGER = logging.getLogger(__name__)
  28. class _Photo:
  29. def __init__(
  30. self, photo_url: str, description_url: str, latitude: float, longitude: float
  31. ) -> None:
  32. self.photo_url = photo_url
  33. self.description_url = description_url
  34. self.latitude = latitude
  35. self.longitude = longitude
  36. def __str__(self) -> str:
  37. return "photo " + self.description_url
  38. @classmethod
  39. def from_wikimap_export(cls, data: dict) -> "_Photo":
  40. if isinstance(data["coordinates"], list):
  41. coords = data["coordinates"][0]
  42. else:
  43. coords = data["coordinates"]["1"]
  44. assert len(data["imageinfo"]) == 1, data["imageinfo"]
  45. return cls(
  46. latitude=coords["lat"],
  47. longitude=coords["lon"],
  48. photo_url=data["imageinfo"][0]["url"],
  49. description_url=data["imageinfo"][0]["descriptionurl"],
  50. )
  51. def _photo_command(
  52. update: telegram.update.Update,
  53. context: telegram.ext.callbackcontext.CallbackContext,
  54. ):
  55. assert isinstance(context.chat_data, dict) # mypy
  56. assert update.effective_chat is not None # mypy
  57. if "last_photo_message_id" in context.chat_data:
  58. update.effective_chat.send_message(
  59. text="Lösung: {}".format(context.chat_data["last_photo"].description_url),
  60. disable_web_page_preview=True,
  61. reply_to_message_id=context.chat_data["last_photo_message_id"],
  62. )
  63. # https://github.com/python-telegram-bot/python-telegram-bot/pull/2043
  64. update.effective_chat.send_location(
  65. latitude=context.chat_data["last_photo"].latitude,
  66. longitude=context.chat_data["last_photo"].longitude,
  67. disable_notification=True,
  68. )
  69. context.chat_data["last_photo_message_id"] = None
  70. update.effective_chat.send_message(
  71. text="Neues Photo wird ausgewählt und gesendet.", disable_notification=True
  72. )
  73. while True:
  74. photo = random.choice(context.bot_data["photos"])
  75. _LOGGER.info("sending %s", photo)
  76. try:
  77. with urllib.request.urlopen(photo.photo_url) as photo_response:
  78. photo_message = update.effective_chat.send_photo(
  79. photo=photo_response,
  80. caption="Wo wurde dieses Photo aufgenommen?",
  81. )
  82. except telegram.error.BadRequest:
  83. _LOGGER.warning("file size limit exceeded?", exc_info=True)
  84. except telegram.error.TimedOut:
  85. _LOGGER.warning("timeout", exc_info=True)
  86. else:
  87. break
  88. context.chat_data["last_photo"] = photo
  89. context.chat_data["last_photo_message_id"] = photo_message.message_id
  90. class _Persistence(telegram.ext.BasePersistence):
  91. """
  92. found no easier way to inject bot_data
  93. https://python-telegram-bot.readthedocs.io/en/latest/telegram.ext.basepersistence.html
  94. """
  95. def __init__(self, photos: typing.List[_Photo]) -> None:
  96. self._bot_data = {"photos": photos}
  97. super().__init__(
  98. store_bot_data=True, store_chat_data=False, store_user_data=False
  99. )
  100. def get_user_data(self) -> typing.DefaultDict[int, dict]:
  101. raise NotImplementedError() # pragma: no cover
  102. def get_chat_data(self) -> typing.DefaultDict[int, dict]:
  103. raise NotImplementedError() # pragma: no cover
  104. def get_bot_data(self) -> dict:
  105. return self._bot_data
  106. def get_conversations(self, name: str) -> dict:
  107. return {} # pragma: no cover
  108. def update_user_data(self, user_id: int, data: dict) -> None:
  109. pass # pragma: no cover
  110. def update_chat_data(self, chat_id: int, data: dict) -> None:
  111. pass # pragma: no cover
  112. def update_bot_data(self, data: dict) -> None:
  113. pass # pragma: no cover
  114. def update_conversation(
  115. self, name: str, key: tuple, new_state: typing.Optional[object]
  116. ) -> None:
  117. pass # pragma: no cover
  118. # https://git.hammerle.me/fphammerle/pyftpd-sink/src/5daf383bc238425cd37d011959a8eeffab0112c3/pyftpd-sink#L48
  119. class _EnvDefaultArgparser(argparse.ArgumentParser):
  120. def add_argument(self, *args, envvar=None, **kwargs):
  121. # pylint: disable=arguments-differ; using *args & **kwargs to catch all
  122. if envvar:
  123. envvar_value = os.environ.get(envvar, None)
  124. if envvar_value:
  125. kwargs["required"] = False
  126. kwargs["default"] = envvar_value
  127. super().add_argument(*args, **kwargs)
  128. def _run(telegram_token_path: pathlib.Path, wikimap_export_path: pathlib.Path) -> None:
  129. photos = [
  130. _Photo.from_wikimap_export(attrs)
  131. for attrs in json.loads(wikimap_export_path.read_text())
  132. ]
  133. updater = telegram.ext.Updater(
  134. token=telegram_token_path.read_text().rstrip(),
  135. use_context=True,
  136. persistence=_Persistence(photos=photos),
  137. )
  138. updater.dispatcher.add_handler(telegram.ext.CommandHandler("photo", _photo_command))
  139. updater.start_polling()
  140. def _main():
  141. argparser = _EnvDefaultArgparser()
  142. argparser.add_argument(
  143. "--telegram-token-path",
  144. type=pathlib.Path,
  145. required=True,
  146. envvar="TELEGRAM_TOKEN_PATH",
  147. help="default: env var TELEGRAM_TOKEN_PATH",
  148. )
  149. argparser.add_argument(
  150. "--wikimap-export-path",
  151. type=pathlib.Path,
  152. required=True,
  153. envvar="WIKIMAP_EXPORT_PATH",
  154. help="https://wikimap.toolforge.org/api.php?[...] json, "
  155. "default: env var WIKIMAP_EXPORT_PATH",
  156. )
  157. argparser.add_argument("--debug", action="store_true")
  158. args = argparser.parse_args()
  159. # https://github.com/fphammerle/python-cc1101/blob/26d8122661fc4587ecc7c73df55b92d05cf98fe8/cc1101/_cli.py#L51
  160. logging.basicConfig(
  161. level=logging.DEBUG if args.debug else logging.INFO,
  162. format="%(asctime)s:%(levelname)s:%(name)s:%(funcName)s:%(message)s"
  163. if args.debug
  164. else "%(message)s",
  165. datefmt="%Y-%m-%dT%H:%M:%S%z",
  166. )
  167. _LOGGER.debug("args=%r", args)
  168. _run(
  169. telegram_token_path=args.telegram_token_path,
  170. wikimap_export_path=args.wikimap_export_path,
  171. )