  1. """
  2. Python Library to Read FreeSurfer's Cortical Parcellation Anatomical Statistics
  3. ([lh]h.aparc(.*)?.stats)
  4. Freesurfer
  5. https://surfer.nmr.mgh.harvard.edu/
  6. >>> from freesurfer_stats import CorticalParcellationStats
  7. >>> stats = CorticalParcellationStats.read('tests/subjects/fabian/stats/lh.aparc.DKTatlas.stats')
  8. >>> stats.headers['CreationTime'].isoformat()
  9. '2019-05-09T21:05:54+00:00'
  10. >>> stats.headers['cvs_version']
  11. 'Id: mris_anatomical_stats.c,v 1.79 2016/03/14 15:15:34 greve Exp'
  12. >>> stats.headers['cmdline'][:64]
  13. 'mris_anatomical_stats -th3 -mgz -cortex ../label/lh.cortex.label'
  14. >>> stats.hemisphere
  15. >>> stats.whole_brain_measurements['estimated_total_intracranial_volume_mm^3']
  16. 0 1.670487e+06
  17. Name: estimated_total_intracranial_volume_mm^3, dtype: float64
  18. >>> stats.whole_brain_measurements['white_surface_total_area_mm^2']
  19. 0 98553
  20. Name: white_surface_total_area_mm^2, dtype: int64
  21. >>> stats.structural_measurements[['structure_name', 'surface_area_mm^2',
  22. ... 'gray_matter_volume_mm^3']].head()
  23. structure_name surface_area_mm^2 gray_matter_volume_mm^3
  24. 0 caudalanteriorcingulate 1472 4258
  25. 1 caudalmiddlefrontal 3039 8239
  26. 2 cuneus 2597 6722
  27. 3 entorhinal 499 2379
  28. 4 fusiform 3079 9064
  29. Copyright (C) 2019 Fabian Peter Hammerle <fabian@hammerle.me>
  30. This program is free software: you can redistribute it and/or modify
  31. it under the terms of the GNU General Public License as published by
  32. the Free Software Foundation, either version 3 of the License, or
  33. any later version.
  34. This program is distributed in the hope that it will be useful,
  35. but WITHOUT ANY WARRANTY; without even the implied warranty of
  37. GNU General Public License for more details.
  38. You should have received a copy of the GNU General Public License
  39. along with this program. If not, see <https://www.gnu.org/licenses/>.
  40. """
  41. import datetime
  42. import io
  43. import pathlib
  44. import re
  45. import typing
  46. import numpy
  47. import pandas
  48. from freesurfer_stats.version import __version__
  49. def _get_filepath_or_buffer(
  50. path: typing.Union[str, pathlib.Path]
  51. ) -> typing.Tuple[typing.Any, bool]: # (pandas._typing.FileOrBuffer, bool)
  52. # path_or_buffer: typing.Union[str, pathlib.Path, typing.IO[typing.AnyStr],
  53. # s3fs.S3File, gcsfs.GCSFile]
  54. # https://github.com/pandas-dev/pandas/blob/v0.25.3/pandas/io/parsers.py#L436
  55. # https://github.com/pandas-dev/pandas/blob/v0.25.3/pandas/_typing.py#L30
  56. (path_or_buffer, _, _, *instructions) = pandas.io.common.get_filepath_or_buffer(
  57. path
  58. )
  59. if instructions: # pragma: no cover
  60. # https://github.com/pandas-dev/pandas/blob/v0.25.3/pandas/io/common.py#L171
  61. assert len(instructions) == 1, instructions
  62. should_close = instructions[0]
  63. else: # pragma: no cover
  64. # https://github.com/pandas-dev/pandas/blob/v0.21.0/pandas/io/common.py#L171
  65. should_close = hasattr(path_or_buffer, "close")
  66. return path_or_buffer, should_close
  67. class CorticalParcellationStats:
  68. _HEMISPHERE_PREFIX_TO_SIDE = {"lh": "left", "rh": "right"}
  70. r"^Measure \S+, ([^,\s]+),? ([^,]+), ([\d\.]+), (\S+)$"
  71. )
  72. _COLUMN_NAMES_NON_SAFE_REGEX = re.compile(r"\s+")
  73. def __init__(self):
  74. self.headers = (
  75. {}
  76. ) # type: typing.Dict[str, typing.Union[str, datetime.datetime]]
  77. self.whole_brain_measurements = (
  78. {}
  79. ) # type: typing.Dict[str, typing.Tuple[float, int]]
  80. self.structural_measurements = {} # type: typing.Union[pandas.DataFrame, None]
  81. @property
  82. def hemisphere(self) -> str:
  83. return self._HEMISPHERE_PREFIX_TO_SIDE[typing.cast(str, self.headers["hemi"])]
  84. @staticmethod
  85. def _read_header_line(stream: typing.TextIO) -> str:
  86. line = stream.readline()
  87. assert line.startswith("# ")
  88. return line[2:].rstrip()
  89. @classmethod
  90. def _read_column_header_line(
  91. cls, stream: typing.TextIO
  92. ) -> typing.Tuple[int, str, str]:
  93. line = cls._read_header_line(stream)
  94. assert line.startswith("TableCol"), line
  95. line = line[len("TableCol ") :].lstrip()
  96. index, key, value = line.split(maxsplit=2)
  97. return int(index), key, value
  98. def _read_headers(self, stream: typing.TextIO) -> None:
  99. self.headers = {}
  100. while True:
  101. line = self._read_header_line(stream)
  102. if line.startswith("Measure"):
  103. break
  104. if line:
  105. attr_name, attr_value_str = line.split(" ", maxsplit=1)
  106. attr_value_str = attr_value_str.lstrip()
  107. if attr_name in ["cvs_version", "mrisurf.c-cvs_version"]:
  108. attr_value = typing.cast(
  109. typing.Union[str, datetime.datetime],
  110. attr_value_str.strip("$").rstrip(),
  111. )
  112. elif attr_name == "CreationTime":
  113. attr_dt = datetime.datetime.strptime(
  114. attr_value_str, "%Y/%m/%d-%H:%M:%S-%Z"
  115. )
  116. if attr_dt.tzinfo is None:
  117. assert attr_value_str.endswith("-GMT")
  118. attr_dt = attr_dt.replace(tzinfo=datetime.timezone.utc)
  119. attr_value = attr_dt
  120. elif attr_name == "AnnotationFileTimeStamp":
  121. attr_value = datetime.datetime.strptime(
  122. attr_value_str, "%Y/%m/%d %H:%M:%S"
  123. )
  124. else:
  125. attr_value = attr_value_str
  126. self.headers[attr_name] = attr_value
  127. @classmethod
  128. def _format_column_name(cls, name: str, unit: str) -> str:
  129. column_name = name.lower()
  130. if unit not in ["unitless", "NA"]:
  131. column_name += "_" + unit
  132. return cls._COLUMN_NAMES_NON_SAFE_REGEX.sub("_", column_name)
  133. @classmethod
  134. def _parse_whole_brain_measurements_line(
  135. cls, line: str
  136. ) -> typing.Tuple[str, numpy.ndarray]:
  137. match = cls._GENERAL_MEASUREMENTS_REGEX.match(line)
  138. if not match:
  139. raise ValueError("unexpected line: {!r}".format(line))
  140. key, name, value, unit = match.groups()
  141. if (
  142. key == "SupraTentorialVolNotVent"
  143. and name.lower() == "supratentorial volume"
  144. ):
  145. name += " Without Ventricles"
  146. column_name = cls._format_column_name(name, unit)
  147. return column_name, pandas.to_numeric([value], errors="raise")
  148. @classmethod
  149. def _read_column_attributes(
  150. cls, num: int, stream: typing.TextIO
  151. ) -> typing.List[typing.Dict[str, str]]:
  152. columns = []
  153. for column_index in range(1, int(num) + 1):
  154. column_attrs = {} # type: typing.Dict[str, str]
  155. for _ in range(3):
  156. column_index_line, key, value = cls._read_column_header_line(stream)
  157. assert column_index_line == column_index
  158. assert key not in column_attrs
  159. column_attrs[key] = value
  160. columns.append(column_attrs)
  161. return columns
  162. def _read(self, stream: typing.TextIO) -> None:
  163. assert (
  164. stream.readline().rstrip()
  165. == "# Table of FreeSurfer cortical parcellation anatomical statistics"
  166. )
  167. assert stream.readline().rstrip() == "#"
  168. self._read_headers(stream)
  169. self.whole_brain_measurements = pandas.DataFrame()
  170. line = self._read_header_line(stream)
  171. while not line.startswith("NTableCols"):
  172. if line.startswith("BrainVolStatsFixed"):
  173. # https://surfer.nmr.mgh.harvard.edu/fswiki/BrainVolStatsFixed
  174. assert (
  175. line.startswith("BrainVolStatsFixed see ")
  176. or line == "BrainVolStatsFixed-NotNeeded because voxelvolume=1mm3"
  177. )
  178. self.headers["BrainVolStatsFixed"] = line[len("BrainVolStatsFixed-") :]
  179. else:
  180. column_name, value = self._parse_whole_brain_measurements_line(line)
  181. assert column_name not in self.whole_brain_measurements, column_name
  182. self.whole_brain_measurements[column_name] = value
  183. line = self._read_header_line(stream)
  184. columns = self._read_column_attributes(int(line[len("NTableCols ") :]), stream)
  185. assert self._read_header_line(stream) == "ColHeaders " + " ".join(
  186. c["ColHeader"] for c in columns
  187. )
  188. self.structural_measurements = pandas.DataFrame(
  189. (line.rstrip().split() for line in stream),
  190. columns=[
  191. self._format_column_name(c["FieldName"], c["Units"]) for c in columns
  192. ],
  193. ).apply(pandas.to_numeric, errors="ignore")
  194. @classmethod
  195. def read(cls, path: typing.Union[str, pathlib.Path]) -> "CorticalParcellationStats":
  196. path_or_buffer, should_close = _get_filepath_or_buffer(path)
  197. stats = cls()
  198. try:
  199. if hasattr(path_or_buffer, "readline"):
  200. # pylint: disable=protected-access
  201. stats._read(io.TextIOWrapper(path_or_buffer))
  202. else:
  203. with open(path_or_buffer, "r") as stream:
  204. # pylint: disable=protected-access
  205. stats._read(stream)
  206. finally:
  207. if should_close:
  208. path_or_buffer.close()
  209. return stats