Add TX underrun and error tests

Signed-off-by: Alex Forencich <alex@alexforencich.com>
This commit is contained in:
Alex Forencich 2024-01-29 16:16:11 -08:00
parent 915f4c21ff
commit e24f887009
10 changed files with 885 additions and 3 deletions

View File

@ -216,6 +216,82 @@ async def run_test_alignment(dut, payload_data=None, ifg=12):
await RisingEdge(dut.clk)
async def run_test_underrun(dut, ifg=12):
tb = TB(dut)
tb.dut.cfg_ifg.value = ifg
tb.dut.cfg_tx_enable.value = 1
await tb.reset()
test_data = bytes(x for x in range(60))
for k in range(3):
test_frame = AxiStreamFrame(test_data)
await tb.source.send(test_frame)
for k in range(16):
await RisingEdge(dut.clk)
tb.source.pause = True
for k in range(4):
await RisingEdge(dut.clk)
tb.source.pause = False
for k in range(3):
rx_frame = await tb.sink.recv()
if k == 1:
assert rx_frame.data[-1] == 0xFE
assert rx_frame.ctrl[-1] == 1
else:
assert rx_frame.get_payload() == test_data
assert rx_frame.check_fcs()
assert rx_frame.ctrl is None
assert tb.sink.empty()
await RisingEdge(dut.clk)
await RisingEdge(dut.clk)
async def run_test_error(dut, ifg=12):
tb = TB(dut)
tb.dut.cfg_ifg.value = ifg
tb.dut.cfg_tx_enable.value = 1
await tb.reset()
test_data = bytes(x for x in range(60))
for k in range(3):
test_frame = AxiStreamFrame(test_data)
if k == 1:
test_frame.tuser = 1
await tb.source.send(test_frame)
for k in range(3):
rx_frame = await tb.sink.recv()
if k == 1:
assert rx_frame.data[-1] == 0xFE
assert rx_frame.ctrl[-1] == 1
else:
assert rx_frame.get_payload() == test_data
assert rx_frame.check_fcs()
assert rx_frame.ctrl is None
assert tb.sink.empty()
await RisingEdge(dut.clk)
await RisingEdge(dut.clk)
def size_list():
return list(range(60, 128)) + [512, 1514, 9214] + [60]*10
@ -241,6 +317,12 @@ if cocotb.SIM_NAME:
factory.add_option("ifg", [12])
factory.generate_tests()
for test in [run_test_underrun, run_test_error]:
factory = TestFactory(test)
factory.add_option("ifg", [12])
factory.generate_tests()
# cocotb-test

View File

@ -141,6 +141,94 @@ async def run_test(dut, payload_lengths=None, payload_data=None, ifg=12, enable_
await RisingEdge(dut.clk)
async def run_test_underrun(dut, ifg=12, enable_gen=None, mii_sel=False):
tb = TB(dut)
tb.dut.cfg_ifg.value = ifg
tb.dut.cfg_tx_enable.value = 1
tb.dut.mii_select.value = mii_sel
if enable_gen is not None:
tb.set_enable_generator(enable_gen())
await tb.reset()
test_data = bytes(x for x in range(60))
for k in range(3):
test_frame = AxiStreamFrame(test_data)
await tb.source.send(test_frame)
for k in range(200 if mii_sel else 100):
while True:
await RisingEdge(dut.clk)
if dut.clk_enable.value.integer:
break
tb.source.pause = True
for k in range(10):
while True:
await RisingEdge(dut.clk)
if dut.clk_enable.value.integer:
break
tb.source.pause = False
for k in range(3):
rx_frame = await tb.sink.recv()
if k == 1:
assert rx_frame.error[-1] == 1
else:
assert rx_frame.get_payload() == test_data
assert rx_frame.check_fcs()
assert rx_frame.error is None
assert tb.sink.empty()
await RisingEdge(dut.clk)
await RisingEdge(dut.clk)
async def run_test_error(dut, ifg=12, enable_gen=None, mii_sel=False):
tb = TB(dut)
tb.dut.cfg_ifg.value = ifg
tb.dut.cfg_tx_enable.value = 1
tb.dut.mii_select.value = mii_sel
if enable_gen is not None:
tb.set_enable_generator(enable_gen())
await tb.reset()
test_data = bytes(x for x in range(60))
for k in range(3):
test_frame = AxiStreamFrame(test_data)
if k == 1:
test_frame.tuser = 1
await tb.source.send(test_frame)
for k in range(3):
rx_frame = await tb.sink.recv()
if k == 1:
assert rx_frame.error[-1] == 1
else:
assert rx_frame.get_payload() == test_data
assert rx_frame.check_fcs()
assert rx_frame.error is None
assert tb.sink.empty()
await RisingEdge(dut.clk)
await RisingEdge(dut.clk)
def size_list():
return list(range(60, 128)) + [512, 1514] + [60]*10
@ -163,6 +251,14 @@ if cocotb.SIM_NAME:
factory.add_option("mii_sel", [False, True])
factory.generate_tests()
for test in [run_test_underrun, run_test_error]:
factory = TestFactory(test)
factory.add_option("ifg", [12])
factory.add_option("enable_gen", [None, cycle_en])
factory.add_option("mii_sel", [False, True])
factory.generate_tests()
# cocotb-test

View File

@ -197,6 +197,82 @@ async def run_test_alignment(dut, payload_data=None, ifg=12):
await RisingEdge(dut.clk)
async def run_test_underrun(dut, ifg=12):
tb = TB(dut)
tb.dut.cfg_ifg.value = ifg
tb.dut.cfg_tx_enable.value = 1
await tb.reset()
test_data = bytes(x for x in range(60))
for k in range(3):
test_frame = AxiStreamFrame(test_data)
await tb.source.send(test_frame)
for k in range(32):
await RisingEdge(dut.clk)
tb.source.pause = True
for k in range(4):
await RisingEdge(dut.clk)
tb.source.pause = False
for k in range(3):
rx_frame = await tb.sink.recv()
if k == 1:
assert rx_frame.data[-1] == 0xFE
assert rx_frame.ctrl[-1] == 1
else:
assert rx_frame.get_payload() == test_data
assert rx_frame.check_fcs()
assert rx_frame.ctrl is None
assert tb.sink.empty()
await RisingEdge(dut.clk)
await RisingEdge(dut.clk)
async def run_test_error(dut, ifg=12):
tb = TB(dut)
tb.dut.cfg_ifg.value = ifg
tb.dut.cfg_tx_enable.value = 1
await tb.reset()
test_data = bytes(x for x in range(60))
for k in range(3):
test_frame = AxiStreamFrame(test_data)
if k == 1:
test_frame.tuser = 1
await tb.source.send(test_frame)
for k in range(3):
rx_frame = await tb.sink.recv()
if k == 1:
assert rx_frame.data[-1] == 0xFE
assert rx_frame.ctrl[-1] == 1
else:
assert rx_frame.get_payload() == test_data
assert rx_frame.check_fcs()
assert rx_frame.ctrl is None
assert tb.sink.empty()
await RisingEdge(dut.clk)
await RisingEdge(dut.clk)
def size_list():
return list(range(60, 128)) + [512, 1514, 9214] + [60]*10
@ -222,6 +298,12 @@ if cocotb.SIM_NAME:
factory.add_option("ifg", [12])
factory.generate_tests()
for test in [run_test_underrun, run_test_error]:
factory = TestFactory(test)
factory.add_option("ifg", [12])
factory.generate_tests()
# cocotb-test

View File

@ -205,6 +205,82 @@ async def run_test_alignment(dut, payload_data=None, ifg=12):
await RisingEdge(dut.clk)
async def run_test_underrun(dut, ifg=12):
tb = TB(dut)
tb.dut.cfg_ifg.value = ifg
tb.dut.cfg_tx_enable.value = 1
await tb.reset()
test_data = bytes(x for x in range(60))
for k in range(3):
test_frame = AxiStreamFrame(test_data)
await tb.source.send(test_frame)
for k in range(16):
await RisingEdge(dut.clk)
tb.source.pause = True
for k in range(4):
await RisingEdge(dut.clk)
tb.source.pause = False
for k in range(3):
rx_frame = await tb.sink.recv()
if k == 1:
assert rx_frame.data[-1] == 0xFE
assert rx_frame.ctrl[-1] == 1
else:
assert rx_frame.get_payload() == test_data
assert rx_frame.check_fcs()
assert rx_frame.ctrl is None
assert tb.sink.empty()
await RisingEdge(dut.clk)
await RisingEdge(dut.clk)
async def run_test_error(dut, ifg=12):
tb = TB(dut)
tb.dut.cfg_ifg.value = ifg
tb.dut.cfg_tx_enable.value = 1
await tb.reset()
test_data = bytes(x for x in range(60))
for k in range(3):
test_frame = AxiStreamFrame(test_data)
if k == 1:
test_frame.tuser = 1
await tb.source.send(test_frame)
for k in range(3):
rx_frame = await tb.sink.recv()
if k == 1:
assert rx_frame.data[-1] == 0xFE
assert rx_frame.ctrl[-1] == 1
else:
assert rx_frame.get_payload() == test_data
assert rx_frame.check_fcs()
assert rx_frame.ctrl is None
assert tb.sink.empty()
await RisingEdge(dut.clk)
await RisingEdge(dut.clk)
def size_list():
return list(range(60, 128)) + [512, 1514, 9214] + [60]*10
@ -230,6 +306,12 @@ if cocotb.SIM_NAME:
factory.add_option("ifg", [12])
factory.generate_tests()
for test in [run_test_underrun, run_test_error]:
factory = TestFactory(test)
factory.add_option("ifg", [12])
factory.generate_tests()
# cocotb-test

View File

@ -313,6 +313,84 @@ async def run_test_tx_alignment(dut, payload_data=None, ifg=12):
await RisingEdge(dut.tx_clk)
async def run_test_tx_underrun(dut, ifg=12):
tb = TB(dut)
tb.xgmii_source.ifg = ifg
tb.dut.cfg_ifg.value = ifg
tb.dut.cfg_tx_enable.value = 1
await tb.reset()
test_data = bytes(x for x in range(60))
for k in range(3):
test_frame = AxiStreamFrame(test_data)
await tb.axis_source.send(test_frame)
for k in range(64*16 // tb.axis_source.width):
await RisingEdge(dut.tx_clk)
tb.axis_source.pause = True
for k in range(4):
await RisingEdge(dut.tx_clk)
tb.axis_source.pause = False
for k in range(3):
rx_frame = await tb.xgmii_sink.recv()
if k == 1:
assert rx_frame.data[-1] == 0xFE
assert rx_frame.ctrl[-1] == 1
else:
assert rx_frame.get_payload() == test_data
assert rx_frame.check_fcs()
assert rx_frame.ctrl is None
assert tb.xgmii_sink.empty()
await RisingEdge(dut.tx_clk)
await RisingEdge(dut.tx_clk)
async def run_test_tx_error(dut, ifg=12):
tb = TB(dut)
tb.xgmii_source.ifg = ifg
tb.dut.cfg_ifg.value = ifg
tb.dut.cfg_tx_enable.value = 1
await tb.reset()
test_data = bytes(x for x in range(60))
for k in range(3):
test_frame = AxiStreamFrame(test_data)
if k == 1:
test_frame.tuser = 1
await tb.axis_source.send(test_frame)
for k in range(3):
rx_frame = await tb.xgmii_sink.recv()
if k == 1:
assert rx_frame.data[-1] == 0xFE
assert rx_frame.ctrl[-1] == 1
else:
assert rx_frame.get_payload() == test_data
assert rx_frame.check_fcs()
assert rx_frame.ctrl is None
assert tb.xgmii_sink.empty()
await RisingEdge(dut.tx_clk)
await RisingEdge(dut.tx_clk)
async def run_test_lfc(dut, ifg=12):
tb = TB(dut)
@ -618,6 +696,12 @@ if cocotb.SIM_NAME:
factory.add_option("ifg", [12])
factory.generate_tests()
for test in [run_test_tx_underrun, run_test_tx_error]:
factory = TestFactory(test)
factory.add_option("ifg", [12])
factory.generate_tests()
if cocotb.top.PFC_ENABLE.value:
for test in [run_test_lfc, run_test_pfc]:
factory = TestFactory(test)

View File

@ -272,6 +272,100 @@ async def run_test_tx(dut, payload_lengths=None, payload_data=None, ifg=12, enab
await RisingEdge(dut.tx_clk)
async def run_test_tx_underrun(dut, ifg=12, enable_gen=None, mii_sel=False):
tb = TB(dut)
tb.gmii_source.ifg = ifg
tb.dut.cfg_ifg.value = ifg
tb.dut.cfg_tx_enable.value = 1
tb.dut.rx_mii_select.value = mii_sel
tb.dut.tx_mii_select.value = mii_sel
if enable_gen is not None:
tb.set_enable_generator_rx(enable_gen())
tb.set_enable_generator_tx(enable_gen())
await tb.reset()
test_data = bytes(x for x in range(60))
for k in range(3):
test_frame = AxiStreamFrame(test_data)
await tb.axis_source.send(test_frame)
for k in range(200 if mii_sel else 100):
while True:
await RisingEdge(dut.tx_clk)
if dut.tx_clk_enable.value.integer:
break
tb.axis_source.pause = True
for k in range(10):
while True:
await RisingEdge(dut.tx_clk)
if dut.tx_clk_enable.value.integer:
break
tb.axis_source.pause = False
for k in range(3):
rx_frame = await tb.gmii_sink.recv()
if k == 1:
assert rx_frame.error[-1] == 1
else:
assert rx_frame.get_payload() == test_data
assert rx_frame.check_fcs()
assert rx_frame.error is None
assert tb.gmii_sink.empty()
await RisingEdge(dut.tx_clk)
await RisingEdge(dut.tx_clk)
async def run_test_tx_error(dut, ifg=12, enable_gen=None, mii_sel=False):
tb = TB(dut)
tb.gmii_source.ifg = ifg
tb.dut.cfg_ifg.value = ifg
tb.dut.cfg_tx_enable.value = 1
tb.dut.rx_mii_select.value = mii_sel
tb.dut.tx_mii_select.value = mii_sel
if enable_gen is not None:
tb.set_enable_generator_rx(enable_gen())
tb.set_enable_generator_tx(enable_gen())
await tb.reset()
test_data = bytes(x for x in range(60))
for k in range(3):
test_frame = AxiStreamFrame(test_data)
if k == 1:
test_frame.tuser = 1
await tb.axis_source.send(test_frame)
for k in range(3):
rx_frame = await tb.gmii_sink.recv()
if k == 1:
assert rx_frame.error[-1] == 1
else:
assert rx_frame.get_payload() == test_data
assert rx_frame.check_fcs()
assert rx_frame.error is None
assert tb.gmii_sink.empty()
await RisingEdge(dut.tx_clk)
await RisingEdge(dut.tx_clk)
async def run_test_lfc(dut, ifg=12, enable_gen=None, mii_sel=True):
tb = TB(dut)
@ -586,6 +680,14 @@ if cocotb.SIM_NAME:
factory.add_option("mii_sel", [False, True])
factory.generate_tests()
for test in [run_test_tx_underrun, run_test_tx_error]:
factory = TestFactory(test)
factory.add_option("ifg", [12])
factory.add_option("enable_gen", [None, cycle_en])
factory.add_option("mii_sel", [False, True])
factory.generate_tests()
if cocotb.top.PFC_ENABLE.value:
for test in [run_test_lfc, run_test_pfc]:
factory = TestFactory(test)

View File

@ -35,7 +35,7 @@ from cocotb.triggers import RisingEdge
from cocotb.regression import TestFactory
from cocotbext.eth import GmiiFrame, GmiiPhy
from cocotbext.axi import AxiStreamBus, AxiStreamSource, AxiStreamSink
from cocotbext.axi import AxiStreamBus, AxiStreamSource, AxiStreamSink, AxiStreamFrame
class TB:
@ -152,6 +152,92 @@ async def run_test_tx(dut, payload_lengths=None, payload_data=None, ifg=12, spee
await RisingEdge(dut.tx_clk)
async def run_test_tx_underrun(dut, ifg=12, speed=1000e6):
tb = TB(dut, speed)
tb.gmii_phy.rx.ifg = ifg
tb.dut.cfg_ifg.value = ifg
tb.dut.cfg_tx_enable.value = 1
tb.set_speed(speed)
await tb.reset()
for k in range(100):
await RisingEdge(dut.rx_clk)
test_data = bytes(x for x in range(60))
for k in range(3):
test_frame = AxiStreamFrame(test_data)
await tb.axis_source.send(test_frame)
for k in range(200 if speed != 1000e6 else 100):
await RisingEdge(dut.tx_clk)
tb.axis_source.pause = True
for k in range(10):
await RisingEdge(dut.tx_clk)
tb.axis_source.pause = False
for k in range(3):
rx_frame = await tb.gmii_phy.tx.recv()
if k == 1:
assert rx_frame.error[-1] == 1
else:
assert rx_frame.get_payload() == test_data
assert rx_frame.check_fcs()
assert rx_frame.error is None
assert tb.gmii_phy.tx.empty()
await RisingEdge(dut.tx_clk)
await RisingEdge(dut.tx_clk)
async def run_test_tx_error(dut, ifg=12, speed=1000e6):
tb = TB(dut, speed)
tb.gmii_phy.rx.ifg = ifg
tb.dut.cfg_ifg.value = ifg
tb.dut.cfg_tx_enable.value = 1
tb.set_speed(speed)
await tb.reset()
for k in range(100):
await RisingEdge(dut.rx_clk)
test_data = bytes(x for x in range(60))
for k in range(3):
test_frame = AxiStreamFrame(test_data)
if k == 1:
test_frame.tuser = 1
await tb.axis_source.send(test_frame)
for k in range(3):
rx_frame = await tb.gmii_phy.tx.recv()
if k == 1:
assert rx_frame.error[-1] == 1
else:
assert rx_frame.get_payload() == test_data
assert rx_frame.check_fcs()
assert rx_frame.error is None
assert tb.gmii_phy.tx.empty()
await RisingEdge(dut.tx_clk)
await RisingEdge(dut.tx_clk)
def size_list():
return list(range(60, 128)) + [512, 1514] + [60]*10
@ -175,6 +261,13 @@ if cocotb.SIM_NAME:
factory.add_option("speed", [1000e6, 100e6, 10e6])
factory.generate_tests()
for test in [run_test_tx_underrun, run_test_tx_error]:
factory = TestFactory(test)
factory.add_option("ifg", [12])
factory.add_option("speed", [1000e6, 100e6, 10e6])
factory.generate_tests()
# cocotb-test

View File

@ -34,7 +34,7 @@ from cocotb.triggers import RisingEdge, Timer
from cocotb.regression import TestFactory
from cocotbext.eth import GmiiFrame, RgmiiPhy
from cocotbext.axi import AxiStreamBus, AxiStreamSource, AxiStreamSink
from cocotbext.axi import AxiStreamBus, AxiStreamSource, AxiStreamSink, AxiStreamFrame
class TB:
@ -159,6 +159,94 @@ async def run_test_tx(dut, payload_lengths=None, payload_data=None, ifg=12, spee
await RisingEdge(dut.tx_clk)
async def run_test_tx_underrun(dut, ifg=12, speed=1000e6):
tb = TB(dut, speed)
tb.rgmii_phy.rx.ifg = ifg
tb.dut.cfg_ifg.value = ifg
tb.dut.cfg_tx_enable.value = 1
await tb.reset()
for k in range(100):
await RisingEdge(dut.rx_clk)
test_data = bytes(x for x in range(60))
for k in range(3):
test_frame = AxiStreamFrame(test_data)
await tb.axis_source.send(test_frame)
for k in range(200 if speed != 1000e6 else 100):
while True:
await RisingEdge(dut.tx_clk)
if dut.mac_gmii_tx_clk_en.value.integer:
break
tb.axis_source.pause = True
for k in range(10):
while True:
await RisingEdge(dut.tx_clk)
if dut.mac_gmii_tx_clk_en.value.integer:
break
tb.axis_source.pause = False
for k in range(3):
rx_frame = await tb.rgmii_phy.tx.recv()
if k == 1:
assert rx_frame.error[-1] == 1
else:
assert rx_frame.get_payload() == test_data
assert rx_frame.check_fcs()
assert rx_frame.error is None
assert tb.rgmii_phy.tx.empty()
await RisingEdge(dut.tx_clk)
await RisingEdge(dut.tx_clk)
async def run_test_tx_error(dut, ifg=12, speed=1000e6):
tb = TB(dut, speed)
tb.rgmii_phy.rx.ifg = ifg
tb.dut.cfg_ifg.value = ifg
tb.dut.cfg_tx_enable.value = 1
await tb.reset()
for k in range(100):
await RisingEdge(dut.rx_clk)
test_data = bytes(x for x in range(60))
for k in range(3):
test_frame = AxiStreamFrame(test_data)
if k == 1:
test_frame.tuser = 1
await tb.axis_source.send(test_frame)
for k in range(3):
rx_frame = await tb.rgmii_phy.tx.recv()
if k == 1:
assert rx_frame.error[-1] == 1
else:
assert rx_frame.get_payload() == test_data
assert rx_frame.check_fcs()
assert rx_frame.error is None
assert tb.rgmii_phy.tx.empty()
await RisingEdge(dut.tx_clk)
await RisingEdge(dut.tx_clk)
def size_list():
return list(range(60, 128)) + [512, 1514] + [60]*10
@ -182,6 +270,13 @@ if cocotb.SIM_NAME:
factory.add_option("speed", [1000e6, 100e6, 10e6])
factory.generate_tests()
for test in [run_test_tx_underrun, run_test_tx_error]:
factory = TestFactory(test)
factory.add_option("ifg", [12])
factory.add_option("speed", [1000e6, 100e6, 10e6])
factory.generate_tests()
# cocotb-test

View File

@ -34,7 +34,7 @@ from cocotb.triggers import RisingEdge
from cocotb.regression import TestFactory
from cocotbext.eth import GmiiFrame, MiiPhy
from cocotbext.axi import AxiStreamBus, AxiStreamSource, AxiStreamSink
from cocotbext.axi import AxiStreamBus, AxiStreamSource, AxiStreamSink, AxiStreamFrame
class TB:
@ -122,6 +122,82 @@ async def run_test_tx(dut, payload_lengths=None, payload_data=None, ifg=12, spee
await RisingEdge(dut.tx_clk)
async def run_test_tx_underrun(dut, ifg=12):
tb = TB(dut)
tb.mii_phy.rx.ifg = ifg
tb.dut.cfg_ifg.value = ifg
tb.dut.cfg_tx_enable.value = 1
await tb.reset()
test_data = bytes(x for x in range(60))
for k in range(3):
test_frame = AxiStreamFrame(test_data)
await tb.axis_source.send(test_frame)
for k in range(200):
await RisingEdge(dut.tx_clk)
tb.axis_source.pause = True
for k in range(10):
await RisingEdge(dut.tx_clk)
tb.axis_source.pause = False
for k in range(3):
rx_frame = await tb.mii_phy.tx.recv()
if k == 1:
assert rx_frame.error[-1] == 1
else:
assert rx_frame.get_payload() == test_data
assert rx_frame.check_fcs()
assert rx_frame.error is None
assert tb.mii_phy.tx.empty()
await RisingEdge(dut.tx_clk)
await RisingEdge(dut.tx_clk)
async def run_test_tx_error(dut, ifg=12):
tb = TB(dut)
tb.mii_phy.rx.ifg = ifg
tb.dut.cfg_ifg.value = ifg
tb.dut.cfg_tx_enable.value = 1
await tb.reset()
test_data = bytes(x for x in range(60))
for k in range(3):
test_frame = AxiStreamFrame(test_data)
if k == 1:
test_frame.tuser = 1
await tb.axis_source.send(test_frame)
for k in range(3):
rx_frame = await tb.mii_phy.tx.recv()
if k == 1:
assert rx_frame.error[-1] == 1
else:
assert rx_frame.get_payload() == test_data
assert rx_frame.check_fcs()
assert rx_frame.error is None
assert tb.mii_phy.tx.empty()
await RisingEdge(dut.tx_clk)
await RisingEdge(dut.tx_clk)
def size_list():
return list(range(60, 128)) + [512, 1514] + [60]*10
@ -145,6 +221,12 @@ if cocotb.SIM_NAME:
factory.add_option("speed", [100e6, 10e6])
factory.generate_tests()
for test in [run_test_tx_underrun, run_test_tx_error]:
factory = TestFactory(test)
factory.add_option("ifg", [12])
factory.generate_tests()
# cocotb-test

View File

@ -288,6 +288,84 @@ async def run_test_tx_alignment(dut, payload_data=None, ifg=12):
await RisingEdge(dut.tx_clk)
async def run_test_tx_underrun(dut, ifg=12):
tb = TB(dut)
tb.serdes_source.ifg = ifg
tb.dut.cfg_ifg.value = ifg
tb.dut.cfg_tx_enable.value = 1
await tb.reset()
test_data = bytes(x for x in range(60))
for k in range(3):
test_frame = AxiStreamFrame(test_data)
await tb.axis_source.send(test_frame)
for k in range(64*16 // tb.axis_source.width):
await RisingEdge(dut.tx_clk)
tb.axis_source.pause = True
for k in range(4):
await RisingEdge(dut.tx_clk)
tb.axis_source.pause = False
for k in range(3):
rx_frame = await tb.serdes_sink.recv()
if k == 1:
assert rx_frame.data[-1] == 0xFE
assert rx_frame.ctrl[-1] == 1
else:
assert rx_frame.get_payload() == test_data
assert rx_frame.check_fcs()
assert rx_frame.ctrl is None
assert tb.serdes_sink.empty()
await RisingEdge(dut.tx_clk)
await RisingEdge(dut.tx_clk)
async def run_test_tx_error(dut, ifg=12):
tb = TB(dut)
tb.serdes_source.ifg = ifg
tb.dut.cfg_ifg.value = ifg
tb.dut.cfg_tx_enable.value = 1
await tb.reset()
test_data = bytes(x for x in range(60))
for k in range(3):
test_frame = AxiStreamFrame(test_data)
if k == 1:
test_frame.tuser = 1
await tb.axis_source.send(test_frame)
for k in range(3):
rx_frame = await tb.serdes_sink.recv()
if k == 1:
assert rx_frame.data[-1] == 0xFE
assert rx_frame.ctrl[-1] == 1
else:
assert rx_frame.get_payload() == test_data
assert rx_frame.check_fcs()
assert rx_frame.ctrl is None
assert tb.serdes_sink.empty()
await RisingEdge(dut.tx_clk)
await RisingEdge(dut.tx_clk)
async def run_test_rx_frame_sync(dut):
tb = TB(dut)
@ -353,6 +431,12 @@ if cocotb.SIM_NAME:
factory.add_option("ifg", [12])
factory.generate_tests()
for test in [run_test_tx_underrun, run_test_tx_error]:
factory = TestFactory(test)
factory.add_option("ifg", [12])
factory.generate_tests()
factory = TestFactory(run_test_rx_frame_sync)
factory.generate_tests()