1
0
mirror of https://github.com/corundum/corundum.git synced 2025-01-16 08:12:53 +08:00

Add DMA PSDPRAM master model and DMA PSDPRAM testbenches

Signed-off-by: Alex Forencich <alex@alexforencich.com>
This commit is contained in:
Alex Forencich 2023-05-28 00:42:47 -07:00
parent 8ad370ac99
commit 0828de78e8
7 changed files with 1178 additions and 4 deletions

View File

@ -1,6 +1,6 @@
"""
Copyright (c) 2020 Alex Forencich
Copyright (c) 2020-2023 Alex Forencich
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
@ -23,12 +23,70 @@ THE SOFTWARE.
"""
import logging
from typing import NamedTuple
import cocotb
from cocotb.triggers import RisingEdge
from cocotb.queue import Queue
from cocotb.triggers import Event, RisingEdge
from cocotb_bus.bus import Bus
from cocotbext.axi.memory import Memory
from cocotbext.axi import Region
# master write helper objects
class WriteCmd(NamedTuple):
address: int
data: bytes
event: Event
class SegWriteData:
def __int__(self):
self.addr = 0
self.data = 0
self.be = 0
class WriteRespCmd(NamedTuple):
address: int
length: int
segments: int
first_seg: int
event: Event
class WriteResp(NamedTuple):
address: int
length: int
# master read helper objects
class ReadCmd(NamedTuple):
address: int
length: int
event: Event
class SegReadCmd:
def __int__(self):
self.addr = 0
class ReadRespCmd(NamedTuple):
address: int
length: int
segments: int
first_seg: int
event: Event
class ReadResp(NamedTuple):
address: int
data: bytes
def __bytes__(self):
return self.data
class BaseBus(Bus):
@ -80,6 +138,511 @@ class PsdpRamBus:
return cls(write, read)
class PsdpRamMasterWrite(Region):
def __init__(self, bus, clock, reset=None, **kwargs):
self.bus = bus
self.clock = clock
self.reset = reset
if bus._name:
self.log = logging.getLogger(f"cocotb.{bus._entity._name}.{bus._name}")
else:
self.log = logging.getLogger(f"cocotb.{bus._entity._name}")
self.log.info("Parallel Simple Dual Port RAM master model (write)")
self.log.info("Copyright (c) 2020 Alex Forencich")
self.pause = False
self._pause_generator = None
self._pause_cr = None
self.in_flight_operations = 0
self._idle = Event()
self._idle.set()
self.width = len(self.bus.wr_cmd_data)
self.byte_size = 8
self.byte_lanes = len(self.bus.wr_cmd_be)
self.seg_count = len(self.bus.wr_cmd_valid)
self.seg_data_width = self.width // self.seg_count
self.seg_byte_lanes = self.seg_data_width // self.byte_size
self.seg_addr_width = len(self.bus.wr_cmd_addr) // self.seg_count
self.seg_be_width = self.seg_data_width // self.byte_size
self.seg_data_mask = 2**self.seg_data_width-1
self.seg_addr_mask = 2**self.seg_addr_width-1
self.seg_be_mask = 2**self.seg_be_width-1
self.address_width = self.seg_addr_width + (self.seg_byte_lanes*self.seg_count-1).bit_length()
self.write_command_queue = Queue()
self.write_command_queue.queue_occupancy_limit = 2
self.current_write_command = None
self.seg_write_queue = [Queue() for x in range(self.seg_count)]
self.seg_write_resp_queue = [Queue() for x in range(self.seg_count)]
self.int_write_resp_command_queue = Queue()
self.current_write_resp_command = None
super().__init__(2**self.address_width, **kwargs)
self.log.info("Parallel Simple Dual Port RAM master model configuration:")
self.log.info(" Address width: %d bits", self.address_width)
self.log.info(" Segment count: %d", self.seg_count)
self.log.info(" Segment addr width: %d bits", self.seg_addr_width)
self.log.info(" Segment data width: %d bits (%d bytes)", self.seg_data_width, self.seg_byte_lanes)
self.log.info(" Total data width: %d bits (%d bytes)", self.width, self.byte_lanes)
assert self.seg_be_width*self.seg_count == len(self.bus.wr_cmd_be)
self.bus.wr_cmd_valid.setimmediatevalue(0)
cocotb.start_soon(self._process_write())
cocotb.start_soon(self._process_write_resp())
cocotb.start_soon(self._run())
def set_pause_generator(self, generator=None):
if self._pause_cr is not None:
self._pause_cr.kill()
self._pause_cr = None
self._pause_generator = generator
if self._pause_generator is not None:
self._pause_cr = cocotb.start_soon(self._run_pause())
def clear_pause_generator(self):
self.set_pause_generator(None)
def idle(self):
return not self.in_flight_operations
async def wait(self):
while not self.idle():
await self._idle.wait()
async def write(self, address, data):
if address < 0 or address >= 2**self.address_width:
raise ValueError("Address out of range")
if isinstance(data, int):
raise ValueError("Expected bytes or bytearray for data")
if address+len(data) > 2**self.address_width:
raise ValueError("Requested transfer overruns end of address space")
event = Event()
data = bytes(data)
self.in_flight_operations += 1
self._idle.clear()
await self.write_command_queue.put(WriteCmd(address, data, event))
await event.wait()
return event.data
async def _process_write(self):
while True:
cmd = await self.write_command_queue.get()
self.current_write_command = cmd
seg_start_offset = cmd.address % self.seg_byte_lanes
seg_end_offset = ((cmd.address + len(cmd.data) - 1) % self.seg_byte_lanes) + 1
seg_be_start = (self.seg_be_mask << seg_start_offset) & self.seg_be_mask
seg_be_end = self.seg_be_mask >> (self.seg_byte_lanes - seg_end_offset)
first_seg = (cmd.address // self.seg_byte_lanes) % self.seg_count
segments = (len(cmd.data) + (cmd.address % self.seg_byte_lanes) + self.seg_byte_lanes-1) // self.seg_byte_lanes
resp_cmd = WriteRespCmd(cmd.address, len(cmd.data), segments, first_seg, cmd.event)
await self.int_write_resp_command_queue.put(resp_cmd)
offset = 0
if self.log.isEnabledFor(logging.INFO):
self.log.info("Write start addr: 0x%08x data: %s",
cmd.address, ' '.join((f'{c:02x}' for c in cmd.data)))
seg = first_seg
for k in range(segments):
start = 0
stop = self.seg_byte_lanes
be = self.seg_be_mask
if k == 0:
start = seg_start_offset
be &= seg_be_start
if k == segments-1:
stop = seg_end_offset
be &= seg_be_end
val = 0
for j in range(start, stop):
val |= cmd.data[offset] << j*8
offset += 1
op = SegWriteData()
op.addr = (cmd.address + k*self.seg_byte_lanes) // self.byte_lanes
op.data = val
op.be = be
await self.seg_write_queue[seg].put(op)
seg = (seg + 1) % self.seg_count
self.current_write_command = None
async def _process_write_resp(self):
while True:
cmd = await self.int_write_resp_command_queue.get()
self.current_write_resp_command = cmd
seg = cmd.first_seg
for k in range(cmd.segments):
await self.seg_write_resp_queue[seg].get()
seg = (seg + 1) % self.seg_count
if self.log.isEnabledFor(logging.INFO):
self.log.info("Write complete addr: 0x%08x length: %d", cmd.address, cmd.length)
write_resp = WriteResp(cmd.address, cmd.length)
cmd.event.set(write_resp)
self.current_write_resp_command = None
self.in_flight_operations -= 1
if self.in_flight_operations == 0:
self._idle.set()
async def _run(self):
cmd_valid = 0
cmd_addr = 0
cmd_data = 0
cmd_be = 0
clock_edge_event = RisingEdge(self.clock)
while True:
await clock_edge_event
cmd_ready_sample = self.bus.wr_cmd_ready.value
done_sample = self.bus.wr_done.value
if self.reset is not None and self.reset.value:
self.bus.wr_cmd_valid.setimmediatevalue(0)
continue
# process segments
for seg in range(self.seg_count):
seg_mask = 1 << seg
if (cmd_ready_sample & seg_mask) or not (cmd_valid & seg_mask):
if not self.seg_write_queue[seg].empty() and not self.pause:
op = await self.seg_write_queue[seg].get()
cmd_addr &= ~(self.seg_addr_mask << self.seg_addr_width*seg)
cmd_addr |= ((op.addr & self.seg_addr_mask) << self.seg_addr_width*seg)
cmd_data &= ~(self.seg_data_mask << self.seg_data_width*seg)
cmd_data |= ((op.data & self.seg_data_mask) << self.seg_data_width*seg)
cmd_be &= ~(self.seg_be_mask << self.seg_be_width*seg)
cmd_be |= ((op.be & self.seg_be_mask) << self.seg_be_width*seg)
cmd_valid |= seg_mask
if self.log.isEnabledFor(logging.INFO):
self.log.info("Write word seg: %d addr: 0x%08x be 0x%02x data %s",
seg, op.addr, op.be, ' '.join((f'{c:02x}' for c in op.data.to_bytes(self.seg_byte_lanes, 'little'))))
else:
cmd_valid &= ~seg_mask
if done_sample & seg_mask:
await self.seg_write_resp_queue[seg].put(None)
self.bus.wr_cmd_valid.value = cmd_valid
self.bus.wr_cmd_addr.value = cmd_addr
self.bus.wr_cmd_data.value = cmd_data
self.bus.wr_cmd_be.value = cmd_be
async def _run_pause(self):
clock_edge_event = RisingEdge(self.clock)
for val in self._pause_generator:
self.pause = val
await clock_edge_event
class PsdpRamMasterRead(Region):
def __init__(self, bus, clock, reset=None, **kwargs):
self.bus = bus
self.clock = clock
self.reset = reset
if bus._name:
self.log = logging.getLogger(f"cocotb.{bus._entity._name}.{bus._name}")
else:
self.log = logging.getLogger(f"cocotb.{bus._entity._name}")
self.log.info("Parallel Simple Dual Port RAM master model (read)")
self.log.info("Copyright (c) 2020 Alex Forencich")
self.pause = False
self._pause_generator = None
self._pause_cr = None
self.in_flight_operations = 0
self._idle = Event()
self._idle.set()
self.width = len(self.bus.rd_resp_data)
self.byte_size = 8
self.byte_lanes = self.width // self.byte_size
self.seg_count = len(self.bus.rd_cmd_valid)
self.seg_data_width = self.width // self.seg_count
self.seg_byte_lanes = self.seg_data_width // self.byte_size
self.seg_addr_width = len(self.bus.rd_cmd_addr) // self.seg_count
self.seg_data_mask = 2**self.seg_data_width-1
self.seg_addr_mask = 2**self.seg_addr_width-1
self.address_width = self.seg_addr_width + (self.seg_byte_lanes*self.seg_count-1).bit_length()
self.read_command_queue = Queue()
self.read_command_queue.queue_occupancy_limit = 2
self.current_read_command = None
self.seg_read_queue = [Queue() for x in range(self.seg_count)]
self.seg_read_resp_queue = [Queue() for x in range(self.seg_count)]
self.int_read_resp_command_queue = Queue()
self.current_read_resp_command = None
super().__init__(2**self.address_width, **kwargs)
self.log.info("Parallel Simple Dual Port RAM master model configuration:")
self.log.info(" Address width: %d bits", self.address_width)
self.log.info(" Segment count: %d", self.seg_count)
self.log.info(" Segment addr width: %d bits", self.seg_addr_width)
self.log.info(" Segment data width: %d bits (%d bytes)", self.seg_data_width, self.seg_byte_lanes)
self.log.info(" Total data width: %d bits (%d bytes)", self.width, self.byte_lanes)
self.bus.rd_cmd_valid.setimmediatevalue(0)
self.bus.rd_resp_ready.setimmediatevalue(0)
cocotb.start_soon(self._process_read())
cocotb.start_soon(self._process_read_resp())
cocotb.start_soon(self._run())
def set_pause_generator(self, generator=None):
if self._pause_cr is not None:
self._pause_cr.kill()
self._pause_cr = None
self._pause_generator = generator
if self._pause_generator is not None:
self._pause_cr = cocotb.start_soon(self._run_pause())
def clear_pause_generator(self):
self.set_pause_generator(None)
def idle(self):
return not self.in_flight_operations
async def wait(self):
while not self.idle():
await self._idle.wait()
async def read(self, address, length):
if address < 0 or address >= 2**self.address_width:
raise ValueError("Address out of range")
if length < 0:
raise ValueError("Read length must be positive")
if address+length > 2**self.address_width:
raise ValueError("Requested transfer overruns end of address space")
event = Event()
self.in_flight_operations += 1
self._idle.clear()
await self.read_command_queue.put(ReadCmd(address, length, event))
await event.wait()
return event.data
async def _process_read(self):
while True:
cmd = await self.read_command_queue.get()
self.current_read_command = cmd
first_seg = (cmd.address // self.seg_byte_lanes) % self.seg_count
segments = (cmd.length + (cmd.address % self.seg_byte_lanes) + self.seg_byte_lanes-1) // self.seg_byte_lanes
resp_cmd = ReadRespCmd(cmd.address, cmd.length, segments, first_seg, cmd.event)
await self.int_read_resp_command_queue.put(resp_cmd)
if self.log.isEnabledFor(logging.INFO):
self.log.info("Read start addr: 0x%08x length: %d", cmd.address, cmd.length)
seg = first_seg
for k in range(segments):
op = SegReadCmd()
op.addr = (cmd.address + k*self.seg_byte_lanes) // self.byte_lanes
await self.seg_read_queue[seg].put(op)
seg = (seg + 1) % self.seg_count
self.current_read_command = None
async def _process_read_resp(self):
while True:
cmd = await self.int_read_resp_command_queue.get()
self.current_read_resp_command = cmd
seg_start_offset = cmd.address % self.seg_byte_lanes
seg_end_offset = ((cmd.address + cmd.length - 1) % self.seg_byte_lanes) + 1
data = bytearray()
seg = cmd.first_seg
for k in range(cmd.segments):
seg_data = await self.seg_read_resp_queue[seg].get()
start = 0
stop = self.seg_byte_lanes
if k == 0:
start = seg_start_offset
if k == cmd.segments-1:
stop = seg_end_offset
for j in range(start, stop):
data.extend(bytearray([(seg_data >> j*8) & 0xff]))
seg = (seg + 1) % self.seg_count
if self.log.isEnabledFor(logging.INFO):
self.log.info("Read complete addr: 0x%08x data: %s",
cmd.address, ' '.join((f'{c:02x}' for c in data)))
read_resp = ReadResp(cmd.address, bytes(data))
cmd.event.set(read_resp)
self.current_read_resp_command = None
self.in_flight_operations -= 1
if self.in_flight_operations == 0:
self._idle.set()
async def _run(self):
cmd_valid = 0
cmd_addr = 0
resp_ready = 0
clock_edge_event = RisingEdge(self.clock)
while True:
await clock_edge_event
cmd_ready_sample = self.bus.rd_cmd_ready.value
resp_valid_sample = self.bus.rd_resp_valid.value
if resp_valid_sample:
resp_data_sample = self.bus.rd_resp_data.value
if self.reset is not None and self.reset.value:
self.bus.rd_cmd_valid.setimmediatevalue(0)
self.bus.rd_resp_ready.setimmediatevalue(0)
cmd_valid = 0
resp_ready = 0
continue
# process segments
for seg in range(self.seg_count):
seg_mask = 1 << seg
if (cmd_ready_sample & seg_mask) or not (cmd_valid & seg_mask):
if not self.seg_read_queue[seg].empty() and not self.pause:
op = await self.seg_read_queue[seg].get()
cmd_addr &= ~(self.seg_addr_mask << self.seg_addr_width*seg)
cmd_addr |= ((op.addr & self.seg_addr_mask) << self.seg_addr_width*seg)
cmd_valid |= seg_mask
if self.log.isEnabledFor(logging.INFO):
self.log.info("Read word seg: %d addr: 0x%08x", seg, op.addr)
else:
cmd_valid &= ~seg_mask
if resp_ready & resp_valid_sample & (1 << seg):
seg_data = (resp_data_sample >> self.seg_data_width*seg) & self.seg_data_mask
await self.seg_read_resp_queue[seg].put(seg_data)
resp_ready = 2**self.seg_count-1
if self.pause:
resp_ready = 0
self.bus.rd_cmd_valid.value = cmd_valid
self.bus.rd_cmd_addr.value = cmd_addr
self.bus.rd_resp_ready.value = resp_ready
async def _run_pause(self):
clock_edge_event = RisingEdge(self.clock)
for val in self._pause_generator:
self.pause = val
await clock_edge_event
class PsdpRamMaster(Region):
def __init__(self, bus, clock, reset=None, **kwargs):
self.write_if = None
self.read_if = None
self.write_if = PsdpRamMasterWrite(bus.write, clock, reset)
self.read_if = PsdpRamMasterRead(bus.read, clock, reset)
super().__init__(max(self.write_if.size, self.read_if.size), **kwargs)
def init_read(self, address, length, event=None):
return self.read_if.init_read(address, length, event)
def init_write(self, address, data, event=None):
return self.write_if.init_write(address, data, event)
def idle(self):
return (not self.read_if or self.read_if.idle()) and (not self.write_if or self.write_if.idle())
async def wait(self):
while not self.idle():
await self.write_if.wait()
await self.read_if.wait()
async def wait_read(self):
await self.read_if.wait()
async def wait_write(self):
await self.write_if.wait()
async def read(self, address, length):
return await self.read_if.read(address, length)
async def write(self, address, data):
return await self.write_if.write(address, data)
class PsdpRamWrite(Memory):
def __init__(self, bus, clock, reset=None, size=1024, mem=None, *args, **kwargs):
@ -116,7 +679,7 @@ class PsdpRamWrite(Memory):
self.log.info(" Segment count: %d", self.seg_count)
self.log.info(" Segment addr width: %d bits", self.seg_addr_width)
self.log.info(" Segment data width: %d bits (%d bytes)", self.seg_data_width, self.seg_byte_lanes)
self.log.info(" Total data width: %d bits (%d bytes)", self.width, self.width // self.byte_size)
self.log.info(" Total data width: %d bits (%d bytes)", self.width, self.byte_lanes)
assert self.seg_be_width*self.seg_count == len(self.bus.wr_cmd_be)
@ -249,7 +812,7 @@ class PsdpRamRead(Memory):
self.log.info(" Segment count: %d", self.seg_count)
self.log.info(" Segment addr width: %d bits", self.seg_addr_width)
self.log.info(" Segment data width: %d bits (%d bytes)", self.seg_data_width, self.seg_byte_lanes)
self.log.info(" Total data width: %d bits (%d bytes)", self.width, self.width // self.byte_size)
self.log.info(" Total data width: %d bits (%d bytes)", self.width, self.byte_lanes)
self.bus.rd_cmd_ready.setimmediatevalue(0)
self.bus.rd_resp_valid.setimmediatevalue(0)

73
tb/dma_psdpram/Makefile Normal file
View File

@ -0,0 +1,73 @@
# Copyright (c) 2023 Alex Forencich
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
TOPLEVEL_LANG = verilog
SIM ?= icarus
WAVES ?= 0
COCOTB_HDL_TIMEUNIT = 1ns
COCOTB_HDL_TIMEPRECISION = 1ps
DUT = dma_psdpram
TOPLEVEL = $(DUT)
MODULE = test_$(DUT)
VERILOG_SOURCES += ../../rtl/$(DUT).v
# module parameters
export PARAM_SIZE := 65536
export PARAM_SEG_COUNT := 2
export PARAM_SEG_DATA_WIDTH := 32
export PARAM_SEG_BE_WIDTH := $(shell expr $(PARAM_SEG_DATA_WIDTH) / 8 )
export PARAM_SEG_ADDR_WIDTH := $(shell python -c "print(($(PARAM_SIZE)//($(PARAM_SEG_COUNT)*$(PARAM_SEG_BE_WIDTH))-1).bit_length())"),
export PARAM_PIPELINE := 2
ifeq ($(SIM), icarus)
PLUSARGS += -fst
COMPILE_ARGS += $(foreach v,$(filter PARAM_%,$(.VARIABLES)),-P $(TOPLEVEL).$(subst PARAM_,,$(v))=$($(v)))
ifeq ($(WAVES), 1)
VERILOG_SOURCES += iverilog_dump.v
COMPILE_ARGS += -s iverilog_dump
endif
else ifeq ($(SIM), verilator)
COMPILE_ARGS += -Wno-SELRANGE -Wno-WIDTH
COMPILE_ARGS += $(foreach v,$(filter PARAM_%,$(.VARIABLES)),-G$(subst PARAM_,,$(v))=$($(v)))
ifeq ($(WAVES), 1)
COMPILE_ARGS += --trace-fst
endif
endif
include $(shell cocotb-config --makefiles)/Makefile.sim
iverilog_dump.v:
echo 'module iverilog_dump();' > $@
echo 'initial begin' >> $@
echo ' $$dumpfile("$(TOPLEVEL).fst");' >> $@
echo ' $$dumpvars(0, $(TOPLEVEL));' >> $@
echo 'end' >> $@
echo 'endmodule' >> $@
clean::
@rm -rf iverilog_dump.v
@rm -rf dump.fst $(TOPLEVEL).fst

View File

@ -0,0 +1 @@
../dma_psdp_ram.py

View File

@ -0,0 +1,229 @@
#!/usr/bin/env python
"""
Copyright (c) 2023 Alex Forencich
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""
import itertools
import logging
import os
import random
import sys
import cocotb_test.simulator
import pytest
import cocotb
from cocotb.clock import Clock
from cocotb.triggers import RisingEdge, Timer
from cocotb.regression import TestFactory
try:
from dma_psdp_ram import PsdpRamMaster, PsdpRamBus
except ImportError:
# attempt import from current directory
sys.path.insert(0, os.path.join(os.path.dirname(__file__)))
try:
from dma_psdp_ram import PsdpRamMaster, PsdpRamBus
finally:
del sys.path[0]
class TB(object):
def __init__(self, dut):
self.dut = dut
self.log = logging.getLogger("cocotb.tb")
self.log.setLevel(logging.DEBUG)
cocotb.start_soon(Clock(dut.clk, 10, units="ns").start())
# DMA RAM
self.dma_ram_master = PsdpRamMaster(PsdpRamBus.from_entity(dut), dut.clk, dut.rst)
def set_idle_generator(self, generator=None):
if generator:
self.dma_ram_master.write_if.set_pause_generator(generator())
self.dma_ram_master.read_if.set_pause_generator(generator())
def set_backpressure_generator(self, generator=None):
if generator:
pass
async def cycle_reset(self):
self.dut.rst.setimmediatevalue(0)
await RisingEdge(self.dut.clk)
await RisingEdge(self.dut.clk)
self.dut.rst.value = 1
await RisingEdge(self.dut.clk)
await RisingEdge(self.dut.clk)
self.dut.rst.value = 0
await RisingEdge(self.dut.clk)
await RisingEdge(self.dut.clk)
async def run_test_write(dut, data_in=None, idle_inserter=None, backpressure_inserter=None, size=None):
tb = TB(dut)
byte_lanes = tb.dma_ram_master.write_if.byte_lanes
await tb.cycle_reset()
tb.set_idle_generator(idle_inserter)
tb.set_backpressure_generator(backpressure_inserter)
for length in list(range(1, byte_lanes*2))+[1024]:
for offset in list(range(byte_lanes, byte_lanes*2))+list(range(4096-byte_lanes, 4096)):
tb.log.info("length %d, offset %d", length, offset)
addr = offset+0x1000
test_data = bytearray([x % 256 for x in range(length)])
await tb.dma_ram_master.write(addr-4, b'\xaa'*(length+8))
await tb.dma_ram_master.write(addr, test_data)
data = await tb.dma_ram_master.read(addr-1, length+2)
assert data.data == b'\xaa'+test_data+b'\xaa'
await RisingEdge(dut.clk)
await RisingEdge(dut.clk)
async def run_test_read(dut, data_in=None, idle_inserter=None, backpressure_inserter=None, size=None):
tb = TB(dut)
byte_lanes = tb.dma_ram_master.write_if.byte_lanes
await tb.cycle_reset()
tb.set_idle_generator(idle_inserter)
tb.set_backpressure_generator(backpressure_inserter)
for length in list(range(1, byte_lanes*2))+[1024]:
for offset in list(range(byte_lanes, byte_lanes*2))+list(range(4096-byte_lanes, 4096)):
tb.log.info("length %d, offset %d", length, offset)
addr = offset+0x1000
test_data = bytearray([x % 256 for x in range(length)])
await tb.dma_ram_master.write(addr, test_data)
data = await tb.dma_ram_master.read(addr, length)
assert data.data == test_data
await RisingEdge(dut.clk)
await RisingEdge(dut.clk)
async def run_stress_test(dut, idle_inserter=None, backpressure_inserter=None):
tb = TB(dut)
await tb.cycle_reset()
tb.set_idle_generator(idle_inserter)
tb.set_backpressure_generator(backpressure_inserter)
async def worker(master, offset, aperture, count=16):
for k in range(count):
length = random.randint(1, min(512, aperture))
addr = offset+random.randint(0, aperture-length)
test_data = bytearray([x % 256 for x in range(length)])
await Timer(random.randint(1, 100), 'ns')
await master.write(addr, test_data)
await Timer(random.randint(1, 100), 'ns')
data = await master.read(addr, length)
assert data.data == test_data
workers = []
for k in range(16):
workers.append(cocotb.start_soon(worker(tb.dma_ram_master, k*0x1000, 0x1000, count=16)))
while workers:
await workers.pop(0).join()
await RisingEdge(dut.clk)
await RisingEdge(dut.clk)
def cycle_pause():
return itertools.cycle([1, 1, 1, 0])
if cocotb.SIM_NAME:
for test in [run_test_write, run_test_read, run_stress_test]:
factory = TestFactory(test)
factory.add_option("idle_inserter", [None, cycle_pause])
factory.add_option("backpressure_inserter", [None, cycle_pause])
factory.generate_tests()
# cocotb-test
tests_dir = os.path.dirname(__file__)
rtl_dir = os.path.abspath(os.path.join(tests_dir, '..', '..', 'rtl'))
@pytest.mark.parametrize("seg_data_width", [32, 64])
@pytest.mark.parametrize("seg_count", [2, 4])
def test_dma_psdpram(request, seg_data_width, seg_count):
dut = "dma_psdpram"
module = os.path.splitext(os.path.basename(__file__))[0]
toplevel = dut
verilog_sources = [
os.path.join(rtl_dir, f"{dut}.v"),
]
parameters = {}
parameters['SIZE'] = 65536
parameters['SEG_COUNT'] = seg_count
parameters['SEG_DATA_WIDTH'] = seg_data_width
parameters['SEG_BE_WIDTH'] = parameters['SEG_DATA_WIDTH'] // 8
parameters['SEG_ADDR_WIDTH'] = (parameters['SIZE']//(parameters['SEG_COUNT']*parameters['SEG_BE_WIDTH'])-1).bit_length()
parameters['PIPELINE'] = 2
extra_env = {f'PARAM_{k}': str(v) for k, v in parameters.items()}
sim_build = os.path.join(tests_dir, "sim_build",
request.node.name.replace('[', '-').replace(']', ''))
cocotb_test.simulator.run(
python_search=[tests_dir],
verilog_sources=verilog_sources,
toplevel=toplevel,
module=module,
parameters=parameters,
sim_build=sim_build,
extra_env=extra_env,
)

View File

@ -0,0 +1,73 @@
# Copyright (c) 2023 Alex Forencich
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
TOPLEVEL_LANG = verilog
SIM ?= icarus
WAVES ?= 0
COCOTB_HDL_TIMEUNIT = 1ns
COCOTB_HDL_TIMEPRECISION = 1ps
DUT = dma_psdpram_async
TOPLEVEL = $(DUT)
MODULE = test_$(DUT)
VERILOG_SOURCES += ../../rtl/$(DUT).v
# module parameters
export PARAM_SIZE := 65536
export PARAM_SEG_COUNT := 2
export PARAM_SEG_DATA_WIDTH := 32
export PARAM_SEG_BE_WIDTH := $(shell expr $(PARAM_SEG_DATA_WIDTH) / 8 )
export PARAM_SEG_ADDR_WIDTH := $(shell python -c "print(($(PARAM_SIZE)//($(PARAM_SEG_COUNT)*$(PARAM_SEG_BE_WIDTH))-1).bit_length())"),
export PARAM_PIPELINE := 2
ifeq ($(SIM), icarus)
PLUSARGS += -fst
COMPILE_ARGS += $(foreach v,$(filter PARAM_%,$(.VARIABLES)),-P $(TOPLEVEL).$(subst PARAM_,,$(v))=$($(v)))
ifeq ($(WAVES), 1)
VERILOG_SOURCES += iverilog_dump.v
COMPILE_ARGS += -s iverilog_dump
endif
else ifeq ($(SIM), verilator)
COMPILE_ARGS += -Wno-SELRANGE -Wno-WIDTH
COMPILE_ARGS += $(foreach v,$(filter PARAM_%,$(.VARIABLES)),-G$(subst PARAM_,,$(v))=$($(v)))
ifeq ($(WAVES), 1)
COMPILE_ARGS += --trace-fst
endif
endif
include $(shell cocotb-config --makefiles)/Makefile.sim
iverilog_dump.v:
echo 'module iverilog_dump();' > $@
echo 'initial begin' >> $@
echo ' $$dumpfile("$(TOPLEVEL).fst");' >> $@
echo ' $$dumpvars(0, $(TOPLEVEL));' >> $@
echo 'end' >> $@
echo 'endmodule' >> $@
clean::
@rm -rf iverilog_dump.v
@rm -rf dump.fst $(TOPLEVEL).fst

View File

@ -0,0 +1 @@
../dma_psdp_ram.py

View File

@ -0,0 +1,234 @@
#!/usr/bin/env python
"""
Copyright (c) 2023 Alex Forencich
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""
import itertools
import logging
import os
import random
import sys
import cocotb_test.simulator
import pytest
import cocotb
from cocotb.clock import Clock
from cocotb.triggers import RisingEdge, Timer
from cocotb.regression import TestFactory
try:
from dma_psdp_ram import PsdpRamMasterWrite, PsdpRamMasterRead, PsdpRamWriteBus, PsdpRamReadBus
except ImportError:
# attempt import from current directory
sys.path.insert(0, os.path.join(os.path.dirname(__file__)))
try:
from dma_psdp_ram import PsdpRamMasterWrite, PsdpRamMasterRead, PsdpRamWriteBus, PsdpRamReadBus
finally:
del sys.path[0]
class TB(object):
def __init__(self, dut):
self.dut = dut
self.log = logging.getLogger("cocotb.tb")
self.log.setLevel(logging.DEBUG)
cocotb.start_soon(Clock(dut.clk_wr, 10, units="ns").start())
cocotb.start_soon(Clock(dut.clk_rd, 11, units="ns").start())
# DMA RAM
self.dma_ram_master_wr = PsdpRamMasterWrite(PsdpRamWriteBus.from_entity(dut), dut.clk_wr, dut.rst_wr)
self.dma_ram_master_rd = PsdpRamMasterRead(PsdpRamReadBus.from_entity(dut), dut.clk_rd, dut.rst_rd)
def set_idle_generator(self, generator=None):
if generator:
self.dma_ram_master_wr.set_pause_generator(generator())
self.dma_ram_master_rd.set_pause_generator(generator())
def set_backpressure_generator(self, generator=None):
if generator:
pass
async def cycle_reset(self):
self.dut.rst_wr.setimmediatevalue(0)
self.dut.rst_rd.setimmediatevalue(0)
await RisingEdge(self.dut.clk_wr)
await RisingEdge(self.dut.clk_wr)
self.dut.rst_wr.value = 1
self.dut.rst_rd.value = 1
await RisingEdge(self.dut.clk_wr)
await RisingEdge(self.dut.clk_wr)
self.dut.rst_wr.value = 0
self.dut.rst_rd.value = 0
await RisingEdge(self.dut.clk_wr)
await RisingEdge(self.dut.clk_wr)
async def run_test_write(dut, data_in=None, idle_inserter=None, backpressure_inserter=None, size=None):
tb = TB(dut)
byte_lanes = tb.dma_ram_master_wr.byte_lanes
await tb.cycle_reset()
tb.set_idle_generator(idle_inserter)
tb.set_backpressure_generator(backpressure_inserter)
for length in list(range(1, byte_lanes*2))+[1024]:
for offset in list(range(byte_lanes, byte_lanes*2))+list(range(4096-byte_lanes, 4096)):
tb.log.info("length %d, offset %d", length, offset)
addr = offset+0x1000
test_data = bytearray([x % 256 for x in range(length)])
await tb.dma_ram_master_wr.write(addr-4, b'\xaa'*(length+8))
await tb.dma_ram_master_wr.write(addr, test_data)
data = await tb.dma_ram_master_rd.read(addr-1, length+2)
assert data.data == b'\xaa'+test_data+b'\xaa'
await RisingEdge(dut.clk_wr)
await RisingEdge(dut.clk_wr)
async def run_test_read(dut, data_in=None, idle_inserter=None, backpressure_inserter=None, size=None):
tb = TB(dut)
byte_lanes = tb.dma_ram_master_wr.byte_lanes
await tb.cycle_reset()
tb.set_idle_generator(idle_inserter)
tb.set_backpressure_generator(backpressure_inserter)
for length in list(range(1, byte_lanes*2))+[1024]:
for offset in list(range(byte_lanes, byte_lanes*2))+list(range(4096-byte_lanes, 4096)):
tb.log.info("length %d, offset %d", length, offset)
addr = offset+0x1000
test_data = bytearray([x % 256 for x in range(length)])
await tb.dma_ram_master_wr.write(addr, test_data)
data = await tb.dma_ram_master_rd.read(addr, length)
assert data.data == test_data
await RisingEdge(dut.clk_wr)
await RisingEdge(dut.clk_wr)
async def run_stress_test(dut, idle_inserter=None, backpressure_inserter=None):
tb = TB(dut)
await tb.cycle_reset()
tb.set_idle_generator(idle_inserter)
tb.set_backpressure_generator(backpressure_inserter)
async def worker(master_wr, master_rd, offset, aperture, count=16):
for k in range(count):
length = random.randint(1, min(512, aperture))
addr = offset+random.randint(0, aperture-length)
test_data = bytearray([x % 256 for x in range(length)])
await Timer(random.randint(1, 100), 'ns')
await master_wr.write(addr, test_data)
await Timer(random.randint(1, 100), 'ns')
data = await master_rd.read(addr, length)
assert data.data == test_data
workers = []
for k in range(16):
workers.append(cocotb.start_soon(worker(tb.dma_ram_master_wr, tb.dma_ram_master_rd, k*0x1000, 0x1000, count=16)))
while workers:
await workers.pop(0).join()
await RisingEdge(dut.clk_wr)
await RisingEdge(dut.clk_wr)
def cycle_pause():
return itertools.cycle([1, 1, 1, 0])
if cocotb.SIM_NAME:
for test in [run_test_write, run_test_read, run_stress_test]:
factory = TestFactory(test)
factory.add_option("idle_inserter", [None, cycle_pause])
factory.add_option("backpressure_inserter", [None, cycle_pause])
factory.generate_tests()
# cocotb-test
tests_dir = os.path.dirname(__file__)
rtl_dir = os.path.abspath(os.path.join(tests_dir, '..', '..', 'rtl'))
@pytest.mark.parametrize("seg_data_width", [32, 64])
@pytest.mark.parametrize("seg_count", [2, 4])
def test_dma_psdpram_async(request, seg_data_width, seg_count):
dut = "dma_psdpram_async"
module = os.path.splitext(os.path.basename(__file__))[0]
toplevel = dut
verilog_sources = [
os.path.join(rtl_dir, f"{dut}.v"),
]
parameters = {}
parameters['SIZE'] = 65536
parameters['SEG_COUNT'] = seg_count
parameters['SEG_DATA_WIDTH'] = seg_data_width
parameters['SEG_BE_WIDTH'] = parameters['SEG_DATA_WIDTH'] // 8
parameters['SEG_ADDR_WIDTH'] = (parameters['SIZE']//(parameters['SEG_COUNT']*parameters['SEG_BE_WIDTH'])-1).bit_length()
parameters['PIPELINE'] = 2
extra_env = {f'PARAM_{k}': str(v) for k, v in parameters.items()}
sim_build = os.path.join(tests_dir, "sim_build",
request.node.name.replace('[', '-').replace(']', ''))
cocotb_test.simulator.run(
python_search=[tests_dir],
verilog_sources=verilog_sources,
toplevel=toplevel,
module=module,
parameters=parameters,
sim_build=sim_build,
extra_env=extra_env,
)