Files
dashboard_test/poll_services.py
2025-08-29 22:36:35 +02:00

42 lines
1.1 KiB
Python

from mem import services, service
import httpx
import urllib3
import asyncio
# Silence urllib3's InsecureRequestWarning.
# NOTE(review): presumably meant to quiet warnings caused by the
# ``verify=False`` client used for non-public services; the requests in this
# module go through httpx, so confirm this call still has any effect.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
async def check_service(client: httpx.AsyncClient, s: service):
    """Probe a single service and record whether it is reachable.

    Sends a HEAD request to ``s.url`` (following redirects, one-second
    timeout) and prints the resulting status code.  The service is marked
    online only when the final status is exactly 200; when the request
    fails with ``httpx.RequestError`` the error is printed and the service
    is marked offline.

    Args:
        client: The async HTTP client used to send the request.
        s: The service to probe; its ``url`` is read and its online flag
            is updated through ``set_online``.
    """
    try:
        response = await client.head(
            url=s.url,
            follow_redirects=True,
            timeout=1,
        )
        status = response.status_code
        print(status)
        # Anything other than a plain 200 counts as "not online".
        is_up = status == 200
        s.set_online(is_up)
    except httpx.RequestError as exc:
        # Request-level failure reported by httpx: treat as offline.
        print(exc)
        s.set_online(False)
def start_async_loop():
    """Run the service poller on a fresh event loop, blocking the caller.

    Creates a new event loop, installs it as the current loop for the
    calling thread, schedules ``update_services`` on it and then runs the
    loop until it is stopped.  If the poller ever terminates with an
    exception, the error is printed instead of being silently dropped.
    When the loop stops, the poller is cancelled and allowed to unwind
    (so its HTTP clients are closed) before the loop itself is closed.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    def _report_crash(task):
        # Surface an unexpected end of the poller; cancellation is a normal
        # shutdown path and is not reported.
        if task.cancelled():
            return
        error = task.exception()
        if error is not None:
            print(f"Service updates stopped unexpectedly: {error!r}")

    # The loop is owned by this thread and not yet running, so a plain task
    # is sufficient.  Keep a reference so the task cannot be garbage
    # collected while the loop runs.
    poller = loop.create_task(update_services(loop=loop))
    poller.add_done_callback(_report_crash)

    try:
        loop.run_forever()
    finally:
        # Let the poller exit its ``async with`` blocks before closing.
        poller.cancel()
        loop.run_until_complete(asyncio.gather(poller, return_exceptions=True))
        loop.close()
async def update_services(loop: asyncio.AbstractEventLoop):
    """Poll every registered service forever, refreshing its online flag.

    Two HTTP clients stay open for the lifetime of the poller: one with
    default settings for services whose ``public`` attribute is truthy,
    and one created with ``verify=False`` for all others.  Each round
    checks all services concurrently and then sleeps for two seconds.

    A check that raises an exception it does not handle itself no longer
    aborts the whole poller: the failure is printed for that service and
    polling continues with the next round.

    Args:
        loop: Event loop the poller is scheduled on.  The body does not use
            it; it is kept so callers passing ``loop=`` keep working.
    """
    print("Starting service updates...")
    async with httpx.AsyncClient() as public_client, httpx.AsyncClient(
        verify=False
    ) as local_client:
        while True:
            # Snapshot the list so each result can be paired with the
            # service it belongs to, even if ``services`` changes mid-round.
            current = list(services)
            results = await asyncio.gather(
                *(
                    check_service(public_client if s.public else local_client, s)
                    for s in current
                ),
                # Collect failures instead of letting the first one escape
                # and terminate the polling loop for every service.
                return_exceptions=True,
            )
            for s, result in zip(current, results):
                if isinstance(result, Exception):
                    print(f"Service check for {s.url} failed: {result!r}")
            await asyncio.sleep(2)