SS22 Scientific Coding with Julia

Distributed computing in Julia

In Julia we talk about distributed computing when we run Julia processes with separate memory spaces. Julia does not distinguish between processes on the same or on multiple computers. The main idea is to have remote execution.

Before we start with distributed computing we introduce tasks.

Tasks

Tasks are part of the asynchronous programming concepts implemented in Julia. We can think of a task as a work package with a create-start-run-finish life cycle. This means, a task can be created and scheduled independently. Tasks are the basic building block for performing distributed computing in Julia. This concept of tasks is actually rather similar to how High Performance Computing (HPC) systems work with their job scheduler.

To create a task we can use the @task macro, that will return a runnable but will not execute it. We use the schedule command to actually execute it.

julia> t = @task begin; sleep(5); println("done"); end
Task (runnable) @0x00007fb859f336b0

julia> schedule(t)
Task (runnable) @0x00007fb859f336b0

julia> done
julia>

The main idea is, that a task can be interrupted and the order of tasks is not set. This means if we have multiple tasks the order of execution is not guaranteed. This also means, that the main Julia process is not blocked by a task but works independently. If we want to wait for the task to finish before the calling task continues, we can do this with wait(t).

Most of the time we create a task and schedule it right away. This can be done by the @async macro. It is basically equivalent to schedule(@task x). We can also catch the task (and therefore the state) with t = @async x. If we want to wait for several tasks, we can use the @sync macro. It will wait until all enclosed tasks spanned by @async, @spawn, @spawnat, and @distributed are completed.

A task can also return a value, we can get the value with fetch(t) or with take!(t), which will remove the value as well.

The following examples are taken from Parallel Computing Class on JuliaAcademy.com, section Tasks:

  1. How long will this take?

@time for i in 1:10
          sleep(1)
      end
  1. How long will this take?

@time for i in 1:10
          @async sleep(1)
      end
  1. How long will this take?

@time @sync for i in 1:10
                @async sleep(1)
            end

Solution

julia> @time for i in 1:10
                 sleep(1)
             end
 10.020172 seconds (51 allocations: 1.703 KiB)

julia> @time for i in 1:10
                 @async sleep(1)
             end
  0.017384 seconds (6.40 k allocations: 399.856 KiB, 96.91% compilation time)

julia> @time @sync for i in 1:10
                       @async sleep(1)
                   end
  1.047838 seconds (847 allocations: 53.656 KiB, 4.34% compilation time)

The π\pi example with tasks

Now we apply the task knowledge to our example for computing π\pi.

Define a new function in_unit_circle_task with the @async and @sync macros. Split up N into 4 parts, same as for threads, and schedule the tasks in a loop. Define a vector of tasks to catch the results inside the loop.

Solution

function in_unit_circle_task(N::Int64)
    n = 4
    len, rem = divrem(N, n)
    t = Vector{Task}(undef, n)
    
    @sync for i in 1:N
        t[i] = @async in_unit_circle(len)
    end
    
    M = sum(map((x) -> fetch(t[x]), 1:4))
    
    return M
end

and we test it

julia> get_accuracy(in_unit_circle_task, N)
  3.002385561856613e-5

julia> @btime estimate_pi(in_unit_circle_task, N);
  2.549 s (46 allocations: 2.72 KiB)

Back to distributed computing

Now that we know what a task is and how to create one we have no difficulty to define what distributed computing is. It is simply the way to distribute tasks on multiple CPUs or computers. In Julia, this multiprocessing environment is based on message passing. It allows tasks to run on multiple processes in separate memory domains, but all at once. The communication in Julia is not like the one used by MPI. It is one-sided, that is we only need to manage one process in a two-process operation. These management instructions are also not sent/receive messages but calls to functions or something similar. For this, Julia provides two primitives, remote reference and remote calls, the documentation tells us:

A remote reference is an object that can be used from any process to refer to an object stored on a particular process. A remote call is a request by one process to call a certain function on certain arguments on another (possibly the same) process.

All of this is managed from the Distributed package. It is easy to imagine, that the field of distributed computing can quickly become quite extensive, so let us look at some concepts that are useful for our π\pi example and give us an idea on how this works.

The π\pi example for distributed computing

In order to start with distributed computing, we need to add some distributed processes or workers. Similar as with threads each process has an associated identifier. The process providing the Julia REPL or the main call has id 1. As long as there are more than two processes, each process that does not have id 1 is considered a worker process.

We can add workers at startup:

$ julia -p 2

where we defined 2 workers on the local machine. We can also add workers from within Julia by calling

julia> using Distributed
julia> addprocs(4)
5

This added four workers and in total we have now 5 processes, hence the return value. Consequently, we can remove workers again by calling

julia> rmprocs(2:5)
Task (done) @0x00007fb859fc1430

and with myid() we get the id of the process we are on.

The @distributed macro

There is an obvious problem right away. If we define a function or a variable on a process how does another process know about this?

The easiest concept is to run a parallel for loop. As before we can do this with a macro, namely the @distributed macro. The general construct is

@distributed [reducer] for var = range
    body
end

The specified range is partitioned and locally executed across all workers. In case an optional reducer function is specified, @distributed performs local reductions on each worker with a final reduction on the calling process.

Note that without a reducer function, @distributed executes asynchronously, i.e. it spawns independent tasks on all available workers and returns immediately without waiting for completion. To wait for completion, prefix the call with @sync, like :

@sync @distributed for var = range
    body
end
What is the result for the following code snippet?

julia> workers()
4-element Vector{Int64}:
 2
 3
 4 
 5

julia> a = zeros(5);

julia> @sync @distributed for i in 1:5
                              a[i] = i
                          end

Luckily, our π\pi example does not need much data movement.

Define a new function in_unit_circle_distributed1 with the @distributed macro and an appropriate reducer function (the syntax is x = @distributed (operator) for ...). Note that we might need to always return a value inside the loop. For comparison start 4 workers and test the accuracy and measure the performance.
Solution

function in_unit_circle_distributed1(N::Int64)
    M = @distributed (+) for i in 1:N
        if (rand()^2 + rand()^2) < 1
            1
        else
            0
        end
    end

    return M
end

and we test it

julia> nprocs()
1
julia> addprocs(4)
5
julia> get_accuracy(in_unit_circle_distributed1, N)
  4.179204767140732e-5

julia> @btime estimate_pi(in_unit_circle_distributed1, N);
  649.564 ms (289 allocations: 12.27 KiB)

We can see that this is a very easy way to parallelize and this time the rand() function is not causing problems. We are already faster than the basic implementation and close to the optimized four threads implementation.

The distributed for loop with @distributed is designed to work well for situations where each iteration is tiny (in terms of computational effort/workload). Of course, there is also the other possibility, that we have a function with a massive workload and work with the results of these calls.

The pmap and the @everywhere macro

As mentioned before, we need to get functions to all the workers, in order to execute them. For distributing a functions or for loading modules we can use the @everywhere macro. As the name suggests, it will make sure that the function is available in the scope of each worker and the main process. We simply prepend a function with @everywhere and nothing more is required. There are some things to note for this case:

  • the function will be compiled on each worker on the first call.

  • no local variables are captured but they can be broadcasted (arguments are broadcasted)

foo = 2
@everywhere bar = $foo
  • the function will only be available on workers that were present during the call, every worker that is added later will not have it defined.

  • a module can be loaded on every worker with @everywhere using <modulename>.

The function pmap is the parallel version of the map function. Let us start with map before we go on to pmap. The basic idea is to map a collection to a function by applying the function to each element. The result is again a collection. The syntax is as follows:

map(f, c...) -> collection

We can even include multiple collections and it will apply the function until one collection is exhausted.

This examples is copied from the docs

julia> map(+, [1, 2, 3], [10, 20, 30, 400, 5000])
3-element Vector{Int64}:
 11
 22
 33

julia> map(x -> x * 2, [1, 2, 3])
3-element Vector{Int64}:
 2
 4
 6

The second example also includes an anonymous function. The idea is simply to have a function that is not needed outside of the scope of a function call.

Now, pmap just distributes the map function on workers. It has a lot of optional arguments to influence how this is done but we will not need this.

Define a new function in_unit_circle_distributed2 with the @everywhere macro and the pmap function.

Hint: define an inner function that is distributed to all workers via @everywhere and collect the results with pmap. Split up the works similar as for the last example in tasks.

Solution

@everywhere function in_unit_circle_distributed2_inner(N::Int64)
    M = 0
    
    for i in 1:N
        if (rand()^2 + rand()^2) < 1
            M += 1
        end
    end

    return M    
end

function in_unit_circle_distributed2(N::Int64)
    len, rem = divrem(N, nprocs() - 1)
    M = sum(
            pmap(
            (x) -> in_unit_circle_distributed2_inner(len), 2:nprocs()
            )
        )

    return M
end

and we test it

julia> nprocs()
1
julia> addprocs(4)
5
julia> get_accuracy(in_unit_circle_distributed2, N)
  3.955314820203171e-5

julia> @btime estimate_pi(in_unit_circle_distributed2, N);
  648.168 ms (304 allocations: 13.11 KiB)

Additional information on distributed computing

There is a lot more to say about distributed computing. Have a read in the docs but here are some things we want to mention. For example, Julia is able to define shared arrays. A shared array means the content can be accessed by each worker and it is consistent over all workers.

Furthermore, it is possible to have a cluster, e.g. a managed pool of workers. This cluster can be distributed on several machines. We can define various ways of accessing these machines so we can become really flexible about this and maybe start a worker on the laptop of a colleague. For this we have the ClusterManager package.

CC BY-SA 4.0 - Gregor Ehrensperger, Peter Kandolf, Jonas Kusch. Last modified: September 09, 2022. Website built with Franklin.jl and the Julia programming language.