__init__.py 4.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. import argparse
  2. import os
  3. import re
  4. import typing
  5. import pandas
  6. # https://surfer.nmr.mgh.harvard.edu/fswiki/HippocampalSubfields
  7. HIPPOCAMPAL_VOLUME_FILENAME_PATTERN = r'^(?P<h>[lr])h\.hippoSfVolumes' \
  8. r'(?P<T1>-T1)?(-(?P<analysis_id>.+?))?\.v10.txt$'
  9. HIPPOCAMPAL_VOLUME_FILENAME_REGEX = re.compile(HIPPOCAMPAL_VOLUME_FILENAME_PATTERN)
  10. DEFAULT_HIPPOCAMPAL_VOLUME_FIND_FILENAME_PATTERN = re.sub(r'\?P<.+?>', '',
  11. HIPPOCAMPAL_VOLUME_FILENAME_PATTERN)
  12. VOLUME_FILENAME_HEMISPHERE_MAP = {'l': 'left', 'r': 'right'}
  13. def find_hippocampal_volume_files(root_dir_path: str,
  14. filename_regex: typing.Pattern = HIPPOCAMPAL_VOLUME_FILENAME_REGEX
  15. ) -> typing.Iterator[str]:
  16. for dirpath, _, filenames in os.walk(root_dir_path):
  17. for filename in filter(filename_regex.search, filenames):
  18. yield os.path.join(dirpath, filename)
  19. def read_hippocampal_volumes_mm3(volume_file_path: str) -> dict:
  20. subfield_volumes = {}
  21. with open(volume_file_path, 'r') as volume_file:
  22. for line in volume_file.read().rstrip().split('\n'):
  23. # https://github.com/freesurfer/freesurfer/blob/release_6_0_0/HippoSF/src/segmentSubjectT1T2_autoEstimateAlveusML.m#L8
  24. # https://github.com/freesurfer/freesurfer/blob/release_6_0_0/HippoSF/src/segmentSubjectT1T2_autoEstimateAlveusML.m#L1946
  25. subfield_name, subfield_volume_mm3_str = line.split(' ')
  26. subfield_volumes[subfield_name] = float(subfield_volume_mm3_str)
  27. return subfield_volumes
  28. def parse_hippocampal_volume_file_path(volume_file_path: str) -> dict:
  29. subject_dir_path = os.path.dirname(os.path.dirname(os.path.abspath(volume_file_path)))
  30. filename_match = HIPPOCAMPAL_VOLUME_FILENAME_REGEX.match(os.path.basename(volume_file_path))
  31. assert filename_match, volume_file_path
  32. filename_groups = filename_match.groupdict()
  33. assert filename_groups['T1'] or filename_groups['analysis_id'], volume_file_path
  34. return {
  35. 'subject': os.path.basename(subject_dir_path),
  36. 'hemisphere': VOLUME_FILENAME_HEMISPHERE_MAP[filename_groups['h']],
  37. 'T1_input': filename_groups['T1'] is not None,
  38. 'analysis_id': filename_groups['analysis_id'],
  39. }
  40. def read_hippocampal_volume_file_dataframe(volume_file_path: str) -> pandas.DataFrame:
  41. volumes_frame = pandas.DataFrame(
  42. read_hippocampal_volumes_mm3(volume_file_path).items(),
  43. columns=['subfield', 'volume_mm^3'])
  44. for key, value in parse_hippocampal_volume_file_path(volume_file_path).items():
  45. volumes_frame[key] = value
  46. # volumes_frame['hemisphere'] = volumes_frame['hemisphere'].astype('category')
  47. return volumes_frame
  48. def main():
  49. argparser = argparse.ArgumentParser(
  50. description='Read hippocampal subfield volumes computed by Freesurfer'
  51. '\nhttps://surfer.nmr.mgh.harvard.edu/fswiki/HippocampalSubfields')
  52. argparser.add_argument('--filename-regex', dest='filename_pattern',
  53. default=DEFAULT_HIPPOCAMPAL_VOLUME_FIND_FILENAME_PATTERN,
  54. help='default: %(default)s')
  55. argparser.add_argument('--output-format', choices=['csv'], default='csv',
  56. help='default: %(default)s')
  57. subjects_dir_path = os.environ.get('SUBJECTS_DIR', None)
  58. argparser.add_argument('root_dir_path',
  59. nargs='?' if subjects_dir_path else 1,
  60. default=[subjects_dir_path],
  61. help='default: $SUBJECTS_DIR ({})'.format(subjects_dir_path))
  62. args = argparser.parse_args()
  63. volume_file_paths = find_hippocampal_volume_files(
  64. root_dir_path=args.root_dir_path[0],
  65. filename_regex=re.compile(args.filename_pattern))
  66. volume_frames = []
  67. for volume_file_path in volume_file_paths:
  68. volume_frame = read_hippocampal_volume_file_dataframe(volume_file_path)
  69. volume_frame['source_path'] = os.path.abspath(volume_file_path)
  70. volume_frames.append(volume_frame)
  71. united_volume_frame = pandas.concat(volume_frames, ignore_index=True)
  72. print(united_volume_frame.to_csv(index=False))
# Run the command-line interface only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == '__main__':
    main()