import subprocess
import plistlib
import sys
import os
import time
import json

# Make the sibling "run" module importable regardless of the working directory
sys.path.append(os.path.abspath(os.path.dirname(os.path.realpath(__file__))))
import run


class Disk:
    """Thin wrapper around the ``diskutil`` command line tool.

    Commands are executed through ``run.Run().run(...)``.  This class only
    relies on index 0 of the returned value being the command's text output
    and index 2 being comparable to 0 for success (presumably a
    ``(stdout, stderr, returncode)`` tuple - verify against the run module).
    """

    def __init__(self):
        """Locate diskutil, record the OS version and take a first disk snapshot."""
        self.r = run.Run()
        self.diskutil = self.get_diskutil()
        # Ask sw_vers once and derive both version strings from the same output
        product_version = self.r.run({"args": ["sw_vers", "-productVersion"]})[0]
        # Major.minor only (first two dot-separated components)
        self.os_version = ".".join(product_version.split(".")[:2])
        self.full_os_version = product_version
        if len(self.full_os_version.split(".")) < 3:
            # Add .0 in case of 10.14
            self.full_os_version += ".0"
        # Not used by the methods visible here; presumably consulted by the
        # mount helpers to decide when sudo is required - verify against callers
        self.sudo_mount_version = "10.13.6"
        self.sudo_mount_types = ["efi"]
        self.apfs = {}
        self._update_disks()

    def _get_str(self, val):
        """Return val unchanged if it is already a string, else str(val)."""
        if sys.version_info < (3, 0):
            # "unicode" only exists on Python 2, so only name it on that branch
            string_types = (str, unicode)  # noqa: F821
        else:
            string_types = (str,)
        if isinstance(val, string_types):
            return val
        return str(val)

    def _get_plist(self, s):
        """Parse the plist text s and return the result, or {} on any failure.

        Parsing is deliberately best-effort: callers receive an empty dict
        rather than an exception when the text is not a valid plist.
        """
        p = {}
        try:
            if sys.version_info >= (3, 0):
                p = plistlib.loads(s.encode("utf-8"))
            else:
                p = plistlib.readPlistFromString(s.encode("utf-8"))
        except Exception:
            # Swallow parse/encode errors only; unlike a bare except this
            # lets KeyboardInterrupt and SystemExit propagate
            pass
        return p

    def _compare_versions(self, vers1, vers2, pad=-1):
        """Compare two period-separated version strings.

        Returns:
            True  if vers1 <  vers2
            None  if vers1 == vers2
            False if vers1 >  vers2

        Non-digit characters inside a component are ignored.  A component
        that is missing (the other version has more components) or that
        contains no digits is treated as the integer pad.  A pad that is
        not an int falls back to -1.
        """
        # Sanitize the pad (exact type check, so bools are rejected as well)
        pad = pad if type(pad) is int else -1
        # Cast as strings and split to lists
        v1_parts = str(vers1).split(".")
        v2_parts = str(vers2).split(".")

        def component(parts, index):
            # Numeric value of parts[index]; pad when missing or digit-free.
            # The pad is kept as an int so that a negative pad keeps its sign
            # (padding with str(pad) and then stripping non-digits turned -1
            # into 1).
            if index >= len(parts):
                return pad
            digits = "".join(c for c in parts[index] if c.isdigit())
            return int(digits) if digits else pad

        # Walk both versions side by side; the first differing component decides
        for index in range(max(len(v1_parts), len(v2_parts))):
            v1 = component(v1_parts, index)
            v2 = component(v2_parts, index)
            if v1 < v2:
                return True
            if v1 > v2:
                return False
        # Never differed - return None, must be equal
        return None

    def update(self):
        """Refresh the cached disk information."""
        self._update_disks()

    def _update_disks(self):
        """Reload self.disks, self.disk_text and self.apfs from diskutil."""
        self.disks = self.get_disks()
        self.disk_text = self.get_disk_text()
        # APFS info is only queried when the OS version is newer than 10.12
        if self._compare_versions("10.12", self.os_version):
            self.apfs = self.get_apfs()
        else:
            self.apfs = {}

    def get_diskutil(self):
        """Return the path to the diskutil binary (first line of `which diskutil`)."""
        which_output = self.r.run({"args": ["which", "diskutil"]})[0]
        return which_output.split("\n")[0].split("\r")[0]

    def get_disks(self):
        """Return the parsed output of `diskutil list -plist` ({} if unparsable)."""
        disk_list = self.r.run({"args": [self.diskutil, "list", "-plist"]})[0]
        return self._get_plist(disk_list)

    def get_disk_text(self):
        """Return the plain text output of `diskutil list`."""
        return self.r.run({"args": [self.diskutil, "list"]})[0]

    def get_disk_info(self, disk):
        """Return the parsed `diskutil info -plist` output for disk.

        Returns None when disk cannot be resolved to an identifier.
        """
        # get_identifier is not defined in the portion of the file that was
        # reviewed; it is assumed to exist elsewhere on this class
        disk_id = self.get_identifier(disk)
        if not disk_id:
            return None
        disk_list = self.r.run({"args": [self.diskutil, "info", "-plist", disk_id]})[0]
        return self._get_plist(disk_list)

    def get_disk_fs(self, disk):
        """Return the "FilesystemName" entry for disk, or None if unavailable."""
        disk_id = self.get_identifier(disk)
        if not disk_id:
            return None
        # get_disk_info may itself return None - fall back to an empty dict
        info = self.get_disk_info(disk_id) or {}
        return info.get("FilesystemName", None)

    def get_disk_fs_type(self, disk):
        """Return the "FilesystemType" entry for disk, or None if unavailable."""
        disk_id = self.get_identifier(disk)
        if not disk_id:
            return None
        # get_disk_info may itself return None - fall back to an empty dict
        info = self.get_disk_info(disk_id) or {}
        return info.get("FilesystemType", None)

    def get_apfs(self):
        """Return the parsed output of `diskutil apfs list -plist`.

        Returns {} when the command reports a non-zero status.
        """
        # "y" is piped to diskutil's stdin through the shell
        output = self.r.run({"args": "echo y | " + self.diskutil + " apfs list -plist", "shell": True})
        if not output[2] == 0:
            # Error getting apfs info - return an empty dict
            return {}
        disk_list = output[0]
        # NOTE(review): the source available for review was cut off inside this
        # method - the split marker and everything after the final assignment
        # were missing.  The "<?xml" marker and the final parse/return below
        # are a reconstruction; confirm against the upstream file.
        p_list = disk_list.split("<?xml")
        if len(p_list) > 1:
            # We had text before the start - get only the plist info
            disk_list = "<?xml" + p_list[-1]
        return self._get_plist(disk_list)