import os import io import json import wwise import tempfile import wavescan import platform import subprocess from vfs import decrypt from mapper import Mapper from allocator import Allocator from filereader import FileReader cwd = os.getcwd() path = lambda *args: os.path.join(*args) def call(args): try: subprocess.call(args, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT) except Exception as e: print(f"[WARNING] failed to extract, {e}") class WwiseExtract: def __init__(self): self.allocator = Allocator() self.hdiff_dir = None self.maps = {} ### loading files ### def load_map(self, _map): map_name = _map.split(".")[0] if map_name not in self.maps or self.maps[map_name] is None: print("Map load required !") mapper = Mapper(path(cwd, f"maps/{_map}")) self.maps[map_name] = mapper else: print("Mapping already loaded, skipping") return self.maps[map_name] def load_folder(self, _map, files, diff_path, base_path, progress): self.progress = progress self.steps = 1 self.mapper = None if _map is not None: self.mapper = self.load_map(_map) self.file_structure = {"folders": {}, "files": []} hdiff_files = [] if diff_path != "": hdiff_files = [f for f in os.listdir(diff_path) if f.endswith(".pck.hdiff")] # TODO: hdiff mode will only use .hdiff files and ignore .pck even in the update folder, i need to implement it, eventually # remove alone pck / hdiff base_files = [os.path.basename(f) for f in files] hdiff_files = [f for f in hdiff_files if os.path.basename(f.replace(".hdiff", "")) in base_files] base_hfiles = [os.path.basename(f) for f in hdiff_files] files = [f for f in files if f"{os.path.basename(f)}.hdiff" in base_hfiles] if len(files) == 0: return None pos = 0 print(f"\nLoading {len(files)} files...") for file in files: pos += 1 self.update_progress(pos, len(files), 1) hdiff = None if f"{os.path.basename(file)}.hdiff" in hdiff_files: hdiff = path(diff_path, hdiff_files[hdiff_files.index(f"{os.path.basename(file)}.hdiff")]) self.load_file(file, hdiff, base_path) return self.file_structure def load_file(self, _input, hdiff, base_path): with open(_input, "rb") as f: data = f.read() f.close() self.get_wems(data, os.path.basename(_input), hdiff, os.path.relpath(_input, start=base_path)) def get_wems(self, data, filename, hdiff, relpath): files = wavescan.get_data(data, filename) if hdiff is not None: with open(hdiff, "rb") as f: hdiff_data = f.read() f.close() hdiff_files, data = self.get_hdiff_files(data, hdiff_data, filename) files = self.compare_diff(files, hdiff_files) self.map_names(files, filename, relpath, hdiff is not None, data) def compare_diff(self, old, new): old_dict = {file[0]:file[2] for file in old} new_files = [file for file in new if file[0] not in list(old_dict.keys())] changed_files = [file for file in new if file[0] in list(old_dict.keys()) and file[2] != old_dict[file[0]]] return [new_files, changed_files] def get_hdiff_files(self, data, hdiff_data, source_name): working_dir = tempfile.TemporaryDirectory() if self.hdiff_dir is None: self.hdiff_dir = tempfile.TemporaryDirectory() with open(path(working_dir.name, "source.pck"), "wb") as f: f.write(data) f.close() with open(path(working_dir.name, "patch.pck.hdiff"), "wb") as f: f.write(hdiff_data) f.close() args = [ path(cwd, "tools/hpatchz/hpatchz.exe"), "-f", path(working_dir.name, "source.pck"), path(working_dir.name, "patch.pck.hdiff"), path(working_dir.name, "patch.pck") ] if platform.system() != "Windows": args.insert(0, "wine") call(args) if not os.path.exists(path(working_dir.name, "patch.pck")): print(f"[ERROR] failed to patch {source_name}, skipping") return [] with open(path(working_dir.name, "patch.pck"), "rb") as f: data = f.read() f.close() with open(path(self.hdiff_dir.name, source_name), "wb") as f: f.write(data) f.close() files = wavescan.get_data(data, source_name) working_dir.cleanup() return files, data def map_names(self, files, filename, relpath, hdiff=False, data=None, skip_source=True): # disable skip source if required mapper = self.mapper base = self.file_structure if hdiff: old_files = files filename = f"{filename} (hdiff)" files = [*files[0], *files[1]] # in case of manual use of mapping, use this # load json here # handle = open("banks.json", "r") # banks = json.loads(handle.read()) # handle.close() def process_file(file): if mapper is not None: key = mapper.get_key(file[0].split(".")[0]) # and override the method with a manual dict lookup # _id = file[0].split(".")[0] # if _id in list(banks["banks"].keys()): # key = [banks["banks"][_id], ""] else: key = None file_data = { "source": relpath, "size": file[2], "offset": file[1], "original_name": file[0], "metadata": {} } wem_data = data[file_data["offset"]:file_data["offset"]+file_data["size"]] parsed_wem = wwise.parse_wwise(wem_data, f"{file[3]}:{file[0]}:{file[1]}", file[0]) if not parsed_wem: return file_data["metadata"] = parsed_wem if key is not None: if hdiff: if file in old_files[0]: key[0] = f"new_files\\{key[0]}" else: key[0] = f"changed_files\\{key[0]}" parts = f"{filename}\\{key[0]}.wem".split("\\") if skip_source: parts = parts[1:] self.add_to_structure(parts, file_data) else: temp = base["folders"] if not skip_source: if filename not in temp: temp[filename] = {"folders": {}, "files": []} temp = temp[filename]["folders"] if hdiff: if file in old_files[0]: if "new_files" not in temp: temp["new_files"] = {"folders": {}, "files": []} temp = temp["new_files"]["folders"] if file in old_files[1]: if "changed_files" not in temp: temp["changed_files"] = {"folders": {}, "files": []} temp = temp["changed_files"]["folders"] if "unmapped" not in temp: temp["unmapped"] = {"folders": {}, "files": []} temp["unmapped"]["files"].append([file[0], file_data]) pos = 0 for file in files: process_file(file) pos += 1 self.update_progress(pos, len(files), 1) self.file_structure = base def add_to_structure(self, parts, meta): current_level = self.file_structure for part in parts[:-1]: if "folders" not in current_level: current_level["folders"] = {} if part not in current_level["folders"]: current_level["folders"][part] = {"folders": {}, "files": []} current_level = current_level["folders"][part] if "files" not in current_level: current_level["files"] = [] current_level["files"].append([parts[-1], meta]) ### extracting files ### def extract_files(self, _input, files, output, _format, progress): temp_dir = tempfile.TemporaryDirectory() self.progress = progress self.steps = { "wem": 1, "wav": 2, "mp3": 3, "ogg": 3 }[_format] # wem if _format == "wem": output_folder = output else: output_folder = path(temp_dir.name, "wem") self.extract_wem(_input, files, output_folder) if _format == "wem": temp_dir.cleanup() return # wav new_input = output_folder files = [path("/".join(file["path"]), file["name"]) for file in files] if _format == "wav": output_folder = output else: output_folder = path(temp_dir.name, "wav") self.extract_wav(new_input, files, output_folder) if _format == "wav": temp_dir.cleanup() return # mp3 & ogg files = [path(os.path.dirname(file), f'{os.path.basename(file).split(".")[0]}.wav') for file in files] new_input = output_folder output_folder = output self.extract_ffmpeg(new_input, files, output_folder, _format) temp_dir.cleanup() return def extract_wem(self, _input, files, output): print(": Extracting audio as wem") all_sources = list(set([e["source"] for e in files])) pos = 0 for source in all_sources: # load source load_path = path(_input, source) if self.hdiff_dir is not None: source = source.split(" (hdiff)")[0] hdiff_path = path(self.hdiff_dir.name, source) if os.path.isfile(hdiff_path): load_path = hdiff_path self.allocator.load_file(load_path, source) # extract every file from this one for file in [file for file in files if file["source"] == source]: pos += 1 self.update_progress(pos, len(files), 1) file["source"] = file["source"].split(" (hdiff)")[0] data = self.allocator.read_at(file["source"], file["offset"], file["size"]) if data[0:4] not in [b"RIFF", b"RIFX"]: # file may be vfs encrypted data = bytearray(data) wem_id = 0 try: wem_id = int(file["original_name"][:-4]) except ValueError: try: wem_id = int(file["original_name"][:-4], 16) except ValueError: continue decrypt(data, 0, len(data), wem_id, 0) if data[0:4] not in [b"RIFF", b"RIFX"]: continue filepath = path("/".join(file["path"]), file["name"]) fullpath = path(output, filepath) os.makedirs(os.path.dirname(fullpath), exist_ok=True) with open(fullpath, "wb") as f: f.write(data) f.close() # unload source self.allocator.unload_file(source) # security self.allocator.free_mem() def extract_wav(self, _input, files, output): print(": Converting audio to wav") pos = 0 for file in files: pos += 1 self.update_progress(pos, len(files), 2) filename = f'{os.path.basename(file).split(".")[0]}.wav' filepath = path(output, os.path.dirname(file), filename) os.makedirs(os.path.dirname(filepath), exist_ok=True) args = [ path(cwd, "tools/vgmstream/vgmstream-cli.exe"), "-o", filepath, path(_input, file) ] if platform.system() != "Windows": args.insert(0, "wine") call(args) def extract_ffmpeg(self, _input, files, output, _format): print(f": Converting audio to {_format}") encoders = { "mp3": "libmp3lame", "ogg": "libvorbis" } encoder = encoders[_format] pos = 0 for file in files: pos += 1 self.update_progress(pos, len(files), 3) filename = f'{os.path.basename(file).split(".")[0]}.{_format}' filepath = path(output, os.path.dirname(file), filename) os.makedirs(os.path.dirname(filepath), exist_ok=True) args = [ path(cwd, "tools/ffmpeg/ffmpeg.exe"), "-i", path(_input, file), "-acodec", encoder, "-b:a", "192k", # 192|4 filepath ] if platform.system() != "Windows": args.insert(0, "wine") call(args) ### other ### def update_progress(self, current, total, step): base = 100 / self.steps self.progress(["total", current * base / total + base * (step - 1)]) self.progress(["file", current * 100 / total]) def reset(self): self.mapper = None for e in self.maps.values(): e.reset() self.maps.clear() self.allocator.free_mem() if self.hdiff_dir is not None: self.hdiff_dir.cleanup() self.hdiff_dir = None