__init__.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. """
  2. Python Library to Read FreeSurfer's Cortical Parcellation Anatomical Statistics
  3. ([lr]h.aparc(.*)?.stats)
  4. Freesurfer
  5. https://surfer.nmr.mgh.harvard.edu/
  6. >>> from freesurfer_stats import CorticalParcellationStats
  7. >>> stats = CorticalParcellationStats.read('tests/subjects/fabian/stats/lh.aparc.DKTatlas.stats')
  8. >>> stats.headers['CreationTime'].isoformat()
  9. '2019-05-09T21:05:54+00:00'
  10. >>> stats.headers['cvs_version']
  11. 'Id: mris_anatomical_stats.c,v 1.79 2016/03/14 15:15:34 greve Exp'
  12. >>> stats.headers['cmdline'][:64]
  13. 'mris_anatomical_stats -th3 -mgz -cortex ../label/lh.cortex.label'
  14. >>> stats.hemisphere
  15. >>> stats.whole_brain_measurements['estimated_total_intracranial_volume_mm^3']
  16. 0 1.670487e+06
  17. Name: estimated_total_intracranial_volume_mm^3, dtype: float64
  18. >>> stats.whole_brain_measurements['white_surface_total_area_mm^2']
  19. 0 98553
  20. Name: white_surface_total_area_mm^2, dtype: int64
  21. >>> stats.structural_measurements[['structure_name', 'surface_area_mm^2',
  22. ... 'gray_matter_volume_mm^3']].head()
  23. structure_name surface_area_mm^2 gray_matter_volume_mm^3
  24. 0 caudalanteriorcingulate 1472 4258
  25. 1 caudalmiddlefrontal 3039 8239
  26. 2 cuneus 2597 6722
  27. 3 entorhinal 499 2379
  28. 4 fusiform 3079 9064
  29. Copyright (C) 2019 Fabian Peter Hammerle <fabian@hammerle.me>
  30. This program is free software: you can redistribute it and/or modify
  31. it under the terms of the GNU General Public License as published by
  32. the Free Software Foundation, either version 3 of the License, or
  33. any later version.
  34. This program is distributed in the hope that it will be useful,
  35. but WITHOUT ANY WARRANTY; without even the implied warranty of
  36. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  37. GNU General Public License for more details.
  38. You should have received a copy of the GNU General Public License
  39. along with this program. If not, see <https://www.gnu.org/licenses/>.
  40. """
  41. import datetime
  42. import io
  43. import pathlib
  44. import re
  45. import typing
  46. import numpy
  47. import pandas
  48. from freesurfer_stats.version import __version__
  49. def _get_filepath_or_buffer(
  50. path: typing.Union[str, pathlib.Path]
  51. ) -> typing.Tuple[
  52. typing.Any, bool # pandas._typing.FileOrBuffer, bool)
  53. ]: # pragma: no cover
  54. # can't check coverage due to pandas version branching.
  55. # pipeline tests against multiple pandas versions.
  56. if not hasattr(pandas.io.common, "get_filepath_or_buffer"):
  57. # pandas.io.common.get_filepath_or_buffer was made private in v1.2.0:
  58. # https://github.com/pandas-dev/pandas/commit/6d1541e1782a7b94797d5432922e64a97934cfa4#diff-934d8564d648e7521db673c6399dcac98e45adfd5230ba47d3aabfcc21979febL247
  59. # semver?!? breaking change not even mentioned in changelog:
  60. # https://pandas.pydata.org/pandas-docs/stable/whatsnew/v1.2.0.html
  61. # new wrapper: get_handle
  62. # https://github.com/pandas-dev/pandas/blob/v1.2.0/pandas/io/common.py#L490
  63. # pandas v1.1's get_handle does not yet support urls
  64. # pylint: disable=no-member; for python<v1.2.0
  65. io_handle = pandas.io.common.get_handle(path, "r")
  66. return io_handle.handle, True
  67. # path_or_buffer: typing.Union[str, pathlib.Path, typing.IO[typing.AnyStr],
  68. # s3fs.S3File, gcsfs.GCSFile]
  69. # https://github.com/pandas-dev/pandas/blob/v0.25.3/pandas/io/parsers.py#L436
  70. # https://github.com/pandas-dev/pandas/blob/v0.25.3/pandas/_typing.py#L30
  71. # pylint: disable=no-member; for python>=v1.2.0
  72. (path_or_buffer, _, _, *instructions) = pandas.io.common.get_filepath_or_buffer(
  73. path
  74. )
  75. if instructions:
  76. # https://github.com/pandas-dev/pandas/blob/v0.25.3/pandas/io/common.py#L171
  77. assert len(instructions) == 1, instructions
  78. should_close = instructions[0]
  79. else:
  80. # https://github.com/pandas-dev/pandas/blob/v0.21.0/pandas/io/common.py#L171
  81. should_close = hasattr(path_or_buffer, "close")
  82. return path_or_buffer, should_close
  83. class CorticalParcellationStats:
  84. _HEMISPHERE_PREFIX_TO_SIDE = {"lh": "left", "rh": "right"}
  85. _GENERAL_MEASUREMENTS_REGEX = re.compile(
  86. r"^Measure \S+, ([^,\s]+),? ([^,]+), ([\d\.]+), (\S+)$"
  87. )
  88. _COLUMN_NAMES_NON_SAFE_REGEX = re.compile(r"\s+")
  89. def __init__(self):
  90. self.headers: typing.Dict[str, typing.Union[str, datetime.datetime]] = {}
  91. self.whole_brain_measurements: typing.Dict[str, typing.Tuple[float, int]] = {}
  92. self.structural_measurements: typing.Union[pandas.DataFrame, None] = {}
  93. @property
  94. def hemisphere(self) -> str:
  95. return self._HEMISPHERE_PREFIX_TO_SIDE[typing.cast(str, self.headers["hemi"])]
  96. @staticmethod
  97. def _read_header_line(stream: typing.TextIO) -> str:
  98. line = stream.readline()
  99. assert line.startswith("# ")
  100. return line[2:].rstrip()
  101. @classmethod
  102. def _read_column_header_line(
  103. cls, stream: typing.TextIO
  104. ) -> typing.Tuple[int, str, str]:
  105. line = cls._read_header_line(stream)
  106. assert line.startswith("TableCol"), line
  107. line = line[len("TableCol ") :].lstrip()
  108. index, key, value = line.split(maxsplit=2)
  109. return int(index), key, value
  110. def _read_headers(self, stream: typing.TextIO) -> None:
  111. self.headers = {}
  112. while True:
  113. line = self._read_header_line(stream)
  114. if line.startswith("Measure"):
  115. break
  116. if line:
  117. attr_name, attr_value_str = line.split(" ", maxsplit=1)
  118. attr_value_str = attr_value_str.lstrip()
  119. if attr_name in ["cvs_version", "mrisurf.c-cvs_version"]:
  120. attr_value = typing.cast(
  121. typing.Union[str, datetime.datetime],
  122. attr_value_str.strip("$").rstrip(),
  123. )
  124. elif attr_name == "CreationTime":
  125. attr_dt = datetime.datetime.strptime(
  126. attr_value_str, "%Y/%m/%d-%H:%M:%S-%Z"
  127. )
  128. if attr_dt.tzinfo is None:
  129. assert attr_value_str.endswith("-GMT")
  130. attr_dt = attr_dt.replace(tzinfo=datetime.timezone.utc)
  131. attr_value = attr_dt
  132. elif attr_name == "AnnotationFileTimeStamp":
  133. attr_value = datetime.datetime.strptime(
  134. attr_value_str, "%Y/%m/%d %H:%M:%S"
  135. )
  136. else:
  137. attr_value = attr_value_str
  138. self.headers[attr_name] = attr_value
  139. @classmethod
  140. def _format_column_name(cls, name: str, unit: str) -> str:
  141. column_name = name.lower()
  142. if unit not in ["unitless", "NA"]:
  143. column_name += "_" + unit
  144. return cls._COLUMN_NAMES_NON_SAFE_REGEX.sub("_", column_name)
  145. @classmethod
  146. def _parse_whole_brain_measurements_line(
  147. cls, line: str
  148. ) -> typing.Tuple[str, numpy.ndarray]:
  149. match = cls._GENERAL_MEASUREMENTS_REGEX.match(line)
  150. if not match:
  151. raise ValueError(f"unexpected line: {line!r}")
  152. key, name, value, unit = match.groups()
  153. if (
  154. key == "SupraTentorialVolNotVent"
  155. and name.lower() == "supratentorial volume"
  156. ):
  157. name += " Without Ventricles"
  158. column_name = cls._format_column_name(name, unit)
  159. return column_name, pandas.to_numeric([value], errors="raise")
  160. @classmethod
  161. def _read_column_attributes(
  162. cls, num: int, stream: typing.TextIO
  163. ) -> typing.List[typing.Dict[str, str]]:
  164. columns = []
  165. for column_index in range(1, int(num) + 1):
  166. column_attrs: typing.Dict[str, str] = {}
  167. for _ in range(3):
  168. column_index_line, key, value = cls._read_column_header_line(stream)
  169. assert column_index_line == column_index
  170. assert key not in column_attrs
  171. column_attrs[key] = value
  172. columns.append(column_attrs)
  173. return columns
  174. def _read(self, stream: typing.TextIO) -> None:
  175. assert (
  176. stream.readline().rstrip()
  177. == "# Table of FreeSurfer cortical parcellation anatomical statistics"
  178. )
  179. assert stream.readline().rstrip() == "#"
  180. self._read_headers(stream)
  181. self.whole_brain_measurements = pandas.DataFrame()
  182. line = self._read_header_line(stream)
  183. while not line.startswith("NTableCols"):
  184. if line.startswith("BrainVolStatsFixed"):
  185. # https://surfer.nmr.mgh.harvard.edu/fswiki/BrainVolStatsFixed
  186. assert (
  187. line.startswith("BrainVolStatsFixed see ")
  188. or line == "BrainVolStatsFixed-NotNeeded because voxelvolume=1mm3"
  189. )
  190. self.headers["BrainVolStatsFixed"] = line[len("BrainVolStatsFixed-") :]
  191. else:
  192. column_name, value = self._parse_whole_brain_measurements_line(line)
  193. assert column_name not in self.whole_brain_measurements, column_name
  194. self.whole_brain_measurements[column_name] = value
  195. line = self._read_header_line(stream)
  196. columns = self._read_column_attributes(int(line[len("NTableCols ") :]), stream)
  197. assert self._read_header_line(stream) == "ColHeaders " + " ".join(
  198. c["ColHeader"] for c in columns
  199. )
  200. self.structural_measurements = pandas.DataFrame(
  201. (line.rstrip().split() for line in stream),
  202. columns=[
  203. self._format_column_name(c["FieldName"], c["Units"]) for c in columns
  204. ],
  205. ).apply(pandas.to_numeric, errors="ignore")
  206. @classmethod
  207. def read(cls, path: typing.Union[str, pathlib.Path]) -> "CorticalParcellationStats":
  208. path_or_buffer, should_close = _get_filepath_or_buffer(path)
  209. stats = cls()
  210. try: # pragma: no cover
  211. # can't check coverage due to pandas version branching.
  212. # pylint: disable=protected-access; false-positive for ._read
  213. if isinstance(path_or_buffer, io.TextIOWrapper): # pandas>=v1.2.0
  214. stats._read(path_or_buffer)
  215. elif hasattr(path_or_buffer, "readline"):
  216. stats._read(io.TextIOWrapper(path_or_buffer))
  217. else:
  218. with open(path_or_buffer, "r", encoding="utf8") as stream:
  219. stats._read(stream)
  220. finally:
  221. if should_close:
  222. path_or_buffer.close()
  223. return stats