Building a Proxy Health Monitor for 24/7 Scraper Uptime


A step-by-step guide to building an automated proxy monitoring system that detects failures, rotates dead proxies, and alerts you before your scraper goes down.

Disclosure: This article is part of a sponsored series with Proxy-Seller and was drafted with AI assistance and edited by a human author. The promo code SPINOV15 and product mentions reflect a paid editorial relationship. Technical content, benchmarks, and code examples are based on real production experience.

Introduction

Your scraper is only as reliable as your proxies. A single dead proxy in your rotation pool can silently kill your success rate — and you won’t know until you check the results hours later.

I learned this the hard way. A 100K-page scraping job ran overnight. When I checked in the morning, 40% of requests had failed because 3 out of 10 proxies had gone down at 2 AM. No alerts. No automatic failover. Just 40,000 wasted requests and a 12-hour delay.

That’s when I built a proxy health monitoring system. It runs alongside every scraper I deploy, automatically detects proxy failures, removes dead proxies from rotation, and sends alerts when the healthy pool drops below a threshold.

In this guide, I’ll show you how to build the same system using Python and proxies from Proxy-Seller (use code SPINOV15 for 15% off). The complete code is production-ready — I use it daily across multiple scraping projects.

Architecture Overview

The monitor has four components:

  1. Health Checker: Tests each proxy every N minutes
  2. Pool Manager: Maintains a list of healthy proxies, removes dead ones
  3. Alert System: Notifies you when proxy health drops below threshold
  4. Metrics Store: Logs every check result to SQLite for trend analysis and dashboards

┌─────────────┐     ┌──────────────┐     ┌─────────────┐
│Health Check │────▶│ Pool Manager │────▶│ Your Scraper│
│  (every 5m) │     │  (healthy[]) │     │  (uses pool)│
└─────────────┘     └──────────────┘     └─────────────┘
       │                    │
       ▼                    ▼
┌─────────────┐     ┌──────────────┐
│   Metrics   │     │    Alerts    │
│  (SQLite)   │     │(Email/Slack) │
└─────────────┘     └──────────────┘

Step 1: The Health Checker

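The health checker tests each proxy against a reliable endpoint and records the response time, whether the check passed, and the exit IP address. The country field on ProxyHealth is a placeholder for a geolocation lookup; nothing in this guide fills it in.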

import asyncio
import aiohttp
import time
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional

@dataclass
class ProxyHealth:
    proxy_url: str
    is_healthy: bool = True
    last_check: Optional[datetime] = None
    response_time_ms: float = 0
    success_count: int = 0
    fail_count: int = 0
    consecutive_failures: int = 0
    ip_address: Optional[str] = None
    country: Optional[str] = None
    
    @property
    def success_rate(self) -> float:
        total = self.success_count + self.fail_count
        return self.success_count / total if total > 0 else 0

class HealthChecker:
    """Tests proxy health using multiple endpoints."""
    
    TEST_ENDPOINTS = [
        "https://httpbin.org/ip",
        "https://api.ipify.org?format=json",
        "https://ifconfig.me/all.json",
    ]
    
    def __init__(self, timeout: int = 10):
        self.timeout = aiohttp.ClientTimeout(total=timeout)
    
    async def check_proxy(self, proxy_url: str) -> ProxyHealth:
        """Run health check on a single proxy."""
        health = ProxyHealth(proxy_url=proxy_url)
        health.last_check = datetime.now()
        
        start = time.monotonic()
        
        try:
            async with aiohttp.ClientSession(timeout=self.timeout) as session:
                endpoint = self.TEST_ENDPOINTS[0]
                async with session.get(endpoint, proxy=proxy_url) as resp:
                    if resp.status == 200:
                        data = await resp.json()
                        health.is_healthy = True
                        health.ip_address = data.get("origin") or data.get("ip")
                        health.response_time_ms = (time.monotonic() - start) * 1000
                        health.success_count += 1
                        health.consecutive_failures = 0
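                    else:
                        # Any non-200 status (e.g. 407 proxy auth failure) counts as a failed check
                        health.is_healthy = False
                        health.fail_count += 1
                        health.consecutive_failures += 1
                        health.response_time_ms = (time.monotonic() - start) * 1000
        except Exception:
            # Timeouts, connection errors, and non-JSON responses all land here
            health.is_healthy = False
            health.fail_count += 1
            health.consecutive_failures += 1
            health.response_time_ms = (time.monotonic() - start) * 1000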
        
        return health
    
    async def check_all(self, proxy_urls: list[str]) -> list[ProxyHealth]:
        """Check all proxies concurrently."""
        tasks = [self.check_proxy(url) for url in proxy_urls]
        return await asyncio.gather(*tasks)

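Why list multiple endpoints? If httpbin.org is down, your monitor shouldn’t mark all proxies as dead. check_proxy above only ever uses the first endpoint, so the fallback chain is a separate HealthChecker method that walks the whole list. It relies on a _single_check(proxy_url, endpoint) helper, which is check_proxy with the endpoint passed in as an argument instead of hard-coded to TEST_ENDPOINTS[0]:

async def check_with_fallback(self, proxy_url: str) -> ProxyHealth:
    """Try multiple endpoints before declaring proxy dead."""
    # Default result in case TEST_ENDPOINTS is empty
    health = ProxyHealth(proxy_url=proxy_url, is_healthy=False)
    for endpoint in self.TEST_ENDPOINTS:
        health = await self._single_check(proxy_url, endpoint)
        if health.is_healthy:
            return health
    # All endpoints failed → proxy is truly dead
    return health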
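
The fallback idea can be tried without any network access. The sketch below is not the article's HealthChecker: it takes the HTTP call as an injected probe function (a hypothetical signature, returning the exit IP or None) so the endpoint-walking logic can be exercised with a fake. The proxy URL and IP are placeholder values.

```python
import asyncio
from typing import Awaitable, Callable, Optional, Sequence

# Hypothetical probe signature: (proxy_url, endpoint) -> exit IP on success, None on failure.
Probe = Callable[[str, str], Awaitable[Optional[str]]]


async def first_working_endpoint(
    proxy_url: str, endpoints: Sequence[str], probe: Probe
) -> Optional[tuple[str, str]]:
    """Return (endpoint, exit_ip) for the first endpoint that answers, else None."""
    for endpoint in endpoints:
        try:
            ip = await probe(proxy_url, endpoint)
        except Exception:
            ip = None  # treat any error as a failed attempt and move on
        if ip is not None:
            return endpoint, ip
    return None  # every endpoint failed, so the proxy itself is the likely culprit


async def _demo() -> Optional[tuple[str, str]]:
    # Fake probe standing in for a real HTTP call: the first endpoint is "down".
    async def fake_probe(proxy_url: str, endpoint: str) -> Optional[str]:
        if "httpbin" in endpoint:
            raise TimeoutError("endpoint unreachable")
        return "203.0.113.7"

    endpoints = ["https://httpbin.org/ip", "https://api.ipify.org?format=json"]
    return await first_working_endpoint("http://proxy.example:8080", endpoints, fake_probe)


demo_result = asyncio.run(_demo())
print(demo_result)
```

Because the first endpoint raises, the proxy is only reported as working once the second endpoint answers. A proxy is declared dead only when the function returns None.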

Step 2: The Pool Manager

The pool manager maintains a list of healthy proxies and provides them to your scraper.

import threading
import logging

logger = logging.getLogger("proxy_monitor")

class ProxyPool:
    """Thread-safe proxy pool with automatic health management."""
    
    def __init__(self, proxy_urls: list[str], min_healthy: int = 3):
        self.all_proxies = {url: ProxyHealth(proxy_url=url) for url in proxy_urls}
        self.min_healthy = min_healthy
        self._lock = threading.Lock()
        self.checker = HealthChecker()
    
    def get_healthy_proxy(self) -> Optional[str]:
        """Return a random healthy proxy."""
        import random
        with self._lock:
            healthy = [url for url, h in self.all_proxies.items() if h.is_healthy]
            if not healthy:
                logger.critical("NO HEALTHY PROXIES AVAILABLE!")
                return None
            return random.choice(healthy)
    
    def get_healthy_proxies(self) -> list[str]:
        """Return all healthy proxy URLs."""
        with self._lock:
            return [url for url, h in self.all_proxies.items() if h.is_healthy]
    
    async def refresh(self):
        """Run health checks and update pool."""
        proxy_urls = list(self.all_proxies.keys())
        results = await self.checker.check_all(proxy_urls)
        
        with self._lock:
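            for health in results:
                old = self.all_proxies[health.proxy_url]
                # Accumulate stats (every check starts from a fresh ProxyHealth)
                health.success_count += old.success_count
                health.fail_count += old.fail_count
                if not health.is_healthy:
                    # Carry the failure streak forward; a success leaves it at 0
                    health.consecutive_failures += old.consecutive_failures
                self.all_proxies[health.proxy_url] = health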
            
            healthy_count = sum(1 for h in self.all_proxies.values() if h.is_healthy)
            total = len(self.all_proxies)
            
            logger.info(f"Pool health: {healthy_count}/{total} proxies healthy")
            
            if healthy_count < self.min_healthy:
                logger.critical(
                    f"ALERT: Only {healthy_count} healthy proxies "
                    f"(minimum: {self.min_healthy})"
                )
                return False  # Trigger alert
        
        return True
    
    def get_stats(self) -> dict:
        """Return pool statistics."""
        with self._lock:
            healths = list(self.all_proxies.values())
            healthy = [h for h in healths if h.is_healthy]
            return {
                "total": len(healths),
                "healthy": len(healthy),
                "avg_response_ms": sum(h.response_time_ms for h in healthy) / max(len(healthy), 1),
                "avg_success_rate": sum(h.success_rate for h in healths) / max(len(healths), 1),
                "last_check": max((h.last_check for h in healths if h.last_check), default=None),
            }
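
ProxyHealth tracks consecutive_failures, but the pool above drops a proxy after a single failed check. One possible refinement, shown here as a standalone sketch rather than part of ProxyPool, is to keep a proxy in rotation until it fails several checks in a row. The FailureStreak class and its max_failures default are assumptions made for illustration.

```python
from dataclasses import dataclass


@dataclass
class FailureStreak:
    """Tracks consecutive failed checks for one proxy (hypothetical helper)."""

    max_failures: int = 3
    consecutive_failures: int = 0

    def update(self, check_passed: bool) -> bool:
        """Record one check result; return True while the proxy should stay in rotation."""
        if check_passed:
            self.consecutive_failures = 0  # any success resets the streak
        else:
            self.consecutive_failures += 1
        return self.consecutive_failures < self.max_failures


streak = FailureStreak(max_failures=3)
results = [streak.update(ok) for ok in (False, False, True, False, False, False)]
print(results)  # [True, True, True, True, True, False]
```

The trade-off is detection speed: with a 5-minute check interval and a threshold of 3, a proxy that is really dead stays in rotation for up to about 15 minutes.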

Integrating with Your Scraper

Replace your static proxy list with the pool manager:

# Before: Static proxy list (fragile)
import random
proxies = ["http://user:pass@proxy1:10000", "http://user:pass@proxy2:10001"]
proxy = random.choice(proxies)  # Might pick a dead proxy!

# After: Managed pool (resilient)
pool = ProxyPool(proxy_urls=proxies, min_healthy=2)
# Returns a proxy that passed its most recent check, or None if none did.
# Until the first refresh() runs, every proxy is assumed healthy.
proxy = pool.get_healthy_proxy()
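
A proxy can still die between two health checks, so a request may fail even with a managed pool. A minimal sketch of one way to handle that, assuming a retry helper that draws a new proxy on each failure: fetch_with_retry and its parameters are hypothetical names, get_proxy stands in for pool.get_healthy_proxy, and fetch stands in for your own request function.

```python
import asyncio
from typing import Awaitable, Callable, Optional


async def fetch_with_retry(
    get_proxy: Callable[[], Optional[str]],
    fetch: Callable[[str, str], Awaitable[str]],
    url: str,
    attempts: int = 3,
) -> Optional[str]:
    """Try up to `attempts` proxies; return the response body, or None if all fail."""
    for _ in range(attempts):
        proxy = get_proxy()
        if proxy is None:
            return None  # pool is empty; the caller decides whether to wait
        try:
            return await fetch(url, proxy)
        except Exception:
            continue  # this proxy failed between health checks; draw another
    return None


async def _demo() -> Optional[str]:
    # Fake pool and fetcher so the sketch runs without network access.
    proxies = iter(["http://bad:1", "http://good:2"])

    async def fake_fetch(url: str, proxy: str) -> str:
        if "bad" in proxy:
            raise ConnectionError("proxy refused connection")
        return f"fetched {url} via {proxy}"

    return await fetch_with_retry(lambda: next(proxies), fake_fetch, "https://example.com")


retry_result = asyncio.run(_demo())
print(retry_result)
```

In a real scraper you would pass pool.get_healthy_proxy as get_proxy. Since that method picks at random, a retry can land on the same proxy again; excluding already-tried proxies is a possible extension.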

Step 3: Automated Monitoring Loop

Tie it all together with a background monitoring loop:

import asyncio
import smtplib
from email.mime.text import MIMEText

class ProxyMonitor:
    """Runs health checks on a schedule and sends alerts."""
    
    def __init__(self, pool: ProxyPool, check_interval: int = 300):
        self.pool = pool
        self.check_interval = check_interval  # seconds
        self.alert_sent = False
    
    async def run_forever(self):
        """Main monitoring loop — run as background task."""
        logger.info(f"Proxy monitor started (checking every {self.check_interval}s)")
        
        while True:
            pool_healthy = await self.pool.refresh()
            stats = self.pool.get_stats()
            
            logger.info(
                f"Check complete: {stats['healthy']}/{stats['total']} healthy, "
                f"avg response: {stats['avg_response_ms']:.0f}ms"
            )
            
            if not pool_healthy and not self.alert_sent:
                await self.send_alert(stats)
                self.alert_sent = True
            elif pool_healthy:
                self.alert_sent = False
            
            await asyncio.sleep(self.check_interval)
    
    async def send_alert(self, stats: dict):
        """Send email alert when proxy health drops."""
        subject = f"⚠️ Proxy Alert: {stats['healthy']}/{stats['total']} healthy"
        body = f"""
Proxy pool health has dropped below minimum threshold.

Healthy: {stats['healthy']}/{stats['total']}
Avg Response: {stats['avg_response_ms']:.0f}ms
Avg Success Rate: {stats['avg_success_rate']:.1%}
Last Check: {stats['last_check']}

Action: Check Proxy-Seller dashboard for IP issues.
        """
        
        # Send via SMTP (configure with your email)
        msg = MIMEText(body)
        msg["Subject"] = subject
        msg["From"] = "monitor@yourdomain.com"
        msg["To"] = "alerts@yourdomain.com"
        
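        def _send() -> None:
            with smtplib.SMTP("smtp.gmail.com", 587) as server:
                server.starttls()
                server.login("your-email@gmail.com", "your-app-password")
                server.send_message(msg)

        try:
            # smtplib blocks, so run it in a worker thread to keep the event loop responsive
            await asyncio.to_thread(_send)
            logger.info("Alert email sent!")
        except Exception as e:
            logger.error(f"Failed to send alert: {e}")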

# Usage: Run monitor alongside your scraper
async def main():
    proxy_urls = [
        f"http://user:pass@gate.proxy-seller.com:{10000+i}"
        for i in range(20)  # 20 Proxy-Seller proxies
    ]
    
    pool = ProxyPool(proxy_urls, min_healthy=5)
    monitor = ProxyMonitor(pool, check_interval=300)  # Check every 5 min
    
    # Run monitor in background
    monitor_task = asyncio.create_task(monitor.run_forever())
    
    # Your scraper uses the pool
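    while True:
        proxy = pool.get_healthy_proxy()
        if proxy:
            # ... your scraping logic here ...
            # The loop body must await something (such as an aiohttp request).
            # A loop that never yields would starve the monitor task.
            await asyncio.sleep(0)
        else:
            logger.critical("No proxies available — waiting for recovery")
            await asyncio.sleep(60)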

if __name__ == "__main__":
    asyncio.run(main())
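
The architecture diagram lists Slack next to email, but the monitor above only sends email. A minimal sketch of a Slack alert, assuming you have created an incoming webhook for your workspace: incoming webhooks accept a JSON body with a "text" field. The function names are hypothetical, and the webhook URL is yours to supply.

```python
import json
import urllib.request


def build_slack_payload(stats: dict) -> dict:
    """Format pool stats (as returned by get_stats) as a Slack webhook message body."""
    text = (
        f"Proxy alert: {stats['healthy']}/{stats['total']} healthy, "
        f"avg response {stats['avg_response_ms']:.0f}ms"
    )
    return {"text": text}


def post_to_slack(webhook_url: str, payload: dict, timeout: float = 10.0) -> int:
    """POST the payload as JSON and return the HTTP status code. This call blocks."""
    request = urllib.request.Request(
        webhook_url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status


payload = build_slack_payload({"healthy": 3, "total": 20, "avg_response_ms": 412.6})
print(payload["text"])  # Proxy alert: 3/20 healthy, avg response 413ms
```

Because post_to_slack blocks, call it from async code with await asyncio.to_thread(post_to_slack, webhook_url, payload) so the monitoring loop keeps running while the request is in flight.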

Step 4: Metrics & History (SQLite)

Store health check results for trend analysis:

import sqlite3

class MetricsStore:
    def __init__(self, db_path="proxy_metrics.db"):
        self.conn = sqlite3.connect(db_path)
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS checks (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                proxy_url TEXT NOT NULL,
                is_healthy BOOLEAN,
                response_time_ms REAL,
                ip_address TEXT,
                checked_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)
        self.conn.commit()
    
    def record(self, health: ProxyHealth):
        self.conn.execute(
            "INSERT INTO checks (proxy_url, is_healthy, response_time_ms, ip_address) VALUES (?, ?, ?, ?)",
            (health.proxy_url, health.is_healthy, health.response_time_ms, health.ip_address)
        )
        self.conn.commit()
    
    def get_uptime(self, proxy_url: str, hours: int = 24) -> float:
        """Get proxy uptime percentage over last N hours."""
        row = self.conn.execute("""
            SELECT 
                COUNT(CASE WHEN is_healthy THEN 1 END) * 100.0 / COUNT(*)
            FROM checks 
            WHERE proxy_url = ? 
            AND checked_at > datetime('now', ?)
        """, (proxy_url, f"-{hours} hours")).fetchone()
        return row[0] if row[0] else 0
    
    def get_avg_response(self, hours: int = 24) -> dict:
        """Get average response time by proxy over last N hours."""
        rows = self.conn.execute("""
            SELECT proxy_url, AVG(response_time_ms), COUNT(*)
            FROM checks 
            WHERE is_healthy = 1 
            AND checked_at > datetime('now', ?)
            GROUP BY proxy_url
            ORDER BY AVG(response_time_ms)
        """, (f"-{hours} hours",)).fetchall()
        return {row[0]: {"avg_ms": row[1], "checks": row[2]} for row in rows}

This gives you data to answer questions like:

  • Which proxies are most reliable over the last 7 days?
  • What time of day do proxies perform best?
  • Is Proxy-Seller’s uptime meeting the SLA?
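
The time-of-day question is not covered by the MetricsStore methods above. A minimal sketch of one way to answer it, assuming the checks table defined earlier: group checks by hour and average the pass rate. The success_rate_by_hour function is a hypothetical addition, and the demo rows are synthetic.

```python
import sqlite3


def success_rate_by_hour(conn: sqlite3.Connection, days: int = 7) -> dict[str, float]:
    """Map hour of day ('00'..'23', in UTC) to the share of checks that passed."""
    rows = conn.execute(
        """
        SELECT strftime('%H', checked_at) AS hour,
               AVG(CASE WHEN is_healthy THEN 1.0 ELSE 0.0 END)
        FROM checks
        WHERE checked_at > datetime('now', ?)
        GROUP BY hour
        ORDER BY hour
        """,
        (f"-{days} days",),
    ).fetchall()
    return {hour: rate for hour, rate in rows}


# Demo on an in-memory database with the same column names as the checks table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE checks (proxy_url TEXT, is_healthy BOOLEAN, checked_at TIMESTAMP)")
samples = [("+2 hours", 1), ("+2 hours", 0), ("+14 hours", 1), ("+14 hours", 1)]
for offset, ok in samples:
    # Yesterday at a fixed UTC hour, so every row falls inside the 7-day window.
    conn.execute(
        "INSERT INTO checks VALUES ('http://proxy.example:8080', ?, "
        "datetime('now', '-1 day', 'start of day', ?))",
        (ok, offset),
    )

hourly = success_rate_by_hour(conn)
print(hourly)  # {'02': 0.5, '14': 1.0}
```

SQLite's CURRENT_TIMESTAMP is UTC, so the hours are UTC too. Shift them if you want to compare against local business hours.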

Advanced: Auto-Scaling Proxy Pool

The basic monitor removes dead proxies. The advanced version also adapts how the scraper uses the pool: it scales concurrency with pool health, ranks proxies by speed, and watches failure trends.

Dynamic Concurrency Based on Pool Health

When proxies die, reduce concurrency to avoid overloading the remaining ones:

class AdaptiveScraper:
    """Adjusts concurrency based on proxy pool health."""
    
    def __init__(self, pool: ProxyPool, base_concurrency: int = 10):
        self.pool = pool
        self.base_concurrency = base_concurrency
    
    @property
    def current_concurrency(self) -> int:
        """Scale concurrency with healthy proxy count."""
        stats = self.pool.get_stats()
        health_ratio = stats["healthy"] / max(stats["total"], 1)
        
        if health_ratio > 0.8:
            return self.base_concurrency  # Full speed
        elif health_ratio > 0.5:
            return max(self.base_concurrency // 2, 3)  # Half speed
        else:
            return 2  # Survival mode
    
    async def scrape_batch(self, urls: list[str]):
        sem = asyncio.Semaphore(self.current_concurrency)
        async with aiohttp.ClientSession() as session:
            tasks = [self._fetch(session, url, sem) for url in urls]
            return await asyncio.gather(*tasks, return_exceptions=True)
    
    async def _fetch(self, session, url, sem):
        async with sem:
            proxy = self.pool.get_healthy_proxy()
            if not proxy:
                return {"url": url, "error": "no healthy proxies"}
            request_timeout = aiohttp.ClientTimeout(total=15)
            async with session.get(url, proxy=proxy, timeout=request_timeout) as resp:
                return {"url": url, "status": resp.status, "html": await resp.text()}
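
To see where the tier boundaries fall, the same rule can be pulled out into a plain function and run on a few pool sizes. This is only a restatement of current_concurrency for illustration; concurrency_for is a hypothetical name, and the 20-proxy pool is an example value.

```python
def concurrency_for(healthy: int, total: int, base: int = 10) -> int:
    """Same tiers as AdaptiveScraper.current_concurrency, as a pure function."""
    ratio = healthy / max(total, 1)
    if ratio > 0.8:
        return base  # full speed
    if ratio > 0.5:
        return max(base // 2, 3)  # half speed
    return 2  # survival mode


tiers = {healthy: concurrency_for(healthy, total=20) for healthy in (20, 17, 16, 12, 10, 4)}
print(tiers)  # {20: 10, 17: 10, 16: 5, 12: 5, 10: 2, 4: 2}
```

Both comparisons are strict, so exactly 80% healthy (16 of 20) already drops to half speed and exactly 50% drops to survival mode. Note also that the floors of 3 and 2 can exceed base_concurrency when it is set below 4, so wrap the result in min(base, ...) if you run with very low concurrency.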

Proxy Performance Ranking

Not all healthy proxies perform equally. Rank them by the response time of their latest check and route critical requests through the fastest ones. Both methods below belong on ProxyPool:

def get_ranked_proxies(self) -> list[str]:
    """Return healthy proxies sorted by response time (fastest first)."""
    with self._lock:
        healthy = [
            (url, h) for url, h in self.all_proxies.items() 
            if h.is_healthy and h.response_time_ms > 0
        ]
        healthy.sort(key=lambda x: x[1].response_time_ms)
        return [url for url, _ in healthy]

def get_fastest_proxy(self) -> Optional[str]:
    """Return the single fastest healthy proxy."""
    ranked = self.get_ranked_proxies()
    return ranked[0] if ranked else None

Use get_fastest_proxy() for time-critical requests (price checks, stock monitoring) and get_healthy_proxy() for bulk scraping where latency matters less.

Track failure rates over time to predict proxy degradation before it causes problems. This method belongs on MetricsStore, since it queries the checks table:

def get_failure_trend(self, proxy_url: str, window_hours: int = 6) -> str:
    """Detect if proxy is degrading, stable, or improving."""
    # Compare the newer half of the window against the older half
    half_minutes = window_hours * 30
    rows = self.conn.execute("""
        SELECT 
            CASE WHEN checked_at > datetime('now', ?) THEN 'recent' ELSE 'older' END as period,
            AVG(CASE WHEN is_healthy THEN 1.0 ELSE 0.0 END) as success_rate
        FROM checks
        WHERE proxy_url = ? AND checked_at > datetime('now', ?)
        GROUP BY period
    """, (f"-{half_minutes} minutes", proxy_url, f"-{window_hours} hours")).fetchall()
    
    if len(rows) < 2:
        return "insufficient_data"
    
    rates = {r[0]: r[1] for r in rows}
    recent = rates.get("recent", 0)
    older = rates.get("older", 0)
    
    if recent < older - 0.1:
        return "degrading"  # ⚠️ Getting worse
    elif recent > older + 0.1:
        return "improving"  # ✅ Getting better
    return "stable"

When a proxy shows a “degrading” trend for 2+ consecutive checks, proactively move it to a quarantine list before it starts failing your scraping requests.
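
The quarantine rule can be sketched as a small tracker that counts consecutive “degrading” verdicts per proxy. QuarantineTracker is a hypothetical helper, not part of the classes above, and the threshold of 2 simply mirrors the rule in the text. It takes the strings returned by get_failure_trend as input.

```python
class QuarantineTracker:
    """Quarantines a proxy after N consecutive 'degrading' verdicts (hypothetical helper)."""

    def __init__(self, threshold: int = 2):
        self.threshold = threshold
        self._streaks: dict[str, int] = {}
        self.quarantined: set[str] = set()

    def observe(self, proxy_url: str, trend: str) -> bool:
        """Record one trend verdict; return True if the proxy is now quarantined."""
        if trend == "degrading":
            self._streaks[proxy_url] = self._streaks.get(proxy_url, 0) + 1
        else:
            self._streaks[proxy_url] = 0  # any other verdict breaks the streak
        if self._streaks[proxy_url] >= self.threshold:
            self.quarantined.add(proxy_url)
        return proxy_url in self.quarantined

    def release(self, proxy_url: str) -> None:
        """Put a proxy back into consideration, e.g. after it tests as stable again."""
        self.quarantined.discard(proxy_url)
        self._streaks[proxy_url] = 0


tracker = QuarantineTracker(threshold=2)
flaky = [tracker.observe("http://a:1", t) for t in ("degrading", "degrading")]
noisy = [tracker.observe("http://b:2", t) for t in ("degrading", "stable", "degrading")]
print(flaky, noisy)  # [False, True] [False, False, False]
```

A scraper would then skip any URL in tracker.quarantined when choosing a proxy. Quarantine is sticky in this sketch: a proxy stays out until release() is called, so you decide when it has earned its way back.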

Quick Start: 5-Minute Setup

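Here’s the minimal code to add proxy monitoring to any existing scraper. It assumes the ProxyPool and ProxyMonitor classes from the steps above are defined in the same file or imported: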

import asyncio

# 1. Define your Proxy-Seller proxies
proxies = [f"http://user:pass@gate.proxy-seller.com:{10000+i}" for i in range(10)]

# 2. Create pool and monitor
pool = ProxyPool(proxies, min_healthy=3)
monitor = ProxyMonitor(pool, check_interval=300)

# 3. Start monitoring in background
async def start():
    # Keep a reference so the background task is not garbage-collected
    monitor_task = asyncio.create_task(monitor.run_forever())
    
    # 4. Use pool.get_healthy_proxy() in your scraper
    while True:
        proxy = pool.get_healthy_proxy()
        print(f"Using proxy: {proxy}")
        await asyncio.sleep(1)

asyncio.run(start())

That’s it. Five minutes of setup saves hours of debugging dead proxies.

Conclusion

A proxy health monitor transforms your scraper from “works when I’m watching” to “runs reliably 24/7.” The key components — health checking, pool management, alerting, and metrics — are straightforward to build and pay for themselves the first time they prevent a failed scraping job.

With Proxy-Seller’s reliable proxy infrastructure and the monitoring system above, you get the best of both worlds: high-quality proxies with automated failover when issues occur.

Get started with Proxy-Seller proxies. Use promo code SPINOV15 for 15% off your first order.


💡 Reader discount: Get 15% off any Proxy-Seller plan with code SPINOV15. Works on residential, datacenter, and ISP proxies.


Need help building a production scraping pipeline? I design end-to-end systems that handle millions of pages. Email me for a free architecture review.


More production scraping tips: t.me/scraping_ai