__init__.py 3.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. import argparse
  2. import os
  3. import re
  4. import typing
  5. import pandas
  6. # https://surfer.nmr.mgh.harvard.edu/fswiki/HippocampalSubfields
  7. HIPPOCAMPAL_VOLUME_FILENAME_PATTERN = r'^(?P<h>[lr])h\.hippoSfVolumes' \
  8. r'(?P<T1>-T1)?(-(?P<analysis_id>.+?))?\.v10.txt$'
  9. HIPPOCAMPAL_VOLUME_FILENAME_REGEX = re.compile(HIPPOCAMPAL_VOLUME_FILENAME_PATTERN)
  10. DEFAULT_HIPPOCAMPAL_VOLUME_FIND_FILENAME_PATTERN = re.sub(r'\?P<.+?>', '',
  11. HIPPOCAMPAL_VOLUME_FILENAME_PATTERN)
  12. VOLUME_FILENAME_HEMISPHERE_MAP = {'l': 'left', 'r': 'right'}
  13. def find_hippocampal_volume_files(root_dir_path: str, filename_regex: typing.Pattern = HIPPOCAMPAL_VOLUME_FILENAME_REGEX) -> typing.Iterator[str]:
  14. for dirpath, _, filenames in os.walk(root_dir_path):
  15. for filename in filter(filename_regex.search, filenames):
  16. yield os.path.join(dirpath, filename)
  17. def read_hippocampal_volumes(volume_file_path: str) -> dict:
  18. subfield_volumes = {}
  19. with open(volume_file_path, 'r') as volume_file:
  20. for line in volume_file.read().rstrip().split('\n'):
  21. subfield_name, subfield_volume_str = line.split(' ')
  22. subfield_volumes[subfield_name] = float(subfield_volume_str)
  23. return subfield_volumes
  24. def parse_hippocampal_volume_file_path(volume_file_path: str) -> dict:
  25. subject_dir_path = os.path.dirname(os.path.dirname(os.path.abspath(volume_file_path)))
  26. filename_match = HIPPOCAMPAL_VOLUME_FILENAME_REGEX.match(os.path.basename(volume_file_path))
  27. assert filename_match, volume_file_path
  28. filename_groups = filename_match.groupdict()
  29. assert filename_groups['T1'] or filename_groups['analysis_id'], volume_file_path
  30. return {
  31. 'subject': os.path.basename(subject_dir_path),
  32. 'hemisphere': VOLUME_FILENAME_HEMISPHERE_MAP[filename_groups['h']],
  33. 'T1_input': filename_groups['T1'] is not None,
  34. 'analysis_id': filename_groups['analysis_id'],
  35. }
  36. def read_hippocampal_volume_file_dataframe(volume_file_path: str) -> pandas.DataFrame:
  37. volumes_frame = pandas.DataFrame(
  38. read_hippocampal_volumes(volume_file_path).items(),
  39. columns=['subfield', 'volume'])
  40. for key, value in parse_hippocampal_volume_file_path(volume_file_path).items():
  41. volumes_frame[key] = value
  42. # volumes_frame['hemisphere'] = volumes_frame['hemisphere'].astype('category')
  43. return volumes_frame
  44. def main():
  45. # TODO add description
  46. argparser = argparse.ArgumentParser(description='Read hippocampal volumes computed by Freesurfer'
  47. '\nhttps://surfer.nmr.mgh.harvard.edu/fswiki/HippocampalSubfields')
  48. argparser.add_argument('--filename-regex', dest='filename_pattern',
  49. default=DEFAULT_HIPPOCAMPAL_VOLUME_FIND_FILENAME_PATTERN,
  50. help='default: %(default)s')
  51. argparser.add_argument('--output-format', choices=['csv'], default='csv',
  52. help='default: %(default)s')
  53. # TODO default to $SUBJECTS_DIR
  54. argparser.add_argument('root_dir_path')
  55. args = argparser.parse_args()
  56. volume_frames = []
  57. for volume_file_path in find_hippocampal_volume_files(root_dir_path=args.root_dir_path,
  58. filename_regex=re.compile(args.filename_pattern)):
  59. volume_frame = read_hippocampal_volume_file_dataframe(volume_file_path)
  60. volume_frame['source_path'] = os.path.abspath(volume_file_path)
  61. volume_frames.append(volume_frame)
  62. united_volume_frame = pandas.concat(volume_frames, ignore_index=True)
  63. print(united_volume_frame.to_csv(index=False))
  64. if __name__ == '__main__':
  65. main()