Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1"""Functions to parse data in the rather bespoke format used by the MCD.""" 

2import re 

3from collections import namedtuple 

4from datetime import datetime 

5from pathlib import Path 

6from typing import List, Union 

7 

8import numpy as np 

9 

10 

def parse_number(num: str) -> Union[float, int, None]:
    """
    Parse a number into the best representation. Return None if not possible.

    Integers are preferred; anything containing a decimal point, or that
    ``int`` rejects (e.g. ``"1e3"``), is parsed as a float instead.

    Args:
        num (str): number to parse. Surrounding whitespace is ignored.

    Returns:
        (float or int or None): parsed number, or None when ``num`` is the
        ``"----"`` placeholder or cannot be read as a number at all.

    """
    text = num.strip()
    if text == "----":
        return None
    # A decimal point can never be a valid int literal, so skip that attempt.
    if "." not in text:
        try:
            return int(text)
        except ValueError:
            pass  # fall through: may still be a float such as "1e3"
    try:
        return float(text)
    except ValueError:
        # Honour the documented contract instead of leaking the exception.
        return None

30 

31 

def _header_match(pattern: str, line: str) -> "re.Match[str]":
    """Search ``line`` for ``pattern``; raise ValueError when it is absent."""
    found = re.search(pattern, line)
    if found is None:
        raise ValueError(
            f"Malformed MCD header line {line!r}: expected pattern {pattern!r}"
        )
    return found


def _require_rule(line: str) -> None:
    """Raise ValueError unless ``line`` contains a run of six dashes."""
    if "-" * 6 not in line:
        raise ValueError(
            f"Malformed MCD header: expected a dashed rule, got {line!r}"
        )


def parse_header(lines: List[str]) -> dict:
    """Parse header.

    The first eight entries of ``lines`` are read, in order: the version and
    model line, the Ls/altitude/local-time line, a dashed rule, the
    "Column 1" line, the "Columns 2+" line, the "Line 1" line, a second
    dashed rule and the "Retrieved on" line.

    Args:
        lines (List[str]): lines to parse.

    Returns:
        Dict representing extracted data, with the keys ``mcd_version``,
        ``model``, ``ls``, ``altitude``, ``local_time``, ``column_1``,
        ``variable``, ``keys`` (all str, as captured from the text) and
        ``retrieval_date`` (datetime).

    Raises:
        ValueError: fewer than eight lines were supplied, a line does not
            have the expected layout, or the retrieval date is not in ISO
            format.
    """
    # written to be readable by people beginning python, so rather verbose.
    if len(lines) < 8:
        raise ValueError(
            f"Malformed MCD header: expected at least 8 lines, got {len(lines)}"
        )

    data = {}
    found = _header_match("MCD_(.+) with (.+).", lines[0])
    data["mcd_version"] = found.group(1)
    data["model"] = found.group(2)

    found = _header_match(
        "Ls (.+). Altitude (.+) ALS Local time (.+)", lines[1]
    )
    data["ls"] = found.group(1)
    data["altitude"] = found.group(2)
    data["local_time"] = found.group(3).strip()

    # Explicit checks rather than ``assert``: asserts vanish under ``-O``.
    _require_rule(lines[2])

    found = _header_match("Column 1 is (.+)", lines[3])
    data["column_1"] = found.group(1)

    found = _header_match(r"Columns 2\+ are (.+)", lines[4])
    data["variable"] = found.group(1)

    found = _header_match("Line 1 is (.+)", lines[5])
    data["keys"] = found.group(1)

    _require_rule(lines[6])

    found = _header_match("Retrieved on: (.+)", lines[7])
    # Strip so stray trailing whitespace does not break ISO parsing.
    data["retrieval_date"] = datetime.fromisoformat(found.group(1).strip())
    return data

63 

64 

65_DataTable = namedtuple("_DataTable", ["data", "xlabels", "ylabels"]) 

66 

67 

def parse_body(body: List[str]) -> "_DataTable":
    """
    Parse body of data from the MCD.

    The first row supplies the x labels (everything after ``||``), the
    second row is skipped, and each remaining row supplies one y label
    (before ``||``) and one row of values (after ``||``). Blank rows after
    the first two are ignored.

    Args:
        body (List[str]): lines to parse.

    Returns:
        (_DataTable): The parsed data. ``data`` is the value grid passed
        through ``np.rot90`` (so its first axis runs over the file's
        columns); ``xlabels`` and ``ylabels`` are lists produced by
        ``parse_number``.
    """
    # here we use the map (/reduce, but here we don't reduce) paradigm
    # to show how sometimes functional programming is a *lot* simpler
    # than writing the loops out by hand.

    # map applies a function (here an anonymous function declared with lambda)
    # over an iterable

    # numpy has its own map/reduce fns which are implemented in C
    # and can be a lot faster than python's.

    # Collapse runs of whitespace so that split(" ") gives one token per cell.
    rows = list(map(lambda row: " ".join(row.strip().split()), body))
    xlabel_tokens = rows[0].split("||")[1].strip().split(" ")

    # rows[1] is skipped (presumably the rule under the column labels).
    # Blank rows, e.g. an empty line at the end of the file, have no "||"
    # separator and would raise IndexError below, so filter them out.
    data_rows = list(filter(None, rows[2:]))

    xlabels = map(parse_number, xlabel_tokens)
    ylabel_tokens = map(lambda row: row.split("||")[0].strip(), data_rows)
    ylabels = map(parse_number, ylabel_tokens)
    cells = map(lambda row: row.split("||")[1].strip().split(" "), data_rows)
    data = np.array(list(cells), dtype=float)
    return _DataTable(np.rot90(data), list(xlabels), list(ylabels))

97 

98 

def read_ascii_data(dataf: Path) -> dict:
    """
    Read every section of a file downloaded from the MCD.

    A section is a header block followed by a body block; blocks are
    delimited by lines containing a run of eight ``#`` characters.

    Args:
        dataf (Path): The file to parse.

    Returns:
        (dict): One entry per section, keyed by the header's ``variable``
        value. Each entry is the parsed header dict with the parsed body
        stored under ``"data"``.

    Raises:
        ValueError: Failed to parse supplied file.

    """
    marker = "#" * 8
    parsed = {}
    with dataf.open() as handle:
        line = handle.readline()
        # At the top of each pass `line` is either the marker row that opens
        # a header, or "" once the file is exhausted.
        while line:
            line = handle.readline()  # step past the marker row

            # Gather header lines up to the next marker row (or end of file).
            header_lines = []
            while marker not in line:
                header_lines.append(line)
                line = handle.readline()
                if not line:
                    break
            if len(header_lines) < 8:
                raise ValueError("Unable to get header from file")
            section = parse_header(header_lines)

            # Gather body lines up to the marker row of the next section.
            body_lines = []
            for line in iter(handle.readline, ""):
                if marker in line:
                    break
                body_lines.append(line)
            else:
                line = ""  # end of file reached: stops the outer loop

            section["data"] = parse_body(body_lines)
            parsed[section["variable"]] = section
    return parsed

134 body_rows.append(row) 

135 row = f.readline() 

136 body = parse_body(body_rows) 

137 header["data"] = body 

138 sections[header["variable"]] = header 

139 return sections