Streaming

Dagger tasks have a limited lifetime - they are created, execute, finish, and are eventually destroyed when they're no longer needed. Thus, if you want to run the same kind of computation over and over, you might re-create a similar set of tasks for each unit of data that needs processing.
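For example, a non-streaming version of this pattern might look like the following sketch, where process and frames are hypothetical stand-ins for your own function and data source:

# One new task is created (and later destroyed) per unit of data.
# `process` and `frames` are hypothetical stand-ins.
for frame in frames
    result = fetch(Dagger.@spawn process(frame))
    # ... use `result` ...
end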

This might be fine for computations which take a long time to run (thus dwarfing the cost of task creation, which is quite small), or when working with a limited set of data, but this approach is not great for doing lots of small computations on a large (or endless) amount of data. For example: processing image frames from a webcam, reacting to messages from a message bus, or reading samples from a software radio. All of these workloads are better suited to a "streaming" model of data processing, where data is simply piped into a continuously-running task (or DAG of tasks) forever, or until the data runs out.

Thankfully, if you have a problem which is best modeled as a streaming system of tasks, Dagger has you covered! Building on its support for Task Queues, Dagger provides the spawn_streaming function, which converts an entire DAG of tasks into a streaming DAG, where data flows into and out of each task asynchronously:

Dagger.spawn_streaming() do # enters a streaming region
  vals = Dagger.@spawn rand()
  print_vals = Dagger.@spawn println(vals)
end # exits the streaming region, and starts the DAG running

In the above example, vals is a Dagger task which has been transformed to run in a streaming manner - instead of just calling rand() once and returning its result, it will re-run rand() endlessly, continuously producing new random values. In typical Dagger style, print_vals is a Dagger task which depends on vals, but in streaming form - it will continuously println the random values produced from vals. Both tasks will run forever, and will run efficiently, only doing the work necessary to generate, transfer, and consume values.
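This pattern extends naturally to longer pipelines. As a minimal sketch, a transformation task can be placed between the producer and the consumer just by adding another task to the region (the doubling step here is purely illustrative):

Dagger.spawn_streaming() do
    xs = Dagger.@spawn rand()    # produces a new random value on each evaluation
    ys = Dagger.@spawn 2 * xs    # doubles each value as it streams through
    Dagger.@spawn println(ys)    # prints each doubled value
end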

As the comments point out, spawn_streaming creates a streaming region, during which vals and print_vals are created and configured. Neither task starts running until spawn_streaming returns, allowing large DAGs to be built all at once, without any task losing a single value. If desired, streaming regions can be connected, although some values might be lost while tasks are being connected:

vals = Dagger.spawn_streaming() do
    Dagger.@spawn rand()
end

# Some values might be generated by `vals` but thrown away
# before `print_vals` is fully setup and connected to it

print_vals = Dagger.spawn_streaming() do
    Dagger.@spawn println(vals)
end

More complicated streaming DAGs can be easily constructed, without doing anything different. For example, we can generate multiple streams of random numbers, write them all to their own files, and print the combined results:

Dagger.spawn_streaming() do
    # Generate four independent streams of random numbers
    all_vals = [Dagger.spawn(rand) for i in 1:4]
    # Append each stream's values to its own file, passing each value through
    all_vals_written = map(1:4) do i
        Dagger.spawn(all_vals[i]) do val
            open("results_$i.txt"; write=true, create=true, append=true) do io
                println(io, repr(val))
            end
            return val
        end
    end
    # Sum the latest value from each stream and print the result
    Dagger.spawn(all_vals_written...) do all_vals_written...
        vals_sum = sum(all_vals_written)
        println(vals_sum)
    end
end

If you want to stop the streaming DAG and tear it all down, you can call Dagger.cancel!(all_vals[1]) (or Dagger.cancel! on any other task in the streaming DAG) to terminate all streaming tasks.
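For example, if a task in the streaming DAG is bound to a variable outside the streaming region (as vals was earlier), a minimal teardown sketch looks like this:

vals = Dagger.spawn_streaming() do
    Dagger.@spawn rand()
end

# ... later, once the stream is no longer needed:
Dagger.cancel!(vals)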

Alternatively, tasks can stop themselves from the inside with finish_stream, optionally returning a value that can be fetch'd. Let's do this when our randomly-drawn number falls below some arbitrary threshold:

vals = Dagger.spawn_streaming() do
    Dagger.spawn() do
        x = rand()
        if x < 0.001
            # That's good enough, let's be done
            return Dagger.finish_stream("Finished!")
        end
        return x
    end
end
fetch(vals)

In this example, the call to fetch will block (while random numbers continue to be drawn) until a drawn number is less than 0.001; at that point, fetch will return "Finished!", and the task vals will have terminated.