1
0
mirror of https://github.com/corundum/corundum.git synced 2025-01-16 08:12:53 +08:00

Test DROP_WHEN_FULL parameter

Signed-off-by: Alex Forencich <alex@alexforencich.com>
This commit is contained in:
Alex Forencich 2023-08-14 16:59:57 -07:00
parent c4f298de6f
commit c3cd676c5d
4 changed files with 202 additions and 52 deletions

View File

@ -465,18 +465,36 @@ async def run_test_overflow(dut):
for k in range(count):
await tb.source.send(test_frame)
for k in range((depth//byte_lanes)*2):
for k in range((depth//byte_lanes)*3):
await RisingEdge(dut.s_clk)
assert not tb.source.idle()
if dut.DROP_WHEN_FULL.value:
assert tb.source.idle()
else:
assert not tb.source.idle()
tb.sink.pause = False
for k in range(count):
rx_frame = await tb.sink.recv()
if dut.DROP_WHEN_FULL.value:
for k in range((depth//byte_lanes)*3):
await RisingEdge(dut.s_clk)
assert rx_frame.tdata == test_data
assert not rx_frame.tuser
rx_count = 0
while not tb.sink.empty():
rx_frame = await tb.sink.recv()
assert rx_frame.tdata == test_data
assert not rx_frame.tuser
assert rx_count < count
else:
for k in range(count):
rx_frame = await tb.sink.recv()
assert rx_frame.tdata == test_data
assert not rx_frame.tuser
assert tb.sink.empty()
@ -548,13 +566,32 @@ async def run_stress_test(dut, idle_inserter=None, backpressure_inserter=None):
cur_id = (cur_id + 1) % id_count
for test_frame in test_frames:
rx_frame = await tb.sink.recv()
if dut.DROP_WHEN_FULL.value:
cycles = 0
while cycles < 100:
cycles += 1
if not tb.source.idle() or dut.s_axis_tvalid.value.integer or dut.m_axis_tvalid.value.integer or dut.m_status_depth.value.integer:
cycles = 0
await RisingEdge(dut.m_clk)
assert rx_frame.tdata == test_frame.tdata
assert rx_frame.tid == test_frame.tid
assert rx_frame.tdest == test_frame.tdest
assert not rx_frame.tuser
while not tb.sink.empty():
rx_frame = await tb.sink.recv()
while True:
test_frame = test_frames.pop(0)
if not rx_frame.tuser and rx_frame.tid == test_frame.tid and rx_frame.tdest == test_frame.tdest and rx_frame.tdata == test_frame.tdata:
break
assert len(test_frames) < 512
else:
for test_frame in test_frames:
rx_frame = await tb.sink.recv()
assert rx_frame.tdata == test_frame.tdata
assert rx_frame.tid == test_frame.tid
assert rx_frame.tdest == test_frame.tdest
assert not rx_frame.tuser
assert tb.sink.empty()
@ -617,7 +654,7 @@ rtl_dir = os.path.abspath(os.path.join(tests_dir, '..', '..', 'rtl'))
@pytest.mark.parametrize(("s_clk", "m_clk"), [(10, 10), (10, 11), (11, 10)])
@pytest.mark.parametrize(("frame_fifo", "drop_oversize_frame", "drop_bad_frame", "drop_when_full"),
[(0, 0, 0, 0), (1, 0, 0, 0), (1, 1, 0, 0), (1, 1, 1, 0)])
[(0, 0, 0, 0), (1, 0, 0, 0), (1, 1, 0, 0), (1, 1, 1, 0), (1, 1, 1, 1)])
@pytest.mark.parametrize(("ram_pipeline", "output_fifo"),
[(0, 0), (1, 0), (4, 0), (0, 1), (1, 1), (4, 1)])
@pytest.mark.parametrize("data_width", [8, 16, 32, 64])

View File

@ -462,18 +462,36 @@ async def run_test_overflow(dut):
for k in range(count):
await tb.source.send(test_frame)
for k in range((depth//byte_lanes)*2):
for k in range((depth//byte_lanes)*3):
await RisingEdge(dut.s_clk)
assert not tb.source.idle()
if dut.DROP_WHEN_FULL.value:
assert tb.source.idle()
else:
assert not tb.source.idle()
tb.sink.pause = False
for k in range(count):
rx_frame = await tb.sink.recv()
if dut.DROP_WHEN_FULL.value:
for k in range((depth//byte_lanes)*3):
await RisingEdge(dut.s_clk)
assert rx_frame.tdata == test_data
assert not rx_frame.tuser
rx_count = 0
while not tb.sink.empty():
rx_frame = await tb.sink.recv()
assert rx_frame.tdata == test_data
assert not rx_frame.tuser
assert rx_count < count
else:
for k in range(count):
rx_frame = await tb.sink.recv()
assert rx_frame.tdata == test_data
assert not rx_frame.tuser
assert tb.sink.empty()
@ -545,13 +563,32 @@ async def run_stress_test(dut, idle_inserter=None, backpressure_inserter=None):
cur_id = (cur_id + 1) % id_count
for test_frame in test_frames:
rx_frame = await tb.sink.recv()
if dut.DROP_WHEN_FULL.value:
cycles = 0
while cycles < 100:
cycles += 1
if not tb.source.idle() or dut.s_axis_tvalid.value.integer or dut.m_axis_tvalid.value.integer or dut.m_status_depth.value.integer:
cycles = 0
await RisingEdge(dut.m_clk)
assert rx_frame.tdata == test_frame.tdata
assert rx_frame.tid == test_frame.tid
assert rx_frame.tdest == test_frame.tdest
assert not rx_frame.tuser
while not tb.sink.empty():
rx_frame = await tb.sink.recv()
while True:
test_frame = test_frames.pop(0)
if not rx_frame.tuser and rx_frame.tid == test_frame.tid and rx_frame.tdest == test_frame.tdest and rx_frame.tdata == test_frame.tdata:
break
assert len(test_frames) < 512
else:
for test_frame in test_frames:
rx_frame = await tb.sink.recv()
assert rx_frame.tdata == test_frame.tdata
assert rx_frame.tid == test_frame.tid
assert rx_frame.tdest == test_frame.tdest
assert not rx_frame.tuser
assert tb.sink.empty()
@ -613,7 +650,7 @@ rtl_dir = os.path.abspath(os.path.join(tests_dir, '..', '..', 'rtl'))
@pytest.mark.parametrize(("frame_fifo", "drop_oversize_frame", "drop_bad_frame", "drop_when_full"),
[(0, 0, 0, 0), (1, 0, 0, 0), (1, 1, 0, 0), (1, 1, 1, 0)])
[(0, 0, 0, 0), (1, 0, 0, 0), (1, 1, 0, 0), (1, 1, 1, 0), (1, 1, 1, 1)])
@pytest.mark.parametrize("m_data_width", [8, 16, 32])
@pytest.mark.parametrize("s_data_width", [8, 16, 32])
def test_axis_async_fifo_adapter(request, s_data_width, m_data_width, frame_fifo, drop_oversize_frame, drop_bad_frame, drop_when_full):

View File

@ -251,18 +251,36 @@ async def run_test_overflow(dut):
for k in range(count):
await tb.source.send(test_frame)
for k in range((depth//byte_lanes)*2):
for k in range((depth//byte_lanes)*3):
await RisingEdge(dut.clk)
assert not tb.source.idle()
if dut.DROP_WHEN_FULL.value:
assert tb.source.idle()
else:
assert not tb.source.idle()
tb.sink.pause = False
for k in range(count):
rx_frame = await tb.sink.recv()
if dut.DROP_WHEN_FULL.value:
for k in range((depth//byte_lanes)*3):
await RisingEdge(dut.clk)
assert rx_frame.tdata == test_data
assert not rx_frame.tuser
rx_count = 0
while not tb.sink.empty():
rx_frame = await tb.sink.recv()
assert rx_frame.tdata == test_data
assert not rx_frame.tuser
assert rx_count < count
else:
for k in range(count):
rx_frame = await tb.sink.recv()
assert rx_frame.tdata == test_data
assert not rx_frame.tuser
assert tb.sink.empty()
@ -334,13 +352,34 @@ async def run_stress_test(dut, idle_inserter=None, backpressure_inserter=None):
cur_id = (cur_id + 1) % id_count
for test_frame in test_frames:
rx_frame = await tb.sink.recv()
if dut.DROP_WHEN_FULL.value:
cycles = 0
while cycles < 100:
cycles += 1
if not tb.source.idle() or dut.s_axis_tvalid.value.integer or dut.m_axis_tvalid.value.integer or dut.status_depth.value.integer:
cycles = 0
await RisingEdge(dut.clk)
assert rx_frame.tdata == test_frame.tdata
assert rx_frame.tid == test_frame.tid
assert rx_frame.tdest == test_frame.tdest
assert not rx_frame.tuser
while not tb.sink.empty():
rx_frame = await tb.sink.recv()
assert len(test_frames) > 0
while True:
test_frame = test_frames.pop(0)
if not rx_frame.tuser and rx_frame.tid == test_frame.tid and rx_frame.tdest == test_frame.tdest and rx_frame.tdata == test_frame.tdata:
break
assert len(test_frames) < 512
else:
for test_frame in test_frames:
rx_frame = await tb.sink.recv()
assert rx_frame.tdata == test_frame.tdata
assert rx_frame.tid == test_frame.tid
assert rx_frame.tdest == test_frame.tdest
assert not rx_frame.tuser
assert tb.sink.empty()
@ -396,7 +435,7 @@ rtl_dir = os.path.abspath(os.path.join(tests_dir, '..', '..', 'rtl'))
@pytest.mark.parametrize(("frame_fifo", "drop_oversize_frame", "drop_bad_frame", "drop_when_full"),
[(0, 0, 0, 0), (1, 0, 0, 0), (1, 1, 0, 0), (1, 1, 1, 0)])
[(0, 0, 0, 0), (1, 0, 0, 0), (1, 1, 0, 0), (1, 1, 1, 0), (1, 1, 1, 1)])
@pytest.mark.parametrize(("ram_pipeline", "output_fifo"),
[(0, 0), (1, 0), (4, 0), (0, 1), (1, 1), (4, 1)])
@pytest.mark.parametrize("data_width", [8, 16, 32, 64])

View File

@ -251,18 +251,36 @@ async def run_test_overflow(dut):
for k in range(count):
await tb.source.send(test_frame)
for k in range((depth//byte_lanes)*2):
for k in range((depth//byte_lanes)*3):
await RisingEdge(dut.clk)
assert not tb.source.idle()
if dut.DROP_WHEN_FULL.value:
assert tb.source.idle()
else:
assert not tb.source.idle()
tb.sink.pause = False
for k in range(count):
rx_frame = await tb.sink.recv()
if dut.DROP_WHEN_FULL.value:
for k in range((depth//byte_lanes)*3):
await RisingEdge(dut.clk)
assert rx_frame.tdata == test_data
assert not rx_frame.tuser
rx_count = 0
while not tb.sink.empty():
rx_frame = await tb.sink.recv()
assert rx_frame.tdata == test_data
assert not rx_frame.tuser
assert rx_count < count
else:
for k in range(count):
rx_frame = await tb.sink.recv()
assert rx_frame.tdata == test_data
assert not rx_frame.tuser
assert tb.sink.empty()
@ -334,13 +352,32 @@ async def run_stress_test(dut, idle_inserter=None, backpressure_inserter=None):
cur_id = (cur_id + 1) % id_count
for test_frame in test_frames:
rx_frame = await tb.sink.recv()
if dut.DROP_WHEN_FULL.value:
cycles = 0
while cycles < 100:
cycles += 1
if not tb.source.idle() or dut.s_axis_tvalid.value.integer or dut.m_axis_tvalid.value.integer or dut.status_depth.value.integer:
cycles = 0
await RisingEdge(dut.clk)
assert rx_frame.tdata == test_frame.tdata
assert rx_frame.tid == test_frame.tid
assert rx_frame.tdest == test_frame.tdest
assert not rx_frame.tuser
while not tb.sink.empty():
rx_frame = await tb.sink.recv()
while True:
test_frame = test_frames.pop(0)
if not rx_frame.tuser and rx_frame.tid == test_frame.tid and rx_frame.tdest == test_frame.tdest and rx_frame.tdata == test_frame.tdata:
break
assert len(test_frames) < 512
else:
for test_frame in test_frames:
rx_frame = await tb.sink.recv()
assert rx_frame.tdata == test_frame.tdata
assert rx_frame.tid == test_frame.tid
assert rx_frame.tdest == test_frame.tdest
assert not rx_frame.tuser
assert tb.sink.empty()
@ -396,7 +433,7 @@ rtl_dir = os.path.abspath(os.path.join(tests_dir, '..', '..', 'rtl'))
@pytest.mark.parametrize(("frame_fifo", "drop_oversize_frame", "drop_bad_frame", "drop_when_full"),
[(0, 0, 0, 0), (1, 0, 0, 0), (1, 1, 0, 0), (1, 1, 1, 0)])
[(0, 0, 0, 0), (1, 0, 0, 0), (1, 1, 0, 0), (1, 1, 1, 0), (1, 1, 1, 1)])
@pytest.mark.parametrize("m_data_width", [8, 16, 32])
@pytest.mark.parametrize("s_data_width", [8, 16, 32])
def test_axis_fifo_adapter(request, s_data_width, m_data_width, frame_fifo, drop_oversize_frame, drop_bad_frame, drop_when_full):