DevOps e Escalabilidade

Planejamento de recuperação de desastres para pipelines de solução de CAPTCHA

Um pipeline de resolução de CAPTCHA que perde tarefas em andamento durante uma interrupção custa dados, tempo e dinheiro. O planejamento de recuperação de desastres (DR) garante que você possa se recuperar de falhas de infraestrutura, interrupções de API ou erros de configuração com perda mínima de dados.

Objetivos de DR

Métrica Definição Alvo do pipeline CAPTCHA
RPO (objetivo de ponto de recuperação) Máxima perda de dados tolerável < 5 minutos de tarefas na fila
RTO (objetivo de tempo de recuperação) Tempo máximo para restaurar o serviço <15 minutos
MTTR (tempo médio para recuperação) Tempo médio de recuperação < 10 minutos

Cenários de falha

Scenario 1: Worker crash         → Restart workers, replay queue
Scenario 2: Queue data loss      → Restore from persistent backup
Scenario 3: Network partition    → Failover to secondary region
Scenario 4: API key compromised  → Rotate key, update workers
Scenario 5: Config corruption    → Rollback to last known good

Camada de Persistência de Tarefas

Nunca resolva CAPTCHAs a partir de uma fila somente na memória. Persista tarefas para sobreviver a falhas.

Python — Fila de Tarefas Persistentes

import os
import json
import time
import sqlite3
import threading
import requests
from datetime import datetime

API_KEY = os.environ["CAPTCHAAI_API_KEY"]


class PersistentTaskQueue:
    """SQLite-backed task queue that survives crashes."""

    def __init__(self, db_path="captcha_tasks.db"):
        self.db_path = db_path
        self.conn = sqlite3.connect(db_path, check_same_thread=False)
        self.lock = threading.Lock()
        self._init_db()

    def _init_db(self):
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS tasks (
                id TEXT PRIMARY KEY,
                payload TEXT NOT NULL,
                status TEXT DEFAULT 'pending',
                created_at TEXT DEFAULT CURRENT_TIMESTAMP,
                started_at TEXT,
                completed_at TEXT,
                result TEXT,
                attempts INTEGER DEFAULT 0
            )
        """)
        self.conn.commit()

    def enqueue(self, task_id, payload):
        with self.lock:
            self.conn.execute(
                "INSERT INTO tasks (id, payload) VALUES (?, ?)",
                (task_id, json.dumps(payload))
            )
            self.conn.commit()

    def dequeue(self):
        with self.lock:
            cursor = self.conn.execute(
                "SELECT id, payload FROM tasks "
                "WHERE status = 'pending' ORDER BY created_at LIMIT 1"
            )
            row = cursor.fetchone()
            if not row:
                return None

            task_id, payload = row
            self.conn.execute(
                "UPDATE tasks SET status = 'processing', "
                "started_at = ?, attempts = attempts + 1 WHERE id = ?",
                (datetime.utcnow().isoformat(), task_id)
            )
            self.conn.commit()
            return {"id": task_id, "payload": json.loads(payload)}

    def complete(self, task_id, result):
        with self.lock:
            self.conn.execute(
                "UPDATE tasks SET status = 'completed', "
                "completed_at = ?, result = ? WHERE id = ?",
                (datetime.utcnow().isoformat(), json.dumps(result), task_id)
            )
            self.conn.commit()

    def fail(self, task_id, error):
        with self.lock:
            # Requeue if under retry limit
            cursor = self.conn.execute(
                "SELECT attempts FROM tasks WHERE id = ?", (task_id,)
            )
            row = cursor.fetchone()
            if row and row[0] < 3:
                self.conn.execute(
                    "UPDATE tasks SET status = 'pending' WHERE id = ?",
                    (task_id,)
                )
            else:
                self.conn.execute(
                    "UPDATE tasks SET status = 'failed', "
                    "result = ? WHERE id = ?",
                    (json.dumps({"error": error}), task_id)
                )
            self.conn.commit()

    def recover_stale(self, timeout_seconds=600):
        """Reset tasks stuck in 'processing' after a crash."""
        with self.lock:
            cutoff = datetime.utcnow().timestamp() - timeout_seconds
            self.conn.execute(
                "UPDATE tasks SET status = 'pending' "
                "WHERE status = 'processing' "
                "AND started_at < datetime(?, 'unixepoch')",
                (cutoff,)
            )
            count = self.conn.total_changes
            self.conn.commit()
            return count

    @property
    def stats(self):
        cursor = self.conn.execute(
            "SELECT status, COUNT(*) FROM tasks GROUP BY status"
        )
        return dict(cursor.fetchall())


# On startup: recover tasks that were processing during a crash
queue = PersistentTaskQueue()
recovered = queue.recover_stale(timeout_seconds=600)
print(f"Recovered {recovered} stale tasks after restart")

JavaScript – Gerenciador de recuperação

const axios = require("axios");
const fs = require("fs");

const API_KEY = process.env.CAPTCHAAI_API_KEY;

class DisasterRecoveryManager {
  constructor(checkpointDir = "./dr-checkpoints") {
    this.checkpointDir = checkpointDir;
    if (!fs.existsSync(checkpointDir)) {
      fs.mkdirSync(checkpointDir, { recursive: true });
    }
  }

  checkpoint(label, data) {
    const filename = `${this.checkpointDir}/${label}-${Date.now()}.json`;
    fs.writeFileSync(filename, JSON.stringify(data, null, 2));
    this.pruneOldCheckpoints(label, 10); // Keep last 10
    return filename;
  }

  restore(label) {
    const files = fs.readdirSync(this.checkpointDir)
      .filter((f) => f.startsWith(label) && f.endsWith(".json"))
      .sort()
      .reverse();

    if (files.length === 0) return null;
    const latest = fs.readFileSync(
      `${this.checkpointDir}/${files[0]}`, "utf8"
    );
    return JSON.parse(latest);
  }

  pruneOldCheckpoints(label, keep) {
    const files = fs.readdirSync(this.checkpointDir)
      .filter((f) => f.startsWith(label) && f.endsWith(".json"))
      .sort();

    while (files.length > keep) {
      const old = files.shift();
      fs.unlinkSync(`${this.checkpointDir}/${old}`);
    }
  }

  async healthCheck() {
    try {
      const resp = await axios.get("https://ocr.captchaai.com/res.php", {
        params: { key: API_KEY, action: "getbalance", json: 1 },
        timeout: 10000,
      });
      return {
        healthy: resp.data.status === 1,
        balance: parseFloat(resp.data.request || 0),
      };
    } catch (err) {
      return { healthy: false, error: err.message };
    }
  }
}

class ResilientSolver {
  constructor() {
    this.dr = new DisasterRecoveryManager();
    this.pendingTasks = [];
  }

  async solveBatch(tasks) {
    // Checkpoint before starting
    this.dr.checkpoint("batch-pending", {
      tasks,
      startedAt: new Date().toISOString(),
    });

    const results = [];
    for (const task of tasks) {
      try {
        const result = await this.solveSingle(task);
        results.push({ taskId: task.id, ...result });
      } catch (err) {
        results.push({ taskId: task.id, error: err.message });
      }

      // Checkpoint progress periodically
      if (results.length % 10 === 0) {
        this.dr.checkpoint("batch-progress", { results, remaining: tasks.length - results.length });
      }
    }

    // Final checkpoint
    this.dr.checkpoint("batch-complete", { results });
    return results;
  }

  async recover() {
    // Check for incomplete batch
    const progress = this.dr.restore("batch-progress");
    const pending = this.dr.restore("batch-pending");

    if (progress) {
      const completedIds = new Set(progress.results.map((r) => r.taskId));
      const remaining = pending?.tasks.filter((t) => !completedIds.has(t.id));
      console.log(
        `Recovering: ${progress.results.length} done, ${remaining?.length || 0} remaining`
      );
      return remaining || [];
    }

    if (pending) {
      console.log(`Recovering full batch: ${pending.tasks.length} tasks`);
      return pending.tasks;
    }

    return [];
  }

  async solveSingle(task) {
    const resp = await axios.post("https://ocr.captchaai.com/in.php", null, {
      params: {
        key: API_KEY,
        method: "userrecaptcha",
        googlekey: task.sitekey,
        pageurl: task.pageurl,
        json: 1,
      },
    });

    if (resp.data.status !== 1) throw new Error(resp.data.request);

    const captchaId = resp.data.request;
    for (let i = 0; i < 60; i++) {
      await new Promise((r) => setTimeout(r, 5000));
      const poll = await axios.get("https://ocr.captchaai.com/res.php", {
        params: { key: API_KEY, action: "get", id: captchaId, json: 1 },
      });
      if (poll.data.status === 1) return { solution: poll.data.request };
      if (poll.data.request !== "CAPCHA_NOT_READY")
        throw new Error(poll.data.request);
    }
    throw new Error("TIMEOUT");
  }
}

// Start with recovery check
const solver = new ResilientSolver();
solver.recover().then((remaining) => {
  if (remaining.length > 0) {
    console.log(`Resuming ${remaining.length} tasks from checkpoint`);
    solver.solveBatch(remaining);
  }
});

Modelo de runbook de DR

RUNBOOK: CAPTCHA Pipeline Recovery
====================================

1. DETECT
   - Alert fires: [PagerDuty / Slack / Email]
   - Symptom: [Queue growing / Workers offline / Error spike]

2. ASSESS
   - Check worker health: curl http://workers/health
   - Check API status: GET /res.php?action=getbalance
   - Check queue depth: SELECT COUNT(*) FROM tasks WHERE status='pending'

3. RECOVER
   If: Workers crashed
     → Restart worker containers: docker-compose up -d workers
     → Run stale task recovery: recovery.py --recover-stale

   If: Network partition
     → Failover to secondary region
     → Update DNS or load balancer routing

   If: API key compromised
     → Generate new key at captchaai.com
     → Update secret store
     → Rolling restart workers

4. VERIFY
   - Confirm solve rate > 90%
   - Confirm queue draining
   - Confirm no duplicate solves

5. POST-MORTEM
   - Document root cause
   - Update runbook if needed

Mapeamento do objetivo de recuperação

  • Defina quais dados podem ser reproduzidos posteriormente e quais fluxos voltados ao usuário precisam de recuperação imediata após falha.
  • Mapeie cada componente do pipeline para um destino RPO e RTO em vez de tratar todo o sistema como um bloco.
  • Documente quem pode acionar o failover, quem valida a recuperação e quando retomar o roteamento normal.

Solução de problemas

Problema Causa Correção
Tarefas perdidas durante falha Somente fila na memória Use fila persistente (SQLite, Redis com AOF)
Duplicar resolve após recuperação Tarefas obsoletas reprocessadas sem desduplicação Adicione chaves de idempotência; verifique se já foi resolvido
A recuperação leva > RTO Backup de banco de dados muito antigo Aumentar a frequência dos pontos de verificação
Failover para região errada TTL do DNS muito alto Reduza o TTL para 60 antes dos failovers planejados

Perguntas frequentes

Com que frequência devo fazer checkpoint?

A cada 5 a 10 tarefas concluídas ou a cada 30 segundos, o que ocorrer primeiro. Pontos de verificação mais frequentes reduzem o RPO, mas aumentam a sobrecarga do I/O.

Devo usar SQLite ou Redis para persistência de tarefas?

SQLite para configurações de nó único (mais simples, sem infraestrutura extra). Redis com persistência AOF para sistemas distribuídos (pub/sub integrado com menor latência).

E se o próprio CaptchaAI sofrer uma interrupção?

Enfileirar tarefas localmente e tentar novamente quando a API se recuperar. CaptchaAI tem alto tempo de atividade, mas seu pipeline deve lidar com a indisponibilidade temporária normalmente com disjuntores e lógica de nova tentativa.

Próximas etapas

Planeje o pior -obtenha sua chave API CaptchaAIe inclua a recuperação de desastres em seu pipeline desde o primeiro dia.

Guias relacionados:

Os comentários estão desativados para este artigo.