# examples/05-job_schedule.jl
# This example demonstrates job scheduling by adding 100 to every
# element of the vector `data`. The root assigns one element to each
# worker to compute the operation. Whenever a worker finishes, the root
# sends it another element, until 100 has been added to every element.
# Inspired by https://www.hpc.ntnu.no/vilje/software/mpi-and-mpi-io-training-tutorial/
# (see https://www.hpc.ntnu.no/vilje/software/mpi-and-mpi-io-training-tutorial/basic-mpi/job-queue/).
# An updated job_queue.c is available in the basic_mpi/04_job_queue/src subdirectory
# of the archive https://www.hpc.ntnu.no/wp-content/uploads/2019/09/mpiexamples.tar.gz
# once it has been extracted.
using MPI
"""
    job_queue(data, f)

Apply `f` to every element of `data` using a root/worker job queue over MPI.

Rank 0 (the root) sends one element of `data` at a time to each worker rank
(ranks `1:nprocs-1`). A worker receives the element into a one-element
`Vector{eltype(data)}`, applies `f` to that vector and sends the result back.
Whenever a worker returns a result, the root hands it the next unprocessed
element. Once every element has been processed, the root sends each worker an
empty message on a dedicated stop tag and prints the results. Result `i` belongs
to `data[i]` (input order, not arrival order).

`MPI.Init()` is called on entry and `MPI.Finalize()` on exit, so this can be called
at most once per process. Returns `nothing` on every rank.

# Arguments
- `data`: elements to process, read with linear indices `1:length(data)`. Its
  element type must be one MPI.jl can transmit (presumably an `isbits` type —
  TODO confirm for anything beyond plain numbers).
- `f`: applied by the workers to a one-element `Vector{eltype(data)}`. Its result
  is sent back as-is and received by the root into a one-element
  `Vector{eltype(data)}`, so it should return a one-element array of that same
  element type (e.g. `x -> x .+ 100`).

# Throws
- `ArgumentError` if `data` is non-empty but only one MPI rank is running: there
  would be no worker, and the root would otherwise wait forever.
"""
function job_queue(data, f)
    MPI.Init()
    comm = MPI.COMM_WORLD
    rank = MPI.Comm_rank(comm)
    world_size = MPI.Comm_size(comm)
    nworkers = world_size - 1
    root = 0
    # Jobs and results for worker `r` travel on tag `r + 32`. That tag is always
    # >= 33 because workers have rank >= 1, so tag 32 is free for the stop signal.
    stop_tag = 32
    if nworkers < 1 && !isempty(data)
        throw(ArgumentError("job_queue needs at least 2 MPI ranks " *
                            "(1 root + 1 worker), got $world_size"))
    end
    MPI.Barrier(comm)
    if rank == root
        _job_queue_root(comm, data, nworkers, stop_tag)
    else
        _job_queue_worker(comm, f, eltype(data), rank, root, stop_tag)
    end
    MPI.Barrier(comm)
    MPI.Finalize()
    return nothing
end

# Root rank: dispatch jobs, store results in input order, then stop every worker.
function _job_queue_root(comm, data, nworkers, stop_tag)
    T = eltype(data)
    N = length(data)
    new_data = Array{T}(undef, N)
    recv_mesg = Array{T}(undef, 1)
    # assigned[w] is the index of the element worker `w` is processing (0 = idle).
    # It lets each result be written back to its original position.
    assigned = zeros(Int, nworkers)
    # Every send request, completed together at the end instead of being
    # overwritten (which would leak the request).
    sreqs = MPI.Request[]

    # Send data[idx] to worker `dst` in a fresh buffer. An Isend buffer must not
    # be modified until its request has completed, so buffers are never reused.
    function send_job(dst, idx)
        push!(sreqs, MPI.Isend([data[idx]], comm; dest=dst, tag=dst + 32))
        assigned[dst] = idx
        print("Root: Sent number $(data[idx]) to Worker $dst\n")
        return nothing
    end

    # Prime the workers with one job each (fewer if there are fewer elements).
    idx_next = 1
    for dst in 1:min(nworkers, N)
        send_job(dst, idx_next)
        idx_next += 1
    end

    # Poll busy workers for results; a worker that reports back gets the next job.
    nreceived = 0
    while nreceived < N
        for dst in 1:nworkers
            assigned[dst] == 0 && continue
            MPI.Iprobe(comm; source=dst, tag=dst + 32) || continue
            MPI.Recv!(recv_mesg, comm; source=dst, tag=dst + 32)
            new_data[assigned[dst]] = recv_mesg[1]
            assigned[dst] = 0
            nreceived += 1
            print("Root: Received number $(recv_mesg[1]) from Worker $dst\n")
            if idx_next <= N
                send_job(dst, idx_next)
                idx_next += 1
            end
        end
    end

    # An empty message on the dedicated stop tag ends each worker. Unlike a
    # sentinel value such as -1, it cannot collide with real data.
    for dst in 1:nworkers
        push!(sreqs, MPI.Isend(T[], comm; dest=dst, tag=stop_tag))
        print("Root: Finish Worker $dst\n")
    end
    MPI.Waitall(sreqs)
    print("Root: New data = $new_data\n")
    return new_data
end

# Worker rank: apply `f` to each job received from `root` until the stop message.
function _job_queue_worker(comm, f, T, rank, root, stop_tag)
    data_tag = rank + 32
    # Result sends that may still be in flight; completed before returning.
    sreqs = MPI.Request[]
    while true
        if MPI.Iprobe(comm; source=root, tag=data_tag)
            # Use a fresh buffer per job. `f` may return its argument, and a buffer
            # handed to Isend must not be overwritten by a later Recv!.
            recv_mesg = Array{T}(undef, 1)
            MPI.Recv!(recv_mesg, comm; source=root, tag=data_tag)
            print("Worker $rank: Received number $(recv_mesg[1]) from root\n")
            push!(sreqs, MPI.Isend(f(recv_mesg), comm; dest=root, tag=data_tag))
        elseif MPI.Iprobe(comm; source=root, tag=stop_tag)
            # The root sends the stop message only after collecting every result,
            # so no job for this worker can still be pending at this point.
            MPI.Recv!(Array{T}(undef, 1), comm; source=root, tag=stop_tag)
            print("Worker $rank: Finish\n")
            break
        end
    end
    MPI.Waitall(sreqs)
    return nothing
end
# Demo run: add 100 to each of the integers 1 through 10.
f = x -> broadcast(+, x, 100)
data = [i for i in 1:10]
job_queue(data, f)
#=
Example run (captured output; the interleaving of lines depends on scheduling
and varies between runs):

> mpiexecjl -n 4 julia examples/05-job_schedule.jl
Root: Sent number 1 to Worker 1
Root: Sent number 2 to Worker 2
Root: Sent number 3 to Worker 3
Root: Received number 101 from Worker 1
Root: Sent number 4 to Worker 1
Root: Received number 104 from Worker 1
Root: Sent number 5 to Worker 1
Root: Received number 105 from Worker 1
Root: Sent number 6 to Worker 1
Root: Received number 106 from Worker 1
Root: Sent number 7 to Worker 1
Root: Received number 107 from Worker 1
Root: Sent number 8 to Worker 1
Root: Received number 108 from Worker 1
Root: Sent number 9 to Worker 1
Root: Received number 109 from Worker 1
Root: Sent number 10 to Worker 1
Root: Received number 110 from Worker 1
Worker 1: Received number 1 from root
Worker 1: Received number 4 from root
Worker 1: Received number 5 from root
Worker 1: Received number 6 from root
Worker 1: Received number 7 from root
Worker 1: Received number 8 from root
Worker 1: Received number 9 from root
Worker 1: Received number 10 from root
Worker 3: Received number 3 from root
Worker 2: Received number 2 from root
Root: Received number 103 from Worker 3
Root: Received number 102 from Worker 2
Root: Finish Worker 1
Worker 1: Finish
Root: Finish Worker 2
Worker 2: Finish
Root: Finish Worker 3
Worker 3: Finish
Root: New data = [101, 104, 105, 106, 107, 108, 109, 110, 103, 102]
=#