test.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. #!/usr/bin/env python3
  2. import argparse
  3. import io
  4. import json
  5. import logging
  6. import pathlib
  7. import random
  8. import typing
  9. import urllib.request
  10. import telegram.ext
  11. import telegram.update
# Module-level logger named after this module; _photo_command uses it for
# warnings about failed photo uploads and _main for debug output.
_LOGGER = logging.getLogger(__name__)
  13. class _Photo:
  14. def __init__(
  15. self, photo_url: str, description_url: str, latitude: float, longitude: float
  16. ) -> None:
  17. self.photo_url = photo_url
  18. self.description_url = description_url
  19. self.latitude = latitude
  20. self.longitude = longitude
  21. def __str__(self) -> str:
  22. return "Photo({})".format(self.description_url)
  23. @classmethod
  24. def from_wikimap_export(cls, data: dict) -> "_Photo":
  25. if isinstance(data["coordinates"], list):
  26. coords = data["coordinates"][0]
  27. else:
  28. coords = data["coordinates"]["1"]
  29. assert len(data["imageinfo"]) == 1, data["imageinfo"]
  30. return cls(
  31. latitude=coords["lat"],
  32. longitude=coords["lon"],
  33. photo_url=data["imageinfo"][0]["url"],
  34. description_url=data["imageinfo"][0]["descriptionurl"],
  35. )
  36. def _photo_command(
  37. update: telegram.update.Update,
  38. context: telegram.ext.callbackcontext.CallbackContext,
  39. ):
  40. if "last_photo_message_id" in context.chat_data:
  41. update.effective_chat.send_message(
  42. text="Lösung: {}".format(context.chat_data["last_photo"].description_url),
  43. disable_notification=True,
  44. )
  45. # https://github.com/python-telegram-bot/python-telegram-bot/pull/2043
  46. context.bot.send_location(
  47. chat_id=update.effective_chat.id,
  48. latitude=context.chat_data["last_photo"].latitude,
  49. longitude=context.chat_data["last_photo"].longitude,
  50. reply_to_message_id=context.chat_data["last_photo_message_id"],
  51. )
  52. context.chat_data["last_photo_message_id"] = None
  53. update.effective_chat.send_message(
  54. text="Neues Photo wird ausgewählt und gesendet.", disable_notification=True
  55. )
  56. while True:
  57. photo = random.choice(context.bot_data["photos"])
  58. try:
  59. with urllib.request.urlopen(photo.photo_url) as photo_response:
  60. photo_message = update.effective_chat.send_photo(
  61. photo=photo_response, caption="Wo wurde dieses Photo aufgenommen?",
  62. )
  63. except telegram.error.BadRequest:
  64. _LOGGER.warning("file size limit exceeded?", exc_info=True)
  65. except telegram.error.TimedOut:
  66. _LOGGER.warning("timeout", exc_info=True)
  67. else:
  68. break
  69. context.chat_data["last_photo"] = photo
  70. context.chat_data["last_photo_message_id"] = photo_message.message_id
  71. class _Persistence(telegram.ext.BasePersistence):
  72. """
  73. found no easier way to inject bot_data
  74. https://python-telegram-bot.readthedocs.io/en/latest/telegram.ext.basepersistence.html
  75. """
  76. def __init__(self, photos: typing.List[_Photo]) -> None:
  77. self._bot_data = {"photos": photos}
  78. super().__init__(
  79. store_bot_data=True, store_chat_data=False, store_user_data=False
  80. )
  81. def get_user_data(self) -> dict:
  82. return {}
  83. def get_chat_data(self) -> dict:
  84. return {}
  85. def get_bot_data(self) -> dict:
  86. return self._bot_data
  87. def get_conversations(self, name: str) -> dict:
  88. return {}
  89. def update_user_data(self, user_id: int, data: dict) -> None:
  90. pass
  91. def update_chat_data(self, chat_id: int, data: dict) -> None:
  92. pass
  93. def update_bot_data(self, data: dict) -> None:
  94. pass
  95. def update_conversation(
  96. self, name: str, key: tuple, new_state: typing.Optional[object]
  97. ) -> None:
  98. pass
  99. def _main():
  100. logging.basicConfig(
  101. level=logging.DEBUG,
  102. format="%(asctime)s:%(levelname)s:%(name)s:%(funcName)s:%(message)s",
  103. datefmt="%Y-%m-%dT%H:%M:%S%z",
  104. )
  105. argparser = argparse.ArgumentParser()
  106. argparser.add_argument("--token-path", type=pathlib.Path, required=True)
  107. argparser.add_argument(
  108. "--wikimap-export-path",
  109. type=pathlib.Path,
  110. required=True,
  111. help="https://wikimap.toolforge.org/api.php?[...] json",
  112. )
  113. args = argparser.parse_args()
  114. _LOGGER.debug("args=%r", args)
  115. photos = [
  116. _Photo.from_wikimap_export(attrs)
  117. for attrs in json.loads(args.wikimap_export_path.read_text())
  118. ]
  119. updater = telegram.ext.Updater(
  120. token=args.token_path.read_text().rstrip(),
  121. use_context=True,
  122. persistence=_Persistence(photos=photos),
  123. )
  124. updater.dispatcher.add_handler(telegram.ext.CommandHandler("photo", _photo_command))
  125. updater.start_polling()
  126. if __name__ == "__main__":
  127. _main()