test.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. #!/usr/bin/env python3
  2. import argparse
  3. import io
  4. import json
  5. import logging
  6. import pathlib
  7. import random
  8. import typing
  9. import urllib.request
  10. import telegram.ext
  11. import telegram.update
  12. _LOGGER = logging.getLogger(__name__)
  13. class _Photo:
  14. def __init__(
  15. self, photo_url: str, description_url: str, latitude: float, longitude: float
  16. ) -> None:
  17. self.photo_url = photo_url
  18. self.description_url = description_url
  19. self.latitude = latitude
  20. self.longitude = longitude
  21. def __str__(self) -> str:
  22. return "Photo({})".format(self.description_url)
  23. @classmethod
  24. def from_wikimap_export(cls, data: dict) -> "_Photo":
  25. if isinstance(data["coordinates"], list):
  26. coords = data["coordinates"][0]
  27. else:
  28. coords = data["coordinates"]["1"]
  29. assert len(data["imageinfo"]) == 1, data["imageinfo"]
  30. return cls(
  31. latitude=coords["lat"],
  32. longitude=coords["lon"],
  33. photo_url=data["imageinfo"][0]["url"],
  34. description_url=data["imageinfo"][0]["descriptionurl"],
  35. )
  36. def _photo_command(
  37. update: telegram.update.Update,
  38. context: telegram.ext.callbackcontext.CallbackContext,
  39. ):
  40. if "last_photo_message_id" in context.chat_data:
  41. update.effective_chat.send_message(
  42. text="Lösung: {}".format(context.chat_data["last_photo"].description_url),
  43. disable_web_page_preview=True,
  44. reply_to_message_id=context.chat_data["last_photo_message_id"],
  45. )
  46. # https://github.com/python-telegram-bot/python-telegram-bot/pull/2043
  47. context.bot.send_location(
  48. chat_id=update.effective_chat.id,
  49. latitude=context.chat_data["last_photo"].latitude,
  50. longitude=context.chat_data["last_photo"].longitude,
  51. disable_notification=True,
  52. )
  53. context.chat_data["last_photo_message_id"] = None
  54. update.effective_chat.send_message(
  55. text="Neues Photo wird ausgewählt und gesendet.", disable_notification=True
  56. )
  57. while True:
  58. photo = random.choice(context.bot_data["photos"])
  59. try:
  60. with urllib.request.urlopen(photo.photo_url) as photo_response:
  61. photo_message = update.effective_chat.send_photo(
  62. photo=photo_response, caption="Wo wurde dieses Photo aufgenommen?",
  63. )
  64. except telegram.error.BadRequest:
  65. _LOGGER.warning("file size limit exceeded?", exc_info=True)
  66. except telegram.error.TimedOut:
  67. _LOGGER.warning("timeout", exc_info=True)
  68. else:
  69. break
  70. context.chat_data["last_photo"] = photo
  71. context.chat_data["last_photo_message_id"] = photo_message.message_id
  72. class _Persistence(telegram.ext.BasePersistence):
  73. """
  74. found no easier way to inject bot_data
  75. https://python-telegram-bot.readthedocs.io/en/latest/telegram.ext.basepersistence.html
  76. """
  77. def __init__(self, photos: typing.List[_Photo]) -> None:
  78. self._bot_data = {"photos": photos}
  79. super().__init__(
  80. store_bot_data=True, store_chat_data=False, store_user_data=False
  81. )
  82. def get_user_data(self) -> dict:
  83. return {}
  84. def get_chat_data(self) -> dict:
  85. return {}
  86. def get_bot_data(self) -> dict:
  87. return self._bot_data
  88. def get_conversations(self, name: str) -> dict:
  89. return {}
  90. def update_user_data(self, user_id: int, data: dict) -> None:
  91. pass
  92. def update_chat_data(self, chat_id: int, data: dict) -> None:
  93. pass
  94. def update_bot_data(self, data: dict) -> None:
  95. pass
  96. def update_conversation(
  97. self, name: str, key: tuple, new_state: typing.Optional[object]
  98. ) -> None:
  99. pass
  100. def _main():
  101. logging.basicConfig(
  102. level=logging.DEBUG,
  103. format="%(asctime)s:%(levelname)s:%(name)s:%(funcName)s:%(message)s",
  104. datefmt="%Y-%m-%dT%H:%M:%S%z",
  105. )
  106. argparser = argparse.ArgumentParser()
  107. argparser.add_argument("--token-path", type=pathlib.Path, required=True)
  108. argparser.add_argument(
  109. "--wikimap-export-path",
  110. type=pathlib.Path,
  111. required=True,
  112. help="https://wikimap.toolforge.org/api.php?[...] json",
  113. )
  114. args = argparser.parse_args()
  115. _LOGGER.debug("args=%r", args)
  116. photos = [
  117. _Photo.from_wikimap_export(attrs)
  118. for attrs in json.loads(args.wikimap_export_path.read_text())
  119. ]
  120. updater = telegram.ext.Updater(
  121. token=args.token_path.read_text().rstrip(),
  122. use_context=True,
  123. persistence=_Persistence(photos=photos),
  124. )
  125. updater.dispatcher.add_handler(telegram.ext.CommandHandler("photo", _photo_command))
  126. updater.start_polling()
  127. if __name__ == "__main__":
  128. _main()