1
0
mirror of https://github.com/myhdl/myhdl.git synced 2025-01-24 21:52:56 +08:00

use subprocess instead of forking for cosim

This commit is contained in:
Keerthan Jaic 2015-04-08 16:49:58 -04:00
parent d65bdbfd72
commit 5fa590a632
3 changed files with 63 additions and 60 deletions

View File

@ -21,12 +21,13 @@
from __future__ import absolute_import
import sys
import os
import shlex
import subprocess
from myhdl._intbv import intbv
from myhdl import _simulator, CosimulationError
from myhdl._compat import PY2, to_bytes, to_str
from myhdl._compat import PY2, string_types, to_bytes, to_str
_MAXLINE = 4096
@ -67,67 +68,69 @@ class Cosimulation(object):
self._toSizes = toSizes = []
self._toSigs = toSigs = []
self._toSigDict = toSigDict = {}
self._hasChange = 0
self._getMode = 1
child_pid = self._child_pid = os.fork()
if child_pid == 0:
def close_rt_wf():
os.close(rt)
os.close(wf)
os.environ['MYHDL_TO_PIPE'] = str(wt)
os.environ['MYHDL_FROM_PIPE'] = str(rf)
if isinstance(exe, list): arglist = exe
else: arglist = exe.split()
p = arglist[0]
arglist[0] = os.path.basename(p)
try:
os.execvp(p, arglist)
except OSError as e:
raise CosimulationError(_error.OSError, str(e))
else:
os.close(wt)
os.close(rf)
while 1:
s = to_str(os.read(rt, _MAXLINE))
if not s:
raise CosimulationError(_error.SimulationEnd)
e = s.split()
if e[0] == "FROM":
if int(e[1]) != 0:
raise CosimulationError(_error.TimeZero, "$from_myhdl")
for i in range(2, len(e)-1, 2):
n = e[i]
if n in fromSignames:
raise CosimulationError(_error.DuplicateSigNames, n)
if not n in kwargs:
raise CosimulationError(_error.SigNotFound, n)
fromSignames.append(n)
fromSigs.append(kwargs[n])
fromSizes.append(int(e[i+1]))
os.write(wf, b"OK")
elif e[0] == "TO":
if int(e[1]) != 0:
raise CosimulationError(_error.TimeZero, "$to_myhdl")
for i in range(2, len(e)-1, 2):
n = e[i]
if n in toSignames:
raise CosimulationError(_error.DuplicateSigNames, n)
if not n in kwargs:
raise CosimulationError(_error.SigNotFound, n)
toSignames.append(n)
toSigs.append(kwargs[n])
toSigDict[n] = kwargs[n]
toSizes.append(int(e[i+1]))
os.write(wf, b"OK")
elif e[0] == "START":
if not toSignames:
raise CosimulationError(_error.NoCommunication)
os.write(wf, b"OK")
break
else:
raise CosimulationError("Unexpected cosim input")
env = os.environ.copy()
env['MYHDL_TO_PIPE'] = str(wt)
env['MYHDL_FROM_PIPE'] = str(rf)
if isinstance(exe, string_types):
exe = shlex.split(exe)
try:
sp = subprocess.Popen(exe, env=env, close_fds=False,
preexec_fn=close_rt_wf)
except OSError as e:
raise CosimulationError(_error.OSError, str(e))
self._child = sp
os.close(wt)
os.close(rf)
while 1:
s = to_str(os.read(rt, _MAXLINE))
if not s:
raise CosimulationError(_error.SimulationEnd)
e = s.split()
if e[0] == "FROM":
if int(e[1]) != 0:
raise CosimulationError(_error.TimeZero, "$from_myhdl")
for i in range(2, len(e)-1, 2):
n = e[i]
if n in fromSignames:
raise CosimulationError(_error.DuplicateSigNames, n)
if not n in kwargs:
raise CosimulationError(_error.SigNotFound, n)
fromSignames.append(n)
fromSigs.append(kwargs[n])
fromSizes.append(int(e[i+1]))
os.write(wf, b"OK")
elif e[0] == "TO":
if int(e[1]) != 0:
raise CosimulationError(_error.TimeZero, "$to_myhdl")
for i in range(2, len(e)-1, 2):
n = e[i]
if n in toSignames:
raise CosimulationError(_error.DuplicateSigNames, n)
if not n in kwargs:
raise CosimulationError(_error.SigNotFound, n)
toSignames.append(n)
toSigs.append(kwargs[n])
toSigDict[n] = kwargs[n]
toSizes.append(int(e[i+1]))
os.write(wf, b"OK")
elif e[0] == "START":
if not toSignames:
raise CosimulationError(_error.NoCommunication)
os.write(wf, b"OK")
break
else:
raise CosimulationError("Unexpected cosim input")
def _get(self):
if not self._getMode:

View File

@ -78,7 +78,7 @@ class Simulation(object):
_simulator._cosim = 0
os.close(cosim._rt)
os.close(cosim._wf)
os.waitpid(cosim._child_pid, 0)
cosim._child.wait()
if _simulator._tracing:
_simulator._tracing = 0
_simulator._tf.close()

View File

@ -60,7 +60,7 @@ class CosimulationTest(TestCase):
try:
Cosimulation("bla -x 45")
except CosimulationError as e:
self.assertTrue(e.kind in(_error.OSError, _error.SimulationEnd))
self.assertEqual(e.kind, _error.OSError)
else:
self.fail()