12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797 |
- #pragma once
- #include "udp_address.h"
- #if defined(_linux_)
- #include <contrib/libs/ibdrv/include/infiniband/verbs.h>
- #include <contrib/libs/ibdrv/include/rdma/rdma_cma.h>
- #endif
- namespace NNetliba {
// Evaluates `x` exactly once and aborts the process if the result is non-zero.
// Verbs calls generally hand the error code back as their return value, so the
// returned value is reported next to errno (which may be stale).
#define CHECK_Z(x)                                                               \
    {                                                                            \
        int rv = (x);                                                            \
        if (rv != 0) {                                                           \
            fprintf(stderr, "check_z failed, rv = %d, errno = %d\n", rv, errno); \
            Y_ABORT_UNLESS(0, "check_z");                                        \
        }                                                                        \
    }
- //////////////////////////////////////////////////////////////////////////
// Number of scatter/gather entries requested for queues and used per work request.
constexpr int MAX_SGE = 1;
// Payloads of at most this many bytes are posted with IBV_SEND_INLINE.
constexpr size_t MAX_INLINE_DATA_SIZE = 16;
// Limit applied to both max_rd_atomic and max_dest_rd_atomic of RC queue pairs.
constexpr int MAX_OUTSTANDING_RDMA = 10;
- #if defined(_linux_)
- class TIBContext: public TThrRefBase, TNonCopyable {
- ibv_context* Context;
- ibv_pd* ProtDomain;
- TMutex Lock;
- ~TIBContext() override {
- if (Context) {
- CHECK_Z(ibv_dealloc_pd(ProtDomain));
- CHECK_Z(ibv_close_device(Context));
- }
- }
- public:
- TIBContext(ibv_device* device) {
- Context = ibv_open_device(device);
- if (Context) {
- ProtDomain = ibv_alloc_pd(Context);
- }
- }
- bool IsValid() const {
- return Context != nullptr && ProtDomain != nullptr;
- }
- class TLock {
- TIntrusivePtr<TIBContext> Ptr;
- TGuard<TMutex> Guard;
- public:
- TLock(TPtrArg<TIBContext> ctx)
- : Ptr(ctx)
- , Guard(ctx->Lock)
- {
- }
- ibv_context* GetContext() {
- return Ptr->Context;
- }
- ibv_pd* GetProtDomain() {
- return Ptr->ProtDomain;
- }
- };
- };
- class TIBPort: public TThrRefBase, TNonCopyable {
- int Port;
- int LID;
- TIntrusivePtr<TIBContext> IBCtx;
- enum {
- MAX_GID = 16
- };
- ibv_gid MyGidArr[MAX_GID];
- public:
- TIBPort(TPtrArg<TIBContext> ctx, int port)
- : IBCtx(ctx)
- {
- ibv_port_attr portAttrs;
- TIBContext::TLock ibContext(IBCtx);
- CHECK_Z(ibv_query_port(ibContext.GetContext(), port, &portAttrs));
- Port = port;
- LID = portAttrs.lid;
- for (int i = 0; i < MAX_GID; ++i) {
- ibv_gid& dst = MyGidArr[i];
- Zero(dst);
- ibv_query_gid(ibContext.GetContext(), Port, i, &dst);
- }
- }
- int GetPort() const {
- return Port;
- }
- int GetLID() const {
- return LID;
- }
- TIBContext* GetCtx() {
- return IBCtx.Get();
- }
- void GetGID(ibv_gid* res) const {
- *res = MyGidArr[0];
- }
- int GetGIDIndex(const ibv_gid& arg) const {
- for (int i = 0; i < MAX_GID; ++i) {
- const ibv_gid& chk = MyGidArr[i];
- if (memcmp(&chk, &arg, sizeof(chk)) == 0) {
- return i;
- }
- }
- return 0;
- }
- void GetAHAttr(ibv_wc* wc, ibv_grh* grh, ibv_ah_attr* res) {
- TIBContext::TLock ibContext(IBCtx);
- CHECK_Z(ibv_init_ah_from_wc(ibContext.GetContext(), Port, wc, grh, res));
- }
- };
- class TComplectionQueue: public TThrRefBase, TNonCopyable {
- ibv_cq* CQ;
- TIntrusivePtr<TIBContext> IBCtx;
- ~TComplectionQueue() override {
- if (CQ) {
- CHECK_Z(ibv_destroy_cq(CQ));
- }
- }
- public:
- TComplectionQueue(TPtrArg<TIBContext> ctx, int maxCQEcount)
- : IBCtx(ctx)
- {
- TIBContext::TLock ibContext(IBCtx);
- /* ibv_cq_init_attr_ex attr;
- Zero(attr);
- attr.cqe = maxCQEcount;
- attr.cq_create_flags = 0;
- ibv_cq_ex *vcq = ibv_create_cq_ex(ibContext.GetContext(), &attr);
- if (vcq) {
- CQ = (ibv_cq*)vcq; // doubtful trick but that's life
- } else {*/
- // no completion channel
- // no completion vector
- CQ = ibv_create_cq(ibContext.GetContext(), maxCQEcount, nullptr, nullptr, 0);
- // }
- }
- ibv_cq* GetCQ() {
- return CQ;
- }
- int Poll(ibv_wc* res, int bufSize) {
- Y_ASSERT(bufSize >= 1);
- //struct ibv_wc
- //{
- // ui64 wr_id;
- // enum ibv_wc_status status;
- // enum ibv_wc_opcode opcode;
- // ui32 vendor_err;
- // ui32 byte_len;
- // ui32 imm_data;/* network byte order */
- // ui32 qp_num;
- // ui32 src_qp;
- // enum ibv_wc_flags wc_flags;
- // ui16 pkey_index;
- // ui16 slid;
- // ui8 sl;
- // ui8 dlid_path_bits;
- //};
- int rv = ibv_poll_cq(CQ, bufSize, res);
- if (rv < 0) {
- Y_ABORT_UNLESS(0, "ibv_poll_cq failed");
- }
- if (rv > 0) {
- //printf("Completed wr\n");
- //printf(" wr_id = %" PRIx64 "\n", wc.wr_id);
- //printf(" status = %d\n", wc.status);
- //printf(" opcode = %d\n", wc.opcode);
- //printf(" byte_len = %d\n", wc.byte_len);
- //printf(" imm_data = %d\n", wc.imm_data);
- //printf(" qp_num = %d\n", wc.qp_num);
- //printf(" src_qp = %d\n", wc.src_qp);
- //printf(" wc_flags = %x\n", wc.wc_flags);
- //printf(" slid = %d\n", wc.slid);
- }
- //rv = number_of_toggled_wc;
- return rv;
- }
- };
- //struct ibv_mr
- //{
- // struct ibv_context *context;
- // struct ibv_pd *pd;
- // void *addr;
- // size_t length;
- // ui32 handle;
- // ui32 lkey;
- // ui32 rkey;
- //};
- class TMemoryRegion: public TThrRefBase, TNonCopyable {
- ibv_mr* MR;
- TIntrusivePtr<TIBContext> IBCtx;
- ~TMemoryRegion() override {
- if (MR) {
- CHECK_Z(ibv_dereg_mr(MR));
- }
- }
- public:
- TMemoryRegion(TPtrArg<TIBContext> ctx, size_t len)
- : IBCtx(ctx)
- {
- TIBContext::TLock ibContext(IBCtx);
- int access = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ; // TODO: IBV_ACCESS_ALLOCATE_MR
- MR = ibv_reg_mr(ibContext.GetProtDomain(), 0, len, access);
- Y_ASSERT(MR);
- }
- ui32 GetLKey() const {
- static_assert(sizeof(ui32) == sizeof(MR->lkey), "expect sizeof(ui32) == sizeof(MR->lkey)");
- return MR->lkey;
- }
- ui32 GetRKey() const {
- static_assert(sizeof(ui32) == sizeof(MR->lkey), "expect sizeof(ui32) == sizeof(MR->lkey)");
- return MR->lkey;
- }
- char* GetData() {
- return MR ? (char*)MR->addr : nullptr;
- }
- bool IsCovered(const void* data, size_t len) const {
- size_t dataAddr = reinterpret_cast<size_t>(data) / sizeof(char);
- size_t bufAddr = reinterpret_cast<size_t>(MR->addr) / sizeof(char);
- return (dataAddr >= bufAddr) && (dataAddr + len <= bufAddr + MR->length);
- }
- };
- class TSharedReceiveQueue: public TThrRefBase, TNonCopyable {
- ibv_srq* SRQ;
- TIntrusivePtr<TIBContext> IBCtx;
- ~TSharedReceiveQueue() override {
- if (SRQ) {
- ibv_destroy_srq(SRQ);
- }
- }
- public:
- TSharedReceiveQueue(TPtrArg<TIBContext> ctx, int maxWR)
- : IBCtx(ctx)
- {
- ibv_srq_init_attr attr;
- Zero(attr);
- attr.srq_context = this;
- attr.attr.max_sge = MAX_SGE;
- attr.attr.max_wr = maxWR;
- TIBContext::TLock ibContext(IBCtx);
- SRQ = ibv_create_srq(ibContext.GetProtDomain(), &attr);
- Y_ASSERT(SRQ);
- }
- ibv_srq* GetSRQ() {
- return SRQ;
- }
- void PostReceive(TPtrArg<TMemoryRegion> mem, ui64 id, const void* buf, size_t len) {
- Y_ASSERT(mem->IsCovered(buf, len));
- ibv_recv_wr wr, *bad;
- ibv_sge sg;
- sg.addr = reinterpret_cast<ui64>(buf) / sizeof(char);
- sg.length = len;
- sg.lkey = mem->GetLKey();
- Zero(wr);
- wr.wr_id = id;
- wr.sg_list = &sg;
- wr.num_sge = 1;
- CHECK_Z(ibv_post_srq_recv(SRQ, &wr, &bad));
- }
- };
- inline void MakeAH(ibv_ah_attr* res, TPtrArg<TIBPort> port, int lid, int serviceLevel) {
- Zero(*res);
- res->dlid = lid;
- res->port_num = port->GetPort();
- res->sl = serviceLevel;
- }
- void MakeAH(ibv_ah_attr* res, TPtrArg<TIBPort> port, const TUdpAddress& remoteAddr, const TUdpAddress& localAddr, int serviceLevel);
- class TAddressHandle: public TThrRefBase, TNonCopyable {
- ibv_ah* AH;
- TIntrusivePtr<TIBContext> IBCtx;
- ~TAddressHandle() override {
- if (AH) {
- CHECK_Z(ibv_destroy_ah(AH));
- }
- AH = nullptr;
- IBCtx = nullptr;
- }
- public:
- TAddressHandle(TPtrArg<TIBContext> ctx, ibv_ah_attr* attr)
- : IBCtx(ctx)
- {
- TIBContext::TLock ibContext(IBCtx);
- AH = ibv_create_ah(ibContext.GetProtDomain(), attr);
- Y_ASSERT(AH != nullptr);
- }
- TAddressHandle(TPtrArg<TIBPort> port, int lid, int serviceLevel)
- : IBCtx(port->GetCtx())
- {
- ibv_ah_attr attr;
- MakeAH(&attr, port, lid, serviceLevel);
- TIBContext::TLock ibContext(IBCtx);
- AH = ibv_create_ah(ibContext.GetProtDomain(), &attr);
- Y_ASSERT(AH != nullptr);
- }
- TAddressHandle(TPtrArg<TIBPort> port, const TUdpAddress& remoteAddr, const TUdpAddress& localAddr, int serviceLevel)
- : IBCtx(port->GetCtx())
- {
- ibv_ah_attr attr;
- MakeAH(&attr, port, remoteAddr, localAddr, serviceLevel);
- TIBContext::TLock ibContext(IBCtx);
- AH = ibv_create_ah(ibContext.GetProtDomain(), &attr);
- Y_ASSERT(AH != nullptr);
- }
- ibv_ah* GetAH() {
- return AH;
- }
- bool IsValid() const {
- return AH != nullptr;
- }
- };
- // GRH + wc -> address_handle_attr
- //int ibv_init_ah_from_wc(struct ibv_context *context, ui8 port_num,
- //struct ibv_wc *wc, struct ibv_grh *grh,
- //struct ibv_ah_attr *ah_attr)
- //ibv_create_ah_from_wc(struct ibv_pd *pd, struct ibv_wc *wc, struct ibv_grh
- // *grh, ui8 port_num)
- class TQueuePair: public TThrRefBase, TNonCopyable {
- protected:
- ibv_qp* QP;
- int MyPSN; // start packet sequence number
- TIntrusivePtr<TIBContext> IBCtx;
- TIntrusivePtr<TComplectionQueue> CQ;
- TIntrusivePtr<TSharedReceiveQueue> SRQ;
- TQueuePair(TPtrArg<TIBContext> ctx, TPtrArg<TComplectionQueue> cq, TPtrArg<TSharedReceiveQueue> srq,
- int sendQueueSize,
- ibv_qp_type qp_type)
- : IBCtx(ctx)
- , CQ(cq)
- , SRQ(srq)
- {
- MyPSN = GetCycleCount() & 0xffffff; // should be random and different on different runs, 24bit
- ibv_qp_init_attr attr;
- Zero(attr);
- attr.qp_context = this; // not really useful
- attr.send_cq = cq->GetCQ();
- attr.recv_cq = cq->GetCQ();
- attr.srq = srq->GetSRQ();
- attr.cap.max_send_wr = sendQueueSize;
- attr.cap.max_recv_wr = 0; // we are using srq, no need for qp's rq
- attr.cap.max_send_sge = MAX_SGE;
- attr.cap.max_recv_sge = MAX_SGE;
- attr.cap.max_inline_data = MAX_INLINE_DATA_SIZE;
- attr.qp_type = qp_type;
- attr.sq_sig_all = 1; // inline sends need not be signaled, but if they are not work queue overflows
- TIBContext::TLock ibContext(IBCtx);
- QP = ibv_create_qp(ibContext.GetProtDomain(), &attr);
- Y_ASSERT(QP);
- //struct ibv_qp {
- // struct ibv_context *context;
- // void *qp_context;
- // struct ibv_pd *pd;
- // struct ibv_cq *send_cq;
- // struct ibv_cq *recv_cq;
- // struct ibv_srq *srq;
- // ui32 handle;
- // ui32 qp_num;
- // enum ibv_qp_state state;
- // enum ibv_qp_type qp_type;
- // pthread_mutex_t mutex;
- // pthread_cond_t cond;
- // ui32 events_completed;
- //};
- //qp_context The value qp_context that was provided to ibv_create_qp()
- //qp_num The number of this Queue Pair
- //state The last known state of this Queue Pair. The actual state may be different from this state (in the RDMA device transitioned the state into other state)
- //qp_type The Transport Service Type of this Queue Pair
- }
- ~TQueuePair() override {
- if (QP) {
- CHECK_Z(ibv_destroy_qp(QP));
- }
- }
- void FillSendAttrs(ibv_send_wr* wr, ibv_sge* sg,
- ui64 localAddr, ui32 lKey, ui64 id, size_t len) {
- sg->addr = localAddr;
- sg->length = len;
- sg->lkey = lKey;
- Zero(*wr);
- wr->wr_id = id;
- wr->sg_list = sg;
- wr->num_sge = 1;
- if (len <= MAX_INLINE_DATA_SIZE) {
- wr->send_flags = IBV_SEND_INLINE;
- }
- }
- void FillSendAttrs(ibv_send_wr* wr, ibv_sge* sg,
- TPtrArg<TMemoryRegion> mem, ui64 id, const void* data, size_t len) {
- ui64 localAddr = reinterpret_cast<ui64>(data) / sizeof(char);
- ui32 lKey = 0;
- if (mem) {
- Y_ASSERT(mem->IsCovered(data, len));
- lKey = mem->GetLKey();
- } else {
- Y_ASSERT(len <= MAX_INLINE_DATA_SIZE);
- }
- FillSendAttrs(wr, sg, localAddr, lKey, id, len);
- }
- public:
- int GetQPN() const {
- if (QP)
- return QP->qp_num;
- return 0;
- }
- int GetPSN() const {
- return MyPSN;
- }
- // we are using srq
- //void PostReceive(const TMemoryRegion &mem)
- //{
- // ibv_recv_wr wr, *bad;
- // ibv_sge sg;
- // sg.addr = mem.Addr;
- // sg.length = mem.Length;
- // sg.lkey = mem.lkey;
- // Zero(wr);
- // wr.wr_id = 13;
- // wr.sg_list = sg;
- // wr.num_sge = 1;
- // CHECK_Z(ibv_post_recv(QP, &wr, &bad));
- //}
- };
- class TRCQueuePair: public TQueuePair {
- public:
- TRCQueuePair(TPtrArg<TIBContext> ctx, TPtrArg<TComplectionQueue> cq, TPtrArg<TSharedReceiveQueue> srq, int sendQueueSize)
- : TQueuePair(ctx, cq, srq, sendQueueSize, IBV_QPT_RC)
- {
- }
- // SRQ should have receive posted
- void Init(const ibv_ah_attr& peerAddr, int peerQPN, int peerPSN) {
- Y_ASSERT(QP->qp_type == IBV_QPT_RC);
- ibv_qp_attr attr;
- //{
- // enum ibv_qp_state qp_state;
- // enum ibv_qp_state cur_qp_state;
- // enum ibv_mtu path_mtu;
- // enum ibv_mig_state path_mig_state;
- // ui32 qkey;
- // ui32 rq_psn;
- // ui32 sq_psn;
- // ui32 dest_qp_num;
- // int qp_access_flags;
- // struct ibv_qp_cap cap;
- // struct ibv_ah_attr ah_attr;
- // struct ibv_ah_attr alt_ah_attr;
- // ui16 pkey_index;
- // ui16 alt_pkey_index;
- // ui8 en_sqd_async_notify;
- // ui8 sq_draining;
- // ui8 max_rd_atomic;
- // ui8 max_dest_rd_atomic;
- // ui8 min_rnr_timer;
- // ui8 port_num;
- // ui8 timeout;
- // ui8 retry_cnt;
- // ui8 rnr_retry;
- // ui8 alt_port_num;
- // ui8 alt_timeout;
- //};
- // RESET -> INIT
- Zero(attr);
- attr.qp_state = IBV_QPS_INIT;
- attr.pkey_index = 0;
- attr.port_num = peerAddr.port_num;
- // for connected QP
- attr.qp_access_flags = IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_ATOMIC;
- CHECK_Z(ibv_modify_qp(QP, &attr,
- IBV_QP_STATE | IBV_QP_PKEY_INDEX | IBV_QP_PORT | IBV_QP_ACCESS_FLAGS));
- // INIT -> ReadyToReceive
- //PostReceive(mem);
- attr.qp_state = IBV_QPS_RTR;
- attr.path_mtu = IBV_MTU_512; // allows more fine grained VL arbitration
- // for connected QP
- attr.ah_attr = peerAddr;
- attr.dest_qp_num = peerQPN;
- attr.rq_psn = peerPSN;
- attr.max_dest_rd_atomic = MAX_OUTSTANDING_RDMA; // number of outstanding RDMA requests
- attr.min_rnr_timer = 12; // recommended
- CHECK_Z(ibv_modify_qp(QP, &attr,
- IBV_QP_STATE | IBV_QP_PATH_MTU |
- IBV_QP_AV | IBV_QP_DEST_QPN | IBV_QP_RQ_PSN | IBV_QP_MAX_DEST_RD_ATOMIC | IBV_QP_MIN_RNR_TIMER));
- // ReadyToReceive -> ReadyToTransmit
- attr.qp_state = IBV_QPS_RTS;
- // for connected QP
- attr.timeout = 14; // increased to 18 for sometime, 14 recommended
- //attr.retry_cnt = 0; // for debug purposes
- //attr.rnr_retry = 0; // for debug purposes
- attr.retry_cnt = 7; // release configuration
- attr.rnr_retry = 7; // release configuration (try forever)
- attr.sq_psn = MyPSN;
- attr.max_rd_atomic = MAX_OUTSTANDING_RDMA; // number of outstanding RDMA requests
- CHECK_Z(ibv_modify_qp(QP, &attr,
- IBV_QP_STATE |
- IBV_QP_TIMEOUT | IBV_QP_RETRY_CNT | IBV_QP_RNR_RETRY | IBV_QP_SQ_PSN | IBV_QP_MAX_QP_RD_ATOMIC));
- }
- void PostSend(TPtrArg<TMemoryRegion> mem, ui64 id, const void* data, size_t len) {
- ibv_send_wr wr, *bad;
- ibv_sge sg;
- FillSendAttrs(&wr, &sg, mem, id, data, len);
- wr.opcode = IBV_WR_SEND;
- //IBV_WR_RDMA_WRITE
- //IBV_WR_RDMA_WRITE_WITH_IMM
- //IBV_WR_SEND
- //IBV_WR_SEND_WITH_IMM
- //IBV_WR_RDMA_READ
- //wr.imm_data = xz;
- CHECK_Z(ibv_post_send(QP, &wr, &bad));
- }
- void PostRDMAWrite(ui64 remoteAddr, ui32 remoteKey,
- TPtrArg<TMemoryRegion> mem, ui64 id, const void* data, size_t len) {
- ibv_send_wr wr, *bad;
- ibv_sge sg;
- FillSendAttrs(&wr, &sg, mem, id, data, len);
- wr.opcode = IBV_WR_RDMA_WRITE;
- wr.wr.rdma.remote_addr = remoteAddr;
- wr.wr.rdma.rkey = remoteKey;
- CHECK_Z(ibv_post_send(QP, &wr, &bad));
- }
- void PostRDMAWrite(ui64 remoteAddr, ui32 remoteKey,
- ui64 localAddr, ui32 localKey, ui64 id, size_t len) {
- ibv_send_wr wr, *bad;
- ibv_sge sg;
- FillSendAttrs(&wr, &sg, localAddr, localKey, id, len);
- wr.opcode = IBV_WR_RDMA_WRITE;
- wr.wr.rdma.remote_addr = remoteAddr;
- wr.wr.rdma.rkey = remoteKey;
- CHECK_Z(ibv_post_send(QP, &wr, &bad));
- }
- void PostRDMAWriteImm(ui64 remoteAddr, ui32 remoteKey, ui32 immData,
- TPtrArg<TMemoryRegion> mem, ui64 id, const void* data, size_t len) {
- ibv_send_wr wr, *bad;
- ibv_sge sg;
- FillSendAttrs(&wr, &sg, mem, id, data, len);
- wr.opcode = IBV_WR_RDMA_WRITE_WITH_IMM;
- wr.imm_data = immData;
- wr.wr.rdma.remote_addr = remoteAddr;
- wr.wr.rdma.rkey = remoteKey;
- CHECK_Z(ibv_post_send(QP, &wr, &bad));
- }
- };
- class TUDQueuePair: public TQueuePair {
- TIntrusivePtr<TIBPort> Port;
- public:
- TUDQueuePair(TPtrArg<TIBPort> port, TPtrArg<TComplectionQueue> cq, TPtrArg<TSharedReceiveQueue> srq, int sendQueueSize)
- : TQueuePair(port->GetCtx(), cq, srq, sendQueueSize, IBV_QPT_UD)
- , Port(port)
- {
- }
- // SRQ should have receive posted
- void Init(int qkey) {
- Y_ASSERT(QP->qp_type == IBV_QPT_UD);
- ibv_qp_attr attr;
- // RESET -> INIT
- Zero(attr);
- attr.qp_state = IBV_QPS_INIT;
- attr.pkey_index = 0;
- attr.port_num = Port->GetPort();
- // for unconnected qp
- attr.qkey = qkey;
- CHECK_Z(ibv_modify_qp(QP, &attr,
- IBV_QP_STATE | IBV_QP_PKEY_INDEX | IBV_QP_PORT | IBV_QP_QKEY));
- // INIT -> ReadyToReceive
- //PostReceive(mem);
- attr.qp_state = IBV_QPS_RTR;
- CHECK_Z(ibv_modify_qp(QP, &attr, IBV_QP_STATE));
- // ReadyToReceive -> ReadyToTransmit
- attr.qp_state = IBV_QPS_RTS;
- attr.sq_psn = 0;
- CHECK_Z(ibv_modify_qp(QP, &attr, IBV_QP_STATE | IBV_QP_SQ_PSN));
- }
- void PostSend(TPtrArg<TAddressHandle> ah, int remoteQPN, int remoteQKey,
- TPtrArg<TMemoryRegion> mem, ui64 id, const void* data, size_t len) {
- ibv_send_wr wr, *bad;
- ibv_sge sg;
- FillSendAttrs(&wr, &sg, mem, id, data, len);
- wr.opcode = IBV_WR_SEND;
- wr.wr.ud.ah = ah->GetAH();
- wr.wr.ud.remote_qpn = remoteQPN;
- wr.wr.ud.remote_qkey = remoteQKey;
- //IBV_WR_SEND_WITH_IMM
- //wr.imm_data = xz;
- CHECK_Z(ibv_post_send(QP, &wr, &bad));
- }
- };
- // Returns a ref-counted TIBPort. Only declared here for the IB-enabled build;
- // the definition is not visible in this header.
- TIntrusivePtr<TIBPort> GetIBDevice();
- #else
- //////////////////////////////////////////////////////////////////////////
- // stub for OS without IB support
- //////////////////////////////////////////////////////////////////////////
// Stand-in for the verbs work-completion opcode enum, with every value spelled out.
enum ibv_wc_opcode {
    IBV_WC_SEND = 0,
    IBV_WC_RDMA_WRITE = 1,
    IBV_WC_RDMA_READ = 2,
    IBV_WC_COMP_SWAP = 3,
    IBV_WC_FETCH_ADD = 4,
    IBV_WC_BIND_MW = 5,
    // Receive-side opcodes start at bit 7.
    IBV_WC_RECV = 0x80,
    IBV_WC_RECV_RDMA_WITH_IMM = 0x81
};
// Stand-in for the verbs work-completion status enum; only the success code is
// declared (the real enum continues with many error codes).
enum ibv_wc_status {
    IBV_WC_SUCCESS = 0,
};
- //struct ibv_device;
- //struct ibv_pd;
- union ibv_gid {
- ui8 raw[16];
- struct {
- ui64 subnet_prefix;
- ui64 interface_id;
- } global;
- };
- struct ibv_wc {
- ui64 wr_id;
- enum ibv_wc_status status;
- enum ibv_wc_opcode opcode;
- ui32 imm_data; /* in network byte order */
- ui32 qp_num;
- ui32 src_qp;
- };
- struct ibv_grh {};
- struct ibv_ah_attr {
- ui8 sl;
- };
- //struct ibv_cq;
- class TIBContext: public TThrRefBase, TNonCopyable {
- public:
- bool IsValid() const {
- return false;
- }
- //ibv_context *GetContext() { return 0; }
- //ibv_pd *GetProtDomain() { return 0; }
- };
- class TIBPort: public TThrRefBase, TNonCopyable {
- public:
- TIBPort(TPtrArg<TIBContext>, int) {
- }
- int GetPort() const {
- return 1;
- }
- int GetLID() const {
- return 1;
- }
- TIBContext* GetCtx() {
- return 0;
- }
- void GetGID(ibv_gid* res) {
- Zero(*res);
- }
- void GetAHAttr(ibv_wc*, ibv_grh*, ibv_ah_attr*) {
- }
- };
- class TComplectionQueue: public TThrRefBase, TNonCopyable {
- public:
- TComplectionQueue(TPtrArg<TIBContext>, int) {
- }
- //ibv_cq *GetCQ() { return 0; }
- int Poll(ibv_wc*, int) {
- return 0;
- }
- };
- class TMemoryRegion: public TThrRefBase, TNonCopyable {
- public:
- TMemoryRegion(TPtrArg<TIBContext>, size_t) {
- }
- ui32 GetLKey() const {
- return 0;
- }
- ui32 GetRKey() const {
- return 0;
- }
- char* GetData() {
- return 0;
- }
- bool IsCovered(const void*, size_t) const {
- return false;
- }
- };
- class TSharedReceiveQueue: public TThrRefBase, TNonCopyable {
- public:
- TSharedReceiveQueue(TPtrArg<TIBContext>, int) {
- }
- //ibv_srq *GetSRQ() { return SRQ; }
- void PostReceive(TPtrArg<TMemoryRegion>, ui64, const void*, size_t) {
- }
- };
- inline void MakeAH(ibv_ah_attr*, TPtrArg<TIBPort>, int, int) {
- }
- class TAddressHandle: public TThrRefBase, TNonCopyable {
- public:
- TAddressHandle(TPtrArg<TIBContext>, ibv_ah_attr*) {
- }
- TAddressHandle(TPtrArg<TIBPort>, int, int) {
- }
- TAddressHandle(TPtrArg<TIBPort>, const TUdpAddress&, const TUdpAddress&, int) {
- }
- //ibv_ah *GetAH() { return AH; }
- bool IsValid() {
- return true;
- }
- };
- class TQueuePair: public TThrRefBase, TNonCopyable {
- public:
- int GetQPN() const {
- return 0;
- }
- int GetPSN() const {
- return 0;
- }
- };
- class TRCQueuePair: public TQueuePair {
- public:
- TRCQueuePair(TPtrArg<TIBContext>, TPtrArg<TComplectionQueue>, TPtrArg<TSharedReceiveQueue>, int) {
- }
- // SRQ should have receive posted
- void Init(const ibv_ah_attr&, int, int) {
- }
- void PostSend(TPtrArg<TMemoryRegion>, ui64, const void*, size_t) {
- }
- void PostRDMAWrite(ui64, ui32, TPtrArg<TMemoryRegion>, ui64, const void*, size_t) {
- }
- void PostRDMAWrite(ui64, ui32, ui64, ui32, ui64, size_t) {
- }
- void PostRDMAWriteImm(ui64, ui32, ui32, TPtrArg<TMemoryRegion>, ui64, const void*, size_t) {
- }
- };
- class TUDQueuePair: public TQueuePair {
- TIntrusivePtr<TIBPort> Port;
- public:
- TUDQueuePair(TPtrArg<TIBPort>, TPtrArg<TComplectionQueue>, TPtrArg<TSharedReceiveQueue>, int) {
- }
- // SRQ should have receive posted
- void Init(int) {
- }
- void PostSend(TPtrArg<TAddressHandle>, int, int, TPtrArg<TMemoryRegion>, ui64, const void*, size_t) {
- }
- };
- inline TIntrusivePtr<TIBPort> GetIBDevice() {
- return 0;
- }
- #endif
- }
|