All Toolsโ€บWorkflow Trigger CLI
๐Ÿ”ง AI Connectors for Workflow AutomationMarch 25, 2026โœ… Tests passing

Workflow Trigger CLI

This CLI tool enables developers to trigger and monitor automated workflows in Claude AI using predefined connectors. It provides an interface to initiate workflows, pass inputs, and retrieve results, all while supporting asynchronous execution and retry mechanisms for robust automation.

What It Does

  • Trigger Claude AI workflows via CLI
  • Pass custom input payloads in JSON format
  • Support for asynchronous task execution
  • Retry mechanism for handling failures
  • Real-time logging and status updates

Installation

1. Clone the repository:

git clone <repository-url>
   cd workflow_trigger_cli

2. Install dependencies:

pip install -r requirements.txt

Usage

To trigger a workflow, use the following command:

python workflow_trigger_cli.py --workflow <workflow_name> --input <input_file_path> [--retries <num_retries>] [--delay <retry_delay_seconds>]

Example

python workflow_trigger_cli.py --workflow my_workflow --input input.json --retries 3 --delay 5
  • --workflow: Name of the workflow to trigger.
  • --input: Path to a JSON file containing the input payload.
  • --retries: (Optional) Number of retries on failure. Default is 3.
  • --delay: (Optional) Delay between retries in seconds. Default is 5.

Source Code

import os
import json
import time
import requests
import click
from rich.console import Console
from rich.progress import Progress

console = Console()

API_BASE_URL = "https://api.claude.ai/workflows"

def trigger_workflow(workflow_name, input_payload, retries, delay):
    """Trigger a workflow and optionally retry on failure."""
    url = f"{API_BASE_URL}/{workflow_name}/trigger"
    headers = {"Content-Type": "application/json"}

    for attempt in range(retries + 1):
        try:
            response = requests.post(url, headers=headers, json=input_payload)
            response.raise_for_status()
            return response.json()
        except requests.RequestException as e:
            console.print(f"[red]Attempt {attempt + 1} failed: {e}[/red]")
            if attempt < retries:
                time.sleep(delay)
            else:
                raise

def monitor_workflow(workflow_id):
    """Monitor the status of a triggered workflow."""
    url = f"{API_BASE_URL}/{workflow_id}/status"
    headers = {"Content-Type": "application/json"}

    with Progress() as progress:
        task = progress.add_task("[cyan]Monitoring workflow...", total=None)

        while not progress.finished:
            try:
                response = requests.get(url, headers=headers)
                response.raise_for_status()
                status_data = response.json()
                status = status_data.get("status")

                if status == "completed":
                    progress.update(task, completed=100)
                    progress.stop()
                    return status_data
                elif status == "failed":
                    progress.stop()
                    raise Exception("Workflow execution failed.")

                time.sleep(2)
            except requests.RequestException as e:
                progress.stop()
                raise Exception(f"Error monitoring workflow: {e}")

@click.command()
@click.option('--workflow', required=True, help='Name of the workflow to trigger.')
@click.option('--input', 'input_file', required=True, type=click.Path(exists=True), help='Path to the JSON input file.')
@click.option('--retries', default=3, show_default=True, help='Number of retries on failure.')
@click.option('--delay', default=5, show_default=True, help='Delay between retries in seconds.')
def main(workflow, input_file, retries, delay):
    """CLI tool to trigger and monitor Claude AI workflows."""
    try:
        with open(input_file, 'r') as f:
            input_payload = json.load(f)

        console.print(f"[green]Triggering workflow '{workflow}' with input from {input_file}...[/green]")
        response = trigger_workflow(workflow, input_payload, retries, delay)
        workflow_id = response.get("workflow_id")

        if not workflow_id:
            console.print("[red]Failed to retrieve workflow ID from response.[/red]")
            return

        console.print(f"[green]Workflow triggered successfully. ID: {workflow_id}[/green]")
        console.print("[green]Monitoring workflow execution...[/green]")
        result = monitor_workflow(workflow_id)

        console.print("[green]Workflow completed successfully![/green]")
        console.print(json.dumps(result, indent=2))

    except Exception as e:
        console.print(f"[red]Error: {e}[/red]")

if __name__ == "__main__":
    main()

Community

Downloads

ยทยทยท

Rate this tool

No ratings yet โ€” be the first!

Details

Tool Name
workflow_trigger_cli
Category
AI Connectors for Workflow Automation
Generated
March 25, 2026
Tests
Passing โœ…

Quick Install

Clone just this tool:

git clone --depth 1 --filter=blob:none --sparse \
  https://github.com/ptulin/autoaiforge.git
cd autoaiforge
git sparse-checkout set generated_tools/2026-03-25/workflow_trigger_cli
cd generated_tools/2026-03-25/workflow_trigger_cli
pip install -r requirements.txt 2>/dev/null || true
python workflow_trigger_cli.py
Workflow Trigger CLI โ€” AI Tools by AutoAIForge