GraphWorkflow

A powerful workflow orchestration system that creates directed graphs of agents for complex multi-agent collaboration and task execution.

Overview

The GraphWorkflow class is a sophisticated workflow management system that enables the creation and execution of complex multi-agent workflows. It represents workflows as directed graphs where nodes are agents and edges represent data flow and dependencies between agents. The system supports parallel execution, automatic compilation optimization, and comprehensive visualization capabilities.

Key features:

Feature Description
Agent-based nodes Each node represents an agent that can process tasks
Directed graph structure Edges define the flow of data between agents
Dual backend support Choose between NetworkX (compatibility) or Rustworkx (performance) backends
Parallel execution Multiple agents can run simultaneously within layers
Automatic compilation Optimizes workflow structure for efficient execution
Rich visualization Generate visual representations using Graphviz
Serialization Save and load workflows as JSON
Pattern detection Automatically identifies parallel processing patterns
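Conceptually, the data model is a directed graph of named agent nodes connected by edges. A simplified pure-Python sketch of the idea (illustrative classes only, not the library's actual Node and Edge implementations):

```python
from dataclasses import dataclass, field
from typing import Any, Dict

@dataclass
class Node:
    """A workflow node wrapping an agent (simplified illustration)."""
    id: str
    metadata: Dict[str, Any] = field(default_factory=dict)

@dataclass
class Edge:
    """A directed edge: data flows from source to target."""
    source: str
    target: str

# A three-agent pipeline: Research -> Analysis -> Synthesis
nodes = {n: Node(n) for n in ["Research", "Analysis", "Synthesis"]}
edges = [Edge("Research", "Analysis"), Edge("Analysis", "Synthesis")]

# Entry points are simply nodes with no incoming edges
targets = {e.target for e in edges}
entry_points = [n for n in nodes if n not in targets]
print(entry_points)  # ['Research']
```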

Architecture

graph TB
    subgraph "GraphWorkflow Architecture"
        A[GraphWorkflow] --> B[Node Collection]
        A --> C[Edge Collection]
        A --> D[Graph Backend]
        A --> E[Execution Engine]

        B --> F[Agent Nodes]
        C --> G[Directed Edges]
        D --> H[NetworkX Backend]
        D --> I[Rustworkx Backend]
        D --> J[Topological Sort]
        E --> K[Parallel Execution]
        E --> L[Layer Processing]

        subgraph "Node Types"
            F --> M[Agent Node]
            M --> N[Agent Instance]
            M --> O[Node Metadata]
        end

        subgraph "Edge Types"
            G --> P[Simple Edge]
            G --> Q[Fan-out Edge]
            G --> R[Fan-in Edge]
            G --> S[Parallel Chain]
        end

        subgraph "Execution Patterns"
            K --> T[Thread Pool]
            K --> U[Concurrent Futures]
            L --> V[Layer-by-layer]
            L --> W[Dependency Resolution]
        end
    end
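The execution pattern above can be sketched in plain Python: group nodes into topological layers (Kahn's algorithm), then run each layer's agents concurrently since nodes in the same layer have no mutual dependencies. This is an illustration of the idea, not the library's actual engine:

```python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor

def topological_layers(nodes, edges):
    """Group node IDs into layers; each layer depends only on earlier ones."""
    indegree = {n: 0 for n in nodes}
    children = defaultdict(list)
    for src, dst in edges:
        indegree[dst] += 1
        children[src].append(dst)
    layer = [n for n in nodes if indegree[n] == 0]
    layers = []
    while layer:
        layers.append(layer)
        nxt = []
        for n in layer:
            for c in children[n]:
                indegree[c] -= 1
                if indegree[c] == 0:
                    nxt.append(c)
        layer = nxt
    if sum(len(l) for l in layers) != len(nodes):
        raise ValueError("Graph contains a cycle")
    return layers

nodes = ["Collector", "TechAnalyst", "FundAnalyst", "Synthesis"]
edges = [("Collector", "TechAnalyst"), ("Collector", "FundAnalyst"),
         ("TechAnalyst", "Synthesis"), ("FundAnalyst", "Synthesis")]

for layer in topological_layers(nodes, edges):
    # Nodes in one layer can run in parallel
    with ThreadPoolExecutor() as pool:
        print(list(pool.map(lambda n: f"{n}: done", layer)))
```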

Class Reference

The GraphWorkflow constructor accepts the following parameters:

Parameter Type Description Default
id Optional[str] Unique identifier for the workflow Auto-generated UUID
name Optional[str] Human-readable name for the workflow "Graph-Workflow-01"
description Optional[str] Detailed description of the workflow Generic description
nodes Optional[Dict[str, Node]] Initial collection of nodes {}
edges Optional[List[Edge]] Initial collection of edges []
entry_points Optional[List[str]] Node IDs that serve as starting points []
end_points Optional[List[str]] Node IDs that serve as ending points []
max_loops int Maximum number of execution loops 1
task Optional[str] The task to be executed by the workflow None
auto_compile bool Whether to automatically compile the workflow True
verbose bool Whether to enable detailed logging False
backend str Graph backend to use ("networkx" or "rustworkx") "networkx"

Graph Backends

GraphWorkflow supports two graph backend implementations, each with different performance characteristics:

NetworkX Backend (Default)

The NetworkX backend is the default and most widely compatible option. It provides:

Feature Description
✅ Full compatibility Works out of the box with no additional dependencies
✅ Mature ecosystem Well-tested and stable
✅ Rich features Comprehensive graph algorithms and operations
✅ Python-native Pure Python implementation

Use NetworkX when:

  • You need maximum compatibility

  • Working with small to medium-sized graphs (< 1000 nodes)

  • You want zero additional dependencies

Rustworkx Backend (High Performance)

The Rustworkx backend provides significant performance improvements for large graphs:

Feature Description
⚡ High performance Rust-based implementation for faster operations
⚡ Memory efficient Optimized for large-scale graphs
⚡ Scalable Better performance with graphs containing 1000+ nodes
⚡ Same API Drop-in replacement with identical interface

Use Rustworkx when:

  • Working with large graphs (1000+ nodes)

  • Performance is critical

  • You can install additional dependencies

Installation:

pip install rustworkx

Note: If rustworkx is not installed and you specify backend="rustworkx", GraphWorkflow will automatically fall back to NetworkX with a warning.

Backend Selection

Both backends implement the same GraphBackend interface, ensuring complete API compatibility. You can switch between backends without changing your code:

# Use NetworkX (default)
workflow = GraphWorkflow(backend="networkx")

# Use Rustworkx for better performance
workflow = GraphWorkflow(backend="rustworkx")

The backend choice is transparent to the rest of the API - all methods work identically regardless of which backend is used.

Core Methods

add_node(agent: Agent, **kwargs)

Adds an agent node to the workflow graph.

Parameter Type Description
agent Agent The agent to add as a node
**kwargs Any Additional keyword arguments for the node

Raises:

  • ValueError: If a node with the same ID already exists

Example:

workflow = GraphWorkflow()
agent = Agent(agent_name="ResearchAgent", model_name="gpt-4")
workflow.add_node(agent, metadata={"priority": "high"})

add_nodes(agents: List[Agent], batch_size: int = 10, **kwargs)

Adds multiple agents to the workflow graph concurrently in batches for improved performance.

Parameter Type Description
agents List[Agent] List of agents to add to the workflow
batch_size int Number of agents to add concurrently in a batch (default: 10)
**kwargs Any Additional keyword arguments for each node

Features:

  • Concurrent batch processing for faster agent addition
  • Automatic thread pool management
  • Maintains graph consistency across batches
  • Particularly beneficial for large-scale workflows with many agents

Example:

from swarms import Agent, GraphWorkflow

# Create multiple agents
agents = [
    Agent(agent_name="ResearchAgent", model_name="gpt-4o-mini"),
    Agent(agent_name="AnalysisAgent", model_name="gpt-4o-mini"),
    Agent(agent_name="SynthesisAgent", model_name="gpt-4o-mini"),
]

# Add all agents at once with batch processing
workflow = GraphWorkflow(backend="rustworkx")
workflow.add_nodes(agents, batch_size=10)
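Batching can be pictured as chunking the agent list and dispatching each chunk to a thread pool. The helper below is an illustrative approximation of that behavior, not the library's actual implementation:

```python
from concurrent.futures import ThreadPoolExecutor

def add_in_batches(items, add_one, batch_size=10):
    """Apply add_one to every item concurrently, one batch at a time."""
    for i in range(0, len(items), batch_size):
        batch = items[i:i + batch_size]
        with ThreadPoolExecutor(max_workers=len(batch)) as pool:
            list(pool.map(add_one, batch))  # drain to surface any errors

added = []
add_in_batches([f"agent{i}" for i in range(25)], added.append, batch_size=10)
print(len(added))  # 25
```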

add_edge(edge_or_source, target=None, **kwargs)

Adds an edge to connect nodes in the workflow.

Parameter Type Description
edge_or_source Edge or str Either an Edge object or source node ID
target str Target node ID (required if edge_or_source is not an Edge)
**kwargs Any Additional keyword arguments for the edge

Raises:

  • ValueError: If source or target nodes don't exist

Example:

# Using Edge object
edge = Edge(source="agent1", target="agent2")
workflow.add_edge(edge)

# Using node IDs
workflow.add_edge("agent1", "agent2", metadata={"priority": "high"})

add_edges_from_source(source, targets, **kwargs)

Creates a fan-out pattern where one source connects to multiple targets.

Parameter Type Description
source str Source node ID
targets List[str] List of target node IDs
**kwargs Any Additional keyword arguments for all edges

Returns:

  • List[Edge]: List of created Edge objects

Example:

workflow.add_edges_from_source(
    "DataCollector",
    ["TechnicalAnalyst", "FundamentalAnalyst", "SentimentAnalyst"]
)

add_edges_to_target(sources, target, **kwargs)

Creates a fan-in pattern where multiple sources connect to one target.

Parameter Type Description
sources List[str] List of source node IDs
target str Target node ID
**kwargs Any Additional keyword arguments for all edges

Returns:

  • List[Edge]: List of created Edge objects

Example:

workflow.add_edges_to_target(
    ["TechnicalAnalyst", "FundamentalAnalyst", "SentimentAnalyst"],
    "SynthesisAgent"
)

add_parallel_chain(sources, targets, **kwargs)

Creates a full mesh connection between multiple sources and targets.

Parameter Type Description
sources List[str] List of source node IDs
targets List[str] List of target node IDs
**kwargs Any Additional keyword arguments for all edges

Returns:

  • List[Edge]: List of created Edge objects

Example:

workflow.add_parallel_chain(
    ["DataCollector1", "DataCollector2"],
    ["Analyst1", "Analyst2", "Analyst3"]
)
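A full mesh produces one edge per (source, target) pair, so the example above creates 2 × 3 = 6 edges. The pairs themselves are just the Cartesian product, which is worth keeping in mind since edge count grows multiplicatively:

```python
from itertools import product

sources = ["DataCollector1", "DataCollector2"]
targets = ["Analyst1", "Analyst2", "Analyst3"]

mesh = list(product(sources, targets))
print(len(mesh))  # 6
```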

Execution Methods

run(task: str = None, img: Optional[str] = None, *args, **kwargs) -> Dict[str, Any]

Executes the workflow with optimized parallel agent execution.

Parameter Type Description
task str Task to execute (uses self.task if not provided)
img Optional[str] Image path for vision-enabled agents
*args Any Additional positional arguments
**kwargs Any Additional keyword arguments

Returns:

  • Dict[str, Any]: Execution results from all nodes

Example:

results = workflow.run(
    task="Analyze market trends for cryptocurrency",
    max_loops=2
)

arun(task: str = None, *args, **kwargs) -> Dict[str, Any]

Asynchronous version of run; better suited to I/O-bound operations.

Parameter Type Description
task str Task to execute
*args Any Additional positional arguments
**kwargs Any Additional keyword arguments

Returns:

  • Dict[str, Any]: Execution results from all nodes

Example:

import asyncio

async def main():
    results = await workflow.arun("Process large dataset")
    print(results)

asyncio.run(main())

Compilation and Optimization

compile()

Pre-computes expensive operations for faster execution.

Example:

workflow.compile()
status = workflow.get_compilation_status()
print(f"Compiled: {status['is_compiled']}")

get_compilation_status() -> Dict[str, Any]

Returns detailed compilation status information.

Returns:

  • Dict[str, Any]: Compilation status including cache state and performance metrics

Example:

status = workflow.get_compilation_status()
print(f"Layers: {status['cached_layers_count']}")
print(f"Max workers: {status['max_workers']}")

Visualization Methods

visualize(format: str = "png", view: bool = True, engine: str = "dot", show_summary: bool = False) -> str

Generates a visual representation of the workflow using Graphviz.

Parameter Type Description Default
format str Output format ('png', 'svg', 'pdf', 'dot') "png"
view bool Whether to open the visualization True
engine str Graphviz layout engine "dot"
show_summary bool Whether to print parallel processing summary False

Returns:

  • str: Path to the generated visualization file

Example:

output_file = workflow.visualize(
    format="svg",
    show_summary=True
)
print(f"Visualization saved to: {output_file}")

visualize_simple() -> str

Generates a simple text-based visualization.

Returns:

  • str: Text representation of the workflow

Example:

text_viz = workflow.visualize_simple()
print(text_viz)

Serialization Methods

to_json(fast: bool = True, include_conversation: bool = False, include_runtime_state: bool = False) -> str

Serializes the workflow to JSON format.

Parameter Type Description Default
fast bool Whether to use fast JSON serialization True
include_conversation bool Whether to include conversation history False
include_runtime_state bool Whether to include runtime state False

Returns:

  • str: JSON representation of the workflow

Example:

json_data = workflow.to_json(
    include_conversation=True,
    include_runtime_state=True
)

from_json(json_str: str, restore_runtime_state: bool = False) -> GraphWorkflow

Deserializes a workflow from JSON format.

Parameter Type Description Default
json_str str JSON string representation Required
restore_runtime_state bool Whether to restore runtime state False

Returns:

  • GraphWorkflow: A new GraphWorkflow instance

Example:

workflow = GraphWorkflow.from_json(json_data, restore_runtime_state=True)

save_to_file(filepath: str, include_conversation: bool = False, include_runtime_state: bool = False, overwrite: bool = False) -> str

Saves the workflow to a JSON file.

Parameter Type Description Default
filepath str Path to save the JSON file Required
include_conversation bool Whether to include conversation history False
include_runtime_state bool Whether to include runtime state False
overwrite bool Whether to overwrite existing files False

Returns:

  • str: Path to the saved file

Example:

filepath = workflow.save_to_file(
    "my_workflow.json",
    include_conversation=True
)

load_from_file(filepath: str, restore_runtime_state: bool = False) -> GraphWorkflow

Loads a workflow from a JSON file.

Parameter Type Description Default
filepath str Path to the JSON file Required
restore_runtime_state bool Whether to restore runtime state False

Returns:

  • GraphWorkflow: Loaded workflow instance

Example:

workflow = GraphWorkflow.load_from_file("my_workflow.json")

Utility Methods

export_summary() -> Dict[str, Any]

Generates a human-readable summary of the workflow.

Returns:

  • Dict[str, Any]: Comprehensive workflow summary

Example:

summary = workflow.export_summary()
print(f"Workflow has {summary['structure']['nodes']} nodes")
print(f"Compilation status: {summary['compilation_status']['is_compiled']}")

set_entry_points(entry_points: List[str])

Sets the entry points for the workflow.

Parameter Type Description
entry_points List[str] List of node IDs to serve as entry points

Example:

workflow.set_entry_points(["DataCollector", "ResearchAgent"])

set_end_points(end_points: List[str])

Sets the end points for the workflow.

Parameter Type Description
end_points List[str] List of node IDs to serve as end points

Example:

workflow.set_end_points(["SynthesisAgent", "ReportGenerator"])

Class Methods

from_spec(agents, edges, entry_points=None, end_points=None, task=None, **kwargs) -> GraphWorkflow

Constructs a workflow from a list of agents and connections.

Parameter Type Description Default
agents List List of agents or Node objects Required
edges List List of edges or edge tuples Required
entry_points List[str] List of entry point node IDs None
end_points List[str] List of end point node IDs None
task str Task to be executed by the workflow None
**kwargs Any Additional keyword arguments (e.g., backend, verbose, auto_compile) {}

Returns:

  • GraphWorkflow: A new GraphWorkflow instance

Example:

# Using NetworkX backend (default)
workflow = GraphWorkflow.from_spec(
    agents=[agent1, agent2, agent3],
    edges=[
        ("agent1", "agent2"),
        ("agent2", "agent3"),
        ("agent1", ["agent2", "agent3"])  # Fan-out
    ],
    task="Analyze market data"
)

# Using Rustworkx backend for better performance
workflow = GraphWorkflow.from_spec(
    agents=[agent1, agent2, agent3],
    edges=[
        ("agent1", "agent2"),
        ("agent2", "agent3"),
    ],
    task="Analyze market data",
    backend="rustworkx"  # Specify backend via kwargs
)

Examples

Using Rustworkx Backend for Performance

from swarms import Agent, GraphWorkflow

# Create agents
research_agent = Agent(
    agent_name="ResearchAgent",
    model_name="gpt-4o-mini",
    max_loops=1
)

analysis_agent = Agent(
    agent_name="AnalysisAgent",
    model_name="gpt-4o-mini",
    max_loops=1
)

synthesis_agent = Agent(
    agent_name="SynthesisAgent",
    model_name="gpt-4o-mini",
    max_loops=1
)

# Build workflow with the rustworkx backend
workflow = GraphWorkflow(
    name="High-Performance-Workflow",
    backend="rustworkx",  # Use rustworkx for 5-10x speedup on large graphs
    verbose=True
)

# Add all agents at once using batch processing (faster than individual add_node calls)
workflow.add_nodes([research_agent, analysis_agent, synthesis_agent])

# Define the workflow edges
workflow.add_edge("ResearchAgent", "AnalysisAgent")
workflow.add_edge("AnalysisAgent", "SynthesisAgent")

# Execute - backend is transparent to the API
results = workflow.run("What are the latest trends in renewable energy technology?")
print(results)

Performance Benefits:

  • 5-10x faster graph operations compared to NetworkX on large workflows
  • Batch agent addition with add_nodes() for efficient graph construction
  • Rust-based implementation for lower memory usage
  • Same API as NetworkX: just change the backend parameter

Note: Make sure to install rustworkx first: pip install rustworkx

Basic Sequential Workflow

from swarms import Agent, GraphWorkflow
from swarms.prompts.multi_agent_collab_prompt import MULTI_AGENT_COLLAB_PROMPT_TWO

# Create agents
research_agent = Agent(
    agent_name="ResearchAgent",
    model_name="gpt-4",
    system_prompt=MULTI_AGENT_COLLAB_PROMPT_TWO,
    max_loops=1
)

analysis_agent = Agent(
    agent_name="AnalysisAgent", 
    model_name="gpt-4",
    system_prompt=MULTI_AGENT_COLLAB_PROMPT_TWO,
    max_loops=1
)

# Build workflow
workflow = GraphWorkflow(name="Research-Analysis-Workflow")
workflow.add_node(research_agent)
workflow.add_node(analysis_agent)
workflow.add_edge("ResearchAgent", "AnalysisAgent")

# Execute
results = workflow.run("What are the latest trends in AI?")
print(results)

Parallel Processing Workflow

from swarms import Agent, GraphWorkflow

# Create specialized agents
data_collector = Agent(agent_name="DataCollector", model_name="gpt-4")
technical_analyst = Agent(agent_name="TechnicalAnalyst", model_name="gpt-4")
fundamental_analyst = Agent(agent_name="FundamentalAnalyst", model_name="gpt-4")
sentiment_analyst = Agent(agent_name="SentimentAnalyst", model_name="gpt-4")
synthesis_agent = Agent(agent_name="SynthesisAgent", model_name="gpt-4")

# Build parallel workflow
workflow = GraphWorkflow(name="Market-Analysis-Workflow")

# Add all agents using batch processing
workflow.add_nodes([data_collector, technical_analyst, fundamental_analyst,
                   sentiment_analyst, synthesis_agent])

# Create fan-out pattern: data collector feeds all analysts
workflow.add_edges_from_source(
    "DataCollector",
    ["TechnicalAnalyst", "FundamentalAnalyst", "SentimentAnalyst"]
)

# Create fan-in pattern: all analysts feed synthesis agent
workflow.add_edges_to_target(
    ["TechnicalAnalyst", "FundamentalAnalyst", "SentimentAnalyst"],
    "SynthesisAgent"
)

# Execute
results = workflow.run("Analyze Bitcoin market trends")
print(results)

Complex Multi-Layer Workflow

from swarms import Agent, GraphWorkflow

# Create agents for different stages
data_collectors = [
    Agent(agent_name=f"DataCollector{i}", model_name="gpt-4")
    for i in range(1, 4)
]

analysts = [
    Agent(agent_name=f"Analyst{i}", model_name="gpt-4")
    for i in range(1, 4)
]

validators = [
    Agent(agent_name=f"Validator{i}", model_name="gpt-4")
    for i in range(1, 3)
]

synthesis_agent = Agent(agent_name="SynthesisAgent", model_name="gpt-4")

# Build complex workflow
workflow = GraphWorkflow(name="Complex-Research-Workflow")

# Add all agents using batch processing
all_agents = data_collectors + analysts + validators + [synthesis_agent]
workflow.add_nodes(all_agents)

# Layer 1: Data collectors feed all analysts in parallel
workflow.add_parallel_chain(
    [agent.agent_name for agent in data_collectors],
    [agent.agent_name for agent in analysts]
)

# Layer 2: Analysts feed validators
workflow.add_parallel_chain(
    [agent.agent_name for agent in analysts],
    [agent.agent_name for agent in validators]
)

# Layer 3: Validators feed synthesis agent
workflow.add_edges_to_target(
    [agent.agent_name for agent in validators],
    "SynthesisAgent"
)

# Visualize and execute
workflow.visualize(show_summary=True)
results = workflow.run("Comprehensive analysis of renewable energy markets")

Workflow with Custom Metadata

from swarms import Agent, GraphWorkflow, Edge

# Create agents with specific roles
research_agent = Agent(agent_name="ResearchAgent", model_name="gpt-4")
analysis_agent = Agent(agent_name="AnalysisAgent", model_name="gpt-4")

# Build workflow with metadata
workflow = GraphWorkflow(
    name="Metadata-Workflow",
    description="Workflow demonstrating metadata usage"
)

workflow.add_node(research_agent, metadata={"priority": "high", "timeout": 300})
workflow.add_node(analysis_agent, metadata={"priority": "medium", "timeout": 600})

# Add edge with metadata
edge = Edge(
    source="ResearchAgent",
    target="AnalysisAgent",
    metadata={"data_type": "research_findings", "priority": "high"}
)
workflow.add_edge(edge)

# Execute with custom parameters
results = workflow.run(
    "Analyze the impact of climate change on agriculture",
    max_loops=2
)

Workflow Serialization and Persistence

from swarms import Agent, GraphWorkflow

# Create workflow
research_agent = Agent(agent_name="ResearchAgent", model_name="gpt-4")
analysis_agent = Agent(agent_name="AnalysisAgent", model_name="gpt-4")

workflow = GraphWorkflow(name="Persistent-Workflow")
workflow.add_node(research_agent)
workflow.add_node(analysis_agent)
workflow.add_edge("ResearchAgent", "AnalysisAgent")

# Execute and get conversation
results = workflow.run("Research quantum computing applications")

# Save workflow with conversation history
filepath = workflow.save_to_file(
    "quantum_research_workflow.json",
    include_conversation=True,
    include_runtime_state=True
)

# Load workflow later
loaded_workflow = GraphWorkflow.load_from_file(
    filepath,
    restore_runtime_state=True
)

# Continue execution
new_results = loaded_workflow.run("Continue with quantum cryptography analysis")

Large-Scale Workflow with Rustworkx

from swarms import Agent, GraphWorkflow

# Create a large workflow with many agents
# Rustworkx backend provides better performance for large graphs
workflow = GraphWorkflow(
    name="Large-Scale-Workflow",
    backend="rustworkx",  # Use rustworkx for better performance
    verbose=True
)

# Create many agents (e.g., for parallel data processing)
agents = []
for i in range(50):
    agent = Agent(
        agent_name=f"Processor{i}",
        model_name="gpt-4",
        max_loops=1
    )
    agents.append(agent)

# Add all agents at once using batch processing (much faster than individual adds)
workflow.add_nodes(agents, batch_size=10)

# Create complex interconnections
# Rustworkx handles this efficiently
for i in range(0, 50, 10):
    source_agents = [f"Processor{j}" for j in range(i, min(i+10, 50))]
    target_agents = [f"Processor{j}" for j in range(i+10, min(i+20, 50))]
    if target_agents:
        workflow.add_parallel_chain(source_agents, target_agents)

# Compile and execute
workflow.compile()
status = workflow.get_compilation_status()
print(f"Compiled workflow with {status['cached_layers_count']} layers")

results = workflow.run("Process large dataset in parallel")

Advanced Pattern Detection

from swarms import Agent, GraphWorkflow

# Create a complex workflow with multiple patterns
workflow = GraphWorkflow(name="Pattern-Detection-Workflow", verbose=True)

# Create agents
agents = {
    "collector": Agent(agent_name="DataCollector", model_name="gpt-4"),
    "tech_analyst": Agent(agent_name="TechnicalAnalyst", model_name="gpt-4"),
    "fund_analyst": Agent(agent_name="FundamentalAnalyst", model_name="gpt-4"),
    "sentiment_analyst": Agent(agent_name="SentimentAnalyst", model_name="gpt-4"),
    "risk_analyst": Agent(agent_name="RiskAnalyst", model_name="gpt-4"),
    "synthesis": Agent(agent_name="SynthesisAgent", model_name="gpt-4"),
    "validator": Agent(agent_name="Validator", model_name="gpt-4")
}

# Add all agents using batch processing
workflow.add_nodes(list(agents.values()))

# Create complex patterns
# Fan-out from collector
workflow.add_edges_from_source(
    "DataCollector",
    ["TechnicalAnalyst", "FundamentalAnalyst", "SentimentAnalyst", "RiskAnalyst"]
)

# Fan-in to synthesis
workflow.add_edges_to_target(
    ["TechnicalAnalyst", "FundamentalAnalyst", "SentimentAnalyst", "RiskAnalyst"],
    "SynthesisAgent"
)

# Final validation step
workflow.add_edge("SynthesisAgent", "Validator")

# Compile and get status
workflow.compile()
status = workflow.get_compilation_status()

print(f"Compilation status: {status}")
print(f"Layers: {status['cached_layers_count']}")
print(f"Max workers: {status['max_workers']}")

# Visualize with pattern detection
workflow.visualize(show_summary=True, format="png")

Error Handling and Recovery

from swarms import Agent, GraphWorkflow
import logging

# Set up logging
logging.basicConfig(level=logging.INFO)

# Create workflow with error handling
workflow = GraphWorkflow(
    name="Error-Handling-Workflow",
    verbose=True,
    max_loops=1
)

# Create agents
try:
    research_agent = Agent(agent_name="ResearchAgent", model_name="gpt-4")
    analysis_agent = Agent(agent_name="AnalysisAgent", model_name="gpt-4")

    workflow.add_node(research_agent)
    workflow.add_node(analysis_agent)
    workflow.add_edge("ResearchAgent", "AnalysisAgent")

    # Execute with error handling
    try:
        results = workflow.run("Analyze market trends")
        print("Workflow completed successfully")
        print(results)

    except Exception as e:
        print(f"Workflow execution failed: {e}")

        # Get workflow summary for debugging
        summary = workflow.export_summary()
        print(f"Workflow state: {summary['structure']}")

except Exception as e:
    print(f"Workflow setup failed: {e}")

Conclusion

The GraphWorkflow class provides a powerful and flexible framework for orchestrating complex multi-agent workflows. Its key benefits include:

Benefits

Benefit Description
Scalability Supports workflows with hundreds of agents through efficient parallel execution
Flexibility Multiple connection patterns (sequential, fan-out, fan-in, parallel chains)
Performance Automatic compilation and optimization for faster execution; rustworkx backend for large-scale graphs
Backend Choice Choose between NetworkX (compatibility) or Rustworkx (performance) based on your needs
Visualization Rich visual representations for workflow understanding and debugging
Persistence Complete serialization and deserialization capabilities
Error Handling Comprehensive error handling and recovery mechanisms
Monitoring Detailed logging and status reporting

Use Cases

Use Case Description
Research Workflows Multi-stage research with data collection, analysis, and synthesis
Content Generation Parallel content creation with validation and refinement
Data Processing Complex ETL pipelines with multiple processing stages
Decision Making Multi-agent decision systems with voting and consensus
Quality Assurance Multi-stage validation and verification processes
Automated Testing Complex test orchestration with parallel execution

Best Practices

Best Practice Description
Use meaningful agent names Helps with debugging and visualization
Leverage parallel patterns Use fan-out and fan-in for better performance
Choose the right backend Use rustworkx for large graphs (1000+ nodes), networkx for smaller graphs
Compile workflows Always compile before execution for optimal performance
Monitor execution Use verbose mode and status reporting for debugging
Save important workflows Use serialization for workflow persistence
Handle errors gracefully Implement proper error handling and recovery
Visualize complex workflows Use visualization to understand and debug workflows

Backend Performance Considerations

When choosing between NetworkX and Rustworkx backends:

Graph Size Recommended Backend Reason
< 100 nodes NetworkX Minimal overhead, no extra dependencies
100-1000 nodes NetworkX or Rustworkx Both perform well, choose based on dependency preferences
1000+ nodes Rustworkx Significant performance benefits for large graphs
Very large graphs (10k+ nodes) Rustworkx Essential for acceptable performance
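The table above can be folded into a small helper that picks a backend from an estimated node count. This is a hypothetical convenience function, not part of the library; it also checks whether rustworkx is importable before selecting it:

```python
import importlib.util

def choose_backend(estimated_nodes: int) -> str:
    """Pick a GraphWorkflow backend string from an estimated graph size."""
    rustworkx_available = importlib.util.find_spec("rustworkx") is not None
    if estimated_nodes >= 1000 and rustworkx_available:
        return "rustworkx"
    return "networkx"

print(choose_backend(50))    # networkx
print(choose_backend(5000))  # rustworkx if installed, otherwise networkx
```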

Performance Tips:

  • Rustworkx provides a 5-10x speedup for graph operations on large workflows
  • Topological sorting and dependency resolution are significantly faster with Rustworkx
  • Both backends support the same features and API with complete compatibility
  • You can switch backends without code changes: just change the backend parameter
  • Rustworkx uses less memory for large graphs due to Rust's efficient memory management
  • Combine the Rustworkx backend with add_nodes() batch processing for maximum performance

Performance Characteristics:

Rustworkx delivers significant performance improvements for graph operations, particularly on large workflows:

Operation Small Graphs (< 100 nodes) Large Graphs (1000+ nodes)
Graph construction & modification ~1-2x faster 5-8x faster
Topological sorting ~2-3x faster 6-10x faster
Dependency resolution ~1-2x faster 5-8x faster
Overall workflow execution ~1-2x faster 5-10x faster

Note: Actual performance gains depend on workflow complexity, graph topology, and hardware. Benefits are most pronounced with complex graphs containing 1000+ nodes.


Troubleshooting

Common Issues and Solutions

Rustworkx Installation Issues

Problem: ImportError when using backend="rustworkx"

# Error message
ImportError: rustworkx is not installed. Install it with: pip install rustworkx

Solution:

# Install rustworkx
pip install rustworkx

# If installation fails due to missing Rust compiler
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
pip install rustworkx

Fallback: GraphWorkflow automatically falls back to NetworkX if rustworkx is not available:

workflow = GraphWorkflow(backend="rustworkx")
# Warning: rustworkx is not available, falling back to networkx

Performance Not Improving with Rustworkx

Problem: Not seeing expected performance gains with Rustworkx backend

Diagnosis checklist:

  1. Graph is too small - Rustworkx benefits are minimal for small graphs (< 100 nodes)
  2. Workflow not compiled - Call workflow.compile() before execution
  3. Not using batch operations - Use add_nodes() instead of multiple add_node() calls

Solution:

# ✅ Optimal configuration for performance
workflow = GraphWorkflow(
    backend="rustworkx",
    auto_compile=True,  # Enable auto-compilation
    verbose=True  # See performance metrics
)

# Use batch operations
workflow.add_nodes([agent1, agent2, agent3])  # Better
# Instead of:
# workflow.add_node(agent1)  # Less efficient
# workflow.add_node(agent2)
# workflow.add_node(agent3)

Graph Compilation Failures

Problem: Graph contains cycles and cannot be compiled

# Error message
Exception: Graph contains cycles and cannot be topologically sorted

Solution:

# Validate graph before execution
try:
    workflow.compile()
except Exception as e:
    print(f"Compilation failed: {e}")
    # Check for cycles
    cycles = workflow.graph_backend.simple_cycles()
    if cycles:
        print(f"Found {len(cycles)} cycle(s) in graph:")
        for cycle in cycles:
            print(f"  Cycle: {' -> '.join(cycle)}")
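If you prefer to validate an edge list before building the workflow at all, a plain iterative DFS cycle check is enough and is independent of either backend (a standalone sketch, not a library function):

```python
def has_cycle(nodes, edges):
    """Return True if the directed graph contains a cycle (iterative DFS)."""
    children = {n: [] for n in nodes}
    for src, dst in edges:
        children[src].append(dst)
    WHITE, GRAY, BLACK = 0, 1, 2  # unvisited, on current path, finished
    color = {n: WHITE for n in nodes}
    for start in nodes:
        if color[start] != WHITE:
            continue
        stack = [(start, iter(children[start]))]
        color[start] = GRAY
        while stack:
            node, it = stack[-1]
            child = next(it, None)
            if child is None:
                color[node] = BLACK
                stack.pop()
            elif color[child] == GRAY:
                return True  # back edge onto the current path -> cycle
            elif color[child] == WHITE:
                color[child] = GRAY
                stack.append((child, iter(children[child])))
    return False

print(has_cycle(["a", "b"], [("a", "b")]))              # False
print(has_cycle(["a", "b"], [("a", "b"), ("b", "a")]))  # True
```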

Memory Issues with Large Workflows

Problem: High memory usage with large graphs (1000+ nodes)

Solution:

# Use Rustworkx backend for better memory efficiency
workflow = GraphWorkflow(backend="rustworkx")

# Clear conversation history if not needed (start a fresh, empty conversation)
workflow.conversation = type(workflow.conversation)()

# Monitor memory usage
import psutil
process = psutil.Process()
memory_mb = process.memory_info().rss / 1024 / 1024
print(f"Memory usage: {memory_mb:.1f} MB")

Slow Execution Despite Compilation

Problem: Workflow still runs slowly even after compilation

Diagnosis:

# Check compilation status
status = workflow.get_compilation_status()
print(f"Compiled: {status['is_compiled']}")
print(f"Layers: {status['cached_layers_count']}")
print(f"Max workers: {status['max_workers']}")

# If max_workers is low, check CPU cores
from swarms.utils.get_cpu_cores import get_cpu_cores
print(f"Available CPU cores: {get_cpu_cores()}")

Solution:

# Ensure compilation is enabled
workflow.compile()

# For large graphs, use Rustworkx
if len(workflow.nodes) > 100:
    # Rebuild with the Rustworkx backend, then re-add the same agents and edges
    new_workflow = GraphWorkflow(
        backend="rustworkx",
        auto_compile=True
    )

Backend Compatibility Issues

Problem: Code works with NetworkX but fails with Rustworkx

Common causes:

  1. Relying on NetworkX-specific methods
  2. Direct access to internal graph structure
  3. Using features not yet supported by Rustworkx backend

Solution:

# ✅ Use GraphWorkflow's abstracted methods (works with both backends)
workflow.add_node(agent)
workflow.add_edge("agent1", "agent2")
workflow.add_nodes([agent1, agent2, agent3])

# ❌ Avoid direct backend access
# workflow.graph_backend.graph.add_node(...)  # Backend-specific

Best Practices for Production

Performance Optimization

  • Choose the right backend: Use Rustworkx for graphs with 100+ nodes
  • Enable auto-compilation: Set auto_compile=True for automatic optimization
  • Use batch operations: Add multiple agents with add_nodes() instead of individual add_node() calls
  • Compile before execution: Call workflow.compile() before running large workflows
  • Monitor performance: Enable verbose=True to track execution metrics

Reliability

  • Validate graphs: Check for cycles before execution
  • Handle agent failures: Implement error handling in agent prompts
  • Test with both backends: Ensure compatibility between NetworkX and Rustworkx
  • Use version control: Track workflow definitions with to_json() and save_to_file()

Scalability

  • Start with NetworkX: Develop and test with the default backend
  • Switch to Rustworkx: Migrate to Rustworkx when scaling to production
  • Monitor memory: Watch memory usage on large graphs
  • Optimize batch size: Tune the batch_size parameter in add_nodes() for your workload