"""
Functions for parsing geometry data
"""
import logging
import numpy as np
from scipy.spatial.distance import pdist, squareform
def _parse_txt(path, n_channels):
"""Parse a geometry file in txt format
"""
f = open(path)
lines = f.readlines()
f.close()
geom = np.zeros((0, 2))
for i, line in zip(range(n_channels), lines):
line = line.replace('\r', '')
line = line.replace('\n', '')
row = line.split(' ')
geom = np.vstack((geom, row[:2])).astype('float')
return geom
[docs]def parse(path, n_channels):
"""
Parse a geometry txt (one x, y pair per line, separated
by spaces) or a npy file with shape (n_channels, 2), where every row
contains a x, y pair
path: str
Path to geometry file
n_channels: int
Number of channels
Returns
-------
numpy.ndarray
2-dimensional numpy array where each row contains the x, y coordinates
for a channel
Examples
--------
.. code-block:: python
from yass import geometry
geom = geometry.parse('path/to/geom.npy', n_channels=500)
geom = geometry.parse('path/to/geom.txt', n_channels=500)
"""
# TODO: infer the number of channels by the number of lines
extension = path.split('.')[-1]
if extension == 'txt':
geom = _parse_txt(path, n_channels)
elif extension == 'npy':
geom = np.load(path)
else:
raise ValueError('Invalid file: {} extension not supported'
.format(extension))
read_channels, _ = geom.shape
if read_channels != n_channels:
raise ValueError('Expected {} channels, but read {}'
.format(n_channels, read_channels))
return geom
[docs]def find_channel_neighbors(geom, radius):
"""Compute a neighbors matrix by using a radius
Parameters
----------
geom: np.array
Array with the cartesian coordinates for the channels
radius: float
Maximum radius for the channels to be considered neighbors
Returns
-------
numpy.ndarray (n_channels, n_channels)
Symmetric boolean matrix with the i, j as True if the ith and jth
channels are considered neighbors
"""
return (squareform(pdist(geom)) <= radius)
[docs]def n_steps_neigh_channels(neighbors_matrix, steps):
"""Compute a neighbors matrix by considering neighbors of neighbors
Parameters
----------
neighbors_matrix: numpy.ndarray
Neighbors matrix
steps: int
Number of steps to still consider channels as neighbors
Returns
-------
numpy.ndarray (n_channels, n_channels)
Symmetric boolean matrix with the i, j as True if the ith and jth
channels are considered neighbors
"""
C = neighbors_matrix.shape[0]
# each channel is its own neighbor (diagonal of trues)
output = np.eye(C, dtype='bool')
# for every step
for _ in range(steps):
# go trough every channel
for current in range(C):
# neighbors of the current channel
neighbors_current = output[current]
# get the neighbors of all the neighbors of the current channel
neighbors_of_neighbors = neighbors_matrix[neighbors_current]
# sub over rows and convert to bool, this will turn to true entries
# where at least one of the neighbors has each channel as its
# neighbor
is_neighbor_of_neighbor = np.sum(neighbors_of_neighbors,
axis=0).astype('bool')
# set the channels that are neighbors to true
output[current][is_neighbor_of_neighbor] = True
return output
# TODO: add documentation
# TODO: remove n_channels, we can infer it from neighbors or geom
[docs]def make_channel_groups(n_channels, neighbors, geom):
"""[DESCRIPTION]
Parameters
----------
n_channels: int
Number of channels
neighbors: numpy.ndarray
Neighbors matrix
geom: numpy.ndarray
geometry matrix
Returns
-------
list
List of channel groups based on [?]
"""
channel_groups = list()
c_left = np.array(range(n_channels))
neighChan_temp = np.array(neighbors)
while len(c_left) > 0:
c_tops = c_left[geom[c_left, 1] == np.max(geom[c_left, 1])]
c_topleft = c_tops[np.argmin(geom[c_tops, 0])]
c_group = np.where(
np.sum(neighChan_temp[neighChan_temp[c_topleft]], 0))[0]
neighChan_temp[c_group, :] = 0
neighChan_temp[:, c_group] = 0
for c in c_group:
c_left = np.delete(c_left, int(np.where(c_left == c)[0]))
channel_groups.append(c_group)
return channel_groups
[docs]def order_channels_by_distance(reference, channels, geom):
"""Order channels by distance using certain channel as reference
Parameters
----------
reference: int
Reference channel
channels: np.ndarray
Channels to order
geom
Geometry matrix
Returns
-------
numpy.ndarray
1D array with the channels ordered by distance using the reference
channels
numpy.ndarray
1D array with the indexes for the ordered channels
"""
coord_main = geom[reference]
coord_others = geom[channels]
idx = np.argsort(np.sum(np.square(coord_others - coord_main), axis=1))
return channels[idx], idx
[docs]def make_channel_index(neighbors, channel_geometry, steps=1):
"""
Compute an array whose whose ith row contains the ordered
(by distance) neighbors for the ith channel
"""
C, C2 = neighbors.shape
if C != C2:
raise ValueError('neighbors is not a square matrix, verify')
# get neighbors matrix
neighbors = n_steps_neigh_channels(neighbors, steps=steps)
# max number of neighbors for all channels
n_neighbors = np.max(np.sum(neighbors, 0))
# FIXME: we are using C as a dummy value which is confusing, it may
# be better to use something else, maybe np.nan
# initialize channel index, initially with a dummy C value (a channel)
# that does not exists
channel_index = np.ones((C, n_neighbors), 'int32') * C
# fill every row in the matrix (one per channel)
for current in range(C):
# indexes of current channel neighbors
neighbor_channels = np.where(neighbors[current])[0]
# sort them by distance
ch_idx, _ = order_channels_by_distance(current, neighbor_channels,
channel_geometry)
# fill entries with the sorted neighbor indexes
channel_index[current, :ch_idx.shape[0]] = ch_idx
return channel_index
# def random_channel_with_max_neighbors(channel_index):
# logger = logging.getLogger(__name__)
# channel_n_neighbors = np.sum(neighbors, 0)
# max_neighbors = np.max(channel_n_neighbors)
# channels_with_max_neighbors = np.where(channel_n_neighbors
# == max_neighbors)[0]
# logger.debug('The following channels have %i neighbors: %s',
# max_neighbors, channels_with_max_neighbors)
# # reference channel: channel with max number of neighbors
# channel_selected = np.random.choice(channels_with_max_neighbors)
# logger.debug('Selected channel %i', channel_selected)
# # neighbors for the reference channel
# channel_neighbors = np.where(neighbors[channel_selected])[0]
# # ordered neighbors for reference channel
# channel_idx, _ = order_channels_by_distance(channel_selected,
# channel_neighbors,
# geom)