mirror of
synced 2025-02-02 13:52:55 +08:00
Decoder Fixed: avr_pdi,lin,spdif,can-fd and dmx512
This commit is contained in:
@ -18,74 +18,102 @@
import sigrokdecode as srd
from common.srdhelper import bitpack_lsb
def disabled_enabled(v):
return ['Disabled', 'Enabled'][v]
def output_power(v):
return '%+ddBm' % [-4, -1, 2, 5][v]
return '{:+d}dBm'.format([-4, -1, 2, 5][v])
# Notes on the implementation:
# - A register's description is an iterable of tuples which contain:
# The starting bit position, the bit count, the name of a field, and
# an optional parser which interprets the field's content. Parser are
# expected to yield a single text string when they exist. Other types
# of output are passed to Python's .format() routine as is.
# - Bit fields' width in registers determines the range of indices in
# table/tuple lookups. Keep the implementation as robust as possible
# during future maintenance. Avoid Python runtime errors when adjusting
# the decoder.
regs = {
# reg: name offset width parser
0: [
('FRAC', 3, 12, None),
('INT', 15, 16, lambda v: 'Not Allowed' if v < 32 else v)
1: [
('MOD', 3, 12, None),
('Phase', 15, 12, None),
('Prescalar', 27, 1, lambda v: ['4/5', '8/9'][v]),
('Phase Adjust', 28, 1, lambda v: ['Off', 'On'][v]),
2: [
('Counter Reset', 3, 1, disabled_enabled),
('Charge Pump Three-State', 4, 1, disabled_enabled),
('Power-Down', 5, 1, disabled_enabled),
('PD Polarity', 6, 1, lambda v: ['Negative', 'Positive'][v]),
('LDP', 7, 1, lambda v: ['10ns', '6ns'][v]),
('LDF', 8, 1, lambda v: ['FRAC-N', 'INT-N'][v]),
('Charge Pump Current Setting', 9, 4, lambda v: '%0.2fmA @ 5.1kΩ' %
[0.31, 0.63, 0.94, 1.25, 1.56, 1.88, 2.19, 2.50,
2.81, 3.13, 3.44, 3.75, 4.06, 4.38, 4.69, 5.00][v]),
('Double Buffer', 13, 1, disabled_enabled),
('R Counter', 14, 10, None),
('RDIV2', 24, 1, disabled_enabled),
('Reference Doubler', 25, 1, disabled_enabled),
('MUXOUT', 26, 3, lambda v:
['Three-State Output', 'DVdd', 'DGND', 'R Counter Output', 'N Divider Output',
'Analog Lock Detect', 'Digital Lock Detect', 'Reserved'][v]),
('Low Noise and Low Spur Modes', 29, 2, lambda v:
['Low Noise Mode', 'Reserved', 'Reserved', 'Low Spur Mode'][v])
3: [
('Clock Divider', 3, 12, None),
('Clock Divider Mode', 15, 2, lambda v:
['Clock Divider Off', 'Fast Lock Enable', 'Resync Enable', 'Reserved'][v]),
('CSR Enable', 18, 1, disabled_enabled),
('Charge Cancellation', 21, 1, disabled_enabled),
('ABP', 22, 1, lambda v: ['6ns (FRAC-N)', '3ns (INT-N)'][v]),
('Band Select Clock Mode', 23, 1, lambda v: ['Low', 'High'][v])
4: [
('Output Power', 3, 2, output_power),
('Output Enable', 5, 1, disabled_enabled),
('AUX Output Power', 6, 2, output_power),
('AUX Output Select', 8, 1, lambda v: ['Divided Output', 'Fundamental'][v]),
('AUX Output Enable', 9, 1, disabled_enabled),
('MTLD', 10, 1, disabled_enabled),
('VCO Power-Down', 11, 1, lambda v:
'VCO Powered ' + ('Down' if v == 1 else 'Up')),
('Band Select Clock Divider', 12, 8, None),
('RF Divider Select', 20, 3, lambda v: '÷' + str(2**v)),
('Feedback Select', 23, 1, lambda v: ['Divided', 'Fundamental'][v]),
5: [
('LD Pin Mode', 22, 2, lambda v:
['Low', 'Digital Lock Detect', 'Low', 'High'][v])
# Register description fields:
# offset, width, name, parser.
0: (
( 3, 12, 'FRAC'),
(15, 16, 'INT',
None, lambda v: 'Not Allowed' if v < 23 else None,
1: (
( 3, 12, 'MOD'),
(15, 12, 'Phase'),
(27, 1, 'Prescalar', lambda v: ('4/5', '8/9',)[v]),
(28, 1, 'Phase Adjust', lambda v: ('Off', 'On',)[v]),
2: (
( 3, 1, 'Counter Reset', disabled_enabled),
( 4, 1, 'Charge Pump Three-State', disabled_enabled),
( 5, 1, 'Power-Down', disabled_enabled),
( 6, 1, 'PD Polarity', lambda v: ('Negative', 'Positive',)[v]),
( 7, 1, 'LDP', lambda v: ('10ns', '6ns',)[v]),
( 8, 1, 'LDF', lambda v: ('FRAC-N', 'INT-N',)[v]),
( 9, 4, 'Charge Pump Current Setting',
lambda v: '{curr:0.2f}mA @ 5.1kΩ'.format(curr = (
0.31, 0.63, 0.94, 1.25, 1.56, 1.88, 2.19, 2.50,
2.81, 3.13, 3.44, 3.75, 4.06, 4.38, 4.69, 5.00,
(13, 1, 'Double Buffer', disabled_enabled),
(14, 10, 'R Counter'),
(24, 1, 'RDIV2', disabled_enabled),
(25, 1, 'Reference Doubler', disabled_enabled),
(26, 3, 'MUXOUT',
lambda v: '{text}'.format(text = (
'Three-State Output', 'DVdd', 'DGND',
'R Counter Output', 'N Divider Output',
'Analog Lock Detect', 'Digital Lock Detect',
(29, 2, 'Low Noise and Low Spur Modes',
lambda v: '{text}'.format(text = (
'Low Noise Mode', 'Reserved', 'Reserved', 'Low Spur Mode',
3: (
( 3, 12, 'Clock Divider'),
(15, 2, 'Clock Divider Mode',
lambda v: '{text}'.format(text = (
'Clock Divider Off', 'Fast Lock Enable',
'Resync Enable', 'Reserved',
(18, 1, 'CSR Enable', disabled_enabled),
(21, 1, 'Charge Cancellation', disabled_enabled),
(22, 1, 'ABP', lambda v: ('6ns (FRAC-N)', '3ns (INT-N)',)[v]),
(23, 1, 'Band Select Clock Mode', lambda v: ('Low', 'High',)[v]),
4: (
( 3, 2, 'Output Power', output_power),
( 5, 1, 'Output Enable', disabled_enabled),
( 6, 2, 'AUX Output Power', output_power),
( 8, 1, 'AUX Output Select',
lambda v: ('Divided Output', 'Fundamental',)[v]),
( 9, 1, 'AUX Output Enable', disabled_enabled),
(10, 1, 'MTLD', disabled_enabled),
(11, 1, 'VCO Power-Down',
lambda v: 'VCO Powered {ud}'.format(ud = 'Down' if v else 'Up')),
(12, 8, 'Band Select Clock Divider'),
(20, 3, 'RF Divider Select', lambda v: '÷{:d}'.format(2 ** v)),
(23, 1, 'Feedback Select', lambda v: ('Divided', 'Fundamental',)[v]),
5: (
(22, 2, 'LD Pin Mode',
lambda v: '{text}'.format(text = (
'Low', 'Digital Lock Detect', 'Low', 'High',
( ANN_REG, ANN_WARN, ) = range(2)
class Decoder(srd.Decoder):
api_version = 3
@ -99,10 +127,12 @@ class Decoder(srd.Decoder):
tags = ['Clock/timing', 'IC', 'Wireless/RF']
annotations = (
# Sent from the host to the chip.
('register', 'Register written to the device'),
('write', 'Register write'),
('warning', "Warnings"),
annotation_rows = (
('registers', 'Register writes', (ANN_REG,)),
('writes', 'Register writes', (ANN_REG,)),
('warnings', 'Warnings', (ANN_WARN,)),
def __init__(self):
@ -114,31 +144,87 @@ class Decoder(srd.Decoder):
def start(self):
self.out_ann = self.register(srd.OUTPUT_ANN)
def decode_bits(self, offset, width):
return (sum([(1 << i) if self.bits[offset + i][0] else 0 for i in range(width)]),
(self.bits[offset + width - 1][1], self.bits[offset][2]))
def putg(self, ss, es, cls, data):
self.put(ss, es, self.out_ann, [ cls, data, ])
def decode_field(self, name, offset, width, parser):
val, pos = self.decode_bits(offset, width)
self.put(pos[0], pos[1], self.out_ann, [ANN_REG,
['%s: %s' % (name, parser(val) if parser else str(val))]])
return val
def decode_bits(self, offset, width):
'''Extract a bit field. Expects LSB input data.'''
bits = self.bits[offset:][:width]
ss, es = bits[-1][1], bits[0][2]
value = bitpack_lsb(bits, 0)
return ( value, ( ss, es, ))
def decode_field(self, name, offset, width, parser = None, checker = None):
'''Interpret a bit field. Emits an annotation.'''
# Get the register field's content and position.
val, ( ss, es, ) = self.decode_bits(offset, width)
# Have the field's content formatted, emit an annotation.
formatted = parser(val) if parser else '{}'.format(val)
if formatted is not None:
text = ['{name}: {val}'.format(name = name, val = formatted)]
text = ['{name}'.format(name = name)]
if text:
self.putg(ss, es, ANN_REG, text)
# Have the field's content checked, emit an optional warning.
warn = checker(val) if checker else None
if warn:
text = ['{}'.format(warn)]
self.putg(ss, es, ANN_WARN, text)
def decode_word(self, ss, es, bits):
'''Interpret a 32bit word after accumulation completes.'''
# SPI transfer content must be exactly one 32bit word.
count = len(self.bits)
if count != 32:
text = [
'Frame error: Bit count: want 32, got {}'.format(count),
'Frame error: Bit count',
'Frame error',
self.putg(ss, es, ANN_WARN, text)
# Holding bits in LSB order during interpretation simplifies
# bit field extraction. And annotation emitting routines expect
# this reverse order of bits' timestamps.
# Determine which register was accessed.
reg_addr, ( reg_ss, reg_es, ) = self.decode_bits(0, 3)
text = [
'Register: {addr}'.format(addr = reg_addr),
'Reg: {addr}'.format(addr = reg_addr),
'[{addr}]'.format(addr = reg_addr),
self.putg(reg_ss, reg_es, ANN_REG, text)
# Interpret the register's content (when parsers are available).
field_descs = regs.get(reg_addr, None)
if not field_descs:
for field_desc in field_descs:
parser = None
checker = None
if len(field_desc) == 3:
start, count, name, = field_desc
elif len(field_desc) == 4:
start, count, name, parser = field_desc
elif len(field_desc) == 5:
start, count, name, parser, checker = field_desc
# Unsupported regs{} syntax, programmer's error.
self.decode_field(name, start, count, parser, checker)
def decode(self, ss, es, data):
ptype, _, _ = data
ptype, data1, data2 = data
if ptype == 'TRANSFER':
# Process accumulated bits after completion of a transfer.
self.decode_word(ss, es, self.bits)
if ptype == 'CS-CHANGE':
if data1 == 1:
if len(self.bits) == 32:
reg_value, reg_pos = self.decode_bits(0, 3)
self.put(reg_pos[0], reg_pos[1], self.out_ann, [ANN_REG,
['Register: %d' % reg_value, 'Reg: %d' % reg_value,
'[%d]' % reg_value]])
if reg_value < len(regs):
field_descs = regs[reg_value]
for field_desc in field_descs:
field = self.decode_field(*field_desc)
self.bits = []
if ptype == 'BITS':
self.bits = data1 + self.bits
_, mosi_bits, miso_bits = data
# Accumulate bits in MSB order as they are seen in SPI frames.
msb_bits = mosi_bits.copy()
@ -20,6 +20,10 @@
import sigrokdecode as srd
from .parts import *
class Ann:
RLB, REEM, RP, LPMP, WP, WARN, DEV, = range(15)
class Decoder(srd.Decoder):
@ -41,14 +45,20 @@ class Decoder(srd.Decoder):
('rfb', 'Read fuse bits'),
('rhfb', 'Read high fuse bits'),
('refb', 'Read extended fuse bits'),
('warnings', 'Warnings'),
('rlb', 'Read lock bits'),
('reem', 'Read EEPROM memory'),
('rp', 'Read program memory'),
('lpmp' , 'Load program memory page'),
('wp', 'Write program memory'),
('warning', 'Warning'),
('dev', 'Device'),
annotation_rows = (
('bits', 'Bits', ()),
('commands', 'Commands', tuple(range(7 + 1))),
('warnings', 'Warnings', (8,)),
('dev', 'Device', (9,)),
('commands', 'Commands', (Ann.PE, Ann.RSB0, Ann.RSB1, Ann.RSB2,
Ann.CE, Ann.RFB, Ann.RHFB, Ann.REFB,
Ann.RLB, Ann.REEM, Ann.RP, Ann.LPMP, Ann.WP,)),
('warnings', 'Warnings', (Ann.WARN,)),
('devs', 'Devices', (Ann.DEV,)),
def __init__(self):
@ -70,17 +80,17 @@ class Decoder(srd.Decoder):
def handle_cmd_programming_enable(self, cmd, ret):
# Programming enable.
# Note: The chip doesn't send any ACK for 'Programming enable'.
self.putx([0, ['Programming enable']])
self.putx([Ann.PE, ['Programming enable']])
# Sanity check on reply.
if ret[1:4] != [0xac, 0x53, cmd[2]]:
self.putx([8, ['Warning: Unexpected bytes in reply!']])
self.putx([Ann.WARN, ['Warning: Unexpected bytes in reply!']])
def handle_cmd_read_signature_byte_0x00(self, cmd, ret):
# Signature byte 0x00: vendor code.
self.vendor_code = ret[3]
v = vendor_code[self.vendor_code]
self.putx([1, ['Vendor code: 0x%02x (%s)' % (ret[3], v)]])
self.putx([Ann.RSB0, ['Vendor code: 0x%02x (%s)' % (ret[3], v)]])
# Store for later.
self.xx = cmd[1] # Same as ret[2].
@ -89,16 +99,16 @@ class Decoder(srd.Decoder):
# Sanity check on reply.
if ret[1] != 0x30 or ret[2] != cmd[1]:
self.putx([8, ['Warning: Unexpected bytes in reply!']])
self.putx([Ann.WARN, ['Warning: Unexpected bytes in reply!']])
# Sanity check for the vendor code.
if self.vendor_code != VENDOR_CODE_ATMEL:
self.putx([8, ['Warning: Vendor code was not 0x1e (Atmel)!']])
self.putx([Ann.WARN, ['Warning: Vendor code was not 0x1e (Atmel)!']])
def handle_cmd_read_signature_byte_0x01(self, cmd, ret):
# Signature byte 0x01: part family and memory size.
self.part_fam_flash_size = ret[3]
self.putx([2, ['Part family / memory size: 0x%02x' % ret[3]]])
self.putx([Ann.RSB1, ['Part family / memory size: 0x%02x' % ret[3]]])
# Store for later.
self.mm = cmd[3]
@ -106,20 +116,20 @@ class Decoder(srd.Decoder):
# Sanity check on reply.
if ret[1] != 0x30 or ret[2] != cmd[1] or ret[0] != self.yy:
self.putx([8, ['Warning: Unexpected bytes in reply!']])
self.putx([Ann.WARN, ['Warning: Unexpected bytes in reply!']])
def handle_cmd_read_signature_byte_0x02(self, cmd, ret):
# Signature byte 0x02: part number.
self.part_number = ret[3]
self.putx([3, ['Part number: 0x%02x' % ret[3]]])
self.putx([Ann.RSB2, ['Part number: 0x%02x' % ret[3]]])
p = part[(self.part_fam_flash_size, self.part_number)]
data = [9, ['Device: Atmel %s' % p]]
data = [Ann.DEV, ['Device: Atmel %s' % p]]
self.put(self.ss_device, self.es_cmd, self.out_ann, data)
# Sanity check on reply.
if ret[1] != 0x30 or ret[2] != self.xx or ret[0] != self.mm:
self.putx([8, ['Warning: Unexpected bytes in reply!']])
self.putx([Ann.WARN, ['Warning: Unexpected bytes in reply!']])
self.xx, self.yy, self.zz, self.mm = 0, 0, 0, 0
@ -127,36 +137,78 @@ class Decoder(srd.Decoder):
# Chip erase (erases both flash an EEPROM).
# Upon successful chip erase, the lock bits will also be erased.
# The only way to end a Chip Erase cycle is to release RESET#.
self.putx([4, ['Chip erase']])
self.putx([Ann.CE, ['Chip erase']])
# TODO: Check/handle RESET#.
# Sanity check on reply.
bit = (ret[2] & (1 << 7)) >> 7
if ret[1] != 0xac or bit != 1 or ret[3] != cmd[2]:
self.putx([8, ['Warning: Unexpected bytes in reply!']])
self.putx([Ann.WARN, ['Warning: Unexpected bytes in reply!']])
def handle_cmd_read_fuse_bits(self, cmd, ret):
# Read fuse bits.
self.putx([5, ['Read fuse bits: 0x%02x' % ret[3]]])
self.putx([Ann.RFB, ['Read fuse bits: 0x%02x' % ret[3]]])
# TODO: Decode fuse bits.
# TODO: Sanity check on reply.
def handle_cmd_read_fuse_high_bits(self, cmd, ret):
# Read fuse high bits.
self.putx([6, ['Read fuse high bits: 0x%02x' % ret[3]]])
self.putx([Ann.RHFB, ['Read fuse high bits: 0x%02x' % ret[3]]])
# TODO: Decode fuse bits.
# TODO: Sanity check on reply.
def handle_cmd_read_extended_fuse_bits(self, cmd, ret):
# Read extended fuse bits.
self.putx([7, ['Read extended fuse bits: 0x%02x' % ret[3]]])
self.putx([Ann.REFB, ['Read extended fuse bits: 0x%02x' % ret[3]]])
# TODO: Decode fuse bits.
# TODO: Sanity check on reply.
def handle_cmd_read_lock_bits(self, cmd, ret):
# Read lock bits
self.putx([Ann.RLB, ['Read lock bits: 0x%02x' % ret[3]]])
def handle_cmd_read_eeprom_memory(self, cmd, ret):
# Read EEPROM Memory
_addr = ((cmd[1] & 1) << 8) + cmd[2]
self.putx([Ann.REEM, ['Read EEPROM Memory: [0x%03x]: 0x%02x' % (_addr, ret[3])]])
def handle_cmd_read_program_memory(self, cmd, ret):
# Read Program Memory
_HL = 'Low'
_H = 'L'
if cmd[0] & 0x08:
_HL = 'High'
_H = 'H'
_addr = ((cmd[1] & 0x0f) << 8) + cmd[2]
self.putx([Ann.RP, [
'Read program memory %s: [0x%03x]: 0x%02x' % (_HL, _addr, ret[3]),
'[%03x%s]:%02x' % (_addr, _H, ret[3]),
'%02x' % ret[3]
def handle_cmd_load_program_memory_page(self, cmd, ret):
# Load Program Memory Page
_HL = 'Low'
_H = 'L'
if cmd[0] & 0x08:
_HL = 'High'
_H = 'H'
_addr = cmd[2] & 0x1F
self.putx([Ann.LPMP, [
'Load program memory page %s: [0x%03x]: 0x%02x' % (_HL, _addr, cmd[3]),
'[%03x%s]=%02x' % (_addr, _H, cmd[3]),
'%02x' % cmd[3]
def handle_cmd_write_program_memory_page(self, cmd, ret):
# Write Program Memory Page
_addr = ((cmd[1] & 0x0F) << 3) + (cmd[2] << 5)
self.putx([Ann.WP, ['Write program memory page: 0x%02x' % _addr]])
def handle_command(self, cmd, ret):
if cmd[:2] == [0xac, 0x53]:
self.handle_cmd_programming_enable(cmd, ret)
@ -174,10 +226,20 @@ class Decoder(srd.Decoder):
self.handle_cmd_read_signature_byte_0x01(cmd, ret)
elif cmd[0] == 0x30 and cmd[2] == 0x02:
self.handle_cmd_read_signature_byte_0x02(cmd, ret)
elif cmd[:2] == [0x58, 0x00]:
elif cmd[0] == 0xa0 and (cmd[1] & (3 << 6)) == (0 << 6):
self.handle_cmd_read_eeprom_memory(cmd, ret)
elif (cmd[0] == 0x20 or cmd[0] == 0x28) and ((cmd[1] & 0xf0) == 0x00):
self.handle_cmd_read_program_memory(cmd, ret)
elif (cmd[0] == 0x40 or cmd[0] == 0x48) and ((cmd[1] & 0xf0) == 0x00):
self.handle_cmd_load_program_memory_page(cmd, ret)
elif (cmd[0] == 0x4C and ((cmd[1] & 0xf0) == 0x00)):
self.handle_cmd_write_program_memory_page(cmd, ret)
c = '%02x %02x %02x %02x' % tuple(cmd)
r = '%02x %02x %02x %02x' % tuple(ret)
self.putx([0, ['Unknown command: %s (reply: %s)!' % (c, r)]])
self.putx([Ann.WARN, ['Unknown command: %s (reply: %s)!' % (c, r)]])
def decode(self, ss, es, data):
ptype, mosi, miso = data
@ -3,7 +3,7 @@
## Copyright (C) 2011-2014 Uwe Hermann <uwe@hermann-uwe.de>
## Copyright (C) 2016 Gerhard Sittig <gerhard.sittig@gmx.net>
## Copyright (C) 2019 DreamSourceLab <support@dreamsourcelab.com>
## Copyright (C) 2023 DreamSourceLab <support@dreamsourcelab.com>
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
@ -71,6 +71,10 @@
# to component names, or even register names and bit fields(?). It's
# quite deep a rabbithole though...
# 2023/12/29 bug fixed: memory erroy when the value of insn_rep_count too large
import sigrokdecode as srd
from collections import namedtuple
@ -86,7 +90,6 @@ class Ann:
) = range(1)
Bit = namedtuple('Bit', 'val ss es')
class PDI:
@ -176,17 +179,12 @@ class Decoder(srd.Decoder):
self.break_es = None
def clear_insn(self):
# Collect instructions and their arguments,
# properties of the current instructions.
self.insn_rep_count = 0
self.insn_opcode = None
self.insn_wr_counts = []
self.insn_rd_counts = []
# Accumulation of data items as bytes pass by.
self.insn_dat_bytes = []
self.insn_dat_count = 0
@ -196,7 +194,6 @@ class Decoder(srd.Decoder):
self.cmd_insn_parts_nice = []
self.cmd_insn_parts_terse = []
self.insn_write_counts = 0
self.insn_read_counts = 0
self.width_addr = 0
@ -667,4 +664,4 @@ class Decoder(srd.Decoder):
def decode(self):
while True:
(clock_pin, data_pin) = self.wait({0: 'e'})
self.handle_clk_edge(clock_pin, data_pin)
self.handle_clk_edge(clock_pin, data_pin)
@ -19,6 +19,10 @@
## along with this program; if not, see <http://www.gnu.org/licenses/>.
## 2023/12/29 add parity check and graycode check
from common.srdhelper import bitpack_msb
import sigrokdecode as srd
@ -28,6 +32,24 @@ class SamplerateError(Exception):
def dlc2len(dlc):
return [0, 1, 2, 3, 4, 5, 6, 7, 8, 12, 16, 20, 24, 32, 48, 64][dlc]
def ParityCheck(value):
paritycheck = False
parity = [0,3,5,6,9,10,12,15]
for parityvalue in parity:
if parityvalue == value:
paritycheck = True
return paritycheck
def gray_to_binary(gray_code):
binary_code = gray_code[0]
for i in range(1, len(gray_code)):
if gray_code[i] == '0':
binary_code += binary_code[i - 1]
binary_code += '1' if binary_code[i - 1] == '0' else '0'
return binary_code
class Decoder(srd.Decoder):
api_version = 3
id = 'can-fd'
@ -90,12 +112,15 @@ class Decoder(srd.Decoder):
def set_nominal_bitrate(self):
def set_fast_bitrate(self):
def metadata(self, key, value):
if key == srd.SRD_CONF_SAMPLERATE:
self.samplerate = value
self.bit_width = float(self.samplerate) / float(self.options['nominal_bitrate'])
self.sample_point = (self.bit_width / 100.0) * self.options['sample_point']
@ -140,6 +165,11 @@ class Decoder(srd.Decoder):
self.rtr = None
self.crc_data = []
#Parity Calculate
self.stuff_count_start = None
self.stuff_count = 0
# Poor man's clock synchronization. Use signal edges which change to
# dominant state in rather simple ways. This naive approach is neither
# aware of the SYNC phase's width nor the specific location of the edge,
@ -150,7 +180,9 @@ class Decoder(srd.Decoder):
self.dom_edge_bcount = self.curbit
# Determine the position of the next desired bit's sample point.
def get_sample_point(self, bitnum):
samplenum = self.dom_edge_snum
samplenum += self.bit_width * (bitnum - self.dom_edge_bcount)
samplenum += self.sample_point
@ -160,14 +192,20 @@ class Decoder(srd.Decoder):
# CAN uses NRZ encoding and bit stuffing.
# After 5 identical bits, a stuff bit of opposite value is added.
# But not in the CRC delimiter, ACK, and end of frame fields.
if len(self.bits) > self.last_databit + 17:
return False
if self.fd and len(self.bits) > self.last_databit:
if self.fd and len(self.bits) > self.last_databit + 1:
return False
last_6_bits = self.rawbits[-6:]
if last_6_bits not in ([0, 0, 0, 0, 0, 1], [1, 1, 1, 1, 1, 0]):
return False
self.stuff_count = self.stuff_count + 1
# Stuff bit. Keep it in self.rawbits, but drop it from self.bits.
self.bits.pop() # Drop last bit.
return True
@ -184,11 +222,16 @@ class Decoder(srd.Decoder):
# Both standard and extended frames end with CRC, CRC delimiter, ACK,
# ACK delimiter, and EOF fields. Handle them in a common function.
# Returns True if the frame ended (EOF), False otherwise.
def decode_frame_end(self, can_rx, bitnum):
# Remember start of CRC sequence (see below).
if bitnum == (self.last_databit + 1):
self.ss_block = self.samplenum
if self.fd:
if dlc2len(self.dlc) <= 16:
self.crc_len = 27 # 17 + SBC + stuff bits
@ -201,18 +244,35 @@ class Decoder(srd.Decoder):
elif self.fd and bitnum == (self.last_databit + 2):
self.ss_block = self.samplenum
# stuff count start
self.stuff_count_start = self.samplenum
elif self.fd and bitnum == (self.last_databit + 4):
stuff_bits = self.bits[-3:]
stuff_count = '{0:b}'.format(bitpack_msb(stuff_bits))
while len(stuff_count) != 3:
stuff_count = '0' + stuff_count
stuff_count_bin = gray_to_binary(stuff_count)
stuff_count_bin_val = int(stuff_count_bin,2)
if self.stuff_count % 8 != stuff_count_bin_val:
self.putb([16,['Gray Code Error','GCE','GCE']])
self.stuff_count = 0
self.putb([11,['Stuff count: %s' % stuff_count,
'Stuff count: %s' % stuff_count, 'Stuff count']])
self.ss_block = self.samplenum +int(self.bit_width)
elif self.fd and bitnum == (self.last_databit + 5):
stuff_and_parity_bits = self.bits[-4:]
stuff_and_parity = bitpack_msb(stuff_and_parity_bits)
if ParityCheck(stuff_and_parity) == False:
self.putg(self.stuff_count_start,self.samplenum,[16,['Parity Check Error','PCE','PCE']])
self.putx([11,['Parity:%d' % can_rx, 'P:%d' % can_rx, 'P']])
self.ss_block = self.samplenum +int(self.bit_width)
@ -223,7 +283,6 @@ class Decoder(srd.Decoder):
if(index%5 == 0):
self.crc_data += self.bits[-4:]
# CRC sequence (15 bits, 17 bits or 21 bits)
elif bitnum == (self.last_databit + self.crc_len):
if self.fd:
@ -329,14 +388,12 @@ class Decoder(srd.Decoder):
self.putb([10, ['Data length code: %d' % self.dlc,
'DLC: %d' % self.dlc, 'DLC']])
self.last_databit = self.dlc_start + 3 + (dlc2len(self.dlc) * 8)
#if rtr == remote then dlc = 0
if self.dlc != 0 and self.rtr_type == 'remote':
self.putb([16, ['Data length code (DLC) != 0 is not allowed']])
self.dlc = 0
self.last_databit = self.dlc_start + 3 + (dlc2len(self.dlc) * 8)
#if dlc > 8 then dlc = 8
elif self.dlc > 8 and not self.fd:
self.putb([16, ['Data length code (DLC) > 8 is not allowed']])
self.putb([16, ['CAN Data length code (DLC) > 8 is not allowed']])
self.dlc = 8
self.last_databit = self.dlc_start + 3 + (dlc2len(self.dlc) * 8)
@ -369,7 +426,6 @@ class Decoder(srd.Decoder):
# Remember start of EID (see below).
if bitnum == 14:
self.ss_block = self.samplenum
self.dlc_start = 35
@ -481,7 +537,7 @@ class Decoder(srd.Decoder):
or bitnum == 35 and self.frame_type == 'extended':
if self.is_stuff_bit():
self.putx([15, [str(can_rx)]])
self.curbit += 1 # Increase self.curbit (bitnum is not affected).
@ -518,7 +574,7 @@ class Decoder(srd.Decoder):
self.ss_bit12 = self.samplenum
# Bit 13: Identifier extension (IDE) bit
# Standard frame: dominant, extended frame: recessive
# Standard frame: recessive , extended frame: dominant
elif bitnum == 13:
ide = self.frame_type = 'standard' if can_rx == 0 else 'extended'
self.putx([6, ['Identifier extension bit: %s frame' % ide,
@ -548,12 +604,14 @@ class Decoder(srd.Decoder):
# State machine.
if self.state == 'IDLE':
# Wait for a dominant state (logic 0) on the bus.
(can_rx,) = self.wait({0: 'f'})
self.sof = self.samplenum
self.dom_edge_seen(force = True)
self.state = 'GET BITS'
elif self.state == 'GET BITS':
# Wait until we're in the correct bit/sampling position.
pos = self.get_sample_point(self.curbit)
(can_rx,) = self.wait([{'skip': pos - self.samplenum}, {0: 'f'}])
if (self.matched & (0b1 << 1)):
Normal file
Normal file
@ -0,0 +1,36 @@
## This file is part of the libsigrokdecode project.
## Copyright (C) 2012 Uwe Hermann <uwe@hermann-uwe.de>
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## GNU General Public License for more details.
## You should have received a copy of the GNU General Public License
## along with this program; if not, see <http://www.gnu.org/licenses/>.
JTAG (Joint Test Action Group), a.k.a. "IEEE 1149.1: Standard Test Access Port
and Boundary-Scan Architecture", is a protocol used for testing, debugging,
and flashing various digital ICs.
This decoder handles a tiny part of IEEE 1149.7, the CJTAG OSCAN1 format.
ZBS is currently not supported.
from .pd import Decoder
Normal file
Normal file
@ -0,0 +1,323 @@
## This file is part of the libsigrokdecode project.
## Copyright (C) 2012-2020 Uwe Hermann <uwe@hermann-uwe.de>
## Copyright (C) 2019 Zhiyuan Wan <dv.xw@qq.com>
## Copyright (C) 2019 Kongou Hikari <hikari@iloli.bid>
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## GNU General Public License for more details.
## You should have received a copy of the GNU General Public License
## along with this program; if not, see <http://www.gnu.org/licenses/>.
import sigrokdecode as srd
from common.srdhelper import SrdStrEnum
[<ptype>, <pdata>]
- 'NEW STATE': <pdata> is the new state of the JTAG state machine.
- 'IR TDI': Bitstring that was clocked into the IR register.
- 'IR TDO': Bitstring that was clocked out of the IR register.
- 'DR TDI': Bitstring that was clocked into the DR register.
- 'DR TDO': Bitstring that was clocked out of the DR register.
All bitstrings are a list consisting of two items. The first is a sequence
of '1' and '0' characters (the right-most character is the LSB. Example:
'01110001', where 1 is the LSB). The second item is a list of ss/es values
for each bit that is in the bitstring.
St = SrdStrEnum.from_str('St', s)
jtag_states = [s.value for s in St]
s = ['CJTAG_' + x for x in s] + ['OSCAN1', 'FOUR_WIRE']
CSt = SrdStrEnum.from_list('CSt', s)
cjtag_states = [s.value for s in CSt]
class Decoder(srd.Decoder):
api_version = 3
id = 'cjtag'
name = 'cJTAG'
longname = 'Compact Joint Test Action Group (IEEE 1149.7)'
desc = 'Protocol for testing, debugging, and flashing ICs.'
license = 'gplv2+'
inputs = ['logic']
outputs = ['jtag']
tags = ['Debug/trace']
channels = (
{'id': 'tckc', 'name': 'TCKC', 'desc': 'Test clock'},
{'id': 'tmsc', 'name': 'TMSC', 'desc': 'Test mode select'},
annotations = \
tuple([tuple([s.lower(), s]) for s in jtag_states]) + \
tuple([tuple([s.lower(), s]) for s in cjtag_states]) + ( \
('bit-tdi', 'Bit (TDI)'),
('bit-tdo', 'Bit (TDO)'),
('bitstring-tdi', 'Bitstring (TDI)'),
('bitstring-tdo', 'Bitstring (TDO)'),
('bit-tms', 'Bit (TMS)'),
annotation_rows = (
('bits-tdi', 'Bits (TDI)', (28,)),
('bits-tdo', 'Bits (TDO)', (29,)),
('bitstrings-tdi', 'Bitstrings (TDI)', (30,)),
('bitstrings-tdo', 'Bitstrings (TDO)', (31,)),
('bits-tms', 'Bits (TMS)', (32,)),
('cjtag-states', 'CJTAG states',
tuple(range(len(jtag_states), len(jtag_states + cjtag_states)))),
('jtag-states', 'JTAG states', tuple(range(len(jtag_states)))),
def __init__(self):
def reset(self):
# self.state = St.TEST_LOGIC_RESET
self.state = St.RUN_TEST_IDLE
self.cjtagstate = CSt.FOUR_WIRE
self.oldcjtagstate = None
self.escape_edges = 0
self.oaclen = 0
self.oldtms = 0
self.oacp = 0
self.oscan1cycle = 0
self.oldstate = None
self.bits_tdi = []
self.bits_tdo = []
self.bits_samplenums_tdi = []
self.bits_samplenums_tdo = []
self.ss_item = self.es_item = None
self.ss_bitstring = self.es_bitstring = None
self.saved_item = None
self.first = True
self.first_bit = True
def start(self):
self.out_python = self.register(srd.OUTPUT_PYTHON)
self.out_ann = self.register(srd.OUTPUT_ANN)
def putx(self, data):
self.put(self.ss_item, self.es_item, self.out_ann, data)
def putp(self, data):
self.put(self.ss_item, self.es_item, self.out_python, data)
def putx_bs(self, data):
self.put(self.ss_bitstring, self.es_bitstring, self.out_ann, data)
def putp_bs(self, data):
self.put(self.ss_bitstring, self.es_bitstring, self.out_python, data)
def advance_state_machine(self, tms):
self.oldstate = self.state
if self.cjtagstate.value.startswith('CJTAG_'):
self.oacp += 1
if self.oacp > 4 and self.oaclen == 12:
self.cjtagstate = CSt.CJTAG_EC
if self.oacp == 8 and tms == 0:
self.oaclen = 36
if self.oacp > 8 and self.oaclen == 36:
self.cjtagstate = CSt.CJTAG_SPARE
if self.oacp > 13 and self.oaclen == 36:
self.cjtagstate = CSt.CJTAG_TPDEL
if self.oacp > 16 and self.oaclen == 36:
self.cjtagstate = CSt.CJTAG_TPREV
if self.oacp > 18 and self.oaclen == 36:
self.cjtagstate = CSt.CJTAG_TPST
if self.oacp > 23 and self.oaclen == 36:
self.cjtagstate = CSt.CJTAG_RDYC
if self.oacp > 25 and self.oaclen == 36:
self.cjtagstate = CSt.CJTAG_DLYC
if self.oacp > 27 and self.oaclen == 36:
self.cjtagstate = CSt.CJTAG_SCNFMT
if self.oacp > 8 and self.oaclen == 12:
self.cjtagstate = CSt.CJTAG_CP
if self.oacp > 32 and self.oaclen == 36:
self.cjtagstate = CSt.CJTAG_CP
if self.oacp > self.oaclen:
self.cjtagstate = CSt.OSCAN1
self.oscan1cycle = 1
# Because Nuclei cJTAG device asserts a reset during cJTAG
# online activating.
self.state = St.TEST_LOGIC_RESET
# Intro "tree"
if self.state == St.TEST_LOGIC_RESET:
self.state = St.TEST_LOGIC_RESET if (tms) else St.RUN_TEST_IDLE
elif self.state == St.RUN_TEST_IDLE:
self.state = St.SELECT_DR_SCAN if (tms) else St.RUN_TEST_IDLE
# DR "tree"
elif self.state == St.SELECT_DR_SCAN:
self.state = St.SELECT_IR_SCAN if (tms) else St.CAPTURE_DR
elif self.state == St.CAPTURE_DR:
self.state = St.EXIT1_DR if (tms) else St.SHIFT_DR
elif self.state == St.SHIFT_DR:
self.state = St.EXIT1_DR if (tms) else St.SHIFT_DR
elif self.state == St.EXIT1_DR:
self.state = St.UPDATE_DR if (tms) else St.PAUSE_DR
elif self.state == St.PAUSE_DR:
self.state = St.EXIT2_DR if (tms) else St.PAUSE_DR
elif self.state == St.EXIT2_DR:
self.state = St.UPDATE_DR if (tms) else St.SHIFT_DR
elif self.state == St.UPDATE_DR:
self.state = St.SELECT_DR_SCAN if (tms) else St.RUN_TEST_IDLE
# IR "tree"
elif self.state == St.SELECT_IR_SCAN:
self.state = St.TEST_LOGIC_RESET if (tms) else St.CAPTURE_IR
elif self.state == St.CAPTURE_IR:
self.state = St.EXIT1_IR if (tms) else St.SHIFT_IR
elif self.state == St.SHIFT_IR:
self.state = St.EXIT1_IR if (tms) else St.SHIFT_IR
elif self.state == St.EXIT1_IR:
self.state = St.UPDATE_IR if (tms) else St.PAUSE_IR
elif self.state == St.PAUSE_IR:
self.state = St.EXIT2_IR if (tms) else St.PAUSE_IR
elif self.state == St.EXIT2_IR:
self.state = St.UPDATE_IR if (tms) else St.SHIFT_IR
elif self.state == St.UPDATE_IR:
self.state = St.SELECT_DR_SCAN if (tms) else St.RUN_TEST_IDLE
def handle_rising_tckc_edge(self, tdi, tdo, tck, tms):
# Rising TCK edges always advance the state machine.
if self.first:
# Save the start sample and item for later (no output yet).
self.ss_item = self.samplenum
self.first = False
# Output the saved item (from the last CLK edge to the current).
self.es_item = self.samplenum
# Output the old state (from last rising TCK edge to current one).
self.putx([jtag_states.index(self.oldstate.value), [self.oldstate.value]])
self.putp(['NEW STATE', self.state.value])
self.putx([len(jtag_states) + cjtag_states.index(self.oldcjtagstate.value),
if (self.oldcjtagstate.value.startswith('CJTAG_')):
self.putx([32, [str(self.oldtms)]])
self.oldtms = tms
# Upon SHIFT-*/EXIT1-* collect the current TDI/TDO values.
if self.oldstate.value.startswith('SHIFT-') or \
if self.first_bit:
self.ss_bitstring = self.samplenum
self.first_bit = False
self.putx([28, [str(self.bits_tdi[0])]])
self.putx([29, [str(self.bits_tdo[0])]])
# Use self.samplenum as ES of the previous bit.
self.bits_samplenums_tdi[0][1] = self.samplenum
self.bits_samplenums_tdo[0][1] = self.samplenum
self.bits_tdi.insert(0, tdi)
self.bits_tdo.insert(0, tdo)
# Use self.samplenum as SS of the current bit.
self.bits_samplenums_tdi.insert(0, [self.samplenum, -1])
self.bits_samplenums_tdo.insert(0, [self.samplenum, -1])
# Output all TDI/TDO bits if we just switched to UPDATE-*.
if self.state.value.startswith('UPDATE-'):
self.es_bitstring = self.samplenum
t = self.state.value[-2:] + ' TDI'
b = ''.join(map(str, self.bits_tdi[1:]))
h = ' (0x%x' % int('0b0' + b, 2) + ')'
s = t + ': ' + b + h + ', ' + str(len(self.bits_tdi[1:])) + ' bits'
self.putx_bs([30, [s]])
self.putp_bs([t, [b, self.bits_samplenums_tdi[1:]]])
self.bits_tdi = []
self.bits_samplenums_tdi = []
t = self.state.value[-2:] + ' TDO'
b = ''.join(map(str, self.bits_tdo[1:]))
h = ' (0x%x' % int('0b0' + b, 2) + ')'
s = t + ': ' + b + h + ', ' + str(len(self.bits_tdo[1:])) + ' bits'
self.putx_bs([31, [s]])
self.putp_bs([t, [b, self.bits_samplenums_tdo[1:]]])
self.bits_tdo = []
self.bits_samplenums_tdo = []
self.first_bit = True
self.ss_bitstring = self.samplenum
self.ss_item = self.samplenum
def handle_tmsc_edge(self):
self.escape_edges += 1
def handle_tapc_state(self):
self.oldcjtagstate = self.cjtagstate
if self.escape_edges >= 8:
self.cjtagstate = CSt.FOUR_WIRE
if self.escape_edges == 6:
self.cjtagstate = CSt.CJTAG_OAC
self.oacp = 0
self.oaclen = 12
self.escape_edges = 0
def decode(self):
tdi = tms = tdo = 0
while True:
# Wait for a rising edge on TCKC.
tckc, tmsc = self.wait({0: 'r'})
if self.cjtagstate == CSt.OSCAN1:
if self.oscan1cycle == 0: # nTDI
tdi = 1 if (tmsc == 0) else 0
self.oscan1cycle = 1
elif self.oscan1cycle == 1: # TMS
tms = tmsc
self.oscan1cycle = 2
elif self.oscan1cycle == 2: # TDO
tdo = tmsc
self.handle_rising_tckc_edge(tdi, tdo, tckc, tms)
self.oscan1cycle = 0
self.handle_rising_tckc_edge(None, None, tckc, tmsc)
while (tckc == 1):
tckc, tmsc_n = self.wait([{0: 'f'}, {1: 'e'}])
if tmsc_n != tmsc:
tmsc = tmsc_n
@ -2,7 +2,8 @@
## This file is part of the libsigrokdecode project.
## Copyright (C) 2016 Fabian J. Stumpf <sigrok@fabianstumpf.de>
## Copyright (C) 2023 DreamSourceLab <support@dreamsourcelab.com>
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
@ -17,6 +18,10 @@
## along with this program; if not, see <http://www.gnu.org/licenses/>.
# 2023/12/29 bug fixed : bitvalue show in break
import sigrokdecode as srd
class Decoder(srd.Decoder):
@ -64,6 +69,7 @@ class Decoder(srd.Decoder):
self.sample_usec = None
self.run_start = -1
self.state = 'FIND BREAK'
self.bit_pos = [[0, 0, 0] for _ in range(11)]
def start(self):
self.out_ann = self.register(srd.OUTPUT_ANN)
@ -120,32 +126,44 @@ class Decoder(srd.Decoder):
(dmx,) = self.wait({'skip': round(self.skip_per_bit/2)})
bit_value = not dmx if inv else dmx
self.bit_pos[self.bit] = [bit_start,bit_end,bit_value]
if self.bit == 0:
self.byte = 0
self.put(bit_start, bit_end,
self.out_ann, [3, ['Start bit']])
if bit_value != 0:
# self.put(bit_start, bit_end,self.out_ann, [3, ['Start bit']])
# if bit_value != 0:
# (Possibly) invalid start bit, mark but don't fail.
self.put(bit_start, bit_end,
self.out_ann, [10, ['Invalid start bit']])
# self.put(bit_start, bit_end,self.out_ann, [10, ['Invalid start bit']])
elif self.bit >= 9:
self.put(bit_start, bit_end,
self.out_ann, [4, ['Stop bit']])
# self.put(bit_start, bit_end,self.out_ann, [4, ['Stop bit']])
if bit_value != 1:
# Invalid stop bit, mark.
self.put(bit_start, bit_end,
self.out_ann, [10, ['Invalid stop bit']])
# self.put(bit_start, bit_end,self.out_ann, [10, ['Invalid stop bit']])
if self.bit == 10:
# On invalid 2nd stop bit, search for new break.
self.state = 'FIND BREAK'
# Label and process one bit.
self.put(bit_start, bit_end,
self.out_ann, [0, [str(bit_value)]])
# self.put(bit_start, bit_end,self.out_ann, [0, [str(bit_value)]])
self.byte |= bit_value << (self.bit - 1)
# Label a complete byte.
if self.state == 'READ BYTE' and self.bit == 10:
for index, value in enumerate(self.bit_pos):
if index == 0:
if value[2] == 0:
self.put(value[0], value[1],self.out_ann, [3, ['Start bit']])
self.put(value[0], value[1],self.out_ann, [10, ['Invalid start bit']])
elif index >= 9:
if value[2] == 1:
self.put(value[0], value[1],self.out_ann, [4, ['Stop bit']])
self.put(value[0], value[1],self.out_ann, [10, ['Invalid stop bit']])
self.put(value[0], value[1],self.out_ann, [0, [str(value[2])]])
if self.channel == 0:
d = [5, ['Start code']]
@ -1,7 +1,7 @@
## This file is part of the libsigrokdecode project.
## Copyright (C) 2012-2014 Uwe Hermann <uwe@hermann-uwe.de>
## Copyright (C) 2012-2020 Uwe Hermann <uwe@hermann-uwe.de>
## Copyright (C) 2013 Matt Ranostay <mranostay@gmail.com>
## This program is free software; you can redistribute it and/or modify
@ -20,7 +20,7 @@
import re
import sigrokdecode as srd
from common.srdhelper import bcd2int
from common.srdhelper import bcd2int, SrdIntEnum
days_of_week = (
'Sunday', 'Monday', 'Tuesday', 'Wednesday',
@ -47,10 +47,15 @@ rates = {
DS1307_I2C_ADDRESS = 0x68
def regs_and_bits():
l = [('reg-' + r.lower(), r + ' register') for r in regs]
l += [('bit-' + re.sub('\/| ', '-', b).lower(), b + ' bit') for b in bits]
l = [('reg_' + r.lower(), r + ' register') for r in regs]
l += [('bit_' + re.sub('\/| ', '_', b).lower(), b + ' bit') for b in bits]
return tuple(l)
a = ['REG_' + r.upper() for r in regs] + \
['BIT_' + re.sub('\/| ', '_', b).upper() for b in bits] + \
Ann = SrdIntEnum.from_list('Ann', a)
class Decoder(srd.Decoder):
api_version = 3
id = 'ds1307'
@ -62,17 +67,17 @@ class Decoder(srd.Decoder):
outputs = []
tags = ['Clock/timing', 'IC']
annotations = regs_and_bits() + (
('read-datetime', 'Read date/time'),
('write-datetime', 'Write date/time'),
('reg-read', 'Register read'),
('reg-write', 'Register write'),
('warnings', 'Warnings'),
('read_date_time', 'Read date/time'),
('write_date_time', 'Write date/time'),
('read_reg', 'Register read'),
('write_reg', 'Register write'),
('warning', 'Warning'),
annotation_rows = (
('bits', 'Bits', tuple(range(9, 24))),
('regs', 'Registers', tuple(range(9))),
('date-time', 'Date/time', (24, 25, 26, 27)),
('warnings', 'Warnings', (28,)),
('bits', 'Bits', Ann.prefixes('BIT_')),
('regs', 'Registers', Ann.prefixes('REG_')),
('date_time', 'Date/time', Ann.prefixes('READ_ WRITE_')),
('warnings', 'Warnings', (Ann.WARNING,)),
def __init__(self):
@ -100,84 +105,85 @@ class Decoder(srd.Decoder):
def putr(self, bit):
self.put(self.bits[bit][1], self.bits[bit][2], self.out_ann,
[11, ['Reserved bit', 'Reserved', 'Rsvd', 'R']])
[Ann.BIT_RESERVED, ['Reserved bit', 'Reserved', 'Rsvd', 'R']])
def handle_reg_0x00(self, b): # Seconds (0-59) / Clock halt bit
self.putd(7, 0, [0, ['Seconds', 'Sec', 'S']])
self.putd(7, 0, [Ann.REG_SECONDS, ['Seconds', 'Sec', 'S']])
ch = 1 if (b & (1 << 7)) else 0
self.putd(7, 7, [9, ['Clock halt: %d' % ch, 'Clk hlt: %d' % ch,
'CH: %d' % ch, 'CH']])
self.putd(7, 7, [Ann.BIT_CLOCK_HALT, ['Clock halt: %d' % ch,
'Clk hlt: %d' % ch, 'CH: %d' % ch, 'CH']])
s = self.seconds = bcd2int(b & 0x7f)
self.putd(6, 0, [10, ['Second: %d' % s, 'Sec: %d' % s, 'S: %d' % s, 'S']])
self.putd(6, 0, [Ann.BIT_SECONDS, ['Second: %d' % s, 'Sec: %d' % s,
'S: %d' % s, 'S']])
def handle_reg_0x01(self, b): # Minutes (0-59)
self.putd(7, 0, [1, ['Minutes', 'Min', 'M']])
self.putd(7, 0, [Ann.REG_MINUTES, ['Minutes', 'Min', 'M']])
m = self.minutes = bcd2int(b & 0x7f)
self.putd(6, 0, [12, ['Minute: %d' % m, 'Min: %d' % m, 'M: %d' % m, 'M']])
self.putd(6, 0, [Ann.BIT_MINUTES, ['Minute: %d' % m, 'Min: %d' % m, 'M: %d' % m, 'M']])
def handle_reg_0x02(self, b): # Hours (1-12+AM/PM or 0-23)
self.putd(7, 0, [2, ['Hours', 'H']])
self.putd(7, 0, [Ann.REG_HOURS, ['Hours', 'H']])
ampm_mode = True if (b & (1 << 6)) else False
if ampm_mode:
self.putd(6, 6, [13, ['12-hour mode', '12h mode', '12h']])
self.putd(6, 6, [Ann.BIT_12_24_HOURS, ['12-hour mode', '12h mode', '12h']])
a = 'PM' if (b & (1 << 5)) else 'AM'
self.putd(5, 5, [14, [a, a[0]]])
self.putd(5, 5, [Ann.BIT_AM_PM, [a, a[0]]])
h = self.hours = bcd2int(b & 0x1f)
self.putd(4, 0, [15, ['Hour: %d' % h, 'H: %d' % h, 'H']])
self.putd(4, 0, [Ann.BIT_HOURS, ['Hour: %d' % h, 'H: %d' % h, 'H']])
self.putd(6, 6, [13, ['24-hour mode', '24h mode', '24h']])
self.putd(6, 6, [Ann.BIT_12_24_HOURS, ['24-hour mode', '24h mode', '24h']])
h = self.hours = bcd2int(b & 0x3f)
self.putd(5, 0, [15, ['Hour: %d' % h, 'H: %d' % h, 'H']])
self.putd(5, 0, [Ann.BIT_HOURS, ['Hour: %d' % h, 'H: %d' % h, 'H']])
def handle_reg_0x03(self, b): # Day / day of week (1-7)
self.putd(7, 0, [3, ['Day of week', 'Day', 'D']])
self.putd(7, 0, [Ann.REG_DAY, ['Day of week', 'Day', 'D']])
for i in (7, 6, 5, 4, 3):
w = self.days = bcd2int(b & 0x07)
ws = days_of_week[self.days - 1]
self.putd(2, 0, [16, ['Weekday: %s' % ws, 'WD: %s' % ws, 'WD', 'W']])
self.putd(2, 0, [Ann.BIT_DAY, ['Weekday: %s' % ws, 'WD: %s' % ws, 'WD', 'W']])
def handle_reg_0x04(self, b): # Date (1-31)
self.putd(7, 0, [4, ['Date', 'D']])
self.putd(7, 0, [Ann.REG_DATE, ['Date', 'D']])
for i in (7, 6):
d = self.date = bcd2int(b & 0x3f)
self.putd(5, 0, [17, ['Date: %d' % d, 'D: %d' % d, 'D']])
self.putd(5, 0, [Ann.BIT_DATE, ['Date: %d' % d, 'D: %d' % d, 'D']])
def handle_reg_0x05(self, b): # Month (1-12)
self.putd(7, 0, [5, ['Month', 'Mon', 'M']])
self.putd(7, 0, [Ann.REG_MONTH, ['Month', 'Mon', 'M']])
for i in (7, 6, 5):
m = self.months = bcd2int(b & 0x1f)
self.putd(4, 0, [18, ['Month: %d' % m, 'Mon: %d' % m, 'M: %d' % m, 'M']])
self.putd(4, 0, [Ann.BIT_MONTH, ['Month: %d' % m, 'Mon: %d' % m, 'M: %d' % m, 'M']])
def handle_reg_0x06(self, b): # Year (0-99)
self.putd(7, 0, [6, ['Year', 'Y']])
self.putd(7, 0, [Ann.REG_YEAR, ['Year', 'Y']])
y = self.years = bcd2int(b & 0xff)
self.years += 2000
self.putd(7, 0, [19, ['Year: %d' % y, 'Y: %d' % y, 'Y']])
self.putd(7, 0, [Ann.BIT_YEAR, ['Year: %d' % y, 'Y: %d' % y, 'Y']])
def handle_reg_0x07(self, b): # Control Register
self.putd(7, 0, [7, ['Control', 'Ctrl', 'C']])
self.putd(7, 0, [Ann.REG_CONTROL, ['Control', 'Ctrl', 'C']])
for i in (6, 5, 3, 2):
o = 1 if (b & (1 << 7)) else 0
s = 1 if (b & (1 << 4)) else 0
s2 = 'en' if (b & (1 << 4)) else 'dis'
r = rates[b & 0x03]
self.putd(7, 7, [20, ['Output control: %d' % o,
self.putd(7, 7, [Ann.BIT_OUT, ['Output control: %d' % o,
'OUT: %d' % o, 'O: %d' % o, 'O']])
self.putd(4, 4, [21, ['Square wave output: %sabled' % s2,
self.putd(4, 4, [Ann.BIT_SQWE, ['Square wave output: %sabled' % s2,
'SQWE: %sabled' % s2, 'SQWE: %d' % s, 'S: %d' % s, 'S']])
self.putd(1, 0, [22, ['Square wave output rate: %s' % r,
self.putd(1, 0, [Ann.BIT_RS, ['Square wave output rate: %s' % r,
'Square wave rate: %s' % r, 'SQW rate: %s' % r, 'Rate: %s' % r,
'RS: %s' % s, 'RS', 'R']])
def handle_reg_0x3f(self, b): # RAM (bytes 0x08-0x3f)
self.putd(7, 0, [8, ['RAM', 'R']])
self.putd(7, 0, [23, ['SRAM: 0x%02X' % b, '0x%02X' % b]])
self.putd(7, 0, [Ann.REG_RAM, ['RAM', 'R']])
self.putd(7, 0, [Ann.BIT_RAM, ['SRAM: 0x%02X' % b, '0x%02X' % b]])
def output_datetime(self, cls, rw):
# TODO: Handle read/write of only parts of these items.
@ -201,7 +207,7 @@ class Decoder(srd.Decoder):
if addr == DS1307_I2C_ADDRESS:
return True
self.put(self.ss_block, self.es, self.out_ann,
[28, ['Ignoring non-DS1307 data (slave 0x%02X)' % addr]])
[Ann.WARNING, ['Ignoring non-DS1307 data (slave 0x%02X)' % addr]])
return False
def decode(self, ss, es, data):
@ -246,7 +252,7 @@ class Decoder(srd.Decoder):
if cmd == 'DATA WRITE':
elif cmd == 'STOP':
self.output_datetime(25, 'Written')
self.output_datetime(Ann.WRITE_DATE_TIME, 'Written')
self.state = 'IDLE'
elif self.state == 'READ RTC REGS':
# Wait for an address read operation.
@ -260,5 +266,5 @@ class Decoder(srd.Decoder):
if cmd == 'DATA READ':
elif cmd == 'STOP':
self.output_datetime(24, 'Read')
self.output_datetime(Ann.READ_DATE_TIME, 'Read')
self.state = 'IDLE'
@ -32,6 +32,8 @@ class Decoder(srd.Decoder):
options = (
{'id': 'addresssize', 'desc': 'Address size', 'default': 8, 'idn':'dec_eeprom93xx_opt_addresssize'},
{'id': 'wordsize', 'desc': 'Word size', 'default': 16, 'idn':'dec_eeprom93xx_opt_wordsize'},
{'id': 'format', 'desc': 'Data format', 'default': 'hex',
'values': ('ascii', 'hex'), 'idn':'dec_eeprom93xx_opt_format'},
annotations = (
('si-data', 'SI data'),
@ -42,6 +44,10 @@ class Decoder(srd.Decoder):
('data', 'Data', (0, 1)),
('warnings', 'Warnings', (2,)),
binary = (
('address', 'Address'),
('data', 'Data'),
def __init__(self):
@ -51,6 +57,7 @@ class Decoder(srd.Decoder):
def start(self):
self.out_ann = self.register(srd.OUTPUT_ANN)
self.out_binary = self.register(srd.OUTPUT_BINARY)
self.addresssize = self.options['addresssize']
self.wordsize = self.options['wordsize']
@ -60,7 +67,8 @@ class Decoder(srd.Decoder):
for b in range(len(data)):
a += (data[b].si << (len(data) - b - 1))
self.put(data[0].ss, data[-1].es, self.out_ann,
[0, ['Address: 0x%x' % a, 'Addr: 0x%x' % a, '0x%x' % a]])
[0, ['Address: 0x%04x' % a, 'Addr: 0x%04x' % a, '0x%04x' % a]])
self.put(data[0].ss, data[-1].es, self.out_binary, [0, bytes([a])])
def put_word(self, si, data):
# Decode word (MSb first).
@ -69,8 +77,22 @@ class Decoder(srd.Decoder):
d = data[b].si if si else data[b].so
word += (d << (len(data) - b - 1))
idx = 0 if si else 1
self.put(data[0].ss, data[-1].es,
self.out_ann, [idx, ['Data: 0x%x' % word, '0x%x' % word]])
if self.options['format'] == 'ascii':
word_str = ''
for s in range(0, len(data), 8):
c = 0xff & (word >> s)
if c in range(32, 126 + 1):
word_str = chr(c) + word_str
word_str = '[{:02X}]'.format(c) + word_str
self.put(data[0].ss, data[-1].es,
self.out_ann, [idx, ['Data: %s' % word_str, '%s' % word_str]])
self.put(data[0].ss, data[-1].es,
self.out_ann, [idx, ['Data: 0x%04x' % word, '0x%04x' % word]])
self.put(data[0].ss, data[-1].es, self.out_binary,
[1, bytes([(word & 0xff00) >> 8, word & 0xff])])
def decode(self, ss, es, data):
if len(data) < (2 + self.addresssize):
@ -2,6 +2,7 @@
## This file is part of the libsigrokdecode project.
## Copyright (C) 2019 Rene Staffen
## Copyright (C) 2020-2021 Gerhard Sittig <gerhard.sittig@gmx.net>
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
@ -61,12 +62,24 @@ class IrmpLibrary:
Lookup the C library's API routines. Declare their prototypes.
if not self._lib:
return False
self._lib.irmp_get_sample_rate.restype = ctypes.c_uint32
self._lib.irmp_get_sample_rate.argtypes = []
self._lib.irmp_instance_alloc.restype = ctypes.c_void_p
self._lib.irmp_instance_alloc.argtypes = []
self._lib.irmp_instance_free.restype = None
self._lib.irmp_instance_free.argtypes = [ ctypes.c_void_p, ]
self._lib.irmp_instance_id.restype = ctypes.c_size_t
self._lib.irmp_instance_id.argtypes = [ ctypes.c_void_p, ]
self._lib.irmp_instance_lock.restype = ctypes.c_int
self._lib.irmp_instance_lock.argtypes = [ ctypes.c_void_p, ctypes.c_int, ]
self._lib.irmp_instance_unlock.restype = None
self._lib.irmp_instance_unlock.argtypes = [ ctypes.c_void_p, ]
self._lib.irmp_reset_state.restype = None
self._lib.irmp_reset_state.argtypes = []
@ -85,6 +98,7 @@ class IrmpLibrary:
# Create a result buffer that's local to the library instance.
self._data = self.ResultData()
self._inst = None
return True
@ -93,30 +107,47 @@ class IrmpLibrary:
Create a library instance.
# Only create a working instance for the first invocation.
# Degrade all other instances, make them fail "late" during
# execution, so that users will see the errors.
self._lib = None
self._data = None
if IrmpLibrary.__usable_instance is None:
filename = self._library_filename()
self._lib = ctypes.cdll.LoadLibrary(filename)
IrmpLibrary.__usable_instance = self
filename = self._library_filename()
self._lib = ctypes.cdll.LoadLibrary(filename)
def __del__(self):
Release a disposed library instance.
if self._inst:
self._inst = None
def __enter__(self):
Enter a context (lock management).
if self._inst is None:
self._inst = self._lib.irmp_instance_alloc()
self._lib.irmp_instance_lock(self._inst, 1)
return self
def __exit__(self, extype, exvalue, trace):
Leave a context (lock management).
return False
def client_id(self):
return self._lib.irmp_instance_id(self._inst)
def get_sample_rate(self):
if not self._lib:
return None
return self._lib.irmp_get_sample_rate()
def reset_state(self):
if not self._lib:
return None
def add_one_sample(self, level):
if not self._lib:
raise Exception("IRMP library limited to a single instance.")
if not self._lib.irmp_add_one_sample(int(level)):
return False
@ -3,6 +3,7 @@
## Copyright (C) 2014 Gump Yang <gump.yang@gmail.com>
## Copyright (C) 2019 Rene Staffen
## Copyright (C) 2020-2021 Gerhard Sittig <gerhard.sittig@gmx.net>
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
@ -97,7 +98,7 @@ class Decoder(srd.Decoder):
def reset(self):
self.want_reset = True
def start(self):
self.out_ann = self.register(srd.OUTPUT_ANN)
@ -113,25 +114,26 @@ class Decoder(srd.Decoder):
except Exception as e:
txt = e.args[0]
raise LibraryError(txt)
if self.irmp:
self.lib_rate = self.irmp.get_sample_rate()
if not self.irmp or not self.lib_rate:
raise LibraryError('Cannot access IRMP library. One instance limit exceeded?')
if not self.irmp:
raise LibraryError('Cannot access IRMP library.')
if not self.samplerate:
raise SamplerateError('Cannot decode without samplerate.')
if self.samplerate % self.lib_rate:
raise SamplerateError('Capture samplerate must be multiple of library samplerate ({})'.format(self.lib_rate))
self.rate_factor = int(self.samplerate / self.lib_rate)
if self.want_reset:
self.want_reset = False
lib_rate = self.irmp.get_sample_rate()
if not lib_rate:
raise LibraryError('Cannot determine IRMP library\'s samplerate.')
if self.samplerate % lib_rate:
raise SamplerateError('Capture samplerate must be multiple of library samplerate ({})'.format(lib_rate))
self.rate_factor = int(self.samplerate / lib_rate)
active = 0 if self.options['polarity'] == 'active-low' else 1
self.active = 0 if self.options['polarity'] == 'active-low' else 1
ir, = self.wait()
while True:
if self.active == 1:
ir = 1 - ir
if self.irmp.add_one_sample(ir):
data = self.irmp.get_result_data()
ir, = self.wait([{'skip': self.rate_factor}])
with self.irmp:
while True:
if active == 1:
ir = 1 - ir
if self.irmp.add_one_sample(ir):
data = self.irmp.get_result_data()
ir, = self.wait([{'skip': self.rate_factor}])
@ -18,12 +18,34 @@
## along with this program; if not, see <http://www.gnu.org/licenses/>.
import sigrokdecode as srd
from common.srdhelper import bitpack
from .lists import *
import sigrokdecode as srd
# Concentrate all timing constraints of the IR protocol here in a single
# location at the top of the source, to raise awareness and to simplify
# review and adjustment. The tolerance is an arbitrary choice, available
# literature does not mention any. The inter-frame timeout is not a part
# of the protocol, but an implementation detail of this sigrok decoder.
_TIME_TOL = 10 # tolerance, in percent
_TIME_IDLE = 20.0 # inter-frame timeout, in ms
_TIME_LC = 13.5 # leader code, in ms
_TIME_RC = 11.25 # repeat code, in ms
_TIME_ONE = 2.25 # one data bit, in ms
_TIME_ZERO = 1.125 # zero data bit, in ms
_TIME_STOP = 0.562 # stop bit, in ms
class SamplerateError(Exception):
class Pin:
IR, = range(1)
class Ann:
REMOTE, WARN = range(13)
class Decoder(srd.Decoder):
api_version = 3
id = 'ir_nec'
@ -39,8 +61,10 @@ class Decoder(srd.Decoder):
options = (
{'id': 'polarity', 'desc': 'Polarity', 'default': 'active-low',
'values': ('active-low', 'active-high'), 'idn':'dec_ir_nec_opt_polarity'},
'values': ('auto', 'active-low', 'active-high'), 'idn':'dec_ir_nec_opt_polarity'},
{'id': 'cd_freq', 'desc': 'Carrier Frequency', 'default': 0, 'idn':'dec_ir_nec_opt_cd_freq'},
{'id': 'extended', 'desc': 'Extended NEC Protocol',
'default': 'no', 'values': ('yes', 'no'), 'idn':'dec_ir_nec_opt_extended'},
annotations = (
('bit', 'Bit'),
@ -55,13 +79,13 @@ class Decoder(srd.Decoder):
('cmd-inv', 'Command#'),
('repeat-code', 'Repeat code'),
('remote', 'Remote'),
('warnings', 'Warnings'),
('warning', 'Warning'),
annotation_rows = (
('bits', 'Bits', (0, 1, 2, 3, 4)),
('fields', 'Fields', (5, 6, 7, 8, 9, 10)),
('remote', 'Remote', (11,)),
('warnings', 'Warnings', (12,)),
('bits', 'Bits', (Ann.BIT, Ann.AGC, Ann.LONG_PAUSE, Ann.SHORT_PAUSE, Ann.STOP_BIT)),
('fields', 'Fields', (Ann.LEADER_CODE, Ann.ADDR, Ann.ADDR_INV, Ann.CMD, Ann.CMD_INV, Ann.REPEAT_CODE)),
('remote-vals', 'Remote', (Ann.REMOTE,)),
('warnings', 'Warnings', (Ann.WARN,)),
def putx(self, data):
@ -70,36 +94,44 @@ class Decoder(srd.Decoder):
def putb(self, data):
self.put(self.ss_bit, self.samplenum, self.out_ann, data)
def putd(self, data):
def putd(self, data, bit_count):
name = self.state.title()
d = {'ADDRESS': 6, 'ADDRESS#': 7, 'COMMAND': 8, 'COMMAND#': 9}
s = {'ADDRESS': ['ADDR', 'A'], 'ADDRESS#': ['ADDR#', 'A#'],
'COMMAND': ['CMD', 'C'], 'COMMAND#': ['CMD#', 'C#']}
self.putx([d[self.state], ['%s: 0x%02X' % (name, data),
'%s: 0x%02X' % (s[self.state][0], data),
'%s: 0x%02X' % (s[self.state][1], data), s[self.state][1]]])
fmt = '{{}}: 0x{{:0{}X}}'.format(bit_count // 4)
self.putx([d[self.state], [
fmt.format(name, data),
fmt.format(s[self.state][0], data),
fmt.format(s[self.state][1], data),
def putstop(self, ss):
self.put(ss, ss + self.stop, self.out_ann,
[4, ['Stop bit', 'Stop', 'St', 'S']])
[Ann.STOP_BIT, ['Stop bit', 'Stop', 'St', 'S']])
def putpause(self, p):
self.put(self.ss_start, self.ss_other_edge, self.out_ann,
[1, ['AGC pulse', 'AGC', 'A']])
idx = 2 if p == 'Long' else 3
self.put(self.ss_other_edge, self.samplenum, self.out_ann,
[idx, [p + ' pause', '%s-pause' % p[0], '%sP' % p[0], 'P']])
[Ann.AGC, ['AGC pulse', 'AGC', 'A']])
idx = Ann.LONG_PAUSE if p == 'Long' else Ann.SHORT_PAUSE
self.put(self.ss_other_edge, self.samplenum, self.out_ann, [idx, [
'{} pause'.format(p),
def putremote(self):
dev = address.get(self.addr, 'Unknown device')
buttons = command.get(self.addr, None)
if buttons is None:
btn = ['Unknown', 'Unk']
btn = buttons.get(self.cmd, ['Unknown', 'Unk'])
self.put(self.ss_remote, self.ss_bit + self.stop, self.out_ann,
[11, ['%s: %s' % (dev, btn[0]), '%s: %s' % (dev, btn[1]),
'%s' % btn[1]]])
buttons = command.get(self.addr, {})
btn = buttons.get(self.cmd, ['Unknown', 'Unk'])
self.put(self.ss_remote, self.ss_bit + self.stop, self.out_ann, [Ann.REMOTE, [
'{}: {}'.format(dev, btn[0]),
'{}: {}'.format(dev, btn[1]),
def __init__(self):
@ -107,22 +139,24 @@ class Decoder(srd.Decoder):
def reset(self):
self.state = 'IDLE'
self.ss_bit = self.ss_start = self.ss_other_edge = self.ss_remote = 0
self.data = self.count = self.active = None
self.data = []
self.addr = self.cmd = None
def start(self):
self.out_ann = self.register(srd.OUTPUT_ANN)
self.active = 0 if self.options['polarity'] == 'active-low' else 1
def metadata(self, key, value):
if key == srd.SRD_CONF_SAMPLERATE:
self.samplerate = value
self.tolerance = 0.05 # +/-5%
self.lc = int(self.samplerate * 0.0135) - 1 # 13.5ms
self.rc = int(self.samplerate * 0.01125) - 1 # 11.25ms
self.dazero = int(self.samplerate * 0.001125) - 1 # 1.125ms
self.daone = int(self.samplerate * 0.00225) - 1 # 2.25ms
self.stop = int(self.samplerate * 0.000652) - 1 # 0.652ms
def calc_rate(self):
self.tolerance = _TIME_TOL / 100
self.lc = int(self.samplerate * _TIME_LC / 1000) - 1
self.rc = int(self.samplerate * _TIME_RC / 1000) - 1
self.dazero = int(self.samplerate * _TIME_ZERO / 1000) - 1
self.daone = int(self.samplerate * _TIME_ONE / 1000) - 1
self.stop = int(self.samplerate * _TIME_STOP / 1000) - 1
self.idle_to = int(self.samplerate * _TIME_IDLE / 1000) - 1
def compare_with_tolerance(self, measured, base):
return (measured >= base * (1 - self.tolerance)
@ -135,37 +169,57 @@ class Decoder(srd.Decoder):
elif self.compare_with_tolerance(tick, self.daone):
ret = 1
if ret in (0, 1):
self.putb([0, ['%d' % ret]])
self.data |= (ret << self.count) # LSB-first
self.count = self.count + 1
self.putb([Ann.BIT, ['{:d}'.format(ret)]])
self.ss_bit = self.samplenum
def data_ok(self):
ret, name = (self.data >> 8) & (self.data & 0xff), self.state.title()
if self.count == 8:
def data_ok(self, check, want_len):
name = self.state.title()
normal, inverted = bitpack(self.data[:8]), bitpack(self.data[8:])
valid = (normal ^ inverted) == 0xff
show = inverted if self.state.endswith('#') else normal
is_ext_addr = self.is_extended and self.state == 'ADDRESS'
if is_ext_addr:
normal = bitpack(self.data)
show = normal
valid = True
if len(self.data) == want_len:
if self.state == 'ADDRESS':
self.addr = self.data
self.addr = normal
if self.state == 'COMMAND':
self.cmd = self.data
self.cmd = normal
self.putd(show, want_len)
self.ss_start = self.samplenum
if is_ext_addr:
self.data = []
self.ss_bit = self.ss_start = self.samplenum
return True
if ret == 0:
self.putd(self.data >> 8)
self.putx([12, ['%s error: 0x%04X' % (name, self.data)]])
self.data = self.count = 0
self.putd(show, want_len)
if check and not valid:
warn_show = bitpack(self.data)
self.putx([Ann.WARN, ['{} error: 0x{:04X}'.format(name, warn_show)]])
self.data = []
self.ss_bit = self.ss_start = self.samplenum
return ret == 0
return valid
def decode(self):
if not self.samplerate:
raise SamplerateError('Cannot decode without samplerate.')
cd_count = None
if self.options['cd_freq']:
cd_count = int(self.samplerate / self.options['cd_freq']) + 1
prev_ir = None
prev_ir = None
if self.options['polarity'] == 'auto':
# Take sample 0 as reference.
curr_level, = self.wait({'skip': 0})
active = 1 - curr_level
active = 0 if self.options['polarity'] == 'active-low' else 1
self.is_extended = self.options['extended'] == 'yes'
want_addr_len = 16 if self.is_extended else 8
while True:
# Detect changes in the presence of an active input signal.
@ -182,54 +236,64 @@ class Decoder(srd.Decoder):
# active period, but will shift their signal changes by one
# carrier period before they get passed to decoding logic.
if cd_count:
(cur_ir,) = self.wait([{0: 'e'}, {'skip': cd_count}])
(cur_ir,) = self.wait([{Pin.IR: 'e'}, {'skip': cd_count}])
if (self.matched & (0b1 << 0)):
cur_ir = self.active
cur_ir = active
if cur_ir == prev_ir:
prev_ir = cur_ir
self.ir = cur_ir
(self.ir,) = self.wait({0: 'e'})
(self.ir,) = self.wait({Pin.IR: 'e'})
if self.ir != self.active:
# Save the non-active edge, then wait for the next edge.
if self.ir != active:
# Save the location of the non-active edge (recessive),
# then wait for the next edge. Immediately process the
# end of the STOP bit which completes an IR frame.
self.ss_other_edge = self.samplenum
if self.state != 'STOP':
b = self.samplenum - self.ss_bit
# Reset internal state for long periods of idle level.
width = self.samplenum - self.ss_bit
if width >= self.idle_to and self.state != 'STOP':
# State machine.
if self.state == 'IDLE':
if self.compare_with_tolerance(b, self.lc):
if self.compare_with_tolerance(width, self.lc):
self.putx([5, ['Leader code', 'Leader', 'LC', 'L']])
self.putx([Ann.LEADER_CODE, ['Leader code', 'Leader', 'LC', 'L']])
self.ss_remote = self.ss_start
self.data = self.count = 0
self.data = []
self.state = 'ADDRESS'
elif self.compare_with_tolerance(b, self.rc):
elif self.compare_with_tolerance(width, self.rc):
self.samplenum += self.stop
self.putx([10, ['Repeat code', 'Repeat', 'RC', 'R']])
self.data = self.count = 0
self.putx([Ann.REPEAT_CODE, ['Repeat code', 'Repeat', 'RC', 'R']])
self.data = []
self.ss_bit = self.ss_start = self.samplenum
elif self.state == 'ADDRESS':
if self.count == 8:
self.state = 'ADDRESS#' if self.data_ok() else 'IDLE'
if len(self.data) == want_addr_len:
self.data_ok(False, want_addr_len)
self.state = 'COMMAND' if self.is_extended else 'ADDRESS#'
elif self.state == 'ADDRESS#':
if self.count == 16:
self.state = 'COMMAND' if self.data_ok() else 'IDLE'
if len(self.data) == 16:
self.data_ok(True, 8)
self.state = 'COMMAND'
elif self.state == 'COMMAND':
if self.count == 8:
self.state = 'COMMAND#' if self.data_ok() else 'IDLE'
if len(self.data) == 8:
self.data_ok(False, 8)
self.state = 'COMMAND#'
elif self.state == 'COMMAND#':
if self.count == 16:
self.state = 'STOP' if self.data_ok() else 'IDLE'
if len(self.data) == 16:
self.data_ok(True, 8)
self.state = 'STOP'
elif self.state == 'STOP':
@ -61,13 +61,12 @@ class Decoder(srd.Decoder):
def reset(self):
self.samplerate = None
self.samplenum = None
self.edges, self.bits, self.ss_es_bits = [], [], []
self.state = 'IDLE'
def start(self):
self.out_ann = self.register(srd.OUTPUT_ANN)
self.old_ir = 1 if self.options['polarity'] == 'active-low' else 0
self.next_edge = 'l' if self.options['polarity'] == 'active-low' else 'h'
def metadata(self, key, value):
if key == srd.SRD_CONF_SAMPLERATE:
@ -143,11 +142,7 @@ class Decoder(srd.Decoder):
raise SamplerateError('Cannot decode without samplerate.')
while True:
(self.ir,) = self.wait()
# Wait for any edge (rising or falling).
if self.old_ir == self.ir:
(self.ir,) = self.wait({0: self.next_edge})
# State machine.
if self.state == 'IDLE':
@ -155,7 +150,7 @@ class Decoder(srd.Decoder):
self.bits.append([self.samplenum, bit])
self.state = 'MID1'
self.old_ir = self.ir
self.next_edge = 'l' if self.ir else 'h'
edge = self.edge_type()
if edge == 'e':
@ -184,4 +179,4 @@ class Decoder(srd.Decoder):
self.old_ir = self.ir
self.next_edge = 'l' if self.ir else 'h'
@ -108,15 +108,15 @@ class Decoder(srd.Decoder):
def read_pulse(self, high, time):
e = 'f' if high else 'r'
max_time = int(time * 1.30)
(ir,), ss, es, (edge, timeout) = self.wait_wrap([{0: e}], max_time)
if timeout or not self.tolerance(ss, es, time):
(ir,), ss, es, matched = self.wait_wrap([{0: e}], max_time)
if (matched & 0b1) or not self.tolerance(ss, es, time):
raise SIRCError('Timeout')
return ir, ss, es, (edge, timeout)
return ir, ss, es, matched
def read_bit(self):
e = 'f' if self.active else 'r'
_, high_ss, high_es, (edge, timeout) = self.wait_wrap([{0: e}], 2000)
if timeout:
_, high_ss, high_es, matched = self.wait_wrap([{0: e}], 2000)
if (matched & 0b1):
raise SIRCError('Bit High Timeout')
if self.tolerance(high_ss, high_es, ONE_USEC):
bit = 1
@ -256,15 +256,11 @@ class Decoder(srd.Decoder):
def decode(self):
bMoreMatch = False
while True:
if self.timeout == 0:
rising_edge, = self.wait({0: 'e'})
bMoreMatch = False
rising_edge, = self.wait([{0: 'e'}, {'skip': self.timeout}])
bMoreMatch = True
# If this is the first edge, we only update ss
if self.ss == 0:
@ -214,7 +214,7 @@ class Decoder(srd.Decoder):
def end(self):
if self.done_break and len(self.lin_rsp):
def decode(self, ss, es, data):
ptype, rxtx, pdata = data
@ -30,8 +30,12 @@ class Decoder(srd.Decoder):
outputs = []
tags = ['IC', 'Sensor']
annotations = (
('celsius', 'Temperature in degrees Celsius'),
('kelvin', 'Temperature in Kelvin'),
('celsius', 'Temperature / °C'),
('kelvin', 'Temperature / K'),
annotation_rows = (
('temps-celsius', 'Temperature / °C', (0,)),
('temps-kelvin', 'Temperature / K', (1,)),
def __init__(self):
@ -23,6 +23,7 @@ from math import ceil
RX = 0
TX = 1
rxtx_channels = ('RX', 'TX')
class No_more_data(Exception):
'''This exception is a signal that we should stop parsing an ADU as there
@ -125,18 +126,18 @@ class Modbus_ADU:
self.annotation_prefix + 'error',
'Message too short or not finished')
self.hasError = True
if self.hasError and self.parent.options['channel'] == 'RX':
# If we are on RX mode (so client->server and server->client
# messages can be seperated) we like to mark blocks containing
# errors. We don't do this in TX mode, because then we interpret
# each frame as both a client->server and server->client frame, and
if self.hasError and self.parent.options['scchannel'] != self.parent.options['cschannel']:
# If we are decoding different channels (so client->server and
# server->client messages can be separated) we like to mark blocks
# containing errors. We don't do this when decoding the same
# channel as both a client->server and server->client frame, and
# one of those is bound to contain an error, making highlighting
# frames useless.
self.parent.puta(data[0].start, data[-1].end,
'error-indication', 'Frame contains error')
if len(data) > 256:
self.puti(len(data) - 1, self.annotation_prefix + 'error',
self.puti(len(data) - 1, 'error',
'Modbus data frames are limited to 256 bytes')
except No_more_data:
@ -814,7 +815,7 @@ class Modbus_ADU_CS(Modbus_ADU):
class Decoder(srd.Decoder):
api_version = 3
id = 'modbus'
name = 'Modbus RTU'
name = 'Modbus'
longname = 'Modbus RTU over RS232/RS485'
desc = 'Modbus RTU protocol for industrial applications.'
license = 'gplv3+'
@ -822,30 +823,33 @@ class Decoder(srd.Decoder):
outputs = ['modbus']
tags = ['Embedded/industrial']
annotations = (
('sc-server-id', ''),
('sc-function', ''),
('sc-crc', ''),
('sc-address', ''),
('sc-data', ''),
('sc-length', ''),
('sc-error', ''),
('cs-server-id', ''),
('cs-function', ''),
('cs-crc', ''),
('cs-address', ''),
('cs-data', ''),
('cs-length', ''),
('cs-error', ''),
('error-indication', ''),
('sc-server-id', 'SC server ID'),
('sc-function', 'SC function'),
('sc-crc', 'SC CRC'),
('sc-address', 'SC address'),
('sc-data', 'SC data'),
('sc-length', 'SC length'),
('sc-error', 'SC error'),
('cs-server-id', 'CS server ID'),
('cs-function', 'CS function'),
('cs-crc', 'CS CRC'),
('cs-address', 'CS address'),
('cs-data', 'CS data'),
('cs-length', 'CS length'),
('cs-error', 'CS error'),
('error-indication', 'Error indication'),
annotation_rows = (
('sc', 'Server->client', (7, 8, 9, 10, 11, 12, 13)),
('cs', 'Client->server', (0, 1, 2, 3, 4, 5, 6)),
('error-indicator', 'Errors in frame', (14,)),
('sc', 'Server->client', (0, 1, 2, 3, 4, 5, 6)),
('cs', 'Client->server', (7, 8, 9, 10, 11, 12, 13)),
('error-indicators', 'Errors in frame', (14,)),
options = (
{'id': 'channel', 'desc': 'Direction', 'default': 'TX',
'values': ('TX', 'RX'), 'idn':'dec_modbus_opt_channel'},
{'id': 'scchannel', 'desc': 'Server -> client channel',
'default': rxtx_channels[0], 'values': rxtx_channels, 'idn':'dec_modbus_opt_scchannel'},
{'id': 'cschannel', 'desc': 'Client -> server channel',
'default': rxtx_channels[1], 'values': rxtx_channels, 'idn':'dec_modbus_opt_cschannel'},
{'id': 'framegap', 'desc': 'Inter-frame bit gap', 'default': 28, 'idn':'dec_modbus_opt_framegap'},
def __init__(self):
@ -909,7 +913,7 @@ class Decoder(srd.Decoder):
# somewhere between seems fine.
# A character is 11 bits long, so (3.5 + 1.5)/2 * 11 ~= 28
# TODO: Display error for too short or too long.
if (ss - ADU.last_read) <= self.bitlength * 28:
if (ss - ADU.last_read) <= self.bitlength * self.options['framegap']:
ADU.add_data(ss, es, data)
# It's been too long since the last part of the ADU!
@ -926,9 +930,13 @@ class Decoder(srd.Decoder):
def decode(self, ss, es, data):
ptype, rxtx, pdata = data
# Ignore unknown/unsupported ptypes.
if ptype not in ('STARTBIT', 'DATA', 'STOPBIT'):
# Decide what ADU(s) we need this packet to go to.
# Note that it's possible to go to both ADUs.
if self.options['channel'] == 'TX':
if rxtx_channels[rxtx] == self.options['scchannel']:
self.decode_adu(ss, es, data, 'Sc')
if self.options['channel'] == 'RX':
if rxtx_channels[rxtx] == self.options['cschannel']:
self.decode_adu(ss, es, data, 'Cs')
@ -20,6 +20,8 @@
import sigrokdecode as srd
from .lists import *
TX, RX = range(2)
class Decoder(srd.Decoder):
api_version = 3
id = 'mrf24j40'
@ -31,16 +33,30 @@ class Decoder(srd.Decoder):
outputs = []
tags = ['IC', 'Wireless/RF']
annotations = (
('sread', 'Short register read commands'),
('swrite', 'Short register write commands'),
('lread', 'Long register read commands'),
('lwrite', 'Long register write commands'),
('warning', 'Warnings'),
('sread', 'Short register read'),
('swrite', 'Short register write'),
('lread', 'Long register read'),
('lwrite', 'Long register write'),
('warning', 'Warning'),
('tx-frame', 'TX frame'),
('rx-frame', 'RX frame'),
('tx-retry-1', '1x TX retry'),
('tx-retry-2', '2x TX retry'),
('tx-retry-3', '3x TX retry'),
('tx-fail', 'TX fail (too many retries)'),
('ccafail', 'CCAFAIL (channel busy)'),
annotation_rows = (
('read', 'Read', (0, 2)),
('write', 'Write', (1, 3)),
('reads', 'Reads', (0, 2)),
('writes', 'Writes', (1, 3)),
('warnings', 'Warnings', (4,)),
('tx-frames', 'TX frames', (5,)),
('rx-frames', 'RX frames', (6,)),
('tx-retries-1', '1x TX retries', (7,)),
('tx-retries-2', '2x TX retries', (8,)),
('tx-retries-3', '3x TX retries', (9,)),
('tx-fails', 'TX fails', (10,)),
('ccafails', 'CCAFAILs', (11,)),
def __init__(self):
@ -48,8 +64,9 @@ class Decoder(srd.Decoder):
def reset(self):
self.ss_cmd, self.es_cmd = 0, 0
self.mosi_bytes = []
self.miso_bytes = []
self.ss_frame, self.es_frame = [0, 0], [0, 0]
self.mosi_bytes, self.miso_bytes = [], []
self.framecache = [[], []]
def start(self):
self.out_ann = self.register(srd.OUTPUT_ANN)
@ -68,10 +85,34 @@ class Decoder(srd.Decoder):
write = self.mosi_bytes[0] & 0x1
reg = (self.mosi_bytes[0] >> 1) & 0x3f
reg_desc = sregs.get(reg, 'illegal')
for rxtx in (RX, TX):
if self.framecache[rxtx] == []:
bit0 = self.mosi_bytes[1] & (1 << 0)
if rxtx == TX and not (reg_desc == 'TXNCON' and bit0 == 1):
if rxtx == RX and not (reg_desc == 'RXFLUSH' and bit0 == 1):
idx = 5 if rxtx == TX else 6
xmitdir = 'TX' if rxtx == TX else 'RX'
frame = ' '.join(['%02X' % b for b in self.framecache[rxtx]])
self.put(self.ss_frame[rxtx], self.es_frame[rxtx], self.out_ann,
[idx, ['%s frame: %s' % (xmitdir, frame)]])
self.framecache[rxtx] = []
if write:
self.putx([1, ['%s: {$}' % reg_desc, '@%02X' % self.mosi_bytes[1]]])
self.putx([1, ['%s: {$}' % (reg_desc, '@%02X' % self.mosi_bytes[1])]])
self.putx([0, ['%s: {$}' % reg_desc, '@%02X' % self.miso_bytes[1]]])
numretries = (self.miso_bytes[1] & 0xc0) >> 6
if reg_desc == 'TXSTAT' and numretries > 0:
txfail = 1 if ((self.miso_bytes[1] & (1 << 0)) != 0) else 0
idx = 6 + numretries + txfail
if txfail:
self.putx([idx, ['TX fail (>= 4 retries)', 'TX fail']])
self.putx([idx, ['TX retries: %d' % numretries]])
if reg_desc == 'TXSTAT' and (self.miso_bytes[1] & (1 << 5)) != 0:
self.putx([11, ['CCAFAIL (channel busy)', 'CCAFAIL']])
def handle_long(self):
dword = self.mosi_bytes[0] << 8 | self.mosi_bytes[1]
@ -99,6 +140,16 @@ class Decoder(srd.Decoder):
self.putx([2, ['%s: {$}' % reg_desc, '@%02X' % self.miso_bytes[2]]])
for rxtx in (RX, TX):
if rxtx == RX and reg_desc[:3] != 'RX:':
if rxtx == TX and reg_desc[:3] != 'TX:':
if len(self.framecache[rxtx]) == 0:
self.ss_frame[rxtx] = self.ss_cmd
self.es_frame[rxtx] = self.es_cmd
self.framecache[rxtx] += [self.mosi_bytes[2]] if rxtx == TX else [self.miso_bytes[2]]
def decode(self, ss, es, data):
ptype = data[0]
if ptype == 'CS-CHANGE':
@ -23,6 +23,12 @@ NUM_OUTPUT_CHANNELS = 8
# TODO: Other I²C functions: general call / reset address, device ID address.
def logic_channels(num_channels):
l = []
for i in range(num_channels):
l.append(tuple(['p%d' % i, 'P%d' % i]))
return tuple(l)
class Decoder(srd.Decoder):
api_version = 3
id = 'pca9571'
@ -36,8 +42,9 @@ class Decoder(srd.Decoder):
annotations = (
('register', 'Register type'),
('value', 'Register value'),
('warning', 'Warning messages'),
('warning', 'Warning'),
logic_output_channels = logic_channels(NUM_OUTPUT_CHANNELS)
annotation_rows = (
('regs', 'Registers', (0, 1)),
('warnings', 'Warnings', (2,)),
@ -49,13 +56,24 @@ class Decoder(srd.Decoder):
def reset(self):
self.state = 'IDLE'
self.last_write = 0xFF # Chip port default state is high.
self.last_write_es = 0
def start(self):
self.out_ann = self.register(srd.OUTPUT_ANN)
self.out_logic = self.register(srd.OUTPUT_LOGIC)
def flush(self):
def putx(self, data):
self.put(self.ss, self.es, self.out_ann, data)
def put_logic_states(self):
if (self.es > self.last_write_es):
data = bytes([self.last_write])
self.put(self.last_write_es, self.es, self.out_logic, [0, data])
self.last_write_es = self.es
def handle_io(self, b):
if self.state == 'READ DATA':
operation = ['Outputs read', 'R']
@ -64,7 +82,9 @@ class Decoder(srd.Decoder):
'(%02X) are different' % self.last_write]])
operation = ['Outputs set', 'W']
self.last_write = b
self.putx([1, [operation[0] + ': %02X' % b,
operation[1] + ': %02X' % b]])
@ -2,8 +2,9 @@
## This file is part of the libsigrokdecode project.
## Copyright (C) 2016 Vladimir Ermakov <vooon341@gmail.com>
## Copyright (C) 2023 DreamSourceLab <support@dreamsourcelab.com>
## Copyright (C) 2021 Michael Miller <makuna@live.com>
## Copyright (C) 2023 DreamSourceLab <support@dreamsourcelab.com>
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
Normal file
Normal file
@ -0,0 +1,35 @@
## This file is part of the libsigrokdecode project.
## Copyright (C) 2022 Gerhard Sittig <gerhard.sittig@gmx.net>
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## GNU General Public License for more details.
## You should have received a copy of the GNU General Public License
## along with this program; if not, see <http://www.gnu.org/licenses/>.
SBUS by Futaba, a hobby remote control protocol on top of UART.
Sometimes referred to as "Serial BUS" or S-BUS.
UART communication typically runs at 100kbps with 8e2 frame format and
inverted signals (high voltage level is logic low).
SBUS messages take 3ms to transfer, and typically repeat in intervals
of 7ms or 14ms. An SBUS message consists of 25 UART bytes, and carries
16 proportional channels with 11 bits each, and 2 digital channels
(boolean, 1 bit), and flags which represent current communication state.
Proportional channel values typically are in the 192..1792 range, but
individual implementations may differ.
from .pd import Decoder
Normal file
Normal file
@ -0,0 +1,273 @@
## This file is part of the libsigrokdecode project.
## Copyright (C) 2022 Gerhard Sittig <gerhard.sittig@gmx.net>
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## GNU General Public License for more details.
## You should have received a copy of the GNU General Public License
## along with this program; if not, see <http://www.gnu.org/licenses/>.
(<ptype>, <pdata>)
This is the list of <ptype> codes and their respective <pdata> values:
- 'HEADER': The data is the header byte's value.
- 'PROPORTIONAL': The data is a tuple of the channel number (1-based)
and the channel's value.
- 'DIGITAL': The data is a tuple of the channel number (1-based)
and the channel's value.
- 'FLAG': The data is a tuple of the flag's name, and the flag's value.
- 'FOOTER': The data is the footer byte's value.
import sigrokdecode as srd
from common.srdhelper import bitpack_lsb
class Ann:
WARN = range(7)
class Decoder(srd.Decoder):
api_version = 3
id = 'sbus_futaba'
name = 'SBUS (Futaba)'
longname = 'Futaba SBUS (Serial bus)'
desc = 'Serial bus for hobby remote control by Futaba'
license = 'gplv2+'
inputs = ['uart']
outputs = ['sbus_futaba']
tags = ['Remote Control']
options = (
{'id': 'prop_val_min', 'desc': 'Proportional value lower boundary', 'default': 0},
{'id': 'prop_val_max', 'desc': 'Proportional value upper boundary', 'default': 2047},
annotations = (
('header', 'Header'),
('proportional', 'Proportional'),
('digital', 'Digital'),
('framelost', 'Frame Lost'),
('failsafe', 'Failsafe'),
('footer', 'Footer'),
('warning', 'Warning'),
annotation_rows = (
('framing', 'Framing', (Ann.HEADER, Ann.FOOTER,
('channels', 'Channels', (Ann.PROPORTIONAL, Ann.DIGITAL)),
('warnings', 'Warnings', (Ann.WARN,)),
def __init__(self):
self.bits_accum = []
self.sent_fields = None
self.msg_complete = None
self.failed = None
def reset(self):
self.sent_fields = 0
self.msg_complete = False
self.failed = None
def start(self):
self.out_ann = self.register(srd.OUTPUT_ANN)
self.out_py = self.register(srd.OUTPUT_PYTHON)
def putg(self, ss, es, data):
# Put a graphical annotation.
self.put(ss, es, self.out_ann, data)
def putpy(self, ss, es, data):
# Pass Python to upper layers.
self.put(ss, es, self.out_py, data)
def get_ss_es_bits(self, bitcount):
# Get start/end times, and bit values of given length.
# Gets all remaining data when 'bitcount' is None.
if bitcount is None:
bitcount = len(self.bits_accum)
if len(self.bits_accum) < bitcount:
return None, None, None
bits = self.bits_accum[:bitcount]
self.bits_accum = self.bits_accum[bitcount:]
ss, es = bits[0][1], bits[-1][2]
bits = [b[0] for b in bits]
return ss, es, bits
def flush_accum_bits(self):
# Valid data was queued. See if we got full SBUS fields so far.
# Annotate them early, cease inspection of failed messages. The
# implementation is phrased to reduce the potential for clipboard
# errors: 'upto' is the next supported field count, 'want' is one
# field's bit count. Grab as many as we find in an invocation.
upto = 0
if self.failed:
# Annotate the header byte. Not seeing the expected bit pattern
# emits a warning annotation, but by design won't fail the SBUS
# message. It's considered more useful to present the channels'
# values instead. The warning still raises awareness.
upto += 1
want = 8
while self.sent_fields < upto:
if len(self.bits_accum) < want:
ss, es, bits = self.get_ss_es_bits(want)
value = bitpack_lsb(bits)
text = ['0x{:02x}'.format(value)]
self.putg(ss, es, [Ann.HEADER, text])
if value != 0x0f:
text = ['Unexpected header', 'Header']
self.putg(ss, es, [Ann.WARN, text])
self.putpy(ss, es, ['HEADER', value])
self.sent_fields += 1
# Annotate the proportional channels' data. Check for user
# provided value range violations. Channel numbers are in
# the 1..18 range (1-based).
upto += 16
want = 11
while self.sent_fields < upto:
if len(self.bits_accum) < want:
ss, es, bits = self.get_ss_es_bits(want)
value = bitpack_lsb(bits)
text = ['{:d}'.format(value)]
self.putg(ss, es, [Ann.PROPORTIONAL, text])
if value < self.options['prop_val_min']:
text = ['Low proportional value', 'Low value', 'Low']
self.putg(ss, es, [Ann.WARN, text])
if value > self.options['prop_val_max']:
text = ['High proportional value', 'High value', 'High']
self.putg(ss, es, [Ann.WARN, text])
idx = self.sent_fields - (upto - 16)
ch_nr = 1 + idx
self.putpy(ss, es, ['PROPORTIONAL', (ch_nr, value)])
self.sent_fields += 1
# Annotate the digital channels' data.
upto += 2
want = 1
while self.sent_fields < upto:
if len(self.bits_accum) < want:
ss, es, bits = self.get_ss_es_bits(want)
value = bitpack_lsb(bits)
text = ['{:d}'.format(value)]
self.putg(ss, es, [Ann.DIGITAL, text])
idx = self.sent_fields - (upto - 2)
ch_nr = 17 + idx
self.putpy(ss, es, ['DIGITAL', (ch_nr, value)])
self.sent_fields += 1
# Annotate the flags' state. Index starts from LSB.
flag_names = ['framelost', 'failsafe', 'msb']
upto += 2
want = 1
while self.sent_fields < upto:
if len(self.bits_accum) < want:
ss, es, bits = self.get_ss_es_bits(want)
value = bitpack_lsb(bits)
text = ['{:d}'.format(value)]
idx = self.sent_fields - (upto - 2)
cls = Ann.FLAG_LSB + idx
self.putg(ss, es, [cls, text])
flg_name = flag_names[idx]
self.putpy(ss, es, ['FLAG', (flg_name, value)])
self.sent_fields += 1
# Warn when flags' padding (bits [7:4]) is unexpexted.
upto += 1
want = 4
while self.sent_fields < upto:
if len(self.bits_accum) < want:
ss, es, bits = self.get_ss_es_bits(want)
value = bitpack_lsb(bits)
if value != 0x0:
text = ['Unexpected MSB flags', 'Flags']
self.putg(ss, es, [Ann.WARN, text])
flg_name = flag_names[-1]
self.putpy(ss, es, ['FLAG', (flg_name, value)])
self.sent_fields += 1
# Annotate the footer byte. Warn when unexpected.
upto += 1
want = 8
while self.sent_fields < upto:
if len(self.bits_accum) < want:
ss, es, bits = self.get_ss_es_bits(want)
value = bitpack_lsb(bits)
text = ['0x{:02x}'.format(value)]
self.putg(ss, es, [Ann.FOOTER, text])
if value != 0x00:
text = ['Unexpected footer', 'Footer']
self.putg(ss, es, [Ann.WARN, text])
self.putpy(ss, es, ['FOOTER', value])
self.sent_fields += 1
# Check for the completion of an SBUS message. Warn when more
# UART data is seen after the message. Defer the warning until
# more bits were collected, flush at next IDLE or BREAK, which
# spans all unprocessed data, and improves perception.
if self.sent_fields >= upto:
self.msg_complete = True
if self.msg_complete and self.bits_accum:
self.failed = ['Excess data bits', 'Excess']
def handle_bits(self, ss, es, bits):
# UART data bits were seen. Store them, validity is yet unknown.
def handle_frame(self, ss, es, value, valid):
# A UART frame became complete. Get its validity. Process its bits.
if not valid:
self.failed = ['Invalid data', 'Invalid']
def handle_idle(self, ss, es):
# An IDLE period was seen in the UART level. Flush, reset state.
if self.bits_accum and not self.failed:
self.failed = ['Unprocessed data bits', 'Unprocessed']
if self.bits_accum and self.failed:
ss, es, _ = self.get_ss_es_bits(None)
self.putg(ss, es, [Ann.WARN, self.failed])
def handle_break(self, ss, es):
# A BREAK period was seen in the UART level. Warn, reset state.
break_ss, break_es = ss, es
if not self.failed:
self.failed = ['BREAK condition', 'Break']
# Re-use logic for "annotated bits warning".
self.handle_idle(None, None)
# Unconditionally annotate BREAK as warning.
text = ['BREAK condition', 'Break']
self.putg(ss, es, [Ann.WARN, text])
def decode(self, ss, es, data):
# Implementor's note: Expects DATA bits to arrive before FRAME
# validity. Either of IDLE or BREAK terminates an SBUS message.
ptype, rxtx, pdata = data
if ptype == 'DATA':
_, bits = pdata
self.handle_bits(ss, es, bits)
elif ptype == 'FRAME':
value, valid = pdata
self.handle_frame(ss, es, value, valid)
elif ptype == 'IDLE':
self.handle_idle(ss, es)
elif ptype == 'BREAK':
self.handle_break(ss, es)
@ -1,7 +1,7 @@
## This file is part of the libsigrokdecode project.
## Copyright (C) 2012-2014 Uwe Hermann <uwe@hermann-uwe.de>
## Copyright (C) 2012-2020 Uwe Hermann <uwe@hermann-uwe.de>
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
@ -18,8 +18,15 @@
import sigrokdecode as srd
from common.srdhelper import SrdIntEnum
from common.sdcard import (cmd_names, acmd_names)
responses = '1 1b 2 3 7'.split()
a = ['CMD%d' % i for i in range(64)] + ['ACMD%d' % i for i in range(64)] + \
['R' + r.upper() for r in responses] + ['BIT', 'BIT_WARNING']
Ann = SrdIntEnum.from_list('Ann', a)
class Decoder(srd.Decoder):
api_version = 3
id = 'sdcard_spi'
@ -30,23 +37,16 @@ class Decoder(srd.Decoder):
inputs = ['spi']
outputs = []
tags = ['Memory']
# annotations length: 135 + 1
annotations = \
tuple(('cmd%d' % i, 'CMD%d' % i) for i in range(64)) + \
tuple(('acmd%d' % i, 'ACMD%d' % i) for i in range(64)) + ( \
('r1', 'R1 reply'),
('r1b', 'R1B reply'),
('r2', 'R2 reply'),
('r3', 'R3 reply'),
('r7', 'R7 reply'),
('bits:', 'Bits'),
('bits', 'Bits'),
('bit-warnings', 'Bit warnings'),
tuple(('acmd%d' % i, 'ACMD%d' % i) for i in range(64)) + \
tuple(('r%s' % r, 'R%s response' % r) for r in responses) + ( \
('bit', 'Bit'),
('bit-warning', 'Bit warning'),
annotation_rows = (
('bits', 'Bits', (134, 135)),
('cmd-reply', 'Commands/replies', tuple(range(133))), #0-132
('bits', 'Bits', (Ann.BIT, Ann.BIT_WARNING)),
('commands-replies', 'Commands/replies', Ann.prefixes('CMD ACMD R')),
def __init__(self):
@ -57,6 +57,7 @@ class Decoder(srd.Decoder):
self.ss, self.es = 0, 0
self.ss_bit, self.es_bit = 0, 0
self.ss_cmd, self.es_cmd = 0, 0
self.ss_busy, self.es_busy = 0, 0
self.cmd_token = []
self.cmd_token_bits = []
self.is_acmd = False # Indicates CMD vs. ACMD
@ -65,6 +66,9 @@ class Decoder(srd.Decoder):
self.cmd_str = ''
self.is_cmd24 = False
self.cmd24_start_token_found = False
self.is_cmd17 = False
self.cmd17_start_token_found = False
self.busy_first_byte = False
def start(self):
self.out_ann = self.register(srd.OUTPUT_ANN)
@ -74,9 +78,6 @@ class Decoder(srd.Decoder):
def putc(self, cmd, desc):
self.putx([cmd, ['%s: %s' % (self.cmd_str, desc)]])
def putc_4(self, cmd, desc, v):
self.putx([cmd, ['%s: %s' % (self.cmd_str, desc), '@%04X' % v]])
def putb(self, data):
self.put(self.ss_bit, self.es_bit, self.out_ann, data)
@ -124,39 +125,39 @@ class Decoder(srd.Decoder):
# Bits[47:47]: Start bit (always 0)
bit, self.ss_bit, self.es_bit = tb(5, 7)[0], tb(5, 7)[1], tb(5, 7)[2]
if bit == 0:
self.putb([134, ['Start bit: %d' % bit]])
self.putb([Ann.BIT, ['Start bit: %d' % bit]])
self.putb([135, ['Start bit: %s (Warning: Must be 0!)' % bit]])
self.putb([Ann.BIT_WARNING, ['Start bit: %s (Warning: Must be 0!)' % bit]])
# Bits[46:46]: Transmitter bit (1 == host)
bit, self.ss_bit, self.es_bit = tb(5, 6)[0], tb(5, 6)[1], tb(5, 6)[2]
if bit == 1:
self.putb([134, ['Transmitter bit: %d' % bit]])
self.putb([Ann.BIT, ['Transmitter bit: %d' % bit]])
self.putb([135, ['Transmitter bit: %d (Warning: Must be 1!)' % bit]])
self.putb([Ann.BIT_WARNING, ['Transmitter bit: %d (Warning: Must be 1!)' % bit]])
# Bits[45:40]: Command index (BCD; valid: 0-63)
cmd = self.cmd_index = t[0] & 0x3f
self.ss_bit, self.es_bit = tb(5, 5)[1], tb(5, 0)[2]
self.putb([134, ['Command: %s%d (%s)' % (s, cmd, self.cmd_name(cmd))]])
self.putb([Ann.BIT, ['Command: %s%d (%s)' % (s, cmd, self.cmd_name(cmd))]])
# Bits[39:8]: Argument
self.arg = (t[1] << 24) | (t[2] << 16) | (t[3] << 8) | t[4]
self.ss_bit, self.es_bit = tb(4, 7)[1], tb(1, 0)[2]
self.putb([134, ['Argument: {$}', '@%04X' % self.arg]])
self.putb([Ann.BIT, ['Argument: 0x%04x' % self.arg]])
# Bits[7:1]: CRC7
# TODO: Check CRC7.
crc = t[5] >> 1
self.ss_bit, self.es_bit = tb(0, 7)[1], tb(0, 1)[2]
self.putb([134, ['CRC7: {$}', '@%01X' % crc]])
self.putb([Ann.BIT, ['CRC7: 0x%01x' % crc]])
# Bits[0:0]: End bit (always 1)
bit, self.ss_bit, self.es_bit = tb(0, 0)[0], tb(0, 0)[1], tb(0, 0)[2]
if bit == 1:
self.putb([134, ['End bit: %d' % bit]])
self.putb([Ann.BIT, ['End bit: %d' % bit]])
self.putb([135, ['End bit: %d (Warning: Must be 1!)' % bit]])
self.putb([Ann.BIT_WARNING, ['End bit: %d (Warning: Must be 1!)' % bit]])
# Handle command.
if cmd in (0, 1, 9, 16, 17, 24, 41, 49, 55, 59):
@ -164,26 +165,26 @@ class Decoder(srd.Decoder):
self.cmd_str = '%s%d (%s)' % (s, cmd, self.cmd_name(cmd))
self.state = 'HANDLE CMD999'
a = '%s%d: {$}' % (s, cmd)
self.putx([cmd, [a, '@' + '%02x %02x %02x %02x %02x %02x' % (tuple(t))]])
a = '%s%d: %02x %02x %02x %02x %02x %02x' % ((s, cmd) + tuple(t))
self.putx([cmd, [a]])
def handle_cmd0(self):
self.putc(0, 'Reset the SD card')
self.putc(Ann.CMD0, 'Reset the SD card')
self.state = 'GET RESPONSE R1'
def handle_cmd1(self):
self.putc(1, 'Send HCS info and activate the card init process')
self.putc(Ann.CMD1, 'Send HCS info and activate the card init process')
hcs = (self.arg & (1 << 30)) >> 30
self.ss_bit = self.cmd_token_bits[5 - 4][6][1]
self.es_bit = self.cmd_token_bits[5 - 4][6][2]
self.putb([134, ['HCS: %d' % hcs]])
self.putb([Ann.BIT, ['HCS: %d' % hcs]])
self.state = 'GET RESPONSE R1'
def handle_cmd9(self):
# CMD9: SEND_CSD (128 bits / 16 bytes)
self.putc(9, 'Ask card to send its card specific data (CSD)')
self.putc(Ann.CMD9, 'Ask card to send its card specific data (CSD)')
if len(self.read_buf) == 0:
self.ss_cmd = self.ss
@ -193,7 +194,7 @@ class Decoder(srd.Decoder):
self.es_cmd = self.es
self.read_buf = self.read_buf[4:] # TODO: Document or redo.
self.putx([9, ['CSD: %s' % self.read_buf]])
self.putx([Ann.CMD9, ['CSD: %s' % self.read_buf]])
# TODO: Decode all bits.
self.read_buf = []
### self.state = 'GET RESPONSE R1'
@ -201,11 +202,11 @@ class Decoder(srd.Decoder):
def handle_cmd10(self):
# CMD10: SEND_CID (128 bits / 16 bytes)
self.putc(10, 'Ask card to send its card identification (CID)')
self.putc(Ann.CMD10, 'Ask card to send its card identification (CID)')
if len(self.read_buf) < 16:
self.putx([10, ['CID: %s' % self.read_buf]])
self.putx([Ann.CMD10, ['CID: %s' % self.read_buf]])
# TODO: Decode all bits.
self.read_buf = []
self.state = 'GET RESPONSE R1'
@ -214,26 +215,18 @@ class Decoder(srd.Decoder):
self.blocklen = self.arg
# TODO: Sanity check on block length.
self.putc(16, 'Set the block length to %d bytes' % self.blocklen)
self.putc(Ann.CMD16, 'Set the block length to %d bytes' % self.blocklen)
self.state = 'GET RESPONSE R1'
def handle_cmd17(self):
self.putc_4(17, 'Read a block from address {$}', self.arg)
if len(self.read_buf) == 0:
self.ss_cmd = self.ss
if len(self.read_buf) < self.blocklen + 2: # FIXME
self.es_cmd = self.es
self.read_buf = self.read_buf[2:] # FIXME
self.putx([17, ['Block data: %s' % self.read_buf]])
self.read_buf = []
self.putc(Ann.CMD17, 'Read a block from address 0x%04x' % self.arg)
self.is_cmd17 = True
self.state = 'GET RESPONSE R1'
def handle_cmd24(self):
self.putc_4(24, 'Write a block to address {$}', self.arg)
self.putc(Ann.CMD24, 'Write a block to address 0x%04x' % self.arg)
self.is_cmd24 = True
self.state = 'GET RESPONSE R1'
@ -242,7 +235,7 @@ class Decoder(srd.Decoder):
def handle_cmd55(self):
self.putc(55, 'Next command is an application-specific command')
self.putc(Ann.CMD55, 'Next command is an application-specific command')
self.is_acmd = True
self.state = 'GET RESPONSE R1'
@ -250,12 +243,12 @@ class Decoder(srd.Decoder):
crc_on_off = self.arg & (1 << 0)
s = 'on' if crc_on_off == 1 else 'off'
self.putc(59, 'Turn the SD card CRC option %s' % s)
self.putc(Ann.CMD59, 'Turn the SD card CRC option %s' % s)
self.state = 'GET RESPONSE R1'
def handle_acmd41(self):
self.putc(64 + 41, 'Send HCS info and activate the card init process')
self.putc(Ann.ACMD41, 'Send HCS info and activate the card init process')
self.state = 'GET RESPONSE R1'
def handle_cmd999(self):
@ -303,12 +296,12 @@ class Decoder(srd.Decoder):
# Sent by the card after every command except for SEND_STATUS.
self.ss_cmd, self.es_cmd = self.miso_bits[7][1], self.miso_bits[0][2]
self.putx([65, ['R1: {$}', '@%02x' % res]])
self.putx([Ann.R1, ['R1: 0x%02x' % res]])
def putbit(bit, data):
b = self.miso_bits[bit]
self.ss_bit, self.es_bit = b[1], b[2]
self.putb([134, data])
self.putb([Ann.BIT, data])
# Bit 0: 'In idle state' bit
s = '' if (res & (1 << 0)) else 'not '
@ -341,6 +334,8 @@ class Decoder(srd.Decoder):
# Bit 7: Always set to 0
putbit(7, ['Bit 7 (always 0)'])
if self.is_cmd17:
self.state = 'HANDLE DATA BLOCK CMD17'
if self.is_cmd24:
self.state = 'HANDLE DATA BLOCK CMD24'
@ -364,6 +359,36 @@ class Decoder(srd.Decoder):
def handle_data_cmd17(self, miso):
# CMD17 returns one byte R1, then some bytes 0xff, then a Start Block
# (single byte 0xfe), then self.blocklen bytes of data, then always
# 2 bytes of CRC.
if self.cmd17_start_token_found:
if len(self.read_buf) == 0:
self.ss_data = self.ss
if not self.blocklen:
# Assume a fixed block size when inspection of the previous
# traffic did not provide the respective parameter value.
# TODO: Make the default block size a PD option?
self.blocklen = 512
# Wait until block transfer completed.
if len(self.read_buf) < self.blocklen:
if len(self.read_buf) == self.blocklen:
self.es_data = self.es
self.put(self.ss_data, self.es_data, self.out_ann, [Ann.CMD17, ['Block data: %s' % self.read_buf]])
elif len(self.read_buf) == (self.blocklen + 1):
self.ss_crc = self.ss
elif len(self.read_buf) == (self.blocklen + 2):
self.es_crc = self.es
# TODO: Check CRC.
self.put(self.ss_crc, self.es_crc, self.out_ann, [Ann.CMD17, ['CRC']])
self.state = 'IDLE'
elif miso == 0xfe:
self.put(self.ss, self.es, self.out_ann, [Ann.CMD17, ['Start Block']])
self.cmd17_start_token_found = True
def handle_data_cmd24(self, mosi):
if self.cmd24_start_token_found:
if len(self.read_buf) == 0:
@ -379,11 +404,11 @@ class Decoder(srd.Decoder):
if len(self.read_buf) < self.blocklen:
self.es_data = self.es
self.put(self.ss_data, self.es_data, self.out_ann, [24, ['Block data: %s' % self.read_buf]])
self.put(self.ss_data, self.es_data, self.out_ann, [Ann.CMD24, ['Block data: %s' % self.read_buf]])
self.read_buf = []
self.state = 'DATA RESPONSE'
elif mosi == 0xfe:
self.put(self.ss, self.es, self.out_ann, [24, ['Start Block']])
self.put(self.ss, self.es, self.out_ann, [Ann.CMD24, ['Start Block']])
self.cmd24_start_token_found = True
def handle_data_response(self, miso):
@ -403,21 +428,39 @@ class Decoder(srd.Decoder):
# Should we return to IDLE here?
m = self.miso_bits
self.put(m[7][1], m[5][2], self.out_ann, [134, ['Don\'t care']])
self.put(m[4][1], m[4][2], self.out_ann, [134, ['Always 0']])
self.put(m[7][1], m[5][2], self.out_ann, [Ann.BIT, ['Don\'t care']])
self.put(m[4][1], m[4][2], self.out_ann, [Ann.BIT, ['Always 0']])
if miso == 0x05:
self.put(m[3][1], m[1][2], self.out_ann, [134, ['Data accepted']])
self.put(m[3][1], m[1][2], self.out_ann, [Ann.BIT, ['Data accepted']])
elif miso == 0x0b:
self.put(m[3][1], m[1][2], self.out_ann, [134, ['Data rejected (CRC error)']])
self.put(m[3][1], m[1][2], self.out_ann, [Ann.BIT, ['Data rejected (CRC error)']])
elif miso == 0x0d:
self.put(m[3][1], m[1][2], self.out_ann, [134, ['Data rejected (write error)']])
self.put(m[0][1], m[0][2], self.out_ann, [134, ['Always 1']])
ann_class = None
self.put(m[3][1], m[1][2], self.out_ann, [Ann.BIT, ['Data rejected (write error)']])
self.put(m[0][1], m[0][2], self.out_ann, [Ann.BIT, ['Always 1']])
cls = Ann.CMD24 if self.is_cmd24 else None
if cls is not None:
self.put(self.ss, self.es, self.out_ann, [cls, ['Data Response']])
if self.is_cmd24:
ann_class = 24
if ann_class is not None:
self.put(self.ss, self.es, self.out_ann, [ann_class, ['Data Response']])
self.state = 'IDLE'
# We just send a block of data to be written to the card,
# this takes some time.
self.state = 'WAIT WHILE CARD BUSY'
self.busy_first_byte = True
self.state = 'IDLE'
def wait_while_busy(self, miso):
if miso != 0x00:
cls = Ann.CMD24 if self.is_cmd24 else None
if cls is not None:
self.put(self.ss_busy, self.es_busy, self.out_ann, [cls, ['Card is busy']])
self.state = 'IDLE'
if self.busy_first_byte:
self.ss_busy = self.ss
self.busy_first_byte = False
self.es_busy = self.es
def decode(self, ss, es, data):
ptype, mosi, miso = data
@ -465,7 +508,11 @@ class Decoder(srd.Decoder):
handle_response = getattr(self, s)
self.state = 'IDLE'
elif self.state == 'HANDLE DATA BLOCK CMD17':
elif self.state == 'HANDLE DATA BLOCK CMD24':
elif self.state == 'DATA RESPONSE':
elif self.state == 'WAIT WHILE CARD BUSY':
@ -2,7 +2,7 @@
## This file is part of the libsigrokdecode project.
## Copyright (C) 2014 Guenther Wenninger <robin@bitschubbser.org>
## Copyright (C) 2022 DreamSourceLab <support@dreamsourcelab.com>
## Copyright (C) 2023 DreamSourceLab <support@dreamsourcelab.com>
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
@ -18,6 +18,10 @@
## along with this program; if not, see <http://www.gnu.org/licenses/>.
# 2023/12/19 bug fixed: Aux Data Error
import sigrokdecode as srd
class SamplerateError(Exception):
@ -264,6 +268,7 @@ class Decoder(srd.Decoder):
for a in aux_audio_data:
sam = sam + str(a[0])
sam_rot = str(a[0]) + sam_rot
sample = subframe[4:24]
for s in sample:
sam = sam + str(s[0])
@ -274,7 +279,7 @@ class Decoder(srd.Decoder):
parity = subframe[27:28]
self.putx(aux_audio_data[0][1], aux_audio_data[3][2], \
[3, ['Aux 0x%x' % int(sam, 2), '0x%x' % int(sam, 2)]])
[3, ['Aux 0x%x' % int(sam[:4], 2), '0x%x' % int(sam[:4], 2)]])
self.putx(sample[0][1], sample[19][2], \
[3, ['Sample 0x%x' % int(sam, 2), '0x%x' % int(sam, 2)]])
self.putx(aux_audio_data[0][1], sample[19][2], \
@ -354,7 +359,7 @@ class Decoder(srd.Decoder):
parity = self.subframe[27:28]
self.putx(aux_audio_data[0][1], aux_audio_data[3][2], \
[3, ['Aux 0x%x' % int(sam, 2), '0x%x' % int(sam, 2)]])
[3, ['Aux 0x%x' % int(sam[:4], 2), '0x%x' % int(sam[:4], 2)]])
self.putx(sample[0][1], sample[19][2], \
[3, ['Sample 0x%x' % int(sam, 2), '0x%x' % int(sam, 2)]])
self.putx(aux_audio_data[0][1], sample[19][2], \
@ -1,299 +1,299 @@
# Source: https://raw.githubusercontent.com/WKPlus/rangedict/master/rangedict.py
__all__ = ['RangeDict']
class Color(object):
RED = 1
class Node(object):
__slots__ = ('r', 'left', 'right', 'value', 'color', 'parent')
def __init__(self, r, value, parent=None, color=Color.RED):
self.r = r
self.value = value
self.parent = parent
self.color = color
self.left = None
self.right = None
def value_copy(self, other):
self.r = other.r
self.value = other.value
class RangeDict(dict):
def __init__(self):
self._root = None
def __setitem__(self, r, v):
if r[1] < r[0]:
raise KeyError
node = self._insert(r, v)
def _insert(self, r, v):
if not self._root:
self._root = Node(r, v)
return self._root
cur = self._root
while True:
if r[1] < cur.r[0]:
if not cur.left:
cur.left = Node(r, v, cur)
return cur.left
cur = cur.left
elif r[0] > cur.r[1]:
if not cur.right:
cur.right = Node(r, v, cur)
return cur.right
cur = cur.right
raise KeyError # overlap not supported
def _insert_adjust(self, node):
''' adjust to make the tree still a red black tree '''
if not node.parent:
node.color = Color.BLACK
if node.parent.color == Color.BLACK:
uncle = self.sibling(node.parent)
if node_color(uncle) == Color.RED:
node.parent.color = Color.BLACK
uncle.color = Color.BLACK
node.parent.parent.color = Color.RED
return self._insert_adjust(node.parent.parent)
#parent is red and uncle is black
# since parent is red, grandparent must exists and be black
parent = node.parent
grandparent = parent.parent
if self.is_left_son(parent, grandparent):
if self.is_left_son(node, parent):
grandparent.color = Color.RED
parent.color = Color.BLACK
grandparent.color = Color.RED
node.color = Color.BLACK
if self.is_left_son(node, parent):
grandparent.color = Color.RED
node.color = Color.BLACK
grandparent.color = Color.RED
parent.color = Color.BLACK
def _find_key(self, key):
cur = self._root
while cur:
if key > cur.r[1]:
cur = cur.right
elif key < cur.r[0]:
cur = cur.left
return cur
def _find_range(self, r):
cur = self._root
while cur:
if r[1] < cur.r[0]:
cur = cur.left
elif r[0] > cur.r[1]:
cur = cur.right
elif r[0] == cur.r[0] and r[1] == cur.r[1]:
return cur
raise KeyError
raise KeyError
def __getitem__(self, key):
tar = self._find_key(key)
if tar:
return tar.value
raise KeyError
def __contains__(self, key):
return bool(self._find_key(key))
def __delitem__(self, r):
node = self._find_range(r)
if node.left and node.right:
left_rightest_child = self.find_rightest(node.left)
node = left_rightest_child
def _delete(self, node):
# node has at most one child
child = node.left if node.left else node.right
if not node.parent: # node is root
self._root = child
if self._root:
self._root.parent = None
self._root.color = Color.BLACK
parent = node.parent
if not child:
child = Node(None, None, parent, Color.BLACK)
if self.is_left_son(node, parent):
parent.left = child
parent.right = child
child.parent = parent
if node.color == Color.RED:
# no need to adjust when deleting a red node
if node_color(child) == Color.RED:
child.color = Color.BLACK
if not child.r:
# mock a None node for adjust, need to delete it after that
parent = child.parent
if self.is_left_son(child, parent):
parent.left = None
parent.right = None
def _delete_adjust(self, node):
if not node.parent:
node.color = Color.BLACK
parent = node.parent
sibling = self.sibling(node)
if node_color(sibling) == Color.RED:
if self.is_left_son(node, parent):
parent.color = Color.RED
sibling.color = Color.BLACK
sibling = self.sibling(node) # must be black
# sibling must be black now
if not self.is_black(parent) and self.is_black(sibling.left) and \
parent.color = Color.BLACK
sibling.color = Color.RED
if self.is_black(parent) and self.is_black(sibling.left) and \
sibling.color = Color.RED
return self._delete_adjust(parent)
if self.is_left_son(node, parent):
if not self.is_black(sibling.left) and \
sibling.left.color = Color.BLACK
sibling.color = Color.RED
sibling = sibling.parent
# sibling.right must be red
sibling.color = parent.color
parent.color = Color.BLACK
sibling.right.color = Color.BLACK
if not self.is_black(sibling.right) and \
sibling.right.color = Color.BLACK
sibling.color = Color.RED
sibling = sibling.parent
# sibling.left must be red
sibling.color = parent.color
parent.color = Color.BLACK
sibling.left.color = Color.RED
def left_rotate(self, node):
right_son = node.right
if not node.parent:
self._root = right_son
elif self.is_left_son(node, node.parent):
node.parent.left = right_son
node.parent.right = right_son
right_son.parent = node.parent
node.parent = right_son
node.right = right_son.left
right_son.left = node
def right_rotate(self, node):
left_son = node.left
if not node.parent:
self._root = left_son
elif self.is_left_son(node, node.parent):
node.parent.left = left_son
node.parent.right = left_son
left_son.parent = node.parent
node.parent = left_son
node.left = left_son.right
left_son.right = node
def sibling(node):
if node.parent.left == node:
return node.parent.right
return node.parent.left
def is_left_son(child, parent):
if parent.left == child:
return True
return False
def find_rightest(node):
while node.right:
node = node.right
return node
def is_black(node):
return node_color(node) == Color.BLACK
def node_color(node):
if not node:
return Color.BLACK
return node.color
def in_order(root):
ret = []
if not root:
return []
return in_order(root.left) + [root.value] + in_order(root.right)
def height(root):
if not root:
return 0
return 1 + max(height(root.left), height(root.right))
if __name__ == '__main__':
# Source: https://raw.githubusercontent.com/WKPlus/rangedict/master/rangedict.py
__all__ = ['RangeDict']
class Color(object):
RED = 1
class Node(object):
__slots__ = ('r', 'left', 'right', 'value', 'color', 'parent')
def __init__(self, r, value, parent=None, color=Color.RED):
self.r = r
self.value = value
self.parent = parent
self.color = color
self.left = None
self.right = None
def value_copy(self, other):
self.r = other.r
self.value = other.value
class RangeDict(dict):
def __init__(self):
self._root = None
def __setitem__(self, r, v):
if r[1] < r[0]:
raise KeyError
node = self._insert(r, v)
def _insert(self, r, v):
if not self._root:
self._root = Node(r, v)
return self._root
cur = self._root
while True:
if r[1] < cur.r[0]:
if not cur.left:
cur.left = Node(r, v, cur)
return cur.left
cur = cur.left
elif r[0] > cur.r[1]:
if not cur.right:
cur.right = Node(r, v, cur)
return cur.right
cur = cur.right
raise KeyError # overlap not supported
def _insert_adjust(self, node):
''' adjust to make the tree still a red black tree '''
if not node.parent:
node.color = Color.BLACK
if node.parent.color == Color.BLACK:
uncle = self.sibling(node.parent)
if node_color(uncle) == Color.RED:
node.parent.color = Color.BLACK
uncle.color = Color.BLACK
node.parent.parent.color = Color.RED
return self._insert_adjust(node.parent.parent)
#parent is red and uncle is black
# since parent is red, grandparent must exists and be black
parent = node.parent
grandparent = parent.parent
if self.is_left_son(parent, grandparent):
if self.is_left_son(node, parent):
grandparent.color = Color.RED
parent.color = Color.BLACK
grandparent.color = Color.RED
node.color = Color.BLACK
if self.is_left_son(node, parent):
grandparent.color = Color.RED
node.color = Color.BLACK
grandparent.color = Color.RED
parent.color = Color.BLACK
def _find_key(self, key):
cur = self._root
while cur:
if key > cur.r[1]:
cur = cur.right
elif key < cur.r[0]:
cur = cur.left
return cur
def _find_range(self, r):
cur = self._root
while cur:
if r[1] < cur.r[0]:
cur = cur.left
elif r[0] > cur.r[1]:
cur = cur.right
elif r[0] == cur.r[0] and r[1] == cur.r[1]:
return cur
raise KeyError
raise KeyError
def __getitem__(self, key):
tar = self._find_key(key)
if tar:
return tar.value
raise KeyError
def __contains__(self, key):
return bool(self._find_key(key))
def __delitem__(self, r):
node = self._find_range(r)
if node.left and node.right:
left_rightest_child = self.find_rightest(node.left)
node = left_rightest_child
def _delete(self, node):
# node has at most one child
child = node.left if node.left else node.right
if not node.parent: # node is root
self._root = child
if self._root:
self._root.parent = None
self._root.color = Color.BLACK
parent = node.parent
if not child:
child = Node(None, None, parent, Color.BLACK)
if self.is_left_son(node, parent):
parent.left = child
parent.right = child
child.parent = parent
if node.color == Color.RED:
# no need to adjust when deleting a red node
if node_color(child) == Color.RED:
child.color = Color.BLACK
if not child.r:
# mock a None node for adjust, need to delete it after that
parent = child.parent
if self.is_left_son(child, parent):
parent.left = None
parent.right = None
def _delete_adjust(self, node):
if not node.parent:
node.color = Color.BLACK
parent = node.parent
sibling = self.sibling(node)
if node_color(sibling) == Color.RED:
if self.is_left_son(node, parent):
parent.color = Color.RED
sibling.color = Color.BLACK
sibling = self.sibling(node) # must be black
# sibling must be black now
if not self.is_black(parent) and self.is_black(sibling.left) and \
parent.color = Color.BLACK
sibling.color = Color.RED
if self.is_black(parent) and self.is_black(sibling.left) and \
sibling.color = Color.RED
return self._delete_adjust(parent)
if self.is_left_son(node, parent):
if not self.is_black(sibling.left) and \
sibling.left.color = Color.BLACK
sibling.color = Color.RED
sibling = sibling.parent
# sibling.right must be red
sibling.color = parent.color
parent.color = Color.BLACK
sibling.right.color = Color.BLACK
if not self.is_black(sibling.right) and \
sibling.right.color = Color.BLACK
sibling.color = Color.RED
sibling = sibling.parent
# sibling.left must be red
sibling.color = parent.color
parent.color = Color.BLACK
sibling.left.color = Color.RED
def left_rotate(self, node):
right_son = node.right
if not node.parent:
self._root = right_son
elif self.is_left_son(node, node.parent):
node.parent.left = right_son
node.parent.right = right_son
right_son.parent = node.parent
node.parent = right_son
node.right = right_son.left
right_son.left = node
def right_rotate(self, node):
left_son = node.left
if not node.parent:
self._root = left_son
elif self.is_left_son(node, node.parent):
node.parent.left = left_son
node.parent.right = left_son
left_son.parent = node.parent
node.parent = left_son
node.left = left_son.right
left_son.right = node
def sibling(node):
if node.parent.left == node:
return node.parent.right
return node.parent.left
def is_left_son(child, parent):
if parent.left == child:
return True
return False
def find_rightest(node):
while node.right:
node = node.right
return node
def is_black(node):
return node_color(node) == Color.BLACK
def node_color(node):
if not node:
return Color.BLACK
return node.color
def in_order(root):
ret = []
if not root:
return []
return in_order(root.left) + [root.value] + in_order(root.right)
def height(root):
if not root:
return 0
return 1 + max(height(root.left), height(root.right))
if __name__ == '__main__':
@ -1,7 +1,7 @@
## This file is part of the libsigrokdecode project.
## Copyright (C) 2011-2016 Uwe Hermann <uwe@hermann-uwe.de>
## Copyright (C) 2011-2020 Uwe Hermann <uwe@hermann-uwe.de>
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
@ -18,16 +18,14 @@
import sigrokdecode as srd
import re
from common.srdhelper import SrdIntEnum
from .lists import *
L = len(cmds)
# Don't forget to keep this in sync with 'cmds' is lists.py.
class Ann:
BIT, FIELD, WARN = range(L + 3)
a = [re.sub('\/', '_', c[0]).replace('2READ', 'READ2X') for c in cmds.values()] + ['BIT', 'FIELD', 'WARN']
Ann = SrdIntEnum.from_list('Ann', a)
def cmd_annotation_classes():
return tuple([tuple([cmd[0].lower(), cmd[1]]) for cmd in cmds.values()])
@ -73,9 +71,9 @@ def decode_status_reg(data):
class Decoder(srd.Decoder):
api_version = 3
id = 'spiflash'
name = 'SPI flash'
longname = 'SPI flash chips'
desc = 'xx25 series SPI (NOR) flash chip protocol.'
name = 'SPI flash/EEPROM'
longname = 'SPI flash/EEPROM chips'
desc = 'xx25 series SPI (NOR) flash/EEPROM chip protocol.'
license = 'gplv2+'
inputs = ['spi']
outputs = []
@ -164,17 +162,15 @@ class Decoder(srd.Decoder):
self.addr |= (mosi << ((4 - self.cmdstate) * 8))
b = ((3 - (self.cmdstate - 2)) * 8) - 1
['Address bits %d..%d: {$}' % (b, b - 7),
'Addr bits %d..%d: {$}' % (b, b - 7),
'Addr bits %d..%d' % (b, b - 7),
'A%d..A%d' % (b, b - 7),
'@%02X' % mosi
['Address bits %d..%d: 0x%02x' % (b, b - 7, mosi),
'Addr bits %d..%d: 0x%02x' % (b, b - 7, mosi),
'Addr bits %d..%d' % (b, b - 7), 'A%d..A%d' % (b, b - 7)]])
if self.cmdstate == 2:
self.ss_field = self.ss
if self.cmdstate == 4:
self.es_field = self.es
self.putf([Ann.FIELD, ['Address: {$}', 'Addr: {$}', '{$}', '@%06x' % self.addr]])
self.putf([Ann.FIELD, ['Address: 0x%06x' % self.addr,
'Addr: 0x%06x' % self.addr, '0x%06x' % self.addr]])
def handle_wren(self, mosi, miso):
self.putx([Ann.WREN, self.cmd_ann_list()])
@ -190,14 +186,14 @@ class Decoder(srd.Decoder):
elif self.cmdstate == 2:
# Byte 2: Slave sends the JEDEC manufacturer ID.
self.putx([Ann.FIELD, ['Manufacturer ID: {$}', '@%02x' % miso]])
self.putx([Ann.FIELD, ['Manufacturer ID: 0x%02x' % miso]])
elif self.cmdstate == 3:
# Byte 3: Slave sends the memory type.
self.putx([Ann.FIELD, ['Memory type: {$}', '@%02x' % miso]])
self.putx([Ann.FIELD, ['Memory type: 0x%02x' % miso]])
elif self.cmdstate == 4:
# Byte 4: Slave sends the device ID.
self.device_id = miso
self.putx([Ann.FIELD, ['Device ID: {$}', '@%02x' % miso]])
self.putx([Ann.FIELD, ['Device ID: 0x%02x' % miso]])
if self.cmdstate == 4:
self.es_cmd = self.es
@ -318,7 +314,7 @@ class Decoder(srd.Decoder):
# Bytes 2/3/4: Master sends read address (24bits, MSB-first).
elif self.cmdstate == 5:
self.putx([Ann.BIT, ['Dummy byte: {$}', '@%02x' % mosi]])
self.putx([Ann.BIT, ['Dummy byte: 0x%02x' % mosi]])
elif self.cmdstate >= 6:
# Bytes 6-x: Master reads data bytes (until CS# de-asserted).
self.es_field = self.es # Will be overwritten for each byte.
@ -348,7 +344,7 @@ class Decoder(srd.Decoder):
# Byte 5: Dummy byte. Also handle byte 4 (address LSB) here.
self.cmdstate = 5
self.putx([Ann.BIT, ['Dummy byte: {$}', '@%02x' % b2]])
self.putx([Ann.BIT, ['Dummy byte: 0x%02x' % b2]])
elif self.cmdstate >= 6:
# Bytes 6-x: Master reads data bytes (until CS# de-asserted).
self.es_field = self.es # Will be overwritten for each byte.
@ -370,7 +366,7 @@ class Decoder(srd.Decoder):
self.es_field = self.es
if self.cmdstate == 2:
self.ss_field = self.ss
self.putx([Ann.BIT, ['Status register byte %d: {$}' % ((self.cmdstate % 2) + 1, '@%02x' % miso)]])
self.putx([Ann.BIT, ['Status register byte %d: 0x%02x' % ((self.cmdstate % 2) + 1, miso)]])
self.cmdstate += 1
# TODO: Warn/abort if we don't see the necessary amount of bytes.
@ -386,8 +382,8 @@ class Decoder(srd.Decoder):
if self.cmdstate == 4:
self.es_cmd = self.es
d = ['Erase sector %d ({$})' % self.addr, '@%06x' % self.addr]
self.putc([Ann.SE, d])
d = 'Erase sector %d (0x%06x)' % (self.addr, self.addr)
self.putc([Ann.SE, [d]])
# TODO: Max. size depends on chip, check that too if possible.
if self.addr % 4096 != 0:
# Sector addresses must be 4K-aligned (same for all 3 chips).
@ -439,7 +435,7 @@ class Decoder(srd.Decoder):
elif self.cmdstate in (2, 3, 4):
# Bytes 2/3/4: Master sends three dummy bytes.
self.putx([Ann.FIELD, ['Dummy byte: {$}', '@%02x' % mosi]])
self.putx([Ann.FIELD, ['Dummy byte: %02x' % mosi]])
elif self.cmdstate == 5:
# Byte 5: Slave sends device ID.
self.es_cmd = self.es
@ -456,7 +452,7 @@ class Decoder(srd.Decoder):
elif self.cmdstate in (2, 3):
# Bytes 2/3: Master sends two dummy bytes.
self.putx([Ann.FIELD, ['Dummy byte: {$}', '@%02X' % mosi]])
self.putx([Ann.FIELD, ['Dummy byte: 0x%02x' % mosi]])
elif self.cmdstate == 4:
# Byte 4: Master sends 0x00 or 0x01.
# 0x00: Master wants manufacturer ID as first reply byte.
@ -468,12 +464,12 @@ class Decoder(srd.Decoder):
# Byte 5: Slave sends manufacturer ID (or device ID).
self.ids = [miso]
d = 'Manufacturer' if self.manufacturer_id_first else 'Device'
self.putx([Ann.FIELD, ['%s ID: {$}' % d, '@%02X' % miso]])
self.putx([Ann.FIELD, ['%s ID: 0x%02x' % (d, miso)]])
elif self.cmdstate == 6:
# Byte 6: Slave sends device ID (or manufacturer ID).
d = 'Device' if self.manufacturer_id_first else 'Manufacturer'
self.putx([Ann.FIELD, ['%s ID: {$}' % d, '@%02X' % miso]])
self.putx([Ann.FIELD, ['%s ID: 0x%02x' % (d, miso)]])
if self.cmdstate == 6:
id_ = self.ids[1] if self.manufacturer_id_first else self.ids[0]
@ -514,7 +510,8 @@ class Decoder(srd.Decoder):
s = ''.join(map(chr, self.data))
self.putf([Ann.FIELD, ['%s (%d bytes)' % (label, len(self.data))]])
self.putc([idx, ['%s (addr {$}, %d bytes): %s' % (cmds[self.state][1], len(self.data), s), '@%06x' % self.addr]])
self.putc([idx, ['%s (addr 0x%06x, %d bytes): %s' % \
(cmds[self.state][1], self.addr, len(self.data), s)]])
def decode(self, ss, es, data):
ptype, mosi, miso = data
@ -536,5 +533,5 @@ class Decoder(srd.Decoder):
self.cmd_handlers[self.state](mosi, miso)
except KeyError:
self.putx([Ann.BIT, ['Unknown command: {$}', '@%02x' % mosi]])
self.putx([Ann.BIT, ['Unknown command: 0x%02x' % mosi]])
self.state = None
@ -21,6 +21,14 @@
import sigrokdecode as srd
def logic_channels(num_channels):
l = []
for i in range(num_channels):
l.append(tuple(['p%d' % i, 'P-port input/output %d' % i]))
return tuple(l)
class Decoder(srd.Decoder):
api_version = 3
id = 'tca6408a'
@ -34,8 +42,9 @@ class Decoder(srd.Decoder):
annotations = (
('register', 'Register type'),
('value', 'Register value'),
('warnings', 'Warning messages'),
('warning', 'Warning'),
logic_output_channels = logic_channels(NUM_OUTPUT_CHANNELS)
annotation_rows = (
('regs', 'Registers', (0, 1)),
('warnings', 'Warnings', (2,)),
@ -48,17 +57,33 @@ class Decoder(srd.Decoder):
self.state = 'IDLE'
self.chip = -1
self.logic_output_es = 0
self.logic_value = 0
def start(self):
self.out_ann = self.register(srd.OUTPUT_ANN)
self.out_logic = self.register(srd.OUTPUT_LOGIC)
def flush(self):
def putx(self, data):
self.put(self.ss, self.es, self.out_ann, data)
def put_logic_states(self):
if (self.es > self.logic_output_es):
data = bytes([self.logic_value])
self.put(self.logic_output_es, self.es, self.out_logic, [0, data])
self.logic_output_es = self.es
def handle_reg_0x00(self, b):
self.putx([1, ['State of inputs: %02X' % b]])
def handle_reg_0x01(self, b):
self.putx([1, ['Outputs set: %02X' % b ]])
self.putx([1, ['Outputs set: %02X' % b]])
self.logic_value = b
def handle_reg_0x02(self, b):
self.putx([1, ['Polarity inverted: %02X' % b]])
@ -120,16 +120,22 @@ class Decoder(srd.Decoder):
license = 'gplv2+'
inputs = ['usb_packet']
outputs = ['usb_request']
options = (
{'id': 'in_request_start', 'desc': 'Start IN requests on',
'default': 'submit', 'values': ('submit', 'first-ack'), 'idn':'dec_usb_request_opt_in_request_start'},
tags = ['PC']
annotations = (
('request-setup-read', 'Setup: Device-to-host'),
('request-setup-write', 'Setup: Host-to-device'),
('request-bulk-read', 'Bulk: Device-to-host'),
('request-bulk-write', 'Bulk: Host-to-device'),
('errors', 'Unexpected packets'),
('error', 'Unexpected packet'),
annotation_rows = (
('request', 'USB requests', tuple(range(4))),
('request-setup', 'USB SETUP', (0, 1)),
('request-in', 'USB BULK IN', (2,)),
('request-out', 'USB BULK OUT', (3,)),
('errors', 'Errors', (4,)),
binary = (
@ -178,6 +184,7 @@ class Decoder(srd.Decoder):
def start(self):
self.out_binary = self.register(srd.OUTPUT_BINARY)
self.out_ann = self.register(srd.OUTPUT_ANN)
self.in_request_start = self.options['in_request_start']
def handle_transfer(self):
request_started = 0
@ -195,7 +202,7 @@ class Decoder(srd.Decoder):
if not (addr, ep) in self.request:
self.request[(addr, ep)] = {'setup_data': [], 'data': [],
'type': None, 'ss': self.ss_transaction, 'es': None,
'id': self.request_id, 'addr': addr, 'ep': ep}
'ss_data': None, 'id': self.request_id, 'addr': addr, 'ep': ep}
self.request_id += 1
request_started = 1
request = self.request[(addr,ep)]
@ -207,11 +214,14 @@ class Decoder(srd.Decoder):
# BULK or INTERRUPT transfer
if request['type'] in (None, 'BULK IN') and self.transaction_type == 'IN':
request['type'] = 'BULK IN'
if len(request['data']) == 0 and len(self.transaction_data) > 0:
request['ss_data'] = self.ss_transaction
request['data'] += self.transaction_data
self.handle_request(request_started, request_end)
elif request['type'] in (None, 'BULK OUT') and self.transaction_type == 'OUT':
request['type'] = 'BULK OUT'
request['data'] += self.transaction_data
if self.handshake == 'ACK':
request['data'] += self.transaction_data
self.handle_request(request_started, request_end)
@ -231,7 +241,8 @@ class Decoder(srd.Decoder):
request['data'] += self.transaction_data
elif request['type'] == 'SETUP OUT' and self.transaction_type == 'OUT':
request['data'] += self.transaction_data
if self.handshake == 'ACK':
request['data'] += self.transaction_data
if request['wLength'] == len(request['data']):
self.handle_request(1, 0)
@ -275,7 +286,9 @@ class Decoder(srd.Decoder):
addr = self.transaction_addr
request = self.request[(addr, ep)]
ss, es = request['ss'], request['es']
ss, es, ss_data = request['ss'], request['es'], request['ss_data']
if self.in_request_start == 'submit':
ss_data = ss
if request_start == 1:
# Issue PCAP 'SUBMIT' packet.
@ -292,7 +305,7 @@ class Decoder(srd.Decoder):
elif request['type'] == 'SETUP OUT':
self.putr(ss, es, [1, ['SETUP out: %s' % summary]])
elif request['type'] == 'BULK IN':
self.putr(ss, es, [2, ['BULK in: %s' % summary]])
self.putr(ss_data, es, [2, ['BULK in: %s' % summary]])
elif request['type'] == 'BULK OUT':
self.putr(ss, es, [3, ['BULK out: %s' % summary]])
@ -339,6 +352,8 @@ class Decoder(srd.Decoder):
self.es_transaction = es
self.transaction_state = 'TOKEN RECEIVED'
self.transaction_ep = ep
if ep > 0 and pname == 'IN':
self.transaction_ep = ep + 0x80
self.transaction_addr = addr
self.transaction_type = pname # IN OUT SETUP
@ -33,8 +33,12 @@ class Decoder(srd.Decoder):
outputs = []
tags = ['Networking']
annotations = (
('fieldnames-and-values', 'XFP structure field names and values'),
('fields', 'XFP structure fields'),
('field-name-and-val', 'Field name and value'),
('field-val', 'Field value'),
annotation_rows = (
('field-names-and-vals', 'Field names and values', (0,)),
('field-vals', 'Field values', (1,)),
def __init__(self):
Reference in New Issue
Block a user