Tutoriais

Construindo uma fila de resolução CAPTCHA em Python com CaptchaAI

Ao fazer scraping em escala, você precisa de mais do que resolver um CAPTCHA de cada vez. Um sistema de fila separa o envio do CAPTCHA da recuperação dos resultados, permitindo resolver centenas de tarefas em paralelo.


Por que usar uma fila?

A resolução de CAPTCHA de solicitação única desperdiça tempo de espera. Um sistema de filas:

  • Envia todos os CAPTCHAs imediatamente
  • Consulta (polling) vários IDs de tarefas em paralelo
  • Tenta novamente, de forma automática, as resoluções que falharam
  • Controla a simultaneidade para respeitar os limites de taxa da API
  • Fornece rastreamento de progresso e retornos de chamada

Fila de threading básica

import time
import threading
import requests
from queue import Queue, Empty

API_KEY = "YOUR_API_KEY"


class CaptchaQueue:
    """Thread-based CAPTCHA solving queue.

    Tasks are added with :meth:`submit`, processed by daemon worker threads
    started with :meth:`start`, and their outcomes are collected from
    ``result_queue`` through :meth:`get_results`.
    """

    def __init__(self, api_key, max_workers=10):
        """Create an idle queue; no threads run until :meth:`start` is called.

        Args:
            api_key: API key sent with every request.
            max_workers: Number of worker threads spawned by :meth:`start`.
        """
        self.api_key = api_key
        self.task_queue = Queue()
        self.result_queue = Queue()
        self.max_workers = max_workers
        self.workers = []

    def submit(self, method, callback=None, **params):
        """Add a CAPTCHA task to the queue.

        Args:
            method: Value sent as the ``method`` field of the submit request.
            callback: Optional callable invoked with the solved token.
            **params: Extra form fields forwarded to the submit request.
        """
        task = {
            "method": method,
            "params": params,
            "callback": callback,
        }
        self.task_queue.put(task)

    def start(self):
        """Start ``max_workers`` daemon worker threads."""
        for _ in range(self.max_workers):
            t = threading.Thread(target=self._worker, daemon=True)
            t.start()
            self.workers.append(t)

    def wait(self):
        """Block until every submitted task has been marked done."""
        self.task_queue.join()

    def get_results(self):
        """Drain and return all results currently available.

        Returns:
            A list of result dicts (possibly empty). Each has a ``status`` of
            ``"solved"`` or ``"error"`` and the originating ``task``.
        """
        results = []
        while True:
            try:
                results.append(self.result_queue.get_nowait())
            except Empty:
                return results

    def _worker(self):
        """Worker loop: take tasks off the queue and process them forever."""
        while True:
            try:
                task = self.task_queue.get(timeout=1)
            except Empty:
                continue

            try:
                self._process(task)
            finally:
                # Always balance the get() so wait() cannot hang.
                self.task_queue.task_done()

    def _process(self, task):
        """Solve one task and publish exactly one entry to ``result_queue``.

        A failing callback no longer turns a solved task into an additional
        ``"error"`` entry; the failure is recorded on the solved entry under
        ``callback_error`` instead. The callback runs before the entry is
        published so that the entry is complete when consumers see it.
        """
        try:
            result = self._solve(task["method"], **task["params"])
        except Exception as e:
            self.result_queue.put({"status": "error", "error": str(e), "task": task})
            return

        entry = {"status": "solved", "result": result, "task": task}
        callback = task["callback"]
        if callback:
            try:
                callback(result)
            except Exception as e:
                entry["callback_error"] = str(e)
        self.result_queue.put(entry)

    def _solve(self, method, **params):
        """Submit one CAPTCHA and poll until it is solved.

        Returns:
            The ``request`` field of the first poll response whose ``status``
            is 1.

        Raises:
            Exception: If the submit is rejected or a poll reports an error.
            TimeoutError: If no result arrives after 30 polls, 5 s apart.
        """
        submit = requests.post("https://ocr.captchaai.com/in.php", data={
            "key": self.api_key, "method": method, "json": 1, **params,
        }, timeout=30).json()

        if submit.get("status") != 1:
            raise Exception(f"Submit error: {submit.get('request')}")

        task_id = submit["request"]
        for _ in range(30):
            time.sleep(5)
            result = requests.get("https://ocr.captchaai.com/res.php", params={
                "key": self.api_key, "action": "get", "id": task_id, "json": 1,
            }, timeout=30).json()
            if result.get("status") == 1:
                return result["request"]
            self._raise_on_poll_error(result)
        raise TimeoutError("Solve timed out")

    @staticmethod
    def _raise_on_poll_error(result):
        """Raise if a poll response carries an ``ERROR...`` code.

        Any other response (for example a "not ready" notice) is treated as
        pending and returns ``None`` so the caller keeps polling.
        """
        code = result.get("request")
        if code == "ERROR_CAPTCHA_UNSOLVABLE":
            raise Exception("CAPTCHA unsolvable")
        if isinstance(code, str) and code.startswith("ERROR"):
            # Fail fast instead of polling a request that can never succeed.
            raise Exception(f"Solve error: {code}")


# Usage
queue = CaptchaQueue(API_KEY, max_workers=5)
queue.start()

# Submit multiple CAPTCHAs
urls_and_sitekeys = [
    ("https://example.com/page1", "SITEKEY_1"),
    ("https://example.com/page2", "SITEKEY_2"),
    ("https://example.com/page3", "SITEKEY_3"),
]

for url, sitekey in urls_and_sitekeys:
    queue.submit("userrecaptcha", googlekey=sitekey, pageurl=url)

# Block until the workers have marked every submitted task as done.
queue.wait()
results = queue.get_results()
# The result list also contains failures, so count only the solved entries.
solved_count = sum(1 for r in results if r["status"] == "solved")
print(f"Solved {solved_count} of {len(results)} CAPTCHAs")
for r in results:
    print(f"  {r['status']}: {r.get('result', r.get('error', ''))[:50]}")

Fila assíncrona com asyncio

import asyncio
import aiohttp

API_KEY = "YOUR_API_KEY"


class AsyncCaptchaQueue:
    """Async CAPTCHA solving queue with concurrency control.

    A semaphore limits how many tasks are being solved at once; each solve
    opens its own ``aiohttp.ClientSession``.
    """

    def __init__(self, api_key, max_concurrent=10):
        """Store the API key and create the concurrency limiter.

        Args:
            api_key: API key sent with every request.
            max_concurrent: Maximum number of tasks solved simultaneously.
        """
        self.api_key = api_key
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.results = []

    async def solve_batch(self, tasks):
        """Solve a batch of CAPTCHA tasks concurrently.

        Args:
            tasks: Iterable of dicts with ``"method"`` and ``"params"`` keys.

        Returns:
            A list, in task order, holding either the solved token or the
            exception raised for that task. Also stored in ``self.results``.
        """
        coros = [self._solve_task(task) for task in tasks]
        self.results = await asyncio.gather(*coros, return_exceptions=True)
        return self.results

    async def _solve_task(self, task):
        """Solve one task while holding a semaphore slot."""
        async with self.semaphore:
            return await self._solve(task["method"], **task["params"])

    async def _solve(self, method, **params):
        """Submit one CAPTCHA and poll until it is solved.

        Raises:
            Exception: If the submit is rejected or a poll reports an error.
            TimeoutError: If no result arrives after 30 polls, 5 s apart.
        """
        async with aiohttp.ClientSession() as session:
            # Submit
            async with session.post("https://ocr.captchaai.com/in.php", data={
                "key": self.api_key, "method": method, "json": 1, **params,
            }) as resp:
                data = await resp.json(content_type=None)
                if data.get("status") != 1:
                    raise Exception(f"Submit error: {data.get('request')}")
                task_id = data["request"]

            # Poll
            for _ in range(30):
                await asyncio.sleep(5)
                async with session.get("https://ocr.captchaai.com/res.php", params={
                    "key": self.api_key, "action": "get", "id": task_id, "json": 1,
                }) as resp:
                    result = await resp.json(content_type=None)
                    if result.get("status") == 1:
                        return result["request"]
                    self._raise_on_poll_error(result)

            raise TimeoutError("Solve timed out")

    @staticmethod
    def _raise_on_poll_error(result):
        """Raise if a poll response carries an ``ERROR...`` code.

        Any other response is treated as pending and returns ``None`` so the
        caller keeps polling.
        """
        code = result.get("request")
        if code == "ERROR_CAPTCHA_UNSOLVABLE":
            raise Exception("CAPTCHA unsolvable")
        if isinstance(code, str) and code.startswith("ERROR"):
            # Fail fast instead of polling a request that can never succeed.
            raise Exception(f"Solve error: {code}")


# Usage
async def main():
    """Solve ten demo reCAPTCHA tasks, five at a time, and print each outcome."""
    tasks = []
    for n in range(10):
        params = {"googlekey": f"SITEKEY_{n}", "pageurl": f"https://example.com/page{n}"}
        tasks.append({"method": "userrecaptcha", "params": params})

    solver = AsyncCaptchaQueue(API_KEY, max_concurrent=5)
    outcomes = await solver.solve_batch(tasks)

    # solve_batch returns exceptions in place of tokens for failed tasks.
    for i, result in enumerate(outcomes):
        if not isinstance(result, Exception):
            print(f"Task {i}: {result[:50]}...")
            continue
        print(f"Task {i}: ERROR — {result}")


asyncio.run(main())

Padrão produtor-consumidor

Para cargas de trabalho de raspagem contínua em que as páginas são descobertas dinamicamente:

import asyncio
import aiohttp

API_KEY = "YOUR_API_KEY"


class ProducerConsumerQueue:
    """Continuous CAPTCHA solving with producer-consumer pattern.

    One producer feeds tasks into a bounded ``asyncio.Queue``; a fixed number
    of consumers take tasks off it, solve them and pass results to a handler.
    """

    def __init__(self, api_key, queue_size=100, num_consumers=5):
        """Create the bounded queue and reset the counters.

        Args:
            api_key: API key sent with every request.
            queue_size: Maximum number of queued tasks before ``produce`` waits.
            num_consumers: Number of consumer tasks started by :meth:`run`.
        """
        self.api_key = api_key
        self.queue = asyncio.Queue(maxsize=queue_size)
        self.num_consumers = num_consumers
        self.solved_count = 0
        self.error_count = 0
        # Kept for compatibility; not read anywhere in this class.
        self.running = True

    async def produce(self, tasks):
        """Producer: feed CAPTCHA tasks into the queue.

        After the last task, one ``None`` sentinel per consumer is queued so
        that every consumer leaves its loop.
        """
        for task in tasks:
            await self.queue.put(task)
        # Signal consumers to stop
        for _ in range(self.num_consumers):
            await self.queue.put(None)

    async def consume(self, result_handler):
        """Consumer: solve CAPTCHAs and call result handler.

        Runs until a ``None`` sentinel is received. All tasks handled by one
        consumer share a single ``aiohttp.ClientSession``.
        """
        async with aiohttp.ClientSession() as session:
            while True:
                task = await self.queue.get()
                try:
                    if task is None:
                        break
                    await self._handle_task(session, task, result_handler)
                finally:
                    # Runs for sentinels too, balancing every get().
                    self.queue.task_done()

    async def _handle_task(self, session, task, result_handler):
        """Solve one task, update the counters and invoke the handler.

        A failing handler is reported separately and does not count the task
        as an error: the CAPTCHA was solved, so each task increments exactly
        one of ``solved_count`` / ``error_count``.
        """
        try:
            result = await self._solve(session, task["method"], **task["params"])
        except Exception as e:
            self.error_count += 1
            print(f"Error: {e}")
            return

        self.solved_count += 1
        if result_handler:
            try:
                await result_handler(task, result)
            except Exception as e:
                print(f"Handler error: {e}")

    async def run(self, tasks, result_handler=None):
        """Run the producer-consumer pipeline.

        Args:
            tasks: Iterable of dicts with ``"method"`` and ``"params"`` keys.
            result_handler: Optional coroutine function called as
                ``await result_handler(task, token)`` for each solved task.
        """
        # Start producer
        producer = asyncio.create_task(self.produce(tasks))

        # Start consumers
        consumers = [
            asyncio.create_task(self.consume(result_handler))
            for _ in range(self.num_consumers)
        ]

        # Wait for everything to finish
        await producer
        await asyncio.gather(*consumers)

        print(f"Complete: {self.solved_count} solved, {self.error_count} errors")

    async def _solve(self, session, method, **params):
        """Submit one CAPTCHA through ``session`` and poll until solved.

        Raises:
            Exception: If the submit is rejected or a poll reports an error.
            TimeoutError: If no result arrives after 30 polls, 5 s apart.
        """
        async with session.post("https://ocr.captchaai.com/in.php", data={
            "key": self.api_key, "method": method, "json": 1, **params,
        }) as resp:
            data = await resp.json(content_type=None)
            if data.get("status") != 1:
                raise Exception(f"Submit: {data.get('request')}")
            task_id = data["request"]

        for _ in range(30):
            await asyncio.sleep(5)
            async with session.get("https://ocr.captchaai.com/res.php", params={
                "key": self.api_key, "action": "get", "id": task_id, "json": 1,
            }) as resp:
                result = await resp.json(content_type=None)
                if result.get("status") == 1:
                    return result["request"]
                self._raise_on_poll_error(result)
        raise TimeoutError("Timed out")

    @staticmethod
    def _raise_on_poll_error(result):
        """Raise if a poll response carries an ``ERROR...`` code.

        Any other response is treated as pending and returns ``None`` so the
        caller keeps polling.
        """
        code = result.get("request")
        if isinstance(code, str) and code.startswith("ERROR"):
            # Fail fast instead of polling a request that can never succeed.
            raise Exception(f"Solve: {code}")


# Usage
async def handle_result(task, token):
    """Print the solved task's page URL and the first 30 characters of its token."""
    params = task["params"]
    url = params["pageurl"]
    print(f"Solved for {url}: {token[:30]}...")


async def main():
    """Build twenty demo reCAPTCHA tasks and run them through the pipeline."""
    tasks = []
    for i in range(20):
        params = {"googlekey": f"SITEKEY_{i}", "pageurl": f"https://example.com/page{i}"}
        tasks.append({"method": "userrecaptcha", "params": params})

    pipeline = ProducerConsumerQueue(API_KEY, num_consumers=5)
    await pipeline.run(tasks, result_handler=handle_result)


asyncio.run(main())

Fila prioritária

Quando alguns CAPTCHAs são mais importantes que outros:

import asyncio
from dataclasses import dataclass, field

API_KEY = "YOUR_API_KEY"


@dataclass(order=True)
class PriorityTask:
    """Queue entry pairing a priority with a task payload.

    ``order=True`` generates the comparison methods. Because ``task`` is
    declared with ``compare=False``, only ``priority`` takes part in
    comparisons, so two entries with the same priority compare as equal.
    """

    # Sort key; the only field used for ordering and equality.
    priority: int
    # Task payload; excluded from comparisons via compare=False.
    task: dict = field(compare=False)


class PriorityCaptchaQueue:
    """CAPTCHA queue with priority levels.

    Tasks are queued with :meth:`submit` and solved by :meth:`process`, which
    stores one outcome per task id in ``results``.
    """

    def __init__(self, api_key, num_workers=5):
        """Create an empty priority queue.

        Args:
            api_key: API key sent with every request.
            num_workers: Number of worker tasks started by :meth:`process`.
        """
        self.api_key = api_key
        self.queue = asyncio.PriorityQueue()
        self.num_workers = num_workers
        self.results = {}

    async def submit(self, task_id, method, priority=5, **params):
        """Submit with priority (lower number = higher priority).

        Args:
            task_id: Key under which the outcome is stored in ``results``.
            method: Value sent as the ``method`` field of the submit request.
            priority: Sort key for the queue.
            **params: Extra form fields forwarded to the submit request.
        """
        await self.queue.put(PriorityTask(
            priority=priority,
            task={"id": task_id, "method": method, "params": params},
        ))

    async def process(self):
        """Process all queued tasks by priority.

        Returns:
            The ``results`` dict mapping each task id to its outcome.
        """
        workers = [asyncio.create_task(self._worker()) for _ in range(self.num_workers)]

        # Wait for queue to drain
        await self.queue.join()

        # Cancel workers
        for w in workers:
            w.cancel()
        # Wait for the cancellations to complete so each worker can leave its
        # ``async with`` block and close its session before we return.
        await asyncio.gather(*workers, return_exceptions=True)

        return self.results

    async def _worker(self):
        """Worker loop: take entries off the queue until cancelled."""
        import aiohttp
        async with aiohttp.ClientSession() as session:
            while True:
                item = await self.queue.get()
                try:
                    await self._run_task(session, item.task)
                finally:
                    # Always balance the get() so join() cannot hang.
                    self.queue.task_done()

    async def _run_task(self, session, task):
        """Solve one task and record its outcome under ``task["id"]``."""
        try:
            result = await self._solve(session, task["method"], **task["params"])
        except Exception as e:
            self.results[task["id"]] = {"status": "error", "error": str(e)}
        else:
            self.results[task["id"]] = {"status": "solved", "token": result}

    async def _solve(self, session, method, **params):
        """Submit one CAPTCHA through ``session`` and poll until solved.

        Raises:
            Exception: If the submit is rejected or a poll reports an error.
            TimeoutError: If no result arrives after 30 polls, 5 s apart.
        """
        async with session.post("https://ocr.captchaai.com/in.php", data={
            "key": self.api_key, "method": method, "json": 1, **params,
        }) as resp:
            data = await resp.json(content_type=None)
            if data.get("status") != 1:
                raise Exception(data.get("request"))
            task_id = data["request"]

        for _ in range(30):
            await asyncio.sleep(5)
            async with session.get("https://ocr.captchaai.com/res.php", params={
                "key": self.api_key, "action": "get", "id": task_id, "json": 1,
            }) as resp:
                result = await resp.json(content_type=None)
                if result.get("status") == 1:
                    return result["request"]
                self._raise_on_poll_error(result)
        # A message is required here: the worker stores str(exception), and a
        # bare TimeoutError() would be recorded as an empty error string.
        raise TimeoutError("Solve timed out")

    @staticmethod
    def _raise_on_poll_error(result):
        """Raise if a poll response carries an ``ERROR...`` code.

        Any other response is treated as pending and returns ``None`` so the
        caller keeps polling.
        """
        code = result.get("request")
        if isinstance(code, str) and code.startswith("ERROR"):
            # Fail fast instead of polling a request that can never succeed.
            raise Exception(code)


# Usage
async def main():
    """Queue demo tasks at three priority levels, process them, print statuses."""
    pq = PriorityCaptchaQueue(API_KEY, num_workers=3)

    # (task id, method, priority, request params), in submission order.
    # High priority — checkout pages
    submissions = [
        ("checkout_1", "turnstile", 1, {"sitekey": "KEY", "pageurl": "https://shop.com/checkout"}),
    ]
    # Normal priority — product pages
    submissions += [
        (f"product_{i}", "userrecaptcha", 5, {"googlekey": "KEY", "pageurl": f"https://shop.com/p/{i}"})
        for i in range(5)
    ]
    # Low priority — info pages
    submissions += [
        (f"info_{i}", "userrecaptcha", 10, {"googlekey": "KEY", "pageurl": f"https://shop.com/info/{i}"})
        for i in range(3)
    ]

    for name, method, level, params in submissions:
        await pq.submit(name, method, priority=level, **params)

    results = await pq.process()
    for task_id, result in results.items():
        print(f"{task_id}: {result['status']}")


asyncio.run(main())

Monitoramento e relatórios

import time
from dataclasses import dataclass, field


@dataclass
class QueueMetrics:
    """Counters for a CAPTCHA queue plus statistics derived from them."""

    submitted: int = 0
    solved: int = 0
    failed: int = 0
    total_solve_time: float = 0.0
    # Wall-clock timestamp taken when the instance is created.
    start_time: float = field(default_factory=time.time)

    @property
    def avg_solve_time(self):
        """Mean solve time per solved task, or 0 when nothing is solved."""
        if self.solved:
            return self.total_solve_time / self.solved
        return 0

    @property
    def success_rate(self):
        """Percentage of finished tasks that were solved, or 0 if none finished."""
        finished = self.solved + self.failed
        if finished:
            return self.solved / finished * 100
        return 0

    @property
    def throughput(self):
        """Solved tasks per minute since ``start_time``, or 0 if no time elapsed."""
        seconds = time.time() - self.start_time
        if seconds > 0:
            return self.solved / seconds * 60
        return 0

    def report(self):
        """Return a one-line, pipe-separated summary of all metrics."""
        return (
            f"Submitted: {self.submitted} | "
            f"Solved: {self.solved} | "
            f"Failed: {self.failed} | "
            f"Avg time: {self.avg_solve_time:.1f}s | "
            f"Success: {self.success_rate:.1f}% | "
            f"Throughput: {self.throughput:.0f}/min"
        )

Solução de problemas

Sintoma | Causa | Correção
A fila aumenta, mas as tarefas não são concluídas | Trabalhadores demais sobrecarregam a API | Reduza max_workers/max_concurrent
ERROR_NO_SLOT_AVAILABLE | Limite de simultaneidade da API atingido | Adicione um atraso entre os envios
Tarefas presas na fila | As threads de trabalho morreram por causa de uma exceção | Envolva o loop de trabalho em try/except
A memória cresce com o tempo | Resultados não consumidos | Chame get_results() periodicamente
A fila assíncrona fica bloqueada | Falta um await | Certifique-se de que todas as chamadas assíncronas sejam aguardadas

Perguntas frequentes

Quantas soluções simultâneas posso executar?

O CaptchaAI lida com a simultaneidade no lado do servidor. Comece com 10 trabalhadores simultâneos e aumente com base nos limites do seu plano. Fique atento ao erro ERROR_NO_SLOT_AVAILABLE para saber quando reduzir o ritmo de envio.

Devo usar threading ou asyncio?

Use asyncio para novos projetos: ele lida com mais eficiência com a resolução de CAPTCHA, que é uma tarefa limitada por E/S (I/O-bound). Use threading se estiver integrando a um código síncrono existente.

Como lidar com os limites de taxa da API?

Use um semáforo (asyncio) ou uma fila limitada (threading) para limitar as solicitações simultâneas. Adicione um pequeno atraso entre os envios se você receber ERROR_NO_SLOT_AVAILABLE.


Resumo

Uma fila de resolução de CAPTCHA separa o envio da consulta de resultados (polling), permitindo a resolução paralela com o CaptchaAI. Escolha threading para código síncrono, asyncio para Python moderno e produtor-consumidor para cargas de trabalho contínuas.

Artigos relacionados

Os comentários estão desativados para este artigo.