Merge pull request #436 from ghecko/spi-tpm-PD

adding tpm_spi stacked protocol decoder
This commit is contained in:
dreamsourcelabTAI 2022-06-16 09:25:30 +08:00 committed by GitHub
commit 9545d18b16
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 716 additions and 0 deletions

View File

@ -0,0 +1,299 @@
# Source: https://raw.githubusercontent.com/WKPlus/rangedict/master/rangedict.py
__all__ = ['RangeDict']
class Color(object):
BLACK = 0
RED = 1
class Node(object):
__slots__ = ('r', 'left', 'right', 'value', 'color', 'parent')
def __init__(self, r, value, parent=None, color=Color.RED):
self.r = r
self.value = value
self.parent = parent
self.color = color
self.left = None
self.right = None
def value_copy(self, other):
self.r = other.r
self.value = other.value
class RangeDict(dict):
def __init__(self):
self._root = None
def __setitem__(self, r, v):
if r[1] < r[0]:
raise KeyError
node = self._insert(r, v)
self._insert_adjust(node)
def _insert(self, r, v):
if not self._root:
self._root = Node(r, v)
return self._root
cur = self._root
while True:
if r[1] < cur.r[0]:
if not cur.left:
cur.left = Node(r, v, cur)
return cur.left
cur = cur.left
elif r[0] > cur.r[1]:
if not cur.right:
cur.right = Node(r, v, cur)
return cur.right
cur = cur.right
else:
raise KeyError # overlap not supported
def _insert_adjust(self, node):
''' adjust to make the tree still a red black tree '''
if not node.parent:
node.color = Color.BLACK
return
if node.parent.color == Color.BLACK:
return
uncle = self.sibling(node.parent)
if node_color(uncle) == Color.RED:
node.parent.color = Color.BLACK
uncle.color = Color.BLACK
node.parent.parent.color = Color.RED
return self._insert_adjust(node.parent.parent)
#parent is red and uncle is black
# since parent is red, grandparent must exists and be black
parent = node.parent
grandparent = parent.parent
if self.is_left_son(parent, grandparent):
if self.is_left_son(node, parent):
self.right_rotate(grandparent)
grandparent.color = Color.RED
parent.color = Color.BLACK
else:
self.left_rotate(parent)
self.right_rotate(grandparent)
grandparent.color = Color.RED
node.color = Color.BLACK
else:
if self.is_left_son(node, parent):
self.right_rotate(parent)
self.left_rotate(grandparent)
grandparent.color = Color.RED
node.color = Color.BLACK
else:
self.left_rotate(grandparent)
grandparent.color = Color.RED
parent.color = Color.BLACK
def _find_key(self, key):
cur = self._root
while cur:
if key > cur.r[1]:
cur = cur.right
elif key < cur.r[0]:
cur = cur.left
else:
break
return cur
def _find_range(self, r):
cur = self._root
while cur:
if r[1] < cur.r[0]:
cur = cur.left
elif r[0] > cur.r[1]:
cur = cur.right
elif r[0] == cur.r[0] and r[1] == cur.r[1]:
return cur
else:
raise KeyError
raise KeyError
def __getitem__(self, key):
tar = self._find_key(key)
if tar:
return tar.value
raise KeyError
def __contains__(self, key):
return bool(self._find_key(key))
def __delitem__(self, r):
node = self._find_range(r)
if node.left and node.right:
left_rightest_child = self.find_rightest(node.left)
node.value_copy(left_rightest_child)
node = left_rightest_child
self._delete(node)
def _delete(self, node):
# node has at most one child
child = node.left if node.left else node.right
if not node.parent: # node is root
self._root = child
if self._root:
self._root.parent = None
self._root.color = Color.BLACK
return
parent = node.parent
if not child:
child = Node(None, None, parent, Color.BLACK)
if self.is_left_son(node, parent):
parent.left = child
else:
parent.right = child
child.parent = parent
if node.color == Color.RED:
# no need to adjust when deleting a red node
return
if node_color(child) == Color.RED:
child.color = Color.BLACK
return
self._delete_adjust(child)
if not child.r:
# mock a None node for adjust, need to delete it after that
parent = child.parent
if self.is_left_son(child, parent):
parent.left = None
else:
parent.right = None
def _delete_adjust(self, node):
if not node.parent:
node.color = Color.BLACK
return
parent = node.parent
sibling = self.sibling(node)
if node_color(sibling) == Color.RED:
if self.is_left_son(node, parent):
self.left_rotate(parent)
else:
self.right_rotate(parent)
parent.color = Color.RED
sibling.color = Color.BLACK
sibling = self.sibling(node) # must be black
# sibling must be black now
if not self.is_black(parent) and self.is_black(sibling.left) and \
self.is_black(sibling.right):
parent.color = Color.BLACK
sibling.color = Color.RED
return
if self.is_black(parent) and self.is_black(sibling.left) and \
self.is_black(sibling.right):
sibling.color = Color.RED
return self._delete_adjust(parent)
if self.is_left_son(node, parent):
if not self.is_black(sibling.left) and \
self.is_black(sibling.right):
sibling.left.color = Color.BLACK
sibling.color = Color.RED
self.right_rotate(sibling)
sibling = sibling.parent
# sibling.right must be red
sibling.color = parent.color
parent.color = Color.BLACK
sibling.right.color = Color.BLACK
self.left_rotate(parent)
else:
if not self.is_black(sibling.right) and \
self.is_black(sibling.left):
sibling.right.color = Color.BLACK
sibling.color = Color.RED
self.left_rotate(parent)
sibling = sibling.parent
# sibling.left must be red
sibling.color = parent.color
parent.color = Color.BLACK
sibling.left.color = Color.RED
self.right_rotate(parent)
def left_rotate(self, node):
right_son = node.right
if not node.parent:
self._root = right_son
elif self.is_left_son(node, node.parent):
node.parent.left = right_son
else:
node.parent.right = right_son
right_son.parent = node.parent
node.parent = right_son
node.right = right_son.left
right_son.left = node
def right_rotate(self, node):
left_son = node.left
if not node.parent:
self._root = left_son
elif self.is_left_son(node, node.parent):
node.parent.left = left_son
else:
node.parent.right = left_son
left_son.parent = node.parent
node.parent = left_son
node.left = left_son.right
left_son.right = node
@staticmethod
def sibling(node):
if node.parent.left == node:
return node.parent.right
else:
return node.parent.left
@staticmethod
def is_left_son(child, parent):
if parent.left == child:
return True
else:
return False
@staticmethod
def find_rightest(node):
while node.right:
node = node.right
return node
@staticmethod
def is_black(node):
return node_color(node) == Color.BLACK
def node_color(node):
if not node:
return Color.BLACK
return node.color
def in_order(root):
ret = []
if not root:
return []
return in_order(root.left) + [root.value] + in_order(root.right)
def height(root):
if not root:
return 0
return 1 + max(height(root.left), height(root.right))
if __name__ == '__main__':
pass

View File

@ -0,0 +1,25 @@
##
## This file is part of the libsigrokdecode project.
##
## Copyright (C) 2021 ghecko - Jordan Ovrè <ghecko78@gmail.com>
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, see <http://www.gnu.org/licenses/>.
##
'''
This decoder stacks on top of the 'spi' PD and decodes TPM transactions.
It automatically extract BitLocker Volume Master Key (VMK)
'''
from .pd import Decoder

View File

@ -0,0 +1,118 @@
##
## This file is part of the libsigrokdecode project.
##
## Copyright (C) 2021 ghecko - Jordan Ovrè <ghecko78@gmail.com>
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, see <http://www.gnu.org/licenses/>.
##
from .RangeDict import RangeDict
# RangeDict which maps range of FIFO Register space to their names.
# Register space Addresses is defined on the paragraph 6.3.2 of the following document: https://trustedcomputinggroup.org/wp-content/uploads/PC-Client-Specific-Platform-TPM-Profile-for-TPM-2p0-v1p05p_r14_pub.pdf
# Credit: https://github.com/FSecureLABS/bitlocker-spi-toolkit
fifo_registers = RangeDict()
fifo_registers[(0x0000, 0x0000)] = "TPM_ACCESS_0"
fifo_registers[(0x0001, 0x0007)] = "Reserved"
fifo_registers[(0x0008, 0x000b)] = "TPM_INT_ENABLE_0"
fifo_registers[(0x000c, 0x000c)] = "TPM_INT_VECTOR_0"
fifo_registers[(0x000d, 0x000f)] = "Reserved"
fifo_registers[(0x0010, 0x0013)] = "TPM_INT_STATUS_0"
fifo_registers[(0x0014, 0x0017)] = "TPM_INTF_CAPABILITY_0"
fifo_registers[(0x0018, 0x001b)] = "TPM_STS_0"
fifo_registers[(0x001c, 0x0023)] = "Reserved"
fifo_registers[(0x0024, 0x0027)] = "TPM_DATA_FIFO_0"
fifo_registers[(0x0028, 0x002f)] = "Reserved"
fifo_registers[(0x0030, 0x0033)] = "TPM_INTERFACE_ID_0"
fifo_registers[(0x0034, 0x007f)] = "Reserved"
fifo_registers[(0x0080, 0x0083)] = "TPM_XDATA_FIFO_0"
fifo_registers[(0x0084, 0x0881)] = "Reserved"
fifo_registers[(0x0f00, 0x0f03)] = "TPM_DID_VID_0"
fifo_registers[(0x0f04, 0x0f04)] = "TPM_RID_0"
fifo_registers[(0x0f90, 0x0fff)] = "Reserved"
fifo_registers[(0x1000, 0x1000)] = "TPM_ACCESS_1"
fifo_registers[(0x1001, 0x1007)] = "Reserved"
fifo_registers[(0x1008, 0x100b)] = "TPM_INT_ENABLE_1"
fifo_registers[(0x100c, 0x100c)] = "TPM_INT_VECTOR_1"
fifo_registers[(0x100d, 0x100f)] = "Reserved"
fifo_registers[(0x1010, 0x1013)] = "TPM_INT_STATUS_1"
fifo_registers[(0x1014, 0x1017)] = "TPM_INTF_CAPABILITY_1"
fifo_registers[(0x1018, 0x101b)] = "TPM_STS_1"
fifo_registers[(0x101c, 0x1023)] = "Reserved"
fifo_registers[(0x1024, 0x1027)] = "TPM_DATA_FIFO_1"
fifo_registers[(0x1028, 0x102f)] = "Reserved"
fifo_registers[(0x1030, 0x1030)] = "TPM_INTERFACE_ID_1"
fifo_registers[(0x1037, 0x107f)] = "Reserved"
fifo_registers[(0x1080, 0x1083)] = "TPM_XDATA_FIFO_1"
fifo_registers[(0x1084, 0x1eff)] = "Reserved"
fifo_registers[(0x1f00, 0x1f03)] = "TPM_DID_VID_1"
fifo_registers[(0x1f04, 0x1f04)] = "TPM_RID_1"
fifo_registers[(0x1f05, 0x1fff)] = "Reserved"
fifo_registers[(0x2000, 0x2000)] = "TPM_ACCESS_2"
fifo_registers[(0x2001, 0x2007)] = "Reserved"
fifo_registers[(0x2008, 0x200b)] = "TPM_INT_ENABLE_2"
fifo_registers[(0x200c, 0x200c)] = "TPM_INT_VECTOR_2"
fifo_registers[(0x200d, 0x200f)] = "Reserved"
fifo_registers[(0x2010, 0x2013)] = "TPM_INT_STATUS_2"
fifo_registers[(0x2014, 0x2017)] = "TPM_INTF_CAPABILITY_2"
fifo_registers[(0x2018, 0x201b)] = "TPM_STS_2"
fifo_registers[(0x201c, 0x2023)] = "Reserved"
fifo_registers[(0x2024, 0x2027)] = "TPM_DATA_FIFO_2"
fifo_registers[(0x2028, 0x202f)] = "Reserved"
fifo_registers[(0x2030, 0x2033)] = "TPM_INTERFACE_ID_2"
fifo_registers[(0x2034, 0x207f)] = "Reserved"
fifo_registers[(0x2080, 0x2083)] = "TPM_XDATA_FIFO_2"
fifo_registers[(0x2084, 0x2eff)] = "Reserved"
fifo_registers[(0x2f00, 0x2f03)] = "TPM_DID_VID_2"
fifo_registers[(0x2f04, 0x2f04)] = "TPM_RID_2"
fifo_registers[(0x2f05, 0x2fff)] = "Reserved"
fifo_registers[(0x3000, 0x3000)] = "TPM_ACCESS_3"
fifo_registers[(0x3001, 0x3007)] = "Reserved"
fifo_registers[(0x3008, 0x300b)] = "TPM_INT_ENABLE_3"
fifo_registers[(0x300c, 0x300c)] = "TPM_INT_VECTOR_3"
fifo_registers[(0x300d, 0x300f)] = "Reserved"
fifo_registers[(0x3010, 0x3013)] = "TPM_INT_STATUS_3"
fifo_registers[(0x3014, 0x3017)] = "TPM_INTF_CAPABILITY_3"
fifo_registers[(0x3018, 0x301b)] = "TPM_STS_3"
fifo_registers[(0x301c, 0x3023)] = "Reserved"
fifo_registers[(0x3024, 0x3027)] = "TPM_DATA_FIFO_3"
fifo_registers[(0x3028, 0x302f)] = "Reserved"
fifo_registers[(0x3030, 0x3033)] = "TPM_INTERFACE_ID_3"
fifo_registers[(0x3034, 0x307f)] = "Reserved"
fifo_registers[(0x3080, 0x3083)] = "TPM_XDATA_FIFO_3"
fifo_registers[(0x3084, 0x3eff)] = "Reserved"
fifo_registers[(0x3f00, 0x3f03)] = "TPM_DID_VID_3"
fifo_registers[(0x3f04, 0x3f04)] = "TPM_RID_3"
fifo_registers[(0x3f05, 0x3fff)] = "Reserved"
fifo_registers[(0x4000, 0x4000)] = "TPM_ACCESS_4"
fifo_registers[(0x4001, 0x4007)] = "Reserved"
fifo_registers[(0x4008, 0x400b)] = "TPM_INT_ENABLE_4"
fifo_registers[(0x400c, 0x400c)] = "TPM_INT_VECTOR_4"
fifo_registers[(0x400d, 0x400f)] = "Reserved"
fifo_registers[(0x4010, 0x4013)] = "TPM_INT_STATUS_4"
fifo_registers[(0x4014, 0x4017)] = "TPM_INTF_CAPABILITY_4"
fifo_registers[(0x4018, 0x401b)] = "TPM_STS_4"
fifo_registers[(0x401c, 0x401f)] = "Reserved"
fifo_registers[(0x4020, 0x4023)] = "TPM_HASH_END"
fifo_registers[(0x4024, 0x4027)] = "TPM_DATA_FIFO_4"
fifo_registers[(0x4028, 0x402f)] = "TPM_HASH_START"
fifo_registers[(0x4030, 0x4033)] = "TPM_INTERFACE_ID_4"
fifo_registers[(0x4034, 0x407f)] = "Reserved"
fifo_registers[(0x4080, 0x4083)] = "TPM_XDATA_FIFO_4"
fifo_registers[(0x4084, 0x4eff)] = "Reserved"
fifo_registers[(0x4f00, 0x4f03)] = "TPM_DID_VID_4"
fifo_registers[(0x4f04, 0x4f04)] = "TPM_RID_4"
fifo_registers[(0x4f05, 0x4fff)] = "Reserved"
fifo_registers[(0x5000, 0x5fff)] = "Reserved"

View File

@ -0,0 +1,274 @@
##
## This file is part of the libsigrokdecode project.
##
## Copyright (C) 2021 ghecko - Jordan Ovrè <ghecko78@gmail.com>
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, see <http://www.gnu.org/licenses/>.
##
import sigrokdecode as srd
import re
from collections import deque
from .lists import fifo_registers
class Ann:
""" Annotation ID """
READ, WRITE, ADDRESS, DATA, VMK = range(5)
class Operation:
""" TPM transaction type """
READ = 0x80
WRITE = 0x00
class TransactionState:
""" State of the transaction """
READ = 0
WRITE = 1
READ_ADDRESS = 2
WAIT = 3
TRANSFER_DATA = 4
class Transaction:
"""Represent one TPM SPI transaction
Args:
start_sample: The absolute samplenumber of the first sample of this transaction.
operation: Transaction type.
transfer_size: The number of data bytes.
Attributes:
start_sample (int): The absolute samplenumber of the first sample of this transaction.
end_sample (int): The absolute samplenumber of the last sample of this transaction.
operation (Operation): Transaction type.
address (bytearray): The register address in the transaction. (big-endian).
data (bytearray): Data in the transaction.
size (int): The number of data bytes.
wait_count (int): Holds the number of wait states between the address and data .
"""
def __init__(self, start_sample, operation, transfer_size):
self.start_sample = start_sample
self.end_sample_op = None
self.end_sample_addr = None
self.end_sample_data = None
self.operation = operation
self.address = bytearray()
self.data = bytearray()
self.size = transfer_size
self.wait_count = 0
def is_complete(self):
"""
Check if the transaction is complete.
A transaction is complete when all address and data bytes are captured.
"""
return self.is_address_complete() and self.is_data_complete()
def is_data_complete(self):
""" Check if all data bytes are captured """
return len(self.data) == self.size
def is_address_complete(self):
""" Check if all address bytes are captured. """
return len(self.address) == 3
def frame(self):
""" Return address and data annotation if the transaction is complete. """
if self.is_complete():
register_name = ""
try:
register_name = fifo_registers[int.from_bytes(self.address, "big") & 0xffff]
except KeyError:
register_name = "Unknown"
wait_str = '' if self.wait_count == 0 else ' (Waits: {})'.format(self.wait_count)
data_str = ''.join('{:02x}'.format(x) for x in self.data)
op_ann = ['Read', 'Rd'] if self.operation == Operation.READ else ['Write', 'Wr']
addr_ann = ['Register: {}'.format(register_name), '{}'.format(register_name)]
data_ann = ['{}{}'.format(data_str, wait_str), '{}'.format(data_str), data_str]
return ((self.start_sample, self.end_sample_op, op_ann),
(self.end_sample_op, self.end_sample_addr, addr_ann),
(self.end_sample_addr, self.end_sample_data, data_ann))
return None
class Decoder(srd.Decoder):
api_version = 3
id = 'spi_tpm'
name = 'SPI TPM'
longname = 'SPI TPM transactions'
desc = 'Parses TPM transactions from SPI bus with automatic VMK extraction for BitLocker.'
license = 'gplv2+'
inputs = ['spi']
outputs = []
tags = ['IC', 'TPM', 'BitLocker']
annotations = (
('Read', 'Read register operation'),
('Write', 'Write register operation'),
('Address', 'Register address'),
('Data', 'Data'),
('VMK', 'Extracted BitLocker VMK'),
)
annotation_rows = (
('Transactions', 'TPM transactions', (0, 1, 2, 3)),
('B-VMK', 'BitLocker Volume Master Key', (4,)),
)
options = (
{'id': 'wait_mask', 'desc': 'TPM Wait transfer Mask', 'default': '0x00',
'values': ('0x00', '0xFE')},
)
def __init__(self):
self.end_wait = 0x01
self.operation_mask = 0x80
self.address_mask = 0x3f
# Circular buffer to detect VMK header on transaction data
self.queue = deque(maxlen=12)
self.vmk_meta = {"s_queue": deque(maxlen=12), "vmk_ss": 0, "vmk_es": 0}
self.saving_vmk = False
self.vmk = []
self.reset()
self.state_machine = None
self.init_state_machine()
def reset(self):
self.ss = self.es = 0
self.state = None
self.current_transaction = None
def end_current_transaction(self):
self.reset()
def start(self):
self.out_ann = self.register(srd.OUTPUT_ANN)
def init_state_machine(self):
self.state_machine = {
TransactionState.READ: self._transaction_read,
TransactionState.WRITE: self._transaction_write,
TransactionState.READ_ADDRESS: self._transaction_read_addr,
TransactionState.WAIT: self._transaction_wait,
TransactionState.TRANSFER_DATA: self._transaction_data,
None: self._return
}
def _return(self, mosi, miso):
return
def transaction_state(self, mosi):
if (mosi & self.operation_mask) == Operation.READ:
return TransactionState.READ
elif (mosi & self.operation_mask) == Operation.WRITE:
return TransactionState.WRITE
else:
return None
def _transaction_wait(self, mosi, miso):
self.current_transaction.wait_count += 1
if miso == self.end_wait:
self.state = TransactionState.TRANSFER_DATA
def _transaction_read(self, mosi, miso):
# TPM operation is defined on the 7th bit of the first byte of the transaction (1=read / 0=write)
# transfer size is defined on bits 0 to 5 of the first byte of the transaction
transfer_size = (mosi & 0x3f) + 1
self.current_transaction = Transaction(self.ss, Operation.READ, transfer_size)
self.current_transaction.end_sample_op = self.es
self.state = TransactionState.READ_ADDRESS
def _transaction_write(self, mosi, miso):
# TPM operation is defined on the 7th bit of the first byte of the transaction (1=read / 0=write)
# transfer size is defined on bits 0 to 5 of the first byte of the transaction
transfer_size = (mosi & 0x3f) + 1
self.current_transaction = Transaction(self.ss, Operation.WRITE, transfer_size)
self.current_transaction.end_sample_op = self.es
self.state = TransactionState.READ_ADDRESS
def _transaction_read_addr(self, mosi, miso):
# Get address bytes
# Address is 3 bytes long
self.current_transaction.address.extend(mosi.to_bytes(1, byteorder='big'))
# The transfer size byte is sent at the same time than the last byte address
if self.current_transaction.is_address_complete():
self.current_transaction.end_sample_addr = self.es
self.state = TransactionState.TRANSFER_DATA
return
def _transaction_data(self, mosi, miso):
self.current_transaction.end_sample_data = self.es
if miso == self.wait_mask:
self.state = TransactionState.WAIT
return
if self.current_transaction.operation == Operation.READ:
self.current_transaction.data.extend(miso.to_bytes(1, byteorder='big'))
self.recover_vmk(miso)
elif self.current_transaction.operation == Operation.WRITE:
self.current_transaction.data.extend(mosi.to_bytes(1, byteorder='big'))
# Check if the transaction is complete
annotation = self.current_transaction.frame()
if annotation:
(op_ss, op_es, op_ann), (addr_ss, addr_es, addr_ann), (data_ss, data_es, data_ann) = annotation
self.put(op_ss, op_es, self.out_ann,
[Ann.READ if self.current_transaction.operation == Operation.READ else Ann.WRITE, op_ann])
self.put(addr_ss, addr_es, self.out_ann, [Ann.ADDRESS, addr_ann])
self.put(data_ss, data_es, self.out_ann, [Ann.DATA, data_ann])
self.end_current_transaction()
def check_vmk_header(self):
""" Check for VMK header """
if self.queue[0] == 0x2c:
potential_header = ''.join('{:02x}'.format(x) for x in self.queue)
if re.findall(r'2c000[0-6]000[1-9]000[0-1]000[0-5]200000', potential_header):
self.put(self.vmk_meta["s_queue"][0], self.es, self.out_ann,
[Ann.VMK, ['VMK header: {}'.format(potential_header)]])
self.saving_vmk = True
def recover_vmk(self, miso):
""" Check if VMK is releasing """
if not self.saving_vmk:
# Add data to the circular buffer
self.queue.append(miso)
# Add sample number to meta queue
self.vmk_meta["s_queue"].append(self.ss)
# Check if VMK header retrieved
self.check_vmk_header()
else:
if len(self.vmk) == 0:
self.vmk_meta["vmk_ss"] = self.ss
if len(self.vmk) < 32:
self.vmk.append(miso)
self.vmk_meta["vmk_es"] = self.es
else:
self.saving_vmk = False
self.put(self.vmk_meta["vmk_ss"], self.vmk_meta["vmk_es"], self.out_ann,
[Ann.VMK, ['VMK: {}'.format(''.join('{:02x}'.format(x) for x in self.vmk))]])
def decode(self, ss, es, data):
self.wait_mask = bytes.fromhex(self.options['wait_mask'].strip("0x"))
self.ss, self.es = ss, es
ptype, mosi, miso = data
if ptype == 'CS-CHANGE':
self.end_current_transaction()
if ptype != 'DATA':
return
if self.state is None:
self.state = self.transaction_state(mosi)
self.state_machine[self.state](mosi, miso)