Advertisement
DeaD_EyE

wikimedia.org/v2/stream/recentchange niquests with asyncio

Jun 22nd, 2025 (edited)
653
0
Never
Not a member of Pastebin yet? Sign up; it unlocks many cool features!
Python 1.00 KB | None | 0 0
  1. import asyncio
  2. import json
  3.  
  4. import niquests
  5.  
  6.  
  7. async def get() -> dict:
  8.     async with niquests.AsyncSession() as session:
  9.         response = await session.get(
  10.             "https://stream.wikimedia.org/v2/stream/recentchange", stream=True
  11.         )
  12.         async for chunk in await response.iter_content():
  13.             try:
  14.                 data = chunk.decode()
  15.             except UnicodeDecodeError:
  16.                 continue
  17.  
  18.             if data.startswith("data:"):
  19.                 try:
  20.                     # it may fail here
  21.                     result = json.loads(data.removeprefix("data:"))
  22.                     # if it fails, it does not return
  23.                     # instead iterate
  24.                     return result
  25.                 except json.JSONDecodeError:
  26.                     # error
  27.                     pass
  28.  
  29.  
  30. async def main():
  31.     try:
  32.         return await asyncio.wait_for(get(), 60)
  33.     except asyncio.TimeoutError:
  34.         print("Got a timeout")
  35.  
  36.  
# Script entry point: run main() to completion on an event loop.
# The value main() returns is discarded here.
asyncio.run(main())
Advertisement
Add Comment
Please sign in to add a comment.
Advertisement