__init__.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. """
  2. Python Library to Read FreeSurfer's Cortical Parcellation Anatomical Statistics
  3. ([lr]h.aparc(.*)?.stats)
  4. Freesurfer
  5. https://surfer.nmr.mgh.harvard.edu/
  6. >>> from freesurfer_stats import CorticalParcellationStats
  7. >>> stats = CorticalParcellationStats.read('tests/subjects/fabian/stats/lh.aparc.DKTatlas.stats')
  8. >>> stats.headers['CreationTime'].isoformat()
  9. '2019-05-09T21:05:54+00:00'
  10. >>> stats.headers['cvs_version']
  11. 'Id: mris_anatomical_stats.c,v 1.79 2016/03/14 15:15:34 greve Exp'
  12. >>> stats.headers['cmdline'][:64]
  13. 'mris_anatomical_stats -th3 -mgz -cortex ../label/lh.cortex.label'
  14. >>> stats.hemisphere
  15. >>> stats.whole_brain_measurements['estimated_total_intracranial_volume_mm^3']
  16. 0 1.670487e+06
  17. Name: estimated_total_intracranial_volume_mm^3, dtype: float64
  18. >>> stats.whole_brain_measurements['white_surface_total_area_mm^2']
  19. 0 98553
  20. Name: white_surface_total_area_mm^2, dtype: int64
  21. >>> stats.structural_measurements[['structure_name', 'surface_area_mm^2',
  22. ... 'gray_matter_volume_mm^3']].head()
  23. structure_name surface_area_mm^2 gray_matter_volume_mm^3
  24. 0 caudalanteriorcingulate 1472 4258
  25. 1 caudalmiddlefrontal 3039 8239
  26. 2 cuneus 2597 6722
  27. 3 entorhinal 499 2379
  28. 4 fusiform 3079 9064
  29. Copyright (C) 2019 Fabian Peter Hammerle <fabian@hammerle.me>
  30. This program is free software: you can redistribute it and/or modify
  31. it under the terms of the GNU General Public License as published by
  32. the Free Software Foundation, either version 3 of the License, or
  33. any later version.
  34. This program is distributed in the hope that it will be useful,
  35. but WITHOUT ANY WARRANTY; without even the implied warranty of
  36. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  37. GNU General Public License for more details.
  38. You should have received a copy of the GNU General Public License
  39. along with this program. If not, see <https://www.gnu.org/licenses/>.
  40. """
  41. from __future__ import annotations
  42. import datetime
  43. import io
  44. import pathlib
  45. import re
  46. import typing
  47. import numpy
  48. import pandas
  49. from freesurfer_stats.version import __version__
  def _get_filepath_or_buffer(
      path: typing.Union[str, pathlib.Path]
  ) -> typing.Tuple[
      typing.Any, bool  # (pandas._typing.FileOrBuffer, bool)
  ]:  # pragma: no cover
      """Resolve ``path`` via whichever helper the installed pandas exposes.

      Returns a tuple ``(path_or_buffer, should_close)``. The second item
      tells the caller whether it has to call ``.close()`` on the first one.

      Coverage can't be checked here due to the pandas version branching;
      the pipeline tests against multiple pandas versions.
      """
      legacy_resolver = getattr(pandas.io.common, "get_filepath_or_buffer", None)
      if legacy_resolver is not None:
          # path_or_buffer: typing.Union[str, pathlib.Path, typing.IO[typing.AnyStr],
          #                              s3fs.S3File, gcsfs.GCSFile]
          # https://github.com/pandas-dev/pandas/blob/v0.25.3/pandas/io/parsers.py#L436
          # https://github.com/pandas-dev/pandas/blob/v0.25.3/pandas/_typing.py#L30
          # (pylint reports no-member here when run against pandas>=v1.2.0)
          resolved = legacy_resolver(path)
          path_or_buffer, should_close = resolved[0], resolved[3]
          return path_or_buffer, should_close
      # pandas.io.common.get_filepath_or_buffer was made private in v1.2.0:
      # https://github.com/pandas-dev/pandas/commit/6d1541e1782a7b94797d5432922e64a97934cfa4#diff-934d8564d648e7521db673c6399dcac98e45adfd5230ba47d3aabfcc21979febL247
      # semver?!? breaking change not even mentioned in changelog:
      # https://pandas.pydata.org/pandas-docs/stable/whatsnew/v1.2.0.html
      # new wrapper: get_handle
      # https://github.com/pandas-dev/pandas/blob/v1.2.0/pandas/io/common.py#L490
      # pandas v1.1's get_handle does not yet support urls
      # pylint: disable=no-member; for pandas<v1.2.0
      io_handle = pandas.io.common.get_handle(path, "r")
      return io_handle.handle, True
  class _MalformedStatsError(ValueError, AssertionError):
      """Raised when the input deviates from the expected stats file layout.

      Also derives from ``AssertionError``: these checks used to be ``assert``
      statements (which are stripped under ``python -O``), so callers catching
      ``AssertionError`` keep working.
      """


  class CorticalParcellationStats:
      """Parsed content of a FreeSurfer cortical parcellation stats file.

      Use :meth:`read` to create an instance from a file. Attributes:

      * ``headers``: header attributes keyed by name. ``CreationTime`` and
        ``AnnotationFileTimeStamp`` are stored as ``datetime.datetime``,
        everything else as ``str``.
      * ``whole_brain_measurements``: data frame with one column per
        ``# Measure`` line (empty until a file was read).
      * ``structural_measurements``: data frame with one row per table line
        (``None`` until a file was read).
      """

      _HEMISPHERE_PREFIX_TO_SIDE = {"lh": "left", "rh": "right"}
      _GENERAL_MEASUREMENTS_REGEX = re.compile(
          r"^Measure \S+, ([^,\s]+),? ([^,]+), ([\d\.]+), (\S+)$"
      )
      _COLUMN_NAMES_NON_SAFE_REGEX = re.compile(r"\s+")

      def __init__(self):
          self.headers: typing.Dict[str, typing.Union[str, datetime.datetime]] = {}
          # Both frames are replaced by freshly parsed ones in _read().
          self.whole_brain_measurements: pandas.DataFrame = pandas.DataFrame()
          self.structural_measurements: typing.Union[pandas.DataFrame, None] = None

      @property
      def hemisphere(self) -> str:
          """Return ``"left"`` or ``"right"`` according to the ``hemi`` header.

          Raises ``KeyError`` if the header is missing or holds another value.
          """
          return self._HEMISPHERE_PREFIX_TO_SIDE[typing.cast(str, self.headers["hemi"])]

      @staticmethod
      def _read_header_line(stream: typing.TextIO) -> str:
          """Read one ``# ``-prefixed line; return it without prefix and trailing space.

          Raises ``_MalformedStatsError`` when the next line lacks the prefix,
          which includes reaching the end of the stream.
          """
          line = stream.readline()
          if not line.startswith("# "):
              raise _MalformedStatsError(
                  f"expected line starting with '# ', got {line!r}"
              )
          return line[2:].rstrip()

      @classmethod
      def _read_column_header_line(
          cls, stream: typing.TextIO
      ) -> typing.Tuple[int, str, str]:
          """Read a ``# TableCol <index> <key> <value>`` line.

          Returns ``(index, key, value)``; ``value`` may contain spaces.
          """
          line = cls._read_header_line(stream)
          if not line.startswith("TableCol"):
              raise _MalformedStatsError(f"expected TableCol line, got {line!r}")
          line = line[len("TableCol ") :].lstrip()
          index, key, value = line.split(maxsplit=2)
          return int(index), key, value

      @staticmethod
      def _parse_header_value(
          name: str, value_str: str
      ) -> typing.Union[str, datetime.datetime]:
          """Convert the raw string of header attribute ``name`` to its stored value."""
          if name in ("cvs_version", "mrisurf.c-cvs_version"):
              # drop the "$" delimiters of the version control keyword
              return value_str.strip("$").rstrip()
          if name == "CreationTime":
              creation_time = datetime.datetime.strptime(
                  value_str, "%Y/%m/%d-%H:%M:%S-%Z"
              )
              if creation_time.tzinfo is None:
                  # strptime() returned a naive datetime although %Z matched;
                  # only attach UTC when the zone really was GMT.
                  if not value_str.endswith("-GMT"):
                      raise _MalformedStatsError(
                          f"unsupported timezone in CreationTime {value_str!r}"
                      )
                  creation_time = creation_time.replace(tzinfo=datetime.timezone.utc)
              return creation_time
          if name == "AnnotationFileTimeStamp":
              return datetime.datetime.strptime(value_str, "%Y/%m/%d %H:%M:%S")
          return value_str

      def _read_headers(self, stream: typing.TextIO) -> str:
          """Fill ``self.headers`` from the lines preceding the measurements.

          Returns the first line starting with ``Measure`` (prefix already
          stripped). That line has been consumed from ``stream``, so the
          caller must parse the returned string instead of reading on;
          otherwise the first measurement is lost.
          """
          self.headers = {}
          while True:
              line = self._read_header_line(stream)
              if line.startswith("Measure"):
                  return line
              if not line:
                  continue
              # partition() instead of split(): a key without value yields ""
              # instead of failing on tuple unpacking.
              attr_name, _, attr_value_str = line.partition(" ")
              self.headers[attr_name] = self._parse_header_value(
                  attr_name, attr_value_str.lstrip()
              )

      @classmethod
      def _format_column_name(cls, name: str, unit: str) -> str:
          """Build a lower-case column name, suffixed with ``_<unit>`` if there is one.

          Units ``unitless`` and ``NA`` add no suffix; whitespace runs become ``_``.
          """
          column_name = name.lower()
          if unit not in ["unitless", "NA"]:
              column_name += "_" + unit
          return cls._COLUMN_NAMES_NON_SAFE_REGEX.sub("_", column_name)

      @classmethod
      def _parse_whole_brain_measurements_line(
          cls, line: str
      ) -> typing.Tuple[str, numpy.ndarray]:
          """Parse a ``Measure ...`` line into ``(column_name, one-element array)``.

          Raises ``ValueError`` if the line does not match the expected format
          or its value is not numeric.
          """
          match = cls._GENERAL_MEASUREMENTS_REGEX.match(line)
          if not match:
              raise ValueError(f"unexpected line: {line!r}")
          key, name, value, unit = match.groups()
          if (
              key == "SupraTentorialVolNotVent"
              and name.lower() == "supratentorial volume"
          ):
              # give this measure a name of its own, distinct from plain
              # "supratentorial volume"
              name += " Without Ventricles"
          column_name = cls._format_column_name(name, unit)
          return column_name, pandas.to_numeric([value], errors="raise")

      @classmethod
      def _read_column_attributes(
          cls, num: int, stream: typing.TextIO
      ) -> typing.List[typing.Dict[str, str]]:
          """Read the attributes of ``num`` table columns, three lines per column.

          Returns one dict per column, in file order. Raises
          ``_MalformedStatsError`` if a line belongs to another column index or
          repeats an attribute key.
          """
          columns = []
          for column_index in range(1, int(num) + 1):
              column_attrs: typing.Dict[str, str] = {}
              for _ in range(3):
                  line_index, key, value = cls._read_column_header_line(stream)
                  if line_index != column_index:
                      raise _MalformedStatsError(
                          f"expected attribute of column {column_index},"
                          f" got column {line_index}"
                      )
                  if key in column_attrs:
                      raise _MalformedStatsError(
                          f"duplicate attribute {key!r} for column {column_index}"
                      )
                  column_attrs[key] = value
              columns.append(column_attrs)
          return columns

      @staticmethod
      def _to_numeric_where_possible(column: pandas.Series) -> pandas.Series:
          """Return ``column`` converted to a numeric dtype, or unchanged if that fails.

          Replaces ``pandas.to_numeric(..., errors="ignore")``, which is
          deprecated since pandas 2.2.
          """
          try:
              return pandas.to_numeric(column)
          except (ValueError, TypeError):
              return column

      def _read(self, stream: typing.TextIO) -> None:
          """Parse a complete stats file from ``stream`` into this instance.

          Raises ``_MalformedStatsError`` (a ``ValueError``) or ``ValueError``
          when the content does not follow the expected layout.
          """
          title = stream.readline().rstrip()
          if title != "# Table of FreeSurfer cortical parcellation anatomical statistics":
              raise _MalformedStatsError(f"unexpected first line: {title!r}")
          separator = stream.readline().rstrip()
          if separator != "#":
              raise _MalformedStatsError(f"unexpected second line: {separator!r}")
          # _read_headers() hands back the first measurement line it consumed.
          line = self._read_headers(stream)
          self.whole_brain_measurements = pandas.DataFrame()
          while not line.startswith("NTableCols"):
              if line.startswith("BrainVolStatsFixed"):
                  # https://surfer.nmr.mgh.harvard.edu/fswiki/BrainVolStatsFixed
                  if not (
                      line.startswith("BrainVolStatsFixed see ")
                      or line == "BrainVolStatsFixed-NotNeeded because voxelvolume=1mm3"
                  ):
                      raise _MalformedStatsError(
                          f"unexpected BrainVolStatsFixed line: {line!r}"
                      )
                  # the key is followed by a single separator character
                  # (" " or "-"), so one slice length serves both variants
                  self.headers["BrainVolStatsFixed"] = line[len("BrainVolStatsFixed-") :]
              else:
                  column_name, value = self._parse_whole_brain_measurements_line(line)
                  if column_name in self.whole_brain_measurements:
                      raise _MalformedStatsError(
                          f"duplicate measurement: {column_name!r}"
                      )
                  self.whole_brain_measurements[column_name] = value  # type: ignore
              line = self._read_header_line(stream)
          columns = self._read_column_attributes(int(line[len("NTableCols ") :]), stream)
          expected_col_headers = "ColHeaders " + " ".join(
              c["ColHeader"] for c in columns
          )
          col_headers = self._read_header_line(stream)
          if col_headers != expected_col_headers:
              raise _MalformedStatsError(
                  f"expected {expected_col_headers!r}, got {col_headers!r}"
              )
          # Blank lines (e.g. at the end of the file) are no table rows.
          rows = [fields for fields in (row.split() for row in stream) if fields]
          self.structural_measurements = pandas.DataFrame(
              rows,
              columns=[
                  self._format_column_name(c["FieldName"], c["Units"]) for c in columns
              ],
          ).apply(self._to_numeric_where_possible)

      @classmethod
      def read(
          cls, path: typing.Union[str, pathlib.Path]
      ) -> "CorticalParcellationStats":
          """Read the stats file at ``path`` and return the parsed instance.

          ``path`` is resolved via ``_get_filepath_or_buffer``; whatever that
          returns is closed afterwards if it asks for it.
          """
          path_or_buffer, should_close = _get_filepath_or_buffer(path)
          stats = cls()
          try:  # pragma: no cover
              # can't check coverage due to pandas version branching.
              # pylint: disable=protected-access; false-positive for ._read
              if isinstance(path_or_buffer, io.TextIOWrapper):  # pandas>=v1.2.0
                  stats._read(path_or_buffer)
              elif hasattr(path_or_buffer, "readline"):
                  # same encoding as the plain-path branch below
                  stats._read(io.TextIOWrapper(path_or_buffer, encoding="utf8"))
              else:
                  with open(path_or_buffer, "r", encoding="utf8") as stream:
                      stats._read(stream)
          finally:
              if should_close:
                  path_or_buffer.close()
          return stats