Job Scheduling

# examples/05-job_schedule.jl
# This example demonstrates job scheduling by adding the
# number 100 to every component of the vector data. The root
# assigns one element to each worker to process.
# Whenever a worker finishes, the root sends it another element
# until 100 has been added to every element.
# Inspired by
# https://www.hpc.ntnu.no/ntnu-hpc-group/vilje/user-guide/software/mpi-and-mpi-io-training-tutorial/basic-mpi/job-queue

using MPI

# Tag of the root's shutdown message. Work tags are `worker + 32` with `worker >= 1`,
# so this value can never collide with a work tag.
const _JOB_STOP_TAG = 32

# Tag used for data messages exchanged between the root and rank `worker`.
_job_work_tag(worker) = worker + 32

# Root side of `job_queue`: hand out the elements of `data` one at a time, collect the
# processed values in the order they arrive, then tell every worker to stop.
function _job_queue_root(comm, data, nworkers)
    T = eltype(data)
    N = size(data, 1)
    if nworkers < 1 && N > 0
        # Without this check the receive loop below would spin forever.
        error("job_queue needs at least one worker rank besides the root (use -n 2 or more)")
    end

    recv_mesg = Array{T}(undef, 1)
    new_data = Array{T}(undef, N)
    # One send buffer per worker: a buffer handed to `Isend` must not be modified until
    # that send has completed, so a single shared buffer cannot be reused across workers.
    send_bufs = [Array{T}(undef, 1) for _ in 1:nworkers]
    sreqs_workers = Array{MPI.Request}(undef, nworkers)
    # -1 = idle (nothing outstanding), 0 = send in flight, 1 = send done, result awaited
    status_workers = fill(-1, nworkers)

    idx_recv = 0
    idx_sent = 1

    # Initial round: one element for each worker (fewer if there are more workers than
    # elements).
    for dst in 1:min(nworkers, N)
        send_bufs[dst][1] = data[idx_sent]
        sreqs_workers[dst] =
            MPI.Isend(send_bufs[dst], comm; dest=dst, tag=_job_work_tag(dst))
        status_workers[dst] = 0
        print("Root: Sent number $(send_bufs[dst][1]) to Worker $dst\n")
        idx_sent += 1
    end

    # Keep polling until one result has been received for every element.
    while idx_recv != N
        # Complete outstanding sends before looking for the matching result.
        for dst in 1:nworkers
            if status_workers[dst] == 0 && MPI.Test(sreqs_workers[dst])
                status_workers[dst] = 1
            end
        end
        for dst in 1:nworkers
            status_workers[dst] == 1 || continue
            MPI.Iprobe(comm; source=dst, tag=_job_work_tag(dst)) || continue

            MPI.Recv!(recv_mesg, comm; source=dst, tag=_job_work_tag(dst))
            idx_recv += 1
            # Results are stored in arrival order, not in the order of `data`.
            new_data[idx_recv] = recv_mesg[1]
            print("Root: Received number $(recv_mesg[1]) from Worker $dst\n")

            if idx_sent <= N
                # The previous send to `dst` has completed (status was 1), so its
                # buffer can safely be overwritten.
                send_bufs[dst][1] = data[idx_sent]
                sreqs_workers[dst] =
                    MPI.Isend(send_bufs[dst], comm; dest=dst, tag=_job_work_tag(dst))
                status_workers[dst] = 0
                print("Root: Sent number $(send_bufs[dst][1]) to Worker $dst\n")
                idx_sent += 1
            else
                # No work left for this worker; stop probing it.
                status_workers[dst] = -1
            end
        end
    end

    # Shutdown is signalled by the tag, so the payload is irrelevant and no data value
    # has to be reserved as a sentinel.
    for dst in 1:nworkers
        sreqs_workers[dst] = MPI.Isend(send_bufs[dst], comm; dest=dst, tag=_JOB_STOP_TAG)
        print("Root: Finish Worker $dst\n")
    end

    MPI.Waitall(sreqs_workers)
    print("Root: New data = $new_data\n")
end

# Worker side of `job_queue`: receive one-element arrays from `root`, apply `f`, send the
# result back, and leave when the shutdown message arrives.
function _job_queue_worker(comm, f, rank, root, T)
    recv_mesg = Array{T}(undef, 1)
    # Request of the most recent result send, or `nothing` when none is outstanding.
    # It lives outside the loop so that it survives from one polling pass to the next.
    pending = nothing

    while true
        if MPI.Iprobe(comm; source=root, tag=_JOB_STOP_TAG)
            MPI.Recv!(recv_mesg, comm; source=root, tag=_JOB_STOP_TAG)
            print("Worker $rank: Finish\n")
            break
        end

        if MPI.Iprobe(comm; source=root, tag=_job_work_tag(rank))
            MPI.Recv!(recv_mesg, comm; source=root, tag=_job_work_tag(rank))
            print("Worker $rank: Received number $(recv_mesg[1]) from root\n")
            # Finish the previous send before its request is replaced.
            pending === nothing || MPI.Wait(pending)
            # Apply the job function and return the result to the root.
            send_mesg = f(recv_mesg)
            pending = MPI.Isend(send_mesg, comm; dest=root, tag=_job_work_tag(rank))
        end

        if pending !== nothing && MPI.Test(pending)
            pending = nothing
        end
    end

    # Do not leave a send request dangling when shutting down.
    pending === nothing || MPI.Wait(pending)
end

"""
    job_queue(data, f)

Process the elements of `data` with a simple MPI job queue.

Rank 0 (the root) sends one element at a time to the other ranks (the workers). Each
worker applies `f` to the one-element array it received and sends the result back;
the root then gives that worker the next unprocessed element until a result has been
received for every element. Finally the root tells all workers to stop and prints the
collected results.

MPI is initialized on entry and finalized on exit. Progress is reported with `print`
on every rank.

# Arguments
- `data`: vector of values to process; `eltype(data)` is the element type of every
  message buffer.
- `f`: function called by the workers on a one-element `Array{eltype(data)}`. Its
  result is sent back to the root, which receives it into a one-element
  `Array{eltype(data)}`.

# Notes
- Results are stored in the order in which they arrive at the root, which in general
  differs from the order of `data`.
- At least one worker rank is required when `data` is not empty; otherwise an error
  is raised instead of waiting forever.
"""
function job_queue(data,f)
    MPI.Init()

    comm = MPI.COMM_WORLD
    rank = MPI.Comm_rank(comm)
    nworkers = MPI.Comm_size(comm) - 1
    root = 0

    MPI.Barrier(comm)
    if rank == root
        _job_queue_root(comm, data, nworkers)
    else
        _job_queue_worker(comm, f, rank, root, eltype(data))
    end
    MPI.Barrier(comm)
    MPI.Finalize()
end

# Job executed by the workers: add 100 to every entry of the array they receive.
f = x -> x .+ 100
# The numbers 1 through 10 are the work items handed out by the root.
data = collect(1:10)
job_queue(data, f)
> mpiexecjl -n 4 julia examples/05-job_schedule.jl
Root: Sent number 1 to Worker 1
Worker 1: Received number 1 from root
Root: Sent number 2 to Worker 2
Root: Sent number 3 to Worker 3
Root: Received number 101 from Worker 1
Root: Sent number 4 to Worker 1
Worker 1: Received number 4 from root
Root: Received number 104 from Worker 1
Root: Sent number 5 to Worker 1
Worker 1: Received number 5 from root
Root: Received number 105 from Worker 1
Root: Sent number 6 to Worker 1
Worker 1: Received number 6 from root
Root: Received number 106 from Worker 1
Root: Sent number 7 to Worker 1
Worker 1: Received number 7 from root
Root: Received number 107 from Worker 1
Root: Sent number 8 to Worker 1
Worker 1: Received number 8 from root
Root: Received number 108 from Worker 1
Root: Sent number 9 to Worker 1
Worker 1: Received number 9 from root
Root: Received number 109 from Worker 1
Root: Sent number 10 to Worker 1
Worker 1: Received number 10 from root
Root: Received number 110 from Worker 1
Worker 2: Received number 2 from root
Root: Received number 102 from Worker 2
Worker 3: Received number 3 from root
Root: Received number 103 from Worker 3
Root: Finish Worker 1
Worker 1: Finish
Root: Finish Worker 2
Worker 2: Finish
Root: Finish Worker 3
Worker 3: Finish
Root: New data = [101, 104, 105, 106, 107, 108, 109, 110, 102, 103]