WHY DO STUDENTS PREFER US?
4.9/5

5 Star Rating

93940

Orders Delivered

3949

PhD Experts

24x7

Support

100%

Privacy

100%

Top Quality

Sample Details

Computer Science: Histogram Algorithm Using MPI and OpenMPI

Number of Views: 277

Download : 0

Pages: 9

Words : 2197

Question :

 

This is a coding assignment. Three functions need to be implemented for parallel sorting with the histogram algorithm, using MPI, OpenMPI, and C++.

 

Answer : 

 

#include "solution.h"

 

#include

#include

#include

#include

#include

#include

 

#include "basic_defs.h"

#include "databasics.h"

 

// MPI element type passed to the MPI collectives and one-sided puts below, for
// both key buffers and element counts.
// NOTE(review): this assumes dist_sort_t and dist_sort_size_t are both
// `unsigned long long`-compatible — confirm against basic_defs.h.
MPI_Datatype dtype = MPI_UNSIGNED_LONG_LONG;

 

// Overflow-safe midpoint: returns floor((a + b) / 2) without ever forming the
// sum a + b. Each operand is halved first; the unit lost by both halvings is
// added back only when both operands are odd.
uint64_t getMid(const uint64_t &a, const uint64_t &b) {
    const uint64_t halves = (a >> 1) + (b >> 1);
    const uint64_t carry = a & b & 1u;  // 1 only when a and b are both odd
    return halves + carry;
}

 

// Counts the elements of the ascending-sorted array `data[0..n)` that are less
// than or equal to `key` (i.e. the upper-bound position of `key`).
//
// @param data  pointer to `n` keys sorted in ascending order
// @param n     number of keys in `data`; may be 0
// @param key   probe value
// @return      number of elements e in `data` with e <= key, in [0, n]
//
// The search keeps the invariant that every element before `lo` is <= key and
// every element at or after `hi` is > key, so it is exact for keys that are
// absent, duplicated, below the minimum or above the maximum, and it returns 0
// for an empty array instead of reading an uninitialised midpoint.
uint64_t getHist(const dist_sort_t *data, const dist_sort_size_t n,
                 const dist_sort_t key) {
    uint64_t lo = 0;
    uint64_t hi = n;
    while (lo < hi) {
        // lo + (hi - lo) / 2 cannot overflow and is always < hi.
        const uint64_t mid = lo + (hi - lo) / 2;
        if (key < data[mid]) {
            hi = mid;       // data[mid] is strictly greater: exclude it
        } else {
            lo = mid + 1;   // data[mid] <= key: it is counted
        }
    }
    return lo;
}

 

// Decides whether a global histogram value lies close enough to a bucket
// boundary to serve as a splitter.
//
// Bucket boundaries are the multiples of `dataPerProc`; the tolerance is
// dataPerProc / 200 (0.5% of a bucket, integer division).
//
// @param dataPerProc  target number of items per bucket
// @param hist         global count of items <= the probe being tested
// @return the zero-based splitter index the probe satisfies, or
//         static_cast<uint64_t>(-1) when it satisfies none. Callers that store
//         the result in a signed 64-bit integer see that sentinel as -1.
//
// A zero `dataPerProc` is reported as "no splitter" rather than dividing by
// zero.
uint64_t idealSplitter(dist_sort_size_t dataPerProc, dist_sort_t hist) {
    const uint64_t kNoSplitter = static_cast<uint64_t>(-1);
    if (dataPerProc == 0) {
        return kNoSplitter;
    }

    const uint64_t thresh = dataPerProc / 200;
    const uint64_t offset = hist % dataPerProc;  // distance past the boundary below
    const uint64_t bucket = hist / dataPerProc;  // number of whole buckets covered

    if (offset < thresh) {
        // Just above boundary number `bucket`, which belongs to splitter
        // bucket - 1. For bucket == 0 this wraps to kNoSplitter on purpose.
        return bucket - 1;
    }
    if (dataPerProc - offset < thresh) {
        // Just below boundary number bucket + 1, i.e. splitter `bucket`.
        return bucket;
    }
    return kNoSplitter;
}

 

void rebalance(const dist_sort_t *data, const dist_sort_size_t myDataCount,

               dist_sort_t **rebalancedData, dist_sort_size_t *rCount) {

    // Get number of processes

    int nProcs;

    MPI_Comm_size(MPI_COMM_WORLD, &nProcs);

 

    // Get rank of the process

    int rank;

    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

 

    dist_sort_size_t global_N;

    // Perform MPI all reduce to sum up all local_N's and get global_N

    MPI_Allreduce(&myDataCount, &global_N, 1, dtype, MPI_SUM, MPI_COMM_WORLD);

 

    uint64_t dataPerProc, limit;

    // Datacount per process

    dataPerProc = global_N / nProcs;

    // no of processes with extra data

    limit = global_N % nProcs;

    // if (rank == 0)

    //     printf("limit %lu, dataperproc %lu, global %lu\n", limit, dataPerProc,

    //            global_N);

 

    // assign datasize for curr process

    dist_sort_size_t myCount = dataPerProc + (rank < limit ? 1 : 0);

    // allocate array for output

    dist_sort_t *balanced =

        (dist_sort_t *)malloc(myCount * sizeof(dist_sort_t));

 

    // Global starting and ending index of this rank

    uint64_t myStartGlobal, myEnd;

    MPI_Exscan(&myDataCount, &myStartGlobal, 1, dtype, MPI_SUM, MPI_COMM_WORLD);

    if (rank == 0) myStartGlobal = 0;

    myEnd = myStartGlobal + myDataCount - 1;

 

    MPI_Win win;

    // create window for one way communication

    MPI_Win_create(balanced, myCount * sizeof(dist_sort_t), sizeof(dist_sort_t),

                   MPI_INFO_NULL, MPI_COMM_WORLD, &win);

 

    MPI_Win_fence(MPI_MODE_NOPRECEDE, win);

 

    uint64_t next = myStartGlobal;

    while (next <= myEnd) {

        uint64_t destProcs = dataPerProc + 1;  // data count in destination rank

        uint64_t dest = next / destProcs;      // destination rank

        uint64_t disp;                         // offset in destination rank

        uint64_t size;  // size to write to destination rank

        if (!(dest < limit)) {

            dest = (next - limit) / --destProcs;

            disp = (next - limit) % destProcs;

            size = std::min(destProcs * (dest + 1), myEnd + 1) - (next - limit);

        } else {

            disp = next % destProcs;

            size = std::min(destProcs * (dest + 1), myEnd + 1) - next;

        }

        // printf("\ndest %lu, next %lu\n", dest, next);

        // writing to destination rank

        MPI_Put(&data[next - myStartGlobal], size, dtype, dest, disp, size,

                dtype, win);

        next += size;

    }

    MPI_Win_fence(0, win);

    MPI_Win_fence(MPI_MODE_NOSUCCEED, win);

 

    // assigning rebalanced data

    *rebalancedData = balanced;

    *rCount = myCount;

}

 

// Searches for `numSplitters` key values that cut the globally distributed
// data into bins of roughly global_N / nProcs items, using rounds of probe
// keys whose global histogram is evaluated on rank 0.
//
// Each round: rank 0 broadcasts `k` probe keys, every rank computes its local
// histogram for them with getHist(), the histograms are summed onto rank 0,
// and rank 0 accepts the probes that idealSplitter() reports as close enough
// to a bin boundary. The loop ends when rank 0 reports no unsatisfied
// splitters. Finally the splitters and the per-bin counts are broadcast.
//
// @param data          this rank's keys (passed to getHist, which searches
//                      them as sorted data)
// @param data_size     number of keys in `data`
// @param splitters     out: `numSplitters` splitter keys; the last one is set
//                      to DIST_SORT_MAX
// @param counts        out: `numSplitters` global item counts, one per bin
// @param numSplitters  number of splitters / bins
//
// Collective: every rank of MPI_COMM_WORLD must call it.
void findSplitters(const dist_sort_t *data, const dist_sort_size_t data_size,
                   dist_sort_t *splitters, dist_sort_size_t *counts,
                   int numSplitters) {
    // Get number of processes
    int nProcs;
    MPI_Comm_size(MPI_COMM_WORLD, &nProcs);

    // Get rank of the process
    int rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    dist_sort_size_t global_N;
    // Perform MPI all reduce to sum up all local_N's and get global_N
    MPI_Allreduce(&data_size, &global_N, 1, dtype, MPI_SUM, MPI_COMM_WORLD);

    uint64_t dataPerProc;
    // Target bin size: items per process (integer division)
    dataPerProc = global_N / nProcs;

    // number of probes to use per round
    uint64_t k = 1000 * numSplitters;
    // number of unsatisfied splitters
    uint64_t splittersLeft = numSplitters;

    // Global count of items <= each accepted splitter.
    // NOTE(review): this and the arrays below are runtime-sized stack arrays
    // (a compiler extension in C++); probes/localHist/globalHist hold
    // 1000 * numSplitters elements each — confirm stack size is sufficient.
    dist_sort_size_t prefixCounts[numSplitters];

    // The final splitter is fixed up front: it is the maximum key and covers
    // all global_N items, so it counts as already satisfied.
    splitters[numSplitters - 1] = DIST_SORT_MAX;
    prefixCounts[numSplitters - 1] = global_N;
    // indicates satisfied splitters
    bool selected[numSplitters];
    splittersLeft--;

#pragma omp parallel for
    for (int i = 0; i < numSplitters - 1; i++) {
        selected[i] = false;
    }
    selected[numSplitters - 1] = true;
    // keys for probes
    dist_sort_t probes[k];
    // local histogram and global histogram
    dist_sort_t localHist[k], globalHist[k];
    if (0 == rank)
    // Only rank 0 initialises the probes (evenly spaced over
    // [0, DIST_SORT_MAX]); the other ranks receive them via MPI_Bcast below.
#pragma omp parallel for
        for (int i = 0; i < k; i++)
            probes[i] = (DIST_SORT_MAX / (k + 1)) * (i + 1);

    while (splittersLeft) {
        // printf("%ld ", splittersLeft);
        MPI_Barrier(MPI_COMM_WORLD);
        // send selected probes to all ranks
        MPI_Bcast(probes, k, dtype, 0, MPI_COMM_WORLD);

#pragma omp parallel for
        for (int i = 0; i < k; i++) {
            // calculate histogram for all probes
            localHist[i] = getHist(data, data_size, probes[i]);
            // printf("rank: %d, localhist: %lu\n", rank, localHist[i]);
        }
        MPI_Barrier(MPI_COMM_WORLD);
        // Sum the local histograms onto rank 0; globalHist is only meaningful
        // there.
        MPI_Reduce(&localHist, &globalHist, k, dtype, MPI_SUM, 0,
                   MPI_COMM_WORLD);

        // int x;
        // std::cin >> x;
        if (0 == rank) {
            // NOTE(review): different iterations can yield the same idx, and
            // this loop reads and writes selected[idx] / splitters[idx] /
            // prefixCounts[idx] without synchronisation — looks like a data
            // race under OpenMP; confirm.
#pragma omp parallel for
            for (int i = 0; i < k; i++) {
                // idealSplitter returns an unsigned value; stored as int64_t
                // so its "no splitter" result compares as -1 below.
                int64_t idx = idealSplitter(dataPerProc, globalHist[i]);
                // std::cout << idx << std::endl;
                // std::cout << (idx > -1) << std::endl;
                // if (idx > -1) std::cout << splitters[idx] << std::endl;
                // std::cin >> x;

                // for each probe check if it is satisfied splitter
                // NOTE(review): idx is not checked against numSplitters before
                // indexing — confirm idealSplitter cannot exceed it.
                if (idx > -1 && !selected[idx]) {
                    // printf("selected\n");
                    splitters[idx] = probes[i];
                    prefixCounts[idx] = globalHist[i];
                    selected[idx] = true;
                }
            }
            // Recount the splitters that are still unsatisfied.
            splittersLeft = 0;
            for (int i = 0; i < numSplitters; i++)
                if (!selected[i]) splittersLeft++;
            if (splittersLeft) {
                // if splitters are left unsatisfied, get new set of probes
                // `a` is the index of the last splitter treated as found,
                // `running` is true while scanning a run of unfound ones, and
                // new probes are spread between splitters[a] and the next
                // found splitter.
                int a = -1, probeIdx = 0;
                bool running = false;
                for (int i = 0; i < numSplitters; i++) {
                    if (running) {
                        // NOTE(review): "found" is tested as
                        // splitters[i] != -1, but nothing in this function
                        // writes -1 into splitters; presumably `selected[i]`
                        // was intended, or the caller pre-fills -1 — confirm.
                        if (splitters[i] != -1) {
                            // NOTE(review): integer division happens before
                            // the multiplication by k, so this is 0 whenever
                            // (i - a - 1) < splittersLeft — confirm intent.
                            int numProbes = (i - a - 1) / splittersLeft * k;
                            dist_sort_t start = (a == -1) ? 0 : splitters[a];
                            dist_sort_t end = splitters[i];
                            for (int j = 0; j < numProbes && probeIdx < k;
                                 j++, probeIdx++) {
                                probes[probeIdx] = start + (end - start) /
                                                               (numProbes + 1) *
                                                               (j + 1);
                            }
                            running = false;
                            a = i;
                        }
                    } else {
                        if (splitters[i] != -1) {
                            a = i;
                        } else {
                            running = true;
                        }
                    }
                }
            }
        }
        MPI_Barrier(MPI_COMM_WORLD);
        // send unsatisfied splitters count to all ranks so every rank takes
        // the same loop-exit decision
        MPI_Bcast(&splittersLeft, 1, dtype, 0, MPI_COMM_WORLD);
    }

    if (0 == rank) {
        // printf("nloops: %d\n", nloops);
        // set counts for each bin: difference of consecutive prefix counts
#pragma omp parallel for
        for (int i = numSplitters - 1; i > 0; i--) {
            counts[i] = prefixCounts[i] - prefixCounts[i - 1];
        }
        counts[0] = prefixCounts[0];
    }

    MPI_Barrier(MPI_COMM_WORLD);
    // send splitters to each rank
    MPI_Bcast(splitters, numSplitters, dtype, 0, MPI_COMM_WORLD);

    MPI_Barrier(MPI_COMM_WORLD);
    // send count to each rank
    MPI_Bcast(counts, numSplitters, dtype, 0, MPI_COMM_WORLD);
}

 

void moveData(const dist_sort_t *const sendData,

              const dist_sort_size_t sDataCount, dist_sort_t **recvData,

              dist_sort_size_t *rDataCount, const dist_sort_t *const splitters,

              const dist_sort_t *const counts, int numSplitters) {

    // Get number of processes

    int nProcs;

    MPI_Comm_size(MPI_COMM_WORLD, &nProcs);

 

    // Get rank of the process

    int rank;

    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

 

    // number of local data points in each bin

    dist_sort_size_t bins[numSplitters], prefixBins[numSplitters];

#pragma omp parallel for

    for (int i = 0; i < numSplitters; i++)

        prefixBins[i] = getHist(sendData, sDataCount, splitters[i]);

 

#pragma omp parallel for

    for (int i = numSplitters - 1; i > 0; i--)

        bins[i] = prefixBins[i] - prefixBins[i - 1];

    bins[0] = prefixBins[0];

 

    // number of datapoints before this rank for each bin

    dist_sort_size_t myStartGlobal[numSplitters];

    MPI_Exscan(&bins, &myStartGlobal, numSplitters, dtype, MPI_SUM,

               MPI_COMM_WORLD);

    if (rank == 0)

#pragma omp parallel for

        for (int i = 0; i < numSplitters; i++) myStartGlobal[i] = 0;

 

    *recvData = (dist_sort_t *)malloc(counts[rank] * sizeof(dist_sort_t));

 

    *rDataCount = counts[rank];

 

    MPI_Win win;

 

    MPI_Win_create(*recvData, counts[rank] * sizeof(dist_sort_t),

                   sizeof(dist_sort_t), MPI_INFO_NULL, MPI_COMM_WORLD, &win);

 

    MPI_Win_fence(MPI_MODE_NOPRECEDE, win);

    uint64_t start = 0;

    for (int i = 0; i < numSplitters; i++) {

        uint64_t size = bins[i];

        // send data to respective rank

        MPI_Put(&sendData[start], size, dtype, i, myStartGlobal[i], size, dtype,

                win);

        start += bins[i];

    }

    MPI_Win_fence(0, win);

    MPI_Win_fence(MPI_MODE_NOSUCCEED, win);

}

 

// Sorts the `size` keys starting at `data` in place, in ascending order, by
// delegating to std::sort over the half-open range [data, data + size).
void sort(dist_sort_t *data, dist_sort_size_t size) {
    dist_sort_t *const first = data;
    dist_sort_t *const last = first + size;
    std::sort(first, last);
}

 

Place Order For A Top Grade Assignment Now

We have some amazing discount offers running for the students

Order Now

Get Help Instantly

    FREE FEATURES

    Limitless Amendments

    $09.50 free

    Bibliography

    $10.50 free

    Outline

    $05.00 free

    Title page

    $07.50 free

    Formatting

    $07.50 free

    Plagiarism Report

    $10.00 free

    Get all these features for $50.00

    free

    Latest Blog Boost your grades with expert tips and tricks from our academic blog.

    Learn How to Write an Argumentative Essay Outline

    Have you been asked to write an argumentative essay by your instructor? If yes, then before you begin writing an argumentative essay, first select a good […]

    90 Outstanding Literary Research Topics and Ideas

    Are you seeking compelling literary research topics? No matter whether you are creating a master’s thesis or writing a college essay, the topic you choose […]

    10 Best Essay Writing Apps for Students [2024]

    If you are a student, then during your scholastic life you will be required to write many types of essay assignments. Since essay writing involves […]

    View More Blogs

    Let's Talk

    Enter your email, and we shall get back to you in an hour.