Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import asyncio
- import json
- import niquests
- async def get() -> dict:
- async with niquests.AsyncSession() as session:
- response = await session.get(
- "https://stream.wikimedia.org/v2/stream/recentchange", stream=True
- )
- async for chunk in await response.iter_content():
- try:
- data = chunk.decode()
- except UnicodeDecodeError:
- continue
- if data.startswith("data:"):
- try:
- # it may fail here
- result = json.loads(data.removeprefix("data:"))
- # if it fails, it does not return
- # instead iterate
- return result
- except json.JSONDecodeError:
- # error
- pass
- async def main():
- try:
- return await asyncio.wait_for(get(), 60)
- except asyncio.TimeoutError:
- print("Got a timeout")
- asyncio.run(main())
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement