Skip to content

Workflow Engine

Use Workflows to chain multiple agents or functions together sequentially.

antilogix.workflows.engine.Workflow

Chains multiple agents or functions together. Output of Step 1 -> Input of Step 2.

Source code in antilogix\workflows\engine.py
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
class Workflow:
    """
    Chains multiple agents or functions together.
    Output of Step 1 -> Input of Step 2.

    Steps are stored in ``self.steps`` in the order they were added and are
    executed in that same order by :meth:`run`.
    """
    def __init__(self, name: str) -> None:
        """
        Create an empty workflow.

        Args:
            name: Label used in the progress messages printed by ``run``.
        """
        self.name = name
        # Ordered pipeline; each entry is either a BaseAgent or a callable.
        self.steps: List[Union[BaseAgent, Callable]] = []

    def add_step(self, step: Union[BaseAgent, Callable]) -> "Workflow":
        """
        Add an Agent or a Function to the end of the pipeline.

        Args:
            step: A ``BaseAgent`` instance or a callable taking one argument.

        Returns:
            This workflow, so calls can be chained.
        """
        self.steps.append(step)
        return self  # Allow chaining: wf.add_step(a).add_step(b)

    async def run(self, initial_input: str) -> str:
        """
        Executes the pipeline sequentially.

        The first step receives ``initial_input``; every later step receives
        the value produced by the step before it.

        Args:
            initial_input: Value handed to the first step.

        Returns:
            The value produced by the last step, or ``initial_input``
            unchanged when the workflow has no steps.
        """
        # Local import: the module's import block is not guaranteed to
        # provide ``inspect``.
        import inspect

        print(f"🔄 Starting Workflow: {self.name}")
        current_data = initial_input

        for number, step in enumerate(self.steps, start=1):
            print(f"   ➡ Step {number} starting...")

            # CASE A: It's an Agent -- agents expose an awaitable 'think'.
            if isinstance(step, BaseAgent):
                current_data = await step.think(current_data)

            # CASE B: It's a plain callable (sync or async).
            elif callable(step):
                # Inspect the *result* rather than ``step.__code__``: partials,
                # builtins and callable instances have no ``__code__``
                # attribute, so the old flag test raised AttributeError.
                result = step(current_data)
                if inspect.isawaitable(result):
                    result = await result
                current_data = result

            # Anything else is passed over and the data is left untouched.

            print(f"   ✅ Step {number} complete.")

        print(f"🏁 Workflow {self.name} finished.")
        return current_data

add_step(step)

Add an Agent or a Function to the pipeline.

Source code in antilogix\workflows\engine.py
13
14
15
16
17
18
def add_step(self, step: Union[BaseAgent, Callable]):
    """
    Register one more stage at the tail of the pipeline.

    Args:
        step: The agent or function to append to ``self.steps``.

    Returns:
        The same object the method was called on, which makes fluent
        registration possible: ``wf.add_step(a).add_step(b)``.
    """
    pipeline = self.steps
    pipeline.append(step)
    # Hand back the receiver itself so that calls can be chained.
    return self

run(initial_input) async

Executes the pipeline sequentially.

Source code in antilogix\workflows\engine.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
async def run(self, initial_input: str) -> str:
    """
    Executes the pipeline sequentially.

    The first step receives ``initial_input``; every later step receives the
    value produced by the step before it.

    Args:
        initial_input: Value handed to the first step.

    Returns:
        The value produced by the last step, or ``initial_input`` unchanged
        when ``self.steps`` is empty.
    """
    # Local import: the module's import block is not guaranteed to provide
    # ``inspect``.
    import inspect

    print(f"🔄 Starting Workflow: {self.name}")
    current_data = initial_input

    for number, step in enumerate(self.steps, start=1):
        print(f"   ➡ Step {number} starting...")

        # CASE A: It's an Agent -- agents expose an awaitable 'think'.
        if isinstance(step, BaseAgent):
            current_data = await step.think(current_data)

        # CASE B: It's a plain callable (sync or async).
        elif callable(step):
            # Inspect the *result* rather than ``step.__code__``: partials,
            # builtins and callable instances have no ``__code__`` attribute,
            # so the old flag test raised AttributeError.
            result = step(current_data)
            if inspect.isawaitable(result):
                result = await result
            current_data = result

        # Anything else is passed over and the data is left untouched.

        print(f"   ✅ Step {number} complete.")

    print(f"🏁 Workflow {self.name} finished.")
    return current_data