Computer Science: Histogram Sort Algorithm Using MPI and OpenMP
Question :
This is a coding assignment. You need to implement three functions for parallel sorting using the histogram sort algorithm with MPI, OpenMP, and C++.
Answer :
#include "solution.h"
#include <mpi.h>
#include <omp.h>
#include <algorithm>
#include <cstdint>
#include <cstdlib>
#include <vector>
#include "basic_defs.h"
#include "databasics.h"
MPI_Datatype dtype = MPI_UNSIGNED_LONG_LONG;
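// Assumption: dist_sort_t and dist_sort_size_t (declared in basic_defs.h) are
// 64-bit unsigned integers, which is why MPI_UNSIGNED_LONG_LONG is used below.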
// Get midpoint of two numbers
uint64_t getMid(const uint64_t &a, const uint64_t &b) {
uint64_t ans = a / 2 + b / 2;
if (((a % 2) + (b % 2)) == 2) ans++;
return ans;
}
// Count the items in sorted data that are less than or equal to key,
// i.e. the index of the first element greater than key (upper bound).
uint64_t getHist(const dist_sort_t *data, const dist_sort_size_t n,
                 const dist_sort_t key) {
  uint64_t a = 0, b = n;
  while (b > a) {
    uint64_t mid = getMid(a, b);
    if (data[mid] <= key)
      a = mid + 1;
    else
      b = mid;
  }
  return a;
}
// Check whether hist lands close enough to an ideal bin boundary to serve as
// a splitter; return that splitter's index, or -1 if it does not qualify.
int64_t idealSplitter(dist_sort_size_t dataPerProc, dist_sort_t hist) {
  uint64_t thresh = dataPerProc / 200;  // tolerate 0.5% deviation from ideal
  uint64_t offset = hist % dataPerProc;
  if (offset < thresh) {
    return (int64_t)(hist / dataPerProc) - 1;
  } else if ((dataPerProc - offset) < thresh) {
    return (int64_t)(hist / dataPerProc);
  }
  return -1;
}
void rebalance(const dist_sort_t *data, const dist_sort_size_t myDataCount,
dist_sort_t **rebalancedData, dist_sort_size_t *rCount) {
// Get number of processes
int nProcs;
MPI_Comm_size(MPI_COMM_WORLD, &nProcs);
// Get rank of the process
int rank;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
dist_sort_size_t global_N;
// Perform MPI all reduce to sum up all local_N's and get global_N
MPI_Allreduce(&myDataCount, &global_N, 1, dtype, MPI_SUM, MPI_COMM_WORLD);
uint64_t dataPerProc, limit;
// Datacount per process
dataPerProc = global_N / nProcs;
// no of processes with extra data
limit = global_N % nProcs;
// assign datasize for curr process
dist_sort_size_t myCount = dataPerProc + (rank < limit ? 1 : 0);
// allocate array for output
dist_sort_t *balanced =
(dist_sort_t *)malloc(myCount * sizeof(dist_sort_t));
// Global starting and ending index of this rank
uint64_t myStartGlobal, myEnd;
MPI_Exscan(&myDataCount, &myStartGlobal, 1, dtype, MPI_SUM, MPI_COMM_WORLD);
if (rank == 0) myStartGlobal = 0;
myEnd = myStartGlobal + myDataCount - 1;
MPI_Win win;
// create window for one way communication
MPI_Win_create(balanced, myCount * sizeof(dist_sort_t), sizeof(dist_sort_t),
MPI_INFO_NULL, MPI_COMM_WORLD, &win);
MPI_Win_fence(MPI_MODE_NOPRECEDE, win);
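  // Push this rank's data directly into the owners' windows, one contiguous
  // chunk per destination rank (chunks are split at rank boundaries).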
uint64_t next = myStartGlobal;
while (next <= myEnd) {
uint64_t destProcs = dataPerProc + 1; // data count in destination rank
uint64_t dest = next / destProcs; // destination rank
uint64_t disp; // offset in destination rank
uint64_t size; // size to write to destination rank
    if (!(dest < limit)) {
      // Ranks >= limit hold only dataPerProc elements; work in global indices
      // shifted down by `limit` so the division stays uniform.
      dest = (next - limit) / --destProcs;
      disp = (next - limit) % destProcs;
      size = std::min(destProcs * (dest + 1), myEnd + 1 - limit) -
             (next - limit);
    } else {
      disp = next % destProcs;
      size = std::min(destProcs * (dest + 1), myEnd + 1) - next;
    }
// writing to destination rank
MPI_Put(&data[next - myStartGlobal], size, dtype, dest, disp, size,
dtype, win);
next += size;
}
  MPI_Win_fence(MPI_MODE_NOSUCCEED, win);
  MPI_Win_free(&win);
// assigning rebalanced data
*rebalancedData = balanced;
*rCount = myCount;
}
void findSplitters(const dist_sort_t *data, const dist_sort_size_t data_size,
dist_sort_t *splitters, dist_sort_size_t *counts,
int numSplitters) {
// Get number of processes
int nProcs;
MPI_Comm_size(MPI_COMM_WORLD, &nProcs);
// Get rank of the process
int rank;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
dist_sort_size_t global_N;
// Perform MPI all reduce to sum up all local_N's and get global_N
MPI_Allreduce(&data_size, &global_N, 1, dtype, MPI_SUM, MPI_COMM_WORLD);
uint64_t dataPerProc;
// Datacount per process
dataPerProc = global_N / nProcs;
// number of probes to use
uint64_t k = 1000 * numSplitters;
// number of unsatisfied splitters
uint64_t splittersLeft = numSplitters;
dist_sort_size_t prefixCounts[numSplitters];
// assigning final splitter
splitters[numSplitters - 1] = DIST_SORT_MAX;
prefixCounts[numSplitters - 1] = global_N;
// indicates satisfied splitters
bool selected[numSplitters];
splittersLeft--;
#pragma omp parallel for
for (int i = 0; i < numSplitters - 1; i++) {
selected[i] = false;
}
selected[numSplitters - 1] = true;
  // probe keys plus the local and global histogram values for each probe
  // (std::vector rather than stack arrays, since k can be large)
  std::vector<dist_sort_t> probes(k);
  std::vector<dist_sort_t> localHist(k), globalHist(k);
  if (0 == rank) {
    // initialise probes evenly across the key space
#pragma omp parallel for
    for (int i = 0; i < k; i++)
      probes[i] = (DIST_SORT_MAX / (k + 1)) * (i + 1);
  }
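  // Refine the probes until every splitter index has a key whose global count
  // of smaller-or-equal elements is within the tolerance of its ideal boundary.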
while (splittersLeft) {
// printf("%ld ", splittersLeft);
MPI_Barrier(MPI_COMM_WORLD);
// send selected probes to all ranks
    MPI_Bcast(probes.data(), k, dtype, 0, MPI_COMM_WORLD);
#pragma omp parallel for
for (int i = 0; i < k; i++) {
// calculate histogram for all probes
localHist[i] = getHist(data, data_size, probes[i]);
// printf("rank: %d, localhist: %lu\n", rank, localHist[i]);
}
MPI_Barrier(MPI_COMM_WORLD);
// calculate global histogram for all probes
    MPI_Reduce(localHist.data(), globalHist.data(), k, dtype, MPI_SUM, 0,
               MPI_COMM_WORLD);
    if (0 == rank) {
      // Evaluate the probes serially: several probes can map to the same
      // splitter index, so a parallel loop would race on selected[].
      for (int i = 0; i < k; i++) {
        int64_t idx = idealSplitter(dataPerProc, globalHist[i]);
        // record the probe if it satisfies a splitter that is not yet found
        if (idx > -1 && !selected[idx]) {
          splitters[idx] = probes[i];
          prefixCounts[idx] = globalHist[i];
          selected[idx] = true;
        }
      }
splittersLeft = 0;
for (int i = 0; i < numSplitters; i++)
if (!selected[i]) splittersLeft++;
      if (splittersLeft) {
        // Some splitters are still unsatisfied: build a fresh set of probes,
        // spreading them across each gap between already-found splitters in
        // proportion to how many splitters that gap still needs.
        int a = -1, probeIdx = 0;
        bool running = false;
        for (int i = 0; i < numSplitters; i++) {
          if (running) {
            if (selected[i]) {
              // a run of unsatisfied splitters ends just before index i
              int numProbes = (i - a - 1) * k / splittersLeft;
              dist_sort_t start = (a == -1) ? 0 : splitters[a];
              dist_sort_t end = splitters[i];
              for (int j = 0; j < numProbes && probeIdx < k; j++, probeIdx++) {
                probes[probeIdx] =
                    start + (end - start) / (numProbes + 1) * (j + 1);
              }
              running = false;
              a = i;
            }
          } else {
            if (selected[i]) {
              a = i;
            } else {
              running = true;
            }
          }
        }
      }
}
MPI_Barrier(MPI_COMM_WORLD);
// send unsatisfied splitters count to all ranks
MPI_Bcast(&splittersLeft, 1, dtype, 0, MPI_COMM_WORLD);
}
if (0 == rank) {
// printf("nloops: %d\n", nloops);
// set counts for each bin
#pragma omp parallel for
for (int i = numSplitters - 1; i > 0; i--) {
counts[i] = prefixCounts[i] - prefixCounts[i - 1];
}
counts[0] = prefixCounts[0];
}
MPI_Barrier(MPI_COMM_WORLD);
// send splitters to each rank
MPI_Bcast(splitters, numSplitters, dtype, 0, MPI_COMM_WORLD);
MPI_Barrier(MPI_COMM_WORLD);
// send count to each rank
MPI_Bcast(counts, numSplitters, dtype, 0, MPI_COMM_WORLD);
}
void moveData(const dist_sort_t *const sendData,
const dist_sort_size_t sDataCount, dist_sort_t **recvData,
dist_sort_size_t *rDataCount, const dist_sort_t *const splitters,
const dist_sort_t *const counts, int numSplitters) {
// Get number of processes
int nProcs;
MPI_Comm_size(MPI_COMM_WORLD, &nProcs);
// Get rank of the process
int rank;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
// number of local data points in each bin
dist_sort_size_t bins[numSplitters], prefixBins[numSplitters];
#pragma omp parallel for
for (int i = 0; i < numSplitters; i++)
prefixBins[i] = getHist(sendData, sDataCount, splitters[i]);
#pragma omp parallel for
for (int i = numSplitters - 1; i > 0; i--)
bins[i] = prefixBins[i] - prefixBins[i - 1];
bins[0] = prefixBins[0];
// number of datapoints before this rank for each bin
dist_sort_size_t myStartGlobal[numSplitters];
MPI_Exscan(&bins, &myStartGlobal, numSplitters, dtype, MPI_SUM,
MPI_COMM_WORLD);
  if (rank == 0) {
    // MPI_Exscan leaves rank 0's output undefined, so zero it explicitly
#pragma omp parallel for
    for (int i = 0; i < numSplitters; i++) myStartGlobal[i] = 0;
  }
*recvData = (dist_sort_t *)malloc(counts[rank] * sizeof(dist_sort_t));
*rDataCount = counts[rank];
MPI_Win win;
MPI_Win_create(*recvData, counts[rank] * sizeof(dist_sort_t),
sizeof(dist_sort_t), MPI_INFO_NULL, MPI_COMM_WORLD, &win);
MPI_Win_fence(MPI_MODE_NOPRECEDE, win);
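  // Push each local bin to the rank that owns it; myStartGlobal[i] is this
  // rank's write offset inside rank i's receive window.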
uint64_t start = 0;
for (int i = 0; i < numSplitters; i++) {
uint64_t size = bins[i];
// send data to respective rank
MPI_Put(&sendData[start], size, dtype, i, myStartGlobal[i], size, dtype,
win);
start += bins[i];
}
  MPI_Win_fence(MPI_MODE_NOSUCCEED, win);
  MPI_Win_free(&win);
}
void sort(dist_sort_t *data, dist_sort_size_t size) {
// You are welcome to use this sort function.
// If you wish to implement your own, you can do that too.
// Don't use bubblesort.
std::sort(data, data + size);
}
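For reference, the three functions above are meant to be chained into a complete distributed sort: rebalance the input, sort locally (getHist's binary search assumes sorted data), pick the splitters, move every key to the rank that owns its bin, and sort locally again. The driver below is only a minimal sketch of that pipeline, under assumptions: that solution.h declares these functions, that dist_sort_t and dist_sort_size_t are the same 64-bit unsigned type, and that the data generation and counts shown are illustrative stand-ins for whatever the grading harness actually provides.
// Hypothetical driver (not part of the assignment harness).
#include <mpi.h>
#include <cstdlib>
#include "solution.h"
#include "basic_defs.h"

int main(int argc, char **argv) {
  MPI_Init(&argc, &argv);
  int rank, nProcs;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &nProcs);

  // Each rank starts with an arbitrary, deliberately uneven amount of data.
  dist_sort_size_t myCount = 1000 + 37 * rank;
  dist_sort_t *myData = (dist_sort_t *)malloc(myCount * sizeof(dist_sort_t));
  srand(rank + 1);
  for (dist_sort_size_t i = 0; i < myCount; i++)
    myData[i] = ((dist_sort_t)rand() << 32) ^ (dist_sort_t)rand();

  // 1. Even out the data counts across ranks.
  dist_sort_t *balanced = NULL;
  dist_sort_size_t balancedCount = 0;
  rebalance(myData, myCount, &balanced, &balancedCount);

  // 2. Sort locally so getHist's binary search works in the next two steps.
  sort(balanced, balancedCount);

  // 3. Choose one splitter per rank so each bin holds ~global_N / nProcs keys.
  int numSplitters = nProcs;
  dist_sort_t *splitters =
      (dist_sort_t *)malloc(numSplitters * sizeof(dist_sort_t));
  dist_sort_size_t *counts =
      (dist_sort_size_t *)malloc(numSplitters * sizeof(dist_sort_size_t));
  findSplitters(balanced, balancedCount, splitters, counts, numSplitters);

  // 4. Route every key to the rank owning its bin, then do the final local sort.
  dist_sort_t *finalData = NULL;
  dist_sort_size_t finalCount = 0;
  moveData(balanced, balancedCount, &finalData, &finalCount, splitters, counts,
           numSplitters);
  sort(finalData, finalCount);

  free(myData);
  free(balanced);
  free(splitters);
  free(counts);
  free(finalData);
  MPI_Finalize();
  return 0;
}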