Batch Processing
Batch Processor
class yass.batch.BatchProcessor(path_to_recordings, dtype=None, n_channels=None, data_order=None, max_memory='1GB', buffer_size=0, loader='memmap', show_progress_bar=True)
Batch processing for large numpy matrices
Parameters: - path_to_recordings: str
Path to recordings file
- dtype: str
Numpy dtype
- n_channels: int
Number of channels
- data_order: str
Recordings order, one of (‘channels’, ‘samples’). In a dataset with k observations per channel and j channels: ‘channels’ means the first k contiguous values are all the observations from channel 0, followed by those from channel 1, and so on. ‘samples’ means the first j contiguous values are the first observation from every channel, followed by the second observation from every channel, and so on
- max_memory: int or str
Max memory to use in each batch. An int is interpreted as a number of bytes; a string must have the form {N}KB, {N}MB or {N}GB
- buffer_size: int, optional
Buffer size, defaults to 0. Only relevant when performing multi-channel operations
- loader: str (‘memmap’, ‘array’ or ‘python’), optional
How to load the data. ‘memmap’ loads the data using a wrapper around np.memmap (see MemoryMap for details), ‘array’ uses numpy.fromfile and ‘python’ uses a wrapper around the Python file API. Defaults to ‘memmap’. Beware that the Python loader has limited indexing capabilities, see BinaryReader for details
- show_progress_bar: bool, optional
Show progress bar when running operations, defaults to True
Raises: - ValueError
If dimensions do not match according to the file size, dtype and number of channels
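The two values of data_order can be illustrated with plain numpy. This is a minimal sketch, not code from YASS: the helper `to_long` and the toy array are hypothetical and only show how a flat buffer maps back to the [observations, channels] layout.

```python
import numpy as np

# Toy recording: j = 3 channels, k = 4 observations per channel.
# The value 10 * channel + observation shows where each number comes from.
n_channels, n_observations = 3, 4
recording = np.array([[10 * ch + obs for ch in range(n_channels)]
                      for obs in range(n_observations)], dtype='int16')
# recording has shape (observations, channels)

# 'channels' order: every observation from channel 0, then channel 1, ...
flat_channels = recording.T.ravel()
# 'samples' order: observation 0 from every channel, then observation 1, ...
flat_samples = recording.ravel()


def to_long(flat, n_channels, data_order):
    """Rebuild an (observations, channels) array (hypothetical helper)."""
    if flat.size % n_channels:
        raise ValueError('size does not match the number of channels')
    if data_order == 'channels':
        return flat.reshape(n_channels, -1).T
    if data_order == 'samples':
        return flat.reshape(-1, n_channels)
    raise ValueError("data_order must be 'channels' or 'samples'")


print(flat_channels[:4])  # first k values: all observations of channel 0
print(flat_samples[:3])   # first j values: observation 0 of every channel
```

Both layouts hold the same numbers; only the order on disk changes, which is why the order has to be known before the file can be indexed correctly.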
multi_channel(from_time=None, to_time=None, channels='all', return_data=True)
Generate indexes where each index has observations from more than one channel
Returns: - generator:
A generator that yields tuples of size three: the first element is the subset of the data for the ith batch, the second element is the slice object with the limits of the data in [observations, channels] format (excluding the buffer), and the last element is the absolute index of the data, again in [observations, channels] format
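The relation between the three elements of each tuple can be sketched with a small stand-in generator. This is an illustration under assumptions, not the YASS implementation: `buffered_batches` is a hypothetical helper, and it clips the buffer at the edges of the recording, whereas the library may handle edges differently (for example by padding).

```python
import numpy as np


def buffered_batches(data, batch_size, buffer_size):
    """Yield (batch, idx_local, idx) from an (observations, channels) array.

    Hypothetical helper used only to illustrate the three-element tuple.
    """
    n_obs = data.shape[0]
    for start in range(0, n_obs, batch_size):
        end = min(start + batch_size, n_obs)
        # widen the window by the buffer, clipped to the array limits
        lo = max(start - buffer_size, 0)
        hi = min(end + buffer_size, n_obs)
        batch = data[lo:hi]
        # position of the un-buffered data inside `batch`
        idx_local = (slice(start - lo, end - lo), slice(None))
        # position of the un-buffered data inside the whole recording
        idx = (slice(start, end), slice(None))
        yield batch, idx_local, idx


data = np.arange(20).reshape(10, 2)
batches = list(buffered_batches(data, batch_size=4, buffer_size=1))

for batch, idx_local, idx in batches:
    # removing the buffer from the batch gives back the original slice
    assert np.array_equal(batch[idx_local], data[idx])
```

The local index is what a caller would use to discard the buffer after a computation; the absolute index says where the result belongs in the full recording.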
Examples
# coding: utf-8
# See notebook:
# https://github.com/paninski-lab/yass-examples/blob/master/batch/multi_channel.ipynb
"""
Splitting large files into batches where every batch has n
observations from m channels using BatchProcessor.multi_channel
"""
import os

from yass.batch import BatchProcessor

path_to_neuropixel_data = (os.path.expanduser('~/data/ucl-neuropixel'
                           '/rawDataSample.bin'))

bp = BatchProcessor(path_to_neuropixel_data,
                    dtype='int16', n_channels=385, data_order='channels',
                    max_memory='300MB')

# now, let's do some multi_channel operations, here we will
# traverse all channels and all observations, each batch will
# contain a subset in the temporal dimension, the window size
# is determined by max_memory
data = bp.multi_channel()

for d, _, idx in data:
    print('Shape: {}. Index: {}'.format(d.shape, idx))

# we can specify the temporal limits and subset channels
data = bp.multi_channel(from_time=100000, to_time=200000, channels=[0, 1, 2])

for d, _, idx in data:
    print('Shape: {}. Index: {}'.format(d.shape, idx))

# we can also create a BatchProcessor with a buffer
bp2 = BatchProcessor(path_to_neuropixel_data,
                     dtype='int16', n_channels=385, data_order='channels',
                     max_memory='100KB', buffer_size=10)

data = bp2.multi_channel(from_time=0, to_time=100000, channels=[0, 1, 2])

for d, idx_local, idx in data:
    # d is the batch data (with buffer), d[idx_local] returns the data
    # excluding buffer and idx is the absolute location for the
    # current batch in the recordings
    print('Shape: {}. Local: {}. Absolute: {}\n'
          .format(d.shape, idx_local, idx))
multi_channel_apply(function, mode, cleanup_function=None, output_path=None, from_time=None, to_time=None, channels='all', if_file_exists='overwrite', cast_dtype=None, pass_batch_info=False, pass_batch_results=False, processes=1, **kwargs)
Apply a function where each batch has observations from more than one channel
Parameters: - function: callable
Function to be applied, first parameter passed will be a 2D numpy array in ‘long’ shape (number of observations, number of channels). If pass_batch_info is True, another two keyword parameters will be passed to function: ‘idx_local’ is the slice object with the limits of the data in [observations, channels] format (excluding the buffer), ‘idx’ is the absolute index of the data again in [observations, channels] format
- mode: str
‘disk’ or ‘memory’. If ‘disk’, a binary file is created at the beginning of the operation and each partial result is saved (using the numpy.ndarray.tofile function); at the end of the operation two files are generated: the binary file and a yaml file with some file parameters (useful if you want to later use RecordingsReader to read the file). If ‘memory’, partial results are kept in memory and returned as a list
- cleanup_function: callable, optional
A function to be executed after function and before adding the partial result to the list of results (in memory mode) or to the binary file (in disk mode). cleanup_function will be called with the following parameters (in that order): result from applying function to the batch, slice object with the idx where the data is located (excludes buffer), slice object with the absolute location of the data, and buffer size
- output_path: str, optional
Where to save the output, required if ‘disk’ mode
- force_complete_channel_batch: bool, optional
If True, every index generated will correspond to all the observations in a single channel, hence n_batches = n_selected_channels, defaults to True. If True from_time and to_time must be None
- from_time: int, optional
Starting time, defaults to None
- to_time: int, optional
Ending time, defaults to None
- channels: int, tuple or str, optional
A tuple with the channel indexes or ‘all’ to traverse all channels, defaults to ‘all’
- if_file_exists: str, optional
One of ‘overwrite’, ‘abort’, ‘skip’. If ‘overwrite’, it replaces the file if it exists; if ‘abort’, it raises a ValueError exception if the file exists; if ‘skip’, it skips the operation if the file exists. Only valid when mode = ‘disk’
- cast_dtype: str, optional
Output dtype, defaults to None which means no cast is done
- pass_batch_info: bool, optional
Whether to call function with the batch info (‘idx_local’ and ‘idx’, see the description of the function parameter) or only with the batch data
- pass_batch_results: bool, optional
Whether to pass results from the previous batch to the next one, defaults to False. Only relevant when mode=’memory’. If True, function will be called with the keyword parameter ‘previous_batch’ which contains the computation for the last batch, it is set to None in the first batch
- **kwargs
kwargs to pass to function
Returns: - output_path, params (when mode is ‘disk’)
Path to output binary file, Binary file params
- list (when mode is ‘memory’ and pass_batch_results is False)
List where every element is the result of applying the function to one batch. When pass_batch_results is True, it returns the output of the function for the last batch
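The effect of pass_batch_results can be sketched without the library. In this minimal example the explicit loop is a stand-in for what the processor is described as doing (an assumption, not the actual implementation); only the keyword name ‘previous_batch’ comes from the parameter description above, and `running_max` is a hypothetical function.

```python
import numpy as np


def running_max(batch, previous_batch=None):
    """Per-channel maximum seen so far (hypothetical example function)."""
    current = np.max(batch, axis=0)
    if previous_batch is None:  # first batch: nothing to combine with
        return current
    return np.maximum(previous_batch, current)


# Stand-in for the batch loop: each call receives the previous result
data = np.array([[1, 9], [4, 2], [7, 3], [5, 8]])
result = None
for start in range(0, data.shape[0], 2):
    result = running_max(data[start:start + 2], previous_batch=result)

print(result)  # output of the function for the last batch
```

Because each call folds the previous result into the new one, only the value for the last batch needs to be returned, which matches the Returns description for pass_batch_results=True.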
Notes
Applying functions incurs a memory overhead that depends on the function implementation. This is important to consider if the transformation changes the data’s dtype (e.g. converts int16 to float64), since a chunk of 1MB in int16 will have a size of 4MB in float64. Take that into account when setting max_memory
For performance reasons, outputs data in ‘samples’ order.
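The size change mentioned in the note can be checked directly with numpy. This short sketch uses an arbitrary array shape chosen so that the int16 batch takes exactly one million bytes.

```python
import numpy as np

# 1000 x 500 values, 2 bytes each in int16
batch = np.zeros((1000, 500), dtype='int16')
# the same values need 8 bytes each in float64
as_float = batch.astype('float64')

print(batch.nbytes, as_float.nbytes)
ratio = as_float.nbytes / batch.nbytes
```

While the conversion runs, both arrays exist at the same time, so the peak memory use is the sum of the two sizes rather than only the larger one.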
Examples
# coding: utf-8
# See notebook:
# https://github.com/paninski-lab/yass-examples/blob/master/batch/multi_channel_apply_disk.ipynb
"""
Applying transformations to large files in batches:

BatchProcessor.multi_channel_apply lets you apply transformations to
batches of data where every batch has observations from every channel.

This example shows how to process a large file in batches and save the
results to disk.
"""
import logging
import os

import matplotlib.pyplot as plt

from yass.batch import BatchProcessor
from yass.batch import RecordingsReader

# configure logging to get information about the process
logging.basicConfig(level=logging.INFO)

# raw data file
path_to_neuropixel_data = (os.path.expanduser('~/data/ucl-neuropixel'
                           '/rawDataSample.bin'))

# output file
path_to_modified_data = (os.path.expanduser('~/data/ucl-neuropixel'
                         '/tmp/modified.bin'))


# our example function just adds one to every observation
def sum_one(batch):
    """Add one to every element in the batch
    """
    return batch + 1


# create batch processor for the data
bp = BatchProcessor(path_to_neuropixel_data,
                    dtype='int16', n_channels=385, data_order='channels',
                    max_memory='500MB')

# apply a multi channel transformation, each batch will be a temporal
# subset with observations from all selected channels, the size
# of the subset is calculated depending on max_memory. Each batch is
# processed and when done, results are saved to disk, the next batch is
# then loaded and so on
bp.multi_channel_apply(sum_one,
                       mode='disk',
                       output_path=path_to_modified_data,
                       channels=[0, 1, 2])

# let's visualize the results
raw = RecordingsReader(path_to_neuropixel_data, dtype='int16',
                       n_channels=385, data_order='channels')

# you do not need to specify the format since multi_channel_apply
# saves a yaml file with such parameters
filtered = RecordingsReader(path_to_modified_data)

fig, (ax1, ax2) = plt.subplots(2, 1)
ax1.plot(raw[:2000, 0])
ax2.plot(filtered[:2000, 0])
plt.show()
# coding: utf-8
# See notebook:
# https://github.com/paninski-lab/yass-examples/blob/master/batch/multi_channel_apply_memory.ipynb
"""
Applying transformations to large files in batches:

BatchProcessor.multi_channel_apply lets you apply transformations to
batches of data where every batch has observations from every channel.

This example shows how to extract information from a large file by
processing it in batches.
"""
import logging
import os

import numpy as np

from yass.batch import BatchProcessor

# configure logging to get information about the process
logging.basicConfig(level=logging.INFO)

# raw data file
path_to_neuropixel_data = (os.path.expanduser('~/data/ucl-neuropixel'
                           '/rawDataSample.bin'))


# on each batch, we find the maximum value in every channel
def max_in_channel(batch):
    """Return the maximum value in every channel of the batch
    """
    return np.max(batch, axis=0)


# create batch processor for the data
bp = BatchProcessor(path_to_neuropixel_data,
                    dtype='int16', n_channels=385, data_order='channels',
                    max_memory='10MB')

# apply a multi channel transformation, each batch will be a temporal
# subset with observations from all selected channels, the size
# of the subset is calculated depending on max_memory. Results
# from every batch are returned in a list
res = bp.multi_channel_apply(max_in_channel,
                             mode='memory',
                             channels=[0, 1, 2])

# we have one element per batch
len(res)

# output for the first batch
res[0]

# stack results from every batch
arr = np.stack(res, axis=0)

# let's find the maximum value along every channel in all the dataset
np.max(arr, axis=0)
single_channel(force_complete_channel_batch=True, from_time=None, to_time=None, channels='all')
Generate batches where each index has observations from a single channel
Returns: A generator that yields batches. If force_complete_channel_batch is False, each generated value is a tuple with the batch and the index of the corresponding channel
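The two iteration modes can be sketched with an in-memory array. This is an illustration under assumptions rather than the library's code: `single_channel_batches` and its `max_bytes` argument are hypothetical, and the choice of ValueError for a channel that does not fit in memory is also an assumption.

```python
import numpy as np


def single_channel_batches(data, channels, max_bytes, force_complete=True):
    """Yield single-channel batches from an (observations, channels) array.

    Hypothetical helper: yields whole channels when force_complete is True,
    otherwise (chunk, channel) tuples no larger than max_bytes.
    """
    n_obs = data.shape[0]
    per_batch = max_bytes // data.dtype.itemsize  # observations that fit
    if force_complete:
        if per_batch < n_obs:
            raise ValueError('a complete channel does not fit in max_bytes')
        for ch in channels:
            yield data[:, ch]
    else:
        if per_batch == 0:
            raise ValueError('max_bytes is smaller than one observation')
        for ch in channels:
            for start in range(0, n_obs, per_batch):
                yield data[start:start + per_batch, ch], ch


data = np.arange(24, dtype='int16').reshape(8, 3)

# 8 bytes hold 4 int16 observations, so each channel is split in two parts
parts = list(single_channel_batches(data, [0, 2], max_bytes=8,
                                    force_complete=False))
# 16 bytes hold a complete channel
whole = list(single_channel_batches(data, [0, 2], max_bytes=16))
```

With complete channels the number of batches equals the number of selected channels; with splitting it grows as the memory limit shrinks.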
Examples
# coding: utf-8
# See notebook:
# https://github.com/paninski-lab/yass-examples/blob/master/batch/single_channel.ipynb
"""
Splitting a large file into batches where every batch contains n
observations from 1 channel
"""
import os

from yass.batch import BatchProcessor

path_to_neuropixel_data = (os.path.expanduser('~/data/ucl-neuropixel'
                           '/rawDataSample.bin'))

bp = BatchProcessor(path_to_neuropixel_data,
                    dtype='int16', n_channels=385, data_order='channels',
                    max_memory='1MB')

# there are two ways of traversing the data: single_channel and multi_channel
# single_channel means that the data in a single batch comes from only one
# channel, multi_channel means that a batch can contain data from multiple
# channels, let's take a look at single_channel operations

# traverse the whole dataset, one channel at a time
data = bp.single_channel()

# the next for loop will raise an error since we cannot fit
# all observations for a single channel in memory, so we
# either increase max_memory or set
# force_complete_channel_batch to False

# for d in data:
#     print(d.shape)

# When force_complete_channel_batch is False, each batch does not necessarily
# correspond to all observations in the channel, the channel can be split
# into several batches (although every batch is guaranteed to come from
# a single channel), in this case, every channel is split in two parts
data = bp.single_channel(force_complete_channel_batch=False,
                         channels=[0, 1, 2])

for d, ch in data:
    print(d.shape, 'Data from channel {}'.format(ch))

# finally, we can traverse a single channel in a temporal subset
data = bp.single_channel(from_time=100000, to_time=200000,
                         channels=[0, 1, 2])

for d in data:
    print(d.shape)
single_channel_apply(function, mode, output_path=None, force_complete_channel_batch=True, from_time=None, to_time=None, channels='all', if_file_exists='overwrite', cast_dtype=None, **kwargs)
Apply a transformation where each batch has observations from a single channel
Parameters: - function: callable
Function to be applied, must accept a 1D numpy array as its first parameter
- mode: str
‘disk’ or ‘memory’. If ‘disk’, a binary file is created at the beginning of the operation and each partial result is saved (using the numpy.ndarray.tofile function); at the end of the operation two files are generated: the binary file and a yaml file with some file parameters (useful if you want to later use RecordingsReader to read the file). If ‘memory’, partial results are kept in memory and returned as a list
- output_path: str, optional
Where to save the output, required if ‘disk’ mode
- force_complete_channel_batch: bool, optional
If True, every index generated will correspond to all the observations in a single channel, hence n_batches = n_selected_channels, defaults to True. If True from_time and to_time must be None
- from_time: int, optional
Starting time, defaults to None
- to_time: int, optional
Ending time, defaults to None
- channels: int, tuple or str, optional
A tuple with the channel indexes or ‘all’ to traverse all channels, defaults to ‘all’
- if_file_exists: str, optional
One of ‘overwrite’, ‘abort’, ‘skip’. If ‘overwrite’, it replaces the file if it exists; if ‘abort’, it raises a ValueError exception if the file exists; if ‘skip’, it skips the operation if the file exists. Only valid when mode = ‘disk’
- cast_dtype: str, optional
Output dtype, defaults to None which means no cast is done
- **kwargs
kwargs to pass to function
Notes
Applying functions in ‘disk’ mode incurs a memory overhead that depends on the function implementation. This is important to consider if the transformation changes the data’s dtype (e.g. converts int16 to float64), since a chunk of 1MB in int16 will have a size of 4MB in float64. Take that into account when setting max_memory.
For performance reasons in ‘disk’ mode, output data is in ‘channels’ order
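A file written one channel at a time ends up in ‘channels’ order, which is cheap to produce because each result is simply appended. The sketch below mimics that with numpy only; the file name, sizes and values are made up for illustration and no YASS code is involved.

```python
import os
import tempfile

import numpy as np

n_channels, n_obs = 3, 5
# pretend these are the per-channel results of a transformation
per_channel = [np.full(n_obs, 2.0 * ch) for ch in range(n_channels)]

with tempfile.TemporaryDirectory() as folder:
    path = os.path.join(folder, 'out.bin')

    # append one channel after another: this is 'channels' order
    with open(path, 'wb') as f:
        for result in per_channel:
            result.tofile(f)

    # reading it back requires knowing dtype, channel count and order
    flat = np.fromfile(path, dtype='float64')
    restored = flat.reshape(n_channels, n_obs).T  # (observations, channels)
```

The raw binary file carries no metadata, which is why the yaml file with the file parameters is saved next to it.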
Examples
# coding: utf-8
# See notebook:
# https://github.com/paninski-lab/yass-examples/blob/master/batch/single_channel_apply.ipynb
"""
Apply functions to large files using BatchProcessor.single_channel_apply
"""
import logging
import os

import matplotlib.pyplot as plt

from yass.batch import BatchProcessor
from yass.batch import RecordingsReader
from yass.preprocess.filter import butterworth

logging.basicConfig(level=logging.INFO)

path_to_neuropixel_data = (os.path.expanduser('~/data/ucl-neuropixel'
                           '/rawDataSample.bin'))
path_to_filtered_data = (os.path.expanduser('~/data/ucl-neuropixel'
                         '/tmp/filtered.bin'))

# create batch processor for the data
bp = BatchProcessor(path_to_neuropixel_data,
                    dtype='int16', n_channels=385, data_order='channels',
                    max_memory='500MB')

# apply a single channel transformation, each batch will be all observations
# from one channel, results are saved to disk
bp.single_channel_apply(butterworth,
                        mode='disk',
                        output_path=path_to_filtered_data,
                        low_freq=300, high_factor=0.1,
                        order=3, sampling_freq=30000,
                        channels=[0, 1, 2])

# let's visualize the results
raw = RecordingsReader(path_to_neuropixel_data, dtype='int16',
                       n_channels=385, data_order='channels')

# you do not need to specify the format since single_channel_apply
# saves a yaml file with such parameters
filtered = RecordingsReader(path_to_filtered_data)

fig, (ax1, ax2) = plt.subplots(2, 1)
ax1.plot(raw[:2000, 0])
ax2.plot(filtered[:2000, 0])
plt.show()
Batch Pipeline
class yass.batch.BatchPipeline(path_to_input, dtype, n_channels, data_order, max_memory, output_path, from_time=None, to_time=None, channels='all')
Chain batch operations and write partial results to disk
Parameters: - path_to_input: str
Path to input file
- dtype: str
Numpy dtype
- n_channels: int
Number of channels
- data_order: str
Recordings order, one of (‘channels’, ‘samples’). In a dataset with k observations per channel and j channels: ‘channels’ means the first k contiguous values are all the observations from channel 0, followed by those from channel 1, and so on. ‘samples’ means the first j contiguous values are the first observation from every channel, followed by the second observation from every channel, and so on
- max_memory: int or str
Max memory to use in each batch. An int is interpreted as a number of bytes; a string must have the form {N}KB, {N}MB or {N}GB
- output_path: str
Folder indicating where to store the files from every step
- from_time: int, optional
Starting time, defaults to None, which means start from time 0
- to_time: int, optional
Ending time, defaults to None, which means end at the last observation
- channels: int, tuple or str, optional
A tuple with the channel indexes or ‘all’ to traverse all channels, defaults to ‘all’
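How a max_memory value could translate into a batch size is sketched below. Both helpers are hypothetical and not part of YASS; in particular, the use of decimal units (1KB = 1000 bytes) is an assumption, since the documentation does not say whether the library uses 1000 or 1024.

```python
import re

# Assumption: decimal units. The library may use powers of 1024 instead.
_UNITS = {'KB': 10 ** 3, 'MB': 10 ** 6, 'GB': 10 ** 9}


def memory_to_bytes(max_memory):
    """Convert an int or a '{N}KB' / '{N}MB' / '{N}GB' string to bytes."""
    if isinstance(max_memory, int):
        return max_memory
    match = re.fullmatch(r'(\d+)(KB|MB|GB)', max_memory.strip().upper())
    if match is None:
        raise ValueError('expected an int or {N}KB, {N}MB, {N}GB')
    return int(match.group(1)) * _UNITS[match.group(2)]


def observations_per_batch(max_memory, itemsize, n_channels):
    """Observations from all channels that fit in one batch."""
    return memory_to_bytes(max_memory) // (itemsize * n_channels)


# int16 (2 bytes) recording with 385 channels and a 500MB limit
n_obs = observations_per_batch('500MB', itemsize=2, n_channels=385)
print(n_obs)
```

This only accounts for the input batch; the output of a transformation and any temporary arrays need memory too.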
Examples
import logging
import os

import matplotlib.pyplot as plt

from yass.batch.pipeline import BatchPipeline, PipedTransformation
from yass.batch import RecordingsReader
from yass.preprocess.filter import butterworth, standarize

logging.basicConfig(level=logging.DEBUG)

path_to_neuropixel_data = (os.path.expanduser('~/data/ucl-neuropixel'
                           '/rawDataSample.bin'))
path_output = os.path.expanduser('~/data/ucl-neuropixel/tmp')

pipeline = BatchPipeline(path_to_neuropixel_data, dtype='int16',
                         n_channels=385, data_order='channels',
                         max_memory='500MB',
                         from_time=None, to_time=None, channels='all',
                         output_path=path_output)

butterworth_op = PipedTransformation(butterworth, 'filtered.bin',
                                     mode='single_channel_one_batch',
                                     keep=True, low_freq=300,
                                     high_factor=0.1, order=3,
                                     sampling_freq=30000)

standarize_op = PipedTransformation(standarize, 'standarized.bin',
                                    mode='single_channel_one_batch',
                                    keep=True, sampling_freq=30000)

pipeline.add([butterworth_op, standarize_op])

pipeline.run()

raw = RecordingsReader(path_to_neuropixel_data, dtype='int16',
                       n_channels=385, data_order='channels')
filtered = RecordingsReader(os.path.join(path_output, 'filtered.bin'))
standarized = RecordingsReader(os.path.join(path_output, 'standarized.bin'))

# plot results
fig, (ax1, ax2, ax3) = plt.subplots(3, 1)
ax1.plot(raw[:2000, 0])
ax1.set_title('Raw data')
ax2.plot(filtered[:2000, 0])
ax2.set_title('Filtered data')
ax3.plot(standarized[:2000, 0])
ax3.set_title('Standarized data')
plt.tight_layout()
plt.show()
class yass.batch.PipedTransformation(function, output_name, mode, keep=False, if_file_exists='overwrite', cast_dtype=None, **kwargs)
Wrapper for functions run with BatchPipeline
Parameters: - function: function
Function to apply
- output_name: str
Name of the file for the output
- mode: str
Operation mode, one of ‘single_channel_one_batch’ (every batch contains all observations from a single channel), ‘single_channel’ (every batch contains observations from a single channel, but a channel can be split into several batches to avoid exceeding max_memory) or ‘multi_channel’ (every batch has observations from every selected channel, and batches are split so as not to exceed max_memory)
- keep: bool, optional
Whether to keep the results from this step, otherwise the file is deleted after the next transformation is done
- if_file_exists: str, optional
One of ‘overwrite’, ‘abort’, ‘skip’. If ‘overwrite’, it replaces the file if it exists; if ‘abort’, it raises a ValueError exception if the file exists; if ‘skip’, it skips the operation if the file exists. Only valid when mode = ‘disk’
- cast_dtype: str, optional
Output dtype, defaults to None which means no cast is done
- **kwargs
Function kwargs
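The role of keep when steps are chained can be sketched with a toy runner. Everything here is hypothetical: `run_steps` is not the BatchPipeline implementation, it loads whole files instead of batches, and it always leaves the last output on disk, a detail the parameter description does not specify.

```python
import os
import tempfile

import numpy as np


def run_steps(input_path, steps, output_folder, dtype):
    """Run (function, output_name, keep) steps, each reading the previous file.

    Hypothetical sketch: no batching, only the file bookkeeping.
    """
    current, current_keep = input_path, True  # never delete the raw input
    for function, output_name, keep in steps:
        data = np.fromfile(current, dtype=dtype)
        out = os.path.join(output_folder, output_name)
        function(data).astype(dtype).tofile(out)
        if not current_keep:
            os.remove(current)  # previous result is no longer needed
        current, current_keep = out, keep
    return current


folder = tempfile.mkdtemp()
raw_path = os.path.join(folder, 'raw.bin')
np.arange(4, dtype='float64').tofile(raw_path)

steps = [(lambda x: x + 1, 'plus_one.bin', False),  # intermediate, dropped
         (lambda x: x * 2, 'doubled.bin', True)]    # final, kept
final_path = run_steps(raw_path, steps, folder, dtype='float64')
final = np.fromfile(final_path, dtype='float64')
```

Dropping intermediate files keeps disk usage bounded when the pipeline has many steps and the recordings are large.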
Recordings Reader
class yass.batch.RecordingsReader(path_to_recordings, dtype=None, n_channels=None, data_order=None, loader='memmap', buffer_size=0, return_data_index=False)
Neural recordings reader. If a file with the same name but yaml extension exists in the directory, it looks for dtype, channels and data_order there; otherwise you need to pass the parameters in the constructor
Parameters: - path_to_recordings: str
Path to recordings file
- dtype: str
Numpy dtype
- n_channels: int
Number of channels
- data_order: str
Recordings order, one of (‘channels’, ‘samples’). In a dataset with k observations per channel and j channels: ‘channels’ means the first k contiguous values are all the observations from channel 0, followed by those from channel 1, and so on. ‘samples’ means the first j contiguous values are the first observation from every channel, followed by the second observation from every channel, and so on
- loader: str (‘memmap’, ‘array’ or ‘python’), optional
How to load the data. ‘memmap’ loads the data using a wrapper around np.memmap (see MemoryMap for details), ‘array’ uses numpy.fromfile and ‘python’ uses a wrapper around the Python file API. Defaults to ‘memmap’. Beware that the Python loader has limited indexing capabilities, see BinaryReader for details
- buffer_size: int, optional
Adds buffer
- return_data_index: bool, optional
If True, a tuple will be returned when indexing: the first element will be the data and the second the index corresponding to the actual data (excluding buffer). When the buffer is equal to zero, this just returns the original index since there is no buffer
Raises: - ValueError
If dimensions do not match according to the file size, dtype and number of channels
Notes
This is just a utility class to index binary files in a consistent way: regardless of the order of the file (‘channels’ or ‘samples’), indexing is performed in [observations, channels] format. This class is mainly used by other internal YASS classes to maintain a consistent indexing order.
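The idea of order-independent indexing can be sketched with np.memmap. `TinyReader` below is a hypothetical stand-in written for this illustration; it is not the RecordingsReader implementation and has no buffer or yaml support.

```python
import os
import tempfile

import numpy as np


class TinyReader:
    """Index a binary file as [observations, channels] (hypothetical)."""

    def __init__(self, path, dtype, n_channels, data_order):
        flat = np.memmap(path, dtype=dtype, mode='r')  # whole file, 1D
        if flat.size % n_channels:
            raise ValueError('file size does not match dtype and n_channels')
        if data_order == 'channels':
            self._data = flat.reshape(n_channels, -1).T
        elif data_order == 'samples':
            self._data = flat.reshape(-1, n_channels)
        else:
            raise ValueError("data_order must be 'channels' or 'samples'")

    @property
    def shape(self):
        return self._data.shape

    def __getitem__(self, key):
        # copy the requested region out of the memory map
        return np.array(self._data[key])


folder = tempfile.mkdtemp()
long_data = np.arange(12, dtype='float64').reshape(4, 3)  # 4 obs, 3 channels
path_long = os.path.join(folder, 'long.bin')
path_wide = os.path.join(folder, 'wide.bin')
long_data.tofile(path_long)    # written in 'samples' order
long_data.T.tofile(path_wide)  # written in 'channels' order

reader_long = TinyReader(path_long, 'float64', 3, 'samples')
reader_wide = TinyReader(path_wide, 'float64', 3, 'channels')
```

Both readers report the same shape and return the same values for the same index, which is the property the class is meant to provide.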
Examples
# coding: utf-8
# See notebook:
# https://github.com/paninski-lab/yass-examples/blob/master/batch/reader.ipynb
"""
Reading large files with RecordingsReader
"""
import os

import numpy as np

from yass.batch import RecordingsReader

# generate data
output_folder = os.path.join(os.path.expanduser('~'), 'data/yass')
os.makedirs(output_folder, exist_ok=True)

wide_data = np.random.rand(50, 100000)
long_data = wide_data.T

path_to_wide = os.path.join(output_folder, 'wide.bin')
path_to_long = os.path.join(output_folder, 'long.bin')

wide_data.tofile(path_to_wide)
long_data.tofile(path_to_long)

# load the files using the readers, they are agnostic on the data shape
# and will behave exactly the same
reader_wide = RecordingsReader(path_to_wide, dtype='float64',
                               n_channels=50, data_order='channels')
reader_long = RecordingsReader(path_to_long, dtype='float64',
                               n_channels=50, data_order='samples')

reader_wide.shape, reader_long.shape

# first index is for observations and second index for channels
obs = reader_wide[10000:20000, 20:30]
obs, obs.shape

# same applies even if your data is in 'wide' shape, first index for
# observations, second for channels, the output is converted to 'long'
# by default but you can change it
obs = reader_long[10000:20000, 20:30]
obs, obs.shape
- channels
Number of channels
- data
Underlying numpy data
- data_order
Data order
- dtype
Numpy’s dtype
- observations
Number of observations
- shape
Data shape in (observations, channels) format
Binary Readers
class yass.batch.BinaryReader(path_to_file, dtype, shape, order='F')
Reads batches from large binary array files on disk, similar to numpy.memmap. It is essentially a wrapper around the Python file API that reads through a large binary array file using the array[:,:] syntax.
Parameters: - order: str
Array order ‘C’ for ‘Row-major order’ or ‘F’ for ‘Column-major order’
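The difference between the two orders comes down to where element [row, col] sits in the flat file contents. The sketch below uses numpy's own `order` argument to check a small offset formula; `flat_index` is a hypothetical helper, not part of BinaryReader.

```python
import numpy as np


def flat_index(row, col, shape, order):
    """Position of element [row, col] inside the flat contents."""
    n_rows, n_cols = shape
    if order == 'C':  # row-major: rows are stored one after another
        return row * n_cols + col
    if order == 'F':  # column-major: columns are stored one after another
        return col * n_rows + row
    raise ValueError("order must be 'C' or 'F'")


flat = np.arange(6)
shape = (2, 3)
as_c = flat.reshape(shape, order='C')
as_f = flat.reshape(shape, order='F')

# the formula agrees with numpy for every element in both orders
for r in range(shape[0]):
    for c in range(shape[1]):
        assert as_c[r, c] == flat[flat_index(r, c, shape, 'C')]
        assert as_f[r, c] == flat[flat_index(r, c, shape, 'F')]
```

Multiplying the flat position by the dtype's item size gives the byte offset a file-based reader would seek to.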
Notes