Dagger: A framework for out-of-core and parallel execution
Dagger.jl is a framework for parallel computing across all kinds of resources, like CPUs and GPUs, and across multiple threads and multiple servers.
Note: when using Dagger with multiple workers, make sure that the workers are initialized before importing Dagger and doing any distributed operations. This is good:
julia> using Distributed
julia> addprocs(2)
julia> using Dagger
But this will cause errors when using Dagger:
julia> using Distributed, Dagger
julia> addprocs(2)
The reason is that Distributed.jl requires packages to be loaded on all workers in order for the workers to use objects from those packages, and using Dagger will load Dagger on all existing workers. If you're executing custom functions, you will also need to define those on all workers with @everywhere; see the Distributed.jl documentation for more information.
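For example, a complete session that also defines a custom function on all workers might look like this (double_it here is just an illustrative name):
julia> using Distributed
julia> addprocs(2)
julia> using Dagger
julia> @everywhere double_it(x) = 2x
julia> fetch(Dagger.@spawn double_it(21))  # returns 42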
Quickstart: Task Spawning
For more details: Task Spawning
Launch a task
If you want to call a function myfunc with arguments arg1, arg2, arg3, and keyword argument color=:red:
function myfunc(arg1, arg2, arg3; color=:blue)
    arg_total = arg1 + arg2 * arg3
    printstyled(arg_total; color)
    return arg_total
end
t = Dagger.@spawn myfunc(arg1, arg2, arg3; color=:red)
This will run the function asynchronously; you can fetch its result with fetch(t), or just wait on it to complete with wait(t). If the call to myfunc throws an error, fetch(t) will rethrow it.
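For example, with illustrative values arg1 = 1, arg2 = 2, and arg3 = 3:
t = Dagger.@spawn myfunc(1, 2, 3; color=:red)  # illustrative values
wait(t)        # blocks until the task has completed
fetch(t) == 7  # fetch returns the task's result (1 + 2 * 3)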
If running Dagger with multiple workers, make sure to define myfunc with @everywhere from the Distributed stdlib.
Launch a task with an anonymous function
It's more convenient to use Dagger.spawn for anonymous functions. Taking the previous example, but using an anonymous function instead of myfunc:
Dagger.spawn((arg1, arg2, arg3; color=:blue) -> begin
    arg_total = arg1 + arg2 * arg3
    printstyled(arg_total; color)
    return arg_total
end, arg1, arg2, arg3; color=:red)
spawn is functionally identical to @spawn, but can be more or less convenient to use, depending on what you're trying to do.
Launch many tasks and wait on them all to complete
@spawn participates in @sync blocks, just like @async and Threads.@spawn, and will cause @sync to wait until all the tasks have completed:
@sync for result in simulation_results
    Dagger.@spawn send_result_to_database(result)
end
nresults = length(simulation_results)
wait(Dagger.@spawn update_database_result_count(nresults))
Above, update_database_result_count will only run once all send_result_to_database calls have completed.
Note that other APIs (including spawn) do not participate in @sync blocks.
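If you're using Dagger.spawn instead of @spawn, one simple alternative (a sketch, not the only approach) is to collect the returned tasks and wait on them explicitly:
tasks = [Dagger.spawn(send_result_to_database, result) for result in simulation_results]
wait.(tasks)
wait(Dagger.@spawn update_database_result_count(length(simulation_results)))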
Run a task on a specific Distributed worker
Dagger uses Scopes to control where tasks can execute. There's a handy constructor, Dagger.scope, that makes defining scopes easy:
w2_only = Dagger.scope(worker=2)
Dagger.@spawn scope=w2_only myfunc(arg1, arg2, arg3; color=:red)
Now the launched task will definitely execute on worker 2 (or, if it's not possible to run on worker 2, Dagger will throw an error when you try to fetch the result).
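Scopes aren't limited to a single worker; for example, Dagger.scope also accepts a set of workers (a sketch; see the Scopes documentation for all supported options):
# Allow the task to run on either worker 2 or worker 3
w2_or_w3 = Dagger.scope(workers=[2, 3])
Dagger.@spawn scope=w2_or_w3 myfunc(arg1, arg2, arg3; color=:red)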
Parallelize nested loops
Nested loops are a very common pattern in Julia, yet it's often difficult to parallelize them efficiently with @threads or @distributed/pmap. Thankfully, this kind of problem is quite easy for Dagger to handle; here is an example of parallelizing a two-level nested loop, where the inner loop computations (g) depend on an outer loop computation (f):
@everywhere begin
    using Random
    Random.seed!(0)
    # Some "expensive" functions that complete at different speeds
    const crn = abs.(randn(20, 7))
    f(i) = sleep(crn[i, 7])
    g(i, j, y) = sleep(crn[i, j])
end
function nested_dagger()
    @sync for i in 1:20
        y = Dagger.@spawn f(i)
        for j in 1:6
            z = Dagger.@spawn g(i, j, y)
        end
    end
end
And the equivalent (and less performant) example with Threads.@threads, either parallelizing the inner or outer loop:
function nested_threads_outer()
    Threads.@threads for i in 1:20
        y = f(i)
        for j in 1:6
            z = g(i, j, y)
        end
    end
end
function nested_threads_inner()
    for i in 1:20
        y = f(i)
        Threads.@threads for j in 1:6
            z = g(i, j, y)
        end
    end
end
Unlike Threads.@threads (which is really only intended to be used for a single, unnested loop), Dagger.@spawn is capable of parallelizing across both loop levels seamlessly, using the dependencies between f and g to determine the correct order in which to execute tasks.
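If you also want to collect the results of the inner tasks, one possible pattern (a sketch; with these particular f and g the results are just nothing, but the pattern carries over to real computations) is to store the task handles and fetch them after the @sync block:
function nested_dagger_results()
    zs = Matrix{Any}(undef, 20, 6)
    @sync for i in 1:20
        y = Dagger.@spawn f(i)
        for j in 1:6
            zs[i, j] = Dagger.@spawn g(i, j, y)
        end
    end
    return fetch.(zs)
end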
Quickstart: Data Management
For more details: Data Management
Operate on mutable data in-place
Dagger usually assumes that you won't be modifying the arguments passed to your functions, but you can tell Dagger you plan to mutate them with @mutable:
A = Dagger.@mutable rand(1000, 1000)
Dagger.@spawn accumulate!(+, A, A)
This will lock A (and any tasks that use it) to the current worker. You can also lock it to a different worker by creating the data within a task:
A = Dagger.spawn() do
    Dagger.@mutable rand(1000, 1000)
end
or by specifying the worker argument to @mutable:
A = Dagger.@mutable worker=2 rand(1000, 1000)
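Tasks that use A will then run on worker 2, where A now lives; for example (a small sketch, assuming A holds the matrix allocated above):
# Runs on worker 2, because A is locked there
fetch(Dagger.@spawn sum(A))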
Operate on distributed data
Often we want to work with more than one piece of data; the common case of wanting one piece of data per worker is easy to do by using @shard:
X = Dagger.@shard myid()
This will execute myid() independently on every worker in your Julia cluster, and place references to each within a Shard object called X. We can then use X in task spawning, but we'll only get the result of myid() that corresponds to the worker that the task is running on:
for w in workers()
    @show fetch(Dagger.@spawn scope=Dagger.scope(worker=w) identity(X))
end
The above should print the result of myid() for each worker in workers(), as identity(X) receives only the value of X specific to that worker.
Reducing over distributed data
Reductions are often parallelized by reducing a set of partitions on each worker, and then reducing those intermediate reductions on a single worker. Dagger supports this easily with @shard:
A = Dagger.@shard rand(1:20, 10000)
temp_bins = Dagger.@shard zeros(20)
hist! = (bins, arr) -> for elem in arr
    bins[elem] += 1
end
wait.([Dagger.@spawn scope=Dagger.scope(;worker) hist!(temp_bins, A) for worker in procs()])
final_bins = sum(map(b->fetch(Dagger.@spawn copy(b)), temp_bins); dims=1)[1]
Here, A points to unique random arrays, one on each worker, and temp_bins points to a set of histogram bins on each worker. When we @spawn hist!, Dagger passes in the random array and bins for only the specific worker that the task is run on; i.e. a call to hist! that runs on worker 2 will get a different A and temp_bins from a call to hist! on worker 3. All of the calls to hist! may run in parallel.
By using map on temp_bins, we then make a copy of each worker's bins that we can safely return back to our current worker, and sum them together to get our total histogram.
Quickstart: File IO
Dagger has support for loading and saving files that integrates seamlessly with its task system, in the form of Dagger.File and Dagger.tofile.
These functions are not yet fully tested, so please make sure to take backups of any files that you load with them.
Loading files from disk
In order to load one or more files from disk, Dagger provides the File function, which creates a lazy reference to a file:
f = Dagger.File("myfile.jls")
f is now a lazy reference to "myfile.jls", and its contents can be loaded automatically by just passing the object to a task:
wait(Dagger.@spawn println(f))
# Prints the loaded contents of the file
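The loaded contents can also feed into further computations; for example, assuming "myfile.jls" contains a numeric array (an illustrative sketch):
t = Dagger.@spawn sum(f)  # f is deserialized and passed to sum
fetch(t)                  # the sum of the array stored in "myfile.jls"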
By default, File assumes that the file uses Julia's Serialization format; this can be easily changed to assume Arrow format, for example:
using Arrow
f = Dagger.File("myfile.arrow"; serialize=Arrow.write, deserialize=Arrow.Table)
Writing data to disk
Saving data to disk is as easy as loading it; tofile provides this capability in a similar manner to File:
A = rand(1000)
f = Dagger.tofile(A, "mydata.jls")
Like File, f can still be used to reference the file's data in tasks. It is likely most useful to use tofile at the end of a task to save results:
function make_data()
    A = rand(1000)
    return Dagger.tofile(A, "mydata.jls")
end
fetch(Dagger.@spawn make_data())
# Data was also written to "mydata.jls"
tofile takes the same keyword arguments as File, allowing the format of data on disk to be specified as desired.
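For example, to write data in Arrow format instead of Julia's Serialization format (a sketch; the table contents here are just illustrative):
using Arrow
tbl = (a = rand(10), b = rand(10))
f = Dagger.tofile(tbl, "mydata.arrow"; serialize=Arrow.write, deserialize=Arrow.Table)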
Quickstart: Distributed Arrays
Dagger's DArray type represents a distributed array, where a single large array is implemented as a set of smaller array partitions, which may be distributed across a Julia cluster.
For more details: Distributed Arrays
Distribute an existing array
Distributing any kind of array into a DArray is easy: just use distribute, and specify the partitioning you desire with Blocks. For example, to distribute a 16 x 16 matrix in 4 x 4 partitions:
A = rand(16, 16)
DA = distribute(A, Blocks(4, 4))
Allocate a distributed array directly
To allocate a DArray, just pass your Blocks partitioning object into the appropriate allocation function, such as rand, ones, or zeros:
rand(Blocks(20, 20), 100, 100)
ones(Blocks(20, 100), 100, 2000)
zeros(Blocks(50, 20), 300, 200)
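Many standard array operations work on the resulting DArray, operating partition-by-partition; a couple of illustrative examples (see the Distributed Arrays documentation for exactly what's supported):
DA = rand(Blocks(20, 20), 100, 100)
DB = map(x -> x + 1, DA)  # elementwise operations work
sum(DA)                   # so do reductions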
Convert a DArray back into an Array
To get back an Array from a DArray, just call collect:
DA = rand(Blocks(32, 32), 256, 128)
collect(DA) # returns a `Matrix{Float64}`
Quickstart: Datadeps
Datadeps is a feature in Dagger.jl that facilitates parallelism control within designated regions, allowing tasks to write to their arguments while ensuring dependencies are respected.
For more details: Datadeps (Data Dependencies)
Syntax
The Dagger.spawn_datadeps() function is used to create a "datadeps region" where tasks are executed with parallelism controlled by specified dependencies:
Dagger.spawn_datadeps() do
    Dagger.@spawn func!(Out(var_name_x), In(var_name_y))
end
Argument Annotation
- In(X): Indicates that the variable X is only read by the task (an "input").
- Out(X): Indicates that the variable X is only written to by the task (an "output").
- InOut(X): Indicates that the variable X is both read from and written to by the task (an "input" and "output" simultaneously).
Example with Datadeps
X = [4,5,6,7,1,2,3,9,8]
C = zeros(9)
Dagger.spawn_datadeps() do
    Dagger.@spawn sort!(InOut(X))
    Dagger.@spawn copyto!(Out(C), In(X))
end
# C = [1,2,3,4,5,6,7,8,9]
In this example, the sort! function operates on array X, while the copyto! task reads from array X and writes to array C. By specifying dependencies using argument annotations, the tasks are executed in a controlled parallel manner, resulting in a sorted C array.
Example without Datadeps
X = [4,5,6,7,1,2,3,9,8]
C = zeros(9)
Dagger.@spawn sort!(X)
Dagger.@spawn copyto!(C, X)
# C = [4,5,6,7,1,2,3,9,8]
In contrast to the previous example, here the tasks are executed without argument annotations. As a result, there is a possibility of the copyto! task being executed before the sort! task, leading to unexpected results in the output array C.
Quickstart: Streaming
Dagger.jl provides a streaming API that allows you to process data in a streaming fashion, where data is processed as it becomes available, rather than waiting for the entire dataset to be loaded into memory.
For more details: Streaming
Syntax
The Dagger.spawn_streaming() function is used to create a streaming region, where tasks are executed continuously, processing data as it becomes available:
# Open a file to write to on this worker
f = Dagger.@mutable open("output.txt", "w")
t = Dagger.spawn_streaming() do
    # Generate random numbers continuously
    val = Dagger.@spawn rand()
    # Write each random number to a file
    Dagger.@spawn (f, val) -> begin
        if val < 0.01
            # Finish streaming when the random number is less than 0.01
            Dagger.finish_stream()
        end
        println(f, val)
    end
end
# Wait for all values to be generated and written
wait(t)
The above example demonstrates a streaming region that generates random numbers continuously and writes each random number to a file. The streaming region is terminated when a random number less than 0.01 is generated, which is done by calling Dagger.finish_stream() (this terminates the current task, and will also terminate all streaming tasks launched by spawn_streaming).