Files
Jonathan Tinney 7971961166
Some checks failed
Analyze modified files / flake8 (push) Failing after 2m28s
Build / build-win (push) Has been cancelled
Build / build-ubuntu2204 (push) Has been cancelled
ctest / Test C++ ubuntu-latest (push) Has been cancelled
ctest / Test C++ windows-latest (push) Has been cancelled
Analyze modified files / mypy (push) Has been cancelled
Build and Publish Docker Images / Push Docker image to Docker Hub (push) Successful in 5m4s
Native Code Static Analysis / scan-build (push) Failing after 5m2s
type check / pyright (push) Successful in 1m7s
unittests / Test Python 3.11.2 ubuntu-latest (push) Failing after 16m23s
unittests / Test Python 3.12 ubuntu-latest (push) Failing after 28m19s
unittests / Test Python 3.13 ubuntu-latest (push) Failing after 14m49s
unittests / Test hosting with 3.13 on ubuntu-latest (push) Successful in 5m0s
unittests / Test Python 3.13 macos-latest (push) Has been cancelled
unittests / Test Python 3.11 windows-latest (push) Has been cancelled
unittests / Test Python 3.13 windows-latest (push) Has been cancelled
add schedule I, sonic 1/frontiers/heroes, spirit island
2026-04-02 23:46:36 -07:00

294 lines
10 KiB
Python

"""Apply BPS patches to files."""
# This file was created by Sopoforic/Vidua for creating a BPS patch.
# For portability we have put this directly in our codebase.
import logging
import zlib
from io import BytesIO
from typing import BinaryIO
from enum import Enum
from typing import BinaryIO
logger = logging.getLogger(__name__)
class PatchType(Enum):
"""An enumeration of the types of patches that can be applied."""
IPS = 1
BPS = 2
def identify(patch: BinaryIO):
"""Identify the type of a patch.
:param patch: the patch file
"""
patch.seek(0)
if patch.read(4) == b"BPS1":
return PatchType.BPS
patch.seek(0)
if patch.read(5) == b"PATCH":
return PatchType.IPS
return None
def copy_bytes(from_buffer: BinaryIO, to_buffer: BinaryIO, length: int, size: int = 2**8):
"""Copy bytes from one buffer to another.
:param from_buffer: the source buffer
:param to_buffer: the destination buffer
:param length: the total number of bytes to copy
:param size: the number of bytes to copy at once
"""
while length > 0:
length -= to_buffer.write(from_buffer.read(min(length, size)))
def decode_number(bps_patch: BinaryIO) -> int:
"""Return the next number in ``bps_patch``.
:param bps_patch: the patch file
"""
data = 0
shift = 1
while True:
pos = bps_patch.tell()
end = bps_patch.seek(0, 2)
if pos > end - 12:
raise ValueError("Invalid number encoding.")
bps_patch.seek(pos)
x = bps_patch.read(1)
x = ord(x)
data += (x & 0x7F) * shift
if x & 0x80:
break
else:
shift <<= 7
data += shift
return data
def patch_info(bps_patch: BinaryIO) -> dict:
"""Return a dictionary of information about the patch.
>>> patch_info(bps_patch)
{'target_size': 24,
'metadata': b'',
'target_checksum': 2648610592,
'source_checksum': 3418748557,
'source_size': 37}
:param bps_patch: the patch file
"""
info = {}
bps_patch.seek(4)
info["source_size"] = decode_number(bps_patch)
info["target_size"] = decode_number(bps_patch)
metadata_size = decode_number(bps_patch)
info["metadata"] = bps_patch.read(metadata_size)
bps_patch.seek(-12, 2)
info["source_checksum"] = int.from_bytes(bps_patch.read(4), byteorder="little")
info["target_checksum"] = int.from_bytes(bps_patch.read(4), byteorder="little")
bps_patch.seek(0)
return info
def validate_patch(bps_patch: BinaryIO):
"""Verify that ``bps_patch`` is a valid BPS patch.
If the patch is valid, return. If the patch is invalid, raise a
``ValueError`` describing the problem.
:param bps_patch: the patch file
"""
bps_patch.seek(0)
if bps_patch.read(4) != b"BPS1":
raise ValueError("Invalid file format marker.")
patch_end = bps_patch.seek(0, 2)
if patch_end < 19:
raise ValueError("Patch too short.")
bps_patch.seek(0)
calculated_checksum = zlib.crc32(bps_patch.read(patch_end - 4))
checksum = int.from_bytes(bps_patch.read(4), byteorder="little")
if calculated_checksum != checksum:
raise ValueError("Invalid checksum. Stored checksum {:X}, actual checksum {:X}.".format(checksum, calculated_checksum))
bps_patch.seek(4)
try:
source_size = decode_number(bps_patch)
logger.debug("Source size: 0x%x", source_size)
except ValueError:
raise ValueError("Failed to decode source size.")
try:
target_size = decode_number(bps_patch)
logger.debug("Target size: 0x%x", target_size)
except ValueError:
raise ValueError("Failed to decode target size.")
try:
metadata_size = decode_number(bps_patch)
logger.debug("Metadata size: 0x%x", metadata_size)
except ValueError:
raise ValueError("Failed to decode metadata size.")
if metadata_size + bps_patch.tell() > patch_end - 12:
raise ValueError("Metadata size too large.")
bps_patch.seek(metadata_size, 1)
source_position = 0
target_position = 0
outread_position = 0
while bps_patch.tell() < patch_end - 12:
bps_pos = bps_patch.tell()
data = decode_number(bps_patch)
command = data & 3
length = (data >> 2) + 1
error_details = """\
Offset: 0x{:x}
Command: {:d}
Length: 0x{:x}
Source position: 0x{:x}
Target position: 0x{:x}""".format(
bps_pos, command, length, source_position, target_position
)
# logger.debug("Decoded a command.\n%s", error_details)
if command == 0:
# SourceRead
target_position += length
if target_position > source_size:
raise ValueError("Attempted to read beyond end of source.\n{}".format(error_details))
if target_position > target_size:
raise ValueError("Attempted to write beyond end of target.\n{}".format(error_details))
elif command == 1:
# TargetRead
target_position += length
if target_position > target_size:
raise ValueError("Attempted to write beyond end of target.\n{}".format(error_details))
bps_patch.seek(length, 1)
if bps_patch.tell() > patch_end - 12:
raise ValueError("TargetRead length too large.\n{}".format(error_details))
elif command == 2:
# SourceCopy
copy_data = decode_number(bps_patch)
source_relative_offset = (-1 if (copy_data & 1) else 1) * (copy_data >> 1)
error_details += "\nSRO: {}".format(source_relative_offset)
if source_position + source_relative_offset < 0:
raise ValueError("Attempted to read beyond beginning of source.\n{}".format(error_details))
source_position += source_relative_offset + length
if source_position > source_size:
raise ValueError("Attempted to read beyond end of source.\n{}".format(error_details))
target_position += length
if target_position > target_size:
raise ValueError("Attempted to write beyond end of target.\n{}".format(error_details))
elif command == 3:
# TargetCopy
copy_data = decode_number(bps_patch)
target_relative_offset = (-1 if (copy_data & 1) else 1) * (copy_data >> 1)
outread_position += target_relative_offset
error_details += "\nTRO: {}".format(target_relative_offset)
if outread_position < 0:
raise ValueError("Attempted to read beyond beginning of target.\n{}".format(error_details))
if outread_position >= target_position:
raise ValueError("Attempted to read beyond end of target.\n{}".format(error_details))
target_position += length
if target_position > target_size:
raise ValueError("Attempted to write beyond end of target.\n{}".format(error_details))
outread_position += length
if target_position != target_size:
raise ValueError("Final patch size incorrect. Expected: {}. Actual: {}".format(target_size, target_position))
def validate_checksum(source: BinaryIO, bps_patch: BinaryIO):
"""Ensure that the source file matches the checksum in the patch.
If the checksum matches, return. Otherwise, raise a ``ValueError``.
:param source: the file to be patched
:param bps_patch: the patch file
"""
bps_patch.seek(-12, 2)
checksum = int.from_bytes(bps_patch.read(4), byteorder="little")
source.seek(0)
calculated_checksum = zlib.crc32(source.read())
if calculated_checksum != checksum:
raise ValueError("Incompatible source. Stored checksum {:X}, actual checksum {:X}.".format(checksum, calculated_checksum))
source.seek(0)
def patch(source: BinaryIO, bps_patch: BinaryIO) -> BinaryIO:
"""Return the patched source.
:param source: the source file to be patched
:param bps_patch: the patch file
"""
bps_patch.seek(0)
validate_patch(bps_patch)
validate_checksum(source, bps_patch)
patch_end = bps_patch.seek(0, 2)
bps_patch.seek(4)
source_size = decode_number(bps_patch) # noqa: F841
target_size = decode_number(bps_patch) # noqa: F841
metadata_size = decode_number(bps_patch)
bps_patch.seek(metadata_size, 1)
outread_position = 0
output = BytesIO()
while bps_patch.tell() < patch_end - 12:
data = decode_number(bps_patch)
command = data & 3
length = (data >> 2) + 1
if command == 0:
# SourceRead
old_source_position = source.tell()
source.seek(output.tell())
copy_bytes(source, output, length)
source.seek(old_source_position)
elif command == 1:
# TargetRead
copy_bytes(bps_patch, output, length)
elif command == 2:
# SourceCopy
copy_data = decode_number(bps_patch)
source_relative_offset = (-1 if (copy_data & 1) else 1) * (copy_data >> 1)
source.seek(source_relative_offset, 1)
copy_bytes(source, output, length)
elif command == 3:
# TargetCopy
copy_data = decode_number(bps_patch)
target_relative_offset = (-1 if (copy_data & 1) else 1) * (copy_data >> 1)
outread_position += target_relative_offset
to_go = length
while to_go > 0:
step = min(output.tell() - outread_position, 2**8)
output.seek(outread_position)
segment = output.read(min(to_go, step))
outread_position = output.tell()
output.seek(0, 2)
output.write(segment)
to_go -= step
bps_patch.seek(-8, 2)
checksum = int.from_bytes(bps_patch.read(4), byteorder="little")
output.seek(0)
calculated_checksum = zlib.crc32(output.read())
if calculated_checksum == checksum:
logger.debug("Patch applied successfully.")
else:
raise ValueError("Invalid checksum. Stored checksum {:X}, actual checksum {:X}.".format(checksum, calculated_checksum))
output.seek(0)
return output