1
0
mirror of https://github.com/corundum/corundum.git synced 2025-01-16 08:12:53 +08:00

Rework Ethernet module testbenches

This commit is contained in:
Alex Forencich 2014-09-25 00:36:08 -07:00
parent 5eaba1c3b3
commit 33a61d8d89
4 changed files with 660 additions and 1334 deletions

View File

@ -181,6 +181,35 @@ def bench():
def clkgen(): def clkgen():
clk.next = not clk clk.next = not clk
error_header_early_termination_asserted = Signal(bool(0))
@always(clk.posedge)
def monitor():
if (error_header_early_termination):
error_header_early_termination_asserted.next = 1
def wait_normal():
while input_axis_tvalid or output_eth_payload_tvalid:
yield clk.posedge
def wait_pause_source():
while input_axis_tvalid or output_eth_payload_tvalid:
source_pause.next = True
yield clk.posedge
yield clk.posedge
yield clk.posedge
source_pause.next = False
yield clk.posedge
def wait_pause_sink():
while input_axis_tvalid or output_eth_payload_tvalid:
sink_pause.next = True
yield clk.posedge
yield clk.posedge
yield clk.posedge
sink_pause.next = False
yield clk.posedge
@instance @instance
def check(): def check():
yield delay(100) yield delay(100)
@ -192,350 +221,185 @@ def bench():
yield delay(100) yield delay(100)
yield clk.posedge yield clk.posedge
yield clk.posedge for payload_len in range(1,18):
print("test 1: test packet")
current_test.next = 1
test_frame = eth_ep.EthFrame()
test_frame.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame.eth_src_mac = 0x5A5152535455
test_frame.eth_type = 0x8000
test_frame.payload = b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10'
source_queue.put(test_frame.build_axis())
yield clk.posedge
yield output_eth_payload_tlast.posedge
yield clk.posedge
yield clk.posedge
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
assert rx_frame == test_frame
yield delay(100)
yield clk.posedge
print("test 2: longer packet")
current_test.next = 2
test_frame = eth_ep.EthFrame()
test_frame.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame.eth_src_mac = 0x5A5152535455
test_frame.eth_type = 0x8000
test_frame.payload = bytearray(range(256))
source_queue.put(test_frame.build_axis())
yield clk.posedge
yield output_eth_payload_tlast.posedge
yield clk.posedge
yield clk.posedge
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
assert rx_frame == test_frame
yield delay(100)
yield clk.posedge
print("test 3: test packet with pauses")
current_test.next = 3
test_frame = eth_ep.EthFrame()
test_frame.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame.eth_src_mac = 0x5A5152535455
test_frame.eth_type = 0x8000
test_frame.payload = bytearray(range(256))
source_queue.put(test_frame.build_axis())
yield clk.posedge
yield delay(64)
yield clk.posedge
source_pause.next = True
yield delay(32)
yield clk.posedge
source_pause.next = False
yield delay(64)
yield clk.posedge
sink_pause.next = True
yield delay(32)
yield clk.posedge
sink_pause.next = False
yield output_eth_payload_tlast.posedge
yield clk.posedge
yield clk.posedge
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
assert rx_frame == test_frame
yield delay(100)
yield clk.posedge
print("test 4: back-to-back packets")
current_test.next = 4
test_frame1 = eth_ep.EthFrame()
test_frame1.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame1.eth_src_mac = 0x5A5152535455
test_frame1.eth_type = 0x8000
test_frame1.payload = b'\x01\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10'
test_frame2 = eth_ep.EthFrame()
test_frame2.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame2.eth_src_mac = 0x5A5152535455
test_frame2.eth_type = 0x8000
test_frame2.payload = b'\x01\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10'
source_queue.put(test_frame1.build_axis())
source_queue.put(test_frame2.build_axis())
yield clk.posedge
yield output_eth_payload_tlast.posedge
yield clk.posedge
yield output_eth_payload_tlast.posedge
yield clk.posedge
yield clk.posedge
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
assert rx_frame == test_frame1
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
assert rx_frame == test_frame2
yield delay(100)
yield clk.posedge
print("test 5: alternate pause source")
current_test.next = 5
test_frame1 = eth_ep.EthFrame()
test_frame1.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame1.eth_src_mac = 0x5A5152535455
test_frame1.eth_type = 0x8000
test_frame1.payload = bytearray(range(32))
test_frame2 = eth_ep.EthFrame()
test_frame2.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame2.eth_src_mac = 0x5A5152535455
test_frame2.eth_type = 0x8000
test_frame2.payload = bytearray(range(32))
source_queue.put(test_frame1.build_axis())
source_queue.put(test_frame2.build_axis())
yield clk.posedge
while input_axis_tvalid or output_eth_payload_tvalid:
source_pause.next = True
yield clk.posedge yield clk.posedge
print("test 1: test packet, length %d" % payload_len)
current_test.next = 1
test_frame = eth_ep.EthFrame()
test_frame.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame.eth_src_mac = 0x5A5152535455
test_frame.eth_type = 0x8000
test_frame.payload = bytearray(range(payload_len))
axis_frame = test_frame.build_axis()
for wait in wait_normal, wait_pause_source, wait_pause_sink:
source_queue.put(axis_frame)
yield clk.posedge
yield clk.posedge
yield wait()
yield clk.posedge
yield clk.posedge
yield clk.posedge
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
assert rx_frame == test_frame
assert sink_queue.empty()
yield delay(100)
yield clk.posedge yield clk.posedge
print("test 2: back-to-back packets, length %d" % payload_len)
current_test.next = 2
test_frame1 = eth_ep.EthFrame()
test_frame1.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame1.eth_src_mac = 0x5A5152535455
test_frame1.eth_type = 0x8000
test_frame1.payload = bytearray(range(payload_len))
test_frame2 = eth_ep.EthFrame()
test_frame2.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame2.eth_src_mac = 0x5A5152535455
test_frame2.eth_type = 0x8000
test_frame2.payload = bytearray(range(payload_len))
axis_frame1 = test_frame1.build_axis()
axis_frame2 = test_frame2.build_axis()
for wait in wait_normal, wait_pause_source, wait_pause_sink:
source_queue.put(axis_frame1)
source_queue.put(axis_frame2)
yield clk.posedge
yield clk.posedge
yield wait()
yield clk.posedge
yield clk.posedge
yield clk.posedge
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
assert rx_frame == test_frame1
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
assert rx_frame == test_frame2
assert sink_queue.empty()
yield delay(100)
yield clk.posedge yield clk.posedge
source_pause.next = False print("test 3: tuser assert, length %d" % payload_len)
current_test.next = 3
test_frame1 = eth_ep.EthFrame()
test_frame1.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame1.eth_src_mac = 0x5A5152535455
test_frame1.eth_type = 0x8000
test_frame1.payload = bytearray(range(payload_len))
test_frame2 = eth_ep.EthFrame()
test_frame2.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame2.eth_src_mac = 0x5A5152535455
test_frame2.eth_type = 0x8000
test_frame2.payload = bytearray(range(payload_len))
axis_frame1 = test_frame1.build_axis()
axis_frame2 = test_frame2.build_axis()
axis_frame1.user = 1
for wait in wait_normal, wait_pause_source, wait_pause_sink:
source_queue.put(axis_frame1)
source_queue.put(axis_frame2)
yield clk.posedge
yield clk.posedge
yield wait()
yield clk.posedge
yield clk.posedge
yield clk.posedge
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
assert rx_frame == test_frame1
assert rx_frame.payload.user[-1]
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
assert rx_frame == test_frame2
assert sink_queue.empty()
yield delay(100)
for length in range(1,15):
yield clk.posedge yield clk.posedge
print("test 4: truncated packet, length %d" % length)
current_test.next = 4
yield clk.posedge test_frame1 = eth_ep.EthFrame()
yield clk.posedge test_frame1.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame1.eth_src_mac = 0x5A5152535455
test_frame1.eth_type = 0x8000
test_frame1.payload = bytearray(range(16))
test_frame2 = eth_ep.EthFrame()
test_frame2.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame2.eth_src_mac = 0x5A5152535455
test_frame2.eth_type = 0x8000
test_frame2.payload = bytearray(range(16))
rx_frame = None axis_frame1 = test_frame1.build_axis()
if not sink_queue.empty(): axis_frame2 = test_frame2.build_axis()
rx_frame = sink_queue.get()
assert rx_frame == test_frame1 axis_frame1.data = axis_frame1.data[:length]
rx_frame = None for wait in wait_normal, wait_pause_source, wait_pause_sink:
if not sink_queue.empty(): error_header_early_termination_asserted.next = 0
rx_frame = sink_queue.get()
assert rx_frame == test_frame2 source_queue.put(axis_frame1)
source_queue.put(axis_frame2)
yield clk.posedge
yield clk.posedge
yield delay(100) yield wait()
yield clk.posedge yield clk.posedge
print("test 6: alternate pause sink") yield clk.posedge
current_test.next = 6 yield clk.posedge
test_frame1 = eth_ep.EthFrame() assert error_header_early_termination_asserted
test_frame1.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame1.eth_src_mac = 0x5A5152535455
test_frame1.eth_type = 0x8000
test_frame1.payload = bytearray(range(32))
test_frame2 = eth_ep.EthFrame()
test_frame2.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame2.eth_src_mac = 0x5A5152535455
test_frame2.eth_type = 0x8000
test_frame2.payload = bytearray(range(32))
source_queue.put(test_frame1.build_axis())
source_queue.put(test_frame2.build_axis())
yield clk.posedge
while input_axis_tvalid or output_eth_payload_tvalid: rx_frame = None
sink_pause.next = True if not sink_queue.empty():
yield clk.posedge rx_frame = sink_queue.get()
yield clk.posedge
yield clk.posedge
sink_pause.next = False
yield clk.posedge
yield clk.posedge assert rx_frame == test_frame2
yield clk.posedge
rx_frame = None assert sink_queue.empty()
if not sink_queue.empty():
rx_frame = sink_queue.get()
assert rx_frame == test_frame1 yield delay(100)
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
assert rx_frame == test_frame2
yield delay(100)
yield clk.posedge
print("test 7: alternate pause source 2")
current_test.next = 7
test_frame1 = eth_ep.EthFrame()
test_frame1.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame1.eth_src_mac = 0x5A5152535455
test_frame1.eth_type = 0x8000
test_frame1.payload = bytearray(range(33))
test_frame2 = eth_ep.EthFrame()
test_frame2.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame2.eth_src_mac = 0x5A5152535455
test_frame2.eth_type = 0x8000
test_frame2.payload = bytearray(range(33))
source_queue.put(test_frame1.build_axis())
source_queue.put(test_frame2.build_axis())
yield clk.posedge
while input_axis_tvalid or output_eth_payload_tvalid:
source_pause.next = True
yield clk.posedge
yield clk.posedge
yield clk.posedge
source_pause.next = False
yield clk.posedge
yield clk.posedge
yield clk.posedge
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
assert rx_frame == test_frame1
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
assert rx_frame == test_frame2
yield delay(100)
yield clk.posedge
print("test 8: alternate pause sink 2")
current_test.next = 8
test_frame1 = eth_ep.EthFrame()
test_frame1.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame1.eth_src_mac = 0x5A5152535455
test_frame1.eth_type = 0x8000
test_frame1.payload = bytearray(range(33))
test_frame2 = eth_ep.EthFrame()
test_frame2.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame2.eth_src_mac = 0x5A5152535455
test_frame2.eth_type = 0x8000
test_frame2.payload = bytearray(range(33))
source_queue.put(test_frame1.build_axis())
source_queue.put(test_frame2.build_axis())
yield clk.posedge
while input_axis_tvalid or output_eth_payload_tvalid:
sink_pause.next = True
yield clk.posedge
yield clk.posedge
yield clk.posedge
sink_pause.next = False
yield clk.posedge
yield clk.posedge
yield clk.posedge
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
assert rx_frame == test_frame1
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
assert rx_frame == test_frame2
yield delay(100)
yield clk.posedge
print("test 9: tuser assert")
current_test.next = 9
test_frame = eth_ep.EthFrame()
test_frame.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame.eth_src_mac = 0x5A5152535455
test_frame.eth_type = 0x8000
test_frame.payload = b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10'
axi_frame = test_frame.build_axis()
axi_frame.user = 1
source_queue.put(axi_frame)
yield clk.posedge
yield output_eth_payload_tlast.posedge
yield clk.posedge
yield clk.posedge
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
assert rx_frame == test_frame
assert rx_frame.payload.user[-1]
yield delay(100)
yield clk.posedge
print("test 10: truncated packet")
current_test.next = 10
test_frame = axis_ep.AXIStreamFrame()
test_frame.data = bytearray(b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09')
source_queue.put(test_frame)
yield clk.posedge
yield input_axis_tlast.posedge
yield clk.posedge
yield clk.posedge
assert error_header_early_termination
yield delay(100)
raise StopSimulation raise StopSimulation
return dut, source, sink, clkgen, check return dut, source, sink, clkgen, monitor, check
def test_bench(): def test_bench():
sim = Simulation(bench()) sim = Simulation(bench())

View File

@ -191,6 +191,35 @@ def bench():
def clkgen(): def clkgen():
clk.next = not clk clk.next = not clk
error_header_early_termination_asserted = Signal(bool(0))
@always(clk.posedge)
def monitor():
if (error_header_early_termination):
error_header_early_termination_asserted.next = 1
def wait_normal():
while input_axis_tvalid or output_eth_payload_tvalid:
yield clk.posedge
def wait_pause_source():
while input_axis_tvalid or output_eth_payload_tvalid:
source_pause.next = True
yield clk.posedge
yield clk.posedge
yield clk.posedge
source_pause.next = False
yield clk.posedge
def wait_pause_sink():
while input_axis_tvalid or output_eth_payload_tvalid:
sink_pause.next = True
yield clk.posedge
yield clk.posedge
yield clk.posedge
sink_pause.next = False
yield clk.posedge
@instance @instance
def check(): def check():
yield delay(100) yield delay(100)
@ -202,350 +231,185 @@ def bench():
yield delay(100) yield delay(100)
yield clk.posedge yield clk.posedge
yield clk.posedge for payload_len in range(1,18):
print("test 1: test packet")
current_test.next = 1
test_frame = eth_ep.EthFrame()
test_frame.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame.eth_src_mac = 0x5A5152535455
test_frame.eth_type = 0x8000
test_frame.payload = b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10'
source_queue.put(test_frame.build_axis())
yield clk.posedge
yield output_eth_payload_tlast.posedge
yield clk.posedge
yield clk.posedge
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
assert rx_frame == test_frame
yield delay(100)
yield clk.posedge
print("test 2: longer packet")
current_test.next = 2
test_frame = eth_ep.EthFrame()
test_frame.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame.eth_src_mac = 0x5A5152535455
test_frame.eth_type = 0x8000
test_frame.payload = bytearray(range(256))
source_queue.put(test_frame.build_axis())
yield clk.posedge
yield output_eth_payload_tlast.posedge
yield clk.posedge
yield clk.posedge
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
assert rx_frame == test_frame
yield delay(100)
yield clk.posedge
print("test 3: test packet with pauses")
current_test.next = 3
test_frame = eth_ep.EthFrame()
test_frame.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame.eth_src_mac = 0x5A5152535455
test_frame.eth_type = 0x8000
test_frame.payload = bytearray(range(256))
source_queue.put(test_frame.build_axis())
yield clk.posedge
yield delay(64)
yield clk.posedge
source_pause.next = True
yield delay(32)
yield clk.posedge
source_pause.next = False
yield delay(64)
yield clk.posedge
sink_pause.next = True
yield delay(32)
yield clk.posedge
sink_pause.next = False
yield output_eth_payload_tlast.posedge
yield clk.posedge
yield clk.posedge
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
assert rx_frame == test_frame
yield delay(100)
yield clk.posedge
print("test 4: back-to-back packets")
current_test.next = 4
test_frame1 = eth_ep.EthFrame()
test_frame1.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame1.eth_src_mac = 0x5A5152535455
test_frame1.eth_type = 0x8000
test_frame1.payload = b'\x01\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10'
test_frame2 = eth_ep.EthFrame()
test_frame2.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame2.eth_src_mac = 0x5A5152535455
test_frame2.eth_type = 0x8000
test_frame2.payload = b'\x01\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10'
source_queue.put(test_frame1.build_axis())
source_queue.put(test_frame2.build_axis())
yield clk.posedge
yield output_eth_payload_tlast.posedge
yield clk.posedge
yield output_eth_payload_tlast.posedge
yield clk.posedge
yield clk.posedge
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
assert rx_frame == test_frame1
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
assert rx_frame == test_frame2
yield delay(100)
yield clk.posedge
print("test 5: alternate pause source")
current_test.next = 5
test_frame1 = eth_ep.EthFrame()
test_frame1.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame1.eth_src_mac = 0x5A5152535455
test_frame1.eth_type = 0x8000
test_frame1.payload = bytearray(range(32))
test_frame2 = eth_ep.EthFrame()
test_frame2.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame2.eth_src_mac = 0x5A5152535455
test_frame2.eth_type = 0x8000
test_frame2.payload = bytearray(range(32))
source_queue.put(test_frame1.build_axis())
source_queue.put(test_frame2.build_axis())
yield clk.posedge
while input_axis_tvalid or output_eth_payload_tvalid:
source_pause.next = True
yield clk.posedge yield clk.posedge
print("test 1: test packet, length %d" % payload_len)
current_test.next = 1
test_frame = eth_ep.EthFrame()
test_frame.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame.eth_src_mac = 0x5A5152535455
test_frame.eth_type = 0x8000
test_frame.payload = bytearray(range(payload_len))
axis_frame = test_frame.build_axis()
for wait in wait_normal, wait_pause_source, wait_pause_sink:
source_queue.put(axis_frame)
yield clk.posedge
yield clk.posedge
yield wait()
yield clk.posedge
yield clk.posedge
yield clk.posedge
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
assert rx_frame == test_frame
assert sink_queue.empty()
yield delay(100)
yield clk.posedge yield clk.posedge
print("test 2: back-to-back packets, length %d" % payload_len)
current_test.next = 2
test_frame1 = eth_ep.EthFrame()
test_frame1.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame1.eth_src_mac = 0x5A5152535455
test_frame1.eth_type = 0x8000
test_frame1.payload = bytearray(range(payload_len))
test_frame2 = eth_ep.EthFrame()
test_frame2.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame2.eth_src_mac = 0x5A5152535455
test_frame2.eth_type = 0x8000
test_frame2.payload = bytearray(range(payload_len))
axis_frame1 = test_frame1.build_axis()
axis_frame2 = test_frame2.build_axis()
for wait in wait_normal, wait_pause_source, wait_pause_sink:
source_queue.put(axis_frame1)
source_queue.put(axis_frame2)
yield clk.posedge
yield clk.posedge
yield wait()
yield clk.posedge
yield clk.posedge
yield clk.posedge
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
assert rx_frame == test_frame1
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
assert rx_frame == test_frame2
assert sink_queue.empty()
yield delay(100)
yield clk.posedge yield clk.posedge
source_pause.next = False print("test 3: tuser assert, length %d" % payload_len)
current_test.next = 3
test_frame1 = eth_ep.EthFrame()
test_frame1.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame1.eth_src_mac = 0x5A5152535455
test_frame1.eth_type = 0x8000
test_frame1.payload = bytearray(range(payload_len))
test_frame2 = eth_ep.EthFrame()
test_frame2.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame2.eth_src_mac = 0x5A5152535455
test_frame2.eth_type = 0x8000
test_frame2.payload = bytearray(range(payload_len))
axis_frame1 = test_frame1.build_axis()
axis_frame2 = test_frame2.build_axis()
axis_frame1.user = 1
for wait in wait_normal, wait_pause_source, wait_pause_sink:
source_queue.put(axis_frame1)
source_queue.put(axis_frame2)
yield clk.posedge
yield clk.posedge
yield wait()
yield clk.posedge
yield clk.posedge
yield clk.posedge
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
assert rx_frame == test_frame1
assert rx_frame.payload.user[-1]
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
assert rx_frame == test_frame2
assert sink_queue.empty()
yield delay(100)
for length in range(1,15):
yield clk.posedge yield clk.posedge
print("test 4: truncated packet, length %d" % length)
current_test.next = 4
yield clk.posedge test_frame1 = eth_ep.EthFrame()
yield clk.posedge test_frame1.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame1.eth_src_mac = 0x5A5152535455
test_frame1.eth_type = 0x8000
test_frame1.payload = bytearray(range(16))
test_frame2 = eth_ep.EthFrame()
test_frame2.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame2.eth_src_mac = 0x5A5152535455
test_frame2.eth_type = 0x8000
test_frame2.payload = bytearray(range(16))
rx_frame = None axis_frame1 = test_frame1.build_axis()
if not sink_queue.empty(): axis_frame2 = test_frame2.build_axis()
rx_frame = sink_queue.get()
assert rx_frame == test_frame1 axis_frame1.data = axis_frame1.data[:length]
rx_frame = None for wait in wait_normal, wait_pause_source, wait_pause_sink:
if not sink_queue.empty(): error_header_early_termination_asserted.next = 0
rx_frame = sink_queue.get()
assert rx_frame == test_frame2 source_queue.put(axis_frame1)
source_queue.put(axis_frame2)
yield clk.posedge
yield clk.posedge
yield delay(100) yield wait()
yield clk.posedge yield clk.posedge
print("test 6: alternate pause sink") yield clk.posedge
current_test.next = 6 yield clk.posedge
test_frame1 = eth_ep.EthFrame() assert error_header_early_termination_asserted
test_frame1.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame1.eth_src_mac = 0x5A5152535455
test_frame1.eth_type = 0x8000
test_frame1.payload = bytearray(range(32))
test_frame2 = eth_ep.EthFrame()
test_frame2.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame2.eth_src_mac = 0x5A5152535455
test_frame2.eth_type = 0x8000
test_frame2.payload = bytearray(range(32))
source_queue.put(test_frame1.build_axis())
source_queue.put(test_frame2.build_axis())
yield clk.posedge
while input_axis_tvalid or output_eth_payload_tvalid: rx_frame = None
sink_pause.next = True if not sink_queue.empty():
yield clk.posedge rx_frame = sink_queue.get()
yield clk.posedge
yield clk.posedge
sink_pause.next = False
yield clk.posedge
yield clk.posedge assert rx_frame == test_frame2
yield clk.posedge
rx_frame = None assert sink_queue.empty()
if not sink_queue.empty():
rx_frame = sink_queue.get()
assert rx_frame == test_frame1 yield delay(100)
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
assert rx_frame == test_frame2
yield delay(100)
yield clk.posedge
print("test 7: alternate pause source 2")
current_test.next = 7
test_frame1 = eth_ep.EthFrame()
test_frame1.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame1.eth_src_mac = 0x5A5152535455
test_frame1.eth_type = 0x8000
test_frame1.payload = bytearray(range(33))
test_frame2 = eth_ep.EthFrame()
test_frame2.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame2.eth_src_mac = 0x5A5152535455
test_frame2.eth_type = 0x8000
test_frame2.payload = bytearray(range(33))
source_queue.put(test_frame1.build_axis())
source_queue.put(test_frame2.build_axis())
yield clk.posedge
while input_axis_tvalid or output_eth_payload_tvalid:
source_pause.next = True
yield clk.posedge
yield clk.posedge
yield clk.posedge
source_pause.next = False
yield clk.posedge
yield clk.posedge
yield clk.posedge
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
assert rx_frame == test_frame1
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
assert rx_frame == test_frame2
yield delay(100)
yield clk.posedge
print("test 8: alternate pause sink 2")
current_test.next = 8
test_frame1 = eth_ep.EthFrame()
test_frame1.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame1.eth_src_mac = 0x5A5152535455
test_frame1.eth_type = 0x8000
test_frame1.payload = bytearray(range(33))
test_frame2 = eth_ep.EthFrame()
test_frame2.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame2.eth_src_mac = 0x5A5152535455
test_frame2.eth_type = 0x8000
test_frame2.payload = bytearray(range(33))
source_queue.put(test_frame1.build_axis())
source_queue.put(test_frame2.build_axis())
yield clk.posedge
while input_axis_tvalid or output_eth_payload_tvalid:
sink_pause.next = True
yield clk.posedge
yield clk.posedge
yield clk.posedge
sink_pause.next = False
yield clk.posedge
yield clk.posedge
yield clk.posedge
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
assert rx_frame == test_frame1
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
assert rx_frame == test_frame2
yield delay(100)
yield clk.posedge
print("test 9: tuser assert")
current_test.next = 9
test_frame = eth_ep.EthFrame()
test_frame.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame.eth_src_mac = 0x5A5152535455
test_frame.eth_type = 0x8000
test_frame.payload = b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10'
axi_frame = test_frame.build_axis()
axi_frame.user = 1
source_queue.put(axi_frame)
yield clk.posedge
yield output_eth_payload_tlast.posedge
yield clk.posedge
yield clk.posedge
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
assert rx_frame == test_frame
assert rx_frame.payload.user[-1]
yield delay(100)
yield clk.posedge
print("test 10: truncated packet")
current_test.next = 10
test_frame = axis_ep.AXIStreamFrame()
test_frame.data = bytearray(b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09')
source_queue.put(test_frame)
yield clk.posedge
yield input_axis_tlast.posedge
yield clk.posedge
yield clk.posedge
assert error_header_early_termination
yield delay(100)
raise StopSimulation raise StopSimulation
return dut, source, sink, clkgen, check return dut, source, sink, clkgen, monitor, check
def test_bench(): def test_bench():
sim = Simulation(bench()) sim = Simulation(bench())

View File

@ -177,6 +177,28 @@ def bench():
def clkgen(): def clkgen():
clk.next = not clk clk.next = not clk
def wait_normal():
while input_eth_payload_tvalid or output_axis_tvalid or input_eth_hdr_valid:
yield clk.posedge
def wait_pause_source():
while input_eth_payload_tvalid or output_axis_tvalid or input_eth_hdr_valid:
source_pause.next = True
yield clk.posedge
yield clk.posedge
yield clk.posedge
source_pause.next = False
yield clk.posedge
def wait_pause_sink():
while input_eth_payload_tvalid or output_axis_tvalid or input_eth_hdr_valid:
sink_pause.next = True
yield clk.posedge
yield clk.posedge
yield clk.posedge
sink_pause.next = False
yield clk.posedge
@instance @instance
def check(): def check():
yield delay(100) yield delay(100)
@ -188,364 +210,141 @@ def bench():
yield delay(100) yield delay(100)
yield clk.posedge yield clk.posedge
yield clk.posedge for payload_len in range(1,18):
print("test 1: test packet")
current_test.next = 1
test_frame = eth_ep.EthFrame()
test_frame.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame.eth_src_mac = 0x5A5152535455
test_frame.eth_type = 0x8000
test_frame.payload = b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10'
source_queue.put(test_frame)
yield output_axis_tlast.posedge
yield clk.posedge
yield clk.posedge
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
check_frame = eth_ep.EthFrame()
check_frame.parse_axis(rx_frame)
assert check_frame == test_frame
yield delay(100)
yield clk.posedge
print("test 2: longer packet")
current_test.next = 2
test_frame = eth_ep.EthFrame()
test_frame.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame.eth_src_mac = 0x5A5152535455
test_frame.eth_type = 0x8000
test_frame.payload = bytearray(range(256))
source_queue.put(test_frame)
yield output_axis_tlast.posedge
yield clk.posedge
yield clk.posedge
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
check_frame = eth_ep.EthFrame()
check_frame.parse_axis(rx_frame)
assert check_frame == test_frame
yield delay(100)
yield clk.posedge
print("test 3: test packet with pauses")
current_test.next = 3
test_frame = eth_ep.EthFrame()
test_frame.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame.eth_src_mac = 0x5A5152535455
test_frame.eth_type = 0x8000
test_frame.payload = bytearray(range(256))
source_queue.put(test_frame)
yield delay(64)
yield clk.posedge
sink_pause.next = True
yield delay(32)
yield clk.posedge
sink_pause.next = False
yield delay(64)
yield clk.posedge
source_pause.next = True
yield delay(32)
yield clk.posedge
source_pause.next = False
yield output_axis_tlast.posedge
yield clk.posedge
yield clk.posedge
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
check_frame = eth_ep.EthFrame()
check_frame.parse_axis(rx_frame)
assert check_frame == test_frame
yield delay(100)
yield clk.posedge
print("test 4: back-to-back packets")
current_test.next = 4
test_frame1 = eth_ep.EthFrame()
test_frame1.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame1.eth_src_mac = 0x5A5152535455
test_frame1.eth_type = 0x8000
test_frame1.payload = b'\x01\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10'
test_frame2 = eth_ep.EthFrame()
test_frame2.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame2.eth_src_mac = 0x5A5152535455
test_frame2.eth_type = 0x8000
test_frame2.payload = b'\x02\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10'
source_queue.put(test_frame1)
source_queue.put(test_frame2)
yield output_axis_tlast.posedge
yield output_axis_tlast.posedge
yield clk.posedge
yield clk.posedge
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
check_frame = eth_ep.EthFrame()
check_frame.parse_axis(rx_frame)
assert check_frame == test_frame1
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
check_frame = eth_ep.EthFrame()
check_frame.parse_axis(rx_frame)
assert check_frame == test_frame2
yield delay(100)
yield clk.posedge
print("test 5: alternate pause source")
current_test.next = 5
test_frame1 = eth_ep.EthFrame()
test_frame1.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame1.eth_src_mac = 0x5A5152535455
test_frame1.eth_type = 0x8000
test_frame1.payload = bytearray(range(32))
test_frame2 = eth_ep.EthFrame()
test_frame2.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame2.eth_src_mac = 0x5A5152535455
test_frame2.eth_type = 0x8000
test_frame2.payload = bytearray(range(32))
source_queue.put(test_frame1)
source_queue.put(test_frame2)
yield clk.posedge
yield clk.posedge
while input_eth_payload_tvalid:
source_pause.next = True
yield clk.posedge yield clk.posedge
print("test 1: test packet, length %d" % payload_len)
current_test.next = 1
test_frame = eth_ep.EthFrame()
test_frame.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame.eth_src_mac = 0x5A5152535455
test_frame.eth_type = 0x8000
test_frame.payload = bytearray(range(payload_len))
for wait in wait_normal, wait_pause_source, wait_pause_sink:
source_queue.put(test_frame)
yield clk.posedge
yield clk.posedge
yield wait()
yield clk.posedge
yield clk.posedge
yield clk.posedge
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
check_frame = eth_ep.EthFrame()
check_frame.parse_axis(rx_frame)
assert check_frame == test_frame
assert sink_queue.empty()
yield delay(100)
yield clk.posedge yield clk.posedge
print("test 2: back-to-back packets, length %d" % payload_len)
current_test.next = 2
test_frame1 = eth_ep.EthFrame()
test_frame1.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame1.eth_src_mac = 0x5A5152535455
test_frame1.eth_type = 0x8000
test_frame1.payload = bytearray(range(payload_len))
test_frame2 = eth_ep.EthFrame()
test_frame2.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame2.eth_src_mac = 0x5A5152535455
test_frame2.eth_type = 0x8000
test_frame2.payload = bytearray(range(payload_len))
for wait in wait_normal, wait_pause_source, wait_pause_sink:
source_queue.put(test_frame1)
source_queue.put(test_frame2)
yield clk.posedge
yield clk.posedge
yield wait()
yield clk.posedge
yield clk.posedge
yield clk.posedge
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
check_frame = eth_ep.EthFrame()
check_frame.parse_axis(rx_frame)
assert check_frame == test_frame1
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
check_frame = eth_ep.EthFrame()
check_frame.parse_axis(rx_frame)
assert check_frame == test_frame2
assert sink_queue.empty()
yield delay(100)
yield clk.posedge yield clk.posedge
source_pause.next = False print("test 3: tuser assert, length %d" % payload_len)
yield clk.posedge current_test.next = 3
yield clk.posedge test_frame1 = eth_ep.EthFrame()
yield clk.posedge test_frame1.eth_dest_mac = 0xDAD1D2D3D4D5
rx_frame = None test_frame1.eth_src_mac = 0x5A5152535455
if not sink_queue.empty(): test_frame1.eth_type = 0x8000
rx_frame = sink_queue.get() test_frame1.payload = bytearray(range(payload_len))
test_frame2 = eth_ep.EthFrame()
test_frame2.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame2.eth_src_mac = 0x5A5152535455
test_frame2.eth_type = 0x8000
test_frame2.payload = bytearray(range(payload_len))
check_frame = eth_ep.EthFrame() test_frame1.payload.user = 1
check_frame.parse_axis(rx_frame)
assert check_frame == test_frame1 for wait in wait_normal, wait_pause_source, wait_pause_sink:
source_queue.put(test_frame1)
source_queue.put(test_frame2)
yield clk.posedge
yield clk.posedge
rx_frame = None yield wait()
if not sink_queue.empty():
rx_frame = sink_queue.get()
check_frame = eth_ep.EthFrame() yield clk.posedge
check_frame.parse_axis(rx_frame) yield clk.posedge
yield clk.posedge
assert check_frame == test_frame2 rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
yield delay(100) check_frame = eth_ep.EthFrame()
check_frame.parse_axis(rx_frame)
yield clk.posedge assert check_frame == test_frame1
print("test 6: alternate pause sink") assert rx_frame.user[-1]
current_test.next = 6
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
test_frame1 = eth_ep.EthFrame() check_frame = eth_ep.EthFrame()
test_frame1.eth_dest_mac = 0xDAD1D2D3D4D5 check_frame.parse_axis(rx_frame)
test_frame1.eth_src_mac = 0x5A5152535455
test_frame1.eth_type = 0x8000
test_frame1.payload = bytearray(range(32))
test_frame2 = eth_ep.EthFrame()
test_frame2.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame2.eth_src_mac = 0x5A5152535455
test_frame2.eth_type = 0x8000
test_frame2.payload = bytearray(range(32))
source_queue.put(test_frame1)
source_queue.put(test_frame2)
yield clk.posedge
yield clk.posedge
while input_eth_payload_tvalid or output_axis_tvalid: assert check_frame == test_frame2
sink_pause.next = True
yield clk.posedge
yield clk.posedge
yield clk.posedge
sink_pause.next = False
yield clk.posedge
yield clk.posedge assert sink_queue.empty()
yield clk.posedge
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
check_frame = eth_ep.EthFrame() yield delay(100)
check_frame.parse_axis(rx_frame)
assert check_frame == test_frame1
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
check_frame = eth_ep.EthFrame()
check_frame.parse_axis(rx_frame)
assert check_frame == test_frame2
yield delay(100)
yield clk.posedge
print("test 7: alternate pause source 2")
current_test.next = 7
test_frame1 = eth_ep.EthFrame()
test_frame1.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame1.eth_src_mac = 0x5A5152535455
test_frame1.eth_type = 0x8000
test_frame1.payload = bytearray(range(33))
test_frame2 = eth_ep.EthFrame()
test_frame2.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame2.eth_src_mac = 0x5A5152535455
test_frame2.eth_type = 0x8000
test_frame2.payload = bytearray(range(33))
source_queue.put(test_frame1)
source_queue.put(test_frame2)
yield clk.posedge
yield clk.posedge
while input_eth_payload_tvalid:
source_pause.next = True
yield clk.posedge
yield clk.posedge
yield clk.posedge
source_pause.next = False
yield clk.posedge
yield clk.posedge
yield clk.posedge
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
check_frame = eth_ep.EthFrame()
check_frame.parse_axis(rx_frame)
assert check_frame == test_frame1
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
check_frame = eth_ep.EthFrame()
check_frame.parse_axis(rx_frame)
assert check_frame == test_frame2
yield delay(100)
yield clk.posedge
print("test 8: alternate pause sink 2")
current_test.next = 8
test_frame1 = eth_ep.EthFrame()
test_frame1.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame1.eth_src_mac = 0x5A5152535455
test_frame1.eth_type = 0x8000
test_frame1.payload = bytearray(range(33))
test_frame2 = eth_ep.EthFrame()
test_frame2.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame2.eth_src_mac = 0x5A5152535455
test_frame2.eth_type = 0x8000
test_frame2.payload = bytearray(range(33))
source_queue.put(test_frame1)
source_queue.put(test_frame2)
yield clk.posedge
yield clk.posedge
while input_eth_payload_tvalid or output_axis_tvalid:
sink_pause.next = True
yield clk.posedge
yield clk.posedge
yield clk.posedge
sink_pause.next = False
yield clk.posedge
yield clk.posedge
yield clk.posedge
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
check_frame = eth_ep.EthFrame()
check_frame.parse_axis(rx_frame)
assert check_frame == test_frame1
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
check_frame = eth_ep.EthFrame()
check_frame.parse_axis(rx_frame)
assert check_frame == test_frame2
yield delay(100)
yield clk.posedge
print("test 9: tuser assert")
current_test.next = 9
test_frame = eth_ep.EthFrame()
test_frame.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame.eth_src_mac = 0x5A5152535455
test_frame.eth_type = 0x8000
test_frame.payload = b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10'
test_frame.payload.user = 1
source_queue.put(test_frame)
yield output_axis_tlast.posedge
yield clk.posedge
yield clk.posedge
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
check_frame = eth_ep.EthFrame()
check_frame.parse_axis(rx_frame)
assert check_frame == test_frame
assert rx_frame.user[-1]
yield delay(100)
raise StopSimulation raise StopSimulation

View File

@ -187,6 +187,28 @@ def bench():
def clkgen(): def clkgen():
clk.next = not clk clk.next = not clk
def wait_normal():
while input_eth_payload_tvalid or output_axis_tvalid or input_eth_hdr_valid:
yield clk.posedge
def wait_pause_source():
while input_eth_payload_tvalid or output_axis_tvalid or input_eth_hdr_valid:
source_pause.next = True
yield clk.posedge
yield clk.posedge
yield clk.posedge
source_pause.next = False
yield clk.posedge
def wait_pause_sink():
while input_eth_payload_tvalid or output_axis_tvalid or input_eth_hdr_valid:
sink_pause.next = True
yield clk.posedge
yield clk.posedge
yield clk.posedge
sink_pause.next = False
yield clk.posedge
@instance @instance
def check(): def check():
yield delay(100) yield delay(100)
@ -198,364 +220,141 @@ def bench():
yield delay(100) yield delay(100)
yield clk.posedge yield clk.posedge
yield clk.posedge for payload_len in range(1,18):
print("test 1: test packet")
current_test.next = 1
test_frame = eth_ep.EthFrame()
test_frame.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame.eth_src_mac = 0x5A5152535455
test_frame.eth_type = 0x8000
test_frame.payload = b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10'
source_queue.put(test_frame)
yield output_axis_tlast.posedge
yield clk.posedge
yield clk.posedge
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
check_frame = eth_ep.EthFrame()
check_frame.parse_axis(rx_frame)
assert check_frame == test_frame
yield delay(100)
yield clk.posedge
print("test 2: longer packet")
current_test.next = 2
test_frame = eth_ep.EthFrame()
test_frame.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame.eth_src_mac = 0x5A5152535455
test_frame.eth_type = 0x8000
test_frame.payload = bytearray(range(256))
source_queue.put(test_frame)
yield output_axis_tlast.posedge
yield clk.posedge
yield clk.posedge
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
check_frame = eth_ep.EthFrame()
check_frame.parse_axis(rx_frame)
assert check_frame == test_frame
yield delay(100)
yield clk.posedge
print("test 3: test packet with pauses")
current_test.next = 3
test_frame = eth_ep.EthFrame()
test_frame.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame.eth_src_mac = 0x5A5152535455
test_frame.eth_type = 0x8000
test_frame.payload = bytearray(range(256))
source_queue.put(test_frame)
yield delay(64)
yield clk.posedge
sink_pause.next = True
yield delay(32)
yield clk.posedge
sink_pause.next = False
yield delay(64)
yield clk.posedge
source_pause.next = True
yield delay(32)
yield clk.posedge
source_pause.next = False
yield output_axis_tlast.posedge
yield clk.posedge
yield clk.posedge
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
check_frame = eth_ep.EthFrame()
check_frame.parse_axis(rx_frame)
assert check_frame == test_frame
yield delay(100)
yield clk.posedge
print("test 4: back-to-back packets")
current_test.next = 4
test_frame1 = eth_ep.EthFrame()
test_frame1.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame1.eth_src_mac = 0x5A5152535455
test_frame1.eth_type = 0x8000
test_frame1.payload = b'\x01\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10'
test_frame2 = eth_ep.EthFrame()
test_frame2.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame2.eth_src_mac = 0x5A5152535455
test_frame2.eth_type = 0x8000
test_frame2.payload = b'\x02\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10'
source_queue.put(test_frame1)
source_queue.put(test_frame2)
yield output_axis_tlast.posedge
yield output_axis_tlast.posedge
yield clk.posedge
yield clk.posedge
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
check_frame = eth_ep.EthFrame()
check_frame.parse_axis(rx_frame)
assert check_frame == test_frame1
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
check_frame = eth_ep.EthFrame()
check_frame.parse_axis(rx_frame)
assert check_frame == test_frame2
yield delay(100)
yield clk.posedge
print("test 5: alternate pause source")
current_test.next = 5
test_frame1 = eth_ep.EthFrame()
test_frame1.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame1.eth_src_mac = 0x5A5152535455
test_frame1.eth_type = 0x8000
test_frame1.payload = bytearray(range(32))
test_frame2 = eth_ep.EthFrame()
test_frame2.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame2.eth_src_mac = 0x5A5152535455
test_frame2.eth_type = 0x8000
test_frame2.payload = bytearray(range(32))
source_queue.put(test_frame1)
source_queue.put(test_frame2)
yield clk.posedge
yield clk.posedge
while input_eth_payload_tvalid:
source_pause.next = True
yield clk.posedge yield clk.posedge
print("test 1: test packet, length %d" % payload_len)
current_test.next = 1
test_frame = eth_ep.EthFrame()
test_frame.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame.eth_src_mac = 0x5A5152535455
test_frame.eth_type = 0x8000
test_frame.payload = bytearray(range(payload_len))
for wait in wait_normal, wait_pause_source, wait_pause_sink:
source_queue.put(test_frame)
yield clk.posedge
yield clk.posedge
yield wait()
yield clk.posedge
yield clk.posedge
yield clk.posedge
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
check_frame = eth_ep.EthFrame()
check_frame.parse_axis(rx_frame)
assert check_frame == test_frame
assert sink_queue.empty()
yield delay(100)
yield clk.posedge yield clk.posedge
print("test 2: back-to-back packets, length %d" % payload_len)
current_test.next = 2
test_frame1 = eth_ep.EthFrame()
test_frame1.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame1.eth_src_mac = 0x5A5152535455
test_frame1.eth_type = 0x8000
test_frame1.payload = bytearray(range(payload_len))
test_frame2 = eth_ep.EthFrame()
test_frame2.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame2.eth_src_mac = 0x5A5152535455
test_frame2.eth_type = 0x8000
test_frame2.payload = bytearray(range(payload_len))
for wait in wait_normal, wait_pause_source, wait_pause_sink:
source_queue.put(test_frame1)
source_queue.put(test_frame2)
yield clk.posedge
yield clk.posedge
yield wait()
yield clk.posedge
yield clk.posedge
yield clk.posedge
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
check_frame = eth_ep.EthFrame()
check_frame.parse_axis(rx_frame)
assert check_frame == test_frame1
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
check_frame = eth_ep.EthFrame()
check_frame.parse_axis(rx_frame)
assert check_frame == test_frame2
assert sink_queue.empty()
yield delay(100)
yield clk.posedge yield clk.posedge
source_pause.next = False print("test 3: tuser assert, length %d" % payload_len)
yield clk.posedge current_test.next = 3
yield clk.posedge test_frame1 = eth_ep.EthFrame()
yield clk.posedge test_frame1.eth_dest_mac = 0xDAD1D2D3D4D5
rx_frame = None test_frame1.eth_src_mac = 0x5A5152535455
if not sink_queue.empty(): test_frame1.eth_type = 0x8000
rx_frame = sink_queue.get() test_frame1.payload = bytearray(range(payload_len))
test_frame2 = eth_ep.EthFrame()
test_frame2.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame2.eth_src_mac = 0x5A5152535455
test_frame2.eth_type = 0x8000
test_frame2.payload = bytearray(range(payload_len))
check_frame = eth_ep.EthFrame() test_frame1.payload.user = 1
check_frame.parse_axis(rx_frame)
assert check_frame == test_frame1 for wait in wait_normal, wait_pause_source, wait_pause_sink:
source_queue.put(test_frame1)
source_queue.put(test_frame2)
yield clk.posedge
yield clk.posedge
rx_frame = None yield wait()
if not sink_queue.empty():
rx_frame = sink_queue.get()
check_frame = eth_ep.EthFrame() yield clk.posedge
check_frame.parse_axis(rx_frame) yield clk.posedge
yield clk.posedge
assert check_frame == test_frame2 rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
yield delay(100) check_frame = eth_ep.EthFrame()
check_frame.parse_axis(rx_frame)
yield clk.posedge assert check_frame == test_frame1
print("test 6: alternate pause sink") assert rx_frame.user[-1]
current_test.next = 6
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
test_frame1 = eth_ep.EthFrame() check_frame = eth_ep.EthFrame()
test_frame1.eth_dest_mac = 0xDAD1D2D3D4D5 check_frame.parse_axis(rx_frame)
test_frame1.eth_src_mac = 0x5A5152535455
test_frame1.eth_type = 0x8000
test_frame1.payload = bytearray(range(32))
test_frame2 = eth_ep.EthFrame()
test_frame2.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame2.eth_src_mac = 0x5A5152535455
test_frame2.eth_type = 0x8000
test_frame2.payload = bytearray(range(32))
source_queue.put(test_frame1)
source_queue.put(test_frame2)
yield clk.posedge
yield clk.posedge
while input_eth_payload_tvalid or output_axis_tvalid: assert check_frame == test_frame2
sink_pause.next = True
yield clk.posedge
yield clk.posedge
yield clk.posedge
sink_pause.next = False
yield clk.posedge
yield clk.posedge assert sink_queue.empty()
yield clk.posedge
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
check_frame = eth_ep.EthFrame() yield delay(100)
check_frame.parse_axis(rx_frame)
assert check_frame == test_frame1
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
check_frame = eth_ep.EthFrame()
check_frame.parse_axis(rx_frame)
assert check_frame == test_frame2
yield delay(100)
yield clk.posedge
print("test 7: alternate pause source 2")
current_test.next = 7
test_frame1 = eth_ep.EthFrame()
test_frame1.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame1.eth_src_mac = 0x5A5152535455
test_frame1.eth_type = 0x8000
test_frame1.payload = bytearray(range(33))
test_frame2 = eth_ep.EthFrame()
test_frame2.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame2.eth_src_mac = 0x5A5152535455
test_frame2.eth_type = 0x8000
test_frame2.payload = bytearray(range(33))
source_queue.put(test_frame1)
source_queue.put(test_frame2)
yield clk.posedge
yield clk.posedge
while input_eth_payload_tvalid:
source_pause.next = True
yield clk.posedge
yield clk.posedge
yield clk.posedge
source_pause.next = False
yield clk.posedge
yield clk.posedge
yield clk.posedge
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
check_frame = eth_ep.EthFrame()
check_frame.parse_axis(rx_frame)
assert check_frame == test_frame1
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
check_frame = eth_ep.EthFrame()
check_frame.parse_axis(rx_frame)
assert check_frame == test_frame2
yield delay(100)
yield clk.posedge
print("test 8: alternate pause sink 2")
current_test.next = 8
test_frame1 = eth_ep.EthFrame()
test_frame1.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame1.eth_src_mac = 0x5A5152535455
test_frame1.eth_type = 0x8000
test_frame1.payload = bytearray(range(33))
test_frame2 = eth_ep.EthFrame()
test_frame2.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame2.eth_src_mac = 0x5A5152535455
test_frame2.eth_type = 0x8000
test_frame2.payload = bytearray(range(33))
source_queue.put(test_frame1)
source_queue.put(test_frame2)
yield clk.posedge
yield clk.posedge
while input_eth_payload_tvalid or output_axis_tvalid:
sink_pause.next = True
yield clk.posedge
yield clk.posedge
yield clk.posedge
sink_pause.next = False
yield clk.posedge
yield clk.posedge
yield clk.posedge
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
check_frame = eth_ep.EthFrame()
check_frame.parse_axis(rx_frame)
assert check_frame == test_frame1
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
check_frame = eth_ep.EthFrame()
check_frame.parse_axis(rx_frame)
assert check_frame == test_frame2
yield delay(100)
yield clk.posedge
print("test 9: tuser assert")
current_test.next = 9
test_frame = eth_ep.EthFrame()
test_frame.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame.eth_src_mac = 0x5A5152535455
test_frame.eth_type = 0x8000
test_frame.payload = b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10'
test_frame.payload.user = 1
source_queue.put(test_frame)
yield output_axis_tlast.posedge
yield clk.posedge
yield clk.posedge
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
check_frame = eth_ep.EthFrame()
check_frame.parse_axis(rx_frame)
assert check_frame == test_frame
assert rx_frame.user[-1]
yield delay(100)
raise StopSimulation raise StopSimulation