Message exchange failure when performing alltoallv (CPUs) #334

Open · kylasa opened this issue Jul 19, 2022 · 0 comments
kylasa commented Jul 19, 2022

Performing an alltoallv message exchange on CPUs results in the following error:


terminate called after throwing an instance of 'gloo::EnforceNotMet'
what(): [enforce fail at ../third_party/gloo/gloo/transport/tcp/pair.cc:490] op.preamble.length <= op.nbytes. 881392472 vs 881392448
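
For reference, the two byte counts in the enforce message differ by exactly 24 bytes, i.e. three int64 values (a trivial check, with the values copied from the error above):

preamble_length = 881392472  # op.preamble.length from the error
nbytes = 881392448           # op.nbytes from the error
diff = preamble_length - nbytes
print(diff, diff // 8)       # -> 24 3, i.e. the two sides disagree by 3 int64s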

The error is reproducible, and a standalone Python script is included in this issue report. It is a very simple script that uses 10 machines/10 processes to reproduce the error above. I used a 10-machine cluster to reproduce it repeatedly; I expect the same failure can be triggered on a single machine running 10 processes (see the launcher sketch at the end of this report).

This script performs the following tasks:

  1. Create the process group with 10 ranks.
  2. Exchange the number of int64s each rank will send; the receiving side uses these counts to allocate its buffers.
  3. Once the buffers are allocated, perform alltoallv (implemented in the standalone script) to exchange the int64s.
  4. The error occurs while performing alltoallv on CPUs.

Some observations about this error:

  1. It happens when sending large messages; the same logic works when smaller messages are sent.
  2. The standalone script was created by mimicking some of the functionality of the application I am currently working on.
  3. The hardcoded int64 counts are one instance for which the error is deterministically reproducible.

Please use the following standalone script:

import numpy as np
import argparse
import torch
import os
from datetime import timedelta
import torch.distributed as dist
from timeit import default_timer as timer


def alltoall_cpu(rank, world_size, output_tensor_list, input_tensor_list):
    """Emulate all-to-all on the gloo backend with one scatter per rank."""
    input_tensor_list = [tensor.to(torch.device('cpu')) for tensor in input_tensor_list]
    for i in range(world_size):
        # Rank i scatters its input list; every other rank receives its slice.
        dist.scatter(output_tensor_list[i], input_tensor_list if i == rank else [], src=i)


def alltoallv_cpu(rank, world_size, output_tensor_list, input_tensor_list):
    """Emulate all-to-all-v with non-blocking sends and blocking receives."""
    senders = []
    for i in range(world_size):
        if i == rank:
            # Local slice: just copy, no communication needed.
            output_tensor_list[i] = input_tensor_list[i].to(torch.device('cpu'))
        else:
            sender = dist.isend(input_tensor_list[i].to(torch.device('cpu')), dst=i, tag=i)
            senders.append(sender)

    for i in range(world_size):
        if i != rank:
            dist.recv(output_tensor_list[i], src=i, tag=i)

    # Wait for the non-blocking sends to complete before leaving the collective.
    for sender in senders:
        sender.wait()

    torch.distributed.barrier()

def splitdata_exec(rank, world_size):
    # int64_counts[r][c] = number of int64 values rank r sends to rank c.
    int64_counts = np.array([
        [0,         110105856, 110093280, 110116272, 110097840, 110111128, 110174059, 110087008, 110125040, 110087400],#0 
        [110174059, 0,         110158903, 110160317, 110149564, 110170899, 110166538, 110139263, 110163283, 110154040],#1
        [110251793, 110254110, 0,         110243087, 110249640, 110270594, 110248594, 110249172, 110277587, 110242484],#2
        [110191018, 110171210, 110170046, 0,         110167632, 110165475, 110174676, 110158908, 110171609, 110158631],#3 
        [110197278, 110198689, 110193780, 110198301, 0,         110208663, 110184046, 110194628, 110200308, 110168337],#4 
        [110256343, 110244546, 110248884, 110255858, 110236621, 0,         110247954, 110246921, 110247543, 110243309],#5 
        [110113348, 109915976, 109891208, 109908240, 109916552, 109917544, 0,         109893592, 109930888, 109895912],#6 
        [110024052, 109995591, 110003242, 110013125, 110002038, 110013278, 110003047, 0,         110015547, 109981915],#7 
        [109936439, 109948208, 109937391, 109936696, 109930888, 109941325, 109940259, 109917662, 0,         109917002],#8 
        [110050394, 110029327, 110036926, 110043437, 110021664, 110051453, 110036305, 110039768, 110054324, 0],#9
        ])

    start = timer()

    sizes = int64_counts[rank]
    print('[Rank: ', rank, '] outgoing int64 counts: ', sizes)

    # buffer sizes send/recv
    send_counts = list(torch.Tensor(sizes).type(dtype=torch.int64).chunk(world_size))
    recv_counts = list(torch.zeros([world_size], dtype=torch.int64).chunk(world_size))
    alltoall_cpu(rank, world_size, recv_counts, send_counts)

    #allocate buffers
    recv_nodes = []
    for i in recv_counts:
        recv_nodes.append(torch.zeros(i.tolist(), dtype=torch.int64))

    #form the outgoing message
    send_nodes = []
    for i in range(world_size):
        # sending 
        d = np.ones(shape=(sizes[i]), dtype=np.int64)*rank
        send_nodes.append(torch.from_numpy(d))

    alltoallv_cpu(rank, world_size, recv_nodes, send_nodes)
    end = timer()
    print('[Rank: ', rank, '] alltoallv time: ', timedelta(seconds=end - start))

    for i in range(world_size): 
        data = recv_nodes[i].numpy()
        assert np.all(data == np.ones(data.shape, dtype=np.int64)*i)

    print('[Rank: ', rank, '] Done with the test...')

def multi_dev_proc_init(params):
    # RANK must be set in the environment (plus MASTER_ADDR/MASTER_PORT for the default env:// rendezvous).
    rank = int(os.environ["RANK"])
    dist.init_process_group("gloo", rank=rank, world_size=params.world_size, timeout=timedelta(seconds=5*60))
    splitdata_exec(rank, params.world_size)

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Construct graph partitions')
    parser.add_argument('--world-size', help='no. of processes to spawn', default=1, type=int, required=True)
    params = parser.parse_args()
    multi_dev_proc_init(params)
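
For the single-machine case, here is a minimal launcher sketch (not part of my original setup, so treat it as untested). It assumes the script above is saved as repro.py, and 127.0.0.1/29500 are placeholder rendezvous values. Note that the hardcoded counts imply roughly 16 GB of tensor data per rank, so a single machine needs a lot of RAM (or reduced counts).

# single_machine_repro.py -- sketch only; repro.py, address, and port are assumptions
import os
from argparse import Namespace
import torch.multiprocessing as mp

from repro import multi_dev_proc_init

def _run(rank, world_size):
    os.environ['RANK'] = str(rank)           # multi_dev_proc_init reads RANK
    os.environ['MASTER_ADDR'] = '127.0.0.1'  # loopback for a single machine
    os.environ['MASTER_PORT'] = '29500'      # any free TCP port
    multi_dev_proc_init(Namespace(world_size=world_size))

if __name__ == '__main__':
    mp.spawn(_run, args=(10,), nprocs=10)    # 10 local processes, one per rank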