Scopes

Sometimes you will have data that is only meaningful in a certain location, such as within a single Julia process, a given server, or even a specific Dagger processor. We call this location a "scope" in Dagger, denoting the bounds within which the data is meaningful and valid. For example, C pointers are typically scoped to a process, and file paths are scoped to one or more servers depending on the filesystem configuration. By default, Dagger doesn't know about these restrictions; it treats everything passed into a task, or generated from a task, as inherently safe to transfer anywhere else. When this is not the case, Dagger provides optional scopes to tell the scheduler where data is considered valid.

Scope Basics

Let's take the example of a webcam handle generated by VideoIO.jl. This handle is a C pointer, and thus has process scope. We can open the handle on a given process, and set the scope of the resulting data to be locked to the current process with Dagger.scope to construct a ProcessScope:

using Dagger, VideoIO, Distributed

function get_handle()
    handle = VideoIO.opencamera()
    proc = Dagger.task_processor()
    scope = Dagger.scope(worker=myid()) # constructs a `ProcessScope`
    return Dagger.tochunk(handle, proc, scope)
end

cam_handle = Dagger.@spawn get_handle()

Now, wherever cam_handle is passed, Dagger will ensure that any computations on the handle only happen within its defined scope. For example, we can read from the camera:

cam_frame = Dagger.@spawn read(cam_handle)

The cam_frame task is executed within any processor on the same process that the cam_handle task was executed on. Of course, the resulting camera frame is not scoped to anywhere specific (denoted as AnyScope), and thus computations on it may execute anywhere.

You may also encounter situations where you want to use a callable struct (such as a closure, or a Flux.jl layer) only within a certain scope; you can specify the scope of the function with the scope option to Dagger.@spawn:

using Flux
m = Chain(...)
# If `m` is only safe to transfer to and execute on this process,
# we can set a `ProcessScope` on it:
result = Dagger.@spawn scope=Dagger.scope(worker=myid()) m(rand(8,8))

Setting a scope on the function treats it as a regular piece of data (like the arguments to the function), so it participates in the scoping rules described in the following sections all the same.
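To check that a scope set on a task is honored, one option is to have the task report which process it ran on. The following is a minimal sketch, assuming a session in which two local workers can be added and Dagger can be loaded on them; the names ws, target, and t are illustrative only.

```julia
using Distributed
ws = addprocs(2)            # two local worker processes, for illustration only
@everywhere using Dagger

target = last(ws)

# Scope the task itself to one worker, then ask it which process it ran on.
t = Dagger.@spawn scope=Dagger.scope(worker=target) myid()
@show fetch(t) == target
```

Because the scope restricts the task to a single process, the comparison should hold no matter how many other workers are available.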

Scope Functions

Now, let's try out some other kinds of scopes, starting with NodeScope. This scope encompasses the server that one or more Julia processes may be running on. Say we want to use memory mapping (mmap) to more efficiently send arrays between two tasks. We can construct the mmap'd array in one task and return the path of the mmap'd file, wrapped in a chunk with a NodeScope() attached. Because the path is only meaningful on that server, downstream tasks that receive it are locked to the same server:

using Mmap

function generate()
    path = "myfile.bin"
    arr = Mmap.mmap(path, Matrix{Int}, (64,64))
    fill!(arr, 1)
    Mmap.sync!(arr)
    # Note: Dagger.scope() does not yet support node scopes
    Dagger.tochunk(path, Dagger.task_processor(), NodeScope())
end

function consume(path)
    arr = Mmap.mmap(path, Matrix{Int}, (64,64))
    sum(arr)
end

a = Dagger.@spawn generate()
@assert fetch(Dagger.@spawn consume(a)) == 64*64

Whichever server a executed on, the consume task will execute on it as well!

Finally, we come to the "lowest" scope on the scope hierarchy, the ExactScope. This scope specifies one exact processor as the bounding scope, and is typically useful in certain limited cases (such as data existing only on a specific GPU). We won't provide an example here, because this scope is rarely needed, but if you already understand NodeScope and ProcessScope, ExactScope should be easy to figure out.

Union Scopes

Sometimes one simple scope isn't enough! In that case, you can use the UnionScope to construct the union of two or more scopes. Say, for example, you have some sensitive data on your company's servers that you want to compute summaries of, but you'll be driving the computation from your laptop, and you aren't allowed to send the data itself outside of the company's network. You could accomplish this by constructing a UnionScope from the ProcessScopes of all the non-laptop Julia processes, and using it to ensure that the data in its original form always stays within the company network:

addprocs(4) # some local processes
procs = addprocs([("server.company.com", 4)]) # some company processes

secrets_scope = UnionScope(ProcessScope.(procs))

function generate_secrets()
    secrets = open("/shared/secret_results.txt", "r") do io
        String(read(io))
    end
    Dagger.tochunk(secrets, Dagger.task_processor(), secrets_scope)
end

summarize(secrets) = occursin("QA Pass", secrets)

# Generate the data on the first company process
sensitive_data = Dagger.@spawn single=first(procs) generate_secrets()

# We can safely call this, knowing that it will be executed on a company server
qa_passed = Dagger.@spawn summarize(sensitive_data)

Mismatched Scopes

You might now be thinking, "What if I want to run a task on multiple pieces of data whose scopes don't match up?" In such a case, Dagger will throw an error, refusing to schedule that task, since the intersection of the data scopes is an empty set (there is no feasible processor which can satisfy the scoping constraints). For example:

ps2 = ProcessScope(2)
ps3 = ProcessScope(3)

generate(scope) = Dagger.tochunk(rand(64), Dagger.task_processor(), scope)

d2 = Dagger.@spawn generate(ps2) # Result is scoped to process 2
d3 = Dagger.@spawn generate(ps3) # Result is scoped to process 3
res = Dagger.@spawn d2 + d3 # An error: no processor lies in both scopes
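One way to anticipate this failure is to intersect the scopes yourself before spawning the task. The sketch below assumes that Dagger.constrain and Dagger.InvalidScope are available in your Dagger version; neither is described in this section, so verify them against the API reference. It adds two local workers purely for illustration, and the names ws, ps_a, ps_b, same, and mismatch are hypothetical.

```julia
using Distributed
ws = addprocs(2)            # two local worker processes, for illustration only
@everywhere using Dagger

ps_a = Dagger.ProcessScope(ws[1])
ps_b = Dagger.ProcessScope(ws[2])

# Assumption: `Dagger.constrain` returns the intersection of two scopes,
# or a `Dagger.InvalidScope` when no processor lies in both.
same = Dagger.constrain(ps_a, ps_a)
mismatch = Dagger.constrain(ps_a, ps_b)

@show same isa Dagger.ProcessScope
@show mismatch isa Dagger.InvalidScope
```

If the intersection comes back invalid, a task that takes both pieces of data as arguments has nowhere it can be scheduled.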

Moral of the story: only use scopes when you know you really need them, and be prepared for Dagger to refuse to schedule your tasks if the scopes aren't arranged so that they overlap! Scopes exist to ensure the correctness of your programs. They are not intended for optimizing the schedule that Dagger chooses for your tasks, since restricting where tasks may execute necessarily reduces the optimizations that Dagger's scheduler can perform.