__init__.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188
  1. # location-guessing-game-telegram-bot - Telegram Bot Sending Random Wikimedia Commons Photos
  2. #
  3. # Copyright (C) 2021 Fabian Peter Hammerle <fabian@hammerle.me>
  4. #
  5. # This program is free software: you can redistribute it and/or modify
  6. # it under the terms of the GNU General Public License as published by
  7. # the Free Software Foundation, either version 3 of the License, or
  8. # any later version.
  9. #
  10. # This program is distributed in the hope that it will be useful,
  11. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. # GNU General Public License for more details.
  14. #
  15. # You should have received a copy of the GNU General Public License
  16. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  17. import argparse
  18. import json
  19. import logging
  20. import os
  21. import pathlib
  22. import random
  23. import typing
  24. import urllib.request
  25. import telegram.ext
  26. import telegram.update
  27. _LOGGER = logging.getLogger(__name__)
  28. class _Photo:
  29. def __init__(
  30. self, photo_url: str, description_url: str, latitude: float, longitude: float
  31. ) -> None:
  32. self.photo_url = photo_url
  33. self.description_url = description_url
  34. self.latitude = latitude
  35. self.longitude = longitude
  36. def __str__(self) -> str:
  37. return "photo " + self.description_url
  38. @classmethod
  39. def from_wikimap_export(cls, data: dict) -> "_Photo":
  40. if isinstance(data["coordinates"], list):
  41. coords = data["coordinates"][0]
  42. else:
  43. coords = data["coordinates"]["1"]
  44. assert len(data["imageinfo"]) == 1, data["imageinfo"]
  45. return cls(
  46. latitude=coords["lat"],
  47. longitude=coords["lon"],
  48. photo_url=data["imageinfo"][0]["url"],
  49. description_url=data["imageinfo"][0]["descriptionurl"],
  50. )
  51. def _photo_command(
  52. update: telegram.update.Update,
  53. context: telegram.ext.callbackcontext.CallbackContext,
  54. ):
  55. if "last_photo_message_id" in context.chat_data:
  56. update.effective_chat.send_message(
  57. text="Lösung: {}".format(context.chat_data["last_photo"].description_url),
  58. disable_web_page_preview=True,
  59. reply_to_message_id=context.chat_data["last_photo_message_id"],
  60. )
  61. # https://github.com/python-telegram-bot/python-telegram-bot/pull/2043
  62. update.effective_chat.send_location(
  63. latitude=context.chat_data["last_photo"].latitude,
  64. longitude=context.chat_data["last_photo"].longitude,
  65. disable_notification=True,
  66. )
  67. context.chat_data["last_photo_message_id"] = None
  68. update.effective_chat.send_message(
  69. text="Neues Photo wird ausgewählt und gesendet.", disable_notification=True
  70. )
  71. while True:
  72. photo = random.choice(context.bot_data["photos"])
  73. _LOGGER.info("sending %s", photo)
  74. try:
  75. with urllib.request.urlopen(photo.photo_url) as photo_response:
  76. photo_message = update.effective_chat.send_photo(
  77. photo=photo_response, caption="Wo wurde dieses Photo aufgenommen?",
  78. )
  79. except telegram.error.BadRequest:
  80. _LOGGER.warning("file size limit exceeded?", exc_info=True)
  81. except telegram.error.TimedOut:
  82. _LOGGER.warning("timeout", exc_info=True)
  83. else:
  84. break
  85. context.chat_data["last_photo"] = photo
  86. context.chat_data["last_photo_message_id"] = photo_message.message_id
  87. class _Persistence(telegram.ext.BasePersistence):
  88. """
  89. found no easier way to inject bot_data
  90. https://python-telegram-bot.readthedocs.io/en/latest/telegram.ext.basepersistence.html
  91. """
  92. def __init__(self, photos: typing.List[_Photo]) -> None:
  93. self._bot_data = {"photos": photos}
  94. super().__init__(
  95. store_bot_data=True, store_chat_data=False, store_user_data=False
  96. )
  97. def get_user_data(self) -> dict:
  98. return {}
  99. def get_chat_data(self) -> dict:
  100. return {}
  101. def get_bot_data(self) -> dict:
  102. return self._bot_data
  103. def get_conversations(self, name: str) -> dict:
  104. return {}
  105. def update_user_data(self, user_id: int, data: dict) -> None:
  106. pass
  107. def update_chat_data(self, chat_id: int, data: dict) -> None:
  108. pass
  109. def update_bot_data(self, data: dict) -> None:
  110. pass
  111. def update_conversation(
  112. self, name: str, key: tuple, new_state: typing.Optional[object]
  113. ) -> None:
  114. pass
  115. # https://git.hammerle.me/fphammerle/pyftpd-sink/src/5daf383bc238425cd37d011959a8eeffab0112c3/pyftpd-sink#L48
  116. class _EnvDefaultArgparser(argparse.ArgumentParser):
  117. def add_argument(self, *args, envvar=None, **kwargs):
  118. # pylint: disable=arguments-differ; using *args & **kwargs to catch all
  119. if envvar:
  120. envvar_value = os.environ.get(envvar, None)
  121. if envvar_value:
  122. kwargs["required"] = False
  123. kwargs["default"] = envvar_value
  124. super().add_argument(*args, **kwargs)
  125. def _main():
  126. argparser = _EnvDefaultArgparser()
  127. argparser.add_argument(
  128. "--telegram-token-path",
  129. type=pathlib.Path,
  130. required=True,
  131. envvar="TELEGRAM_TOKEN_PATH",
  132. help="default: env var TELEGRAM_TOKEN_PATH",
  133. )
  134. argparser.add_argument(
  135. "--wikimap-export-path",
  136. type=pathlib.Path,
  137. required=True,
  138. envvar="WIKIMAP_EXPORT_PATH",
  139. help="https://wikimap.toolforge.org/api.php?[...] json, "
  140. "default: env var WIKIMAP_EXPORT_PATH",
  141. )
  142. argparser.add_argument("--debug", action="store_true")
  143. args = argparser.parse_args()
  144. # https://github.com/fphammerle/python-cc1101/blob/26d8122661fc4587ecc7c73df55b92d05cf98fe8/cc1101/_cli.py#L51
  145. logging.basicConfig(
  146. level=logging.DEBUG if args.debug else logging.INFO,
  147. format="%(asctime)s:%(levelname)s:%(name)s:%(funcName)s:%(message)s"
  148. if args.debug
  149. else "%(message)s",
  150. datefmt="%Y-%m-%dT%H:%M:%S%z",
  151. )
  152. _LOGGER.debug("args=%r", args)
  153. photos = [
  154. _Photo.from_wikimap_export(attrs)
  155. for attrs in json.loads(args.wikimap_export_path.read_text())
  156. ]
  157. updater = telegram.ext.Updater(
  158. token=args.telegram_token_path.read_text().rstrip(),
  159. use_context=True,
  160. persistence=_Persistence(photos=photos),
  161. )
  162. updater.dispatcher.add_handler(telegram.ext.CommandHandler("photo", _photo_command))
  163. updater.start_polling()