1
0
mirror of https://github.com/corundum/corundum.git synced 2025-01-16 08:12:53 +08:00

Add GMIIFrame object and add tests and asserts for GMII error signal

This commit is contained in:
Alex Forencich 2015-05-07 19:10:44 -07:00
parent 3a180bd24f
commit f93310b85b
4 changed files with 179 additions and 27 deletions

View File

@ -24,6 +24,63 @@ THE SOFTWARE.
from myhdl import *
class GMIIFrame(object):
def __init__(self, data=b'', error=None):
self.data = b''
self.error = None
if type(data) is GMIIFrame:
self.data = data.data
self.error = data.error
else:
self.data = bytearray(data)
def build(self):
if self.data is None:
return
f = list(self.data)
d = []
er = []
i = 0
assert_er = False
if (type(self.error) is int or type(self.error) is bool) and self.error:
assert_er = True
self.error = None
while len(f) > 0:
d.append(f.pop(0))
if self.error is None:
er.append(0)
else:
er.append(self.error[i])
i += 1
if assert_er:
er[-1] = 1
self.error = 1
return d, er
def parse(self, d, er):
if d is None or er is None:
return
self.data = bytearray(d)
self.error = er
def __eq__(self, other):
if type(other) is GMIIFrame:
return self.data == other.data
def __repr__(self):
return 'GMIIFrame(data=%s, error=%s)' % (repr(self.data), repr(self.error))
def __iter__(self):
return self.data.__iter__()
def GMIISource(clk, rst,
txd,
tx_en,
@ -33,34 +90,45 @@ def GMIISource(clk, rst,
@instance
def logic():
frame = []
frame = None
d = []
er = []
ifg_cnt = 0
while True:
yield clk.posedge, rst.posedge
if rst:
frame = []
frame = None
txd.next = 0
tx_en.next = 0
tx_er.next = 0
d = []
er = []
ifg_cnt = 0
else:
if ifg_cnt > 0:
ifg_cnt -= 1
txd.next = 0
tx_er.next = 0
tx_en.next = 0
elif len(frame) > 0:
txd.next = frame.pop(0)
elif len(d) > 0:
txd.next = d.pop(0)
tx_er.next = er.pop(0)
tx_en.next = 1
if len(frame) == 0:
if len(d) == 0:
ifg_cnt = 12
elif not fifo.empty():
frame = list(fifo.get())
frame = GMIIFrame(fifo.get())
d, er = frame.build()
if name is not None:
print("[%s] Sending frame %s" % (name, repr(frame)))
txd.next = frame.pop(0)
txd.next = d.pop(0)
tx_er.next = er.pop(0)
tx_en.next = 1
else:
txd.next = 0
tx_er.next = 0
tx_en.next = 0
return logic
@ -76,24 +144,33 @@ def GMIISink(clk, rst,
@instance
def logic():
frame = None
d = []
er = []
while True:
yield clk.posedge, rst.posedge
if rst:
frame = None
d = []
er = []
else:
if rx_dv:
if frame is None:
frame = []
frame.append(int(rxd))
frame = GMIIFrame()
d = []
er = []
d.append(int(rxd))
er.append(int(rx_er))
elif frame is not None:
if len(frame) > 0:
frame = bytearray(frame)
if len(d) > 0:
frame.parse(d, er)
if fifo is not None:
fifo.put(frame)
if name is not None:
print("[%s] Got frame %s" % (name, repr(frame)))
frame = None
d = []
er = []
return logic

View File

@ -318,10 +318,10 @@ def bench():
if not gmii_sink_queue.empty():
rx_frame = gmii_sink_queue.get()
assert rx_frame[0:8] == bytearray(b'\x55\x55\x55\x55\x55\x55\x55\xD5')
assert rx_frame.data[0:8] == bytearray(b'\x55\x55\x55\x55\x55\x55\x55\xD5')
eth_frame = eth_ep.EthFrame()
eth_frame.parse_axis_fcs(rx_frame[8:])
eth_frame.parse_axis_fcs(rx_frame.data[8:])
print(hex(eth_frame.eth_fcs))
print(hex(eth_frame.calc_fcs()))

View File

@ -182,8 +182,9 @@ def bench():
test_frame.update_fcs()
axis_frame = test_frame.build_axis_fcs()
gmii_frame = gmii_ep.GMIIFrame(b'\x55\x55\x55\x55\x55\x55\x55\xD5'+bytearray(axis_frame))
source_queue.put(b'\x55\x55\x55\x55\x55\x55\x55\xD5'+bytearray(axis_frame))
source_queue.put(gmii_frame)
yield clk.posedge
yield clk.posedge
@ -227,9 +228,11 @@ def bench():
axis_frame1 = test_frame1.build_axis_fcs()
axis_frame2 = test_frame2.build_axis_fcs()
gmii_frame1 = gmii_ep.GMIIFrame(b'\x55\x55\x55\x55\x55\x55\x55\xD5'+bytearray(axis_frame1))
gmii_frame2 = gmii_ep.GMIIFrame(b'\x55\x55\x55\x55\x55\x55\x55\xD5'+bytearray(axis_frame2))
source_queue.put(b'\x55\x55\x55\x55\x55\x55\x55\xD5'+bytearray(axis_frame1))
source_queue.put(b'\x55\x55\x55\x55\x55\x55\x55\xD5'+bytearray(axis_frame2))
source_queue.put(gmii_frame1)
source_queue.put(gmii_frame2)
yield clk.posedge
yield clk.posedge
@ -294,8 +297,11 @@ def bench():
error_bad_frame_asserted.next = 0
error_bad_fcs_asserted.next = 0
source_queue.put(b'\x55\x55\x55\x55\x55\x55\x55\xD5'+bytearray(axis_frame1))
source_queue.put(b'\x55\x55\x55\x55\x55\x55\x55\xD5'+bytearray(axis_frame2))
gmii_frame1 = gmii_ep.GMIIFrame(b'\x55\x55\x55\x55\x55\x55\x55\xD5'+bytearray(axis_frame1))
gmii_frame2 = gmii_ep.GMIIFrame(b'\x55\x55\x55\x55\x55\x55\x55\xD5'+bytearray(axis_frame2))
source_queue.put(gmii_frame1)
source_queue.put(gmii_frame2)
yield clk.posedge
yield clk.posedge
@ -334,6 +340,74 @@ def bench():
yield delay(100)
yield clk.posedge
print("test 4: errored frame, length %d" % payload_len)
current_test.next = 4
test_frame1 = eth_ep.EthFrame()
test_frame1.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame1.eth_src_mac = 0x5A5152535455
test_frame1.eth_type = 0x8000
test_frame1.payload = bytearray(range(payload_len))
test_frame1.update_fcs()
test_frame2 = eth_ep.EthFrame()
test_frame2.eth_dest_mac = 0xDAD1D2D3D4D5
test_frame2.eth_src_mac = 0x5A5152535455
test_frame2.eth_type = 0x8000
test_frame2.payload = bytearray(range(payload_len))
test_frame2.update_fcs()
axis_frame1 = test_frame1.build_axis_fcs()
axis_frame2 = test_frame2.build_axis_fcs()
error_bad_frame_asserted.next = 0
error_bad_fcs_asserted.next = 0
gmii_frame1 = gmii_ep.GMIIFrame(b'\x55\x55\x55\x55\x55\x55\x55\xD5'+bytearray(axis_frame1))
gmii_frame2 = gmii_ep.GMIIFrame(b'\x55\x55\x55\x55\x55\x55\x55\xD5'+bytearray(axis_frame2))
gmii_frame1.error = 1
source_queue.put(gmii_frame1)
source_queue.put(gmii_frame2)
yield clk.posedge
yield clk.posedge
while gmii_rx_dv or output_axis_tvalid or not source_queue.empty():
yield clk.posedge
yield clk.posedge
while gmii_rx_dv or output_axis_tvalid or not source_queue.empty():
yield clk.posedge
yield clk.posedge
yield clk.posedge
yield clk.posedge
assert error_bad_frame_asserted
assert not error_bad_fcs_asserted
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
assert rx_frame.user[-1]
rx_frame = None
if not sink_queue.empty():
rx_frame = sink_queue.get()
eth_frame = eth_ep.EthFrame()
eth_frame.parse_axis(rx_frame)
eth_frame.update_fcs()
assert eth_frame == test_frame2
assert sink_queue.empty()
yield delay(100)
raise StopSimulation
return dut, monitor, source, sink, clkgen, check

View File

@ -194,10 +194,10 @@ def bench():
if not sink_queue.empty():
rx_frame = sink_queue.get()
assert rx_frame[0:8] == bytearray(b'\x55\x55\x55\x55\x55\x55\x55\xD5')
assert rx_frame.data[0:8] == bytearray(b'\x55\x55\x55\x55\x55\x55\x55\xD5')
eth_frame = eth_ep.EthFrame()
eth_frame.parse_axis_fcs(rx_frame[8:])
eth_frame.parse_axis_fcs(rx_frame.data[8:])
print(hex(eth_frame.eth_fcs))
print(hex(eth_frame.calc_fcs()))
@ -249,10 +249,10 @@ def bench():
if not sink_queue.empty():
rx_frame = sink_queue.get()
assert rx_frame[0:8] == bytearray(b'\x55\x55\x55\x55\x55\x55\x55\xD5')
assert rx_frame.data[0:8] == bytearray(b'\x55\x55\x55\x55\x55\x55\x55\xD5')
eth_frame = eth_ep.EthFrame()
eth_frame.parse_axis_fcs(rx_frame[8:])
eth_frame.parse_axis_fcs(rx_frame.data[8:])
print(hex(eth_frame.eth_fcs))
print(hex(eth_frame.calc_fcs()))
@ -268,10 +268,10 @@ def bench():
if not sink_queue.empty():
rx_frame = sink_queue.get()
assert rx_frame[0:8] == bytearray(b'\x55\x55\x55\x55\x55\x55\x55\xD5')
assert rx_frame.data[0:8] == bytearray(b'\x55\x55\x55\x55\x55\x55\x55\xD5')
eth_frame = eth_ep.EthFrame()
eth_frame.parse_axis_fcs(rx_frame[8:])
eth_frame.parse_axis_fcs(rx_frame.data[8:])
print(hex(eth_frame.eth_fcs))
print(hex(eth_frame.calc_fcs()))
@ -325,7 +325,8 @@ def bench():
if not sink_queue.empty():
rx_frame = sink_queue.get()
assert rx_frame[0:8] == bytearray(b'\x55\x55\x55\x55\x55\x55\x55\xD5')
assert rx_frame.data[0:8] == bytearray(b'\x55\x55\x55\x55\x55\x55\x55\xD5')
assert rx_frame.error[-1]
# bad packet
@ -333,10 +334,10 @@ def bench():
if not sink_queue.empty():
rx_frame = sink_queue.get()
assert rx_frame[0:8] == bytearray(b'\x55\x55\x55\x55\x55\x55\x55\xD5')
assert rx_frame.data[0:8] == bytearray(b'\x55\x55\x55\x55\x55\x55\x55\xD5')
eth_frame = eth_ep.EthFrame()
eth_frame.parse_axis_fcs(rx_frame[8:])
eth_frame.parse_axis_fcs(rx_frame.data[8:])
print(hex(eth_frame.eth_fcs))
print(hex(eth_frame.calc_fcs()))