import subprocess, plistlib, sys, os, time, json
# Make the sibling "run" helper importable no matter what the caller's cwd is.
sys.path.append(os.path.abspath(os.path.dirname(os.path.realpath(__file__))))
import run


class Disk:
    """Query macOS disk information by shelling out to ``diskutil``.

    Commands are executed through ``run.Run().run``; this class indexes the
    result with ``[0]`` and treats it as the command's text output
    (presumably stdout - confirm against the ``run`` module).
    """

    def __init__(self):
        self.r = run.Run()
        self.diskutil = self.get_diskutil()
        # Keep only "major.minor" of the product version. The strip() guards
        # against a trailing newline ending up inside the last component
        # (e.g. "13.0\n") if the helper returns raw command output.
        product_version = self.r.run({"args": ["sw_vers", "-productVersion"]})[0]
        self.os_version = ".".join(product_version.strip().split(".")[:2])
        self.apfs = {}
        self._update_disks()

    def _get_plist(self, s):
        """Parse the plist text ``s`` and return the decoded object.

        Returns an empty dict when ``s`` cannot be encoded or parsed.
        """
        try:
            data = s.encode("utf-8")
            if sys.version_info >= (3, 0):
                return plistlib.loads(data)
            # Python 2 spelling of the same operation
            return plistlib.readPlistFromString(data)
        except Exception:
            # Deliberately best-effort: any parse/encode failure yields {}.
            # Unlike a bare except, this lets KeyboardInterrupt/SystemExit
            # propagate.
            return {}

    def _compare_versions(self, vers1, vers2):
        """Compare two dotted numeric version strings (e.g. "10.12").

        Returns:
            True  if vers1 <  vers2
            None  if vers1 == vers2, or if either value is malformed
            False if vers1 >  vers2

        Versions with different component counts are compared as if the
        shorter one were padded with zeros ("10.12" == "10.12.0").
        """
        try:
            v1_parts = [int(part) for part in vers1.split(".")]
            v2_parts = [int(part) for part in vers2.split(".")]
        except (AttributeError, TypeError, ValueError):
            # Not a string, or a component is not an integer - formatted wrong
            return None
        # Pad to equal length so neither side is indexed out of range
        width = max(len(v1_parts), len(v2_parts))
        v1_parts.extend([0] * (width - len(v1_parts)))
        v2_parts.extend([0] * (width - len(v2_parts)))
        if v1_parts == v2_parts:
            # Never differed - must be equal
            return None
        # List comparison is component-wise, left to right
        return v1_parts < v2_parts

    def update(self):
        """Refresh the cached disk information."""
        self._update_disks()

    def _update_disks(self):
        """Reload ``self.disks``, ``self.disk_text`` and ``self.apfs``."""
        self.disks = self.get_disks()
        self.disk_text = self.get_disk_text()
        # Only query APFS when the OS version is newer than 10.12
        if self._compare_versions("10.12", self.os_version):
            self.apfs = self.get_apfs()
        else:
            self.apfs = {}

    def get_diskutil(self):
        """Return the path to the diskutil binary as reported by ``which``."""
        located = self.r.run({"args": ["which", "diskutil"]})[0]
        # Keep just the first line, without any line-ending characters
        return located.split("\n")[0].split("\r")[0]

    def get_disks(self):
        """Return the parsed output of ``diskutil list -plist``."""
        disk_list = self.r.run({"args": [self.diskutil, "list", "-plist"]})[0]
        return self._get_plist(disk_list)

    def get_disk_text(self):
        """Return the plain-text output of ``diskutil list``."""
        return self.r.run({"args": [self.diskutil, "list"]})[0]

    def get_disk_info(self, disk):
        """Return the parsed ``diskutil info -plist`` output for ``disk``.

        Returns None when ``self.get_identifier(disk)`` yields nothing.
        """
        disk_id = self.get_identifier(disk)
        if not disk_id:
            return None
        disk_list = self.r.run({"args": [self.diskutil, "info", "-plist", disk_id]})[0]
        return self._get_plist(disk_list)

    def get_disk_fs(self, disk):
        # Resolve the identifier first; bail out if it can't be resolved
        disk_id = self.get_identifier(disk)
        if not disk_id:
            return None
return self.get_disk_info(disk_id).get("FilesystemName", None) def get_disk_fs_type(self, disk): disk_id = self.get_identifier(disk) if not disk_id: return None return self.get_disk_info(disk_id).get("FilesystemType", None) def get_apfs(self): # Returns a dictionary object of apfs disks output = self.r.run({"args":"echo y | " + self.diskutil + " apfs list -plist", "shell" : True}) if not output[2] == 0: # Error getting apfs info - return an empty dict return {} disk_list = output[0] p_list = disk_list.split(" 1: # We had text before the start - get only the plist info disk_list = "