mirror of
https://github.com/DarkFlippers/unleashed-firmware
synced 2024-11-23 04:53:08 +00:00
fcbcb6b5a8
* toolbox: compress: moved decompressor implementation to separate func * toolbox: compress: callback-based api; cli: storage unpack command * toolbox: compress: separate r/w contexts for stream api * targets: f18: sync API * compress: naming fixes & cleanup * toolbox: compress: using hs buffer size for stream buffers * toolbox: tar: heatshrink stream mode * toolbox: compress: docs & small cleanup * toolbox: tar: header support for .hs; updater: now uses .hs for resources; .hs.tar: now rewindable * toolbox: compress: fixed hs stream tail handling * updater: reworked progress for resources cleanup; rebalanced stage weights * updater: single-pass decompression; scripts: print resources compression ratio * updater: fixed warnings * toolbox: tar: doxygen * docs: update * docs: info or tarhs format; scripts: added standalone compression/decompression tool for heatshrink-formatted streams * scripts: tarhs: fixed parameter handling * cli: storage extract command; toolbox: tar: guess type based on extension * unit_tests: added test for streamed raw hs decompressor `compress_decode_streamed` * unit_tests: compress: added extraction test for .tar.hs * rpc: autodetect compressed archives * scripts: minor cleanup of common parts * scripts: update: now using in-memory intermediate tar stream * scripts: added hs.py wrapper for heatshrink-related ops (single object and directory-as-tar compression) * scripts: naming fixes * Toolbox: export compress_config_heatshrink_default as const symbol * Toolbox: fix various types naming * Toolbox: more of types naming fixes * Toolbox: use size_t in compress io callbacks and structures * UnitTests: update to match new compress API * Toolbox: proper path_extract_extension usage Co-authored-by: あく <alleteam@gmail.com>
145 lines
4.5 KiB
Python
Executable file
145 lines
4.5 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
|
|
import heatshrink2 as hs
|
|
from flipper.app import App
|
|
from flipper.assets.heatshrink_stream import HeatshrinkDataStreamHeader
|
|
from flipper.assets.tarball import compress_tree_tarball
|
|
|
|
|
|
class HSWrapper(App):
|
|
DEFAULT_WINDOW = 13
|
|
DEFAULT_LOOKAHEAD = 6
|
|
|
|
def init(self):
|
|
self.subparsers = self.parser.add_subparsers(
|
|
title="subcommands", dest="subcommand"
|
|
)
|
|
|
|
self.parser_compress = self.subparsers.add_parser(
|
|
"compress", help="compress file using heatshrink"
|
|
)
|
|
self.parser_compress.add_argument(
|
|
"-w", "--window", help="window size", type=int, default=self.DEFAULT_WINDOW
|
|
)
|
|
self.parser_compress.add_argument(
|
|
"-l",
|
|
"--lookahead",
|
|
help="lookahead size",
|
|
type=int,
|
|
default=self.DEFAULT_LOOKAHEAD,
|
|
)
|
|
self.parser_compress.add_argument("file", help="file to compress")
|
|
self.parser_compress.add_argument(
|
|
"-o", "--output", help="output file", required=True
|
|
)
|
|
self.parser_compress.set_defaults(func=self.compress)
|
|
|
|
self.parser_decompress = self.subparsers.add_parser(
|
|
"decompress", help="decompress file using heatshrink"
|
|
)
|
|
self.parser_decompress.add_argument("file", help="file to decompress")
|
|
self.parser_decompress.add_argument(
|
|
"-o", "--output", help="output file", required=True
|
|
)
|
|
self.parser_decompress.set_defaults(func=self.decompress)
|
|
|
|
self.parser_info = self.subparsers.add_parser("info", help="show file info")
|
|
self.parser_info.add_argument("file", help="file to show info for")
|
|
self.parser_info.set_defaults(func=self.info)
|
|
|
|
self.parser_tar = self.subparsers.add_parser(
|
|
"tar", help="create a tarball and compress it"
|
|
)
|
|
self.parser_tar.add_argument("dir", help="directory to tar")
|
|
self.parser_tar.add_argument(
|
|
"-o", "--output", help="output file", required=True
|
|
)
|
|
self.parser_tar.add_argument(
|
|
"-w", "--window", help="window size", type=int, default=self.DEFAULT_WINDOW
|
|
)
|
|
self.parser_tar.add_argument(
|
|
"-l",
|
|
"--lookahead",
|
|
help="lookahead size",
|
|
type=int,
|
|
default=self.DEFAULT_LOOKAHEAD,
|
|
)
|
|
self.parser_tar.set_defaults(func=self.tar)
|
|
|
|
def compress(self):
|
|
args = self.args
|
|
|
|
with open(args.file, "rb") as f:
|
|
data = f.read()
|
|
|
|
compressed = hs.compress(
|
|
data, window_sz2=args.window, lookahead_sz2=args.lookahead
|
|
)
|
|
|
|
with open(args.output, "wb") as f:
|
|
header = HeatshrinkDataStreamHeader(args.window, args.lookahead)
|
|
f.write(header.pack())
|
|
f.write(compressed)
|
|
|
|
self.logger.info(
|
|
f"Compressed {len(data)} bytes to {len(compressed)} bytes, "
|
|
f"compression ratio: {len(compressed) * 100 / len(data):.2f}%"
|
|
)
|
|
return 0
|
|
|
|
def decompress(self):
|
|
args = self.args
|
|
|
|
with open(args.file, "rb") as f:
|
|
header = HeatshrinkDataStreamHeader.unpack(f.read(7))
|
|
compressed = f.read()
|
|
|
|
self.logger.info(
|
|
f"Decompressing with window size {header.window_size} and lookahead size {header.lookahead_size}"
|
|
)
|
|
|
|
data = hs.decompress(
|
|
compressed,
|
|
window_sz2=header.window_size,
|
|
lookahead_sz2=header.lookahead_size,
|
|
)
|
|
|
|
with open(args.output, "wb") as f:
|
|
f.write(data)
|
|
|
|
self.logger.info(f"Decompressed {len(compressed)} bytes to {len(data)} bytes")
|
|
return 0
|
|
|
|
def info(self):
|
|
args = self.args
|
|
|
|
try:
|
|
with open(args.file, "rb") as f:
|
|
header = HeatshrinkDataStreamHeader.unpack(f.read(7))
|
|
except Exception as e:
|
|
self.logger.error(f"Error: {e}")
|
|
return 1
|
|
|
|
self.logger.info(
|
|
f"Window size: {header.window_size}, lookahead size: {header.lookahead_size}"
|
|
)
|
|
|
|
return 0
|
|
|
|
def tar(self):
|
|
args = self.args
|
|
|
|
orig_size, compressed_size = compress_tree_tarball(
|
|
args.dir, args.output, hs_window=args.window, hs_lookahead=args.lookahead
|
|
)
|
|
|
|
self.logger.info(
|
|
f"Tarred and compressed {orig_size} bytes to {compressed_size} bytes, "
|
|
f"compression ratio: {compressed_size * 100 / orig_size:.2f}%"
|
|
)
|
|
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
HSWrapper()()
|