# This Python file uses the following encoding: utf-8
import io
import os
from dataclasses import dataclass
from PIL.Image import WEB
import numpy as np
from bcachefs.c_bcachefs import (
PyBcachefs as _Bcachefs,
PyBcachefs_iterator as _Bcachefs_iterator,
)
# Btree selectors: the iterator classes in this module hand one of these to
# the native filesystem's `iter()` to choose which btree is walked.
EXTENT_TYPE = 0
INODE_TYPE = 1
DIRENT_TYPE = 2
# Values of `DirEnt.type`: directory (4) or regular file (8).
DIR_TYPE = 4
FILE_TYPE = 8
@dataclass(eq=True, frozen=True)
class Extent:
    """Specify the location of an extent of a file inside the disk image.

    Instances are immutable and hashable, and compare equal field by field.

    Attributes
    ----------
    inode: int
        inode of the file
    file_offset: int
        position of the extent in the logical file
    offset: int
        position inside the disk image where the extent starts
    size: int
        size of the extent
    """

    inode: int = 0
    file_offset: int = 0
    offset: int = 0
    size: int = 0
@dataclass(eq=True, frozen=True)
class Inode:
    """Bcachefs inode attributes.

    Instances are immutable and hashable, and compare equal field by field.

    Attributes
    ----------
    inode: int
        inode the attributes belong to
    size: int
        file size
    """

    inode: int = 0
    size: int = 0
@dataclass(eq=True, frozen=True)
class DirEnt:
    """Bcachefs directory entry.

    Instances are immutable and hashable, and compare equal field by field.

    Attributes
    ----------
    parent_inode: int
        inode of the parent entry (directory)
    inode: int
        inode of the current entry
    type: int
        file (8) or directory (4)
    name: str
        name of current entry (file or directory)
    """

    parent_inode: int = 0
    inode: int = 0
    type: int = 0
    name: str = ""

    @property
    def is_dir(self):
        """True when the entry is a directory."""
        return self.type == DIR_TYPE

    @property
    def is_file(self):
        """True when the entry is a regular file."""
        return self.type == FILE_TYPE

    def __str__(self):
        # An entry prints as its bare name, not its full path.
        return self.name
# Entry for the root directory "/" (inode 4096); absolute path lookups in
# `Bcachefs.find_dirent` start from it.
ROOT_DIRENT = DirEnt(0, 4096, DIR_TYPE, "/")
# Entry for "lost+found" directly under the root (inode 4097).
# NOTE(review): not referenced by the code visible in this file.
LOSTFOUND_DIRENT = DirEnt(4096, 4097, DIR_TYPE, "lost+found")
class _BcachefsFileBinary(io.BufferedIOBase):
"""Python file interface for Bcachefs files
Parameters
----------
name: str
name of the file being opened
extends
list of Extent
file: file object
underlying opened disk image file
inode: int
inode of the file being opened
size: int
size of the file being opened
"""
def __init__(self, name, extents, file, inode, size):
self.name = file
self._inode = inode
self._size = size
# underlying bcachefs archive
# DO NOT close this!!
self._file = file
# sort by offset so the extents are always in the right order
sorted(extents, key=lambda extent: extent.file_offset)
self._extents = extents
self._extent_pos = 0 # current extent being read
self._extent_read = (
0 # offset pointing to the unread part of the current extend
)
self._pos = 0 # absolute position inside the file
def reset(self):
"""Reset internal state to point to the begining of the file"""
self._extent_pos = 0
self._extend_read = 0
self._pos = 0
def __enter__(self):
return self
def __exit__(self, *args, **kwargs):
pass
@property
def closed(self) -> bool:
"""return true if we finished reading the current file
Notes
-----
You can reuse the same file multiple time by calling `reset`
"""
return self._extent_pos >= len(self._extents)
def fileno(self) -> int:
"""returns the inode of the file inside bcachefs"""
return self._inode
def read(self, n=-1) -> bytes:
"""Read at most n bytes
Parameters
----------
n: int
max size that can be read if -1 all the file is read
"""
if n == -1:
return self.readall()
buffer = np.empty(n, dtype="<u1")
view = memoryview(buffer)
size = self.readinto(view)
return bytes(buffer[:size])
def read1(self, size: int) -> bytes:
"""Read at most size bytes with at most one call to the underlying stream"""
buffer = np.empty(size, dtype="<u1")
view = memoryview(buffer)
size = self.readinto1(view)
return bytes(buffer[:size])
def readall(self) -> bytes:
"""Most efficient way to read a file, single allocation"""
buffer = np.empty(self._size, dtype="<u1")
memory = memoryview(buffer)
for extent in self._extents:
s = extent.file_offset
e = s + extent.size
self._file.seek(extent.offset)
self._file.readinto(memory[s:e])
return bytes(buffer)
def readinto1(self, b: memoryview) -> int:
"""Read at most one extend
Notes
-----
The size of the buffer is not checked against the extent size,
this means we could possibly read beyond the extent but the size returned
will be inside the bounds.
"""
# we ran out of extent, done
if self._extent_pos >= len(self._extents):
return 0
# continue reading the current extent
extent = self._extents[self._extent_pos]
self._file.seek(extent.offset + self._extent_read)
read = self._file.readinto(b)
self._extent_read += read
self._pos += read
# if we finished reading current extend go the the next one
if self._extent_read >= extent.size:
self._extent_pos += 1
self._extent_read = 0
# finished reading the file
if self._pos > self._size:
diff = self._pos - self._size
self._extent_pos += 1
self._extent_read = 0
return read - diff
return read
def readinto(self, b: memoryview) -> int:
"""Read until the buffer is full"""
n = len(b)
size = self.readinto1(b)
while size < n and not self.closed:
size += self.readinto1(b[size:])
return size
@property
def isatty(self):
return False
@property
def readable(self):
return not self.closed
@property
def seekable(self):
return True
def seek(self, offset, whence=io.SEEK_SET):
"""Seek a specific position inside the file"""
if whence == io.SEEK_END:
return self.seek(self._size + offset, io.SEEK_SET)
if whence == io.SEEK_CUR:
return self.seek(self._pos + offset, io.SEEK_SET)
if whence == io.SEEK_SET:
self.reset()
e = 0
for i, extent in enumerate(self._extents):
s = extent.file_offset
e = s + extent.size
if s <= offset < e:
self._extent_pos = i
self._extent_read = offset - s
self._pos = offset
break
return offset
def tell(self):
"""Returns the current possition of the file cursor"""
return self._pos
def detach(self):
raise io.UnsupportedOperation
@property
def writable(self):
return False
def writelines(self, lines):
raise io.UnsupportedOperation
def write(self, b):
raise io.UnsupportedOperation
def flush(self):
pass
class Bcachefs:
    """Opens a Bcachefs disk image for reading

    The btrees of the image are parsed once, when the image is opened, and
    cached in dictionaries which are also what gets pickled.

    Parameters
    ----------
    path: str
        path to the disk image
    mode: str
        only "r" and "rb" are accepted

    Examples
    --------
    >>> with Bcachefs(path_to_file, 'r') as image:
    ...     with image.open('dir/subdir/file2', 'rb') as f:
    ...         data = f.read()
    ...         print(data.decode('utf-8'))
    File content 2
    <BLANKLINE>
    """

    def __init__(self, path: str, mode: str = "rb"):
        assert mode in ("r", "rb"), "Only reading is supported"

        self._path = path
        self._filesystem = None  # native handle used to walk the btrees
        self._size = 0
        self._file = None  # raw handle on the disk image, set by `_open`
        self._closed = True
        self._pwd = "/"  # Used in Cursor
        self._dirent = ROOT_DIRENT  # Used in Cursor

        self._extents_map = {}  # inode -> list of Extent
        self._inodes_ls = {ROOT_DIRENT.inode: []}  # directory inode -> entries
        self._inodes_tree = {}  # (parent inode, name) -> DirEnt
        self._inode_map = {}  # inode -> file size

    def open(self, name: "str | int", mode: str = "rb", encoding: str = "utf-8"):
        """Open a file inside the image for reading

        Parameters
        ----------
        name: str, int
            Path to a file or inode integer
        mode: str
            reading mode rb (bytes); currently unused
        encoding: str
            string encoding to use, defaults to utf-8; currently unused

        Raises
        ------
        FileNotFoundError when opening a file that does not exist
        """
        inode = name
        if isinstance(name, str):
            dirent = self.find_dirent(name)
            inode = None if dirent is None else dirent.inode

        extents = self._extents_map.get(inode)
        if extents is None:
            raise FileNotFoundError(f"{name} was not found")

        file_size = self._inode_map[inode]
        return _BcachefsFileBinary(name, extents, self._file, inode, file_size)

    def namelist(self):
        """Returns a list of files contained by this archive

        Notes
        -----
        Added for parity with Zipfile interface

        Examples
        --------
        >>> with Bcachefs(path_to_file, 'r') as image:
        ...     print(image.namelist())
        ['file1', 'n09332890/n09332890_29876.JPEG', 'dir/subdir/file2', 'n04467665/n04467665_63788.JPEG', 'n02033041/n02033041_3834.JPEG', 'n02445715/n02445715_16523.JPEG', 'n04584207/n04584207_7936.JPEG']
        """
        directories = self._inodes_ls.get(ROOT_DIRENT.inode, [])
        return self._namelist("", directories)

    def _namelist(self, path, directories):
        """Recursively collect the paths of the files found below `path`."""
        names = []
        for dirent in directories:
            entry_path = os.path.join(path, dirent.name)
            if dirent.is_dir:
                children = self._inodes_ls.get(dirent.inode, [])
                names.extend(self._namelist(entry_path, children))
            if dirent.is_file:
                names.append(entry_path)
        return names

    def __enter__(self):
        self._open()
        return self

    def __exit__(self, _type, _value, _traceback):
        self.close()

    def __iter__(self):
        # iterates over every directory entry known to the image
        return iter(self._inodes_tree.values())

    @property
    def path(self) -> str:
        """Path of the current image"""
        return self._path

    @property
    def size(self) -> int:
        """Size of the current image"""
        return self._size

    @property
    def closed(self) -> bool:
        """Is current image closed"""
        return self._closed

    def cd(self, path: str = "/"):
        """Creates a cursor to a directory

        Returns None when `path` is not an existing directory.
        """
        cursor = Cursor(
            self.path, self._extents_map, self._inodes_ls, self._inodes_tree
        )
        # The cursor reuses the parsed caches and never re-parses the image,
        # so it also needs the file sizes to be able to open files.
        cursor._inode_map = self._inode_map
        return cursor.cd(path)

    def _open(self):
        """Open the image and build the btree caches; no-op if already open."""
        if not self._closed:
            return

        filesystem = _Bcachefs()
        filesystem.open(self._path)
        try:
            file = open(self._path, "rb")
        except BaseException:
            # do not leak the native handle when the image cannot be opened
            filesystem.close()
            raise

        self._filesystem = filesystem
        self._size = filesystem.size
        self._file = file
        self._closed = False

        try:
            self._parse()
        except BaseException:
            # `__exit__` is not called when `__enter__` fails: clean up here
            self.close()
            raise

    def close(self):
        """Release the handles on the image; no-op if already closed."""
        if self._closed:
            return

        # if the object was pickled we did not need the filesystem
        # to be set
        if self._filesystem:
            self._filesystem.close()
            self._filesystem = None
            self._size = 0

        if self._file is not None:
            self._file.close()
            self._file = None

        self._closed = True

    def find_dirent(self, path: str = None) -> "DirEnt":
        """Resolve a path to its directory entry, returns None if it was not found

        Paths starting with "/" are resolved from the root, others from the
        current directory. An empty path resolves to the current directory.
        """
        if not path:
            return self._dirent

        dirent = ROOT_DIRENT if path.startswith("/") else self._dirent
        for part in path.split("/"):
            if not part:
                # leading, trailing or doubled "/"
                continue
            dirent = self._inodes_tree.get((dirent.inode, part))
            if dirent is None:
                return None
        return dirent

    def ls(self, path: "str | DirEnt | None" = None):
        """Show the files inside a given directory

        Returns the entries of the directory, or a single-element list when
        `path` designates a file.

        Raises
        ------
        FileNotFoundError when `path` does not exist
        """
        if isinstance(path, DirEnt):
            parent = path
        elif not path:
            parent = self._dirent
        else:
            parent = self.find_dirent(os.path.join(self._pwd, path))
            if parent is None:
                raise FileNotFoundError(f"{path} was not found")

        if parent.is_dir:
            return self._inodes_ls.get(parent.inode, [])
        return [parent]

    def read_file(self, inode: "str | int") -> bytes:
        """Return the whole content of a file given its path or inode."""
        with self.open(inode) as f:
            return f.readall()

    def walk(self, top: str = None):
        """Traverse the file system recursively

        Yields `(dirpath, dirs, files)` tuples, where `dirs` and `files` are
        lists of directory entries. Returns None when `top` does not exist.
        """
        if not top:
            top = self._pwd
            parent = self._dirent
        else:
            top = os.path.join(self._pwd, top)
            parent = self.find_dirent(top)
        if parent:
            return self._walk(top, parent)

    def _parse(self):
        """Generate a cache of bcachefs btrees"""
        if self._extents_map:
            # already parsed (or caches restored from a pickle)
            return

        for dirent in BcachefsIterDirEnt(self._filesystem):
            if dirent.is_dir:
                # make sure empty directories are listed as well
                self._inodes_ls.setdefault(dirent.inode, [])
            # setdefault: the parent may not have been seen as a dirent
            self._inodes_ls.setdefault(dirent.parent_inode, []).append(dirent)
            self._inodes_tree[(dirent.parent_inode, dirent.name)] = dirent

        for extent in BcachefsIterExtent(self._filesystem):
            self._extents_map.setdefault(extent.inode, []).append(extent)

        for inode in BcachefsIterInode(self._filesystem):
            self._inode_map[inode.inode] = inode.size

        for inode, extents in self._extents_map.items():
            self._extents_map[inode] = self._unique_extent_list(extents)

        for parent_inode, ls in self._inodes_ls.items():
            self._inodes_ls[parent_inode] = self._unique_dirent_list(ls)

    def _walk(self, dirpath: str, dirent: "DirEnt"):
        """Yield `(dirpath, dirs, files)` for `dirent` and, recursively, its sub-directories."""
        entries = self._inodes_ls[dirent.inode]
        dirs = [ent for ent in entries if ent.is_dir]
        files = [ent for ent in entries if not ent.is_dir]
        yield dirpath, dirs, files
        for d in dirs:
            yield from self._walk(os.path.join(dirpath, d.name), d)

    @staticmethod
    def _unique_extent_list(inode_extents):
        """Return the extents sorted by file offset, without exact duplicates.

        It's possible to have multiple duplicated extents for a single inode.
        Only extents equal to the one just before them in file-offset order
        are dropped.
        """
        unique_extent_list = []
        for ent in sorted(inode_extents, key=lambda extent: extent.file_offset):
            if not unique_extent_list or unique_extent_list[-1] != ent:
                unique_extent_list.append(ent)
        return unique_extent_list

    @staticmethod
    def _unique_dirent_list(dirent_ls):
        """Keep a single entry per name, the last one listed for that name.

        It's possible to have multiple inodes for a single file and this
        implementation assumes that the last inode should be the correct one.
        """
        return list({ent.name: ent for ent in dirent_ls}.values())

    def __getstate__(self):
        # Open handles cannot be pickled: only the path and the parsed caches
        # are saved.
        return dict(
            path=self._path,
            size=self._size,
            closed=self._closed,
            pwd=self._pwd,
            dirent=self._dirent,
            extents_map=self._extents_map,
            inode_ls=self._inodes_ls,
            inode_tree=self._inodes_tree,
            inode_map=self._inode_map,
        )

    def __setstate__(self, state):
        self._path = state["path"]
        self._size = state["size"]
        self._closed = state["closed"]
        # `__init__` is not run when unpickling: always define both handles
        self._filesystem = None
        self._file = None
        if not self._closed:
            # the caches are restored below, so only the raw image is needed
            self._file = open(self._path, "rb")
        self._pwd = state["pwd"]
        self._dirent = state["dirent"]
        self._extents_map = state["extents_map"]
        self._inodes_ls = state["inode_ls"]
        self._inodes_tree = state["inode_tree"]
        self._inode_map = state["inode_map"]
class Cursor(Bcachefs):
    """View of an image positioned on a directory.

    The cursor reuses the caches of the image it was created from and keeps
    track of a current directory, from which relative paths are resolved.

    Parameters
    ----------
    path: str, Bcachefs
        path to the disk image, or the image itself
    extents_map: dict
        cache mapping an inode to its extents
    inodes_ls: dict
        cache mapping a directory inode to its entries
    inodes_tree: dict
        cache mapping (parent inode, name) to a directory entry
    """

    def __init__(
        self,
        path: "str | Bcachefs",
        extents_map: dict,
        inodes_ls: dict,
        inodes_tree: dict,
    ):
        image_path = path if isinstance(path, str) else path.path
        super().__init__(image_path)

        self._extents_map = extents_map
        self._inodes_ls = inodes_ls
        self._inodes_tree = inodes_tree
        self._is_owner = False

    def __iter__(self):
        # directories first, then files, for each directory below the cursor
        for _, dirs, files in self.walk():
            yield from dirs
            yield from files

    @property
    def pwd(self):
        """Path of the current directory"""
        return self._pwd

    def cd(self, path: str = "/"):
        """Move the cursor to a directory

        Relative paths are resolved from the current directory. "." and ".."
        components as well as redundant slashes are normalised, so `pwd`
        always stays a clean absolute path.

        Returns the cursor itself, or None (leaving the cursor untouched)
        when `path` is not an existing directory.
        """
        # image paths always use "/", whatever the host platform
        import posixpath

        if not path:
            target = "/"
        else:
            target = posixpath.normpath(posixpath.join(self._pwd, path))
            if target.startswith("//"):
                # normpath deliberately keeps a leading double slash
                target = "/" + target.lstrip("/")

        dirent = self.find_dirent(target)
        if dirent and dirent.is_dir:
            self._pwd = target
            self._dirent = dirent
            return self
        return None
class BcachefsIter:
    """Python iterator over the raw items of one bcachefs btree.

    Parameters
    ----------
    fs: _Bcachefs
        opened native filesystem handle
    t: int
        identifier of the btree to walk, defaults to the dirent btree
    """

    def __init__(self, fs: _Bcachefs, t: int = DIRENT_TYPE):
        self._iter: _Bcachefs_iterator = fs.iter(t)

    def __iter__(self):
        return self

    def __next__(self):
        item = self._iter.next()
        # the native iterator signals exhaustion by returning None
        if item is None:
            raise StopIteration
        return item
class BcachefsIterExtent(BcachefsIter):
    """Iterates over bcachefs extent btree, yielding `Extent` objects"""

    def __init__(self, fs: _Bcachefs):
        super().__init__(fs, EXTENT_TYPE)

    def __next__(self):
        # the raw item is unpacked positionally into the Extent fields
        return Extent(*super().__next__())
class BcachefsIterInode(BcachefsIter):
    """Iterates over bcachefs inode btree, yielding `Inode` objects"""

    def __init__(self, fs: _Bcachefs):
        super().__init__(fs, INODE_TYPE)

    def __next__(self):
        # the raw item is unpacked positionally into the Inode fields
        return Inode(*super().__next__())
class BcachefsIterDirEnt(BcachefsIter):
    """Iterates over bcachefs dirent btree, yielding `DirEnt` objects"""

    def __init__(self, fs: _Bcachefs):
        super().__init__(fs, DIRENT_TYPE)

    def __next__(self):
        # the raw item is unpacked positionally into the DirEnt fields
        return DirEnt(*super().__next__())