Concurrent

This program is async-safe and designed to run multiple analyses simultaneously. This enables powerful patterns like ensemble voting, batch processing, and comparative analysis.

Why Run Multiple Analyses?

When using smaller models, no single run should be relied upon for critical decisions. Running multiple analyses lets you:

  • Find consensus: If most analyses agree, confidence is high
  • Identify edge cases: Divergent results reveal difficult aspects
  • Explore variations: Different temperatures produce different perspectives
  • Validate important decisions: Multiple confirmations reduce error risk
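The consensus idea reduces to a simple majority vote over answer strings. A minimal sketch (the `majority_vote` helper is illustrative, not part of the library):

```python
from collections import Counter

def majority_vote(answers: list[str]) -> tuple[str, float]:
    """Return the most common answer and its agreement ratio."""
    counts = Counter(answers)
    winner, votes = counts.most_common(1)[0]
    return winner, votes / len(answers)

answer, agreement = majority_vote(["yes", "yes", "no", "yes", "yes"])
# An agreement of 0.8 suggests high confidence, with one outlier worth reviewing.
```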

Single Analysis (Baseline)

For reference, here's a simple single analysis:

Python
import asyncio
from agent import run_analysis

async def main():
    result = await run_analysis(
        query="What are the key factors affecting renewable energy adoption?",
        groq_api_key="your-groq-api-key",
    )
    print(f"Answer: {result.final_answer}")

asyncio.run(main())

Running Multiple Concurrent Analyses

For concurrent analyses, create shared resources to coordinate API access:

  1. ONE shared API client (connection pooling)
  2. ONE shared state manager (rate limit coordination)

Python
import asyncio
from openai import AsyncOpenAI, DefaultAioHttpClient
from agent import create_agent
from api_clients.groq_api_openai_client_shared_state import GroqAPIOpenAIClientSharedState

async def run_ensemble_analysis(query: str, num_runs: int = 5):
    """Run multiple analyses and collect results for ensemble voting."""
    
    # Step 1: Create ONE shared client (connection pooling)
    shared_client = AsyncOpenAI(
        api_key="your-groq-api-key",
        base_url="https://api.groq.com/openai/v1",
        timeout=120,
        http_client=DefaultAioHttpClient(),
    )
    
    # Step 2: Create ONE shared state manager (rate limit coordination)
    shared_state = GroqAPIOpenAIClientSharedState()
    
    async def single_analysis(run_id: int):
        """Execute a single analysis run."""
        agent = await create_agent(
            agent_id=f"run{run_id:02d}",
            api_client=shared_client,
            shared_api_state=shared_state,
            params={
                "mode": "balanced",
                "temperature_offset": 0.1 * run_id,  # Vary creativity
            }
        )
        try:
            return await agent.analyze(query)
        finally:
            await agent.cleanup()
    
    # Step 3: Run all analyses concurrently
    tasks = [single_analysis(i) for i in range(num_runs)]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    
    # Step 4: Collect successful results
    successful = [r for r in results if not isinstance(r, Exception)]
    print(f"Completed {len(successful)}/{num_runs} analyses")
    
    return successful

# Example: Run 5 analyses and find consensus
asyncio.run(run_ensemble_analysis("Is quantum computing a threat to current encryption?", 5))

The Shared State Manager

The GroqAPIOpenAIClientSharedState provides critical coordination for concurrent API access.

[Diagram: Shared State Manager Architecture (how concurrent analyses share resources)]

Features

| Feature | Benefit |
| --- | --- |
| Rate Limit Tracking | Parses retry-after delays from API errors |
| Delay Accumulation | Adds delays when multiple rate limits hit |
| Success Decay | Reduces delays by 10% on successful calls |
| Per-Model Tracking | Separate delay tracking for each model |
| Async-Safe | Uses asyncio.Lock for safe access from concurrent tasks |
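A simplified sketch of how these features could fit together. This is illustrative only; the actual GroqAPIOpenAIClientSharedState implementation may differ:

```python
import asyncio
from collections import defaultdict

class SimpleSharedState:
    """Illustrative per-model delay tracker (not the library's implementation)."""

    def __init__(self):
        self._delays: dict[str, float] = defaultdict(float)
        self._lock = asyncio.Lock()  # coordinate access from concurrent tasks

    async def record_rate_limit(self, model: str, retry_after: float):
        # Delay accumulation: stack delays when multiple rate limits hit
        async with self._lock:
            self._delays[model] += retry_after

    async def record_success(self, model: str):
        # Success decay: reduce the delay by 10% on each successful call
        async with self._lock:
            self._delays[model] *= 0.9

    async def current_delay(self, model: str) -> float:
        # Per-model tracking: each model keeps its own delay
        async with self._lock:
            return self._delays[model]
```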

When to Use

| Scenario | Recommendation |
| --- | --- |
| Single analysis | Not needed (created automatically) |
| Multiple sequential analyses | Optional (helps with rate limits) |
| Multiple concurrent analyses | Required for coordination |
| Multi-agent systems | Required for efficiency |
⚠️ Warning: Without shared state, concurrent analyses may hit rate limits repeatedly, wasting time and potentially failing.

Ensemble Voting Pattern

The most common use case for concurrent analysis is finding consensus:

%%{init: {'theme': 'base', 'themeVariables': { 'primaryColor': '#1a1a2e', 'primaryTextColor': '#fff', 'primaryBorderColor': '#4a4a6a', 'lineColor': '#6c63ff', 'fontFamily': 'JetBrains Mono, monospace'}}}%%
flowchart TD
    subgraph INPUT["Input"]
        Q[/"Query"/]
        SC["Shared Client"]
        SS["Shared State"]
    end

    subgraph PARALLEL["Parallel Analyses"]
        A1["Analysis 1
temp: 0.0"]
        A2["Analysis 2
temp: 0.1"]
        A3["Analysis 3
temp: 0.2"]
        A4["Analysis 4
temp: 0.3"]
        A5["Analysis 5
temp: 0.4"]
    end

    subgraph AGGREGATE["Aggregation"]
        V["Vote Counter"]
        C{"Consensus
Check"}
    end

    subgraph OUTPUT["Output"]
        R1["High Confidence
4-5 agree"]
        R2["Medium Confidence
3 agree"]
        R3["Low Confidence
split vote"]
    end

    Q --> A1 & A2 & A3 & A4 & A5
    SC -.-> A1 & A2 & A3 & A4 & A5
    SS -.-> A1 & A2 & A3 & A4 & A5
    A1 & A2 & A3 & A4 & A5 --> V
    V --> C
    C -->|"Strong agreement"| R1
    C -->|"Majority"| R2
    C -->|"Divergent"| R3

    style Q fill:#4a4a6a,stroke:#6c63ff,stroke-width:2px,color:#fff
    style SC fill:#0f3460,stroke:#6c63ff,stroke-dasharray: 5 5,color:#fff
    style SS fill:#0f3460,stroke:#6c63ff,stroke-dasharray: 5 5,color:#fff
    style C fill:#0f3460,stroke:#e94560,stroke-width:2px,color:#fff
    style R1 fill:#0d7377,stroke:#14ffec,stroke-width:2px,color:#fff
    style R2 fill:#4a4a6a,stroke:#6c63ff,color:#fff
    style R3 fill:#6b2737,stroke:#e94560,color:#fff

Ensemble Voting Pattern

Python
async def find_consensus(query: str, runs: int = 3):
    """Run multiple analyses and find the most common answer."""
    results = await run_ensemble_analysis(query, runs)
    
    # For focused answers (yes/no, true/false, etc.)
    focused_answers = [r.focused_answer for r in results if r.focused_answer]
    
    if focused_answers:
        from collections import Counter
        answer_counts = Counter(focused_answers)
        consensus = answer_counts.most_common(1)[0]
        print(f"Consensus: {consensus[0]} ({consensus[1]}/{runs} agreement)")
        return consensus[0]
    
    return results[0].final_answer  # Fall back to first result

Example with Yes/No Questions

Python
async def confident_yes_no(query: str, context: str = ""):
    """Get a high-confidence yes/no answer through ensemble voting."""
    
    shared_client = AsyncOpenAI(
        api_key="your-groq-api-key",
        base_url="https://api.groq.com/openai/v1",
        timeout=120,
        http_client=DefaultAioHttpClient(),
    )
    shared_state = GroqAPIOpenAIClientSharedState()
    
    async def single_run(i):
        agent = await create_agent(
            agent_id=f"vote{i:02d}",
            api_client=shared_client,
            shared_api_state=shared_state,
            params={
                "mode": "balanced",
                "focused_answer_type": "yes/no",
                "temperature_offset": 0.05 * i,
            }
        )
        try:
            return await agent.analyze(query, context)
        finally:
            await agent.cleanup()
    
    # Run 5 analyses
    tasks = [single_run(i) for i in range(5)]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    
    # Count votes
    votes = [r.focused_answer_display.answer_value 
             for r in results 
             if not isinstance(r, Exception) and r.success]
    
    from collections import Counter
    vote_counts = Counter(votes)
    
    if vote_counts:
        winner, count = vote_counts.most_common(1)[0]
        confidence = count / len(votes)
        return {
            "answer": winner,
            "confidence": confidence,
            "votes": dict(vote_counts),
            "total_runs": len(votes),
        }
    
    return {"answer": None, "confidence": 0, "error": "All analyses failed"}

# Usage (run from within an async context)
result = await confident_yes_no(
    "Should we proceed with the merger based on the financial analysis?",
    context="[financial data here]"
)
print(f"Answer: {result['answer']} (confidence: {result['confidence']:.0%})")
print(f"Vote breakdown: {result['votes']}")

Interpreting Convergence and Divergence

When running multiple analyses, the pattern of results tells you about confidence:

[Diagram: Convergence and Divergence Guide (interpreting ensemble analysis results)]

| Result Pattern | Interpretation | Action |
| --- | --- | --- |
| All agree | High confidence in answer | Proceed with confidence |
| Most agree (e.g., 4/5) | Strong signal, possible edge case | Review the outlier for insights |
| Split (e.g., 3/2) | Ambiguous or difficult question | Consider reformulating the query |
| All different | Question may be too vague | Add context or use a larger model |
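The table above can be turned into a small classifier for automated pipelines. The thresholds and labels here are illustrative, not part of the library:

```python
from collections import Counter

def classify_agreement(answers: list[str]) -> str:
    """Map a vote pattern to a confidence label (illustrative thresholds)."""
    counts = Counter(answers)
    top = counts.most_common(1)[0][1]
    n = len(answers)
    if top == n:
        return "high"       # all agree: proceed with confidence
    if top / n >= 0.8:
        return "strong"     # e.g. 4/5: review the outlier for insights
    if top / n > 0.5:
        return "split"      # e.g. 3/2: consider reformulating the query
    return "divergent"      # all different: add context or use a larger model
```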

What Divergence Tells You

When analyses diverge significantly:

  1. The question may be ambiguous — Add more context or be more specific
  2. The problem is genuinely difficult — Consider using GPT-OSS 120B
  3. There's legitimate uncertainty — The answer may depend on assumptions
  4. The model lacks relevant knowledge — Consider if the question is outside training data
💡 Tip: Divergent results aren't failures; they're information. A split vote on "Will this investment succeed?" tells you the outcome is genuinely uncertain, which is valuable to know.

Varying Parameters Across Runs

Different parameter variations can provide different perspectives:

Temperature Variation

Python
# Each run uses a slightly higher temperature
param_sets = [
    {
        "mode": "balanced",
        "temperature_offset": 0.1 * i,  # 0.0, 0.1, 0.2, 0.3, 0.4
        "focused_answer_type": "yes/no",
    }
    for i in range(5)
]

Lower temperatures give more consistent, focused answers. Higher temperatures explore more possibilities.

Model Variation

Python
# Compare models
models = ["openai/gpt-oss-20b", "openai/gpt-oss-120b"]
for model in models:
    params = {"model": model, "mode": "thorough"}

If both models agree, confidence is high. If they disagree, the larger model's answer is usually more reliable.

Mode Variation

Python
# Different analysis depths
modes = ["quick", "balanced", "thorough"]
for mode in modes:
    params = {"mode": mode}

If quick and thorough analyses agree, the answer is probably straightforward. Disagreement suggests the thorough analysis found nuances.

Batch Processing

For processing many different queries efficiently:

Python
async def batch_analyze(queries: list[str], groq_api_key: str):
    """Process multiple different queries concurrently."""
    
    shared_client = AsyncOpenAI(
        api_key=groq_api_key,
        base_url="https://api.groq.com/openai/v1",
        timeout=120,
        http_client=DefaultAioHttpClient(),
    )
    shared_state = GroqAPIOpenAIClientSharedState()
    
    async def analyze_one(i, query):
        agent = await create_agent(
            agent_id=f"batch{i:03d}",
            api_client=shared_client,
            shared_api_state=shared_state,
            params={"mode": "balanced"}
        )
        try:
            result = await agent.analyze(query)
            return {"query": query, "answer": result.final_answer, "success": True}
        except Exception as e:
            return {"query": query, "error": str(e), "success": False}
        finally:
            await agent.cleanup()
    
    tasks = [analyze_one(i, q) for i, q in enumerate(queries)]
    return await asyncio.gather(*tasks)

# Usage (run from within an async context)
queries = [
    "What are the benefits of remote work?",
    "How does inflation affect housing prices?",
    "What factors drive employee retention?",
]
results = await batch_analyze(queries, "your-api-key")

Best Practices

Do

  • Create shared client and state once at the start
  • Use unique agent_ids for each concurrent analysis
  • Handle exceptions from asyncio.gather with return_exceptions=True
  • Clean up agents in finally blocks
  • Vary temperature slightly across ensemble runs

Don't

  • Don't create new clients for each analysis — wastes connections
  • Don't skip shared state for concurrent runs — causes rate limit issues
  • Don't ignore failed analyses — check why they failed
  • Don't use identical parameters for ensemble voting — you want variation

Rate Limit Handling

The shared state manager handles rate limits automatically, but for heavy usage:

Python
# Add small delays between starting analyses.
# Assumes a single_analysis(i) coroutine like the one defined
# inside run_ensemble_analysis above.
async def staggered_ensemble(num_runs: int = 5, delay: float = 0.5):
    tasks = []
    for i in range(num_runs):
        tasks.append(asyncio.create_task(single_analysis(i)))
        await asyncio.sleep(delay)  # Stagger start times

    return await asyncio.gather(*tasks, return_exceptions=True)

Cost Considerations

Concurrent analyses multiply costs:

| Runs | Approximate Cost Multiplier |
| --- | --- |
| 1 | 1x (baseline) |
| 3 | ~3x |
| 5 | ~5x |

Mitigation strategies:

  1. Use quick mode for ensemble voting on simple questions
  2. Use gpt-oss-20b for initial ensemble, upgrade to 120b only if results diverge
  3. Start with 3 runs; add more only if results are split
  4. Use candidate_plans=1 for ensemble runs to reduce per-run cost