1
0
mirror of https://github.com/corundum/corundum.git synced 2025-01-30 08:32:52 +08:00

Update AXI simulation model

This commit is contained in:
Alex Forencich 2020-07-02 21:28:35 -07:00
parent 281e1a2156
commit ebae4e436d

292
tb/axi.py
View File

@ -73,6 +73,15 @@ AWCACHE_WRITE_BACK_READ_ALLOC = 0b0111
AWCACHE_WRITE_BACK_WRITE_ALLOC = 0b1111
AWCACHE_WRITE_BACK_READ_AND_WRIE_ALLOC = 0b1111
PROT_PRIVILEGED = 0b001
PROT_NONSECURE = 0b010
PROT_INSTRUCTION = 0b100
RESP_OKAY = 0b00
RESP_EXOKAY = 0b01
RESP_SLVERR = 0b10
RESP_DECERR = 0b11
class AXIMaster(object):
def __init__(self):
self.write_command_queue = []
@ -101,19 +110,21 @@ class AXIMaster(object):
self.int_read_addr_sync = Signal(False)
self.int_read_resp_command_queue = []
self.int_read_resp_command_sync = Signal(False)
self.int_read_resp_queue = []
self.int_read_resp_queue_list = {}
self.int_read_resp_sync = Signal(False)
self.in_flight_operations = 0
self.max_burst_len = 256
self.has_logic = False
self.clk = None
def init_read(self, address, length, burst=0b01, size=None, lock=0b0, cache=0b0000, prot=0b010, qos=0b0000, region=0b0000, user=None):
def init_read(self, address, length, burst=0b01, size=None, lock=0b0, cache=0b0011, prot=0b010, qos=0b0000, region=0b0000, user=None):
self.read_command_queue.append((address, length, burst, size, lock, cache, prot, qos, region, user))
self.read_command_sync.next = not self.read_command_sync
def init_write(self, address, data, burst=0b01, size=None, lock=0b0, cache=0b0000, prot=0b010, qos=0b0000, region=0b0000, user=None):
def init_write(self, address, data, burst=0b01, size=None, lock=0b0, cache=0b0011, prot=0b010, qos=0b0000, region=0b0000, user=None):
self.write_command_queue.append((address, data, burst, size, lock, cache, prot, qos, region, user))
self.write_command_sync.next = not self.write_command_sync
@ -179,6 +190,12 @@ class AXIMaster(object):
m_axi_ruser=None,
m_axi_rvalid=Signal(bool(False)),
m_axi_rready=Signal(bool(False)),
pause=False,
awpause=False,
wpause=False,
bpause=False,
arpause=False,
rpause=False,
name=None
):
@ -186,26 +203,25 @@ class AXIMaster(object):
raise Exception("Logic already instantiated!")
if m_axi_wdata is not None:
assert m_axi_awid is not None
assert m_axi_bid is not None
assert len(m_axi_awid) == len(m_axi_bid)
if m_axi_awid is not None:
assert m_axi_bid is not None
assert len(m_axi_awid) == len(m_axi_bid)
assert m_axi_awaddr is not None
assert len(m_axi_wdata) % 8 == 0
assert len(m_axi_wdata) / 8 == len(m_axi_wstrb)
w = len(m_axi_wdata)
if m_axi_rdata is not None:
assert m_axi_arid is not None
assert m_axi_rid is not None
assert len(m_axi_arid) == len(m_axi_rid)
if m_axi_arid is not None:
assert m_axi_rid is not None
assert len(m_axi_arid) == len(m_axi_rid)
assert m_axi_araddr is not None
assert len(m_axi_rdata) % 8 == 0
w = len(m_axi_rdata)
if m_axi_wdata is not None:
assert len(m_axi_wdata) == len(m_axi_rdata)
assert len(m_axi_awid) == len(m_axi_arid)
assert len(m_axi_awaddr) == len(m_axi_araddr)
assert len(m_axi_wdata) == len(m_axi_rdata)
bw = int(w/8)
@ -214,12 +230,28 @@ class AXIMaster(object):
self.has_logic = True
self.clk = clk
m_axi_bvalid_int = Signal(bool(False))
m_axi_bready_int = Signal(bool(False))
m_axi_rvalid_int = Signal(bool(False))
m_axi_rready_int = Signal(bool(False))
@always_comb
def pause_logic():
m_axi_bvalid_int.next = m_axi_bvalid and not (pause or bpause)
m_axi_bready.next = m_axi_bready_int and not (pause or bpause)
m_axi_rvalid_int.next = m_axi_rvalid and not (pause or rpause)
m_axi_rready.next = m_axi_rready_int and not (pause or rpause)
@instance
def write_logic():
while True:
if not self.write_command_queue:
yield self.write_command_sync
if m_axi_awaddr is None:
print("Error: attempted write on read-only interface")
raise StopSimulation
addr, data, burst, size, lock, cache, prot, qos, region, user = self.write_command_queue.pop(0)
self.in_flight_operations += 1
@ -269,12 +301,17 @@ class AXIMaster(object):
if n >= burst_length:
transfer_count += 1
n = 0
burst_length = min(cycles-k, 256) # max len
burst_length = min(burst_length, 0x1000-(cur_addr&0xfff)) # 4k align
burst_length = min(cycles-k, min(max(self.max_burst_len, 1), 256)) # max len
burst_length = int((min(burst_length*num_bytes, 0x1000-(cur_addr&0xfff))+num_bytes-1)/num_bytes) # 4k align
awid = self.cur_write_id
self.cur_write_id = (self.cur_write_id + 1) % 2**len(m_axi_awid)
if m_axi_awid is not None:
self.cur_write_id = (self.cur_write_id + 1) % 2**len(m_axi_awid)
else:
self.cur_write_id = 0
self.int_write_addr_queue.append((cur_addr, awid, burst_length-1, size, burst, lock, cache, prot, qos, region, user))
self.int_write_addr_sync.next = not self.int_write_addr_sync
if name is not None:
print("[%s] Write burst awid: 0x%x awaddr: 0x%08x awlen: %d awsize: %d" % (name, awid, cur_addr, burst_length-1, size))
n += 1
self.int_write_data_queue.append((val, strb, n >= burst_length))
self.int_write_data_sync.next = not self.int_write_data_sync
@ -299,7 +336,7 @@ class AXIMaster(object):
while not self.int_write_resp_queue:
yield clk.posedge
cycle_resp = self.int_write_resp_queue.pop(0)
cycle_id, cycle_resp, cycle_user = self.int_write_resp_queue.pop(0)
if cycle_resp != 0:
resp = cycle_resp
@ -315,7 +352,8 @@ class AXIMaster(object):
yield clk.posedge
addr, awid, length, size, burst, lock, cache, prot, qos, region, user = self.int_write_addr_queue.pop(0)
m_axi_awaddr.next = addr
if m_axi_awaddr is not None:
m_axi_awaddr.next = addr
m_axi_awid.next = awid
m_axi_awlen.next = length
m_axi_awsize.next = size
@ -327,11 +365,12 @@ class AXIMaster(object):
m_axi_awregion.next = region
if m_axi_awuser is not None:
m_axi_awuser.next = user
m_axi_awvalid.next = True
m_axi_awvalid.next = not (pause or awpause)
yield clk.posedge
while m_axi_awvalid and not m_axi_awready:
while not m_axi_awvalid or not m_axi_awready:
m_axi_awvalid.next = m_axi_awvalid or not (pause or awpause)
yield clk.posedge
m_axi_awvalid.next = False
@ -343,11 +382,12 @@ class AXIMaster(object):
yield clk.posedge
m_axi_wdata.next, m_axi_wstrb.next, m_axi_wlast.next = self.int_write_data_queue.pop(0)
m_axi_wvalid.next = True
m_axi_wvalid.next = not (pause or wpause)
yield clk.posedge
while m_axi_wvalid and not m_axi_wready:
while not m_axi_wvalid or not m_axi_wready:
m_axi_wvalid.next = m_axi_wvalid or not (pause or wpause)
yield clk.posedge
m_axi_wvalid.next = False
@ -355,12 +395,21 @@ class AXIMaster(object):
@instance
def write_resp_interface_logic():
while True:
m_axi_bready.next = True
m_axi_bready_int.next = True
yield clk.posedge
if m_axi_bready & m_axi_bvalid:
self.int_write_resp_queue.append(int(m_axi_bresp))
if m_axi_bready and m_axi_bvalid_int:
if m_axi_bid is not None:
bid = int(m_axi_bid)
else:
bid = 0
bresp = int(m_axi_bresp)
if m_axi_buser is not None:
buser = int(m_axi_buser)
else:
buser = 0
self.int_write_resp_queue.append((bid, bresp, buser))
self.int_write_resp_sync.next = not self.int_write_resp_sync
@instance
@ -369,6 +418,10 @@ class AXIMaster(object):
if not self.read_command_queue:
yield self.read_command_sync
if m_axi_araddr is None:
print("Error: attempted read on write-only interface")
raise StopSimulation
addr, length, burst, size, lock, cache, prot, qos, region, user = self.read_command_queue.pop(0)
self.in_flight_operations += 1
@ -385,7 +438,9 @@ class AXIMaster(object):
cycles = int((length + num_bytes-1 + (addr % num_bytes)) / num_bytes)
self.int_read_resp_command_queue.append((addr, length, size, cycles, prot))
burst_list = []
self.int_read_resp_command_queue.append((addr, length, size, cycles, prot, burst_list))
self.int_read_resp_command_sync.next = not self.int_read_resp_command_sync
cur_addr = aligned_addr
@ -398,22 +453,30 @@ class AXIMaster(object):
n += 1
if n >= burst_length:
n = 0
burst_length = min(cycles-k, 256) # max len
burst_length = min(burst_length, 0x1000-((aligned_addr+k*num_bytes)&0xfff))# 4k align
burst_length = min(cycles-k, min(max(self.max_burst_len, 1), 256)) # max len
burst_length = int((min(burst_length*num_bytes, 0x1000-(cur_addr&0xfff))+num_bytes-1)/num_bytes) # 4k align
arid = self.cur_read_id
self.cur_read_id = (self.cur_read_id + 1) % 2**len(m_axi_arid)
if m_axi_arid is not None:
self.cur_read_id = (self.cur_read_id + 1) % 2**len(m_axi_arid)
else:
self.cur_read_id = 0
burst_list.append((arid, burst_length))
self.int_read_addr_queue.append((cur_addr, arid, burst_length-1, size, burst, lock, cache, prot, qos, region, user))
self.int_read_addr_sync.next = not self.int_read_addr_sync
if name is not None:
print("[%s] Read burst arid: 0x%x araddr: 0x%08x arlen: %d arsize: %d" % (name, arid, cur_addr, burst_length-1, size))
cur_addr += num_bytes
burst_list.append(None)
@instance
def read_resp_logic():
while True:
if not self.int_read_resp_command_queue:
yield self.int_read_resp_command_sync
addr, length, size, cycles, prot = self.int_read_resp_command_queue.pop(0)
addr, length, size, cycles, prot, burst_list = self.int_read_resp_command_queue.pop(0)
num_bytes = 2**size
assert 0 <= size <= int(math.log(bw, 2))
@ -429,29 +492,46 @@ class AXIMaster(object):
resp = 0
for k in range(cycles):
if not self.int_read_resp_queue:
yield self.int_read_resp_sync
first = True
cycle_data, cycle_resp, cycle_last = self.int_read_resp_queue.pop(0)
while True:
while not burst_list:
yield clk.posedge
if cycle_resp != 0:
resp = cycle_resp
cur_burst = burst_list.pop(0)
start = cycle_offset
stop = cycle_offset+num_bytes
if cur_burst is None:
break
if k == 0:
start = start_offset
if k == cycles-1:
stop = end_offset
rid = cur_burst[0]
burst_length = cur_burst[1]
assert cycle_last == (k == cycles - 1)
for k in range(burst_length):
self.int_read_resp_queue_list.setdefault(rid, [])
while not self.int_read_resp_queue_list[rid]:
yield self.int_read_resp_sync
for j in range(start, stop):
data += bytearray([(cycle_data >> j*8) & 0xff])
cycle_id, cycle_data, cycle_resp, cycle_last, cycle_user = self.int_read_resp_queue_list[rid].pop(0)
cycle_offset = (cycle_offset + num_bytes) % bw
if cycle_resp != 0:
resp = cycle_resp
start = cycle_offset
stop = cycle_offset+num_bytes
if first:
start = start_offset
assert cycle_last == (k == burst_length - 1)
for j in range(start, stop):
data += bytearray([(cycle_data >> j*8) & 0xff])
cycle_offset = (cycle_offset + num_bytes) % bw
first = False
data = data[:length]
if name is not None:
print("[%s] Read data addr: 0x%08x prot: 0x%x data: %s" % (name, addr, prot, " ".join(("{:02x}".format(c) for c in bytearray(data)))))
@ -468,7 +548,8 @@ class AXIMaster(object):
addr, arid, length, size, burst, lock, cache, prot, qos, region, user = self.int_read_addr_queue.pop(0)
m_axi_araddr.next = addr
m_axi_arid.next = arid
if m_axi_arid is not None:
m_axi_arid.next = arid
m_axi_arlen.next = length
m_axi_arsize.next = size
m_axi_arburst.next = burst
@ -479,11 +560,12 @@ class AXIMaster(object):
m_axi_arregion.next = region
if m_axi_aruser is not None:
m_axi_aruser.next = user
m_axi_arvalid.next = True
m_axi_arvalid.next = not (pause or arpause)
yield clk.posedge
while m_axi_arvalid and not m_axi_arready:
while not m_axi_arvalid or not m_axi_arready:
m_axi_arvalid.next = m_axi_arvalid or not (pause or arpause)
yield clk.posedge
m_axi_arvalid.next = False
@ -491,12 +573,24 @@ class AXIMaster(object):
@instance
def read_resp_interface_logic():
while True:
m_axi_rready.next = True
m_axi_rready_int.next = True
yield clk.posedge
if m_axi_rready & m_axi_rvalid:
self.int_read_resp_queue.append((int(m_axi_rdata), int(m_axi_rresp), int(m_axi_rlast)))
if m_axi_rready and m_axi_rvalid_int:
if m_axi_rid is not None:
rid = int(m_axi_rid)
else:
rid = 0
rdata = int(m_axi_rdata)
rresp = int(m_axi_rresp)
rlast = int(m_axi_rlast)
if m_axi_buser is not None:
ruser = int(m_axi_ruser)
else:
ruser = 0
self.int_read_resp_queue_list.setdefault(rid, [])
self.int_read_resp_queue_list[rid].append((rid, rdata, rresp, rlast, ruser))
self.int_read_resp_sync.next = not self.int_read_resp_sync
return instances()
@ -525,7 +619,7 @@ class AXIRam(object):
def write_mem(self, address, data):
self.mem.seek(address % self.size)
self.mem.write(data)
self.mem.write(bytes(data))
def create_port(self,
clk,
@ -564,35 +658,56 @@ class AXIRam(object):
s_axi_rlast=Signal(bool(True)),
s_axi_rvalid=Signal(bool(False)),
s_axi_rready=Signal(bool(False)),
pause=False,
awpause=False,
wpause=False,
bpause=False,
arpause=False,
rpause=False,
name=None
):
if s_axi_wdata is not None:
assert s_axi_awid is not None
assert s_axi_bid is not None
assert len(s_axi_awid) == len(s_axi_bid)
if s_axi_awid is not None:
assert s_axi_bid is not None
assert len(s_axi_awid) == len(s_axi_bid)
assert s_axi_awaddr is not None
assert len(s_axi_wdata) % 8 == 0
assert len(s_axi_wdata) / 8 == len(s_axi_wstrb)
w = len(s_axi_wdata)
if s_axi_rdata is not None:
assert s_axi_arid is not None
assert s_axi_rid is not None
assert len(s_axi_arid) == len(s_axi_rid)
if s_axi_arid is not None:
assert s_axi_rid is not None
assert len(s_axi_arid) == len(s_axi_rid)
assert s_axi_araddr is not None
assert len(s_axi_rdata) % 8 == 0
w = len(s_axi_rdata)
if s_axi_wdata is not None:
assert len(s_axi_wdata) == len(s_axi_rdata)
assert len(s_axi_awid) == len(s_axi_arid)
assert len(s_axi_awaddr) == len(s_axi_araddr)
assert len(s_axi_wdata) == len(s_axi_rdata)
bw = int(w/8)
assert bw in (1, 2, 4, 8, 16, 32, 64, 128)
s_axi_awvalid_int = Signal(bool(False))
s_axi_awready_int = Signal(bool(False))
s_axi_wvalid_int = Signal(bool(False))
s_axi_wready_int = Signal(bool(False))
s_axi_arvalid_int = Signal(bool(False))
s_axi_arready_int = Signal(bool(False))
@always_comb
def pause_logic():
s_axi_awvalid_int.next = s_axi_awvalid and not (pause or awpause)
s_axi_awready.next = s_axi_awready_int and not (pause or awpause)
s_axi_wvalid_int.next = s_axi_wvalid and not (pause or wpause)
s_axi_wready.next = s_axi_wready_int and not (pause or wpause)
s_axi_arvalid_int.next = s_axi_arvalid and not (pause or arpause)
s_axi_arready.next = s_axi_arready_int and not (pause or arpause)
@instance
def write_logic():
while True:
@ -601,6 +716,9 @@ class AXIRam(object):
addr, awid, length, size, burst, lock, cache, prot = self.int_write_addr_queue.pop(0)
if name is not None:
print("[%s] Write burst awid: 0x%x awaddr: 0x%08x awlen: %d awsize: %d" % (name, awid, addr, length, size))
num_bytes = 2**size
assert 0 < num_bytes <= bw
@ -622,25 +740,28 @@ class AXIRam(object):
for n in range(length):
cur_word_addr = int(cur_addr/bw)*bw
self.mem.seek(cur_word_addr % self.size)
if not self.int_write_data_queue:
yield self.int_write_data_sync
wdata, strb, last = self.int_write_data_queue.pop(0)
self.mem.seek(cur_word_addr % self.size)
data = bytearray()
for i in range(bw):
data.extend(bytearray([wdata & 0xff]))
wdata >>= 8
for i in range(bw):
if strb & (1 << i):
self.mem.write(data[i:i+1])
self.mem.write(bytes(data[i:i+1]))
else:
self.mem.seek(1, 1)
if n == length-1:
self.int_write_resp_queue.append((awid, 0b00))
self.int_write_resp_sync.next = not self.int_write_resp_sync
if last != (n == length-1):
print("Error: bad last assert")
raise StopSimulation
assert last == (n == length-1)
if name is not None:
print("[%s] Write word id: %d addr: 0x%08x prot: 0x%x wstrb: 0x%02x data: %s" % (name, awid, cur_addr, prot, s_axi_wstrb, " ".join(("{:02x}".format(c) for c in bytearray(data)))))
@ -655,13 +776,16 @@ class AXIRam(object):
@instance
def write_addr_interface_logic():
while True:
s_axi_awready.next = True
s_axi_awready_int.next = True
yield clk.posedge
if s_axi_awready & s_axi_awvalid:
if s_axi_awready and s_axi_awvalid_int:
addr = int(s_axi_awaddr)
awid = int(s_axi_awid)
if s_axi_awid is not None:
awid = int(s_axi_awid)
else:
awid = 0
length = int(s_axi_awlen)
size = int(s_axi_awsize)
burst = int(s_axi_awburst)
@ -674,11 +798,11 @@ class AXIRam(object):
@instance
def write_data_interface_logic():
while True:
s_axi_wready.next = True
s_axi_wready_int.next = True
yield clk.posedge
if s_axi_wready & s_axi_wvalid:
if s_axi_wready and s_axi_wvalid_int:
data = int(s_axi_wdata)
strb = int(s_axi_wstrb)
last = bool(s_axi_wlast)
@ -691,12 +815,16 @@ class AXIRam(object):
while not self.int_write_resp_queue:
yield clk.posedge
s_axi_bid.next, s_axi_bresp.next = self.int_write_resp_queue.pop(0)
s_axi_bvalid.next = True
bid, bresp = self.int_write_resp_queue.pop(0)
if s_axi_bid is not None:
s_axi_bid.next = bid
s_axi_bresp.next = bresp
s_axi_bvalid.next = not (pause or bpause)
yield clk.posedge
while s_axi_bvalid and not s_axi_bready:
while not s_axi_bvalid or not s_axi_bready:
s_axi_bvalid.next = s_axi_bvalid or not (pause or bpause)
yield clk.posedge
s_axi_bvalid.next = False
@ -709,6 +837,9 @@ class AXIRam(object):
addr, arid, length, size, burst, lock, cache, prot = self.int_read_addr_queue.pop(0)
if name is not None:
print("[%s] Read burst arid: 0x%x araddr: 0x%08x arlen: %d arsize: %d" % (name, arid, addr, length, size))
num_bytes = 2**size
assert 0 < num_bytes <= bw
@ -752,13 +883,16 @@ class AXIRam(object):
@instance
def read_addr_interface_logic():
while True:
s_axi_arready.next = True
s_axi_arready_int.next = True
yield clk.posedge
if s_axi_arready & s_axi_arvalid:
if s_axi_arready and s_axi_arvalid_int:
addr = int(s_axi_araddr)
arid = int(s_axi_arid)
if s_axi_arid is not None:
arid = int(s_axi_arid)
else:
arid = 0
length = int(s_axi_arlen)
size = int(s_axi_arsize)
burst = int(s_axi_arburst)
@ -774,12 +908,18 @@ class AXIRam(object):
while not self.int_read_resp_queue:
yield clk.posedge
s_axi_rid.next, s_axi_rdata.next, s_axi_rresp.next, s_axi_rlast.next = self.int_read_resp_queue.pop(0)
s_axi_rvalid.next = True
rid, rdata, rresp, rlast = self.int_read_resp_queue.pop(0)
if s_axi_rid is not None:
s_axi_rid.next = rid
s_axi_rdata.next = rdata
s_axi_rresp.next = rresp
s_axi_rlast.next = rlast
s_axi_rvalid.next = not (pause or rpause)
yield clk.posedge
while s_axi_rvalid and not s_axi_rready:
while not s_axi_rvalid or not s_axi_rready:
s_axi_rvalid.next = s_axi_rvalid or not (pause or rpause)
yield clk.posedge
s_axi_rvalid.next = False