gibMacOS/Scripts/disk.py
2018-10-06 22:07:05 -05:00

436 lines
No EOL
16 KiB
Python

import subprocess, plistlib, sys, os, time, json
sys.path.append(os.path.abspath(os.path.dirname(os.path.realpath(__file__))))
import run
class Disk:
def __init__(self):
    """Create the command runner, locate diskutil, and cache disk data.

    Stores the first two dot-separated components of the version printed
    by ``sw_vers -productVersion`` in ``self.os_version`` and then fills
    the disk caches through ``_update_disks``.
    """
    self.r = run.Run()
    self.diskutil = self.get_diskutil()
    product_version = self.r.run({"args":["sw_vers", "-productVersion"]})[0]
    # Only the leading "major.minor" portion is kept
    version_parts = product_version.split(".")
    self.os_version = ".".join(version_parts[:2])
    self.apfs = {}
    self._update_disks()
def _get_plist(self, s):
p = {}
try:
if sys.version_info >= (3, 0):
p = plistlib.loads(s.encode("utf-8"))
else:
p = plistlib.readPlistFromString(s.encode("utf-8"))
except:
pass
return p
def _compare_versions(self, vers1, vers2):
# Helper method to compare ##.## strings
#
# vers1 < vers2 = True
# vers1 = vers2 = None
# vers1 > vers2 = False
#
try:
v1_parts = vers1.split(".")
v2_parts = vers2.split(".")
except:
# Formatted wrong - return None
return None
for i in range(len(v1_parts)):
if int(v1_parts[i]) < int(v2_parts[i]):
return True
elif int(v1_parts[i]) > int(v2_parts[i]):
return False
# Never differed - return None, must be equal
return None
def update(self):
    """Refresh the cached disk information via ``_update_disks``."""
    self._update_disks()
def _update_disks(self):
self.disks = self.get_disks()
self.disk_text = self.get_disk_text()
if self._compare_versions("10.12", self.os_version):
self.apfs = self.get_apfs()
else:
self.apfs = {}
def get_diskutil(self):
    """Return the path to the diskutil binary as printed by ``which``."""
    which_output = self.r.run({"args":["which", "diskutil"]})[0]
    # Keep the first line only, minus anything after a carriage return
    first_line = which_output.partition("\n")[0]
    return first_line.partition("\r")[0]
def get_disks(self):
    """Return the parsed output of ``diskutil list -plist``."""
    list_command = {"args":[self.diskutil, "list", "-plist"]}
    plist_text = self.r.run(list_command)[0]
    return self._get_plist(plist_text)
def get_disk_text(self):
    """Return the plain-text output of ``diskutil list``."""
    list_command = {"args":[self.diskutil, "list"]}
    result = self.r.run(list_command)
    return result[0]
def get_disk_info(self, disk):
    """Return the parsed ``diskutil info -plist`` output for a disk.

    Returns None when ``disk`` cannot be resolved to an identifier.
    """
    disk_id = self.get_identifier(disk)
    if disk_id:
        info_command = {"args":[self.diskutil, "info", "-plist", disk_id]}
        return self._get_plist(self.r.run(info_command)[0])
    return None
def get_disk_fs(self, disk):
    """Return the "FilesystemName" entry from the disk's info plist.

    Returns None when the disk cannot be resolved, when no info is
    available, or when the entry is absent.
    """
    disk_id = self.get_identifier(disk)
    if not disk_id:
        return None
    # get_disk_info returns None if it fails to resolve the identifier;
    # fall back to an empty dict rather than calling .get on None
    info = self.get_disk_info(disk_id) or {}
    return info.get("FilesystemName", None)
def get_disk_fs_type(self, disk):
    """Return the "FilesystemType" entry from the disk's info plist.

    Returns None when the disk cannot be resolved, when no info is
    available, or when the entry is absent.
    """
    disk_id = self.get_identifier(disk)
    if not disk_id:
        return None
    # get_disk_info returns None if it fails to resolve the identifier;
    # fall back to an empty dict rather than calling .get on None
    info = self.get_disk_info(disk_id) or {}
    return info.get("FilesystemType", None)
def get_apfs(self):
    """Return the parsed output of ``diskutil apfs list -plist``.

    The command is run through the shell with "y" piped to its stdin.
    An empty dict is returned when the third element of the runner's
    result is not 0.
    """
    result = self.r.run({"args":"echo y | " + self.diskutil + " apfs list -plist", "shell" : True})
    if result[2] != 0:
        # Error getting apfs info - return an empty dict
        return {}
    plist_text = result[0]
    # Drop anything printed ahead of the last "<?xml" marker
    _, marker, remainder = plist_text.rpartition("<?xml")
    if marker:
        plist_text = marker + remainder
    return self._get_plist(plist_text)
def is_apfs(self, disk):
    """Tell whether a disk is an APFS disk or one of its volumes.

    Only top-level entries that carry an "APFSVolumes" key are examined;
    the entry itself and each of its volumes are compared, ignoring case.

    Returns:
        True or False, or None when ``disk`` cannot be resolved.
    """
    disk_id = self.get_identifier(disk)
    if not disk_id:
        return None
    wanted = disk_id.lower()
    for whole in self.disks.get("AllDisksAndPartitions", []):
        if "APFSVolumes" not in whole:
            continue
        candidates = [whole] + list(whole.get("APFSVolumes", []))
        if any(c.get("DeviceIdentifier", "").lower() == wanted for c in candidates):
            return True
    return False
def is_apfs_container(self, disk):
    """Tell whether a partition's content type is "apple_apfs".

    Only entries under "Partitions" are checked; the first partition whose
    identifier matches decides the answer (case-insensitive).

    Returns:
        True or False, or None when ``disk`` cannot be resolved.
    """
    disk_id = self.get_identifier(disk)
    if not disk_id:
        return None
    wanted = disk_id.lower()
    for whole in self.disks.get("AllDisksAndPartitions", []):
        for part in whole.get("Partitions", []):
            if part.get("DeviceIdentifier", "").lower() != wanted:
                continue
            return part.get("Content", "").lower() == "apple_apfs"
    return False
def is_cs_container(self, disk):
    """Tell whether a partition's content type is "apple_corestorage".

    Only entries under "Partitions" are checked; the first partition whose
    identifier matches decides the answer (case-insensitive).

    Returns:
        True or False, or None when ``disk`` cannot be resolved.
    """
    disk_id = self.get_identifier(disk)
    if not disk_id:
        return None
    wanted = disk_id.lower()
    for whole in self.disks.get("AllDisksAndPartitions", []):
        for part in whole.get("Partitions", []):
            if part.get("DeviceIdentifier", "").lower() != wanted:
                continue
            return part.get("Content", "").lower() == "apple_corestorage"
    return False
def is_core_storage(self, disk):
    """Tell whether a "Logical Volume on " entry can be found for a disk.

    Returns:
        True or False based on ``_get_physical_disk``, or None when
        ``disk`` cannot be resolved.
    """
    disk_id = self.get_identifier(disk)
    if not disk_id:
        return None
    backing = self._get_physical_disk(disk_id, "Logical Volume on ")
    return bool(backing)
def get_identifier(self, disk):
    """Resolve a mount point, name, UUID or identifier to a disk identifier.

    The input is lower-cased and a leading "/dev/r" or "/dev/" is removed.
    It is first looked up in "AllDisks"; failing that, every APFS volume
    and partition is searched by identifier, volume name, volume UUID,
    disk UUID and mount point (all compared lower-cased).

    Returns:
        The matching identifier, or None when nothing matches or ``disk``
        is empty.
    """
    if not disk or not str(disk):
        return None
    needle = disk.lower()
    # Strip the device node prefix, raw ("/dev/r") variant first
    for prefix in ("/dev/r", "/dev/"):
        if needle.startswith(prefix):
            needle = needle[len(prefix):]
            break
    if needle in self.disks.get("AllDisks", []):
        return needle
    fields = ["DeviceIdentifier", "VolumeName", "VolumeUUID", "DiskUUID", "MountPoint"]
    for whole in self.disks.get("AllDisksAndPartitions", []):
        # APFS volumes are searched ahead of regular partitions
        for group in ("APFSVolumes", "Partitions"):
            for entry in whole.get(group, []):
                lowered = [entry.get(name, "").lower() for name in fields]
                if needle in lowered:
                    return entry.get("DeviceIdentifier", None)
    # At this point, we didn't find it
    return None
def get_top_identifier(self, disk):
    """Return the identifier with its slice suffix removed (disk0s1 -> disk0).

    Returns None when ``disk`` cannot be resolved.
    """
    disk_id = self.get_identifier(disk)
    if not disk_id:
        return None
    # Temporarily hide the "s" inside "disk" so that splitting on "s"
    # only cuts at the slice separator
    masked = disk_id.replace("disk", "didk")
    leading = masked.split("s")[0]
    return leading.replace("didk", "disk")
def _get_physical_disk(self, disk, search_term):
# Change disk0s1 to disk0
our_disk = self.get_top_identifier(disk)
our_term = "/dev/" + our_disk
found_disk = False
our_text = ""
for line in self.disk_text.split("\n"):
if line.lower().startswith(our_term):
found_disk = True
continue
if not found_disk:
continue
if line.lower().startswith("/dev/disk"):
# At the next disk - bail
break
if search_term.lower() in line.lower():
our_text = line
break
if not len(our_text):
# Nothing found
return None
our_stores = "".join(our_text.strip().split(search_term)[1:]).split(" ,")
if not len(our_stores):
return None
for store in our_stores:
efi = self.get_efi(store)
if efi:
return store
return None
def get_physical_store(self, disk):
    """Return the "Physical Store " entry found for an APFS disk.

    Returns None when ``disk`` cannot be resolved or ``is_apfs`` is falsy
    for it; otherwise whatever ``_get_physical_disk`` returns.
    """
    disk_id = self.get_identifier(disk)
    if disk_id and self.is_apfs(disk_id):
        return self._get_physical_disk(disk_id, "Physical Store ")
    return None
def get_core_storage_pv(self, disk):
    """Return the "Logical Volume on " entry found for a CoreStorage disk.

    Returns None when ``disk`` cannot be resolved or ``is_core_storage``
    is falsy for it; otherwise whatever ``_get_physical_disk`` returns.
    """
    disk_id = self.get_identifier(disk)
    if disk_id and self.is_core_storage(disk_id):
        return self._get_physical_disk(disk_id, "Logical Volume on ")
    return None
def get_parent(self, disk):
    """Return the top-level disk or APFS container that holds ``disk``.

    ``disk`` can be a mount point, disk name, or disk identifier.  APFS
    and CoreStorage disks are first swapped for their physical store /
    physical volume.  The result is then looked up either among the APFS
    containers or among the whole disks and their partitions.

    Returns:
        The parent's identifier, or None when nothing matches.
    """
    disk_id = self.get_identifier(disk)
    if self.is_apfs(disk_id):
        disk_id = self.get_physical_store(disk_id)
    elif self.is_core_storage(disk_id):
        disk_id = self.get_core_storage_pv(disk_id)
    if not disk_id:
        return None
    wanted = disk_id.lower()
    if self.is_apfs(disk_id):
        # We have apfs - let's get the container ref
        for container in self.apfs.get("Containers", []):
            # The container itself takes priority over its volumes
            if container.get("ContainerReference", "").lower() == wanted:
                return container["ContainerReference"]
            volume_ids = [v.get("DeviceIdentifier", "").lower() for v in container.get("Volumes", [])]
            if wanted in volume_ids:
                return container.get("ContainerReference", None)
        return None
    # Not apfs - go through all volumes and whole disks
    for whole in self.disks.get("AllDisksAndPartitions", []):
        member_ids = [whole.get("DeviceIdentifier", "").lower()]
        member_ids.extend(p.get("DeviceIdentifier", "").lower() for p in whole.get("Partitions", []))
        if wanted in member_ids:
            return whole["DeviceIdentifier"]
    # Didn't find anything
    return None
def get_efi(self, disk):
    """Return the identifier of the EFI partition on a disk's parent.

    The parent is resolved with ``get_parent``; its partitions are then
    searched for one whose "Content" is "efi" (case-insensitive).

    Returns:
        The EFI partition's identifier, or None when the parent cannot be
        resolved or has no such partition.
    """
    disk_id = self.get_parent(self.get_identifier(disk))
    if not disk_id:
        return None
    # At this point - we should have the parent
    wanted = disk_id.lower()
    # Use .get like the rest of the class so an empty disk cache yields
    # None instead of a KeyError
    for d in self.disks.get("AllDisksAndPartitions", []):
        if d.get("DeviceIdentifier", "").lower() != wanted:
            continue
        # Found our disk
        for p in d.get("Partitions", []):
            if p.get("Content", "").lower() == "efi":
                return p.get("DeviceIdentifier", None)
    return None
def mount_partition(self, disk, sudo=False):
    """Mount a partition, optionally by hand with elevated privileges.

    Without ``sudo`` the work is delegated to ``diskutil mount``.  With
    ``sudo`` a folder is created under /Volumes and the partition is
    mounted onto it with ``mount -t <fs type>``; if that fails, the folder
    is removed again.  The disk caches are refreshed afterwards.

    Args:
        disk: Mount point, disk name, or disk identifier.
        sudo: When true, create the mount point and mount manually.

    Returns:
        The runner's result for the (last) command, or None when the disk
        cannot be resolved or, with ``sudo``, no filesystem type is found.
    """
    disk_id = self.get_identifier(disk)
    if not disk_id:
        return None
    if not sudo:
        out = self.r.run({"args":[self.diskutil, "mount", disk_id]})
        self._update_disks()
        return out
    # We need to create a new folder, then mount it manually
    fst = self.get_disk_fs_type(disk_id)
    if not fst:
        # No detected fs
        return None
    # The name can come back as None (no entry) as well as "" - both
    # would otherwise break the path handling below
    vn = self.get_volume_name(disk_id) or "Untitled"
    # Get safe volume name
    mount_point = os.path.join("/Volumes", vn)
    if os.path.exists(mount_point):
        num = 1
        while os.path.exists(os.path.join("/Volumes", vn + " " + str(num))):
            num += 1
        mount_point = os.path.join("/Volumes", vn + " " + str(num))
    # Create the dir, then mount
    out = self.r.run([
        {"args":["mkdir", mount_point], "sudo":True, "show":True},
        {"args":["mount", "-t", fst, "/dev/"+disk_id, mount_point], "sudo":True, "show":True}
    ], True)
    self._update_disks()
    if len(out) and isinstance(out[0], tuple):
        out = out[-1] # Set out to the last output
    if out[2] != 0:
        # Non-zero exit code, clean up our mount point - if it exists
        if os.path.exists(mount_point):
            self.r.run({"args":["rm", "-Rf", mount_point], "sudo":True})
    return out
def unmount_partition(self, disk):
    """Run ``diskutil unmount`` on a disk and refresh the disk caches.

    Returns the runner's result, or None when ``disk`` cannot be resolved.
    """
    disk_id = self.get_identifier(disk)
    if disk_id:
        unmount_command = {"args":[self.diskutil, "unmount", disk_id]}
        result = self.r.run(unmount_command)
        self._update_disks()
        return result
    return None
def is_mounted(self, disk):
    """Tell whether a disk currently has a non-empty mount point.

    Returns:
        True or False, or None when ``disk`` cannot be resolved.
    """
    disk_id = self.get_identifier(disk)
    if not disk_id:
        return None
    # Return a real boolean rather than the mount point's length; a
    # missing (None) or empty mount point both mean "not mounted"
    return bool(self.get_mount_point(disk_id))
def get_volumes(self):
    """Return the "VolumesFromDisks" list from the cached disk data.

    An empty list is returned when the entry is absent.
    """
    volumes = self.disks.get("VolumesFromDisks", [])
    return volumes
def _get_value_apfs(self, disk, field, default = None):
return self._get_value(disk, field, default, True)
def _get_value(self, disk, field, default = None, apfs_only = False):
disk_id = self.get_identifier(disk)
if not disk_id:
return None
# Takes a disk identifier, and returns the requested value
for d in self.disks.get("AllDisksAndPartitions", []):
for a in d.get("APFSVolumes", []):
if a.get("DeviceIdentifier", "").lower() == disk_id.lower():
return a.get(field, default)
if apfs_only:
# Skip looking at regular partitions
continue
if d.get("DeviceIdentifier", "").lower() == disk_id.lower():
return d.get(field, default)
for a in d.get("Partitions", []):
if a.get("DeviceIdentifier", "").lower() == disk_id.lower():
return a.get(field, default)
return None
# Getter methods
def get_content(self, disk):
    """Return the "Content" field of the entry matching ``disk``."""
    field = "Content"
    return self._get_value(disk, field)
def get_volume_name(self, disk):
    """Return the "VolumeName" field of the entry matching ``disk``."""
    field = "VolumeName"
    return self._get_value(disk, field)
def get_volume_uuid(self, disk):
    """Return the "VolumeUUID" field of the entry matching ``disk``."""
    field = "VolumeUUID"
    return self._get_value(disk, field)
def get_disk_uuid(self, disk):
    """Return the "DiskUUID" field of the entry matching ``disk``."""
    field = "DiskUUID"
    return self._get_value(disk, field)
def get_mount_point(self, disk):
    """Return the "MountPoint" field of the entry matching ``disk``."""
    field = "MountPoint"
    return self._get_value(disk, field)
def open_mount_point(self, disk, new_window = False):
    """Run ``open`` on a disk's mount point.

    Args:
        disk: Mount point, disk name, or disk identifier.
        new_window: Accepted but not used by this method.

    Returns:
        True when the third element of the runner's result is 0, False
        otherwise, or None when the disk cannot be resolved or has no
        mount point.
    """
    disk_id = self.get_identifier(disk)
    mount = self.get_mount_point(disk_id) if disk_id else None
    if not mount:
        return None
    result = self.r.run({"args":["open", mount]})
    return result[2] == 0
def get_mounted_volumes(self):
    """Return the non-empty lines printed by ``ls -1 /Volumes``."""
    listing = self.r.run({"args":["ls", "-1", "/Volumes"]})[0]
    return [name for name in listing.split("\n") if name != ""]
def get_mounted_volume_dicts(self):
    """Describe every mounted volume that can be matched to a disk.

    Each name from ``get_mounted_volumes`` is resolved through its
    /Volumes path.  A name that does not resolve is kept only if it equals
    the volume name of whatever is mounted at "/".

    Returns:
        A list of dicts with the keys "name", "identifier", "mount_point",
        "disk_uuid" and "volume_uuid".
    """
    volume_dicts = []
    for name in self.get_mounted_volumes():
        ident = self.get_identifier(os.path.join("/Volumes", name))
        if ident is None:
            ident = self.get_identifier("/")
            if self.get_volume_name(ident) != name:
                # Not valid and not our boot drive
                continue
        entry = {
            "name" : self.get_volume_name(ident),
            "identifier" : ident,
            "mount_point" : self.get_mount_point(ident),
            "disk_uuid" : self.get_disk_uuid(ident),
            "volume_uuid" : self.get_volume_uuid(ident)
        }
        volume_dicts.append(entry)
    return volume_dicts
def get_disks_and_partitions_dict(self):
    """Group every non-container partition under its parent disk.

    Entries from "AllDisks" are skipped when they are a top-level disk
    (unless ``is_core_storage`` is true for them), an APFS container, or
    a CoreStorage container.

    Returns:
        A dict keyed by parent identifier, shaped like:
        { "disk0" : { "partitions" : [
            {
                "name" : "EFI",
                "identifier" : "disk0s1",
                "mount_point" : "/Volumes/EFI",
                "disk_uuid" : ...,
                "volume_uuid" : ...
            }
        ] } }
    """
    grouped = {}
    for ident in self.disks.get("AllDisks", []):
        # Get the parent and make sure it has an entry
        parent = self.get_parent(ident)
        top_level = self.get_top_identifier(ident) == ident
        if top_level and not self.is_core_storage(ident):
            # Top level, skip
            continue
        # Not top level - make sure it's not an apfs container or core storage container
        if self.is_apfs_container(ident):
            continue
        if self.is_cs_container(ident):
            continue
        partition = {
            "name" : self.get_volume_name(ident),
            "identifier" : ident,
            "mount_point" : self.get_mount_point(ident),
            "disk_uuid" : self.get_disk_uuid(ident),
            "volume_uuid" : self.get_volume_uuid(ident)
        }
        grouped.setdefault(parent, { "partitions" : [] })["partitions"].append(partition)
    return grouped