"""
Copyright © 2023 Howard Hughes Medical Institute, Authored by Carsen Stringer and Marius Pachitariu.
"""
import os, warnings, time, tempfile, datetime, pathlib, shutil, subprocess, logging
from tqdm import tqdm
from urllib.request import urlopen
from urllib.parse import urlparse
import cv2
from scipy.ndimage import find_objects, gaussian_filter, generate_binary_structure, label, maximum_filter1d, binary_fill_holes
from scipy.spatial import ConvexHull
from scipy.stats import gmean
import numpy as np
import colorsys
import io
from multiprocessing import Pool, cpu_count
from . import metrics
try:
from skimage.morphology import remove_small_holes
SKIMAGE_ENABLED = True
except ImportError:
SKIMAGE_ENABLED = False
class TqdmToLogger(io.StringIO):
"""
Output stream for TQDM which will output to logger module instead of
the StdOut.
"""
logger = None
level = None
buf = ''
def __init__(self,logger,level=None):
super(TqdmToLogger, self).__init__()
self.logger = logger
self.level = level or logging.INFO
    def write(self,buf):
self.buf = buf.strip('\r\n\t ')
    def flush(self):
self.logger.log(self.level, self.buf)
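# Illustrative sketch: TqdmToLogger lets tqdm progress go through the logging
# module rather than stderr (the logger setup below is hypothetical).
#
#   logging.basicConfig(level=logging.INFO)
#   tqdm_out = TqdmToLogger(logging.getLogger(__name__), level=logging.INFO)
#   for _ in tqdm(range(10), file=tqdm_out, mininterval=1):
#       time.sleep(0.1)   # progress text is emitted via logger.log in flush()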
def rgb_to_hsv(arr):
rgb_to_hsv_channels = np.vectorize(colorsys.rgb_to_hsv)
r, g, b = np.rollaxis(arr, axis=-1)
h, s, v = rgb_to_hsv_channels(r, g, b)
hsv = np.stack((h,s,v), axis=-1)
return hsv
def hsv_to_rgb(arr):
hsv_to_rgb_channels = np.vectorize(colorsys.hsv_to_rgb)
h, s, v = np.rollaxis(arr, axis=-1)
r, g, b = hsv_to_rgb_channels(h, s, v)
rgb = np.stack((r,g,b), axis=-1)
return rgb
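# Sketch of the two converters above: colorsys is applied element-wise over the
# last axis of a float array with values in [0, 1] (the toy array is hypothetical).
#
#   img = np.random.rand(32, 32, 3)   # RGB image, values in [0, 1]
#   hsv = rgb_to_hsv(img)             # same shape, channels become H, S, V
#   rgb = hsv_to_rgb(hsv)             # round-trips back to (approximately) img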
def download_url_to_file(url, dst, progress=True):
r"""Download object at the given URL to a local path.
Thanks to torch, slightly modified
Args:
url (string): URL of the object to download
dst (string): Full path where object will be saved, e.g. `/tmp/temporary_file`
progress (bool, optional): whether or not to display a progress bar to stderr
Default: True
"""
file_size = None
import ssl
ssl._create_default_https_context = ssl._create_unverified_context
u = urlopen(url)
meta = u.info()
if hasattr(meta, 'getheaders'):
content_length = meta.getheaders("Content-Length")
else:
content_length = meta.get_all("Content-Length")
if content_length is not None and len(content_length) > 0:
file_size = int(content_length[0])
# We deliberately save it in a temp file and move it after
dst = os.path.expanduser(dst)
dst_dir = os.path.dirname(dst)
f = tempfile.NamedTemporaryFile(delete=False, dir=dst_dir)
try:
with tqdm(total=file_size, disable=not progress,
unit='B', unit_scale=True, unit_divisor=1024) as pbar:
while True:
buffer = u.read(8192)
if len(buffer) == 0:
break
f.write(buffer)
pbar.update(len(buffer))
f.close()
shutil.move(f.name, dst)
finally:
f.close()
if os.path.exists(f.name):
os.remove(f.name)
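# Minimal usage sketch (the URL and destination below are placeholders):
#
#   url = "https://example.com/pretrained_model"
#   dst = os.path.join(tempfile.gettempdir(), "pretrained_model")
#   download_url_to_file(url, dst, progress=True)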
def distance_to_boundary(masks):
""" get distance to boundary of mask pixels
Parameters
----------------
masks: int, 2D or 3D array
size [Ly x Lx] or [Lz x Ly x Lx], 0=NO masks; 1,2,...=mask labels
Returns
----------------
dist_to_bound: 2D or 3D array
size [Ly x Lx] or [Lz x Ly x Lx]
"""
if masks.ndim > 3 or masks.ndim < 2:
raise ValueError('distance_to_boundary takes 2D or 3D array, not %dD array'%masks.ndim)
dist_to_bound = np.zeros(masks.shape, np.float64)
if masks.ndim==3:
for i in range(masks.shape[0]):
dist_to_bound[i] = distance_to_boundary(masks[i])
return dist_to_bound
else:
slices = find_objects(masks)
for i,si in enumerate(slices):
if si is not None:
sr,sc = si
mask = (masks[sr, sc] == (i+1)).astype(np.uint8)
contours = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE)
pvc, pvr = np.concatenate(contours[-2], axis=0).squeeze().T
ypix, xpix = np.nonzero(mask)
min_dist = ((ypix[:,np.newaxis] - pvr)**2 +
(xpix[:,np.newaxis] - pvc)**2).min(axis=1)
dist_to_bound[ypix + sr.start, xpix + sc.start] = min_dist
return dist_to_bound
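# Illustrative example with a toy label image (hypothetical): the returned array is
# 0 on background and on each mask's contour, and grows toward the mask interior.
#
#   toy = np.zeros((16, 16), np.int32)
#   toy[4:12, 4:12] = 1                    # one square mask labeled 1
#   dist = distance_to_boundary(toy)       # 0 outside and on the contour, >0 inside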
def masks_to_edges(masks, threshold=1.0):
""" get edges of masks as a 0-1 array
Parameters
----------------
    masks: int, 2D or 3D array
        size [Ly x Lx] or [Lz x Ly x Lx], 0=NO masks; 1,2,...=mask labels
    threshold: float (optional, default 1.0)
        distance to the mask boundary below which a pixel is counted as an edge
Returns
----------------
edges: 2D or 3D array
size [Ly x Lx] or [Lz x Ly x Lx], True pixels are edge pixels
"""
dist_to_bound = distance_to_boundary(masks)
edges = (dist_to_bound < threshold) * (masks > 0)
return edges
def remove_edge_masks(masks, change_index=True):
""" remove masks with pixels on edge of image
Parameters
----------------
masks: int, 2D or 3D array
size [Ly x Lx] or [Lz x Ly x Lx], 0=NO masks; 1,2,...=mask labels
change_index: bool (optional, default True)
if True, after removing masks change indexing so no missing label numbers
Returns
----------------
    masks: 2D or 3D array
        size [Ly x Lx] or [Lz x Ly x Lx], 0=NO masks; 1,2,...=mask labels
"""
slices = find_objects(masks.astype(int))
for i,si in enumerate(slices):
remove = False
if si is not None:
for d,sid in enumerate(si):
if sid.start==0 or sid.stop==masks.shape[d]:
remove=True
break
if remove:
masks[si][masks[si]==i+1] = 0
shape = masks.shape
if change_index:
_,masks = np.unique(masks, return_inverse=True)
masks = np.reshape(masks, shape).astype(np.int32)
return masks
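# Brief sketch: any mask touching an image border is zeroed; with change_index=True
# the surviving masks are renumbered 1..N with no gaps (toy labels are hypothetical).
#
#   toy = np.zeros((16, 16), np.int32)
#   toy[0:5, 0:5] = 1                      # touches the border -> removed
#   toy[8:12, 8:12] = 2                    # interior -> kept, relabeled to 1
#   cleaned = remove_edge_masks(toy.copy())   # copy() because masks are edited in place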
def masks_to_outlines(masks):
""" get outlines of masks as a 0-1 array
Parameters
----------------
masks: int, 2D or 3D array
size [Ly x Lx] or [Lz x Ly x Lx], 0=NO masks; 1,2,...=mask labels
Returns
----------------
outlines: 2D or 3D array
size [Ly x Lx] or [Lz x Ly x Lx], True pixels are outlines
"""
if masks.ndim > 3 or masks.ndim < 2:
raise ValueError('masks_to_outlines takes 2D or 3D array, not %dD array'%masks.ndim)
outlines = np.zeros(masks.shape, bool)
if masks.ndim==3:
for i in range(masks.shape[0]):
outlines[i] = masks_to_outlines(masks[i])
return outlines
else:
slices = find_objects(masks.astype(int))
for i,si in enumerate(slices):
if si is not None:
sr,sc = si
mask = (masks[sr, sc] == (i+1)).astype(np.uint8)
contours = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE)
pvc, pvr = np.concatenate(contours[-2], axis=0).squeeze().T
vr, vc = pvr + sr.start, pvc + sc.start
outlines[vr, vc] = 1
return outlines
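# Typical plotting use (a 2D grayscale image `img` and a label array `masks` are
# assumed here; any viewer can then display the overlay):
#
#   outlines = masks_to_outlines(masks)    # boolean array, True on each mask contour
#   overlay = img.copy()
#   overlay[outlines] = overlay.max()      # burn the outlines into a copy of the image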
def outlines_list(masks, multiprocessing=True):
""" get outlines of masks as a list to loop over for plotting
This function is a wrapper for outlines_list_single and outlines_list_multi """
if multiprocessing:
return outlines_list_multi(masks)
else:
return outlines_list_single(masks)
def outlines_list_single(masks):
""" get outlines of masks as a list to loop over for plotting """
outpix=[]
for n in np.unique(masks)[1:]:
mn = masks==n
if mn.sum() > 0:
contours = cv2.findContours(mn.astype(np.uint8), mode=cv2.RETR_EXTERNAL, method=cv2.CHAIN_APPROX_NONE)
contours = contours[-2]
cmax = np.argmax([c.shape[0] for c in contours])
pix = contours[cmax].astype(int).squeeze()
if len(pix)>4:
outpix.append(pix)
else:
outpix.append(np.zeros((0,2)))
return outpix
def outlines_list_multi(masks, num_processes=None):
""" get outlines of masks as a list to loop over for plotting """
if num_processes is None:
num_processes = cpu_count()
unique_masks = np.unique(masks)[1:]
with Pool(processes=num_processes) as pool:
outpix = pool.map(get_outline_multi, [(masks, n) for n in unique_masks])
return outpix
def get_outline_multi(args):
masks, n = args
mn = masks == n
if mn.sum() > 0:
contours = cv2.findContours(mn.astype(np.uint8), mode=cv2.RETR_EXTERNAL, method=cv2.CHAIN_APPROX_NONE)
contours = contours[-2]
cmax = np.argmax([c.shape[0] for c in contours])
pix = contours[cmax].astype(int).squeeze()
return pix if len(pix) > 4 else np.zeros((0, 2))
return np.zeros((0, 2))
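# Note on the multiprocessing path above: outlines_list_multi uses a Pool, so on
# platforms that spawn worker processes the call should sit under a main guard.
# A hedged sketch (the label array `masks` is assumed):
#
#   if __name__ == "__main__":
#       outlines = outlines_list(masks, multiprocessing=True)    # list of Nx2 contours
#       # single-process alternative: outlines_list(masks, multiprocessing=False)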
def get_perimeter(points):
""" perimeter of points - npoints x ndim """
if points.shape[0]>4:
points = np.append(points, points[:1], axis=0)
return ((np.diff(points, axis=0)**2).sum(axis=1)**0.5).sum()
else:
return 0
def get_mask_compactness(masks):
perimeters = get_mask_perimeters(masks)
#outlines = masks_to_outlines(masks)
#perimeters = np.unique(outlines*masks, return_counts=True)[1][1:]
npoints = np.unique(masks, return_counts=True)[1][1:]
areas = npoints
compactness = 4 * np.pi * areas / perimeters**2
compactness[perimeters==0] = 0
compactness[compactness>1.0] = 1.0
return compactness
def get_mask_perimeters(masks):
""" get perimeters of masks """
perimeters = np.zeros(masks.max())
for n in range(masks.max()):
mn = masks==(n+1)
if mn.sum() > 0:
contours = cv2.findContours(mn.astype(np.uint8), mode=cv2.RETR_EXTERNAL,
method=cv2.CHAIN_APPROX_NONE)[-2]
#cmax = np.argmax([c.shape[0] for c in contours])
#perimeters[n] = get_perimeter(contours[cmax].astype(int).squeeze())
perimeters[n] = np.array([get_perimeter(c.astype(int).squeeze()) for c in contours]).sum()
return perimeters
def circleMask(d0):
""" creates array with indices which are the radius of that x,y point
inputs:
d0 (patch of (-d0,d0+1) over which radius computed
outputs:
rs: array (2*d0+1,2*d0+1) of radii
dx,dy: indices of patch
"""
dx = np.tile(np.arange(-d0[1],d0[1]+1), (2*d0[0]+1,1))
dy = np.tile(np.arange(-d0[0],d0[0]+1), (2*d0[1]+1,1))
dy = dy.transpose()
rs = (dy**2 + dx**2) ** 0.5
return rs, dx, dy
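# Small example: half-sizes (2, 2) give a 5x5 grid of radii centered on (0, 0),
# together with the dx/dy index grids used to build it.
#
#   rs, dx, dy = circleMask(np.array([2, 2]))
#   rs.shape          # (5, 5); rs[2, 2] == 0.0 at the center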
def get_mask_stats(masks_true):
mask_perimeters = get_mask_perimeters(masks_true)
# disk for compactness
rs,dy,dx = circleMask(np.array([100, 100]))
rsort = np.sort(rs.flatten())
# area for solidity
npoints = np.unique(masks_true, return_counts=True)[1][1:]
areas = npoints - mask_perimeters / 2 - 1
compactness = np.zeros(masks_true.max())
convexity = np.zeros(masks_true.max())
solidity = np.zeros(masks_true.max())
convex_perimeters = np.zeros(masks_true.max())
convex_areas = np.zeros(masks_true.max())
for ic in range(masks_true.max()):
points = np.array(np.nonzero(masks_true==(ic+1))).T
if len(points)>15 and mask_perimeters[ic] > 0:
med = np.median(points, axis=0)
# compute compactness of ROI
r2 = ((points - med)**2).sum(axis=1)**0.5
compactness[ic] = (rsort[:r2.size].mean() + 1e-10) / r2.mean()
try:
hull = ConvexHull(points)
convex_perimeters[ic] = hull.area
convex_areas[ic] = hull.volume
            except Exception:
convex_perimeters[ic] = 0
convexity[mask_perimeters>0.0] = (convex_perimeters[mask_perimeters>0.0] /
mask_perimeters[mask_perimeters>0.0])
solidity[convex_areas>0.0] = (areas[convex_areas>0.0] /
convex_areas[convex_areas>0.0])
convexity = np.clip(convexity, 0.0, 1.0)
solidity = np.clip(solidity, 0.0, 1.0)
compactness = np.clip(compactness, 0.0, 1.0)
return convexity, solidity, compactness
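# Summary of the three shape statistics computed above (all clipped to [0, 1]):
#   compactness: mean radius of an ideal disk with the same pixel count divided by
#                the mask's mean radial distance from its median point
#   convexity:   convex-hull perimeter divided by the mask perimeter
#   solidity:    mask area divided by the convex-hull area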
def get_masks_unet(output, cell_threshold=0, boundary_threshold=0):
""" create masks using cell probability and cell boundary """
cells = (output[...,1] - output[...,0])>cell_threshold
selem = generate_binary_structure(cells.ndim, connectivity=1)
labels, nlabels = label(cells, selem)
if output.shape[-1]>2:
slices = find_objects(labels)
dists = 10000*np.ones(labels.shape, np.float32)
mins = np.zeros(labels.shape, np.int32)
borders = np.logical_and(~(labels>0), output[...,2]>boundary_threshold)
pad = 10
for i,slc in enumerate(slices):
if slc is not None:
slc_pad = tuple([slice(max(0,sli.start-pad), min(labels.shape[j], sli.stop+pad))
for j,sli in enumerate(slc)])
msk = (labels[slc_pad] == (i+1)).astype(np.float32)
msk = 1 - gaussian_filter(msk, 5)
dists[slc_pad] = np.minimum(dists[slc_pad], msk)
mins[slc_pad][dists[slc_pad]==msk] = (i+1)
labels[labels==0] = borders[labels==0] * mins[labels==0]
masks = labels
shape0 = masks.shape
_,masks = np.unique(masks, return_inverse=True)
masks = np.reshape(masks, shape0)
return masks
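# Hypothetical shapes for the unet path above: `output` is [Ly x Lx x nclasses];
# channel 1 minus channel 0 is thresholded to get the cell map, and channel 2
# (if present) is thresholded as a boundary map whose pixels are assigned to the
# nearest cell label.
#
#   # output = net(img)   # float array of shape [Ly, Lx, 3] (network is assumed)
#   # masks = get_masks_unet(output, cell_threshold=0, boundary_threshold=0)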
def stitch3D(masks, stitch_threshold=0.25):
""" stitch 2D masks into 3D volume with stitch_threshold on IOU """
mmax = masks[0].max()
empty = 0
for i in range(len(masks)-1):
iou = metrics._intersection_over_union(masks[i+1], masks[i])[1:,1:]
if not iou.size and empty == 0:
masks[i+1] = masks[i+1]
mmax = masks[i+1].max()
elif not iou.size and not empty == 0:
icount = masks[i+1].max()
istitch = np.arange(mmax+1, mmax + icount+1, 1, int)
mmax += icount
istitch = np.append(np.array(0), istitch)
masks[i+1] = istitch[masks[i+1]]
else:
iou[iou < stitch_threshold] = 0.0
iou[iou < iou.max(axis=0)] = 0.0
istitch = iou.argmax(axis=1) + 1
ino = np.nonzero(iou.max(axis=1)==0.0)[0]
istitch[ino] = np.arange(mmax+1, mmax+len(ino)+1, 1, int)
mmax += len(ino)
istitch = np.append(np.array(0), istitch)
masks[i+1] = istitch[masks[i+1]]
empty = 1
return masks
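# Usage sketch for stitching (the label stack `masks_stack` is hypothetical): labels
# are carried across consecutive planes when the IoU with the plane below exceeds
# stitch_threshold; otherwise new labels are started.
#
#   # masks_stack: int array [Lz x Ly x Lx], each plane segmented independently
#   # stitched = stitch3D(masks_stack, stitch_threshold=0.25)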
def diameters(masks):
_, counts = np.unique(np.int32(masks), return_counts=True)
counts = counts[1:]
md = np.median(counts**0.5)
if np.isnan(md):
md = 0
md /= (np.pi**0.5)/2
return md, counts**0.5
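# The median diameter above treats each mask as a circle of equal area:
# diameter = sqrt(area) / (sqrt(pi)/2) = 2*sqrt(area/pi). (Label array `masks` assumed.)
#
#   md, sizes = diameters(masks)   # md: median diameter, sizes: sqrt(area) per mask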
def radius_distribution(masks, bins):
unique, counts = np.unique(masks, return_counts=True)
counts = counts[unique!=0]
nb, _ = np.histogram((counts**0.5)*0.5, bins)
nb = nb.astype(np.float32)
if nb.sum() > 0:
nb = nb / nb.sum()
md = np.median(counts**0.5)*0.5
if np.isnan(md):
md = 0
md /= (np.pi**0.5)/2
return nb, md, (counts**0.5)/2
def size_distribution(masks):
counts = np.unique(masks, return_counts=True)[1][1:]
return np.percentile(counts, 25) / np.percentile(counts, 75)
def process_cells(M0, npix=20):
unq, ic = np.unique(M0, return_counts=True)
for j in range(len(unq)):
if ic[j]<npix:
M0[M0==unq[j]] = 0
return M0
def fill_holes_and_remove_small_masks(masks, min_size=15):
""" fill holes in masks (2D/3D) and discard masks smaller than min_size (2D)
    fill holes in each mask using scipy.ndimage.binary_fill_holes
(might have issues at borders between cells, todo: check and fix)
Parameters
----------------
masks: int, 2D or 3D array
labelled masks, 0=NO masks; 1,2,...=mask labels,
size [Ly x Lx] or [Lz x Ly x Lx]
min_size: int (optional, default 15)
minimum number of pixels per mask, can turn off with -1
Returns
---------------
masks: int, 2D or 3D array
masks with holes filled and masks smaller than min_size removed,
0=NO masks; 1,2,...=mask labels,
size [Ly x Lx] or [Lz x Ly x Lx]
"""
if masks.ndim > 3 or masks.ndim < 2:
        raise ValueError('fill_holes_and_remove_small_masks takes 2D or 3D array, not %dD array'%masks.ndim)
slices = find_objects(masks)
j = 0
for i,slc in enumerate(slices):
if slc is not None:
msk = masks[slc] == (i+1)
npix = msk.sum()
if min_size > 0 and npix < min_size:
masks[slc][msk] = 0
elif npix > 0:
if msk.ndim==3:
for k in range(msk.shape[0]):
msk[k] = binary_fill_holes(msk[k])
else:
msk = binary_fill_holes(msk)
masks[slc][msk] = (j+1)
j+=1
return masks
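# End-of-pipeline cleanup sketch (the label image `masks_pred` is hypothetical):
#
#   masks_clean = fill_holes_and_remove_small_masks(masks_pred, min_size=15)
#   # holes inside each mask are filled, masks under 15 pixels are dropped, and
#   # the remaining masks are renumbered consecutively starting at 1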