1
0
mirror of https://github.com/corundum/corundum.git synced 2025-01-16 08:12:53 +08:00

fpga/common/tb: Rework driver model to better match C code

Signed-off-by: Alex Forencich <alex@alexforencich.com>
This commit is contained in:
Alex Forencich 2023-03-31 17:44:06 -07:00
parent ec1d7fe904
commit d06fbaf178

View File

@ -384,16 +384,23 @@ class Packet:
class EqRing:
def __init__(self, interface, size, stride, index, hw_regs):
def __init__(self, interface, index, hw_regs):
self.interface = interface
self.log = interface.log
self.driver = interface.driver
self.log_size = size.bit_length() - 1
self.size = 2**self.log_size
self.size_mask = self.size-1
self.stride = stride
self.log_size = 0
self.size = 0
self.size_mask = 0
self.stride = 0
self.index = index
self.interrupt_index = 0
self.active = False
self.buf_size = 0
self.buf_region = None
self.buf_dma = 0
self.buf = None
self.irq = None
self.head_ptr = 0
self.tail_ptr = 0
@ -404,11 +411,27 @@ class EqRing:
async def init(self):
self.log.info("Init EqRing %d (interface %d)", self.index, self.interface.index)
await self.hw_regs.write_dword(MQNIC_EVENT_QUEUE_ACTIVE_LOG_SIZE_REG, 0) # active, log size
async def alloc(self, size, stride):
if self.active:
raise Exception("Cannot allocate active ring")
if self.buf:
raise Exception("Already allocated")
self.log_size = size.bit_length() - 1
self.size = 2**self.log_size
self.size_mask = self.size-1
self.stride = stride
self.buf_size = self.size*self.stride
self.buf_region = self.driver.pool.alloc_region(self.buf_size)
self.buf_dma = self.buf_region.get_absolute_address(0)
self.buf = self.buf_region.mem
self.head_ptr = 0
self.tail_ptr = 0
await self.hw_regs.write_dword(MQNIC_EVENT_QUEUE_ACTIVE_LOG_SIZE_REG, 0) # active, log size
await self.hw_regs.write_dword(MQNIC_EVENT_QUEUE_BASE_ADDR_REG, self.buf_dma & 0xffffffff) # base address
await self.hw_regs.write_dword(MQNIC_EVENT_QUEUE_BASE_ADDR_REG+4, self.buf_dma >> 32) # base address
@ -417,20 +440,37 @@ class EqRing:
await self.hw_regs.write_dword(MQNIC_EVENT_QUEUE_TAIL_PTR_REG, self.tail_ptr & self.hw_ptr_mask) # tail pointer
await self.hw_regs.write_dword(MQNIC_EVENT_QUEUE_ACTIVE_LOG_SIZE_REG, self.log_size) # active, log size
async def activate(self, int_index):
async def free(self):
await self.deactivate()
if self.buf:
# TODO
pass
async def activate(self, irq):
self.log.info("Activate EqRing %d (interface %d)", self.index, self.interface.index)
self.interrupt_index = int_index
await self.deactivate()
self.irq = irq
await self.hw_regs.write_dword(MQNIC_EVENT_QUEUE_ACTIVE_LOG_SIZE_REG, 0) # active, log size
await self.hw_regs.write_dword(MQNIC_EVENT_QUEUE_INTERRUPT_INDEX_REG, int_index) # interrupt index
await self.hw_regs.write_dword(MQNIC_EVENT_QUEUE_BASE_ADDR_REG, self.buf_dma & 0xffffffff) # base address
await self.hw_regs.write_dword(MQNIC_EVENT_QUEUE_BASE_ADDR_REG+4, self.buf_dma >> 32) # base address
await self.hw_regs.write_dword(MQNIC_EVENT_QUEUE_INTERRUPT_INDEX_REG, irq) # interrupt index
await self.hw_regs.write_dword(MQNIC_EVENT_QUEUE_HEAD_PTR_REG, self.head_ptr & self.hw_ptr_mask) # head pointer
await self.hw_regs.write_dword(MQNIC_EVENT_QUEUE_TAIL_PTR_REG, self.tail_ptr & self.hw_ptr_mask) # tail pointer
await self.hw_regs.write_dword(MQNIC_EVENT_QUEUE_ACTIVE_LOG_SIZE_REG, self.log_size | MQNIC_EVENT_QUEUE_ACTIVE_MASK) # active, log size
self.active = True
async def deactivate(self):
await self.hw_regs.write_dword(MQNIC_EVENT_QUEUE_ACTIVE_LOG_SIZE_REG, self.log_size) # active, log size
await self.hw_regs.write_dword(MQNIC_EVENT_QUEUE_INTERRUPT_INDEX_REG, self.interrupt_index) # interrupt index
await self.hw_regs.write_dword(MQNIC_EVENT_QUEUE_INTERRUPT_INDEX_REG, 0) # interrupt index
self.irq = None
self.active = False
def empty(self):
return self.head_ptr == self.tail_ptr
@ -446,9 +486,12 @@ class EqRing:
await self.hw_regs.write_dword(MQNIC_EVENT_QUEUE_TAIL_PTR_REG, self.tail_ptr & self.hw_ptr_mask)
async def arm(self):
await self.hw_regs.write_dword(MQNIC_EVENT_QUEUE_INTERRUPT_INDEX_REG, self.interrupt_index | MQNIC_EVENT_QUEUE_ARM_MASK) # interrupt index
if not self.active:
return
async def process(self):
await self.hw_regs.write_dword(MQNIC_EVENT_QUEUE_INTERRUPT_INDEX_REG, self.irq | MQNIC_EVENT_QUEUE_ARM_MASK) # interrupt index
async def process_eq(self):
if not self.interface.port_up:
return
@ -469,12 +512,12 @@ class EqRing:
if event_data[0] == 0:
# transmit completion
cq = self.interface.tx_cpl_queues[event_data[1]]
await self.interface.process_tx_cq(cq)
await cq.handler(cq)
await cq.arm()
elif event_data[0] == 1:
# receive completion
cq = self.interface.rx_cpl_queues[event_data[1]]
await self.interface.process_rx_cq(cq)
await cq.handler(cq)
await cq.arm()
eq_tail_ptr += 1
@ -485,17 +528,26 @@ class EqRing:
class CqRing:
def __init__(self, interface, size, stride, index, hw_regs):
def __init__(self, interface, index, hw_regs):
self.interface = interface
self.log = interface.log
self.driver = interface.driver
self.log_size = size.bit_length() - 1
self.size = 2**self.log_size
self.size_mask = self.size-1
self.stride = stride
self.log_size = 0
self.size = 0
self.size_mask = 0
self.stride = 0
self.index = index
self.interrupt_index = 0
self.ring_index = 0
self.active = False
self.buf_size = 0
self.buf_region = None
self.buf_dma = 0
self.buf = None
self.eq = None
self.src_ring = None
self.handler = None
self.head_ptr = 0
self.tail_ptr = 0
@ -506,11 +558,27 @@ class CqRing:
async def init(self):
self.log.info("Init CqRing %d (interface %d)", self.index, self.interface.index)
await self.hw_regs.write_dword(MQNIC_CPL_QUEUE_ACTIVE_LOG_SIZE_REG, 0) # active, log size
async def alloc(self, size, stride):
if self.active:
raise Exception("Cannot allocate active ring")
if self.buf:
raise Exception("Already allocated")
self.log_size = size.bit_length() - 1
self.size = 2**self.log_size
self.size_mask = self.size-1
self.stride = stride
self.buf_size = self.size*self.stride
self.buf_region = self.driver.pool.alloc_region(self.buf_size)
self.buf_dma = self.buf_region.get_absolute_address(0)
self.buf = self.buf_region.mem
self.head_ptr = 0
self.tail_ptr = 0
await self.hw_regs.write_dword(MQNIC_CPL_QUEUE_ACTIVE_LOG_SIZE_REG, 0) # active, log size
await self.hw_regs.write_dword(MQNIC_CPL_QUEUE_BASE_ADDR_REG, self.buf_dma & 0xffffffff) # base address
await self.hw_regs.write_dword(MQNIC_CPL_QUEUE_BASE_ADDR_REG+4, self.buf_dma >> 32) # base address
@ -519,20 +587,37 @@ class CqRing:
await self.hw_regs.write_dword(MQNIC_CPL_QUEUE_TAIL_PTR_REG, self.tail_ptr & self.hw_ptr_mask) # tail pointer
await self.hw_regs.write_dword(MQNIC_CPL_QUEUE_ACTIVE_LOG_SIZE_REG, self.log_size) # active, log size
async def activate(self, int_index):
async def free(self):
await self.deactivate()
if self.buf:
# TODO
pass
async def activate(self, eq):
self.log.info("Activate CqRing %d (interface %d)", self.index, self.interface.index)
self.interrupt_index = int_index
await self.deactivate()
self.eq = eq
await self.hw_regs.write_dword(MQNIC_CPL_QUEUE_ACTIVE_LOG_SIZE_REG, 0) # active, log size
await self.hw_regs.write_dword(MQNIC_CPL_QUEUE_INTERRUPT_INDEX_REG, int_index) # event index
await self.hw_regs.write_dword(MQNIC_CPL_QUEUE_BASE_ADDR_REG, self.buf_dma & 0xffffffff) # base address
await self.hw_regs.write_dword(MQNIC_CPL_QUEUE_BASE_ADDR_REG+4, self.buf_dma >> 32) # base address
await self.hw_regs.write_dword(MQNIC_CPL_QUEUE_INTERRUPT_INDEX_REG, eq.index) # event index
await self.hw_regs.write_dword(MQNIC_CPL_QUEUE_HEAD_PTR_REG, self.head_ptr & self.hw_ptr_mask) # head pointer
await self.hw_regs.write_dword(MQNIC_CPL_QUEUE_TAIL_PTR_REG, self.tail_ptr & self.hw_ptr_mask) # tail pointer
await self.hw_regs.write_dword(MQNIC_CPL_QUEUE_ACTIVE_LOG_SIZE_REG, self.log_size | MQNIC_CPL_QUEUE_ACTIVE_MASK) # active, log size
self.active = True
async def deactivate(self):
await self.hw_regs.write_dword(MQNIC_CPL_QUEUE_ACTIVE_LOG_SIZE_REG, self.log_size) # active, log size
await self.hw_regs.write_dword(MQNIC_CPL_QUEUE_INTERRUPT_INDEX_REG, self.interrupt_index) # event index
await self.hw_regs.write_dword(MQNIC_CPL_QUEUE_INTERRUPT_INDEX_REG, 0) # event index
self.eq = None
self.active = False
def empty(self):
return self.head_ptr == self.tail_ptr
@ -548,23 +633,33 @@ class CqRing:
await self.hw_regs.write_dword(MQNIC_CPL_QUEUE_TAIL_PTR_REG, self.tail_ptr & self.hw_ptr_mask)
async def arm(self):
await self.hw_regs.write_dword(MQNIC_CPL_QUEUE_INTERRUPT_INDEX_REG, self.interrupt_index | MQNIC_CPL_QUEUE_ARM_MASK) # event index
if not self.active:
return
await self.hw_regs.write_dword(MQNIC_CPL_QUEUE_INTERRUPT_INDEX_REG, self.eq.index | MQNIC_CPL_QUEUE_ARM_MASK) # event index
class TxRing:
def __init__(self, interface, size, stride, index, hw_regs):
def __init__(self, interface, index, hw_regs):
self.interface = interface
self.log = interface.log
self.driver = interface.driver
self.log_queue_size = size.bit_length() - 1
self.log_desc_block_size = int(stride/MQNIC_DESC_SIZE).bit_length() - 1
self.desc_block_size = 2**self.log_desc_block_size
self.size = 2**self.log_queue_size
self.size_mask = self.size-1
self.full_size = self.size >> 1
self.stride = stride
self.log_queue_size = 0
self.log_desc_block_size = 0
self.desc_block_size = 0
self.size = 0
self.size_mask = 0
self.full_size = 0
self.stride = 0
self.index = index
self.cpl_index = 0
self.active = False
self.buf_size = 0
self.buf_region = None
self.buf_dma = 0
self.buf = None
self.cq = None
self.head_ptr = 0
self.tail_ptr = 0
@ -581,6 +676,22 @@ class TxRing:
async def init(self):
self.log.info("Init TxRing %d (interface %d)", self.index, self.interface.index)
await self.hw_regs.write_dword(MQNIC_QUEUE_ACTIVE_LOG_SIZE_REG, 0) # active, log size
async def alloc(self, size, stride):
if self.active:
raise Exception("Cannot allocate active ring")
if self.buf:
raise Exception("Already allocated")
self.log_queue_size = size.bit_length() - 1
self.log_desc_block_size = int(stride/MQNIC_DESC_SIZE).bit_length() - 1
self.desc_block_size = 2**self.log_desc_block_size
self.size = 2**self.log_queue_size
self.size_mask = self.size-1
self.full_size = self.size >> 1
self.stride = stride
self.tx_info = [None]*self.size
self.buf_size = self.size*self.stride
@ -588,6 +699,9 @@ class TxRing:
self.buf_dma = self.buf_region.get_absolute_address(0)
self.buf = self.buf_region.mem
self.head_ptr = 0
self.tail_ptr = 0
await self.hw_regs.write_dword(MQNIC_QUEUE_ACTIVE_LOG_SIZE_REG, 0) # active, log size
await self.hw_regs.write_dword(MQNIC_QUEUE_BASE_ADDR_REG, self.buf_dma & 0xffffffff) # base address
await self.hw_regs.write_dword(MQNIC_QUEUE_BASE_ADDR_REG+4, self.buf_dma >> 32) # base address
@ -596,20 +710,43 @@ class TxRing:
await self.hw_regs.write_dword(MQNIC_QUEUE_TAIL_PTR_REG, self.tail_ptr & self.hw_ptr_mask) # tail pointer
await self.hw_regs.write_dword(MQNIC_QUEUE_ACTIVE_LOG_SIZE_REG, self.log_queue_size | (self.log_desc_block_size << 8)) # active, log desc block size, log queue size
async def activate(self, cpl_index):
async def free(self):
await self.deactivate()
if self.buf:
# TODO
pass
async def activate(self, cq):
self.log.info("Activate TxRing %d (interface %d)", self.index, self.interface.index)
self.cpl_index = cpl_index
await self.deactivate()
self.cq = cq
self.cq.src_ring = self
self.cq.handler = TxRing.process_tx_cq
await self.hw_regs.write_dword(MQNIC_QUEUE_ACTIVE_LOG_SIZE_REG, 0) # active, log size
await self.hw_regs.write_dword(MQNIC_QUEUE_CPL_QUEUE_INDEX_REG, cpl_index) # completion queue index
await self.hw_regs.write_dword(MQNIC_QUEUE_BASE_ADDR_REG, self.buf_dma & 0xffffffff) # base address
await self.hw_regs.write_dword(MQNIC_QUEUE_BASE_ADDR_REG+4, self.buf_dma >> 32) # base address
await self.hw_regs.write_dword(MQNIC_QUEUE_CPL_QUEUE_INDEX_REG, cq.index) # completion queue index
await self.hw_regs.write_dword(MQNIC_QUEUE_HEAD_PTR_REG, self.head_ptr & self.hw_ptr_mask) # head pointer
await self.hw_regs.write_dword(MQNIC_QUEUE_TAIL_PTR_REG, self.tail_ptr & self.hw_ptr_mask) # tail pointer
await self.hw_regs.write_dword(MQNIC_QUEUE_ACTIVE_LOG_SIZE_REG, self.log_queue_size | (self.log_desc_block_size << 8) | MQNIC_QUEUE_ACTIVE_MASK) # active, log desc block size, log queue size
self.active = True
async def deactivate(self):
await self.hw_regs.write_dword(MQNIC_QUEUE_ACTIVE_LOG_SIZE_REG, self.log_queue_size | (self.log_desc_block_size << 8)) # active, log desc block size, log queue size
if self.cq:
self.cq.src_ring = None
self.cq.handler = None
self.cq = None
self.active = False
def empty(self):
return self.head_ptr == self.clean_tail_ptr
@ -634,21 +771,78 @@ class TxRing:
self.free_desc(index)
self.clean_tail_ptr += 1
@staticmethod
async def process_tx_cq(cq):
interface = cq.interface
interface.log.info("Process TX CQ %d for TX queue %d (interface %d)", cq.index, cq.src_ring.index, interface.index)
ring = cq.src_ring
if not interface.port_up:
return
# process completion queue
await cq.read_head_ptr()
cq_tail_ptr = cq.tail_ptr
cq_index = cq_tail_ptr & cq.size_mask
while (cq.head_ptr != cq_tail_ptr):
cpl_data = struct.unpack_from("<HHHxxQ", cq.buf, cq_index*cq.stride)
ring_index = cpl_data[1] & ring.size_mask
interface.log.info("CPL data: %s", cpl_data)
interface.log.info("Ring index: %d", ring_index)
ring.free_desc(ring_index)
cq_tail_ptr += 1
cq_index = cq_tail_ptr & cq.size_mask
cq.tail_ptr = cq_tail_ptr
await cq.write_tail_ptr()
# process ring
await ring.read_tail_ptr()
ring_clean_tail_ptr = ring.clean_tail_ptr
ring_index = ring_clean_tail_ptr & ring.size_mask
while (ring_clean_tail_ptr != ring.tail_ptr):
if ring.tx_info[ring_index]:
break
ring_clean_tail_ptr += 1
ring_index = ring_clean_tail_ptr & ring.size_mask
ring.clean_tail_ptr = ring_clean_tail_ptr
ring.clean_event.set()
class RxRing:
def __init__(self, interface, size, stride, index, hw_regs):
def __init__(self, interface, index, hw_regs):
self.interface = interface
self.log = interface.log
self.driver = interface.driver
self.log_queue_size = size.bit_length() - 1
self.log_desc_block_size = int(stride/MQNIC_DESC_SIZE).bit_length() - 1
self.desc_block_size = 2**self.log_desc_block_size
self.size = 2**self.log_queue_size
self.size_mask = self.size-1
self.full_size = self.size >> 1
self.stride = stride
self.log_queue_size = 0
self.log_desc_block_size = 0
self.desc_block_size = 0
self.size = 0
self.size_mask = 0
self.full_size = 0
self.stride = 0
self.index = index
self.cpl_index = 0
self.active = False
self.buf_size = 0
self.buf_region = None
self.buf_dma = 0
self.buf = None
self.cq = None
self.head_ptr = 0
self.tail_ptr = 0
@ -663,6 +857,22 @@ class RxRing:
async def init(self):
self.log.info("Init RxRing %d (interface %d)", self.index, self.interface.index)
await self.hw_regs.write_dword(MQNIC_QUEUE_ACTIVE_LOG_SIZE_REG, 0) # active, log size
async def alloc(self, size, stride):
if self.active:
raise Exception("Cannot allocate active ring")
if self.buf:
raise Exception("Already allocated")
self.log_queue_size = size.bit_length() - 1
self.log_desc_block_size = int(stride/MQNIC_DESC_SIZE).bit_length() - 1
self.desc_block_size = 2**self.log_desc_block_size
self.size = 2**self.log_queue_size
self.size_mask = self.size-1
self.full_size = self.size >> 1
self.stride = stride
self.rx_info = [None]*self.size
self.buf_size = self.size*self.stride
@ -670,6 +880,9 @@ class RxRing:
self.buf_dma = self.buf_region.get_absolute_address(0)
self.buf = self.buf_region.mem
self.head_ptr = 0
self.tail_ptr = 0
await self.hw_regs.write_dword(MQNIC_QUEUE_ACTIVE_LOG_SIZE_REG, 0) # active, log size
await self.hw_regs.write_dword(MQNIC_QUEUE_BASE_ADDR_REG, self.buf_dma & 0xffffffff) # base address
await self.hw_regs.write_dword(MQNIC_QUEUE_BASE_ADDR_REG+4, self.buf_dma >> 32) # base address
@ -678,22 +891,45 @@ class RxRing:
await self.hw_regs.write_dword(MQNIC_QUEUE_TAIL_PTR_REG, self.tail_ptr & self.hw_ptr_mask) # tail pointer
await self.hw_regs.write_dword(MQNIC_QUEUE_ACTIVE_LOG_SIZE_REG, self.log_queue_size | (self.log_desc_block_size << 8)) # active, log desc block size, log queue size
async def activate(self, cpl_index):
async def free(self):
await self.deactivate()
if self.buf:
# TODO
pass
async def activate(self, cq):
self.log.info("Activate RxRing %d (interface %d)", self.index, self.interface.index)
self.cpl_index = cpl_index
await self.deactivate()
self.cq = cq
self.cq.src_ring = self
self.cq.handler = RxRing.process_rx_cq
await self.hw_regs.write_dword(MQNIC_QUEUE_ACTIVE_LOG_SIZE_REG, 0) # active, log size
await self.hw_regs.write_dword(MQNIC_QUEUE_CPL_QUEUE_INDEX_REG, cpl_index) # completion queue index
await self.hw_regs.write_dword(MQNIC_QUEUE_BASE_ADDR_REG, self.buf_dma & 0xffffffff) # base address
await self.hw_regs.write_dword(MQNIC_QUEUE_BASE_ADDR_REG+4, self.buf_dma >> 32) # base address
await self.hw_regs.write_dword(MQNIC_QUEUE_CPL_QUEUE_INDEX_REG, cq.index) # completion queue index
await self.hw_regs.write_dword(MQNIC_QUEUE_HEAD_PTR_REG, self.head_ptr & self.hw_ptr_mask) # head pointer
await self.hw_regs.write_dword(MQNIC_QUEUE_TAIL_PTR_REG, self.tail_ptr & self.hw_ptr_mask) # tail pointer
await self.hw_regs.write_dword(MQNIC_QUEUE_ACTIVE_LOG_SIZE_REG, self.log_queue_size | (self.log_desc_block_size << 8) | MQNIC_QUEUE_ACTIVE_MASK) # active, log desc block size, log queue size
self.active = True
await self.refill_buffers()
async def deactivate(self):
await self.hw_regs.write_dword(MQNIC_QUEUE_ACTIVE_LOG_SIZE_REG, self.log_queue_size | (self.log_desc_block_size << 8)) # active, log desc block size, log queue size
if self.cq:
self.cq.src_ring = None
self.cq.handler = None
self.cq = None
self.active = False
def empty(self):
return self.head_ptr == self.clean_tail_ptr
@ -744,6 +980,72 @@ class RxRing:
await self.write_head_ptr()
@staticmethod
async def process_rx_cq(cq):
interface = cq.interface
interface.log.info("Process RX CQ %d for RX queue %d (interface %d)", cq.index, cq.src_ring.index, interface.index)
ring = cq.src_ring
if not interface.port_up:
return
# process completion queue
await cq.read_head_ptr()
cq_tail_ptr = cq.tail_ptr
cq_index = cq_tail_ptr & cq.size_mask
while (cq.head_ptr != cq_tail_ptr):
cpl_data = struct.unpack_from("<HHHxxLHH", cq.buf, cq_index*cq.stride)
ring_index = cpl_data[1] & ring.size_mask
interface.log.info("CPL data: %s", cpl_data)
interface.log.info("Ring index: %d", ring_index)
pkt = ring.rx_info[ring_index]
length = cpl_data[2]
skb = Packet()
skb.data = pkt[:length]
skb.queue = ring.index
skb.timestamp_ns = cpl_data[3]
skb.timestamp_s = cpl_data[4]
skb.rx_checksum = cpl_data[5]
interface.log.info("Packet: %s", skb)
interface.pkt_rx_queue.append(skb)
interface.pkt_rx_sync.set()
ring.free_desc(ring_index)
cq_tail_ptr += 1
cq_index = cq_tail_ptr & cq.size_mask
cq.tail_ptr = cq_tail_ptr
await cq.write_tail_ptr()
# process ring
await ring.read_tail_ptr()
ring_clean_tail_ptr = ring.clean_tail_ptr
ring_index = ring_clean_tail_ptr & ring.size_mask
while (ring_clean_tail_ptr != ring.tail_ptr):
if ring.rx_info[ring_index]:
break
ring_clean_tail_ptr += 1
ring_index = ring_clean_tail_ptr & ring.size_mask
ring.clean_tail_ptr = ring_clean_tail_ptr
# replenish buffers
await ring.refill_buffers()
class BaseScheduler:
def __init__(self, port, index, rb):
@ -1005,34 +1307,29 @@ class Interface:
self.sched_blocks = []
for k in range(self.event_queue_count):
q = EqRing(self, 1024, MQNIC_EVENT_SIZE, self.index,
self.hw_regs.create_window(self.event_queue_offset + k*self.event_queue_stride, self.event_queue_stride))
await q.init()
self.event_queues.append(q)
eq = EqRing(self, k, self.hw_regs.create_window(self.event_queue_offset + k*self.event_queue_stride, self.event_queue_stride))
await eq.init()
self.event_queues.append(eq)
for k in range(self.tx_queue_count):
q = TxRing(self, 1024, MQNIC_DESC_SIZE*4, k,
self.hw_regs.create_window(self.tx_queue_offset + k*self.tx_queue_stride, self.tx_queue_stride))
await q.init()
self.tx_queues.append(q)
txq = TxRing(self, k, self.hw_regs.create_window(self.tx_queue_offset + k*self.tx_queue_stride, self.tx_queue_stride))
await txq.init()
self.tx_queues.append(txq)
for k in range(self.tx_cpl_queue_count):
q = CqRing(self, 1024, MQNIC_CPL_SIZE, k,
self.hw_regs.create_window(self.tx_cpl_queue_offset + k*self.tx_cpl_queue_stride, self.tx_cpl_queue_stride))
await q.init()
self.tx_cpl_queues.append(q)
cq = CqRing(self, k, self.hw_regs.create_window(self.tx_cpl_queue_offset + k*self.tx_cpl_queue_stride, self.tx_cpl_queue_stride))
await cq.init()
self.tx_cpl_queues.append(cq)
for k in range(self.rx_queue_count):
q = RxRing(self, 1024, MQNIC_DESC_SIZE*4, k,
self.hw_regs.create_window(self.rx_queue_offset + k*self.rx_queue_stride, self.rx_queue_stride))
await q.init()
self.rx_queues.append(q)
rxq = RxRing(self, k, self.hw_regs.create_window(self.rx_queue_offset + k*self.rx_queue_stride, self.rx_queue_stride))
await rxq.init()
self.rx_queues.append(rxq)
for k in range(self.rx_cpl_queue_count):
q = CqRing(self, 1024, MQNIC_CPL_SIZE, k,
self.hw_regs.create_window(self.rx_cpl_queue_offset + k*self.rx_cpl_queue_stride, self.rx_cpl_queue_stride))
await q.init()
self.rx_cpl_queues.append(q)
cq = CqRing(self, k, self.hw_regs.create_window(self.rx_cpl_queue_offset + k*self.rx_cpl_queue_stride, self.rx_cpl_queue_stride))
await cq.init()
self.rx_cpl_queues.append(cq)
for k in range(self.port_count):
rb = self.reg_blocks.find(MQNIC_RB_PORT_TYPE, MQNIC_RB_PORT_VER, index=k)
@ -1050,32 +1347,30 @@ class Interface:
assert self.sched_block_count == len(self.sched_blocks)
for eq in self.event_queues:
await eq.alloc(1024, MQNIC_EVENT_SIZE)
await eq.activate(self.index) # TODO?
await eq.arm()
# wait for all writes to complete
await self.hw_regs.read_dword(0)
async def open(self):
for q in self.event_queues:
await q.activate(self.index) # TODO?
q.handler = None # TODO
await q.arm()
for rxq in self.rx_queues:
cq = self.rx_cpl_queues[rxq.index]
await cq.alloc(1024, MQNIC_CPL_SIZE)
await cq.activate(self.event_queues[cq.index % self.event_queue_count])
await cq.arm()
await rxq.alloc(1024, MQNIC_DESC_SIZE*4)
await rxq.activate(cq)
for q in self.rx_cpl_queues:
await q.activate(q.index % self.event_queue_count)
q.ring_index = q.index
q.handler = None # TODO
await q.arm()
for q in self.rx_queues:
await q.activate(q.index)
for q in self.tx_cpl_queues:
await q.activate(q.index % self.event_queue_count)
q.ring_index = q.index
q.handler = None # TODO
await q.arm()
for q in self.tx_queues:
await q.activate(q.index)
for txq in self.tx_queues:
cq = self.tx_cpl_queues[txq.index]
await cq.alloc(1024, MQNIC_CPL_SIZE)
await cq.activate(self.event_queues[cq.index % self.event_queue_count])
await cq.arm()
await txq.alloc(1024, MQNIC_DESC_SIZE*4)
await txq.activate(cq)
# wait for all writes to complete
await self.hw_regs.read_dword(0)
@ -1085,20 +1380,13 @@ class Interface:
async def close(self):
self.port_up = False
for q in self.tx_queues:
await q.deactivate()
for txq in self.tx_queues:
await txq.deactivate()
await txq.cq.deactivate()
for q in self.tx_cpl_queues:
await q.deactivate()
for q in self.rx_queues:
await q.deactivate()
for q in self.rx_cpl_queues:
await q.deactivate()
for q in self.event_queues:
await q.deactivate()
for rxq in self.rx_queues:
await rxq.deactivate()
await rxq.cq.deactivate()
# wait for all writes to complete
await self.hw_regs.read_dword(0)
@ -1109,116 +1397,6 @@ class Interface:
for q in self.rx_queues:
await q.free_buf()
async def process_tx_cq(self, cq_ring):
self.log.info("Process TX CQ %d (interface %d)", cq_ring.ring_index, self.index)
ring = self.tx_queues[cq_ring.ring_index]
if not self.port_up:
return
# process completion queue
await cq_ring.read_head_ptr()
cq_tail_ptr = cq_ring.tail_ptr
cq_index = cq_tail_ptr & cq_ring.size_mask
while (cq_ring.head_ptr != cq_tail_ptr):
cpl_data = struct.unpack_from("<HHHxxQ", cq_ring.buf, cq_index*cq_ring.stride)
ring_index = cpl_data[1] & ring.size_mask
self.log.info("CPL data: %s", cpl_data)
self.log.info("Ring index: %d", ring_index)
ring.free_desc(ring_index)
cq_tail_ptr += 1
cq_index = cq_tail_ptr & cq_ring.size_mask
cq_ring.tail_ptr = cq_tail_ptr
await cq_ring.write_tail_ptr()
# process ring
await ring.read_tail_ptr()
ring_clean_tail_ptr = ring.clean_tail_ptr
ring_index = ring_clean_tail_ptr & ring.size_mask
while (ring_clean_tail_ptr != ring.tail_ptr):
if ring.tx_info[ring_index]:
break
ring_clean_tail_ptr += 1
ring_index = ring_clean_tail_ptr & ring.size_mask
ring.clean_tail_ptr = ring_clean_tail_ptr
ring.clean_event.set()
async def process_rx_cq(self, cq_ring):
self.log.info("Process RX CQ %d (interface %d)", cq_ring.ring_index, self.index)
ring = self.rx_queues[cq_ring.ring_index]
if not self.port_up:
return
# process completion queue
await cq_ring.read_head_ptr()
cq_tail_ptr = cq_ring.tail_ptr
cq_index = cq_tail_ptr & cq_ring.size_mask
while (cq_ring.head_ptr != cq_tail_ptr):
cpl_data = struct.unpack_from("<HHHxxLHH", cq_ring.buf, cq_index*cq_ring.stride)
ring_index = cpl_data[1] & ring.size_mask
self.log.info("CPL data: %s", cpl_data)
self.log.info("Ring index: %d", ring_index)
pkt = ring.rx_info[ring_index]
length = cpl_data[2]
skb = Packet()
skb.data = pkt[:length]
skb.queue = ring.index
skb.timestamp_ns = cpl_data[3]
skb.timestamp_s = cpl_data[4]
skb.rx_checksum = cpl_data[5]
self.log.info("Packet: %s", skb)
self.pkt_rx_queue.append(skb)
self.pkt_rx_sync.set()
ring.free_desc(ring_index)
cq_tail_ptr += 1
cq_index = cq_tail_ptr & cq_ring.size_mask
cq_ring.tail_ptr = cq_tail_ptr
await cq_ring.write_tail_ptr()
# process ring
await ring.read_tail_ptr()
ring_clean_tail_ptr = ring.clean_tail_ptr
ring_index = ring_clean_tail_ptr & ring.size_mask
while (ring_clean_tail_ptr != ring.tail_ptr):
if ring.rx_info[ring_index]:
break
ring_clean_tail_ptr += 1
ring_index = ring_clean_tail_ptr & ring.size_mask
ring.clean_tail_ptr = ring_clean_tail_ptr
# replenish buffers
await ring.refill_buffers()
async def start_xmit(self, skb, tx_ring=None, csum_start=None, csum_offset=None):
if not self.port_up:
return
@ -1514,8 +1692,8 @@ class Driver:
self.log.info("Interrupt handler start (IRQ %d)", index)
for i in self.interfaces:
for eq in i.event_queues:
if eq.interrupt_index == index:
await eq.process()
if eq.irq == index:
await eq.process_eq()
await eq.arm()
self.log.info("Interrupt handler end (IRQ %d)", index)