__init__.py 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229
  1. """
  2. Python Library to Read FreeSurfer's Cortical Parcellation Anatomical Statistics
  3. ([lh]h.aparc(.*)?.stats)
  4. Freesurfer
  5. https://surfer.nmr.mgh.harvard.edu/
  6. >>> from freesurfer_stats import CorticalParcellationStats
  7. >>> stats = CorticalParcellationStats.read('tests/subjects/fabian/stats/lh.aparc.DKTatlas.stats')
  8. >>> stats.headers['CreationTime'].isoformat()
  9. '2019-05-09T21:05:54+00:00'
  10. >>> stats.headers['cvs_version']
  11. 'Id: mris_anatomical_stats.c,v 1.79 2016/03/14 15:15:34 greve Exp'
  12. >>> stats.headers['cmdline'][:64]
  13. 'mris_anatomical_stats -th3 -mgz -cortex ../label/lh.cortex.label'
  14. >>> stats.hemisphere
  15. >>> stats.whole_brain_measurements['estimated_total_intracranial_volume_mm^3']
  16. 0 1.670487e+06
  17. Name: estimated_total_intracranial_volume_mm^3, dtype: float64
  18. >>> stats.whole_brain_measurements['white_surface_total_area_mm^2']
  19. 0 98553
  20. Name: white_surface_total_area_mm^2, dtype: int64
  21. >>> stats.structural_measurements[['structure_name', 'surface_area_mm^2',
  22. ... 'gray_matter_volume_mm^3']].head()
  23. structure_name surface_area_mm^2 gray_matter_volume_mm^3
  24. 0 caudalanteriorcingulate 1472 4258
  25. 1 caudalmiddlefrontal 3039 8239
  26. 2 cuneus 2597 6722
  27. 3 entorhinal 499 2379
  28. 4 fusiform 3079 9064
  29. Copyright (C) 2019 Fabian Peter Hammerle <fabian@hammerle.me>
  30. This program is free software: you can redistribute it and/or modify
  31. it under the terms of the GNU General Public License as published by
  32. the Free Software Foundation, either version 3 of the License, or
  33. any later version.
  34. This program is distributed in the hope that it will be useful,
  35. but WITHOUT ANY WARRANTY; without even the implied warranty of
  36. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  37. GNU General Public License for more details.
  38. You should have received a copy of the GNU General Public License
  39. along with this program. If not, see <https://www.gnu.org/licenses/>.
  40. """
  41. import datetime
  42. import io
  43. import pathlib
  44. import re
  45. import typing
  46. import numpy
  47. import pandas
  48. from freesurfer_stats.version import __version__
  49. def _get_filepath_or_buffer(
  50. path: typing.Union[str, pathlib.Path]
  51. ) -> typing.Tuple[typing.Any, bool]: # (pandas._typing.FileOrBuffer, bool)
  52. # path_or_buffer: typing.Union[str, pathlib.Path, typing.IO[typing.AnyStr],
  53. # s3fs.S3File, gcsfs.GCSFile]
  54. # https://github.com/pandas-dev/pandas/blob/v0.25.3/pandas/io/parsers.py#L436
  55. # https://github.com/pandas-dev/pandas/blob/v0.25.3/pandas/_typing.py#L30
  56. (path_or_buffer, _, _, *instructions) = pandas.io.common.get_filepath_or_buffer(
  57. path
  58. )
  59. if instructions: # pragma: no cover
  60. # https://github.com/pandas-dev/pandas/blob/v0.25.3/pandas/io/common.py#L171
  61. assert len(instructions) == 1, instructions
  62. should_close = instructions[0]
  63. else: # pragma: no cover
  64. # https://github.com/pandas-dev/pandas/blob/v0.21.0/pandas/io/common.py#L171
  65. should_close = hasattr(path_or_buffer, "close")
  66. return path_or_buffer, should_close
  67. class CorticalParcellationStats:
  68. _HEMISPHERE_PREFIX_TO_SIDE = {"lh": "left", "rh": "right"}
  69. _GENERAL_MEASUREMENTS_REGEX = re.compile(
  70. r"^Measure \S+, ([^,\s]+),? ([^,]+), ([\d\.]+), (\S+)$"
  71. )
  72. _COLUMN_NAMES_NON_SAFE_REGEX = re.compile(r"\s+")
  73. def __init__(self):
  74. self.headers = (
  75. {}
  76. ) # type: typing.Dict[str, typing.Union[str, datetime.datetime]]
  77. self.whole_brain_measurements = (
  78. {}
  79. ) # type: typing.Dict[str, typing.Tuple[float, int]]
  80. self.structural_measurements = {} # type: typing.Union[pandas.DataFrame, None]
  81. @property
  82. def hemisphere(self) -> str:
  83. return self._HEMISPHERE_PREFIX_TO_SIDE[self.headers["hemi"]]
  84. @staticmethod
  85. def _read_header_line(stream: typing.TextIO) -> str:
  86. line = stream.readline()
  87. assert line.startswith("# ")
  88. return line[2:].rstrip()
  89. @classmethod
  90. def _read_column_header_line(
  91. cls, stream: typing.TextIO
  92. ) -> typing.Tuple[int, str, str]:
  93. line = cls._read_header_line(stream)
  94. assert line.startswith("TableCol"), line
  95. line = line[len("TableCol ") :].lstrip()
  96. index, key, value = line.split(maxsplit=2)
  97. return int(index), key, value
  98. def _read_headers(self, stream: typing.TextIO) -> None:
  99. self.headers = {}
  100. while True:
  101. line = self._read_header_line(stream)
  102. if line.startswith("Measure"):
  103. break
  104. if line:
  105. attr_name, attr_value = line.split(" ", maxsplit=1)
  106. attr_value = attr_value.lstrip()
  107. if attr_name in ["cvs_version", "mrisurf.c-cvs_version"]:
  108. attr_value = attr_value.strip("$").rstrip()
  109. if attr_name == "CreationTime":
  110. attr_dt = datetime.datetime.strptime(
  111. attr_value, "%Y/%m/%d-%H:%M:%S-%Z"
  112. )
  113. if attr_dt.tzinfo is None:
  114. assert attr_value.endswith("-GMT")
  115. attr_dt = attr_dt.replace(tzinfo=datetime.timezone.utc)
  116. attr_value = attr_dt
  117. if attr_name == "AnnotationFileTimeStamp":
  118. attr_value = datetime.datetime.strptime(
  119. attr_value, "%Y/%m/%d %H:%M:%S"
  120. )
  121. self.headers[attr_name] = attr_value
  122. @classmethod
  123. def _format_column_name(cls, name: str, unit: typing.Optional[str]) -> str:
  124. column_name = name.lower()
  125. if unit not in ["unitless", "NA"]:
  126. column_name += "_" + unit
  127. return cls._COLUMN_NAMES_NON_SAFE_REGEX.sub("_", column_name)
  128. @classmethod
  129. def _parse_whole_brain_measurements_line(
  130. cls, line: str
  131. ) -> typing.Tuple[str, numpy.ndarray]:
  132. match = cls._GENERAL_MEASUREMENTS_REGEX.match(line)
  133. if not match:
  134. raise ValueError("unexpected line: {!r}".format(line))
  135. key, name, value, unit = match.groups()
  136. if (
  137. key == "SupraTentorialVolNotVent"
  138. and name.lower() == "supratentorial volume"
  139. ):
  140. name += " Without Ventricles"
  141. column_name = cls._format_column_name(name, unit)
  142. return column_name, pandas.to_numeric([value], errors="raise")
  143. @classmethod
  144. def _read_column_attributes(
  145. cls, num: int, stream: typing.TextIO
  146. ) -> typing.List[typing.Dict[str, str]]:
  147. columns = []
  148. for column_index in range(1, int(num) + 1):
  149. column_attrs = {}
  150. for _ in range(3):
  151. column_index_line, key, value = cls._read_column_header_line(stream)
  152. assert column_index_line == column_index
  153. assert key not in column_attrs
  154. column_attrs[key] = value
  155. columns.append(column_attrs)
  156. return columns
  157. def _read(self, stream: typing.TextIO) -> None:
  158. assert (
  159. stream.readline().rstrip()
  160. == "# Table of FreeSurfer cortical parcellation anatomical statistics"
  161. )
  162. assert stream.readline().rstrip() == "#"
  163. self._read_headers(stream)
  164. self.whole_brain_measurements = pandas.DataFrame()
  165. line = self._read_header_line(stream)
  166. while not line.startswith("NTableCols"):
  167. if line.startswith("BrainVolStatsFixed"):
  168. # https://surfer.nmr.mgh.harvard.edu/fswiki/BrainVolStatsFixed
  169. assert (
  170. line.startswith("BrainVolStatsFixed see ")
  171. or line == "BrainVolStatsFixed-NotNeeded because voxelvolume=1mm3"
  172. )
  173. self.headers["BrainVolStatsFixed"] = line[len("BrainVolStatsFixed-") :]
  174. else:
  175. column_name, value = self._parse_whole_brain_measurements_line(line)
  176. assert column_name not in self.whole_brain_measurements, column_name
  177. self.whole_brain_measurements[column_name] = value
  178. line = self._read_header_line(stream)
  179. columns = self._read_column_attributes(int(line[len("NTableCols ") :]), stream)
  180. assert self._read_header_line(stream) == "ColHeaders " + " ".join(
  181. c["ColHeader"] for c in columns
  182. )
  183. self.structural_measurements = pandas.DataFrame(
  184. (line.rstrip().split() for line in stream),
  185. columns=[
  186. self._format_column_name(c["FieldName"], c["Units"]) for c in columns
  187. ],
  188. ).apply(pandas.to_numeric, errors="ignore")
  189. @classmethod
  190. def read(cls, path: typing.Union[str, pathlib.Path]) -> "CorticalParcellationStats":
  191. path_or_buffer, should_close = _get_filepath_or_buffer(path)
  192. stats = cls()
  193. try:
  194. if hasattr(path_or_buffer, "readline"):
  195. # pylint: disable=protected-access
  196. stats._read(io.TextIOWrapper(path_or_buffer))
  197. else:
  198. with open(path_or_buffer, "r") as stream:
  199. # pylint: disable=protected-access
  200. stats._read(stream)
  201. finally:
  202. if should_close:
  203. path_or_buffer.close()
  204. return stats