daf.access.writers

Write API for daf data sets.

All names used by the set_* methods are simple property, axis#property or rows_axis,columns_axis#property names. This is in contrast to the DafReader get_* methods, which accept much more complex names.
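
For example, a minimal sketch of the three name formats (the data set, axis and property names here are hypothetical, and daf.freeze is assumed to be the package-level export of the freezing helper, since create_axis expects frozen string entries):

import numpy as np
import daf

data = daf.DafWriter(storage=daf.MemoryStorage(name="example.storage"), name="example")

data.set_item("description", "a toy data set")             # property (0D item)

data.create_axis("cell", daf.freeze(np.array(["c0", "c1"])))
data.create_axis("gene", daf.freeze(np.array(["g0", "g1", "g2"])))

data.set_data1d("cell#age", np.array([1.0, 2.0]))           # axis#property (1D data)
data.set_data2d("cell,gene#UMIs", np.zeros((2, 3)))         # rows_axis,columns_axis#property (2D data)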

Note

Since daf aggressively caches data derived from the stored data, the only safe operation is adding new data to a data set (as this invalidates no caches). Even then, one has to take care that no old StorageView objects are used, as these need not reflect the new data. Outright overwriting data is even worse, as it will not invalidate any stale data in the caches. Deletion isn't supported by the API at all.

When overwriting or deletion is needed, the recommended idiom is to create a new DafWriter with a separate storage to hold the data-we’ll-want-to-overwrite-or-delete. After we are done with this stale data, we can simply discard this storage and create a fresh DafWriter without it. This requires foresight, but is automated by functions such as DafWriter.adapter and computation.
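
A minimal sketch of this idiom (the storage and writer names here are hypothetical; daf.DAF_EXAMPLE_PATH is the example data set used elsewhere on this page):

import daf

base = daf.FilesReader(daf.DAF_EXAMPLE_PATH, name="example.base")

# Keep the disposable results in their own storage:
scratch = daf.DafWriter(
    storage=daf.MemoryStorage(name="example.scratch"),
    base=base,
    name=".scratch#",
)
# ... compute and store the data-we'll-want-to-overwrite-or-delete into `scratch` ...

# Once that data is no longer needed, discard the scratch storage altogether and
# continue with a fresh writer that never saw it (so its caches are clean):
fresh = daf.DafWriter(
    storage=daf.MemoryStorage(name="example.fresh"),
    base=base,
    name=".fresh#",
)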

Todo

Track which views/caches refer to each base storage and automatically invalidate any cached data on change, and provide delete operations? This would massively complicate the implementation…

Classes:

DafWriter(storage, *[, base, derived, name])

Read-write access to a daf data set.

BackAxis([name, optional, copy_on_error])

How to copy a data axis from processing results into the original data set.

BackData(name, default, optional, overwrite, ...)

How to copy data back from processing results into the original data set.

Data:

COMPLETE_DATA

Specify that computed data must be complete (will not use axes that were sliced).

Functions:

computation([required_inputs, ...])

Wrap a computation on a daf data set.

class daf.access.writers.DafWriter(storage: StorageWriter, *, base: Optional[StorageReader] = None, derived: Optional[StorageWriter] = None, name: str = '.daf#')[source]

Bases: DafReader

Read-write access to a daf data set.

If the name starts with ., it is appended to the base name. If the name ends with #, we append the object id to it to make it unique.

Attributes:

storage

Where to store modifications to the data set.

Methods:

as_reader()

Return the data set as a DafReader.

set_item(name, item, *[, overwrite])

Set a named 0D data item.

create_axis(axis, entries)

Create a new axis and the unique entries identifying each entry along the axis.

set_data1d(name, data1d, *[, overwrite])

Set a named 1D AnyData data.

set_data2d(name, data2d, *[, overwrite])

Set a named 2D AnyData data.

create_dense_in_rows(name, *, dtype[, overwrite])

Create an uninitialized ROW_MAJOR DenseInRows of some dtype, to be stored under the given name in the data set, expecting the code to initialize its values.

adapter(*[, axes, data, name, cache, ...])

Execute some code on a view of this data set; when done, transfer (some of) the results back into it.

computation([required_inputs, ...])

Implement some computation on a daf data set, with explicit input and output data names.

storage

Where to store modifications to the data set. By default, the base is also set to this. Specifying an explicit base allows, for example, using a MemoryStorage to hold modifications (such as additional type annotations) without actually modifying a read-only FilesReader base (e.g., a cells atlas).

as_reader() DafReader[source]

Return the data set as a DafReader.

This is a no-op (returns self) for “real” read-only data sets, but for writable data sets, it returns a “real” read-only wrapper object (that does not implement the writing methods). This ensures that the result can’t be used to modify the data if passed by mistake to a function that takes a DafWriter.
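
For example, a sketch (analyze is a hypothetical function that only needs read access):

import daf

def analyze(data: daf.DafReader) -> None:
    ...  # read-only analysis; the wrapper does not implement the set_* methods

writer = daf.DafWriter(storage=daf.MemoryStorage(name="example.storage"), name="example")
analyze(writer.as_reader())  # `analyze` can't accidentally modify `writer`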

set_item(name: str, item: Any, *, overwrite: bool = False) None[source]

Set a named 0D data item.

If overwrite, this will silently overwrite an existing item of the same name; otherwise, overwriting will fail.

create_axis(axis: str, entries: Union[Sequence[Any], ndarray, _fake_sparse.spmatrix, Series, DataFrame]) None[source]

Create a new axis and the unique entries identifying each entry along the axis.

The entries must be is_frozen and contain string data.

It is always an error to overwrite an existing axis.

Note

We verify that the axis entries are unique. However, we can’t guarantee that the entries will be unique for axes in arbitrary data accessed through some storage adapter (e.g., AnnData).
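
For example, a sketch (the axis and entry names are hypothetical, and daf.freeze is assumed to be the package-level export of the freezing helper):

import numpy as np
import daf

data = daf.DafWriter(storage=daf.MemoryStorage(name="axes.storage"), name="axes")

# The entries are a frozen 1D array of unique strings:
data.create_axis("gene", daf.freeze(np.array(["FOXA1", "WNT6", "LMO2"])))

# Creating the "gene" axis again (or passing duplicate entries) is an error.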

set_data1d(name: str, data1d: Union[Sequence[Any], ndarray, _fake_sparse.spmatrix, Series, DataFrame], *, overwrite: bool = False) None[source]

Set a named 1D AnyData data.

The name must be in the format axis#name, which uniquely identifies the 1D data.

If overwrite, this will silently overwrite existing 1D data of the same name; otherwise, overwriting will fail.

set_data2d(name: str, data2d: Union[Sequence[Any], ndarray, _fake_sparse.spmatrix, Series, DataFrame], *, overwrite: bool = False) None[source]

Set a named 2D AnyData data.

The name must be in the format rows_axis,columns_axis#name, which uniquely identifies the 2D data.

If overwrite, this will silently overwrite existing 2D data of the same name; otherwise, overwriting will fail.

create_dense_in_rows(name: str, *, dtype: Union[str, dtype], overwrite: bool = False) Generator[DenseInRows, None, None][source]

Create an uninitialized ROW_MAJOR DenseInRows of some dtype, to be stored under the given name in the data set, expecting the code to initialize its values.

The name must be in the format rows_axis,columns_axis#name, which uniquely identifies the 2D data.

Expected usage is:

with data.create_dense_in_rows(name="rows_axis,columns_axis#name", dtype="...") as dense:
    # Here the dense is still not necessarily set inside the data set.
    # That is, one can't assume ``get_matrix`` will access it.
    # It is only available for filling in the values:
    dense[..., ...] = ...

# Here the array is set inside the storage.
# That is, one can use ``get_matrix`` to access it.

This allows FilesWriter to create the array on disk without first having to create an in-memory copy. By default (for other storage adapters), this just creates and returns an uninitialized in-memory 2D dense array, then sets it as the 2D data value.
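
A slightly more concrete sketch (the axis and property names are hypothetical; assume the cell and gene axes already exist in data):

with data.create_dense_in_rows("cell,gene#log_UMIs", dtype="float32") as dense:
    for row in range(dense.shape[0]):
        dense[row, :] = 0.0  # fill one row at a time, in row-major order

# Only here is "cell,gene#log_UMIs" actually stored in the data set.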

adapter(*, axes: Optional[Mapping[str, Union[None, str, Sequence[Any], ndarray, _fake_sparse.spmatrix, Series, DataFrame, AxisView]]] = None, data: Optional[Mapping[str, Optional[str]]] = None, name: str = '.adapter#', cache: Optional[StorageWriter] = None, storage: Optional[StorageWriter] = None, hide_implicit: bool = False, back_axes: Union[None, Collection[str], Mapping[str, Union[str, BackAxis]]] = None, back_data: Union[None, Collection[str], Mapping[str, Union[str, BackData]]] = None) Generator[DafWriter, None, None][source]

Execute some code on a view of this data set; when done, transfer (some of) the results back into it.

If the name starts with ., it is appended to both the StorageView and the DafWriter names. If the name ends with #, we append the object id to it to make it unique.

This sets up a StorageView to adapt the data set to the expectation of some processing code. It then uses this as the base for a DafWriter which is provided to the processing code. By default this uses MemoryStorage as the storage for the computed results. When the processing completes, (some of) the computed results are copied back into the original data set:

  • If back_axes is specified, it should list (some of) the new axes created by the processing code. Each of these will be copied into the original data set. If back_axes is a dict, it provides either a name or a complete BackAxis specifying exactly how to copy each axis back. Otherwise, it is just a collection of the new axes to copy on success, preserving their names.

  • If back_data is specified, it should list (some of) the new data created (or modified) by the processing code. Each of these will be copied back into the original data set. If back_data is a dict, it provides either a name or a complete BackData specifying exactly how to copy each data back. Otherwise, it is just a collection of the data to copy on success, preserving the names and requiring that such data does not use any sliced axes (see the sketch after this list).
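
For instance, a sketch of the dict form for back_data, reusing the same axis and data mapping as the contrived example below (data is the DafWriter constructed there; the BackData options are described at the end of this page):

from daf.access.writers import BackData

with data.adapter(
    axes=dict(metacell="y", gene="x"),
    data={"metacell,gene#UMIs|Fraction": "z"},
    hide_implicit=True,
    back_data={"x#mean": BackData(name="mean_fraction", overwrite=True)},
) as adapter:
    adapter.set_data1d("x#mean", adapter.get_vector("x#y,z|Mean"))

The back_axes dict form is analogous, mapping each new axis name in the view to a BackAxis.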

Todo

Currently the adapter data mapping is restricted to simple properties such as cell#age. Lift this restriction to allow for derived properties such as cell#batch#age.

A contrived example might look like:

import daf

data = daf.DafWriter(
    storage=daf.MemoryStorage(name="example.storage"),
    base=daf.FilesReader(daf.DAF_EXAMPLE_PATH, name="example.base"),
    name="example"
)

with data.adapter(
    axes=dict(metacell="y", gene="x"),
    data={"metacell,gene#UMIs|Fraction": "z"},
    hide_implicit=True,
    back_data={"x#mean": "mean_fraction"}
) as adapter:
    # The `adapter` data set has only `x` and `y` axes, and a per-x-per-y `z` matrix,
    # matching the expectations of `collect_stats`:

    # Assume this is some generic code for capturing statistics.
    adapter.set_data1d("x#mean", adapter.get_vector("x#y,z|Mean"))

print(data.get_series("gene#mean_fraction"))
RSPO3    0.320572
FOXA1    0.033420
WNT6     0.131119
TNNI1    0.020580
MSGN1    0.078621
LMO2     0.056920
SFRP5    0.026982
DLX5     0.088142
ITGA4    0.192769
FOXA2    0.050876
dtype: float32

Note

This idiom is key for creating an ecosystem of generic daf processing tools. Such tools can require and compute generic axes and data names, and still be applied to data that uses specific descriptive axes and data names. Often the same generic tool can be applied to the same data set in multiple ways, using different mappings between the specific names and the generic names.

Todo

Provide a more efficient implementation of DafWriter._copy_back_data2d (used by DafWriter.adapter). The current implementation uses a few temporary buffers the size of the partial data. If this were implemented in a C/C++ extension it would avoid the temporary buffers, giving a significant performance boost for large data sizes. So far we have chosen to keep daf a pure Python package, so we suffer this inefficiency. Perhaps using numba would provide the efficiency while avoiding C/C++ extension code? Of course this really should be a part of numpy and/or scipy.sparse in the first place.

computation(required_inputs: Optional[Collection[str]] = None, optional_inputs: Optional[Collection[str]] = None, assured_outputs: Optional[Collection[str]] = None, optional_outputs: Optional[Collection[str]] = None, overwrite: bool = False, name: str = '.computation#', storage: Optional[StorageWriter] = None, derived: Optional[StorageWriter] = None) Generator[DafWriter, None, None][source]

Implement some computation on a daf data set, with explicit input and output data names. This is rarely invoked directly; typically a computation is wrapped in a function which is annotated by daf.access.writers.computation.

If the name starts with ., it is appended to both the StorageView and the DafWriter names. If the name ends with #, we append the object id to it to make it unique.

Note

This is expected to be used by “well behaved” computation tools for daf data. Typically this is called implicitly by computation. In theory, you only need to invoke this manually if the list of inputs and outputs depends on the parameters. The description here is still useful for better understanding of the behavior of computation.

This restricts the data available for the computation to just the listed required_inputs and, if they exist, optional_inputs. Once the computation is complete, it ensures the assured_outputs exist and ensures that only the assured_outputs and optional_outputs are copied into the data set. If overwrite, this will overwrite any existing data.

During the computation, any intermediate and derived results are placed in the storage and derived, which by default are simple MemoryStorage objects. Only explicitly listed results, or derived data based on pre-existing (or explicitly listed result) data, are copied back into the data set.

The expected (manual) usage is:

def my_computation(data: daf.DafWriter, ...) -> ...:
    '''
    Describe the computation...

    Required Inputs: ...

    Optional Inputs: ...

    Assured Outputs: ...

    Optional Outputs: ...
    '''

    # Here the `data` may contain more than we think (and documented) that we need.
    # If the code accesses unlisted data, it would still work when it should fail.
    # If the code fails to create some "assured" data, it is the caller that will fail.
    # Likewise, the code can easily leak temporary results into the `data`.

    my_computation_implementation(data, ...)  # Unsafe: do not do this!

    # Instead do this:

    with daf.computation(data, name=".my_computation",
                         required_inputs="...", optional_inputs="...",
                         assured_outputs="...", optional_outputs="...") as work:

        # Here the `work` data set contains just the inputs,
        # so accidentally accessing other data will fail.
        # Also, here we can freely write temporary results to the `work` data set,
        # without it leaking back to the original `data` set.

        return my_computation_implementation(work, ...)  # Safe!

    # Here the `data` set is updated with only the outputs.
    # Required outputs are guaranteed to exist here.

Todo

If both the final and temporary storage are FilesWriter, avoid copying large 2D data files and instead directly move them from one directory to another.

class daf.access.writers.BackAxis(name: Optional[str] = None, optional: bool = False, copy_on_error: bool = False)[source]

Bases: tuple

How to copy a data axis from processing results into the original data set.

Create new instance of BackAxis(name, optional, copy_on_error)

Attributes:

name

The simple name to copy the axis into in the original data set.

optional

Whether the axis is not required to exist in the computed results.

copy_on_error

Whether to (try to) copy the axis into the original data set even if the processing code failed with some exception.

property name

The simple name to copy the axis into in the original data set. By default the axis is not renamed.

property optional

Whether the axis is not required to exist in the computed results.

property copy_on_error

Whether to (try to) copy the axis into the original data set even if the processing code failed with some exception.

class daf.access.writers.BackData(name: ~typing.Optional[str] = None, default: ~typing.Any = <daf.access.writers.CompleteData object>, optional: bool = False, overwrite: bool = False, copy_on_error: bool = False)[source]

Bases: tuple

How to copy data back from processing results into the original data set.

Create new instance of BackData(name, default, optional, overwrite, copy_on_error)

Attributes:

name

The simple name to copy the data into in the original data set.

default

The default value to use when the computed data is partial (uses sliced axes).

optional

Whether the data is not required to exist in the computed results.

overwrite

Whether to overwrite existing data in the original data set.

copy_on_error

Whether to (try to) copy the data into the original data set even if the processing code failed with some exception.

property name

The simple name to copy the data into in the original data set. By default the data is not renamed.

property default

The default value to use when the computed data is partial (uses sliced axes). By default (COMPLETE_DATA), the computed data must be complete, that is, must not use any axes that were sliced.

property optional

Whether the data is not required to exist in the computed results.

property overwrite

Whether to overwrite existing data in the original data set.

property copy_on_error

Whether to (try to) copy the data into the original data set even if the processing code failed with some exception.

daf.access.writers.COMPLETE_DATA = <daf.access.writers.CompleteData object>

Specify that computed data must be complete (will not use axes that were sliced).

daf.access.writers.computation(required_inputs: ~typing.Optional[~typing.Mapping[str, str]] = None, optional_inputs: ~typing.Optional[~typing.Mapping[str, str]] = None, assured_outputs: ~typing.Optional[~typing.Mapping[str, str]] = None, optional_outputs: ~typing.Optional[~typing.Mapping[str, str]] = None, name: ~typing.Optional[str] = None, storage: ~typing.Callable[[~daf.access.writers.DafWriter], ~typing.Optional[~daf.storage.interface.StorageWriter]] = <function <lambda>>, derived: ~typing.Callable[[~daf.access.writers.DafWriter], ~typing.Optional[~daf.storage.interface.StorageWriter]] = <function <lambda>>) Callable[[CALLABLE], CALLABLE][source]

Wrap a computation on a daf data set.

Note

This is the simplest way to write a “well behaved” generic computation tool using daf.

The wrapped function must take a DafWriter data set as its first argument and overwrite as a keyword argument with a default value of False. The function can take additional arguments if needed. The data set parameter is automatically replaced by a restricted view of the original data set, using DafWriter.computation. The wrapped function will therefore only have access to the required_inputs and optional_inputs. It may freely write temporary results into the data, but only results listed in assured_outputs and optional_outputs will be copied into the original data set. If overwrite, this will overwrite existing data.

Note

If the computation creates a new axis, list it in the outputs as axis#. You can also document required axes by listing them as axis# required inputs.

By default, the name appends the wrapped function's name (with a # suffix). The storage and derived used will, by default, be simple MemoryStorage objects. You can override this in an arbitrary way using a helper function that takes all the arguments of the wrapped function and returns the StorageWriter.

In addition, this will embed the documentation of the inputs and outputs into the function's documentation string. We also capture the inputs and outputs of the computation in the properties __daf_required_inputs__, __daf_optional_inputs__, __daf_assured_outputs__ and __daf_optional_outputs__ to support additional meta-programming.

For example:

import daf

@daf.computation(
    required_inputs={
        "x#": "The axis to compute statistics for.",
        "y#": "The axis of the data to compute statistics for.",
        "x,y#z": "The data to compute statistics for.",
    },
    assured_outputs={
        "x#mean": "The mean value of the data for each of the X values.",
    },
)
def compute_statistics(data: daf.DafWriter, *, overwrite: bool = False) -> None:
    '''
    Compute statistics for arbitrary 2D data.

    For brevity here we only compute the mean for each row.

    __DAF__
    '''
    data.set_data1d("x#mean", data.get_vector("x#y,z|Mean"), overwrite=overwrite)

print(compute_statistics.__doc__.strip())
Compute statistics for arbitrary 2D data.

For brevity here we only compute the mean for each row.

**Required Inputs**

``x#``
    The axis to compute statistics for.
``y#``
    The axis of the data to compute statistics for.
``x,y#z``
    The data to compute statistics for.

**Assured Outputs**

``x#mean``
    The mean value of the data for each of the X values.

If ``overwrite``, will overwrite existing data.
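
For example, a minimal end-to-end sketch applying the decorated tool (the data set, axis entries and values are hypothetical, and daf.freeze is assumed to be the package-level export of the freezing helper); in practice one would usually map specific axis and data names onto the generic x/y/z names via DafWriter.adapter, as shown earlier on this page:

import numpy as np
import daf

data = daf.DafWriter(storage=daf.MemoryStorage(name="stats.storage"), name="stats")
data.create_axis("x", daf.freeze(np.array(["x0", "x1"])))
data.create_axis("y", daf.freeze(np.array(["y0", "y1", "y2"])))
data.set_data2d("x,y#z", np.arange(6, dtype="float32").reshape(2, 3))

# The wrapper restricts the computation to its listed inputs and copies the
# assured "x#mean" output back into `data`:
compute_statistics(data)
print(data.get_series("x#mean"))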