mirror of
https://github.com/DreamSourceLab/DSView.git
synced 2025-02-02 13:52:55 +08:00
313 lines
12 KiB
Python
313 lines
12 KiB
Python
##
|
|
## This file is part of the libsigrokdecode project.
|
|
##
|
|
## Copyright (C) 2021 ghecko - Jordan Ovrè <ghecko78@gmail.com>
|
|
##
|
|
## This program is free software; you can redistribute it and/or modify
|
|
## it under the terms of the GNU General Public License as published by
|
|
## the Free Software Foundation; either version 2 of the License, or
|
|
## (at your option) any later version.
|
|
##
|
|
## This program is distributed in the hope that it will be useful,
|
|
## but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
## GNU General Public License for more details.
|
|
##
|
|
## You should have received a copy of the GNU General Public License
|
|
## along with this program; if not, see <http://www.gnu.org/licenses/>.
|
|
##
|
|
|
|
import sigrokdecode as srd
|
|
import re
|
|
from collections import deque
|
|
from .lists import fifo_registers2, fifo_registers1
|
|
|
|
|
|
class Ann:
    """Annotation class IDs.

    Each constant is used as the index of an entry in ``Decoder.annotations``,
    whose order is Read, Write, Address, Data, Wait, VMK. The constants must
    therefore be listed in that same order; otherwise wait phases are
    emitted with the 'Data' class and data phases with the 'Wait' class.
    """
    READ, WRITE, ADDRESS, DATA, WAIT, VMK = range(6)
|
|
|
|
|
|
class Operation:
    """TPM transaction direction.

    The values equal the first transaction byte masked with 0x80
    (bit 7 set means read, bit 7 clear means write).
    """
    WRITE = 0
    READ = 1 << 7
|
|
|
|
|
|
class TransactionState:
    """Phases of a transaction, used as keys of the decoder's state machine.

    READ / WRITE:   the first byte announced a read / write transaction.
    READ_ADDRESS:   the register address bytes are being collected.
    WAIT:           wait bytes between the address and the data.
    TRANSFER_DATA:  the data bytes are being collected.
    """
    READ, WRITE, READ_ADDRESS, WAIT, TRANSFER_DATA = range(5)
|
|
|
|
|
|
class Transaction:
    """Accumulator for a single TPM SPI transaction.

    Args:
        start_sample: Absolute sample number at which the transaction starts.
        operation: Transaction type (an ``Operation`` value).
        transfer_size: Number of data bytes the transaction carries.

    Attributes:
        start_sample (int): Sample number at which the transaction starts.
        end_sample_op (int): Sample number ending the operation byte.
        end_sample_addr (int): Sample number ending the address bytes.
        end_sample_wait (int): Sample number ending the wait phase, if any.
        end_sample_data (int): Sample number ending the latest data byte.
        operation (Operation): Transaction type.
        address (bytearray): Register address bytes (big-endian).
        data (bytearray): Data bytes collected so far.
        size (int): Expected number of data bytes.
        wait_count (int): Number of wait bytes seen between address and data.
    """

    def __init__(self, start_sample, operation, transfer_size):
        self.operation = operation
        self.size = transfer_size
        self.wait_count = 0
        # Payload collected byte by byte while decoding.
        self.address = bytearray()
        self.data = bytearray()
        # Sample boundaries of each phase; the end markers are filled in by
        # the decoder as the phases complete.
        self.start_sample = start_sample
        self.end_sample_op = None
        self.end_sample_addr = None
        self.end_sample_data = None
        self.end_sample_wait = None

    def is_complete(self):
        """Return True once both the address and all data bytes are captured."""
        if not self.is_address_complete():
            return False
        return self.is_data_complete()

    def is_data_complete(self):
        """Return True when exactly ``size`` data bytes have been captured."""
        return self.size == len(self.data)

    def is_address_complete(self):
        """Return True when the three address bytes have been captured."""
        return len(self.address) == 3

    def frame(self, fifo_registers):
        """Build the annotation segments of a finished transaction.

        ``fifo_registers`` maps the low 16 bits of the address to a register
        name; addresses it does not contain are labelled "Unknown".

        Returns None while the transaction is incomplete. Otherwise returns
        a tuple of ``(start_sample, end_sample, texts)`` entries in the order
        operation, address, [wait,] data. The wait entry is present only
        when at least one wait byte was counted.
        """
        if not self.is_complete():
            return None

        try:
            reg = fifo_registers[int.from_bytes(self.address, "big") & 0xffff]
        except KeyError:
            reg = "Unknown"

        hex_payload = ''.join('{:02x}'.format(byte) for byte in self.data)

        if self.operation == Operation.READ:
            op_texts = ['Read', 'Rd']
        else:
            op_texts = ['Write', 'Wr']
        addr_texts = ['Register: {}'.format(reg), '{}'.format(reg)]
        data_texts = [hex_payload, hex_payload, hex_payload]

        segments = [
            (self.start_sample, self.end_sample_op, op_texts),
            (self.end_sample_op, self.end_sample_addr, addr_texts),
        ]
        # The data segment starts where the previous phase (address or wait) ended.
        data_start = self.end_sample_addr
        if self.wait_count > 0:
            segments.append((self.end_sample_addr, self.end_sample_wait, ['Wait', 'Wt']))
            data_start = self.end_sample_wait
        segments.append((data_start, self.end_sample_data, data_texts))
        return tuple(segments)
|
|
|
|
|
|
class Decoder(srd.Decoder):
    """Stacked decoder that turns SPI bytes into TPM register transactions.

    Every transaction is annotated as operation / register address /
    optional wait / data. Bytes read from the register named
    "TPM_DATA_FIFO_0" are additionally scanned for a 12-byte VMK header;
    the 32 bytes read from that register after the header are reported as
    the BitLocker Volume Master Key.
    """
    api_version = 3
    id = 'spi_tpm'
    name = 'SPI TPM'
    longname = 'SPI TPM transactions'
    desc = 'Parses TPM transactions from SPI bus with automatic VMK extraction for BitLocker.'
    license = 'gplv2+'
    inputs = ['spi']
    outputs = []
    tags = ['IC', 'TPM', 'BitLocker']
    # Annotation classes; the Ann constants are used as indices into this tuple.
    annotations = (
        ('Read', 'Read register operation'),
        ('Write', 'Write register operation'),
        ('Address', 'Register address'),
        ('Data', 'Data'),
        ('Wait', 'Wait'),
        ('VMK', 'Extracted BitLocker VMK'),
    )
    annotation_rows = (
        ('Transactions', 'TPM transactions', (0, 1, 2, 3, 4)),
        ('B-VMK', 'BitLocker Volume Master Key', (5,)),
    )
    options = (
        {'id': 'tpm_version', 'desc': 'TPM Version 1.2 or 2.0', 'default': '2.0',
         'values': ('2.0', '1.2')},
    )

    # Number of bytes in the sliding window used to spot the VMK header.
    _VMK_HEADER_SIZE = 12
    # Number of key bytes collected after the header.
    _VMK_SIZE = 32
    # Hex form of the VMK header; compiled once instead of on every byte.
    _VMK_HEADER_RE = re.compile(r'2c000[0-6]000[1-9]000[0-1]000[0-5]200000')

    def __init__(self):
        # TPM Profile Specification for TPM 2.0 page 133-134
        self.end_wait = 0x01        # MISO value that ends the wait phase
        self.operation_mask = 0x80  # bit 7 of the first byte: 1=read / 0=write
        self.address_mask = 0x3f
        # Set in start() according to the 'tpm_version' option.
        self.fifo_registers = None
        self.state_machine = None
        self.init_state_machine()
        self.reset()

    def reset(self):
        """Put the whole decoder back into its initial state.

        Besides dropping the transaction in progress, this discards all VMK
        tracking state so that nothing left over from an earlier run can be
        reported again.
        """
        # Circular buffer to detect VMK header on transaction data
        self.queue = deque(maxlen=self._VMK_HEADER_SIZE)
        # s_queue holds the start sample of every byte in ``queue``;
        # vmk_ss / vmk_es delimit the key bytes collected so far.
        self.vmk_meta = {"s_queue": deque(maxlen=self._VMK_HEADER_SIZE),
                         "vmk_ss": 0, "vmk_es": 0}
        self.saving_vmk = False
        self.vmk = []
        self.end_current_transaction()

    def end_current_transaction(self):
        """Forget the transaction in progress; VMK tracking state is kept.

        The VMK header and key may span several transactions, so that state
        must survive the end of each individual transaction.
        """
        self.ss = self.es = 0
        self.state = None
        self.current_transaction = None

    def start(self):
        """Register the annotation output and apply the 'tpm_version' option."""
        self.out_ann = self.register(srd.OUTPUT_ANN)
        # wait_mask is the MISO value which, seen together with the last
        # address byte, sends the transaction into the wait phase.
        if self.options['tpm_version'] == "2.0":
            self.wait_mask = 0x00
            self.fifo_registers = fifo_registers2
        else:
            self.wait_mask = 0xFE
            self.fifo_registers = fifo_registers1

    def init_state_machine(self):
        """Map every transaction state to the method that handles one byte in it."""
        self.state_machine = {
            TransactionState.READ: self._transaction_read,
            TransactionState.WRITE: self._transaction_write,
            TransactionState.READ_ADDRESS: self._transaction_read_addr,
            TransactionState.WAIT: self._transaction_wait,
            TransactionState.TRANSFER_DATA: self._transaction_data,
            None: self._return
        }

    def _return(self, mosi, miso):
        """Handler for the 'no state' entry: ignore the byte."""
        return

    def transaction_state(self, mosi):
        """Derive the initial state from the first MOSI byte of a transaction."""
        operation = mosi & self.operation_mask
        if operation == Operation.READ:
            return TransactionState.READ
        elif operation == Operation.WRITE:
            return TransactionState.WRITE
        else:
            return None

    def _transaction_wait(self, mosi, miso):
        """Count one wait byte; move on to the data once MISO equals end_wait."""
        self.current_transaction.wait_count += 1
        if miso == self.end_wait:
            self.current_transaction.end_sample_wait = self.es
            self.state = TransactionState.TRANSFER_DATA

    def _begin_transaction(self, operation, mosi):
        """Create the transaction described by the first byte and await its address."""
        # TPM operation is defined on the 7th bit of the first byte of the transaction (1=read / 0=write)
        # transfer size is defined on bits 0 to 5 of the first byte of the transaction
        transfer_size = (mosi & 0x3f) + 1
        self.current_transaction = Transaction(self.ss, operation, transfer_size)
        self.current_transaction.end_sample_op = self.es
        self.state = TransactionState.READ_ADDRESS

    def _transaction_read(self, mosi, miso):
        """Handle the first byte of a read transaction."""
        self._begin_transaction(Operation.READ, mosi)

    def _transaction_write(self, mosi, miso):
        """Handle the first byte of a write transaction."""
        self._begin_transaction(Operation.WRITE, mosi)

    def _transaction_read_addr(self, mosi, miso):
        """Collect one address byte; after the third, enter the wait or data phase."""
        txn = self.current_transaction
        # Address is 3 bytes long
        txn.address.extend(mosi.to_bytes(1, byteorder='big'))
        if not txn.is_address_complete():
            return
        txn.end_sample_addr = self.es
        if miso == self.wait_mask:
            self.state = TransactionState.WAIT
        else:
            self.state = TransactionState.TRANSFER_DATA

    def _transaction_data(self, mosi, miso):
        """Collect one data byte and emit the annotations once the transfer is complete.

        Read data is taken from MISO (and fed to the VMK search), write data
        from MOSI.
        """
        txn = self.current_transaction
        txn.end_sample_data = self.es
        if txn.operation == Operation.READ:
            txn.data.extend(miso.to_bytes(1, byteorder='big'))
            self.recover_vmk(miso)
        elif txn.operation == Operation.WRITE:
            txn.data.extend(mosi.to_bytes(1, byteorder='big'))

        # frame() returns None until the transaction is complete.
        segments = txn.frame(self.fifo_registers)
        if not segments:
            return

        op_class = Ann.READ if txn.operation == Operation.READ else Ann.WRITE
        # frame() inserts the wait segment only when wait bytes were counted.
        if txn.wait_count == 0:
            classes = (op_class, Ann.ADDRESS, Ann.DATA)
        else:
            classes = (op_class, Ann.ADDRESS, Ann.WAIT, Ann.DATA)
        for ann_class, (seg_ss, seg_es, texts) in zip(classes, segments):
            self.put(seg_ss, seg_es, self.out_ann, [ann_class, texts])
        self.end_current_transaction()

    def _is_vmk_transaction(self):
        """Return True if the current transaction addresses "TPM_DATA_FIFO_0"."""
        address = int.from_bytes(self.current_transaction.address, "big") & 0xffff
        try:
            return self.fifo_registers[address] == "TPM_DATA_FIFO_0"
        except KeyError:
            return False

    def check_vmk_header(self):
        """ Check for VMK header """
        # Cheap first-byte test before formatting the whole window.
        if not self.queue or self.queue[0] != 0x2c:
            return
        potential_header = ''.join('{:02x}'.format(x) for x in self.queue)
        if self._VMK_HEADER_RE.search(potential_header):
            self.put(self.vmk_meta["s_queue"][0], self.es, self.out_ann,
                     [Ann.VMK, ['VMK header: {}'.format(potential_header)]])
            self.saving_vmk = True

    def recover_vmk(self, miso):
        """Feed one read byte to the VMK search.

        Before a header has been seen, the byte goes into the sliding window
        that is checked for the VMK header. After a header, the next 32 bytes
        are collected as the key; the key is annotated as soon as the last
        byte arrives and the collector is cleared, so a later header starts
        from scratch instead of re-emitting the previous key.
        """
        # Check if the transaction actually got the VMK.
        # Sometimes, other TPM transactions occurs when recovering the VMK
        if not self._is_vmk_transaction():
            return

        if not self.saving_vmk:
            # Add data to the circular buffer
            self.queue.append(miso)
            # Add sample number to meta queue
            self.vmk_meta["s_queue"].append(self.ss)
            # Check if VMK header retrieved
            self.check_vmk_header()
            return

        if not self.vmk:
            self.vmk_meta["vmk_ss"] = self.ss
        self.vmk.append(miso)
        self.vmk_meta["vmk_es"] = self.es

        if len(self.vmk) >= self._VMK_SIZE:
            self.put(self.vmk_meta["vmk_ss"], self.vmk_meta["vmk_es"], self.out_ann,
                     [Ann.VMK, ['VMK: {}'.format(''.join('{:02x}'.format(x) for x in self.vmk))]])
            self.saving_vmk = False
            self.vmk = []

    def decode(self, ss, es, data):
        """Process one packet from the SPI decoder.

        A 'CS-CHANGE' packet aborts the transaction in progress. 'DATA'
        packets are dispatched to the handler of the current state. All
        other packet types are ignored.
        """
        self.ss, self.es = ss, es

        ptype, mosi, miso = data

        if ptype == 'CS-CHANGE':
            self.end_current_transaction()

        if ptype != 'DATA':
            return

        # Both lines are needed to follow a transaction; a missing channel is
        # reported as None and would otherwise fail in the bit operations.
        if mosi is None or miso is None:
            return

        if self.state is None:
            self.state = self.transaction_state(mosi)

        self.state_machine[self.state](mosi, miso)
|