__init__.py 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213
  1. """
  2. Python Library to Read FreeSurfer's Cortical Parcellation Anatomical Statistics
  3. ([lr]h.aparc(.*)?.stats)
  4. Freesurfer
  5. https://surfer.nmr.mgh.harvard.edu/
  6. >>> from freesurfer_stats import CorticalParcellationStats
  7. >>> stats = CorticalParcellationStats.read('tests/subjects/fabian/stats/lh.aparc.DKTatlas.stats')
  8. >>> stats.headers['CreationTime'].isoformat()
  9. '2019-05-09T21:05:54+00:00'
  10. >>> stats.headers['cvs_version']
  11. 'Id: mris_anatomical_stats.c,v 1.79 2016/03/14 15:15:34 greve Exp'
  12. >>> stats.headers['cmdline'][:64]
  13. 'mris_anatomical_stats -th3 -mgz -cortex ../label/lh.cortex.label'
  14. >>> stats.hemisphere
      'left'
  15. >>> stats.whole_brain_measurements['estimated_total_intracranial_volume_mm^3']
  16. 0 1.670487e+06
  17. Name: estimated_total_intracranial_volume_mm^3, dtype: float64
  18. >>> stats.whole_brain_measurements['white_surface_total_area_mm^2']
  19. 0 98553
  20. Name: white_surface_total_area_mm^2, dtype: int64
  21. >>> stats.structural_measurements[['structure_name', 'surface_area_mm^2',
  22. ... 'gray_matter_volume_mm^3']].head()
  23. structure_name surface_area_mm^2 gray_matter_volume_mm^3
  24. 0 caudalanteriorcingulate 1472 4258
  25. 1 caudalmiddlefrontal 3039 8239
  26. 2 cuneus 2597 6722
  27. 3 entorhinal 499 2379
  28. 4 fusiform 3079 9064
  29. Copyright (C) 2019 Fabian Peter Hammerle <fabian@hammerle.me>
  30. This program is free software: you can redistribute it and/or modify
  31. it under the terms of the GNU General Public License as published by
  32. the Free Software Foundation, either version 3 of the License, or
  33. any later version.
  34. This program is distributed in the hope that it will be useful,
  35. but WITHOUT ANY WARRANTY; without even the implied warranty of
  36. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  37. GNU General Public License for more details.
  38. You should have received a copy of the GNU General Public License
  39. along with this program. If not, see <https://www.gnu.org/licenses/>.
  40. """
  41. import datetime
  42. import io
  43. import pathlib
  44. import re
  45. import typing
  46. import numpy
  47. import pandas
  48. from freesurfer_stats.version import __version__
  49. class CorticalParcellationStats:
  50. _HEMISPHERE_PREFIX_TO_SIDE = {'lh': 'left', 'rh': 'right'}
  51. _GENERAL_MEASUREMENTS_REGEX = re.compile(
  52. r'^Measure \S+, ([^,\s]+),? ([^,]+), ([\d\.]+), (\S+)$')
  53. _COLUMN_NAMES_NON_SAFE_REGEX = re.compile(r'\s+')
  54. def __init__(self):
  55. self.headers \
  56. = {} # type: typing.Dict[str, typing.Union[str, datetime.datetime]]
  57. self.whole_brain_measurements \
  58. = {} # type: typing.Dict[str, typing.Tuple[float, int]]
  59. self.structural_measurements \
  60. = {} # type: typing.Union[pandas.DataFrame, None]
  61. @property
  62. def hemisphere(self) -> str:
  63. return self._HEMISPHERE_PREFIX_TO_SIDE[self.headers['hemi']]
  64. @staticmethod
  65. def _read_header_line(stream: typing.TextIO) -> str:
  66. line = stream.readline()
  67. assert line.startswith('# ')
  68. return line[2:].rstrip()
  69. @classmethod
  70. def _read_column_header_line(cls, stream: typing.TextIO) -> typing.Tuple[int, str, str]:
  71. line = cls._read_header_line(stream)
  72. assert line.startswith('TableCol'), line
  73. line = line[len('TableCol '):].lstrip()
  74. index, key, value = line.split(maxsplit=2)
  75. return int(index), key, value
  76. def _read_headers(self, stream: typing.TextIO) -> None:
  77. self.headers = {}
  78. while True:
  79. line = self._read_header_line(stream)
  80. if line.startswith('Measure'):
  81. break
  82. if line:
  83. attr_name, attr_value = line.split(" ", maxsplit=1)
  84. attr_value = attr_value.lstrip()
  85. if attr_name in ['cvs_version', 'mrisurf.c-cvs_version']:
  86. attr_value = attr_value.strip('$').rstrip()
  87. if attr_name == 'CreationTime':
  88. attr_dt = datetime.datetime.strptime(
  89. attr_value, '%Y/%m/%d-%H:%M:%S-%Z')
  90. if attr_dt.tzinfo is None:
  91. assert attr_value.endswith('-GMT')
  92. attr_dt = attr_dt.replace(tzinfo=datetime.timezone.utc)
  93. attr_value = attr_dt
  94. if attr_name == 'AnnotationFileTimeStamp':
  95. attr_value = datetime.datetime.strptime(
  96. attr_value, '%Y/%m/%d %H:%M:%S')
  97. self.headers[attr_name] = attr_value
  98. @classmethod
  99. def _format_column_name(cls, name: str, unit: typing.Optional[str]) -> str:
  100. column_name = name.lower()
  101. if unit not in ["unitless", "NA"]:
  102. column_name += "_" + unit
  103. return cls._COLUMN_NAMES_NON_SAFE_REGEX.sub("_", column_name)
  104. @classmethod
  105. def _parse_whole_brain_measurements_line(
  106. cls, line: str,
  107. ) -> typing.Tuple[str, numpy.ndarray]:
  108. match = cls._GENERAL_MEASUREMENTS_REGEX.match(line)
  109. if not match:
  110. raise ValueError("unexpected line: {!r}".format(line))
  111. key, name, value, unit = match.groups()
  112. if (
  113. key == "SupraTentorialVolNotVent"
  114. and name.lower() == "supratentorial volume"
  115. ):
  116. name += " Without Ventricles"
  117. column_name = cls._format_column_name(name, unit)
  118. return column_name, pandas.to_numeric([value], errors="raise")
  119. @classmethod
  120. def _read_column_attributes(cls, num: int, stream: typing.TextIO) \
  121. -> typing.List[typing.Dict[str, str]]:
  122. columns = []
  123. for column_index in range(1, int(num) + 1):
  124. column_attrs = {}
  125. for _ in range(3):
  126. column_index_line, key, value \
  127. = cls._read_column_header_line(stream)
  128. assert column_index_line == column_index
  129. assert key not in column_attrs
  130. column_attrs[key] = value
  131. columns.append(column_attrs)
  132. return columns
  133. def _read(self, stream: typing.TextIO) -> None:
  134. assert stream.readline().rstrip() \
  135. == '# Table of FreeSurfer cortical parcellation anatomical statistics'
  136. assert stream.readline().rstrip() == '#'
  137. self._read_headers(stream)
  138. self.whole_brain_measurements = pandas.DataFrame()
  139. line = self._read_header_line(stream)
  140. while not line.startswith("NTableCols"):
  141. if line.startswith("BrainVolStatsFixed"):
  142. # https://surfer.nmr.mgh.harvard.edu/fswiki/BrainVolStatsFixed
  143. assert (
  144. line.startswith("BrainVolStatsFixed see ")
  145. or line == "BrainVolStatsFixed-NotNeeded because voxelvolume=1mm3"
  146. )
  147. self.headers["BrainVolStatsFixed"] = line[len("BrainVolStatsFixed-") :]
  148. else:
  149. column_name, value = self._parse_whole_brain_measurements_line(line)
  150. assert column_name not in self.whole_brain_measurements, column_name
  151. self.whole_brain_measurements[column_name] = value
  152. line = self._read_header_line(stream)
  153. columns = self._read_column_attributes(
  154. int(line[len('NTableCols '):]), stream)
  155. assert self._read_header_line(stream) \
  156. == 'ColHeaders ' + ' '.join(c['ColHeader'] for c in columns)
  157. self.structural_measurements = pandas.DataFrame(
  158. (line.rstrip().split() for line in stream),
  159. columns=[self._format_column_name(c['FieldName'], c['Units']) for c in columns]) \
  160. .apply(pandas.to_numeric, errors='ignore')
  161. @classmethod
  162. def read(cls, path: typing.Union[str, pathlib.Path]) -> "CorticalParcellationStats":
  163. # path_or_buffer: typing.Union[str, pathlib.Path, typing.IO[typing.AnyStr],
  164. # s3fs.S3File, gcsfs.GCSFile]
  165. # https://github.com/pandas-dev/pandas/blob/v0.25.3/pandas/io/parsers.py#L436
  166. # https://github.com/pandas-dev/pandas/blob/v0.25.3/pandas/_typing.py#L30
  167. (
  168. path_or_buffer,
  169. _,
  170. _,
  171. *instructions,
  172. ) = pandas.io.common.get_filepath_or_buffer(path)
  173. # https://github.com/pandas-dev/pandas/blob/v0.25.3/pandas/io/common.py#L171
  174. # https://github.com/pandas-dev/pandas/blob/v0.21.0/pandas/io/common.py#L171
  175. if instructions: # pragma: no cover
  176. assert len(instructions) == 1, instructions
  177. should_close = instructions[0]
  178. else: # pragma: no cover
  179. should_close = hasattr(path_or_buffer, "close")
  180. stats = cls()
  181. if hasattr(path_or_buffer, "readline"):
  182. # pylint: disable=protected-access
  183. stats._read(io.TextIOWrapper(path_or_buffer))
  184. else:
  185. with open(path_or_buffer, "r") as stream:
  186. # pylint: disable=protected-access
  187. stats._read(stream)
  188. if should_close:
  189. path_or_buffer.close()
  190. return stats