Source code for daisy.array

from __future__ import absolute_import
from .coordinate import Coordinate
from .freezable import Freezable
from .roi import Roi
import numpy as np


class Array(Freezable):
    '''A ROI and voxel size annotated ndarray-like. Acts as a view into actual
    data.

    Args:

        data (``ndarray``-like):

            The data to hold. Can be a numpy, HDF5, zarr, etc. array like.
            Needs to have ``shape`` and slicing support for reading/writing.
            It is assumed that slicing returns an ``ndarray``.

        roi (`class:Roi`):

            The region of interest (ROI) represented by this array.

        voxel_size (`tuple`):

            The size of a voxel.

        data_offset (`tuple`, optional):

            The start of ``data``, in world units. Defaults to ``roi.begin``,
            if not given.

        chunk_shape (`tuple`, optional):

            The size of a chunk of the underlying data container in voxels.

        check_write_chunk_align (``bool``, optional):

            If true, assert that each write to this array is aligned with the
            chunks of the underlying array-like. The check indexes into
            ``chunk_shape``, so ``chunk_shape`` has to be given for writes to
            succeed when this is enabled.
    '''

    def __init__(
            self,
            data,
            roi,
            voxel_size,
            data_offset=None,
            chunk_shape=None,
            check_write_chunk_align=False):

        self.data = data
        self.roi = roi
        self.voxel_size = Coordinate(voxel_size)
        self.chunk_shape = Coordinate(chunk_shape) if chunk_shape else None
        # every leading dimension of ``data`` that is not covered by the ROI
        # is treated as a channel dimension
        self.n_channel_dims = len(data.shape) - roi.dims
        self.check_write_chunk_align = check_write_chunk_align

        assert self.voxel_size.dims == self.roi.dims, (
            "dimension of voxel_size (%d) does not match dimension of roi (%d)"
            % (self.voxel_size.dims, self.roi.dims))

        if data_offset is None:
            data_offset = roi.begin
        else:
            data_offset = Coordinate(data_offset)

        # the ROI (in world units) covered by all of ``data``, which may be
        # larger than the ROI this array is a view of
        spatial_shape = self.data.shape[self.n_channel_dims:]
        self.data_roi = Roi(
            data_offset,
            self.voxel_size * Coordinate(spatial_shape),
        )

        assert self.roi.begin.is_multiple_of(voxel_size), (
            "roi offset %s is not a multiple of voxel size %s" % (
                self.roi.begin, voxel_size))

        assert self.roi.shape.is_multiple_of(voxel_size), (
            "roi shape %s is not a multiple of voxel size %s" % (
                self.roi.shape, voxel_size))

        assert data_offset.is_multiple_of(voxel_size), (
            "data offset %s is not a multiple of voxel size %s" % (
                data_offset, voxel_size))

        assert self.data_roi.contains(roi), (
            "data ROI %s does not contain given ROI %s" % (
                self.data_roi, roi))

        # NOTE(review): presumably locks the set of attributes of this
        # instance -- see ``Freezable`` for the exact semantics
        self.freeze()

    @property
    def shape(self):
        '''Get the shape in voxels of this array, possibly including channel
        dimensions. This is equivalent to::

            array.to_ndarray().shape

        but does not actually create the ``ndarray``.
        '''

        view_shape = (self.roi/self.voxel_size).shape
        return self.data.shape[:self.n_channel_dims] + view_shape

    @property
    def dtype(self):
        '''Get the dtype of this array.'''
        return self.data.dtype

    def __getitem__(self, key):
        '''Get a sub-array or a single value.

        Args:

            key (`class:Roi` or `class:Coordinate`):

                The ROI specifying the sub-array or a coordinate for a single
                value.

        Returns:

            If ``key`` is a `class:Roi`, returns a `class:Array` that
            represents this ROI. This is a light-weight operation that does
            not access the actual data held by this array. The returned array
            is created without ``chunk_shape`` and with write alignment checks
            disabled.

            If ``key`` is a `class:Coordinate`, the array value (possible
            multi-channel) closest to the coordinate is returned.

        Raises:

            TypeError: If ``key`` is neither a `class:Roi` nor a
                `class:Coordinate`.
        '''

        if isinstance(key, Roi):

            roi = key

            assert self.roi.contains(roi), (
                "Requested roi %s is not contained in this array %s." % (
                    roi, self.roi))

            # share the underlying data, only the ROI of the view changes
            return Array(
                self.data,
                roi,
                self.voxel_size,
                self.data_roi.begin)

        elif isinstance(key, Coordinate):

            coordinate = key

            assert self.roi.contains(coordinate), (
                "Requested coordinate is not contained in this array.")

            return self.data[self.__index(coordinate)]

        # unsupported keys used to fall through and silently yield None
        raise TypeError(
            "Roi or Coordinate expected, but got %s" % (type(key)))

    def __setitem__(self, roi, value):
        '''Set the data of this array within the given ROI.

        Args:

            roi (`class:Roi`):

                The ROI to write to.

            value (`class:Array`, or broadcastable to ``ndarray``):

                The value to write. If an `class:Array`, the ROIs do not have
                to match, however, the shape of ``value`` has to be
                broadcastable to the voxel shape of ``roi``.
        '''

        assert isinstance(roi, Roi), (
            "Roi expected, but got %s" % (type(roi)))

        assert roi.begin.is_multiple_of(self.voxel_size), (
            "roi offset %s is not a multiple of voxel size %s" % (
                roi.begin, self.voxel_size))

        assert roi.shape.is_multiple_of(self.voxel_size), (
            "roi shape %s is not a multiple of voxel size %s" % (
                roi.shape, self.voxel_size))

        target = self.data
        target_slices = self.__slices(
            roi,
            check_chunk_align=self.check_write_chunk_align)

        # values without item access (e.g., plain scalars) are assigned
        # directly and left to the container to broadcast
        if not hasattr(value, '__getitem__'):

            target[target_slices] = value
            return

        if isinstance(value, Array):

            # read only the part of the other array's data that its ROI
            # refers to
            array = value
            source = array.data
            source_slices = array.__slices(array.roi)

        else:

            source = value
            source_slices = slice(None)

        target[target_slices] = source[source_slices]

    def materialize(self):
        '''Copy the data represented by this array to memory. This is
        equivalent to::

            array = Array(array.to_ndarray(), array.roi, array.voxel_size)

        but modifies this array directly.
        '''

        self.data = self.to_ndarray()
        # the data now covers exactly the ROI of this array
        self.data_roi = self.roi.copy()

    def to_ndarray(self, roi=None, fill_value=None):
        '''Copy the data represented by this array into an ``ndarray``.

        Args:

            roi (`class:Roi`, optional):

                If given, copy only the data represented by this ROI. This is
                equivalent to::

                    array[roi].to_ndarray()

            fill_value (scalar, optional):

                If given, allow ``roi`` to be outside of this array's ROI.
                Outside values will be filled with ``fill_value``.

        Returns:

            The result of slicing the underlying data, or, if ``fill_value``
            is given, a newly allocated ``ndarray`` covering ``roi``.
        '''

        if roi is None:
            return self.data[self.__slices(self.roi)]

        # without a fill value, ``roi`` has to be contained in this array
        # (asserted by ``__getitem__``)
        if fill_value is None:
            return self[roi].to_ndarray()

        shape = (roi/self.voxel_size).shape
        data = np.zeros(
            self.data.shape[:self.n_channel_dims] + shape,
            dtype=self.data.dtype)
        if fill_value != 0:
            data[:] = fill_value

        # wrap the output buffer, such that the overlapping part can be
        # written by ROI
        array = Array(data, roi, self.voxel_size)

        shared_roi = self.roi.intersect(roi)

        if not shared_roi.empty:
            array[shared_roi] = self[shared_roi]

        return data

    def intersect(self, roi):
        '''Get a sub-array obtained by intersecting this array with the given
        ROI. This is equivalent to::

            array[array.roi.intersect(roi)]

        Args:

            roi (`class:Roi`):

                The ROI to intersect with.
        '''

        intersection = self.roi.intersect(roi)

        return self[intersection]

    def __slices(self, roi, check_chunk_align=False):
        '''Get the voxel slices for the given roi.

        Args:

            roi (`class:Roi`):

                The ROI, in world units, to convert into slices of ``data``.

            check_chunk_align (``bool``, optional):

                If true, assert in each dimension that the ROI begins on a
                chunk boundary, and that its shape is a multiple of the chunk
                shape unless it ends where this array's ROI ends.
        '''

        # slices are relative to the start of the data, not of this view
        voxel_roi = (roi - self.data_roi.begin)/self.voxel_size

        if check_chunk_align:

            for d in range(roi.dims):

                end_of_array = roi.get_end()[d] == self.roi.get_end()[d]

                begin_align_with_chunks = (
                    voxel_roi.begin[d] % self.chunk_shape[d] == 0)
                shape_align_with_chunks = (
                    voxel_roi.shape[d] % self.chunk_shape[d] == 0)

                assert begin_align_with_chunks and (
                    shape_align_with_chunks or end_of_array), (
                        "ROI %s (in voxels: %s) does not align with chunks of "
                        "size %s (mismatch in dimension %d)" %
                        (roi, voxel_roi, self.chunk_shape, d))

        # channel dimensions are always taken completely
        return (slice(None),)*self.n_channel_dims + voxel_roi.to_slices()

    def __index(self, coordinate):
        '''Get the voxel index for the given coordinate.'''

        index = (coordinate - self.data_roi.begin)/self.voxel_size
        if self.n_channel_dims > 0:
            # keep all channels of the addressed voxel
            index = (Ellipsis,) + index
        return index