Dagger Functions
Base.fetchBase.waitDagger.Sch.add_thunk!Dagger.Sch.exec!Dagger.Sch.get_dag_idsDagger.Sch.halt!Dagger.Sch.sch_handleDagger.addprocs!Dagger.constrainDagger.default_enabledDagger.default_optionDagger.execute!Dagger.get_optionsDagger.get_parentDagger.get_processorsDagger.get_tlsDagger.in_taskDagger.iscompatibleDagger.moveDagger.mutableDagger.rmprocs!Dagger.scopeDagger.set_distributed_package!Dagger.set_tls!Dagger.shardDagger.spawnDagger.spawn_datadepsDagger.task_processorDagger.tochunkDagger.with_optionsDagger.@mutableDagger.@optionDagger.@shardDagger.@spawn
Task Functions/Macros
Dagger.@spawn — MacroDagger.@spawn [option=value]... f(args...; kwargs...) -> DTaskSpawns a Dagger DTask that will call f(args...; kwargs...). This DTask is like a Julia Task, and has many similarities:
- The
DTaskcan bewait'd on andfetch'd from to see its final result - By default, the
DTaskwill be automatically run on the first available compute resource - If all dependencies are satisfied, the
DTaskwill be run as soon as possible - The
DTaskmay be run in parallel with otherDTasks, and the scheduler will automatically manage dependencies - If a
DTaskthrows an exception, it will be propagated to any calls tofetch, but not to calls towait
However, the DTask also has many key differences from a Task:
- The
DTaskmay run on any thread of any Julia process, and even on a remote machine, in your cluster (seeDistributed.addprocs) - The
DTaskmight automatically utilize GPUs or other accelerators, if available - If arguments to a
DTaskare alsoDTasks, then the scheduler will execute those arguments'DTasks first, before running the "downstream" task - If an argument to a
DTaskt2is aDTaskt1, then the result oft1(gotten viafetch(t1)) will be passed tot2(no need fort2to callfetch!) DTasks are generally expected to be defined "functionally", meaning that they should not mutate global state, mutate their arguments, or have side effectsDTasks are function call-focused, meaning thatDagger.@spawnexpects a single function call, and not a block of code- All
DTaskarguments are expected to be safe to serialize and send to other Julia processes; if not, use thescopeoption orDagger.@mutableto control execution location
Options to the DTask can be set before the call to f with key-value syntax, e.g. Dagger.@spawn myopt=2 do_something(1, 3.0), which would set the option myopt to 2 for this task. Multiple options may be provided, which are specified like Dagger.@spawn myopt=2 otheropt=4 do_something(1, 3.0).
These options control a variety of properties of the resulting DTask:
scope: The execution "scope" of the task, which determines where the task will run. By default, the task will run on the first available compute resource. If you have multiple compute resources, you can specify a scope to run the task on a specific resource. For example,Dagger.@spawn scope=Dagger.scope(worker=2) do_something(1, 3.0)would rundo_something(1, 3.0)on worker 2.meta: Iftrue, instead of the scheduler automatically fetching values from other tasks, the rawChunkobjects will be passed tof. Useful for doing manual fetching or manipulation ofChunkreferences. Non-Chunkarguments are still passed as-is.
Other options exist; see Dagger.Options for the full list.
This macro is a semi-thin wrapper around Dagger.spawn - it creates a call to Dagger.spawn on f with arguments args and keyword arguments kwargs, and also passes along any options in an Options struct. For example, Dagger.@spawn myopt=2 do_something(1, 3.0) would essentially become Dagger.spawn(do_something, Dagger.Options(;myopt=2), 1, 3.0).
Dagger.spawn — FunctionDagger.spawn(f, args...; kwargs...) -> DTaskSpawns a DTask that will call f(args...; kwargs...). Also supports passing a Dagger.Options struct as the first argument to set task options. See Dagger.@spawn for more details on DTasks.
Task Options Functions/Macros
Dagger.with_options — Functionwith_options(f, options::NamedTuple) -> Any
with_options(f; options...) -> AnySets one or more scoped options to the given values, executes f(), resets the options to their previous values, and returns the result of f(). This is the recommended way to set scoped options, as it only affects tasks spawned within its scope. Note that setting an option here will propagate its value across Julia or Dagger tasks spawned by f() or its callees (i.e. the options propagate).
Dagger.get_options — Functionget_options(key::Symbol, default) -> Any
get_options(key::Symbol) -> AnyReturns the value of the scoped option named key. If option does not have a value set, then an error will be thrown, unless default is set, in which case it will be returned instead of erroring.
get_options() -> NamedTupleReturns a NamedTuple of all scoped option key-value pairs.
Dagger.@option — Macro@option name myfunc(A, B, C) = valueA convenience macro for defining default_option. For example:
Dagger.@option single mylocalfunc(Int) = 1The above call will set the single option to 1 for any Dagger task calling mylocalfunc(Int) with an Int argument.
Dagger.default_option — Functiondefault_option(::Val{name}, Tf, Targs...) where name = valueDefines the default value for option name to value when Dagger is preparing to execute a function with type Tf with the argument types Targs. Users and libraries may override this to set default values for tasks.
An easier way to define these defaults is with @option.
Note that the actual task's argument values are not passed, as it may not always be possible or efficient to gather all Dagger task arguments on one worker.
This function may be executed within the scheduler, so it should generally be made very cheap to execute. If the function throws an error, the scheduler will use whatever the global default value is for that option instead.
Data Management Functions
Dagger.tochunk — Functiontochunk(x, proc::Processor, scope::AbstractScope; device=nothing, rewrap=false, kwargs...) -> ChunkCreate a chunk from data x which resides on proc and which has scope scope.
device specifies a MemPool.StorageDevice (which is itself wrapped in a Chunk) which will be used to manage the reference contained in the Chunk generated by this function. If device is nothing (the default), the data will be inspected to determine if it's safe to serialize; if so, the default MemPool storage device will be used; if not, then a MemPool.CPURAMDevice will be used.
If rewrap==true and x isa Chunk, then the Chunk will be rewrapped in a new Chunk.
All other kwargs are passed directly to MemPool.poolset.
Dagger.mutable — Functionmutable(f::Base.Callable; worker, processor, scope) -> ChunkCalls f() on the specified worker or processor, returning a Chunk referencing the result with the specified scope scope.
Dagger.@mutable — Macro@mutable [worker=1] [processor=OSProc()] [scope=ProcessorScope()] f()Helper macro for mutable().
Dagger.@shard — MacroCreates a Shard. See Dagger.shard for details.
Dagger.shard — Functionshard(f; kwargs...) -> Chunk{Shard}Executes f on all workers in workers, wrapping the result in a process-scoped Chunk, and constructs a Chunk{Shard} containing all of these Chunks on the current worker.
Keyword arguments:
procs– The list of processors to create pieces on. May be any iterable container ofProcessors.workers– The list of workers to create pieces on. May be any iterable container ofIntegers.per_thread::Bool=false– Iftrue, creates a piece per each thread, rather than a piece per each worker.
Data Dependencies Functions
Dagger.spawn_datadeps — Functionspawn_datadeps(f::Base.Callable; traversal::Symbol=:inorder)Constructs a "datadeps" (data dependencies) region and calls f within it. Dagger tasks launched within f may wrap their arguments with In, Out, or InOut to indicate whether the task will read, write, or read+write that argument, respectively. These argument dependencies will be used to specify which tasks depend on each other based on the following rules:
- Dependencies across unrelated arguments are independent; only dependencies on arguments which overlap in memory synchronize with each other
InOutis the same asInandOutapplied simultaneously, and synchronizes with the union of theInandOuteffects- Any two or more
Independencies do not synchronize with each other, and may execute in parallel - An
Outdependency synchronizes with any previousInandOutdependencies - An
Independency synchronizes with any previousOutdependencies - If unspecified, an
Independency is assumed
In general, the result of executing tasks following the above rules will be equivalent to simply executing tasks sequentially and in order of submission. Of course, if dependencies are incorrectly specified, undefined behavior (and unexpected results) may occur.
Unlike other Dagger tasks, tasks executed within a datadeps region are allowed to write to their arguments when annotated with Out or InOut appropriately.
At the end of executing f, spawn_datadeps will wait for all launched tasks to complete, rethrowing the first error, if any. The result of f will be returned from spawn_datadeps.
The keyword argument traversal controls the order that tasks are launched by the scheduler, and may be set to :bfs or :dfs for Breadth-First Scheduling or Depth-First Scheduling, respectively. All traversal orders respect the dependencies and ordering of the launched tasks, but may provide better or worse performance for a given set of datadeps tasks. This argument is experimental and subject to change.
Scope Functions
Dagger.scope — Functionscope(scs...) -> AbstractScope
scope(;scs...) -> AbstractScopeConstructs an AbstractScope from a set of scope specifiers. Each element in scs is a separate specifier; if scs is empty, an empty UnionScope() is produced; if scs has one element, then exactly one specifier is constructed; if scs has more than one element, a UnionScope of the scopes specified by scs is constructed. A variety of specifiers can be passed to construct a scope:
:any- Constructs anAnyScope():default- Constructs aDefaultScope()(scs...,)- Constructs aUnionScopeof scopes, each specified byscsthread=tidorthreads=[tids...]- Constructs anExactScopeorUnionScopecontaining allDagger.ThreadProcs with thread IDtid/tidsacross all workers.worker=widorworkers=[wids...]- Constructs aProcessScopeorUnionScopecontaining allDagger.ThreadProcs with worker IDwid/widsacross all threads.thread=tid/threads=tidsandworker=wid/workers=wids- Constructs anExactScope,ProcessScope, orUnionScopecontaining allDagger.ThreadProcs with worker IDwid/widsand threadstid/tids.
Aside from the worker and thread specifiers, it's possible to add custom specifiers for scoping to other kinds of processors (like GPUs) or providing different ways to specify a scope. Specifier selection is determined by a precedence ordering: by default, all specifiers have precedence 0, which can be changed by defining scope_key_precedence(::Val{spec}) = precedence (where spec is the specifier as a Symbol). The specifier with the highest precedence in a set of specifiers is used to determine the scope by calling to_scope(::Val{spec}, sc::NamedTuple) (where sc is the full set of specifiers), which should be overriden for each custom specifier, and which returns an AbstractScope. For example:
# Setup a GPU specifier
Dagger.scope_key_precedence(::Val{:gpu}) = 1
Dagger.to_scope(::Val{:gpu}, sc::NamedTuple) = ExactScope(MyGPUDevice(sc.worker, sc.gpu))
# Generate an `ExactScope` for `MyGPUDevice` on worker 2, device 3
Dagger.scope(gpu=3, worker=2)Dagger.constrain — Functionconstraint(x::AbstractScope, y::AbstractScope) -> ::AbstractScopeConstructs a scope that is the intersection of scopes x and y.
Processor Functions
Dagger.execute! — Functionexecute!(proc::Processor, f, args...; kwargs...) -> AnyExecutes the function f with arguments args and keyword arguments kwargs on processor proc. This function can be overloaded by Processor subtypes to allow executing function calls differently than normal Julia.
Dagger.iscompatible — Functioniscompatible(proc::Processor, opts, f, Targs...) -> BoolIndicates whether proc can execute f over Targs given opts. Processor subtypes should overload this function to return true if and only if it is essentially guaranteed that f(::Targs...) is supported. Additionally, iscompatible_func and iscompatible_arg can be overriden to determine compatibility of f and Targs individually. The default implementation returns false.
Dagger.default_enabled — Functiondefault_enabled(proc::Processor) -> BoolReturns whether processor proc is enabled by default. The default value is false, which is an opt-out of the processor from execution when not specifically requested by the user, and true implies opt-in, which causes the processor to always participate in execution when possible.
Dagger.get_processors — Functionget_processors(proc::Processor) -> Set{<:Processor}Returns the set of processors contained in proc, if any. Processor subtypes should overload this function if they can contain sub-processors. The default method will return a Set containing proc itself.
Dagger.get_parent — Functionget_parent(proc::Processor) -> ProcessorReturns the parent processor for proc. The ultimate parent processor is an OSProc. Processor subtypes should overload this to return their most direct parent.
Dagger.move — Functionmove(from_proc::Processor, to_proc::Processor, x)Moves and/or converts x such that it's available and suitable for usage on the to_proc processor. This function can be overloaded by Processor subtypes to transport arguments and convert them to an appropriate form before being used for exection. Subtypes of Processor wishing to implement efficient data movement should provide implementations where x::Chunk.
Dagger.get_tls — Functionget_tls() -> DTaskTLSGets all Dagger TLS variable as a DTaskTLS.
Dagger.set_tls! — Functionset_tls!(tls::NamedTuple)Sets all Dagger TLS variables from tls, which may be a DTaskTLS or a NamedTuple.
Context Functions
Dagger.addprocs! — Functionaddprocs!(ctx::Context, xs)Add new workers xs to ctx.
Workers will typically be assigned new tasks in the next scheduling iteration if scheduling is ongoing.
Workers can be either Processors or the underlying process IDs as Integers.
Dagger.rmprocs! — Functionrmprocs!(ctx::Context, xs)Remove the specified workers xs from ctx.
Workers will typically finish all their assigned tasks if scheduling is ongoing but will not be assigned new tasks after removal.
Workers can be either Processors or the underlying process IDs as Integers.
Distributed Package Selection Functions
Dagger.set_distributed_package! — Functionset_distributed_package!(value[="Distributed|DistributedNext"])Set a preference for using either the Distributed.jl stdlib or DistributedNext.jl. You will need to restart Julia after setting a new preference.
DTask Execution Environment Functions
These functions are used within the function called by a DTask.
Dagger.in_task — Functionin_task() -> BoolReturns true if currently executing in a DTask, else false.
Dagger.task_processor — Functiontask_processor() -> ProcessorGet the current processor executing the current DTask.
Dynamic Scheduler Control Functions
These functions query and control the scheduler remotely.
Dagger.Sch.sch_handle — FunctionGets the scheduler handle for the currently-executing thunk.
Dagger.Sch.add_thunk! — FunctionAdds a new Thunk to the DAG.
Base.fetch — FunctionBase.fetch(c::DArray)If a DArray tree has a Thunk in it, make the whole thing a big thunk.
Waits on a thunk to complete, and fetches its result.
Base.wait — FunctionWaits on a thunk to complete.
Dagger.Sch.exec! — FunctionExecutes an arbitrary function within the scheduler, returning the result.
Dagger.Sch.halt! — FunctionCommands the scheduler to halt execution immediately.
Dagger.Sch.get_dag_ids — FunctionReturns all Thunks IDs as a Dict, mapping a Thunk to its downstream dependents.