__init__.py 22 KB


  1. """
  2. Python Library to Read and Write Surface Files in Freesurfer's TriangularSurface Format
  3. compatible with Freesurfer's MRISwriteTriangularSurface()
  4. https://github.com/freesurfer/freesurfer/blob/release_6_0_0/include/mrisurf.h#L1281
  5. https://github.com/freesurfer/freesurfer/blob/release_6_0_0/utils/mrisurf.c
  6. https://raw.githubusercontent.com/freesurfer/freesurfer/release_6_0_0/utils/mrisurf.c
  7. Freesurfer
  8. https://surfer.nmr.mgh.harvard.edu/
  9. Edit Surface File
  10. >>> from freesurfer_surface import Surface, Vertex, Triangle
  11. >>>
  12. >>> surface = Surface.read_triangular('bert/surf/lh.pial'))
  13. >>>
  14. >>> vertex_a = surface.add_vertex(Vertex(0.0, 0.0, 0.0))
  15. >>> vertex_b = surface.add_vertex(Vertex(1.0, 1.0, 1.0))
  16. >>> vertex_c = surface.add_vertex(Vertex(2.0, 2.0, 2.0))
  17. >>> surface.triangles.append(Triangle((vertex_a, vertex_b, vertex_c)))
  18. >>>
  19. >>> surface.write_triangular('somewhere/else/lh.pial')
  20. List Labels in Annotation File
  21. >>> from freesurfer_surface import Annotation
  22. >>>
  23. >>> annotation = Annotation.read('bert/label/lh.aparc.annot')
  24. >>> for label in annotation.labels.values():
  25. >>> print(label.index, label.hex_color_code, label.name)
  26. Find Border of Labelled Region
  27. >>> surface = Surface.read_triangular('bert/surf/lh.pial'))
  28. >>> surface.load_annotation_file('bert/label/lh.aparc.annot')
  29. >>> region, = filter(lambda l: l.name == 'precentral',
  30. >>> annotation.labels.values())
  31. >>> print(surface.find_label_border_polygonal_chains(region))
  32. """
  33. import collections
  34. import contextlib
  35. import copy
  36. import datetime
  37. import itertools
  38. import locale
  39. import re
  40. import struct
  41. import typing
  42. import numpy
  43. try:
  44. from freesurfer_surface.version import __version__
  45. except ImportError: # ModuleNotFoundError not available in python<3.6
  46. # package is not installed
  47. __version__ = None
  48. class UnsupportedLocaleSettingError(locale.Error):
  49. pass
  50. @contextlib.contextmanager
  51. def setlocale(temporary_locale):
  52. primary_locale = locale.setlocale(locale.LC_ALL)
  53. try:
  54. yield locale.setlocale(locale.LC_ALL, temporary_locale)
  55. except locale.Error as exc:
  56. if str(exc) == 'unsupported locale setting':
  57. raise UnsupportedLocaleSettingError(temporary_locale)
  58. raise exc # pragma: no cover
  59. finally:
  60. locale.setlocale(locale.LC_ALL, primary_locale)
  61. class Vertex(numpy.ndarray):
  62. def __new__(cls, right: float, anterior: float, superior: float):
  63. return numpy.array((right, anterior, superior),
  64. dtype=float).view(cls)
  65. @property
  66. def right(self) -> float:
  67. return self[0]
  68. @property
  69. def anterior(self) -> float:
  70. return self[1]
  71. @property
  72. def superior(self) -> float:
  73. return self[2]
  74. @property
  75. def __dict__(self) -> typing.Dict[str, float]:
  76. return {'right': self.right,
  77. 'anterior': self.anterior,
  78. 'superior': self.superior}
  79. def __format_coords(self) -> str:
  80. return ', '.join('{}={}'.format(name, getattr(self, name))
  81. for name in ['right', 'anterior', 'superior'])
  82. def __repr__(self) -> str:
  83. return '{}({})'.format(type(self).__name__, self.__format_coords())
  84. def distance_mm(self, others: typing.Union['Vertex',
  85. typing.Iterable['Vertex'],
  86. numpy.ndarray],
  87. ) -> numpy.ndarray:
  88. if isinstance(others, Vertex):
  89. others = others.reshape((1, 3))
  90. return numpy.linalg.norm(self - others, axis=1)
  91. class PolygonalCircuit:
  92. _VERTEX_INDICES_TYPE = typing.Tuple[int]
  93. def __init__(self, vertex_indices: _VERTEX_INDICES_TYPE):
  94. self._vertex_indices = tuple(vertex_indices)
  95. assert all(isinstance(idx, int) for idx in self._vertex_indices)
  96. @property
  97. def vertex_indices(self):
  98. return self._vertex_indices
  99. def _normalize(self) -> 'PolygonalCircuit':
  100. vertex_indices = collections.deque(self.vertex_indices)
  101. vertex_indices.rotate(-numpy.argmin(self.vertex_indices))
  102. if len(vertex_indices) > 2 and vertex_indices[-1] < vertex_indices[1]:
  103. vertex_indices.reverse()
  104. vertex_indices.rotate(1)
  105. return type(self)(vertex_indices)
  106. def __eq__(self, other: 'PolygonalCircuit') -> bool:
  107. # pylint: disable=protected-access
  108. return self._normalize().vertex_indices == other._normalize().vertex_indices
  109. def __hash__(self) -> int:
  110. # pylint: disable=protected-access
  111. return hash(self._normalize()._vertex_indices)
  112. def adjacent_vertex_indices(self, vertices_num: int = 2
  113. ) -> typing.Iterable[typing.Tuple[int]]:
  114. vertex_indices_cycle = list(itertools.islice(
  115. itertools.cycle(self.vertex_indices),
  116. 0,
  117. len(self.vertex_indices) + vertices_num - 1,
  118. ))
  119. return zip(*(itertools.islice(vertex_indices_cycle,
  120. offset,
  121. len(self.vertex_indices) + offset)
  122. for offset in range(vertices_num)))
  123. class LineSegment(PolygonalCircuit):
  124. def __init__(self, indices: PolygonalCircuit._VERTEX_INDICES_TYPE):
  125. super().__init__(indices)
  126. assert len(self.vertex_indices) == 2
  127. def __repr__(self) -> str:
  128. return 'LineSegment(vertex_indices={})'.format(self.vertex_indices)
  129. class Triangle(PolygonalCircuit):
  130. def __init__(self, indices: PolygonalCircuit._VERTEX_INDICES_TYPE):
  131. super().__init__(indices)
  132. assert len(self.vertex_indices) == 3
  133. def __repr__(self) -> str:
  134. return 'Triangle(vertex_indices={})'.format(self.vertex_indices)
  135. class PolygonalChainsNotOverlapingError(ValueError):
  136. pass
  137. class PolygonalChain:
  138. def __init__(self, vertex_indices: typing.Iterable[int]):
  139. self.vertex_indices \
  140. = collections.deque(vertex_indices) # type: Deque[int]
  141. def __eq__(self, other: 'PolygonalChain') -> bool:
  142. return self.vertex_indices == other.vertex_indices
  143. def __repr__(self) -> str:
  144. return 'PolygonalChain(vertex_indices={})'.format(tuple(self.vertex_indices))
  145. def connect(self, other: 'PolygonalChain') -> None:
  146. if self.vertex_indices[-1] == other.vertex_indices[0]:
  147. self.vertex_indices.pop()
  148. self.vertex_indices.extend(other.vertex_indices)
  149. elif self.vertex_indices[-1] == other.vertex_indices[-1]:
  150. self.vertex_indices.pop()
  151. self.vertex_indices.extend(reversed(other.vertex_indices))
  152. elif self.vertex_indices[0] == other.vertex_indices[0]:
  153. self.vertex_indices.popleft()
  154. self.vertex_indices.extendleft(other.vertex_indices)
  155. elif self.vertex_indices[0] == other.vertex_indices[-1]:
  156. self.vertex_indices.popleft()
  157. self.vertex_indices.extendleft(reversed(other.vertex_indices))
  158. else:
  159. raise PolygonalChainsNotOverlapingError()
  160. def adjacent_vertex_indices(self, vertices_num: int = 2
  161. ) -> typing.Iterable[typing.Tuple[int]]:
  162. return zip(*(itertools.islice(self.vertex_indices,
  163. offset,
  164. len(self.vertex_indices))
  165. for offset in range(vertices_num)))
  166. def segments(self) -> typing.Iterable[LineSegment]:
  167. return map(LineSegment, self.adjacent_vertex_indices(2))
  168. class Label:
  169. # pylint: disable=too-many-arguments
  170. def __init__(self, index: int, name: str, red: int,
  171. green: int, blue: int, transparency: int):
  172. self.index = index # type: int
  173. self.name = name # type: str
  174. self.red = red # type: int
  175. self.green = green # type: int
  176. self.blue = blue # type: int
  177. self.transparency = transparency # type: int
  178. @property
  179. def color_code(self) -> int:
  180. if self.index == 0: # unknown
  181. return 0
  182. return int.from_bytes((self.red, self.green, self.blue, self.transparency),
  183. byteorder='little', signed=False)
  184. @property
  185. def hex_color_code(self) -> str:
  186. return '#{:02x}{:02x}{:02x}'.format(self.red, self.green, self.blue)
  187. def __str__(self) -> str:
  188. return 'Label(name={}, index={}, color={})'.format(
  189. self.name, self.index, self.hex_color_code)
  190. def __repr__(self) -> str:
  191. return str(self)
  192. class Annotation:
  193. # pylint: disable=too-few-public-methods
  194. _TAG_OLD_COLORTABLE = b'\0\0\0\x01'
  195. def __init__(self):
  196. self.vertex_label_index = {} # type: Dict[int, int]
  197. self.colortable_path = None # type: Optional[bytes]
  198. self.labels = {} # type: Dict[int, Label]
  199. @staticmethod
  200. def _read_label(stream: typing.BinaryIO) -> Label:
  201. index, name_length = struct.unpack('>II', stream.read(4 * 2))
  202. name = stream.read(name_length - 1).decode()
  203. assert stream.read(1) == b'\0'
  204. red, green, blue, transparency \
  205. = struct.unpack('>IIII', stream.read(4 * 4))
  206. return Label(index=index, name=name, red=red, green=green,
  207. blue=blue, transparency=transparency)
  208. def _read(self, stream: typing.BinaryIO) -> None:
  209. # https://surfer.nmr.mgh.harvard.edu/fswiki/LabelsClutsAnnotationFiles
  210. annotations_num, = struct.unpack('>I', stream.read(4))
  211. annotations = [struct.unpack('>II', stream.read(4 * 2))
  212. for _ in range(annotations_num)]
  213. assert stream.read(4) == self._TAG_OLD_COLORTABLE
  214. colortable_version, _, filename_length \
  215. = struct.unpack('>III', stream.read(4 * 3))
  216. assert colortable_version > 0 # new version
  217. self.colortable_path = stream.read(filename_length - 1)
  218. assert stream.read(1) == b'\0'
  219. labels_num, = struct.unpack('>I', stream.read(4))
  220. self.labels = {label.index: label for label
  221. in (self._read_label(stream) for _ in range(labels_num))}
  222. label_index_by_color_code = {label.color_code: label.index
  223. for label in self.labels.values()}
  224. self.vertex_label_index = {vertex_index: label_index_by_color_code[color_code]
  225. for vertex_index, color_code in annotations}
  226. assert not stream.read(1)
  227. @classmethod
  228. def read(cls, annotation_file_path: str) -> 'Annotation':
  229. annotation = cls()
  230. with open(annotation_file_path, 'rb') as annotation_file:
  231. # pylint: disable=protected-access
  232. annotation._read(annotation_file)
  233. return annotation
  234. class Surface:
  235. # pylint: disable=too-many-instance-attributes
  236. _MAGIC_NUMBER = b'\xff\xff\xfe'
  237. _TAG_CMDLINE = b'\x00\x00\x00\x03'
  238. _TAG_OLD_SURF_GEOM = b'\x00\x00\x00\x14'
  239. _TAG_OLD_USEREALRAS = b'\x00\x00\x00\x02'
  240. _DATETIME_FORMAT = '%a %b %d %H:%M:%S %Y'
  241. def __init__(self):
  242. self.creator = None # type: Optional[bytes]
  243. self.creation_datetime = None # type: Optional[datetime.datetime]
  244. self.vertices = [] # type: List[Vertex]
  245. self.triangles = [] # type: List[Triangle]
  246. self.using_old_real_ras = False # type: bool
  247. self.volume_geometry_info = None # type: Optional[Tuple[bytes]]
  248. self.command_lines = [] # type: List[bytes]
  249. self.annotation = None # type: Optional[Annotation]
  250. @classmethod
  251. def _read_cmdlines(cls, stream: typing.BinaryIO) -> typing.Iterator[str]:
  252. while True:
  253. tag = stream.read(4)
  254. if not tag:
  255. return
  256. assert tag == cls._TAG_CMDLINE # might be TAG_GROUP_AVG_SURFACE_AREA
  257. # TAGwrite
  258. # https://github.com/freesurfer/freesurfer/blob/release_6_0_0/utils/tags.c#L94
  259. str_length, = struct.unpack('>Q', stream.read(8))
  260. yield stream.read(str_length - 1)
  261. assert stream.read(1) == b'\x00'
  262. def _read_triangular(self, stream: typing.BinaryIO):
  263. assert stream.read(3) == self._MAGIC_NUMBER
  264. self.creator, creation_dt_str = re.match(rb'^created by (\w+) on (.* \d{4})\n',
  265. stream.readline()).groups()
  266. with setlocale('C'):
  267. self.creation_datetime = datetime.datetime.strptime(creation_dt_str.decode(),
  268. self._DATETIME_FORMAT)
  269. assert stream.read(1) == b'\n'
  270. # fwriteInt
  271. # https://github.com/freesurfer/freesurfer/blob/release_6_0_0/utils/fio.c#L290
  272. vertices_num, triangles_num = struct.unpack('>II', stream.read(4 * 2))
  273. self.vertices = [Vertex(*struct.unpack('>fff', stream.read(4 * 3)))
  274. for _ in range(vertices_num)]
  275. self.triangles = [Triangle(struct.unpack('>III', stream.read(4 * 3)))
  276. for _ in range(triangles_num)]
  277. assert all(vertex_idx < vertices_num
  278. for triangle in self.triangles
  279. for vertex_idx in triangle.vertex_indices)
  280. assert stream.read(4) == self._TAG_OLD_USEREALRAS
  281. using_old_real_ras, = struct.unpack('>I', stream.read(4))
  282. assert using_old_real_ras in [0, 1], using_old_real_ras
  283. self.using_old_real_ras = bool(using_old_real_ras)
  284. assert stream.read(4) == self._TAG_OLD_SURF_GEOM
  285. # writeVolGeom
  286. # https://github.com/freesurfer/freesurfer/blob/release_6_0_0/utils/transform.c#L368
  287. self.volume_geometry_info = tuple(stream.readline() for _ in range(8))
  288. self.command_lines = list(self._read_cmdlines(stream))
  289. @classmethod
  290. def read_triangular(cls, surface_file_path: str) -> 'Surface':
  291. surface = cls()
  292. with open(surface_file_path, 'rb') as surface_file:
  293. # pylint: disable=protected-access
  294. surface._read_triangular(surface_file)
  295. return surface
  296. @classmethod
  297. def _triangular_strftime(cls, creation_datetime: datetime.datetime) -> bytes:
  298. padded_day = '{:>2}'.format(creation_datetime.day)
  299. fmt = cls._DATETIME_FORMAT.replace('%d', padded_day)
  300. with setlocale('C'):
  301. return creation_datetime.strftime(fmt).encode()
  302. def write_triangular(self, surface_file_path: str,
  303. creation_datetime: typing.Optional[datetime.datetime] = None):
  304. if creation_datetime is None:
  305. creation_datetime = datetime.datetime.now()
  306. with open(surface_file_path, 'wb') as surface_file:
  307. surface_file.write(
  308. self._MAGIC_NUMBER
  309. + b'created by ' + self.creator
  310. + b' on ' + self._triangular_strftime(creation_datetime)
  311. + b'\n\n'
  312. + struct.pack('>II', len(self.vertices), len(self.triangles))
  313. )
  314. for vertex in self.vertices:
  315. surface_file.write(struct.pack('>fff', *vertex))
  316. for triangle in self.triangles:
  317. assert all(vertex_index < len(self.vertices)
  318. for vertex_index in triangle.vertex_indices)
  319. surface_file.write(struct.pack('>III',
  320. *triangle.vertex_indices))
  321. surface_file.write(self._TAG_OLD_USEREALRAS
  322. + struct.pack('>I', 1 if self.using_old_real_ras else 0))
  323. surface_file.write(self._TAG_OLD_SURF_GEOM
  324. + b''.join(self.volume_geometry_info))
  325. for command_line in self.command_lines:
  326. surface_file.write(self._TAG_CMDLINE + struct.pack('>Q', len(command_line) + 1)
  327. + command_line + b'\0')
  328. def load_annotation_file(self, annotation_file_path: str) -> None:
  329. annotation = Annotation.read(annotation_file_path)
  330. assert len(annotation.vertex_label_index) <= len(self.vertices)
  331. assert max(annotation.vertex_label_index.keys()) < len(self.vertices)
  332. self.annotation = annotation
  333. def add_vertex(self, vertex: Vertex) -> int:
  334. self.vertices.append(vertex)
  335. return len(self.vertices) - 1
  336. def add_rectangle(self, vertex_indices: typing.Iterable[int]) -> typing.Iterable[int]:
  337. vertex_indices = list(vertex_indices)
  338. if len(vertex_indices) == 3:
  339. vertex_indices.append(self.add_vertex(
  340. self.vertices[vertex_indices[0]]
  341. + self.vertices[vertex_indices[2]]
  342. - self.vertices[vertex_indices[1]]
  343. ))
  344. assert len(vertex_indices) == 4
  345. self.triangles.append(Triangle(vertex_indices[:3]))
  346. self.triangles.append(Triangle(vertex_indices[2:]
  347. + vertex_indices[:1]))
  348. def _triangle_count_by_adjacent_vertex_indices(self) \
  349. -> typing.Dict[int, typing.Dict[int, int]]:
  350. counts = {vertex_index: collections.defaultdict(lambda: 0)
  351. for vertex_index in range(len(self.vertices))}
  352. for triangle in self.triangles:
  353. for vertex_index_pair in triangle.adjacent_vertex_indices(2):
  354. counts[vertex_index_pair[0]][vertex_index_pair[1]] += 1
  355. counts[vertex_index_pair[1]][vertex_index_pair[0]] += 1
  356. return counts
  357. def find_borders(self) -> typing.Iterator[PolygonalCircuit]:
  358. border_neighbours = {}
  359. for vertex_index, neighbour_counts \
  360. in self._triangle_count_by_adjacent_vertex_indices().items():
  361. if not neighbour_counts:
  362. yield PolygonalCircuit((vertex_index,))
  363. else:
  364. neighbours = [neighbour_index for neighbour_index, counts
  365. in neighbour_counts.items()
  366. if counts != 2]
  367. if neighbours:
  368. assert len(neighbours) % 2 == 0, \
  369. (vertex_index, neighbour_counts)
  370. border_neighbours[vertex_index] = neighbours
  371. while border_neighbours:
  372. vertex_index, neighbour_indices = border_neighbours.popitem()
  373. cycle_indices = [vertex_index]
  374. border_neighbours[vertex_index] = neighbour_indices[1:]
  375. vertex_index = neighbour_indices[0]
  376. while vertex_index != cycle_indices[0]:
  377. neighbour_indices = border_neighbours.pop(vertex_index)
  378. neighbour_indices.remove(cycle_indices[-1])
  379. cycle_indices.append(vertex_index)
  380. if len(neighbour_indices) > 1:
  381. border_neighbours[vertex_index] = neighbour_indices[1:]
  382. vertex_index = neighbour_indices[0]
  383. assert vertex_index in border_neighbours, \
  384. (vertex_index, cycle_indices, border_neighbours)
  385. final_neighbour_indices = border_neighbours.pop(vertex_index)
  386. assert final_neighbour_indices == [cycle_indices[-1]], \
  387. (vertex_index, final_neighbour_indices, cycle_indices)
  388. yield PolygonalCircuit(cycle_indices)
  389. def _get_vertex_label_index(self, vertex_index: int) -> typing.Optional[int]:
  390. return self.annotation.vertex_label_index.get(vertex_index, None)
  391. def _find_label_border_segments(self, label: Label) -> typing.Iterator[LineSegment]:
  392. for triangle in self.triangles:
  393. border_vertex_indices = tuple(filter(
  394. lambda i: self._get_vertex_label_index(i) == label.index,
  395. triangle.vertex_indices,
  396. ))
  397. if len(border_vertex_indices) == 2:
  398. yield LineSegment(border_vertex_indices)
  399. def find_label_border_polygonal_chains(self, label: Label) -> typing.Iterator[PolygonalChain]:
  400. segments = set(self._find_label_border_segments(label))
  401. available_chains = collections.deque(PolygonalChain(segment.vertex_indices)
  402. for segment in segments)
  403. # irrespective of its poor performance,
  404. # we keep this approach since it's easy to read and fast enough
  405. while available_chains:
  406. chain = available_chains.pop()
  407. last_chains_len = None
  408. while last_chains_len != len(available_chains):
  409. last_chains_len = len(available_chains)
  410. checked_chains = collections.deque()
  411. while available_chains:
  412. potential_neighbour = available_chains.pop()
  413. try:
  414. chain.connect(potential_neighbour)
  415. except PolygonalChainsNotOverlapingError:
  416. checked_chains.append(potential_neighbour)
  417. available_chains = checked_chains
  418. assert all((segment in segments) for segment in chain.segments())
  419. yield chain
  420. def _unused_vertices(self) -> typing.Set[int]:
  421. vertex_indices = set(range(len(self.vertices)))
  422. for triangle in self.triangles:
  423. for vertex_index in triangle.vertex_indices:
  424. vertex_indices.discard(vertex_index)
  425. return vertex_indices
  426. def remove_unused_vertices(self) -> None:
  427. vertex_index_conversion = [0] * len(self.vertices)
  428. for vertex_index in sorted(self._unused_vertices(), reverse=True):
  429. del self.vertices[vertex_index]
  430. vertex_index_conversion[vertex_index] -= 1
  431. vertex_index_conversion = numpy.cumsum(vertex_index_conversion)
  432. for triangle_index in range(len(self.triangles)):
  433. self.triangles[triangle_index] \
  434. = Triangle(map(lambda i: i + int(vertex_index_conversion[i]),
  435. self.triangles[triangle_index].vertex_indices))
  436. def select_vertices(self, vertex_indices: typing.Iterable[int]) \
  437. -> typing.List[Vertex]:
  438. return [self.vertices[idx] for idx in vertex_indices]
  439. @staticmethod
  440. def unite(surfaces: typing.Iterable['Surface']) -> 'Surface':
  441. surfaces_iter = iter(surfaces)
  442. union = copy.deepcopy(next(surfaces_iter))
  443. for surface in surfaces_iter:
  444. vertex_index_offset = len(union.vertices)
  445. union.vertices.extend(surface.vertices)
  446. union.triangles.extend(
  447. Triangle(vertex_idx + vertex_index_offset
  448. for vertex_idx in triangle.vertex_indices)
  449. for triangle in surface.triangles)
  450. return union