mirror of
https://github.com/corundum/corundum.git
synced 2025-01-16 08:12:53 +08:00
789 lines
29 KiB
Python
789 lines
29 KiB
Python
"""
|
|
|
|
Copyright (c) 2018 Alex Forencich
|
|
|
|
Permission is hereby granted, free of charge, to any person obtaining a copy
|
|
of this software and associated documentation files (the "Software"), to deal
|
|
in the Software without restriction, including without limitation the rights
|
|
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
|
copies of the Software, and to permit persons to whom the Software is
|
|
furnished to do so, subject to the following conditions:
|
|
|
|
The above copyright notice and this permission notice shall be included in
|
|
all copies or substantial portions of the Software.
|
|
|
|
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
|
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY
|
|
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
|
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
|
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
|
THE SOFTWARE.
|
|
|
|
"""
|
|
|
|
from myhdl import *
|
|
import math
|
|
import mmap
|
|
|
|
BURST_FIXED = 0b00
|
|
BURST_INCR = 0b01
|
|
BURST_WRAP = 0b10
|
|
|
|
BURST_SIZE_1 = 0b000
|
|
BURST_SIZE_2 = 0b001
|
|
BURST_SIZE_4 = 0b010
|
|
BURST_SIZE_8 = 0b011
|
|
BURST_SIZE_16 = 0b100
|
|
BURST_SIZE_32 = 0b101
|
|
BURST_SIZE_64 = 0b110
|
|
BURST_SIZE_128 = 0b111
|
|
|
|
LOCK_NORMAL = 0b0
|
|
LOCK_EXCLUSIVE = 0b1
|
|
|
|
CACHE_B = 0b0001
|
|
CACHE_M = 0b0010
|
|
CACHE_RA = 0b0100
|
|
CACHE_WA = 0b1000
|
|
|
|
ARCACHE_DEVICE_NON_BUFFERABLE = 0b0000
|
|
ARCACHE_DEVICE_BUFFERABLE = 0b0001
|
|
ARCACHE_NORMAL_NON_CACHEABLE_NON_BUFFERABLE = 0b0010
|
|
ARCACHE_NORMAL_NON_CACHEABLE_BUFFERABLE = 0b0011
|
|
ARCACHE_WRITE_THROUGH_NO_ALLOC = 0b1010
|
|
ARCACHE_WRITE_THROUGH_READ_ALLOC = 0b1110
|
|
ARCACHE_WRITE_THROUGH_WRITE_ALLOC = 0b1010
|
|
ARCACHE_WRITE_THROUGH_READ_AND_WRITE_ALLOC = 0b1110
|
|
ARCACHE_WRITE_BACK_NO_ALLOC = 0b1011
|
|
ARCACHE_WRITE_BACK_READ_ALLOC = 0b1111
|
|
ARCACHE_WRITE_BACK_WRITE_ALLOC = 0b1011
|
|
ARCACHE_WRITE_BACK_READ_AND_WRIE_ALLOC = 0b1111
|
|
|
|
AWCACHE_DEVICE_NON_BUFFERABLE = 0b0000
|
|
AWCACHE_DEVICE_BUFFERABLE = 0b0001
|
|
AWCACHE_NORMAL_NON_CACHEABLE_NON_BUFFERABLE = 0b0010
|
|
AWCACHE_NORMAL_NON_CACHEABLE_BUFFERABLE = 0b0011
|
|
AWCACHE_WRITE_THROUGH_NO_ALLOC = 0b0110
|
|
AWCACHE_WRITE_THROUGH_READ_ALLOC = 0b0110
|
|
AWCACHE_WRITE_THROUGH_WRITE_ALLOC = 0b1110
|
|
AWCACHE_WRITE_THROUGH_READ_AND_WRITE_ALLOC = 0b1110
|
|
AWCACHE_WRITE_BACK_NO_ALLOC = 0b0111
|
|
AWCACHE_WRITE_BACK_READ_ALLOC = 0b0111
|
|
AWCACHE_WRITE_BACK_WRITE_ALLOC = 0b1111
|
|
AWCACHE_WRITE_BACK_READ_AND_WRIE_ALLOC = 0b1111
|
|
|
|
class AXIMaster(object):
|
|
def __init__(self):
|
|
self.write_command_queue = []
|
|
self.write_command_sync = Signal(False)
|
|
self.write_resp_queue = []
|
|
self.write_resp_sync = Signal(False)
|
|
|
|
self.read_command_queue = []
|
|
self.read_command_sync = Signal(False)
|
|
self.read_data_queue = []
|
|
self.read_data_sync = Signal(False)
|
|
|
|
self.cur_write_id = 0
|
|
self.cur_read_id = 0
|
|
|
|
self.int_write_addr_queue = []
|
|
self.int_write_addr_sync = Signal(False)
|
|
self.int_write_data_queue = []
|
|
self.int_write_data_sync = Signal(False)
|
|
self.int_write_resp_command_queue = []
|
|
self.int_write_resp_command_sync = Signal(False)
|
|
self.int_write_resp_queue = []
|
|
self.int_write_resp_sync = Signal(False)
|
|
|
|
self.int_read_addr_queue = []
|
|
self.int_read_addr_sync = Signal(False)
|
|
self.int_read_resp_command_queue = []
|
|
self.int_read_resp_command_sync = Signal(False)
|
|
self.int_read_resp_queue = []
|
|
self.int_read_resp_sync = Signal(False)
|
|
|
|
self.in_flight_operations = 0
|
|
|
|
self.has_logic = False
|
|
self.clk = None
|
|
|
|
def init_read(self, address, length, burst=0b01, size=None, lock=0b0, cache=0b0000, prot=0b010, qos=0b0000, region=0b0000, user=None):
|
|
self.read_command_queue.append((address, length, burst, size, lock, cache, prot, qos, region, user))
|
|
self.read_command_sync.next = not self.read_command_sync
|
|
|
|
def init_write(self, address, data, burst=0b01, size=None, lock=0b0, cache=0b0000, prot=0b010, qos=0b0000, region=0b0000, user=None):
|
|
self.write_command_queue.append((address, data, burst, size, lock, cache, prot, qos, region, user))
|
|
self.write_command_sync.next = not self.write_command_sync
|
|
|
|
def idle(self):
|
|
return not self.write_command_queue and not self.read_command_queue and not self.in_flight_operations
|
|
|
|
def wait(self):
|
|
while not self.idle():
|
|
yield self.clk.posedge
|
|
|
|
def read_data_ready(self):
|
|
return bool(self.read_data_queue)
|
|
|
|
def get_read_data(self):
|
|
if self.read_data_queue:
|
|
return self.read_data_queue.pop(0)
|
|
return None
|
|
|
|
def create_logic(self,
|
|
clk,
|
|
rst,
|
|
m_axi_awid=None,
|
|
m_axi_awaddr=None,
|
|
m_axi_awlen=Signal(intbv(0)[8:]),
|
|
m_axi_awsize=Signal(intbv(0)[3:]),
|
|
m_axi_awburst=Signal(intbv(0)[2:]),
|
|
m_axi_awlock=Signal(intbv(0)[1:]),
|
|
m_axi_awcache=Signal(intbv(0)[4:]),
|
|
m_axi_awprot=Signal(intbv(0)[3:]),
|
|
m_axi_awqos=Signal(intbv(0)[4:]),
|
|
m_axi_awregion=Signal(intbv(0)[4:]),
|
|
m_axi_awuser=None,
|
|
m_axi_awvalid=Signal(bool(False)),
|
|
m_axi_awready=Signal(bool(True)),
|
|
m_axi_wdata=None,
|
|
m_axi_wstrb=Signal(intbv(1)[1:]),
|
|
m_axi_wlast=Signal(bool(True)),
|
|
m_axi_wuser=None,
|
|
m_axi_wvalid=Signal(bool(False)),
|
|
m_axi_wready=Signal(bool(True)),
|
|
m_axi_bid=None,
|
|
m_axi_bresp=Signal(intbv(0)[2:]),
|
|
m_axi_buser=None,
|
|
m_axi_bvalid=Signal(bool(False)),
|
|
m_axi_bready=Signal(bool(False)),
|
|
m_axi_arid=None,
|
|
m_axi_araddr=None,
|
|
m_axi_arlen=Signal(intbv(0)[8:]),
|
|
m_axi_arsize=Signal(intbv(0)[3:]),
|
|
m_axi_arburst=Signal(intbv(0)[2:]),
|
|
m_axi_arlock=Signal(intbv(0)[1:]),
|
|
m_axi_arcache=Signal(intbv(0)[4:]),
|
|
m_axi_arprot=Signal(intbv(0)[3:]),
|
|
m_axi_arqos=Signal(intbv(0)[4:]),
|
|
m_axi_arregion=Signal(intbv(0)[4:]),
|
|
m_axi_aruser=None,
|
|
m_axi_arvalid=Signal(bool(False)),
|
|
m_axi_arready=Signal(bool(True)),
|
|
m_axi_rid=None,
|
|
m_axi_rdata=None,
|
|
m_axi_rresp=Signal(intbv(0)[2:]),
|
|
m_axi_rlast=Signal(bool(True)),
|
|
m_axi_ruser=None,
|
|
m_axi_rvalid=Signal(bool(False)),
|
|
m_axi_rready=Signal(bool(False)),
|
|
name=None
|
|
):
|
|
|
|
if self.has_logic:
|
|
raise Exception("Logic already instantiated!")
|
|
|
|
if m_axi_wdata is not None:
|
|
assert m_axi_awid is not None
|
|
assert m_axi_bid is not None
|
|
assert len(m_axi_awid) == len(m_axi_bid)
|
|
assert m_axi_awaddr is not None
|
|
assert len(m_axi_wdata) % 8 == 0
|
|
assert len(m_axi_wdata) / 8 == len(m_axi_wstrb)
|
|
w = len(m_axi_wdata)
|
|
|
|
if m_axi_rdata is not None:
|
|
assert m_axi_arid is not None
|
|
assert m_axi_rid is not None
|
|
assert len(m_axi_arid) == len(m_axi_rid)
|
|
assert m_axi_araddr is not None
|
|
assert len(m_axi_rdata) % 8 == 0
|
|
w = len(m_axi_rdata)
|
|
|
|
if m_axi_wdata is not None:
|
|
assert len(m_axi_wdata) == len(m_axi_rdata)
|
|
assert len(m_axi_awid) == len(m_axi_arid)
|
|
assert len(m_axi_awaddr) == len(m_axi_araddr)
|
|
|
|
bw = int(w/8)
|
|
|
|
assert bw in (1, 2, 4, 8, 16, 32, 64, 128)
|
|
|
|
self.has_logic = True
|
|
self.clk = clk
|
|
|
|
@instance
|
|
def write_logic():
|
|
while True:
|
|
if not self.write_command_queue:
|
|
yield self.write_command_sync
|
|
|
|
addr, data, burst, size, lock, cache, prot, qos, region, user = self.write_command_queue.pop(0)
|
|
self.in_flight_operations += 1
|
|
|
|
num_bytes = bw
|
|
|
|
if size is None:
|
|
size = int(math.log(bw, 2))
|
|
else:
|
|
num_bytes = 2**size
|
|
assert 0 < num_bytes <= bw
|
|
|
|
aligned_addr = int(addr/num_bytes)*num_bytes
|
|
word_addr = int(addr/bw)*bw
|
|
|
|
start_offset = addr % bw
|
|
end_offset = ((addr + len(data) - 1) % bw) + 1
|
|
|
|
cycles = int((len(data) + num_bytes-1 + (addr % num_bytes)) / num_bytes)
|
|
|
|
cur_addr = aligned_addr
|
|
offset = 0
|
|
cycle_offset = aligned_addr-word_addr
|
|
n = 0
|
|
transfer_count = 0
|
|
|
|
burst_length = 0
|
|
|
|
if name is not None:
|
|
print("[%s] Write data addr: 0x%08x prot: 0x%x data: %s" % (name, addr, prot, " ".join(("{:02x}".format(c) for c in bytearray(data)))))
|
|
|
|
for k in range(cycles):
|
|
start = cycle_offset
|
|
stop = cycle_offset+num_bytes
|
|
|
|
if k == 0:
|
|
start = start_offset
|
|
if k == cycles-1:
|
|
stop = end_offset
|
|
|
|
strb = ((2**bw-1) << start) & (2**bw-1) & (2**bw-1) >> (bw - stop)
|
|
|
|
val = 0
|
|
for j in range(start, stop):
|
|
val |= bytearray(data)[offset] << j*8
|
|
offset += 1
|
|
|
|
if n >= burst_length:
|
|
transfer_count += 1
|
|
n = 0
|
|
burst_length = min(cycles-k, 256) # max len
|
|
burst_length = min(burst_length, 0x1000-(cur_addr&0xfff)) # 4k align
|
|
awid = self.cur_write_id
|
|
self.cur_write_id = (self.cur_write_id + 1) % 2**len(m_axi_awid)
|
|
self.int_write_addr_queue.append((cur_addr, awid, burst_length-1, size, burst, lock, cache, prot, qos, region, user))
|
|
self.int_write_addr_sync.next = not self.int_write_addr_sync
|
|
n += 1
|
|
self.int_write_data_queue.append((val, strb, n >= burst_length))
|
|
self.int_write_data_sync.next = not self.int_write_data_sync
|
|
|
|
cur_addr += num_bytes
|
|
cycle_offset = (cycle_offset + num_bytes) % bw
|
|
|
|
self.int_write_resp_command_queue.append((addr, len(data), transfer_count, prot))
|
|
self.int_write_resp_command_sync.next = not self.int_write_resp_command_sync
|
|
|
|
@instance
|
|
def write_resp_logic():
|
|
while True:
|
|
if not self.int_write_resp_command_queue:
|
|
yield self.int_write_resp_command_sync
|
|
|
|
addr, length, transfer_count, prot = self.int_write_resp_command_queue.pop(0)
|
|
|
|
resp = 0
|
|
|
|
for k in range(transfer_count):
|
|
while not self.int_write_resp_queue:
|
|
yield clk.posedge
|
|
|
|
cycle_resp = self.int_write_resp_queue.pop(0)
|
|
|
|
if cycle_resp != 0:
|
|
resp = cycle_resp
|
|
|
|
self.write_resp_queue.append((addr, length, prot, resp))
|
|
self.write_resp_sync.next = not self.write_resp_sync
|
|
self.in_flight_operations -= 1
|
|
|
|
@instance
|
|
def write_addr_interface_logic():
|
|
while True:
|
|
while not self.int_write_addr_queue:
|
|
yield clk.posedge
|
|
|
|
addr, awid, length, size, burst, lock, cache, prot, qos, region, user = self.int_write_addr_queue.pop(0)
|
|
m_axi_awaddr.next = addr
|
|
m_axi_awid.next = awid
|
|
m_axi_awlen.next = length
|
|
m_axi_awsize.next = size
|
|
m_axi_awburst.next = burst
|
|
m_axi_awlock.next = lock
|
|
m_axi_awcache.next = cache
|
|
m_axi_awprot.next = prot
|
|
m_axi_awqos.next = qos
|
|
m_axi_awregion.next = region
|
|
if m_axi_awuser is not None:
|
|
m_axi_awuser.next = user
|
|
m_axi_awvalid.next = True
|
|
|
|
yield clk.posedge
|
|
|
|
while m_axi_awvalid and not m_axi_awready:
|
|
yield clk.posedge
|
|
|
|
m_axi_awvalid.next = False
|
|
|
|
@instance
|
|
def write_data_interface_logic():
|
|
while True:
|
|
while not self.int_write_data_queue:
|
|
yield clk.posedge
|
|
|
|
m_axi_wdata.next, m_axi_wstrb.next, m_axi_wlast.next = self.int_write_data_queue.pop(0)
|
|
m_axi_wvalid.next = True
|
|
|
|
yield clk.posedge
|
|
|
|
while m_axi_wvalid and not m_axi_wready:
|
|
yield clk.posedge
|
|
|
|
m_axi_wvalid.next = False
|
|
|
|
@instance
|
|
def write_resp_interface_logic():
|
|
while True:
|
|
m_axi_bready.next = True
|
|
|
|
yield clk.posedge
|
|
|
|
if m_axi_bready & m_axi_bvalid:
|
|
self.int_write_resp_queue.append(int(m_axi_bresp))
|
|
self.int_write_resp_sync.next = not self.int_write_resp_sync
|
|
|
|
@instance
|
|
def read_logic():
|
|
while True:
|
|
if not self.read_command_queue:
|
|
yield self.read_command_sync
|
|
|
|
addr, length, burst, size, lock, cache, prot, qos, region, user = self.read_command_queue.pop(0)
|
|
self.in_flight_operations += 1
|
|
|
|
num_bytes = bw
|
|
|
|
if size is None:
|
|
size = int(math.log(bw, 2))
|
|
else:
|
|
num_bytes = 2**size
|
|
assert 0 < num_bytes <= bw
|
|
|
|
aligned_addr = int(addr/num_bytes)*num_bytes
|
|
word_addr = int(addr/bw)*bw
|
|
|
|
cycles = int((length + num_bytes-1 + (addr % num_bytes)) / num_bytes)
|
|
|
|
self.int_read_resp_command_queue.append((addr, length, size, cycles, prot))
|
|
self.int_read_resp_command_sync.next = not self.int_read_resp_command_sync
|
|
|
|
cur_addr = aligned_addr
|
|
n = 0
|
|
|
|
burst_length = 0
|
|
|
|
for k in range(cycles):
|
|
|
|
n += 1
|
|
if n >= burst_length:
|
|
n = 0
|
|
burst_length = min(cycles-k, 256) # max len
|
|
burst_length = min(burst_length, 0x1000-((aligned_addr+k*num_bytes)&0xfff))# 4k align
|
|
arid = self.cur_read_id
|
|
self.cur_read_id = (self.cur_read_id + 1) % 2**len(m_axi_arid)
|
|
self.int_read_addr_queue.append((cur_addr, arid, burst_length-1, size, burst, lock, cache, prot, qos, region, user))
|
|
self.int_read_addr_sync.next = not self.int_read_addr_sync
|
|
|
|
cur_addr += num_bytes
|
|
|
|
@instance
|
|
def read_resp_logic():
|
|
while True:
|
|
if not self.int_read_resp_command_queue:
|
|
yield self.int_read_resp_command_sync
|
|
|
|
addr, length, size, cycles, prot = self.int_read_resp_command_queue.pop(0)
|
|
|
|
num_bytes = 2**size
|
|
assert 0 <= size <= int(math.log(bw, 2))
|
|
|
|
aligned_addr = int(addr/num_bytes)*num_bytes
|
|
word_addr = int(addr/bw)*bw
|
|
|
|
start_offset = addr % bw
|
|
end_offset = ((addr + length - 1) % bw) + 1
|
|
|
|
cycle_offset = aligned_addr-word_addr
|
|
data = b''
|
|
|
|
resp = 0
|
|
|
|
for k in range(cycles):
|
|
if not self.int_read_resp_queue:
|
|
yield self.int_read_resp_sync
|
|
|
|
cycle_data, cycle_resp, cycle_last = self.int_read_resp_queue.pop(0)
|
|
|
|
if cycle_resp != 0:
|
|
resp = cycle_resp
|
|
|
|
start = cycle_offset
|
|
stop = cycle_offset+num_bytes
|
|
|
|
if k == 0:
|
|
start = start_offset
|
|
if k == cycles-1:
|
|
stop = end_offset
|
|
|
|
assert cycle_last == (k == cycles - 1)
|
|
|
|
for j in range(start, stop):
|
|
data += bytearray([(cycle_data >> j*8) & 0xff])
|
|
|
|
cycle_offset = (cycle_offset + num_bytes) % bw
|
|
|
|
if name is not None:
|
|
print("[%s] Read data addr: 0x%08x prot: 0x%x data: %s" % (name, addr, prot, " ".join(("{:02x}".format(c) for c in bytearray(data)))))
|
|
|
|
self.read_data_queue.append((addr, data, prot, resp))
|
|
self.read_data_sync.next = not self.read_data_sync
|
|
self.in_flight_operations -= 1
|
|
|
|
@instance
|
|
def read_addr_interface_logic():
|
|
while True:
|
|
while not self.int_read_addr_queue:
|
|
yield clk.posedge
|
|
|
|
addr, arid, length, size, burst, lock, cache, prot, qos, region, user = self.int_read_addr_queue.pop(0)
|
|
m_axi_araddr.next = addr
|
|
m_axi_arid.next = arid
|
|
m_axi_arlen.next = length
|
|
m_axi_arsize.next = size
|
|
m_axi_arburst.next = burst
|
|
m_axi_arlock.next = lock
|
|
m_axi_arcache.next = cache
|
|
m_axi_arprot.next = prot
|
|
m_axi_arqos.next = qos
|
|
m_axi_arregion.next = region
|
|
if m_axi_aruser is not None:
|
|
m_axi_aruser.next = user
|
|
m_axi_arvalid.next = True
|
|
|
|
yield clk.posedge
|
|
|
|
while m_axi_arvalid and not m_axi_arready:
|
|
yield clk.posedge
|
|
|
|
m_axi_arvalid.next = False
|
|
|
|
@instance
|
|
def read_resp_interface_logic():
|
|
while True:
|
|
m_axi_rready.next = True
|
|
|
|
yield clk.posedge
|
|
|
|
if m_axi_rready & m_axi_rvalid:
|
|
self.int_read_resp_queue.append((int(m_axi_rdata), int(m_axi_rresp), int(m_axi_rlast)))
|
|
self.int_read_resp_sync.next = not self.int_read_resp_sync
|
|
|
|
return instances()
|
|
|
|
|
|
class AXIRam(object):
|
|
def __init__(self, size = 1024):
|
|
self.size = size
|
|
self.mem = mmap.mmap(-1, size)
|
|
|
|
self.int_write_addr_queue = []
|
|
self.int_write_addr_sync = Signal(False)
|
|
self.int_write_data_queue = []
|
|
self.int_write_data_sync = Signal(False)
|
|
self.int_write_resp_queue = []
|
|
self.int_write_resp_sync = Signal(False)
|
|
|
|
self.int_read_addr_queue = []
|
|
self.int_read_addr_sync = Signal(False)
|
|
self.int_read_resp_queue = []
|
|
self.int_read_resp_sync = Signal(False)
|
|
|
|
def read_mem(self, address, length):
|
|
self.mem.seek(address % self.size)
|
|
return self.mem.read(length)
|
|
|
|
def write_mem(self, address, data):
|
|
self.mem.seek(address % self.size)
|
|
self.mem.write(data)
|
|
|
|
def create_port(self,
|
|
clk,
|
|
s_axi_awid=None,
|
|
s_axi_awaddr=None,
|
|
s_axi_awlen=Signal(intbv(0)[8:]),
|
|
s_axi_awsize=Signal(intbv(0)[3:]),
|
|
s_axi_awburst=Signal(intbv(0)[2:]),
|
|
s_axi_awlock=Signal(intbv(0)[1:]),
|
|
s_axi_awcache=Signal(intbv(0)[4:]),
|
|
s_axi_awprot=Signal(intbv(0)[3:]),
|
|
s_axi_awvalid=Signal(bool(False)),
|
|
s_axi_awready=Signal(bool(True)),
|
|
s_axi_wdata=None,
|
|
s_axi_wstrb=Signal(intbv(1)[1:]),
|
|
s_axi_wlast=Signal(bool(True)),
|
|
s_axi_wvalid=Signal(bool(False)),
|
|
s_axi_wready=Signal(bool(True)),
|
|
s_axi_bid=None,
|
|
s_axi_bresp=Signal(intbv(0)[2:]),
|
|
s_axi_bvalid=Signal(bool(False)),
|
|
s_axi_bready=Signal(bool(False)),
|
|
s_axi_arid=None,
|
|
s_axi_araddr=None,
|
|
s_axi_arlen=Signal(intbv(0)[8:]),
|
|
s_axi_arsize=Signal(intbv(0)[3:]),
|
|
s_axi_arburst=Signal(intbv(0)[2:]),
|
|
s_axi_arlock=Signal(intbv(0)[1:]),
|
|
s_axi_arcache=Signal(intbv(0)[4:]),
|
|
s_axi_arprot=Signal(intbv(0)[3:]),
|
|
s_axi_arvalid=Signal(bool(False)),
|
|
s_axi_arready=Signal(bool(True)),
|
|
s_axi_rid=None,
|
|
s_axi_rdata=None,
|
|
s_axi_rresp=Signal(intbv(0)[2:]),
|
|
s_axi_rlast=Signal(bool(True)),
|
|
s_axi_rvalid=Signal(bool(False)),
|
|
s_axi_rready=Signal(bool(False)),
|
|
name=None
|
|
):
|
|
|
|
if s_axi_wdata is not None:
|
|
assert s_axi_awid is not None
|
|
assert s_axi_bid is not None
|
|
assert len(s_axi_awid) == len(s_axi_bid)
|
|
assert s_axi_awaddr is not None
|
|
assert len(s_axi_wdata) % 8 == 0
|
|
assert len(s_axi_wdata) / 8 == len(s_axi_wstrb)
|
|
w = len(s_axi_wdata)
|
|
|
|
if s_axi_rdata is not None:
|
|
assert s_axi_arid is not None
|
|
assert s_axi_rid is not None
|
|
assert len(s_axi_arid) == len(s_axi_rid)
|
|
assert s_axi_araddr is not None
|
|
assert len(s_axi_rdata) % 8 == 0
|
|
w = len(s_axi_rdata)
|
|
|
|
if s_axi_wdata is not None:
|
|
assert len(s_axi_wdata) == len(s_axi_rdata)
|
|
assert len(s_axi_awid) == len(s_axi_arid)
|
|
assert len(s_axi_awaddr) == len(s_axi_araddr)
|
|
|
|
bw = int(w/8)
|
|
|
|
assert bw in (1, 2, 4, 8, 16, 32, 64, 128)
|
|
|
|
@instance
|
|
def write_logic():
|
|
while True:
|
|
if not self.int_write_addr_queue:
|
|
yield self.int_write_addr_sync
|
|
|
|
addr, awid, length, size, burst, lock, cache, prot = self.int_write_addr_queue.pop(0)
|
|
|
|
num_bytes = 2**size
|
|
assert 0 < num_bytes <= bw
|
|
|
|
aligned_addr = int(addr/num_bytes)*num_bytes
|
|
length = length+1
|
|
|
|
transfer_size = num_bytes*length
|
|
|
|
if burst == BURST_WRAP:
|
|
lower_wrap_boundary = int(addr/transfer_size)*transfer_size
|
|
upper_wrap_boundary = lower_wrap_boundary+transfer_size
|
|
|
|
if burst == BURST_INCR:
|
|
# check for 4k boundary crossing
|
|
assert 0x1000-(aligned_addr&0xfff) >= transfer_size
|
|
|
|
cur_addr = aligned_addr
|
|
|
|
for n in range(length):
|
|
cur_word_addr = int(cur_addr/bw)*bw
|
|
|
|
self.mem.seek(cur_word_addr % self.size)
|
|
|
|
if not self.int_write_data_queue:
|
|
yield self.int_write_data_sync
|
|
|
|
wdata, strb, last = self.int_write_data_queue.pop(0)
|
|
|
|
data = bytearray()
|
|
for i in range(bw):
|
|
data.extend(bytearray([wdata & 0xff]))
|
|
wdata >>= 8
|
|
for i in range(bw):
|
|
if strb & (1 << i):
|
|
self.mem.write(data[i:i+1])
|
|
else:
|
|
self.mem.seek(1, 1)
|
|
if n == length-1:
|
|
self.int_write_resp_queue.append((awid, 0b00))
|
|
self.int_write_resp_sync.next = not self.int_write_resp_sync
|
|
assert last == (n == length-1)
|
|
if name is not None:
|
|
print("[%s] Write word id: %d addr: 0x%08x prot: 0x%x wstrb: 0x%02x data: %s" % (name, awid, cur_addr, prot, s_axi_wstrb, " ".join(("{:02x}".format(c) for c in bytearray(data)))))
|
|
|
|
if burst != BURST_FIXED:
|
|
cur_addr += num_bytes
|
|
|
|
if burst == BURST_WRAP:
|
|
if cur_addr == upper_wrap_boundary:
|
|
cur_addr = lower_wrap_boundary
|
|
|
|
@instance
|
|
def write_addr_interface_logic():
|
|
while True:
|
|
s_axi_awready.next = True
|
|
|
|
yield clk.posedge
|
|
|
|
if s_axi_awready & s_axi_awvalid:
|
|
addr = int(s_axi_awaddr)
|
|
awid = int(s_axi_awid)
|
|
length = int(s_axi_awlen)
|
|
size = int(s_axi_awsize)
|
|
burst = int(s_axi_awburst)
|
|
lock = int(s_axi_awlock)
|
|
cache = int(s_axi_awcache)
|
|
prot = int(s_axi_awprot)
|
|
self.int_write_addr_queue.append((addr, awid, length, size, burst, lock, cache, prot))
|
|
self.int_write_addr_sync.next = not self.int_write_addr_sync
|
|
|
|
@instance
|
|
def write_data_interface_logic():
|
|
while True:
|
|
s_axi_wready.next = True
|
|
|
|
yield clk.posedge
|
|
|
|
if s_axi_wready & s_axi_wvalid:
|
|
data = int(s_axi_wdata)
|
|
strb = int(s_axi_wstrb)
|
|
last = bool(s_axi_wlast)
|
|
self.int_write_data_queue.append((data, strb, last))
|
|
self.int_write_data_sync.next = not self.int_write_data_sync
|
|
|
|
@instance
|
|
def write_resp_interface_logic():
|
|
while True:
|
|
while not self.int_write_resp_queue:
|
|
yield clk.posedge
|
|
|
|
s_axi_bid.next, s_axi_bresp.next = self.int_write_resp_queue.pop(0)
|
|
s_axi_bvalid.next = True
|
|
|
|
yield clk.posedge
|
|
|
|
while s_axi_bvalid and not s_axi_bready:
|
|
yield clk.posedge
|
|
|
|
s_axi_bvalid.next = False
|
|
|
|
@instance
|
|
def read_logic():
|
|
while True:
|
|
if not self.int_read_addr_queue:
|
|
yield self.int_read_addr_sync
|
|
|
|
addr, arid, length, size, burst, lock, cache, prot = self.int_read_addr_queue.pop(0)
|
|
|
|
num_bytes = 2**size
|
|
assert 0 < num_bytes <= bw
|
|
|
|
aligned_addr = int(addr/num_bytes)*num_bytes
|
|
length = length+1
|
|
|
|
transfer_size = num_bytes*length
|
|
|
|
if burst == BURST_WRAP:
|
|
lower_wrap_boundary = int(addr/transfer_size)*transfer_size
|
|
upper_wrap_boundary = lower_wrap_boundary+transfer_size
|
|
|
|
if burst == BURST_INCR:
|
|
# check for 4k boundary crossing
|
|
assert 0x1000-(aligned_addr&0xfff) >= transfer_size
|
|
|
|
cur_addr = aligned_addr
|
|
|
|
for n in range(length):
|
|
cur_word_addr = int(cur_addr/bw)*bw
|
|
|
|
self.mem.seek(cur_word_addr % self.size)
|
|
|
|
data = bytearray(self.mem.read(bw))
|
|
val = 0
|
|
for i in range(bw-1,-1,-1):
|
|
val <<= 8
|
|
val += data[i]
|
|
self.int_read_resp_queue.append((arid, val, 0x00, n == length-1))
|
|
self.int_read_resp_sync.next = not self.int_read_resp_sync
|
|
if name is not None:
|
|
print("[%s] Read word id: %d addr: 0x%08x prot: 0x%x data: %s" % (name, arid, cur_addr, prot, " ".join(("{:02x}".format(c) for c in bytearray(data)))))
|
|
|
|
if burst != BURST_FIXED:
|
|
cur_addr += num_bytes
|
|
|
|
if burst == BURST_WRAP:
|
|
if cur_addr == upper_wrap_boundary:
|
|
cur_addr = lower_wrap_boundary
|
|
|
|
@instance
|
|
def read_addr_interface_logic():
|
|
while True:
|
|
s_axi_arready.next = True
|
|
|
|
yield clk.posedge
|
|
|
|
if s_axi_arready & s_axi_arvalid:
|
|
addr = int(s_axi_araddr)
|
|
arid = int(s_axi_arid)
|
|
length = int(s_axi_arlen)
|
|
size = int(s_axi_arsize)
|
|
burst = int(s_axi_arburst)
|
|
lock = int(s_axi_arlock)
|
|
cache = int(s_axi_arcache)
|
|
prot = int(s_axi_arprot)
|
|
self.int_read_addr_queue.append((addr, arid, length, size, burst, lock, cache, prot))
|
|
self.int_read_addr_sync.next = not self.int_read_addr_sync
|
|
|
|
@instance
|
|
def read_resp_interface_logic():
|
|
while True:
|
|
while not self.int_read_resp_queue:
|
|
yield clk.posedge
|
|
|
|
s_axi_rid.next, s_axi_rdata.next, s_axi_rresp.next, s_axi_rlast.next = self.int_read_resp_queue.pop(0)
|
|
s_axi_rvalid.next = True
|
|
|
|
yield clk.posedge
|
|
|
|
while s_axi_rvalid and not s_axi_rready:
|
|
yield clk.posedge
|
|
|
|
s_axi_rvalid.next = False
|
|
|
|
return instances()
|
|
|