Table of Contents

Introduction

DotNetPipe is a fluent pipeline builder with minimal overhead and a flexible mutation API. It targets two main use cases:

  • Library/framework authors define default pipelines with linear and conditional steps
  • End users (library consumers) adjust behavior by attaching mutators to named steps without changing the original code

This page describes step types, pipeline kinds, and provides usage + mutator examples for each.

Step types

  • Handler: terminal step that handles the current input
  • Linear: transforms input and forwards it to the next step
  • If: routes to a true sub-pipeline or continues the main flow
  • IfElse: routes to a true or false sub-pipeline
  • Switch: selects a case sub-pipeline by name (with default)
  • Fork: splits into two branches, each with its own sub-pipeline
  • MultiFork: splits into N named branches with a default

Below each step we show a minimal Universal (ValueTask) snippet and a matching mutator.

Handler

using K1vs.DotNetPipe;
using K1vs.DotNetPipe.Universal;
using K1vs.DotNetPipe.Mutations;

var pipeline = Pipelines.CreatePipeline<int>("P")
    .StartWithHandler("H", async v => { Console.WriteLine(v); await ValueTask.CompletedTask; })
    .BuildPipeline()
    .Compile();

await pipeline(1);
// Mutator: add 1 before handler
cfg.Configure(space =>
{
    var step = space.GetRequiredHandlerStep<int, int>("P", "H");
    step.Mutators.AddMutator(
        new StepMutator<Handler<int>>("H+1", 1, handler => async input => await handler(input + 1)),
        AddingMode.ExactPlace);
});

Linear

var pipeline = Pipelines.CreatePipeline<int>("P")
    .StartWithLinear<int>("L", async (input, next) => await next(input + 10))
    .HandleWith("H", async v => await ValueTask.CompletedTask)
    .BuildPipeline()
    .Compile();
// Mutator: multiply input by 2 before original linear step
cfg.Configure(space =>
{
    var step = space.GetRequiredLinearStep<int, int, int>("P", "L");
    step.Mutators.AddMutator(
        new StepMutator<Pipe<int, int>>("L*2", 1, pipe => async (i, n) => await pipe(i * 2, n)),
        AddingMode.ExactPlace);
});

If

var pipeline = Pipelines.CreatePipeline<string>("P")
    .StartWithLinear<string>("Trim", async (s, next) => await next(s.Trim()))
    .ThenIf<string, int>("CheckInt", async (s, ifNext, next) =>
    {
        if (int.TryParse(s, out var iv)) await next(iv);
        else await ifNext(s);
    }, space => space.CreatePipeline<string>("FloatPath")
        .StartWithLinear<double>("ParseFloat", async (s, n) => { if (double.TryParse(s, out var f)) await n(f); })
        .ThenLinear<int>("Round", async (d, n) => await n((int)Math.Round(d)))
        .BuildOpenPipeline())
    .HandleWith("H", async v => await ValueTask.CompletedTask)
    .BuildPipeline()
    .Compile();
// Mutator: always prefer FloatPath
cfg.Configure(space =>
{
    var step = space.GetRequiredIfStep<string, string, string, int>("P", "CheckInt");
    step.Mutators.AddMutator(
        new StepMutator<IfSelector<string, string, int>>("AlwaysFloat", 1, sel => async (s, ifNext, next) => await ifNext(s)),
        AddingMode.ExactPlace);
});

IfElse

var pipeline = Pipelines.CreatePipeline<string>("P")
    .StartWithLinear<string>("Trim", async (s, n) => await n(s.Trim()))
    .ThenIfElse<string, int, int>("CheckIntOrFloat", async (s, tNext, fNext) =>
    {
        if (int.TryParse(s, out var iv)) await fNext(iv); else await tNext(s);
    },
    space => space.CreatePipeline<string>("FloatPath").StartWithLinear<double>("ParseFloat", async (s, n) => { if (double.TryParse(s, out var f)) await n(f); }).ThenLinear<int>("Round", async (d, n) => await n((int)Math.Round(d))).BuildOpenPipeline(),
    space => space.CreatePipeline<int>("IntPath").StartWithLinear<int>("Mul2", async (i, n) => await n(i * 2)).BuildOpenPipeline())
    .HandleWith("H", async v => await ValueTask.CompletedTask)
    .BuildPipeline()
    .Compile();
// Mutator: swap branches
cfg.Configure(space =>
{
    var step = space.GetRequiredIfElseStep<string, string, string, int, int>("P", "CheckIntOrFloat");
    step.Mutators.AddMutator(
        new StepMutator<IfElseSelector<string, string, int>>("Swap", 1, sel => async (s, t, f) => await sel(s, f, t)),
        AddingMode.ExactPlace);
});

Switch

var space = Pipelines.CreateSpace();
var def = space.CreatePipeline<int>("Default").StartWithLinear<int>("Id", async (i, n) => await n(i)).BuildOpenPipeline();
var pipeline = space.CreatePipeline<string>("P")
    .StartWithLinear<string>("Trim", async (s, n) => await n(s.Trim()))
    .ThenSwitch<int, int, int>("Range", async (s, cases, d) =>
    {
        if (int.TryParse(s, out var n))
        {
            if (n > 100) await cases["Gt100"](n);
            else if (n > 0) await cases["Between"](n);
            else await cases["LeZero"](n);
        }
        else await d(s.Length);
    },
    sp => new Dictionary<string, OpenPipeline<int, int>>
    {
        ["Gt100"] = sp.CreatePipeline<int>("Mul3").StartWithLinear<int>("Mul", async (i, n) => await n(i * 3)).BuildOpenPipeline(),
        ["Between"] = sp.CreatePipeline<int>("Add2").StartWithLinear<int>("Add", async (i, n) => await n(i + 2)).BuildOpenPipeline(),
        ["LeZero"] = sp.CreatePipeline<int>("Mul2").StartWithLinear<int>("Mul", async (i, n) => await n(i * 2)).BuildOpenPipeline(),
    }.AsReadOnly(),
    def)
    .HandleWith("H", async v => await ValueTask.CompletedTask)
    .BuildPipeline()
    .Compile();
// Mutator: change threshold to > 50
cfg.Configure(space =>
{
    var step = space.GetRequiredSwitchStep<string, string, int, int, int>("P", "Range");
    step.Mutators.AddMutator(
        new StepMutator<SwitchSelector<string, int, int>>(">50", 1, sel => async (s, cases, d) =>
        {
            if (int.TryParse(s, out var n)) { if (n > 50) await cases["Gt100"](n); else if (n > 0) await cases["Between"](n); else await cases["LeZero"](n); }
            else await d(s.Length);
        }),
        AddingMode.ExactPlace);
});

Fork

var pipeline = Pipelines.CreatePipeline<string>("P")
    .StartWithLinear<string>("Trim", async (s, n) => await n(s.Trim()))
    .ThenFork<string, string>("DigitsOrOther", async (s, a, b) => { if (s.All(char.IsDigit)) await a(s); else await b(s); },
        sp => sp.CreatePipeline<string>("Digits").StartWithHandler("IntH", async s => await ValueTask.CompletedTask).BuildPipeline(),
        sp => sp.CreatePipeline<string>("Other").StartWithHandler("StrH", async s => await ValueTask.CompletedTask).BuildPipeline())
    .BuildPipeline()
    .Compile();
// Mutator: send strings with length > 3 to digits branch
cfg.Configure(space =>
{
    var step = space.GetRequiredForkStep<string, string, string, string>("P", "DigitsOrOther");
    step.Mutators.AddMutator(
        new StepMutator<ForkSelector<string, string, string>>("Len>3", 1, sel => async (s, a, b) => { if (s.Length > 3) await a(s); else await b(s); }),
        AddingMode.ExactPlace);
});

MultiFork

var space = Pipelines.CreateSpace();
space.CreatePipeline<string>("Digits").StartWithHandler("IntH", async s => await ValueTask.CompletedTask).BuildPipeline();
space.CreatePipeline<string>("Letters").StartWithHandler("StrH", async s => await ValueTask.CompletedTask).BuildPipeline();
space.CreatePipeline<string>("Special").StartWithHandler("CharH", async s => await ValueTask.CompletedTask).BuildPipeline();
var def = space.CreatePipeline<char[]>("Default").StartWithHandler("DefH", async arr => await ValueTask.CompletedTask).BuildPipeline();

var pipeline = space.CreatePipeline<string>("P")
    .StartWithLinear<string>("Trim", async (s, n) => await n(s.Trim()))
    .ThenMultiFork<string, char[]>("Classify", async (s, branches, d) =>
    {
        if (s.All(char.IsDigit)) await branches["Digits"](s);
        else if (s.All(char.IsLetter)) await branches["Letters"](s);
        else if (s.All(c => !char.IsLetterOrDigit(c) && !char.IsWhiteSpace(c))) await branches["Special"](s);
        else await d(s.ToCharArray());
    },
    sp => new Dictionary<string, Pipeline<string>>
    {
        ["Digits"] = sp.GetPipeline<string>("Digits")!,
        ["Letters"] = sp.GetPipeline<string>("Letters")!,
        ["Special"] = sp.GetPipeline<string>("Special")!
    }.AsReadOnly(),
    sp => sp.GetPipeline<char[]>("Default")!)
    .BuildPipeline()
    .Compile();
// Mutator: treat everything that is not digits-only as Special
cfg.Configure(space =>
{
    var step = space.GetRequiredMultiForkStep<string, string, string, char[]>("P", "Classify");
    step.Mutators.AddMutator(
        new StepMutator<MultiForkSelector<string, string, char[]>>("PreferSpecial", 1, sel => async (s, branches, d) =>
        {
            if (s.All(char.IsDigit)) await branches["Digits"](s); else await branches["Special"](s);
        }),
        AddingMode.ExactPlace);
});

Pipeline kinds

  • Universal: ValueTask-based (default in examples above)
  • Async: Task-based
  • Sync: synchronous (void)

Each has cancellable variants (accepting CancellationToken) and returning variants (pipelines that return a result instead of void/ValueTask).

Returning example (Universal)

using K1vs.DotNetPipe.Returning;

var space = Pipelines.CreateReturningSpace();
var compiled = space.CreatePipeline<int, int>("P")
    .StartWithLinear<int, int>("AddConst", async (v, next) => await next(v + 10))
    .HandleWith("H", async v => await Task.FromResult(v))
    .BuildPipeline().Compile();

var result = await compiled(5); // 15
// Mutator: add +5 before linear step
cfg.Configure(s =>
{
    var step = s.GetRequiredLinearStep<int, int, int>("P", "AddConst");
    step.Mutators.AddMutator(new StepMutator<K1vs.DotNetPipe.Returning.Pipe<int, int>>("Add+5", 1, pipe => async (i, n) => await pipe(i + 5, n)), AddingMode.ExactPlace);
});

Async and Sync variants

The same patterns apply to Async (K1vs.DotNetPipe.Async) and Sync (K1vs.DotNetPipe.Sync) namespaces. Replace delegate return types accordingly (Task or void), and use cancellable variants when you need a CancellationToken.