/
test_cq_events.py
39 lines (27 loc) · 972 Bytes
/
test_cq_events.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import errno
import unittest
from pyverbs.pyverbs_error import PyverbsRDMAError
from tests.base import RCResources, UDResources
from tests.base import RDMATestCase
from tests.utils import traffic
from pyverbs.cq import CQ, CompChannel
def create_cq_with_comp_channel(agr_obj):
    """
    Attach a completion channel and a CQ bound to it onto agr_obj.

    Sets agr_obj.comp_channel and agr_obj.cq, then calls req_notify() on
    the new CQ.
    :param agr_obj: Object providing ctx and num_msgs attributes; it
                    receives the comp_channel and cq attributes
    :return: None
    """
    channel = CompChannel(agr_obj.ctx)
    agr_obj.comp_channel = channel
    # Third positional argument is passed as None, exactly as before
    # (presumably the CQ context - confirm against the pyverbs CQ signature).
    completion_queue = CQ(agr_obj.ctx, agr_obj.num_msgs, None, channel)
    agr_obj.cq = completion_queue
    completion_queue.req_notify()
class CqEventsUD(UDResources):
    """UDResources variant whose create_cq() uses a completion channel."""

    def create_cq(self):
        """Build this object's CQ via create_cq_with_comp_channel()."""
        create_cq_with_comp_channel(self)
class CqEventsRC(RCResources):
    """RCResources variant whose create_cq() uses a completion channel."""

    def create_cq(self):
        """Build this object's CQ via create_cq_with_comp_channel()."""
        create_cq_with_comp_channel(self)
class CqEventsTestCase(RDMATestCase):
    """Traffic tests run with the CqEventsUD and CqEventsRC resources."""

    def setUp(self):
        """Run the base setUp, then set the iteration count to 100."""
        super().setUp()
        iterations = 100
        self.iters = iterations

    def _run_cq_events_traffic(self, resources_cls):
        """
        Create the players from resources_cls and run traffic between them.
        :param resources_cls: Resources class handed to create_players()
        :return: None
        """
        self.create_players(resources_cls)
        # traffic_args is read only after the players have been created,
        # matching the original statement order.
        traffic(**self.traffic_args)

    def test_cq_events_ud(self):
        """Run traffic using CqEventsUD resources."""
        self._run_cq_events_traffic(CqEventsUD)

    def test_cq_events_rc(self):
        """Run traffic using CqEventsRC resources."""
        self._run_cq_events_traffic(CqEventsRC)