Batch Processing

Batch Processor

class yass.batch.BatchProcessor(path_to_recordings, dtype=None, n_channels=None, data_order=None, max_memory='1GB', buffer_size=0, loader='memmap', show_progress_bar=True)[source]

Batch processing for large numpy matrices

Parameters:
path_to_recordings: str

Path to recordings file

dtype: str

Numpy dtype

n_channels: int

Number of channels

data_order: str

Recordings order, one of (‘channels’, ‘samples’). In a dataset with k observations per channel and j channels: ‘channels’ means the first k contiguous observations come from channel 0, then channel 1, and so on; ‘samples’ means the first j contiguous values are the first observation from every channel, then the second observation from every channel, and so on

max_memory: int or str

Max memory to use in each batch, interpreted as bytes if int; if a string, it can be any of {N}KB, {N}MB or {N}GB (see the snippet below)

buffer_size: int, optional

Buffer size, defaults to 0. Only relevant when performing multi-channel operations

loader: str (‘memmap’, ‘array’ or ‘python’), optional

How to load the data: ‘memmap’ loads the data using a wrapper around np.memmap (see MemoryMap for details), ‘array’ uses numpy.fromfile and ‘python’ uses a wrapper around the Python file API. Defaults to ‘memmap’. Beware that the Python loader has limited indexing capabilities, see BinaryReader for details

show_progress_bar: bool, optional

Show progress bar when running operations, defaults to True

Raises:
ValueError

If dimensions do not match according to the file size, dtype and number of channels
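
The snippet below is a minimal construction sketch (the file path, dtype and channel count are placeholders, not an actual dataset); it illustrates that max_memory accepts either a string such as '500MB' or an int number of bytes, and that a different loader can be requested:

import os

from yass.batch import BatchProcessor

# hypothetical recordings file, adjust to your own data
path_to_data = os.path.expanduser('~/data/recordings.bin')

# max_memory given as a string
bp = BatchProcessor(path_to_data, dtype='int16', n_channels=385,
                    data_order='samples', max_memory='500MB')

# equivalent setup with max_memory as an int (bytes) and the
# numpy.fromfile loader
bp = BatchProcessor(path_to_data, dtype='int16', n_channels=385,
                    data_order='samples', max_memory=500 * 1024 ** 2,
                    loader='array')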

multi_channel(from_time=None, to_time=None, channels='all', return_data=True)[source]

Generate indexes where each index has observations from more than one channel

Returns:
generator:

A tuple of size three: the first element is the subset of the data for the ith batch; the second element is the slice object with the limits of the data in [observations, channels] format (excluding the buffer); the last element is the absolute index of the data, again in [observations, channels] format

Examples

# coding: utf-8

# See notebook:
# https://github.com/paninski-lab/yass-examples/blob/master/batch/multi_channel.ipynb

"""
Splitting large files into batches where every batch has n
observations from m channels using BatchProcessor.multi_channel
"""

import os
from yass.batch import BatchProcessor


path_to_neuropixel_data = (os.path.expanduser('~/data/ucl-neuropixel'
                           '/rawDataSample.bin'))


bp = BatchProcessor(path_to_neuropixel_data,
                    dtype='int16', n_channels=385, data_order='channels',
                    max_memory='300MB')


# now, let's do some multi_channel operations, here we will
# traverse all channels and all observations, each batch will
# contain a subset in the temporal dimension, the window size
# is determined by max_memory
data = bp.multi_channel()

for d, _, idx in data:
    print('Shape: {}. Index: {}'.format(d.shape, idx))


# we can specify the temporal limits and subset channels
data = bp.multi_channel(from_time=100000, to_time=200000, channels=[0, 1, 2])

for d, _, idx in data:
    print('Shape: {}. Index: {}'.format(d.shape, idx))


# we can also create a BatchProcessor with a buffer
bp2 = BatchProcessor(path_to_neuropixel_data,
                     dtype='int16', n_channels=385, data_order='channels',
                     max_memory='100KB', buffer_size=10)

data = bp2.multi_channel(from_time=0, to_time=100000, channels=[0, 1, 2])

for d, idx_local, idx in data:
    # d is the batch data (with buffer), d[idx_local] returns the data
    # excluding buffer and idx is the absolute location for the
    # current batch in the recordings
    print('Shape: {}. Local: {}. Absolute: {}\n'
          .format(d.shape, idx_local, idx))
multi_channel_apply(function, mode, cleanup_function=None, output_path=None, from_time=None, to_time=None, channels='all', if_file_exists='overwrite', cast_dtype=None, pass_batch_info=False, pass_batch_results=False, processes=1, **kwargs)[source]

Apply a function where each batch has observations from more than one channel

Parameters:
function: callable

Function to be applied; the first parameter passed will be a 2D numpy array in ‘long’ shape (number of observations, number of channels). If pass_batch_info is True, two additional keyword parameters will be passed to function: ‘idx_local’ is the slice object with the limits of the data in [observations, channels] format (excluding the buffer), and ‘idx’ is the absolute index of the data, again in [observations, channels] format

mode: str

‘disk’ or ‘memory’. If ‘disk’, a binary file is created at the beginning of the operation and each partial result is saved (using the numpy.ndarray.tofile function); at the end of the operation two files are generated: the binary file and a yaml file with some file parameters (useful if you want to later use RecordingsReader to read the file). If ‘memory’, partial results are kept in memory and returned as a list

cleanup_function: callable, optional

A function to be executed after function and before adding the partial result to the list of results (if in memory mode) or to the binary file (if in disk mode). cleanup_function will be called with the following parameters (in that order): result from applying function to the batch, slice object with the idx where the data is located (excludes buffer), slice object with the absolute location of the data, and buffer size

output_path: str, optional

Where to save the output, required if ‘disk’ mode

force_complete_channel_batch: bool, optional

If True, every index generated will correspond to all the observations in a single channel, hence n_batches = n_selected_channels; defaults to True. If True, from_time and to_time must be None

from_time: int, optional

Starting time, defaults to None

to_time: int, optional

Ending time, defaults to None

channels: int, tuple or str, optional

A tuple with the channel indexes or ‘all’ to traverse all channels, defaults to ‘all’

if_file_exists: str, optional

One of ‘overwrite’, ‘abort’, ‘skip’. If ‘overwrite’, it replaces the file if it exists; if ‘abort’, it raises a ValueError exception if the file exists; if ‘skip’, it skips the operation if the file exists. Only valid when mode = ‘disk’

cast_dtype: str, optional

Output dtype, defaults to None which means no cast is done

pass_batch_info: bool, optional

Whether to call the function with batch info or just with the batch data (see the description of the function parameter above)

pass_batch_results: bool, optional

Whether to pass results from the previous batch to the next one, defaults to False. Only relevant when mode=’memory’. If True, function will be called with the keyword parameter ‘previous_batch’, which contains the computation for the previous batch; it is set to None for the first batch (see the sketch after the examples below)

**kwargs

kwargs to pass to function

Returns:
output_path, params (when mode is ‘disk’)

Path to output binary file, Binary file params

list (when mode is ‘memory’ and pass_batch_results is False)

List where every element is the result of applying the function to one batch. When pass_batch_results is True, it returns the output of the function for the last batch

Notes

Applying functions will incur memory overhead, which depends on the function implementation. This is important to consider if the transformation changes the data’s dtype (e.g. converting int16 to float64 means a chunk of 1MB in int16 will have a size of 4MB in float64). Take that into account when setting max_memory

For performance reasons, outputs data in ‘samples’ order.

Examples

# coding: utf-8

# See notebook:
# https://github.com/paninski-lab/yass-examples/blob/master/batch/multi_channel_apply_disk.ipynb

"""
Applying transformations to large files in batches:

BatchProcessor.multi_channel_apply lets you apply transformations to
batches of data where every batch has observations from every channel.

This example shows how to process a large file in batches and save the
results to disk.
"""

import logging
import os

import matplotlib.pyplot as plt

from yass.batch import BatchProcessor
from yass.batch import RecordingsReader


# configure logging to get information about the process
logging.basicConfig(level=logging.INFO)


# raw data file
path_to_neuropixel_data = (os.path.expanduser('~/data/ucl-neuropixel'
                           '/rawDataSample.bin'))

# output file
path_to_modified_data = (os.path.expanduser('~/data/ucl-neuropixel'
                         '/tmp/modified.bin'))


# our example function just adds one to every observation
def sum_one(batch):
    """Add one to every element in the batch
    """
    return batch + 1


# create batch processor for the data
bp = BatchProcessor(path_to_neuropixel_data,
                    dtype='int16', n_channels=385, data_order='channels',
                    max_memory='500MB')

# apply a multi channel transformation, each batch will be a temporal
# subset with observations from all selected channels, the size
# of the subset is calculated depending on max_memory. Each batch is
# processed and when done, results are saved to disk, the next batch is
# then loaded and so on
bp.multi_channel_apply(sum_one,
                       mode='disk',
                       output_path=path_to_modified_data,
                       channels=[0, 1, 2])


# let's visualize the results
raw = RecordingsReader(path_to_neuropixel_data, dtype='int16',
                       n_channels=385, data_order='channels')

# you do not need to specify the format since multi_channel_apply
# saves a yaml file with such parameters
filtered = RecordingsReader(path_to_modified_data)

fig, (ax1, ax2) = plt.subplots(2, 1)
ax1.plot(raw[:2000, 0])
ax2.plot(filtered[:2000, 0])
plt.show()
# coding: utf-8

# See notebook:
# https://github.com/paninski-lab/yass-examples/blob/master/batch/multi_channel_apply_memory.ipynb

"""
Applying transformations to large files in batches:

BatchProcessor.multi_channel_apply lets you apply transformations to
batches of data where every batch has observations from every channel.

This example shows how to extract information from a large file by
processing it in batches.
"""

import logging
import os

import numpy as np

from yass.batch import BatchProcessor


# configure logging to get information about the process
logging.basicConfig(level=logging.INFO)


# raw data file
path_to_neuropixel_data = (os.path.expanduser('~/data/ucl-neuropixel'
                           '/rawDataSample.bin'))


# on each batch, we find the maximum value in every channel
def max_in_channel(batch):
    """Add one to every element in the batch
    """
    return np.max(batch, axis=0)


# create batch processor for the data
bp = BatchProcessor(path_to_neuropixel_data,
                    dtype='int16', n_channels=385, data_order='channels',
                    max_memory='10MB')

# apply a multi channel transformation, each batch will be a temporal
# subset with observations from all selected channels, the size
# of the subset is calculated depending on max_memory. Results
# from every batch are returned in a list
res = bp.multi_channel_apply(max_in_channel,
                             mode='memory',
                             channels=[0, 1, 2])


# we have one element per batch
len(res)


# output for the first batch
res[0]


# stack results from every batch
arr = np.stack(res, axis=0)


# let's find the maximum value along every channel in all the dataset
np.max(arr, axis=0)
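

The following is a hedged sketch (not from the original notebooks) that reuses the bp object from the example above; it shows pass_batch_info and pass_batch_results in 'memory' mode, following the keyword parameter names ('idx_local', 'idx', 'previous_batch') described in the parameter list:

# a sketch: keep a channel-wise running maximum across batches
def running_max(batch, idx_local, idx, previous_batch):
    """Channel-wise maximum seen so far

    idx_local and idx are received because pass_batch_info=True,
    previous_batch because pass_batch_results=True (None on the first batch)
    """
    current = np.max(batch, axis=0)

    if previous_batch is None:
        return current

    return np.maximum(previous_batch, current)


# with pass_batch_results=True, only the output for the last batch is
# returned, which here is the maximum over all the selected data
overall_max = bp.multi_channel_apply(running_max,
                                     mode='memory',
                                     channels=[0, 1, 2],
                                     pass_batch_info=True,
                                     pass_batch_results=True)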
single_channel(force_complete_channel_batch=True, from_time=None, to_time=None, channels='all')[source]

Generate batches where each index has observations from a single channel

Returns:
A generator that yields batches; if force_complete_channel_batch is
False, each generated value is a tuple with the batch data and the
channel it comes from

Examples

# coding: utf-8

# See notebook:
# https://github.com/paninski-lab/yass-examples/blob/master/batch/single_channel.ipynb

"""
Splitting large file into batches where every batch contains n
observations from 1 channel
"""

import os
from yass.batch import BatchProcessor


path_to_neuropixel_data = (os.path.expanduser('~/data/ucl-neuropixel'
                           '/rawDataSample.bin'))


bp = BatchProcessor(path_to_neuropixel_data,
                    dtype='int16', n_channels=385, data_order='channels',
                    max_memory='1MB')

# there are two ways of traversing the data: single_channel and multi_channel
# single_channel means that the data in a single batch comes from only one
# channel, multi_channel means that a batch can contain data from multiple
# channels, let's take a look at single_channel operations

# traverse the whole dataset, one channel at a time
data = bp.single_channel()

# the next for loop will raise an error since we cannot fit
# all observations for a single channel in memory, so we
# either increase max_memory or set
# force_complete_channel_batch to False

# for d in data:
#     print(d.shape)


# When force_complete_channel_batch is False, each batch does not necessarily
# correspond to all observations in the channel: the channel can be split
# into several batches (although the data in every batch is guaranteed to come
# from a single channel), in this case, every channel is split in two parts
data = bp.single_channel(force_complete_channel_batch=False,
                         channels=[0, 1, 2])

for d, ch in data:
    print(d.shape, 'Data from channel {}'.format(ch))


# finally, we can traverse a single channel in a temporal subset
data = bp.single_channel(from_time=100000, to_time=200000, channels=[0, 1, 2])

for d in data:
    print(d.shape)
single_channel_apply(function, mode, output_path=None, force_complete_channel_batch=True, from_time=None, to_time=None, channels='all', if_file_exists='overwrite', cast_dtype=None, **kwargs)[source]

Apply a transformation where each batch has observations from a single channel

Parameters:
function: callable

Function to be applied, must accept a 1D numpy array as its first parameter

mode: str

‘disk’ or ‘memory’. If ‘disk’, a binary file is created at the beginning of the operation and each partial result is saved (using the numpy.ndarray.tofile function); at the end of the operation two files are generated: the binary file and a yaml file with some file parameters (useful if you want to later use RecordingsReader to read the file). If ‘memory’, partial results are kept in memory and returned as a list

output_path: str, optional

Where to save the output, required if ‘disk’ mode

force_complete_channel_batch: bool, optional

If True, every index generated will correspond to all the observations in a single channel, hence n_batches = n_selected_channels; defaults to True. If True, from_time and to_time must be None

from_time: int, optional

Starting time, defaults to None

to_time: int, optional

Ending time, defaults to None

channels: int, tuple or str, optional

A tuple with the channel indexes or ‘all’ to traverse all channels, defaults to ‘all’

if_file_exists: str, optional

One of ‘overwrite’, ‘abort’, ‘skip’. If ‘overwrite’, it replaces the file if it exists; if ‘abort’, it raises a ValueError exception if the file exists; if ‘skip’, it skips the operation if the file exists. Only valid when mode = ‘disk’

cast_dtype: str, optional

Output dtype, defaults to None which means no cast is done

**kwargs

kwargs to pass to function

Notes

Applying functions in ‘disk’ mode will incur memory overhead, which depends on the function implementation. This is important to consider if the transformation changes the data’s dtype (e.g. converting int16 to float64 means a chunk of 1MB in int16 will have a size of 4MB in float64). Take that into account when setting max_memory.

For performance reasons in ‘disk’ mode, output data is in ‘channels’ order

Examples

# coding: utf-8

# See notebook:
# https://github.com/paninski-lab/yass-examples/blob/master/batch/single_channel_apply.ipynb

"""
Apply functions to large files using
BatchProcessor.single_channel_apply
"""

import logging
import os

import matplotlib.pyplot as plt

from yass.batch import BatchProcessor
from yass.batch import RecordingsReader
from yass.preprocess.filter import butterworth


logging.basicConfig(level=logging.INFO)


path_to_neuropixel_data = (os.path.expanduser('~/data/ucl-neuropixel'
                           '/rawDataSample.bin'))
path_to_filtered_data = (os.path.expanduser('~/data/ucl-neuropixel'
                         '/tmp/filtered.bin'))


# create batch processor for the data
bp = BatchProcessor(path_to_neuropixel_data,
                    dtype='int16', n_channels=385, data_order='channels',
                    max_memory='500MB')


# apply a single channel transformation, each batch will be all observations
# from one channel, results are saved to disk
bp.single_channel_apply(butterworth,
                        mode='disk',
                        output_path=path_to_filtered_data,
                        low_freq=300, high_factor=0.1,
                        order=3, sampling_freq=30000,
                        channels=[0, 1, 2])


# let's visualize the results
raw = RecordingsReader(path_to_neuropixel_data, dtype='int16',
                       n_channels=385, data_order='channels')

# you do not need to specify the format since single_channel_apply
# saves a yaml file with such parameters
filtered = RecordingsReader(path_to_filtered_data)

fig, (ax1, ax2) = plt.subplots(2, 1)
ax1.plot(raw[:2000, 0])
ax2.plot(filtered[:2000, 0])
plt.show()
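

As a hedged variation on the example above (reusing the same bp and butterworth), the sketch below runs the filter in 'memory' mode and casts the output with cast_dtype; per the mode description, the partial results come back as a list, one element per selected channel since force_complete_channel_batch defaults to True:

# a sketch: same filter, but partial results are kept in memory and cast
# to float32 instead of being written to disk
results = bp.single_channel_apply(butterworth,
                                  mode='memory',
                                  cast_dtype='float32',
                                  low_freq=300, high_factor=0.1,
                                  order=3, sampling_freq=30000,
                                  channels=[0, 1, 2])

# one element per selected channel
len(results)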

Batch Pipeline

class yass.batch.BatchPipeline(path_to_input, dtype, n_channels, data_order, max_memory, output_path, from_time=None, to_time=None, channels='all')[source]

Chain batch operations and write partial results to disk

Parameters:
path_to_input: str

Path to input file

dtype: str

Numpy dtype

n_channels: int

Number of channels

data_order: str

Recordings order, one of (‘channels’, ‘samples’). In a dataset with k observations per channel and j channels: ‘channels’ means the first k contiguous observations come from channel 0, then channel 1, and so on; ‘samples’ means the first j contiguous values are the first observation from every channel, then the second observation from every channel, and so on

max_memory: int or str

Max memory to use in each batch, interpreted as bytes if int; if a string, it can be any of {N}KB, {N}MB or {N}GB

output_path: str

Folder indicating where to store the files from every step

from_time: int, optional

Starting time, defaults to None, which means start from time 0

to_time: int, optional

Ending time, defaults to None, which means end at the last observation

channels: int, tuple or str, optional

A tuple with the channel indexes or ‘all’ to traverse all channels, defaults to ‘all’

Examples

import logging
import os

import matplotlib.pyplot as plt

from yass.batch.pipeline import BatchPipeline, PipedTransformation
from yass.batch import RecordingsReader
from yass.preprocess.filter import butterworth, standarize

logging.basicConfig(level=logging.DEBUG)


path_to_neuropixel_data = (os.path.expanduser('~/data/ucl-neuropixel'
                                              '/rawDataSample.bin'))
path_output = os.path.expanduser('~/data/ucl-neuropixel/tmp')


pipeline = BatchPipeline(path_to_neuropixel_data, dtype='int16',
                         n_channels=385, data_order='channels',
                         max_memory='500MB',
                         from_time=None, to_time=None, channels='all',
                         output_path=path_output)

butterworth_op = PipedTransformation(butterworth, 'filtered.bin',
                                     mode='single_channel_one_batch',
                                     keep=True, low_freq=300, high_factor=0.1,
                                     order=3, sampling_freq=30000)

standarize_op = PipedTransformation(standarize, 'standarized.bin',
                                    mode='single_channel_one_batch',
                                    keep=True, sampling_freq=30000)

pipeline.add([butterworth_op, standarize_op])

pipeline.run()


raw = RecordingsReader(path_to_neuropixel_data, dtype='int16',
                       n_channels=385, data_order='channels')
filtered = RecordingsReader(os.path.join(path_output, 'filtered.bin'))
standarized = RecordingsReader(os.path.join(path_output, 'standarized.bin'))

# plot results
fig, (ax1, ax2, ax3) = plt.subplots(3, 1)
ax1.plot(raw[:2000, 0])
ax1.set_title('Raw data')
ax2.plot(filtered[:2000, 0])
ax2.set_title('Filtered data')
ax3.plot(standarized[:2000, 0])
ax3.set_title('Standarized data')
plt.tight_layout()
plt.show()
run()[source]

Run all tasks in the pipeline

Returns:
list

List with paths to the output files in the order they were run; if keep is False, the path is still returned but the file will not exist

list

List with parameters
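
A hedged usage sketch, continuing the pipeline example above and assuming run() returns the two lists in the order described:

# a sketch: capture the output paths and the parameters for every step
output_paths, params = pipeline.run()

path_to_filtered, path_to_standarized = output_paths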

class yass.batch.PipedTransformation(function, output_name, mode, keep=False, if_file_exists='overwrite', cast_dtype=None, **kwargs)[source]

Wrapper for functions run with BatchPipeline

Parameters:
function: function

Function to apply

output_name: str

Name of the file for the output

mode: str

Operation mode, one of ‘single_channel_one_batch’ (every batch is all observations from a single channel), ‘single_channel’ (every batch contains observations from a single channel, but a channel can be split into several batches to avoid exceeding max_memory) or ‘multi_channel’ (every batch has observations from every selected channel, split so as not to exceed max_memory)

keep: bool, optional

Whether to keep the results from this step, otherwise the file is deleted after the next transformation is done

if_file_exists: str, optional

One of ‘overwrite’, ‘abort’, ‘skip’. If ‘overwrite’, it replaces the file if it exists; if ‘abort’, it raises a ValueError exception if the file exists; if ‘skip’, it skips the operation if the file exists. Only valid when mode = ‘disk’

cast_dtype: str, optional

Output dtype, defaults to None which means no cast is done

**kwargs

Function kwargs

Recordings Reader

class yass.batch.RecordingsReader(path_to_recordings, dtype=None, n_channels=None, data_order=None, loader='memmap', buffer_size=0, return_data_index=False)[source]

Neural recordings reader. If a file with the same name but yaml extension exists in the directory, it looks for dtype, n_channels and data_order there; otherwise you need to pass the parameters in the constructor

Parameters:
path_to_recordings: str

Path to recordings file

dtype: str

Numpy dtype

n_channels: int

Number of channels

data_order: str

Recordings order, one of (‘channels’, ‘samples’). In a dataset with k observations per channel and j channels: ‘channels’ means the first k contiguous observations come from channel 0, then channel 1, and so on; ‘samples’ means the first j contiguous values are the first observation from every channel, then the second observation from every channel, and so on

loader: str (‘memmap’, ‘array’ or ‘python’), optional

How to load the data: ‘memmap’ loads the data using a wrapper around np.memmap (see MemoryMap for details), ‘array’ uses numpy.fromfile and ‘python’ uses a wrapper around the Python file API. Defaults to ‘memmap’. Beware that the Python loader has limited indexing capabilities, see BinaryReader for details

buffer_size: int, optional

Buffer size to add when indexing, defaults to 0

return_data_index: bool, optional

If True, a tuple will be returned when indexing: the first element will be the data and the second the index corresponding to the actual data (excluding the buffer); when buffer is equal to zero, this just returns the original index since there is no buffer (see the sketch after the examples below)

Raises:
ValueError

If dimensions do not match according to the file size, dtype and number of channels

Notes

This is just a utility class to index binary files in a consistent way: regardless of the order of the file (‘channels’ or ‘samples’), indexing is performed in [observations, channels] format. This class is mainly used by other internal YASS classes to maintain a consistent indexing order.

Examples

# coding: utf-8

# See notebook:
# https://github.com/paninski-lab/yass-examples/blob/master/batch/reader.ipynb

"""
Reading large files with RecordingsReader
"""

import os

import numpy as np
from yass.batch import RecordingsReader


# generate data
output_folder = os.path.join(os.path.expanduser('~'), 'data/yass')
wide_data = np.random.rand(50, 100000)
long_data = wide_data.T

path_to_wide = os.path.join(output_folder, 'wide.bin')
path_to_long = os.path.join(output_folder, 'long.bin')

wide_data.tofile(path_to_wide)
long_data.tofile(path_to_long)


# load the files using the readers, they are agnostic to the data shape
# and will behave exactly the same
reader_wide = RecordingsReader(path_to_wide, dtype='float64',
                               n_channels=50, data_order='channels')

reader_long = RecordingsReader(path_to_long, dtype='float64',
                               n_channels=50, data_order='samples')


reader_wide.shape, reader_long.shape


# first index is for observations and second index for channels
obs = reader_wide[10000:20000, 20:30]
obs, obs.shape


# the same applies even if your data is stored in 'samples' order ('long'
# shape): first index for observations, second for channels, the output is
# converted to 'long' by default but you can change it
obs = reader_long[10000:20000, 20:30]
obs, obs.shape
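

The following hedged sketch (reusing path_to_long from above) illustrates buffer_size and return_data_index; the exact buffer behavior is an assumption based on the parameter descriptions:

# a sketch: with return_data_index=True each indexing operation returns
# (data_with_buffer, idx), where data_with_buffer[idx] excludes the buffer
reader_buffered = RecordingsReader(path_to_long, dtype='float64',
                                   n_channels=50, data_order='samples',
                                   buffer_size=10, return_data_index=True)

obs, idx = reader_buffered[10000:20000, 20:30]
obs[idx].shape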
channels

Number of channels

data

Underlying numpy data

data_order

Data order

dtype

Numpy’s dtype

observations

Number of observations

shape

Data shape in (observations, channels) format

Binary Readers

class yass.batch.BinaryReader(path_to_file, dtype, shape, order='F')[source]

Reading batches from large array binary files on disk, similar to numpy.memmap. It is essentially just a wrapper around the Python file API to read through a large binary file using the array[:,:] syntax.

Parameters:
order: str

Array order ‘C’ for ‘Row-major order’ or ‘F’ for ‘Column-major order’

Notes

https://en.wikipedia.org/wiki/Row-_and_column-major_order
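
Examples

A minimal hedged sketch (file name and shape are placeholders), assuming the binary file was written with numpy.ndarray.tofile from an array in [observations, channels] layout, i.e. row-major, so order='C':

import numpy as np

from yass.batch import BinaryReader

# a sketch: write an (observations, channels) array and index it lazily
data = np.random.rand(100000, 50)
data.tofile('long.bin')

reader = BinaryReader('long.bin', dtype='float64',
                      shape=(100000, 50), order='C')

# first index is observations, second is channels
chunk = reader[10000:20000, 0:10]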

class yass.batch.MemoryMap(*args, **kwargs)[source]

Wrapper for numpy.memmap that creates a new memmap on each __getitem__ call to save memory
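
Examples

A hedged sketch, assuming MemoryMap forwards its arguments to numpy.memmap:

from yass.batch import MemoryMap

# a sketch: same arguments as numpy.memmap; a fresh memmap is created on
# every __getitem__ call, so memory from previous reads is released
mm = MemoryMap('long.bin', dtype='float64', mode='r', shape=(100000, 50))

chunk = mm[0:1000, :]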