Tuesday, May 3, 2011

MPI. Сбор блоков в матрицу.

Приветствую. В продолжение темы о параллельном программировании и топика Умножение матрицы на вектор, я бы хотел рассказать о том, как я безуспешно боролся с MPI_Gather и MPI_Gatherv и о том, как получил собственный *_Gather*.

Если интересно, прошу.

Что?
Если вы смотрели предыдущую статью, то заметили как "мастерски" мы раздавали всем процессам блоки исходной матрицы, но как быть, если мы хотим осуществить обратный процесс, а именно сбор матрицы из блоков. Все как и прежде, ведущим процессом будет нулевой, т.е. именно он получит исходную матрицу.

Как?
Вопрос за вопросом, Да, мы имеем в распоряжении MPI_Gather и MPI_Gatherv и конечно же MPI_Type_vector в сочетании с MPI_Type_commit, но увы, или я не понял почему наш blockType (см. предыдущую статью) не сработал корректно, или может MPI_Gather* не тянет подобное?

Почему бы нет?
Исходя из того, что MPI_Gather* основан на тех же MPI_Send + MPI_Recv + обработка массивов, то почему бы не сделать что-то специализированное, но свое? На этом я и остановился. А именно, на входе блок и коммуникатор, а на выходе матрица, но только у ведущего. Как обычно, давайте сразу смотреть по порядку и параллельно:
int MPI_Gather_block(MATRIX block, MATRIX result, MPI_Comm comm) {
    int i, j, n, rank, ret, numberOfProcesses;
    MATRIX *allBlocks; // все блоки у ведущего от всех процессов
    MPI_Request *requests; // асинхронные запросы на получение блоков
    
    // как обычно
    MPI_Comm_rank(MPI_COMM_WORLD, &rank); // номер процесса
    MPI_Comm_size(MPI_COMM_WORLD, &numberOfProcesses); // кол-во рабочих процессов
    
    // если ведущий, подготовимся к получению всех блоков
    if (rank == 0) {
        // ссылки на блоки, блоков у нас столько же сколько и процессов
        allBlocks = (MATRIX*)malloc(numberOfProcesses * sizeof(MATRIX));
        if (allBlocks == NULL) { // видимо маловато памяти
            return MATRIX_ERROR_NOT_ENOUGH_MEMORY;
        }
        
        // аналогично кол-ву блоков, столько же и запросов
        requests = (MPI_Request*)malloc(numberOfProcesses * block->rows * sizeof(MPI_Request));
        if (requests == NULL) {
            free(allBlocks); // ну очень мало памяти...
            return MATRIX_ERROR_NOT_ENOUGH_MEMORY;
        }
        
        // бегаем по всем блокам и создаем блоки-матрицы
        for (i = 0; i < numberOfProcesses; i++) {
            ret = createMatrix(&allBlocks[i], block->columns, block->rows);
            if (ret != MATRIX_SUCCESS) { // ну что можно сказать, без проверок никуда
                for (j = i; j >= 0; j--) {
                    freeMatrix(&allBlocks[j]);
                }
                free(allBlocks);
                free(requests);
                return ret;
            }
            
            // запрашиваем блок с соответствующим номером процесса, разумеется асинхронно
            for (j = 0; j < block->rows; j++) {
                MPI_Irecv(&allBlocks[i]->items[j * block->columns], block->columns, MATRIX_MPI_TYPE, MPI_ANY_SOURCE, i * block->rows + j, 
                          MPI_COMM_WORLD, &requests[i * block->rows + j]);
            }
        }
    }
    
    // если интересно что за блок процесс отправляет
    // printf("%d: ", rank); printMatrix(block, "B");
    
    // шлем блок, но строками, не все сразу
    for (i = 0; i < block->rows; i++) {
        MPI_Send(&block->items[i * block->columns], block->columns, MATRIX_MPI_TYPE, 0, rank * block->rows + i, MPI_COMM_WORLD);
    }
    
    // если ведущий, то ожидаем все запросы на блоки
    if (rank == 0) {
        MPI_Waitall(numberOfProcesses * block->rows, requests, MPI_STATUSES_IGNORE);
        free(requests);
  
        // поверьте, то что дальше, было высчитано всего за какие то часы
        // "ничего особенного" здесь нет, просто строку блока с номером процесса
        // копируем в соответствующее место в результирующей матрице
        for (i = 0; i < result->columns / block->columns; i++) {
            for (j = 0; j < result->rows / block->rows; j++) {
                for (n = 0; n < block->rows; n++) {
                    memcpy(&result->items[((i * block->columns + j) % block->rows) * block->columns + 
                                          ((i * block->columns + j) / block->rows) * block->rows * result->columns +
                                          n * result->columns],
                           &allBlocks[i * block->columns + j]->items[n * block->columns],
                           block->columns * sizeof(MATRIX_TYPE));
                }
            }
        }
        
        // сначала удаляем все матрицы
        for (i = 0; i < numberOfProcesses; i++) {
            freeMatrix(&allBlocks[i]);
        }
        // а затем и память выделенную под ссылки на матрицы
        free(allBlocks);
    }
    
    // ура, успех
    return MATRIX_SUCCESS;
}
Как то так, разумеется найти ошибку здесь просто, но вроде то, что я решал, решал успешно.

Спасибо и до встречи, как только что-то интересное будет, напишу.

No comments:

Post a Comment