Skip to content

Commit

Permalink
Cleanup Python3 script args options/command
Browse files Browse the repository at this point in the history
Fix in dump --speed fast
Add in dump --speed max
Add in dump Time taken & Throughput in KBytes/script
  • Loading branch information
bvernoux committed Feb 16, 2024
1 parent cc74074 commit d156961
Showing 1 changed file with 113 additions and 73 deletions.
186 changes: 113 additions & 73 deletions contrib/hydra_spiflash_nor_dump/hydra_spiflash_nor_dump.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
#
# Author: Pedro Ribeiro <[email protected]>
# Author: Jonathan Borgeaud <[email protected]>
# Author: Benjamin Vernoux <[email protected]>
# License: GPLv3 (https://choosealicense.com/licenses/gpl-3.0/)
#
import sys
import time
import argparse
import serial

Expand All @@ -22,42 +24,67 @@ def signal_handler(signal, frame):

class HydrabusSpiFlash:

def __init__(self, serial_port):

self.top_parser = argparse.ArgumentParser(add_help=False)
self.top_parser.add_argument('--spi',
default=1,
help='Set the SPI port to SPI1 or SPI2',
metavar='{1,2}')
def __init__(self):

parser = argparse.ArgumentParser(
parents=[self.top_parser],
description='Tool to query NOR memory flash with Hydrabus',
epilog='This script requires python 3.7+, and serial',
usage='''hydra_spiflash_nor_dump.py <command> [<args>] [OPTIONS]
Commands are:
get_chip_id Return the chip identification
dump <dump_file> <n_4k_sectors> <hex_address> Dump the flash in <dump_file> starting at <hex_address>
''')

parser.add_argument('command',
help='dump to dump the flash or get_chip_id')

args_spi = parser.parse_args()
args = parser.parse_args(sys.argv[1:2])
if not hasattr(self, args.command):
print('Unrecognized command')
parser.print_help()
sys.exit(1)

self.serial_port = serial_port
self.hydrabus = None
self.setup(args_spi.spi)

# use dispatch pattern to invoke method with same name
getattr(self, args.command)()
usage='''hydra_spiflash_nor_dump.py [options] <command> [<args>]
options:
--com_port (/dev/ttyACM0 by default)
--spi (1 for SPI1 by default) or 2 for SPI2
Commands are:
get_chip_id Return the chip identification
dump <dump_file> <n_4k_sectors> <hex_address> Dump the flash in <dump_file> starting at <hex_address>
[--speed slow](by default 320KHz) or [--speed fast](10.5MHz) or [--speed max](42MHz SPI1/21MHz SPI2)
''')

parser.add_argument("--com_port",
default="/dev/ttyACM0",
help="Specify COM port (default: /dev/ttyACM0)")
parser.add_argument("--spi",
type=int,
choices=[1, 2],
default=1,
help="Specify SPI value (1 or 2) (default: 1)")

subparsers = parser.add_subparsers(dest="command",
required=True,
help="Choose command")

dump_parser = subparsers.add_parser("dump", help="Dump command help")
dump_parser.add_argument("dump_file",
type=str,
help="Specify dump file")
dump_parser.add_argument("n_4k_sectors",
type=int,
help="Specify number of 4K sectors")
dump_parser.add_argument("hex_address",
type=str,
help="Specify hex address")
dump_parser.add_argument("--speed",
choices=["slow", "fast", "max"],
default="slow",
help="Specify speed (default: slow)")

subparsers.add_parser("get_chip_id", help="Get chip ID command help")
self.args = parser.parse_args()

self.setup()

if self.args.command == "dump":
print("Dumping with the following options:")
print("COM Port:", self.args.com_port, "SPI:", self.args.spi,
"SPI Speed:", self.args.speed, "Dump File:",
self.args.dump_file, "Number of 4K Sectors:",
self.args.n_4k_sectors, "Hex Address:",
self.args.hex_address)
self.dump()
elif self.args.command == "get_chip_id":
print("Getting chip ID with the following options:")
print("COM Port:", self.args.com_port, "SPI:", self.args.spi)
self.get_chip_id()
self.cleanup()

def error(self, message):
print(message)
Expand All @@ -71,10 +98,10 @@ def calc_hex_addr(self, addr, add, addr_len):
byte_arr = self.hex_to_bin(addr + add, addr_len)
return byte_arr

def setup(self, spi):
def setup(self):

# Open serial port
self.hydrabus = serial.Serial(self.serial_port, 115200)
self.hydrabus = serial.Serial(self.args.com_port, 115200)

# Open binary mode
for _ in range(20):
Expand All @@ -90,7 +117,7 @@ def setup(self, spi):
if b"SPI1" not in self.hydrabus.read(4):
self.error("Cannot set SPI mode, try again or reset hydrabus.")

if spi == 1:
if self.args.spi == 1:
# Configure SPI port (default polarity and clock phase, SPI1 device)
self.hydrabus.write(b'\x81')
else:
Expand Down Expand Up @@ -120,53 +147,54 @@ def get_chip_id(self):
self.error('Oups something went wrong...')

chip_id = self.hydrabus.read(3)
if int.from_bytes(chip_id) == 0:
print('Cannot read get chip ID')

if int.from_bytes(chip_id) == 0 or int.from_bytes(chip_id) == 0xFFFFFF:
print(f"Chip ID: 0x{chip_id.hex()}")
print('Cannot read get chip ID (invalid chip ID) exit')
self.cleanup()
sys.exit(-1)
else:
print(f"Chip ID: 0x{chip_id.hex()}")
print("Finished get_chip_id function.")
self.cleanup()
sys.exit(0)

def dump(self):
parser = argparse.ArgumentParser(description='Dump the NOR flash')

parser.add_argument('dump_file')
parser.add_argument('n_4k_sectors', type=int)
parser.add_argument('hex_address', help='Starting address')
parser.add_argument('--speed',
action='store',
help='Slow(320Khz) or fast(10.5Mhz)')
args = parser.parse_args(sys.argv[2:])
if '0x' in args.hex_address:
self.get_chip_id()
if '0x' in self.args.hex_address:
# we get hexadecimal address, need convert
start_address = int(args.hex_address, 16)
else:
start_address = int(args.hex_address)

if args.speed == 'slow':
# Set spi speed to 320 Khz
self.hydrabus.write(b'\x60')
start_address = int(self.args.hex_address, 16)
else:
# Set spi speed to 10.5 mHz, a conservative fast speed
self.hydrabus.write(b'\x61')
print("Using fast mode (10.5 mHz) to dump the chip.")
start_address = int(self.args.hex_address)

if self.args.speed == 'slow':
# Set spi speed 'slow' to 320kHz
if self.args.spi == 1:
self.hydrabus.write(b'\x60')
else: # SPI2
self.hydrabus.write(b'\x61')
elif self.args.speed == 'fast':
# Set spi speed 'fast' to 10.5MHz
if self.args.spi == 1:
self.hydrabus.write(b'\x65')
else: # SPI2
self.hydrabus.write(b'\x66')
elif self.args.speed == 'max':
# Set spi speed 'max' 42MHz SPI1 / 21MHz SPI2
self.hydrabus.write(b'\x67')
if b'\x01' not in self.hydrabus.read(1):
self.error("Cannot set SPI speed, try again or reset hydrabus.")

print('Starting to read chip...')
print('Reading ' + str(args.n_4k_sectors) + ' sectors')
print('Reading ' + str(self.args.n_4k_sectors) + ' sectors')

sector = 0
total_bytes_read = 0
start_time = time.time()

with open(args.dump_file, 'wb+') as dst:

print(f"sectors: {args.n_4k_sectors}: {SECTORE_SIZE}")
if args.n_4k_sectors * SECTORE_SIZE > 0xffffff:
with open(self.args.dump_file, 'wb+') as dst:
print(f"sectors: {self.args.n_4k_sectors}: {SECTORE_SIZE}")
if self.args.n_4k_sectors * SECTORE_SIZE > 0xffffff:
print("Size is bigger than 3bytes address, using 0x13 command")
# size is bigger than 3bytes address so use the 0x13 command
while sector < args.n_4k_sectors:
while sector < self.args.n_4k_sectors:
# write-then-read: write 5 bytes
# (1 read cmd + 4 read addr), read SECTORE_SIZE bytes
self.hydrabus.write(b'\x04\x00\x05' +
Expand All @@ -181,12 +209,14 @@ def dump(self):
self.error("Cannot read at address...")

# ...followed by SECTORE_SIZE read bytes
dst.write(self.hydrabus.read(SECTORE_SIZE))
print('Read sector ' + str(sector))
data = self.hydrabus.read(SECTORE_SIZE)
dst.write(data)
total_bytes_read += len(data)
print('Read sector ' + str(sector), end='\r')
sector += 1
else:
# the size is less than 3bytes address so use the 0x03 command
while sector < args.n_4k_sectors:
while sector < self.args.n_4k_sectors:
# write-then-read: write 4 bytes
# (1 read cmd + 3 read addr), read SECTORE_SIZE bytes
self.hydrabus.write(b'\x04\x00\x04' +
Expand All @@ -201,11 +231,21 @@ def dump(self):
self.error("Cannot read at address...")

# ...followed by SECTORE_SIZE read bytes
dst.write(self.hydrabus.read(SECTORE_SIZE))
print('Read sector ' + str(sector))
data = self.hydrabus.read(SECTORE_SIZE)
dst.write(data)
total_bytes_read += len(data)
print('Read sector ' + str(sector), end='\r')
sector += 1

print('Finished dumping to ' + args.dump_file)
end_time = time.time()
elapsed_time = end_time - start_time

# Calculate throughput in KBytes/s
throughput_kbytes_persec = (total_bytes_read / elapsed_time) / 1024

print('Finished dumping to ' + self.args.dump_file)
print(f'Time taken: {elapsed_time:.2f} seconds')
print(f'Throughput: {throughput_kbytes_persec:.2f} KBytes/s')
self.cleanup()


Expand All @@ -214,4 +254,4 @@ def dump(self):
# disable this for debugging
# signal.signal(signal.SIGINT, signal_handler)

hydra = HydrabusSpiFlash("/dev/ttyACM0")
hydra = HydrabusSpiFlash()

0 comments on commit d156961

Please sign in to comment.