Source code for shift.mpiutils.mpiclass

import numpy as np

from typing import Any, List, Tuple, Optional, Union

from . import loops


class MPI:
    """Thin convenience wrapper around ``mpi4py`` for distributed workflows.

    Attributes
    ----------
    mpi : module
        The ``mpi4py.MPI`` module.
    comm : object
        The ``COMM_WORLD`` communicator.
    rank : int
        Rank of the current process in ``comm``.
    size : int
        Number of processes in ``comm``.
    loop_size : int or None
        Size of the currently registered distributed loop (see `set_loop`).
    mpi_info : str
        Human readable label of the form ``"Proc <rank + 1> of <size>"``.
    """

    def __init__(self):
        """Initialises MPI."""
        # Imported lazily so that the module can be imported without mpi4py.
        from mpi4py import MPI as mpi

        self.mpi = mpi
        self.comm = mpi.COMM_WORLD
        self.rank = self.comm.Get_rank()
        self.size = self.comm.Get_size()
        self.loop_size = None
        self.mpi_info = f"Proc {self.rank + 1} of {self.size}"

    def wait(self):
        """Tells all jobs to wait -- to ensure jobs are synchronised."""
        self.comm.Barrier()

    def set_loop(self, loop_size: int) -> int:
        """Sets the size of a distributed loop.

        Parameters
        ----------
        loop_size : int
            Size of the full (undistributed) loop.

        Returns
        -------
        int
            Size of the MPI loop, as computed by ``loops.get_MPI_loop_size``.
        """
        self.loop_size = loop_size
        return loops.get_MPI_loop_size(loop_size, self.size)

    def mpi_ind2ind(self, mpi_ind: int) -> int:
        """Converts the MPI index of a distributed loop to the full loop index.

        Parameters
        ----------
        mpi_ind : int
            Index in the distributed loop.

        Returns
        -------
        int
            Index of the full loop, as computed by ``loops.MPI_ind2ind`` using
            the loop size registered with `set_loop`.
        """
        return loops.MPI_ind2ind(mpi_ind, self.rank, self.size, self.loop_size)

    def clean_loop(self):
        """Gets rid of the loop_size definition."""
        self.loop_size = None

    def split(
        self, length: int, size: Optional[int] = None
    ) -> Tuple[np.ndarray, np.ndarray]:
        """Computes the index ranges for splitting an array across nodes.

        The ``length`` elements are divided into contiguous chunks whose sizes
        differ by at most one; the larger chunks come first.

        Parameters
        ----------
        length : int
            Length of the array to be split.
        size : int, optional
            Number of chunks to split into. Defaults to the number of MPI
            processes (``self.size``).

        Returns
        -------
        split1 : array_like
            The indices of the first element of each chunk.
        split2 : array_like
            The (exclusive) end indices of each chunk.
        """
        nparts = int(self.size if size is None else size)
        # Integer arithmetic keeps the result exact; float division silently
        # loses elements once ``length`` exceeds 2**53.
        base, extra = divmod(int(length), nparts)
        counts = np.full(nparts, base, dtype="int")
        counts[:extra] += 1
        splits = np.zeros(nparts + 1, dtype="int")
        splits[1:] = np.cumsum(counts)
        return splits[:-1], splits[1:]

    def split_array(self, array: np.ndarray) -> np.ndarray:
        """Returns the part of an array belonging to the current rank.

        Parameters
        ----------
        array : array_like
            Array to be split.

        Returns
        -------
        array_like
            Slice of ``array`` assigned to ``self.rank`` by `split`.
        """
        split1, split2 = self.split(len(array))
        return array[split1[self.rank] : split2[self.rank]]

    def check_partition(
        self, NDshape: List[int], NDshape_split: List[int]
    ) -> np.ndarray:
        """Returns a boolean array flagging the axes that are NOT split.

        Parameters
        ----------
        NDshape : list
            The shape of the N-dimensional array.
        NDshape_split : list
            The shape of the N-dimensional split array.

        Returns
        -------
        array_like
            Boolean array, True where the axis length is unchanged (i.e. the
            array is not split along that axis).
        """
        unchanged = [dim == NDshape_split[i] for i, dim in enumerate(NDshape)]
        # Explicit dtype so that an empty shape still gives a boolean array.
        return np.array(unchanged, dtype=bool)

    # TODO: Remove these two functions -- pretty certain these are defunct and
    # no longer used elsewhere.

    # def create_split_ndarray(self, arrays_nd: np.ndarray, whichaxis: List[bool]) -> np.ndarray:
    #     """
    #     Splits a list of 1D arrays based on the data partitioning scheme. To be used with
    #     create_split_ngrid.
    #
    #     Parameters
    #     ----------
    #     arrays_nd : array_like
    #         List of 1D arrays to be split.
    #     whichaxis : array_like
    #         Boolean array showing whether array will not be split along a said axes.
    #
    #     Returns
    #     -------
    #     split_arrays : array_like
    #         Split list of 1D array.
    #     """
    #     split_arrays = []
    #     for i in range(0, len(arrays_nd)):
    #         _array = arrays_nd[i]
    #         if whichaxis[i] == False:
    #             _array = self.split_array(_array)
    #             split_arrays.append(_array)
    #         else:
    #             split_arrays.append(_array)
    #     return split_arrays

    # def create_split_ndgrid(self, arrays_nd: np.ndarray, whichaxis: List[bool]) -> np.ndarray:
    #     """
    #     Creates a partitioned gridded data set.
    #
    #     Parameters
    #     ----------
    #     arrays_nd : array_like
    #         List of arrays to be split.
    #     whichaxis : array_like
    #         Boolean array showing whether array will not be split along a said axes.
    #
    #     Returns
    #     -------
    #     split_grid : array_like
    #         N-dimensional split array.
    #     """
    #     split_arrays = self.create_split_ndarray(arrays_nd, whichaxis)
    #     split_grid = np.meshgrid(*split_arrays, indexing='ij')
    #     return split_grid

    def mpi_print(self, *value: Any) -> None:
        """Prints with ``flush=True`` so output appears immediately under MPI."""
        print(*value, flush=True)

    def mpi_print_zero(self, *value: Any) -> None:
        """Prints only at node rank = 0."""
        if self.rank == 0:
            self.mpi_print(*value)

    def send(
        self, data: np.ndarray, to_rank: Optional[int] = None, tag: int = 11
    ) -> None:
        """Sends data from the current core to one or all other cores.

        Parameters
        ----------
        data : array
            Data to send.
        to_rank : int, optional
            Rank to send data to; leave as None to send to every other core.
        tag : int, optional
            Sending tag to ensure the right data is being transferred.
        """
        if to_rank is not None:
            self.comm.send(data, dest=to_rank, tag=tag)
            return
        for dest in range(self.size):
            if dest != self.rank:
                self.comm.send(data, dest=dest, tag=tag)

    def recv(self, from_rank: int, tag: int = 11) -> np.ndarray:
        """Receives data from another node.

        Parameters
        ----------
        from_rank : int
            Source of the data.
        tag : int
            Sending tag to ensure the right data is being transferred.

        Returns
        -------
        data : array
            Data received.
        """
        return self.comm.recv(source=from_rank, tag=tag)

    def broadcast(self, data: Any) -> Any:
        """Broadcasts data from rank=0 to all nodes.

        Parameters
        ----------
        data : Any
            Value to broadcast; only the value held by rank 0 is used.

        Returns
        -------
        Any
            The value held by rank 0, on every rank.
        """
        if self.rank == 0:
            self.send(data, tag=11)
        else:
            data = self.recv(0, tag=11)
        self.wait()
        return data

    def send_up(self, data: Any) -> Any:  # pragma: no cover
        """Sends data from each node to the node above (blocking).

        Rank ``r`` sends to ``r + 1``; the last rank wraps around to rank 0.

        Parameters
        ----------
        data : Any
            Data to pass on; it is copied with ``np.copy`` before sending.

        Returns
        -------
        Any
            Data received from the node below.
        """
        datain = np.copy(data)
        if self.size == 1:
            # A blocking self-send is not guaranteed to complete; with one
            # process the node "below" is this process itself.
            return datain
        if self.rank < self.size - 1:
            self.send(datain, to_rank=self.rank + 1, tag=10 + self.rank)
        if self.rank > 0:
            dataout = self.recv(self.rank - 1, tag=10 + self.rank - 1)
        self.wait()
        # Wrap-around step: last rank -> rank 0.
        if self.rank == self.size - 1:
            self.send(datain, to_rank=0, tag=10 + self.size)
        if self.rank == 0:
            dataout = self.recv(self.size - 1, tag=10 + self.size)
        self.wait()
        return dataout

    def isend_up(self, data: Any) -> Any:
        """Sends data from each node to the node above (rank+1, wrapping around).

        Uses non-blocking ``isend``/``irecv`` and waits for both to complete.

        Parameters
        ----------
        data : Any
            Data to pass on; it is copied with ``np.copy`` before sending.

        Returns
        -------
        Any
            Data received from the node below.
        """
        datain = np.copy(data)
        # Non-blocking receive from below.
        if self.rank > 0:
            req_recv = self.comm.irecv(source=self.rank - 1, tag=10 + self.rank - 1)
        else:
            req_recv = self.comm.irecv(source=self.size - 1, tag=10 + self.size)
        # Non-blocking send upward.
        if self.rank < self.size - 1:
            req_send = self.comm.isend(
                obj=datain, dest=self.rank + 1, tag=10 + self.rank
            )
        else:
            req_send = self.comm.isend(obj=datain, dest=0, tag=10 + self.size)
        # Wait for receive and send to complete.
        dataout = req_recv.wait()
        req_send.wait()
        return dataout

    def send_down(self, data: Any) -> Any:  # pragma: no cover
        """Sends data from each node to the node below (blocking).

        Rank ``r`` sends to ``r - 1``; rank 0 wraps around to the last rank.

        Parameters
        ----------
        data : Any
            Data to pass on; it is copied with ``np.copy`` before sending.

        Returns
        -------
        Any
            Data received from the node above.
        """
        datain = np.copy(data)
        if self.size == 1:
            # With one process the wrap-around step would post a blocking
            # receive from itself before the matching send and hang.
            return datain
        if self.rank > 0:
            self.send(datain, to_rank=self.rank - 1, tag=20 + self.rank)
        if self.rank < self.size - 1:
            dataout = self.recv(self.rank + 1, tag=20 + self.rank + 1)
        self.wait()
        # Wrap-around step: rank 0 -> last rank.
        if self.rank == self.size - 1:
            dataout = self.recv(0, tag=20 + self.size)
        if self.rank == 0:
            self.send(datain, to_rank=self.size - 1, tag=20 + self.size)
        self.wait()
        return dataout

    def isend_down(self, data: Any) -> Any:
        """Sends data from each node to the node below (rank-1, wrapping around).

        Uses non-blocking ``isend``/``irecv`` and waits for both to complete.

        Parameters
        ----------
        data : Any
            Data to pass on; it is copied with ``np.copy`` before sending.

        Returns
        -------
        Any
            Data received from the node above.
        """
        datain = np.copy(data)
        # Non-blocking receive from above.
        if self.rank < self.size - 1:
            req_recv = self.comm.irecv(source=self.rank + 1, tag=20 + self.rank + 1)
        else:
            req_recv = self.comm.irecv(source=0, tag=20 + self.size)
        # Non-blocking send downward.
        if self.rank > 0:
            req_send = self.comm.isend(
                obj=datain, dest=self.rank - 1, tag=20 + self.rank
            )
        else:
            req_send = self.comm.isend(
                obj=datain, dest=self.size - 1, tag=20 + self.size
            )
        # Wait for receive and send to complete.
        dataout = req_recv.wait()
        req_send.wait()
        return dataout

    def collect(self, data: np.ndarray, outlist: bool = False) -> np.ndarray:
        """Collects a distributed data set on the processor with rank=0.

        Parameters
        ----------
        data : array
            Distributed data set. Scalars are wrapped into a 1-element array.
        outlist : bool, optional
            If False the per-rank pieces are concatenated; if True the list of
            pieces (ordered by rank) is returned as is.

        Returns
        -------
        array, list or None
            The collected data on rank 0, None on every other rank.
        """
        if np.isscalar(data):
            data = np.array([data])
        if self.rank == 0:
            pieces = [data]
            for src in range(1, self.size):
                pieces.append(self.recv(src, tag=10 + src))
            # Truthiness rather than ``is False`` so np.bool_ flags also work.
            data = pieces if outlist else np.concatenate(pieces)
        else:
            self.send(data, to_rank=0, tag=10 + self.rank)
            data = None
        self.wait()
        return data

    def collect_noNone(self, data: np.ndarray) -> np.ndarray:
        """Same as `collect`, but drops ``None`` contributions before combining.

        Parameters
        ----------
        data : array
            Distributed data set; ranks with nothing to contribute pass None.

        Returns
        -------
        array or None
            Concatenated data on rank 0 (an empty array if every rank passed
            None), None on every other rank.
        """
        gathered = self.collect(data, outlist=True)
        if self.rank != 0:
            return None
        kept = [piece for piece in gathered if piece is not None]
        if not kept:
            # np.concatenate raises on an empty list.
            return np.array([])
        return np.concatenate(kept)

    def distribute(self, data: np.ndarray) -> np.ndarray:
        """Distributes and splits data from rank 0.

        Parameters
        ----------
        data : array
            Full data set which will be split across the cores; only the value
            on rank 0 is used.

        Returns
        -------
        array
            The chunk of the data assigned to the current rank.
        """
        if self.rank == 0:
            split1, split2 = self.split(len(data))
            for dest in range(1, len(split1)):
                self.send(data[split1[dest] : split2[dest]], to_rank=dest, tag=10 + dest)
            data = data[split1[0] : split2[0]]
        else:
            data = self.recv(0, tag=10 + self.rank)
        self.wait()
        return data

    def sum(
        self, data: Union[np.ndarray, int, float]
    ) -> Union[np.ndarray, int, float, None]:
        """Sums a distributed data set onto the processor with rank=0.

        Parameters
        ----------
        data : array
            Distributed data set.

        Returns
        -------
        array, int, float or None
            Element-wise total on rank 0, None on every other rank.
        """
        if self.rank == 0:
            for src in range(1, self.size):
                # Out-of-place add: leaves the caller's array untouched and
                # allows dtype promotion (e.g. int + float).
                data = data + self.recv(src, tag=10 + src)
        else:
            self.send(data, to_rank=0, tag=10 + self.rank)
            data = None
        self.wait()
        return data

    def mean(self, data: np.ndarray) -> Union[float, int]:
        """Finds the mean of a distributed data set, broadcast to all nodes.

        Parameters
        ----------
        data : array
            Distributed data set.

        Returns
        -------
        float or int
            Mean over all elements on all ranks.
        """
        # `sum` already ends with a barrier, so no extra synchronisation here.
        total_data = self.sum(np.sum(data))
        total_elem = self.sum(np.size(data))
        mean = total_data / total_elem if self.rank == 0 else None
        return self.broadcast(mean)

    def min(self, data: np.ndarray) -> Union[float, int]:
        """Finds the minimum of a distributed data set, broadcast to all nodes.

        Parameters
        ----------
        data : array
            Distributed data set.

        Returns
        -------
        float or int
            Minimum over all ranks.
        """
        mins = self.collect(np.min(data))
        minval = np.min(mins) if self.rank == 0 else None
        return self.broadcast(minval)

    def max(self, data: np.ndarray) -> Union[float, int]:
        """Finds the maximum of a distributed data set, broadcast to all nodes.

        Parameters
        ----------
        data : array
            Distributed data set.

        Returns
        -------
        float or int
            Maximum over all ranks.
        """
        maxs = self.collect(np.max(data))
        maxval = np.max(maxs) if self.rank == 0 else None
        return self.broadcast(maxval)

    def end(self):  # pragma: no cover
        """Ends MPI environment."""
        self.mpi.Finalize()