mirror of
https://github.com/myhdl/myhdl.git
synced 2024-12-14 07:44:38 +08:00
refactored
This commit is contained in:
parent
23b5c1183d
commit
08acc140b5
@ -32,7 +32,31 @@ from Signal import Signal
|
||||
import myhdl
|
||||
|
||||
_flag = 0
|
||||
MAXLINE = 4096
|
||||
_MAXLINE = 4096
|
||||
|
||||
class Error(Exception):
|
||||
"""Cosimulation Error"""
|
||||
def __init__(self, arg=""):
|
||||
self.arg = arg
|
||||
def __str__(self):
|
||||
msg = self.__doc__
|
||||
if self.arg:
|
||||
msg = msg + ": " + str(self.arg)
|
||||
return msg
|
||||
|
||||
class MultipleCosimError(Error):
|
||||
"""Only a single cosimulator allowed"""
|
||||
class DuplicateSigNamesError(Error):
|
||||
"""Duplicate signal name in myhdl vpi call"""
|
||||
class SigNotFoundError(Error):
|
||||
"""Signal not found in Cosimulation arguments"""
|
||||
class TimeZeroError(Error):
|
||||
"""myhdl vpi call when not at time 0"""
|
||||
class NoCommunicationError(Error):
|
||||
"""No communicating signals"""
|
||||
class SimulationEndError(Error):
|
||||
"""Premature simulation end"""
|
||||
|
||||
|
||||
class Cosimulation(object):
|
||||
|
||||
@ -44,16 +68,18 @@ class Cosimulation(object):
|
||||
|
||||
global _flag
|
||||
if _flag:
|
||||
raise myhdl.Error, "Cosimulation: Only a single cosimulator allowed"
|
||||
raise MultipleCosimError
|
||||
_flag = 1
|
||||
|
||||
# Error = myhdl.Error
|
||||
|
||||
self.rt, self.wt = rt, wt = os.pipe()
|
||||
self.rf, self.wf = rf, wf = os.pipe()
|
||||
|
||||
self._fromSigs = []
|
||||
self._fromSizes = []
|
||||
self._toSigs = []
|
||||
self._toSizes = []
|
||||
self._fromSignames = fromSignames = []
|
||||
self._fromSizes = fromSizes = []
|
||||
self._toSignames = toSignames = []
|
||||
self._toSizes = toSizes = []
|
||||
|
||||
child_pid = os.fork()
|
||||
|
||||
@ -66,33 +92,42 @@ class Cosimulation(object):
|
||||
try:
|
||||
os.execvp(arglist[0], arglist)
|
||||
except OSError, e:
|
||||
raise myhdl.Error, "Cosimulation: " + str(e)
|
||||
raise Error, str(e)
|
||||
else:
|
||||
os.close(wt)
|
||||
os.close(rf)
|
||||
while 1:
|
||||
s = os.read(rt, MAXLINE)
|
||||
s = os.read(rt, _MAXLINE)
|
||||
if not s:
|
||||
raise myhdl.Error, "Cosimulation down"
|
||||
raise SimulationEndError
|
||||
e = s.split()
|
||||
if e[0] == "FROM":
|
||||
self._fromSigs = [e[i] for i in range(1, len(e), 2)]
|
||||
self._fromSizes = [int(e[i]) for i in range(2, len(e), 2)]
|
||||
for i in range(1, len(e)-1, 2):
|
||||
if e[i] in fromSignames:
|
||||
raise DuplicateSigNamesError, e[i]
|
||||
if not e[i] in kwargs:
|
||||
raise SigNotFoundError, e[i]
|
||||
fromSignames.append(e[i])
|
||||
fromSizes.append(int(e[i+1]))
|
||||
os.write(wf, "OK")
|
||||
elif e[0] == "TO":
|
||||
self._toSigs = [e[i] for i in range(1, len(e), 2)]
|
||||
self._toSizes = [int(e[i]) for i in range(2, len(e), 2)]
|
||||
for i in range(1, len(e)-1, 2):
|
||||
if e[i] in toSignames:
|
||||
raise DuplicateSigNamesError, e[i]
|
||||
if not e[i] in kwargs:
|
||||
raise SigNotFoundError, e[i]
|
||||
toSignames.append(e[i])
|
||||
toSizes.append(int(e[i+1]))
|
||||
os.write(wf, "OK")
|
||||
else:
|
||||
self.buf = e
|
||||
break
|
||||
if long(e[0]) != 0:
|
||||
raise myhdl.Error, "Cosimulation: myhdl call when not at time 0"
|
||||
if not self._fromSigs and not self._toSigs:
|
||||
raise myhdl.Error, "Cosimulation: no communicating signals"
|
||||
raise TimeZeroError
|
||||
if not fromSignames and not toSignames:
|
||||
raise NoCommunicationError
|
||||
|
||||
|
||||
|
||||
def __del__(self):
|
||||
""" Clear flag when this object destroyed - to suite unittest. """
|
||||
global _flag
|
||||
|
@ -35,46 +35,65 @@ random.seed(1) # random, but deterministic
|
||||
|
||||
MAXLINE = 4096
|
||||
|
||||
from myhdl import Cosimulation, Error
|
||||
from myhdl import Signal
|
||||
|
||||
from Cosimulation import Cosimulation, \
|
||||
Error, \
|
||||
MultipleCosimError, \
|
||||
DuplicateSigNamesError, \
|
||||
SigNotFoundError, \
|
||||
TimeZeroError, \
|
||||
NoCommunicationError, \
|
||||
SimulationEndError
|
||||
|
||||
exe = "python test_Cosimulation.py CosimulationTest"
|
||||
|
||||
fromSignames = ['a', 'bb', 'ccc']
|
||||
fromSizes = [1, 11, 63]
|
||||
fromSigs = {}
|
||||
for s in fromSignames:
|
||||
fromSigs[s] = Signal(0)
|
||||
toSignames = ['d', 'ee', 'fff', 'g']
|
||||
toSizes = [32, 12, 3, 6]
|
||||
toSigs = {}
|
||||
for s in toSignames:
|
||||
toSigs[s] = Signal(0)
|
||||
allSigs = fromSigs.copy()
|
||||
allSigs.update(toSigs)
|
||||
|
||||
class CosimulationTest(TestCase):
|
||||
|
||||
exe = "python test_Cosimulation.py CosimulationTest"
|
||||
|
||||
fromSigs = ['a', 'bb', 'ccc']
|
||||
fromSizes = [1, 11, 63]
|
||||
toSigs = ['d', 'ee', 'fff', 'g']
|
||||
toSizes = [32, 12, 3, 6]
|
||||
|
||||
|
||||
def testWrongExe(self):
|
||||
self.assertRaises(Error, Cosimulation, "bla -x 45")
|
||||
self.assertRaises(Error, \
|
||||
Cosimulation, "bla -x 45")
|
||||
|
||||
def testNotUnique(self):
|
||||
cosim1 = Cosimulation(self.exe + ".cosimNotUnique")
|
||||
self.assertRaises(Error, Cosimulation, self.exe + ".cosimNotUnique")
|
||||
cosim1 = Cosimulation(exe + ".cosimNotUnique", **allSigs)
|
||||
self.assertRaises(MultipleCosimError,
|
||||
Cosimulation, exe + ".cosimNotUnique", **allSigs)
|
||||
|
||||
def cosimNotUnique(self):
|
||||
wt = int(os.environ['MYHDL_TO_PIPE'])
|
||||
rf = int(os.environ['MYHDL_FROM_PIPE'])
|
||||
os.write(wt, "TO a")
|
||||
os.write(wt, "TO a 1")
|
||||
os.read(rf, MAXLINE)
|
||||
os.write(wt, "FROM d")
|
||||
os.write(wt, "FROM d 1")
|
||||
os.read(rf, MAXLINE)
|
||||
os.write(wt, "0000")
|
||||
os.read(rf, MAXLINE)
|
||||
|
||||
def testFromSignals(self):
|
||||
cosim = Cosimulation(self.exe + ".cosimFromSignals")
|
||||
self.assertEqual(cosim._fromSigs, self.fromSigs)
|
||||
self.assertEqual(cosim._fromSizes, self.fromSizes)
|
||||
self.assertEqual(cosim._toSigs, [])
|
||||
cosim = Cosimulation(exe + ".cosimFromSignals", **fromSigs)
|
||||
self.assertEqual(cosim._fromSignames, fromSignames)
|
||||
self.assertEqual(cosim._fromSizes, fromSizes)
|
||||
self.assertEqual(cosim._toSignames, [])
|
||||
self.assertEqual(cosim._toSizes, [])
|
||||
|
||||
def cosimFromSignals(self):
|
||||
wt = int(os.environ['MYHDL_TO_PIPE'])
|
||||
rf = int(os.environ['MYHDL_FROM_PIPE'])
|
||||
buf = "FROM "
|
||||
for s, w in zip(self.fromSigs, self.fromSizes):
|
||||
for s, w in zip(fromSignames, fromSizes):
|
||||
buf += "%s %s " % (s, w)
|
||||
os.write(wt, buf)
|
||||
os.read(rf, MAXLINE)
|
||||
@ -82,17 +101,17 @@ class CosimulationTest(TestCase):
|
||||
os.read(rf, MAXLINE)
|
||||
|
||||
def testToSignals(self):
|
||||
cosim = Cosimulation(self.exe + ".cosimToSignals")
|
||||
self.assertEqual(cosim._fromSigs, [])
|
||||
cosim = Cosimulation(exe + ".cosimToSignals", **toSigs)
|
||||
self.assertEqual(cosim._fromSignames, [])
|
||||
self.assertEqual(cosim._fromSizes, [])
|
||||
self.assertEqual(cosim._toSigs, self.toSigs)
|
||||
self.assertEqual(cosim._toSizes, self.toSizes)
|
||||
self.assertEqual(cosim._toSignames, toSignames)
|
||||
self.assertEqual(cosim._toSizes, toSizes)
|
||||
|
||||
def cosimToSignals(self):
|
||||
wt = int(os.environ['MYHDL_TO_PIPE'])
|
||||
rf = int(os.environ['MYHDL_FROM_PIPE'])
|
||||
buf = "TO "
|
||||
for s, w in zip(self.toSigs, self.toSizes):
|
||||
for s, w in zip(toSignames, toSizes):
|
||||
buf += "%s %s " % (s, w)
|
||||
os.write(wt, buf)
|
||||
os.read(rf, MAXLINE)
|
||||
@ -100,22 +119,22 @@ class CosimulationTest(TestCase):
|
||||
os.read(rf, MAXLINE)
|
||||
|
||||
def testFromToSignals(self):
|
||||
cosim = Cosimulation(self.exe + ".cosimFromToSignals")
|
||||
self.assertEqual(cosim._fromSigs, self.fromSigs)
|
||||
self.assertEqual(cosim._fromSizes, self.fromSizes)
|
||||
self.assertEqual(cosim._toSigs, self.toSigs)
|
||||
self.assertEqual(cosim._toSizes, self.toSizes)
|
||||
cosim = Cosimulation(exe + ".cosimFromToSignals", **allSigs)
|
||||
self.assertEqual(cosim._fromSignames, fromSignames)
|
||||
self.assertEqual(cosim._fromSizes, fromSizes)
|
||||
self.assertEqual(cosim._toSignames, toSignames)
|
||||
self.assertEqual(cosim._toSizes, toSizes)
|
||||
|
||||
def cosimFromToSignals(self):
|
||||
wt = int(os.environ['MYHDL_TO_PIPE'])
|
||||
rf = int(os.environ['MYHDL_FROM_PIPE'])
|
||||
buf = "FROM "
|
||||
for s, w in zip(self.fromSigs, self.fromSizes):
|
||||
for s, w in zip(fromSignames, fromSizes):
|
||||
buf += "%s %s " % (s, w)
|
||||
os.write(wt, buf)
|
||||
os.read(rf, MAXLINE)
|
||||
buf = "TO "
|
||||
for s, w in zip(self.toSigs, self.toSizes):
|
||||
for s, w in zip(toSignames, toSizes):
|
||||
buf += "%s %s " % (s, w)
|
||||
os.write(wt, buf)
|
||||
os.read(rf, MAXLINE)
|
||||
@ -123,13 +142,14 @@ class CosimulationTest(TestCase):
|
||||
os.read(rf, MAXLINE)
|
||||
|
||||
def testTimeZero(self):
|
||||
self.assertRaises(Error, Cosimulation, self.exe + ".cosimTimeZero")
|
||||
self.assertRaises(TimeZeroError, \
|
||||
Cosimulation, exe + ".cosimTimeZero", **allSigs)
|
||||
|
||||
def cosimTimeZero(self):
|
||||
wt = int(os.environ['MYHDL_TO_PIPE'])
|
||||
rf = int(os.environ['MYHDL_FROM_PIPE'])
|
||||
buf = "FROM "
|
||||
for s, w in zip(self.fromSigs, self.fromSizes):
|
||||
for s, w in zip(fromSignames, fromSizes):
|
||||
buf += "%s %s " % (s, w)
|
||||
os.write(wt, buf)
|
||||
os.read(rf, MAXLINE)
|
||||
@ -137,19 +157,52 @@ class CosimulationTest(TestCase):
|
||||
os.read(rf, MAXLINE)
|
||||
|
||||
def testNoComm(self):
|
||||
self.assertRaises(Error, Cosimulation, self.exe + ".cosimNoComm")
|
||||
self.assertRaises(NoCommunicationError, \
|
||||
Cosimulation, exe + ".cosimNoComm", **allSigs)
|
||||
|
||||
def cosimNoComm(self):
|
||||
wt = int(os.environ['MYHDL_TO_PIPE'])
|
||||
rf = int(os.environ['MYHDL_FROM_PIPE'])
|
||||
os.write(wt, "0000")
|
||||
os.read(rf, MAXLINE)
|
||||
|
||||
|
||||
def testFromSignalsDupl(self):
|
||||
self.assertRaises(DuplicateSigNamesError, \
|
||||
Cosimulation, exe + ".cosimFromSignalsDupl", **allSigs)
|
||||
|
||||
|
||||
def cosimFromSignalsDupl(self):
|
||||
wt = int(os.environ['MYHDL_TO_PIPE'])
|
||||
rf = int(os.environ['MYHDL_FROM_PIPE'])
|
||||
buf = "FROM "
|
||||
for s, w in zip(fromSignames, fromSizes):
|
||||
buf += "%s %s " % (s, w)
|
||||
buf += "bb 5"
|
||||
os.write(wt, buf)
|
||||
os.read(rf, MAXLINE)
|
||||
try:
|
||||
os.write(wt, "0000")
|
||||
except:
|
||||
pass
|
||||
|
||||
|
||||
def testToSignalsDupl(self):
|
||||
self.assertRaises(DuplicateSigNamesError, \
|
||||
Cosimulation, exe + ".cosimToSignalsDupl", **allSigs)
|
||||
|
||||
def cosimToSignalsDupl(self):
|
||||
wt = int(os.environ['MYHDL_TO_PIPE'])
|
||||
rf = int(os.environ['MYHDL_FROM_PIPE'])
|
||||
buf = "TO "
|
||||
for s, w in zip(toSignames, toSizes):
|
||||
buf += "%s %s " % (s, w)
|
||||
buf += "fff 6"
|
||||
os.write(wt, buf)
|
||||
os.read(rf, MAXLINE)
|
||||
try:
|
||||
os.write(wt, "0000")
|
||||
except:
|
||||
pass
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user