mirror of
https://github.com/corundum/corundum.git
synced 2025-01-16 08:12:53 +08:00
471 lines
15 KiB
Python
471 lines
15 KiB
Python
|
"""
|
||
|
|
||
|
Copyright (c) 2018 Alex Forencich
|
||
|
|
||
|
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||
|
of this software and associated documentation files (the "Software"), to deal
|
||
|
in the Software without restriction, including without limitation the rights
|
||
|
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||
|
copies of the Software, and to permit persons to whom the Software is
|
||
|
furnished to do so, subject to the following conditions:
|
||
|
|
||
|
The above copyright notice and this permission notice shall be included in
|
||
|
all copies or substantial portions of the Software.
|
||
|
|
||
|
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||
|
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY
|
||
|
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||
|
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||
|
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||
|
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||
|
THE SOFTWARE.
|
||
|
|
||
|
"""
|
||
|
|
||
|
from myhdl import *
|
||
|
import mmap
|
||
|
|
||
|
class AXILiteMaster(object):
|
||
|
def __init__(self):
|
||
|
self.write_command_queue = []
|
||
|
self.write_resp_queue = []
|
||
|
|
||
|
self.read_command_queue = []
|
||
|
self.read_data_queue = []
|
||
|
|
||
|
self.int_write_addr_queue = []
|
||
|
self.int_write_data_queue = []
|
||
|
self.int_write_resp_command_queue = []
|
||
|
self.int_write_resp_queue = []
|
||
|
|
||
|
self.int_read_addr_queue = []
|
||
|
self.int_read_resp_command_queue = []
|
||
|
self.int_read_resp_queue = []
|
||
|
|
||
|
self.in_flight_operations = 0
|
||
|
|
||
|
self.has_logic = False
|
||
|
self.clk = None
|
||
|
|
||
|
def init_read(self, address, length, prot=0b010):
|
||
|
self.read_command_queue.append((address, length, prot))
|
||
|
|
||
|
def init_write(self, address, data, prot=0b010):
|
||
|
self.write_command_queue.append((address, data, prot))
|
||
|
|
||
|
def idle(self):
|
||
|
return not self.write_command_queue and not self.read_command_queue and not self.in_flight_operations
|
||
|
|
||
|
def wait(self):
|
||
|
while not self.idle():
|
||
|
yield self.clk.posedge
|
||
|
|
||
|
def read_data_ready(self):
|
||
|
return bool(self.read_data_queue)
|
||
|
|
||
|
def get_read_data(self):
|
||
|
if self.read_data_queue:
|
||
|
return self.read_data_queue.pop(0)
|
||
|
return None
|
||
|
|
||
|
def create_logic(self,
|
||
|
clk,
|
||
|
rst,
|
||
|
m_axil_awaddr=None,
|
||
|
m_axil_awprot=Signal(intbv(0)[3:]),
|
||
|
m_axil_awvalid=Signal(bool(False)),
|
||
|
m_axil_awready=Signal(bool(True)),
|
||
|
m_axil_wdata=None,
|
||
|
m_axil_wstrb=Signal(intbv(1)[1:]),
|
||
|
m_axil_wvalid=Signal(bool(False)),
|
||
|
m_axil_wready=Signal(bool(True)),
|
||
|
m_axil_bresp=Signal(intbv(0)[2:]),
|
||
|
m_axil_bvalid=Signal(bool(False)),
|
||
|
m_axil_bready=Signal(bool(False)),
|
||
|
m_axil_araddr=None,
|
||
|
m_axil_arprot=Signal(intbv(0)[3:]),
|
||
|
m_axil_arvalid=Signal(bool(False)),
|
||
|
m_axil_arready=Signal(bool(True)),
|
||
|
m_axil_rdata=None,
|
||
|
m_axil_rresp=Signal(intbv(0)[2:]),
|
||
|
m_axil_rvalid=Signal(bool(False)),
|
||
|
m_axil_rready=Signal(bool(False)),
|
||
|
name=None
|
||
|
):
|
||
|
|
||
|
if self.has_logic:
|
||
|
raise Exception("Logic already instantiated!")
|
||
|
|
||
|
if m_axil_wdata is not None:
|
||
|
assert m_axil_awaddr is not None
|
||
|
assert len(m_axil_wdata) % 8 == 0
|
||
|
assert len(m_axil_wdata) / 8 == len(m_axil_wstrb)
|
||
|
w = len(m_axil_wdata)
|
||
|
|
||
|
if m_axil_rdata is not None:
|
||
|
assert m_axil_araddr is not None
|
||
|
assert len(m_axil_rdata) % 8 == 0
|
||
|
w = len(m_axil_rdata)
|
||
|
|
||
|
if m_axil_wdata is not None:
|
||
|
assert len(m_axil_wdata) == len(m_axil_rdata)
|
||
|
assert len(m_axil_awaddr) == len(m_axil_araddr)
|
||
|
|
||
|
bw = int(w/8)
|
||
|
|
||
|
assert bw in (1, 2, 4, 8, 16, 32, 64, 128)
|
||
|
|
||
|
self.has_logic = True
|
||
|
self.clk = clk
|
||
|
|
||
|
@instance
|
||
|
def write_logic():
|
||
|
while True:
|
||
|
while not self.write_command_queue:
|
||
|
yield clk.posedge
|
||
|
|
||
|
addr, data, prot = self.write_command_queue.pop(0)
|
||
|
self.in_flight_operations += 1
|
||
|
|
||
|
word_addr = int(addr/bw)*bw
|
||
|
|
||
|
start_offset = addr % bw
|
||
|
end_offset = ((addr + len(data) - 1) % bw) + 1
|
||
|
|
||
|
strb_start = ((2**bw-1) << start_offset) & (2**bw-1)
|
||
|
strb_end = (2**bw-1) >> (bw - end_offset)
|
||
|
|
||
|
cycles = int((len(data) + bw-1 + (addr % bw)) / bw)
|
||
|
|
||
|
self.int_write_resp_command_queue.append((addr, len(data), cycles, prot))
|
||
|
|
||
|
offset = 0
|
||
|
|
||
|
if name is not None:
|
||
|
print("[%s] Write data addr: 0x%08x prot: 0x%x data: %s" % (name, addr, prot, " ".join(("{:02x}".format(c) for c in bytearray(data)))))
|
||
|
|
||
|
for k in range(cycles):
|
||
|
start = 0
|
||
|
stop = bw
|
||
|
strb = 2**bw-1
|
||
|
|
||
|
if k == 0:
|
||
|
start = start_offset
|
||
|
strb &= strb_start
|
||
|
if k == cycles-1:
|
||
|
stop = end_offset
|
||
|
strb &= strb_end
|
||
|
|
||
|
val = 0
|
||
|
for j in range(start, stop):
|
||
|
val |= bytearray(data)[offset] << j*8
|
||
|
offset += 1
|
||
|
|
||
|
self.int_write_addr_queue.append((word_addr + start + k*bw, prot))
|
||
|
self.int_write_data_queue.append((val, strb))
|
||
|
|
||
|
@instance
|
||
|
def write_resp_logic():
|
||
|
while True:
|
||
|
while not self.int_write_resp_command_queue:
|
||
|
yield clk.posedge
|
||
|
|
||
|
addr, length, cycles, prot = self.int_write_resp_command_queue.pop(0)
|
||
|
|
||
|
resp = 0
|
||
|
|
||
|
for k in range(cycles):
|
||
|
while not self.int_write_resp_queue:
|
||
|
yield clk.posedge
|
||
|
|
||
|
cycle_resp = self.int_write_resp_queue.pop(0)
|
||
|
|
||
|
if cycle_resp != 0:
|
||
|
resp = cycle_resp
|
||
|
|
||
|
self.write_resp_queue.append((addr, length, prot, resp))
|
||
|
self.in_flight_operations -= 1
|
||
|
|
||
|
@instance
|
||
|
def write_addr_interface_logic():
|
||
|
while True:
|
||
|
while not self.int_write_addr_queue:
|
||
|
yield clk.posedge
|
||
|
|
||
|
m_axil_awaddr.next, m_axil_awprot.next = self.int_write_addr_queue.pop(0)
|
||
|
m_axil_awvalid.next = True
|
||
|
|
||
|
yield clk.posedge
|
||
|
|
||
|
while m_axil_awvalid and not m_axil_awready:
|
||
|
yield clk.posedge
|
||
|
|
||
|
m_axil_awvalid.next = False
|
||
|
|
||
|
@instance
|
||
|
def write_data_interface_logic():
|
||
|
while True:
|
||
|
while not self.int_write_data_queue:
|
||
|
yield clk.posedge
|
||
|
|
||
|
m_axil_wdata.next, m_axil_wstrb.next = self.int_write_data_queue.pop(0)
|
||
|
m_axil_wvalid.next = True
|
||
|
|
||
|
yield clk.posedge
|
||
|
|
||
|
while m_axil_wvalid and not m_axil_wready:
|
||
|
yield clk.posedge
|
||
|
|
||
|
m_axil_wvalid.next = False
|
||
|
|
||
|
@instance
|
||
|
def write_resp_interface_logic():
|
||
|
while True:
|
||
|
m_axil_bready.next = True
|
||
|
|
||
|
yield clk.posedge
|
||
|
|
||
|
if m_axil_bready & m_axil_bvalid:
|
||
|
self.int_write_resp_queue.append(int(m_axil_bresp))
|
||
|
|
||
|
@instance
|
||
|
def read_logic():
|
||
|
while True:
|
||
|
while not self.read_command_queue:
|
||
|
yield clk.posedge
|
||
|
|
||
|
addr, length, prot = self.read_command_queue.pop(0)
|
||
|
self.in_flight_operations += 1
|
||
|
|
||
|
word_addr = int(addr/bw)*bw
|
||
|
|
||
|
start_offset = addr % bw
|
||
|
|
||
|
cycles = int((length + bw-1 + (addr % bw)) / bw)
|
||
|
|
||
|
self.int_read_resp_command_queue.append((addr, length, cycles, prot))
|
||
|
|
||
|
# first cycle
|
||
|
self.int_read_addr_queue.append((word_addr+start_offset, prot))
|
||
|
|
||
|
for k in range(1, cycles):
|
||
|
# middle and last cycles
|
||
|
self.int_read_addr_queue.append((word_addr + k*bw, prot))
|
||
|
|
||
|
@instance
|
||
|
def read_resp_logic():
|
||
|
while True:
|
||
|
while not self.int_read_resp_command_queue:
|
||
|
yield clk.posedge
|
||
|
|
||
|
addr, length, cycles, prot = self.int_read_resp_command_queue.pop(0)
|
||
|
|
||
|
word_addr = int(addr/bw)*bw
|
||
|
|
||
|
start_offset = addr % bw
|
||
|
end_offset = ((addr + length - 1) % bw) + 1
|
||
|
|
||
|
data = b''
|
||
|
|
||
|
resp = 0
|
||
|
|
||
|
for k in range(cycles):
|
||
|
while not self.int_read_resp_queue:
|
||
|
yield clk.posedge
|
||
|
|
||
|
cycle_data, cycle_resp = self.int_read_resp_queue.pop(0)
|
||
|
|
||
|
if cycle_resp != 0:
|
||
|
resp = cycle_resp
|
||
|
|
||
|
start = 0
|
||
|
stop = bw
|
||
|
|
||
|
if k == 0:
|
||
|
start = start_offset
|
||
|
if k == cycles-1:
|
||
|
stop = end_offset
|
||
|
|
||
|
for j in range(start, stop):
|
||
|
data += bytearray([(cycle_data >> j*8) & 0xff])
|
||
|
|
||
|
if name is not None:
|
||
|
print("[%s] Read data addr: 0x%08x prot: 0x%x data: %s" % (name, addr, prot, " ".join(("{:02x}".format(c) for c in bytearray(data)))))
|
||
|
|
||
|
self.read_data_queue.append((addr, data, prot, resp))
|
||
|
self.in_flight_operations -= 1
|
||
|
|
||
|
@instance
|
||
|
def read_addr_interface_logic():
|
||
|
while True:
|
||
|
while not self.int_read_addr_queue:
|
||
|
yield clk.posedge
|
||
|
|
||
|
m_axil_araddr.next, m_axil_arprot.next = self.int_read_addr_queue.pop(0)
|
||
|
m_axil_arvalid.next = True
|
||
|
|
||
|
yield clk.posedge
|
||
|
|
||
|
while m_axil_arvalid and not m_axil_arready:
|
||
|
yield clk.posedge
|
||
|
|
||
|
m_axil_arvalid.next = False
|
||
|
|
||
|
@instance
|
||
|
def read_resp_interface_logic():
|
||
|
while True:
|
||
|
m_axil_rready.next = True
|
||
|
|
||
|
yield clk.posedge
|
||
|
|
||
|
if m_axil_rready & m_axil_rvalid:
|
||
|
self.int_read_resp_queue.append((int(m_axil_rdata), int(m_axil_rresp)))
|
||
|
|
||
|
return instances()
|
||
|
|
||
|
|
||
|
class AXILiteRam(object):
|
||
|
def __init__(self, size = 1024):
|
||
|
self.size = size
|
||
|
self.mem = mmap.mmap(-1, size)
|
||
|
|
||
|
def read_mem(self, address, length):
|
||
|
self.mem.seek(address)
|
||
|
return self.mem.read(length)
|
||
|
|
||
|
def write_mem(self, address, data):
|
||
|
self.mem.seek(address)
|
||
|
self.mem.write(bytes(data))
|
||
|
|
||
|
def create_port(self,
|
||
|
clk,
|
||
|
s_axil_awaddr=None,
|
||
|
s_axil_awprot=Signal(intbv(0)[3:]),
|
||
|
s_axil_awvalid=Signal(bool(False)),
|
||
|
s_axil_awready=Signal(bool(True)),
|
||
|
s_axil_wdata=None,
|
||
|
s_axil_wstrb=Signal(intbv(1)[1:]),
|
||
|
s_axil_wvalid=Signal(bool(False)),
|
||
|
s_axil_wready=Signal(bool(True)),
|
||
|
s_axil_bresp=Signal(intbv(0)[2:]),
|
||
|
s_axil_bvalid=Signal(bool(False)),
|
||
|
s_axil_bready=Signal(bool(False)),
|
||
|
s_axil_araddr=None,
|
||
|
s_axil_arprot=Signal(intbv(0)[3:]),
|
||
|
s_axil_arvalid=Signal(bool(False)),
|
||
|
s_axil_arready=Signal(bool(True)),
|
||
|
s_axil_rdata=None,
|
||
|
s_axil_rresp=Signal(intbv(0)[2:]),
|
||
|
s_axil_rvalid=Signal(bool(False)),
|
||
|
s_axil_rready=Signal(bool(False)),
|
||
|
latency=1,
|
||
|
name=None
|
||
|
):
|
||
|
|
||
|
if s_axil_wdata is not None:
|
||
|
assert s_axil_awaddr is not None
|
||
|
assert len(s_axil_wdata) % 8 == 0
|
||
|
assert len(s_axil_wdata) / 8 == len(s_axil_wstrb)
|
||
|
w = len(s_axil_wdata)
|
||
|
|
||
|
if s_axil_rdata is not None:
|
||
|
assert s_axil_araddr is not None
|
||
|
assert len(s_axil_rdata) % 8 == 0
|
||
|
w = len(s_axil_rdata)
|
||
|
|
||
|
if s_axil_wdata is not None:
|
||
|
assert len(s_axil_wdata) == len(s_axil_rdata)
|
||
|
assert len(s_axil_awaddr) == len(s_axil_araddr)
|
||
|
|
||
|
bw = int(w/8)
|
||
|
|
||
|
assert bw in (1, 2, 4, 8, 16, 32, 64, 128)
|
||
|
|
||
|
@instance
|
||
|
def write_logic():
|
||
|
while True:
|
||
|
s_axil_awready.next = True
|
||
|
|
||
|
yield clk.posedge
|
||
|
|
||
|
addr = int(int(s_axil_awaddr)/bw)*bw
|
||
|
prot = int(s_axil_awprot)
|
||
|
|
||
|
if s_axil_awready & s_axil_awvalid:
|
||
|
s_axil_awready.next = False
|
||
|
|
||
|
for i in range(latency):
|
||
|
yield clk.posedge
|
||
|
|
||
|
self.mem.seek(addr % self.size)
|
||
|
|
||
|
s_axil_wready.next = True
|
||
|
|
||
|
yield clk.posedge
|
||
|
|
||
|
while not s_axil_wvalid:
|
||
|
yield clk.posedge
|
||
|
|
||
|
s_axil_wready.next = False
|
||
|
|
||
|
data = bytearray()
|
||
|
val = int(s_axil_wdata)
|
||
|
for i in range(bw):
|
||
|
data.extend(bytearray([val & 0xff]))
|
||
|
val >>= 8
|
||
|
for i in range(bw):
|
||
|
if s_axil_wstrb & (1 << i):
|
||
|
self.mem.write(bytes(data[i:i+1]))
|
||
|
else:
|
||
|
self.mem.seek(1, 1)
|
||
|
s_axil_bresp.next = 0b00
|
||
|
s_axil_bvalid.next = True
|
||
|
if name is not None:
|
||
|
print("[%s] Write word addr: 0x%08x prot: 0x%x wstrb: 0x%02x data: %s" % (name, addr, prot, s_axil_wstrb, " ".join(("{:02x}".format(c) for c in bytearray(data)))))
|
||
|
|
||
|
yield clk.posedge
|
||
|
|
||
|
while not s_axil_bready:
|
||
|
yield clk.posedge
|
||
|
|
||
|
s_axil_bvalid.next = False
|
||
|
|
||
|
@instance
|
||
|
def read_logic():
|
||
|
while True:
|
||
|
s_axil_arready.next = True
|
||
|
|
||
|
yield clk.posedge
|
||
|
|
||
|
addr = int(int(s_axil_araddr)/bw)*bw
|
||
|
prot = int(s_axil_arprot)
|
||
|
|
||
|
if s_axil_arready & s_axil_arvalid:
|
||
|
s_axil_arready.next = False
|
||
|
|
||
|
for i in range(latency):
|
||
|
yield clk.posedge
|
||
|
|
||
|
self.mem.seek(addr % self.size)
|
||
|
|
||
|
data = bytearray(self.mem.read(bw))
|
||
|
val = 0
|
||
|
for i in range(bw-1,-1,-1):
|
||
|
val <<= 8
|
||
|
val += data[i]
|
||
|
s_axil_rdata.next = val
|
||
|
s_axil_rresp.next = 0b00
|
||
|
s_axil_rvalid.next = True
|
||
|
if name is not None:
|
||
|
print("[%s] Read word addr: 0x%08x prot: 0x%x data: %s" % (name, addr, prot, " ".join(("{:02x}".format(c) for c in bytearray(data)))))
|
||
|
|
||
|
yield clk.posedge
|
||
|
|
||
|
while not s_axil_rready:
|
||
|
yield clk.posedge
|
||
|
|
||
|
s_axil_rvalid.next = False
|
||
|
|
||
|
return instances()
|
||
|
|