import logging
import os
from functools import partial
from yass.batch.batch import BatchProcessor
class BatchPipeline(object):
"""Chain batch operations and write partial results to disk
Parameters
----------
path_to_input: str
Path to input file
dtype: str
Numpy dtype
n_channels: int
Number of channels
    data_order: str
        Recordings order, one of ('channels', 'samples'). In a dataset with
        k observations per channel and j channels: 'channels' means the
        first k contiguous observations come from channel 0, then channel 1,
        and so on. 'samples' means the first j contiguous observations are
        the first observation from every channel, then the second
        observation from every channel, and so on
    max_memory: int or str
        Max memory to use in each batch, interpreted as bytes if int; if
        str, it can be any of {N}KB, {N}MB or {N}GB
    output_path: str
        Folder where the output file from every step is stored
from_time: int, optional
Starting time, defaults to None, which means start from time 0
to_time: int, optional
Ending time, defaults to None, which means end at the last observation
    channels: int, tuple or str, optional
        A single channel index, a tuple with channel indexes, or 'all' to
        traverse all channels, defaults to 'all'
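    Notes
    -----
    A minimal sketch of the two data_order layouts, using a hypothetical
    2-channel recording with 3 observations per channel::

        import numpy as np

        data = np.array([[0, 1, 2],      # channel 0
                         [10, 11, 12]])  # channel 1

        # 'channels': all observations from channel 0, then channel 1, ...
        data.ravel()    # array([ 0,  1,  2, 10, 11, 12])

        # 'samples': first observation from every channel, then the
        # second observation from every channel, ...
        data.T.ravel()  # array([ 0, 10,  1, 11,  2, 12])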
Examples
--------
.. literalinclude:: ../../examples/batch/pipeline.py
"""
def __init__(self, path_to_input, dtype, n_channels, data_order,
max_memory, output_path, from_time=None, to_time=None,
channels='all'):
self.path_to_input = path_to_input
self.dtype = dtype
self.n_channels = n_channels
self.data_order = data_order
self.max_memory = max_memory
self.from_time = from_time
self.to_time = to_time
self.channels = channels
self.output_path = output_path
self.tasks = []
self.logger = logging.getLogger(__name__)
    def add(self, tasks):
        """Append tasks to the pipeline; they are executed in order on run()
        """
        self.tasks.extend(tasks)
    def run(self):
"""Run all tasks in the pipeline
Returns
-------
        list
            Paths to the output file of every task, in the order they were
            run. If a task's keep is False, its path is still returned but
            the file will not exist
        list
            Parameters returned by every task
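        Examples
        --------
        A minimal sketch (task1 and task2 stand for task objects exposing
        the attributes this method reads: function, output_name, mode, keep,
        if_file_exists, cast_dtype and kwargs)::

            pipeline = BatchPipeline('raw.bin', dtype='int16', n_channels=64,
                                     data_order='samples', max_memory='500MB',
                                     output_path='tmp/')
            pipeline.add([task1, task2])
            output_paths, params = pipeline.run()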
"""
path_to_input = self.path_to_input
bp = BatchProcessor(path_to_input, self.dtype,
self.n_channels,
self.data_order, self.max_memory)
        output_paths = []
        params = []
        # the original input file is never deleted, only intermediate
        # results whose task set keep to False
        keep_previous = True
while self.tasks:
task = self.tasks.pop(0)
output_path = os.path.join(self.output_path, task.output_name)
output_paths.append(output_path)
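            # map the task's mode to a BatchProcessor method; the two
            # single-channel modes differ only in whether each channel must
            # be processed as one complete batch (force_complete_channel_batch)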
if task.mode == 'single_channel_one_batch':
fn = partial(bp.single_channel_apply,
force_complete_channel_batch=True)
elif task.mode == 'single_channel':
fn = partial(bp.single_channel_apply,
force_complete_channel_batch=False)
elif task.mode == 'multi_channel':
fn = bp.multi_channel_apply
            else:
                raise ValueError("Invalid mode {}, must be one of "
                                 "'single_channel_one_batch', "
                                 "'single_channel' or 'multi_channel'"
                                 .format(task.mode))
_, p = fn(function=task.function,
output_path=output_path,
mode='disk',
from_time=self.from_time,
to_time=self.to_time,
channels=self.channels,
if_file_exists=task.if_file_exists,
cast_dtype=task.cast_dtype,
**task.kwargs)
params.append(p)
            # point the batch processor at this step's output so the next
            # task reads from it
bp = BatchProcessor(output_path, p['dtype'],
p['n_channels'], p['data_order'],
self.max_memory)
            # deferred deletion: a task's result can only be removed after
            # the next step has consumed it; the original input is never removed
            if not keep_previous:
                self.logger.debug('Removing {}'.format(path_to_input))
                os.remove(path_to_input)
            keep_previous = task.keep
            # update path to input
            path_to_input = output_path
        # remove the last output if its task asked not to keep it
        if not keep_previous:
            self.logger.debug('Removing {}'.format(path_to_input))
            os.remove(path_to_input)
        return output_paths, params