-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathpreprocess_data.py
More file actions
350 lines (283 loc) · 14.3 KB
/
preprocess_data.py
File metadata and controls
350 lines (283 loc) · 14.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
#!/usr/bin/env python3
"""
Comprehensive Data Preprocessing Pipeline for Afrobarometer Data
This script processes the raw data once and creates a clean, ready-to-use dataset.
"""
import pandas as pd
import pyreadstat
import json
from pathlib import Path
import sys
import time
from typing import Dict, Any, Tuple
def read_sav_data(sav_file: str) -> Tuple[pd.DataFrame, Any]:
    """Load an SPSS .sav file via pyreadstat and return (dataframe, metadata).

    Prints basic diagnostics: shape, column count and elapsed load time.
    """
    print(f"📖 Reading .sav file: {sav_file}")
    t0 = time.time()
    df, meta = pyreadstat.read_sav(sav_file)
    elapsed = time.time() - t0
    print(f"✅ Data loaded successfully!")
    print(f" Shape: {df.shape}")
    print(f" Columns: {len(df.columns)}")
    print(f" Load time: {elapsed:.2f} seconds")
    return df, meta
def _parse_value_labels(row: Any) -> Dict[Any, str]:
    """Return a {code: label} mapping for one codebook row (possibly empty).

    Prefers the 'value_labels_json' column; falls back to parsing the raw
    "1=Yes, 2=No" text in 'value_labels_raw' when the JSON is malformed.
    """
    labels: Dict[Any, str] = {}
    raw_json = row['value_labels_json']
    if pd.isna(raw_json) or str(raw_json).strip() == '{}':
        return labels
    try:
        parsed = json.loads(str(raw_json))
    except json.JSONDecodeError:
        # Fallback to raw parsing if JSON fails
        if pd.notna(row['value_labels_raw']):
            raw_labels = str(row['value_labels_raw']).strip()
            if raw_labels and raw_labels != 'String variable':
                for item in raw_labels.split(','):
                    item = item.strip()
                    if '=' in item:
                        code_text, label_text = item.split('=', 1)
                        try:
                            labels[int(code_text.strip())] = label_text.strip()
                        except ValueError:
                            # Unparseable code -> skip this entry
                            pass
        return labels
    for key, value in parsed.items():
        try:
            # JSON object keys arrive as strings; numeric codes stored as ints
            labels[int(key)] = str(value).strip()
        except ValueError:
            # Non-numeric codes are kept under their original string key
            labels[key] = str(value).strip()
    return labels


def parse_afrobarometer_codebook(codebook_file: str) -> Dict[str, Any]:
    """Parse the Afrobarometer code list carefully for correct label mapping.

    Returns a dict with keys:
      'var_labels'     -- variable name -> descriptive label
      'value_labels'   -- variable name -> {code: label}
      'variable_types' -- variable name -> 'categorical_numeric' | 'unknown'
    All three are empty when the codebook cannot be read or parsed.
    """
    print(f"📋 Parsing codebook: {codebook_file}")
    try:
        # Read the Excel codebook
        df_codebook = pd.read_excel(codebook_file)
        print(f" Codebook rows: {len(df_codebook)}")
        print(f" Codebook columns: {df_codebook.columns.tolist()}")
        var_labels = {}
        value_labels = {}
        variable_types = {}
        for _, row in df_codebook.iterrows():
            # Test for missing values BEFORE stringifying: str(nan) is the
            # literal 'nan', which would defeat pd.isna afterwards.
            if pd.isna(row['variable']):
                continue
            var_name = str(row['variable']).strip()
            if var_name == '' or var_name == 'nan':
                continue
            # Get variable label (same pre-stringify NaN test as above)
            if pd.notna(row['variable_label']):
                var_label = str(row['variable_label']).strip()
                if var_label and var_label != 'nan':
                    var_labels[var_name] = var_label
            # Parse value labels (JSON column with raw-text fallback)
            value_labels_dict = _parse_value_labels(row)
            if value_labels_dict:
                value_labels[var_name] = value_labels_dict
                variable_types[var_name] = 'categorical_numeric'
            else:
                variable_types[var_name] = 'unknown'
        print(f"✅ Codebook parsed successfully!")
        print(f" Variable labels: {len(var_labels)}")
        print(f" Value label sets: {len(value_labels)}")
        return {
            'var_labels': var_labels,
            'value_labels': value_labels,
            'variable_types': variable_types
        }
    except Exception as e:
        print(f"❌ Error parsing codebook: {e}")
        import traceback
        traceback.print_exc()
        return {'var_labels': {}, 'value_labels': {}, 'variable_types': {}}
def apply_labels_to_data(df: pd.DataFrame, codebook_data: Dict[str, Any]) -> pd.DataFrame:
    """Rename columns to "<label> (<name>)" and translate coded values to labels.

    Columns without a variable label keep their original name; values with
    no entry in the value-label mapping are left as-is.
    """
    print(f"🏷️ Applying labels to data...")
    var_labels = codebook_data['var_labels']
    value_labels = codebook_data['value_labels']
    # Build the old-name -> new-name mapping in one pass.
    renamed = {
        col: (f"{var_labels[col]} ({col})" if col in var_labels else col)
        for col in df.columns
    }
    result = df.rename(columns=renamed)
    # Map numeric codes to their text labels; unmapped values survive via fillna.
    for original_name in df.columns:
        if original_name in value_labels:
            target = renamed[original_name]
            mapping = value_labels[original_name]
            result[target] = result[target].map(mapping).fillna(result[target])
    print(f"✅ Labels applied successfully!")
    print(f" Original columns: {len(df.columns)}")
    print(f" Labeled columns: {len(result.columns)}")
    return result
def merge_country_data(df: pd.DataFrame, country_file: str, codebook_data: Dict[str, Any]) -> pd.DataFrame:
    """Attach country metadata (M49/ISO codes, groupings) from Country_grouping.csv.

    Matches rows on the country-name column already present in `df` and adds
    five columns: Country_Name, M49_Code, ISO2, ISO3, Country_Grouping
    (semicolon-joined list of all groupings). On any failure the original
    frame is returned unchanged. `codebook_data` is accepted for interface
    compatibility but not currently used.
    """
    print(f"🌍 Merging with country data: {country_file}")
    try:
        # Read country grouping data
        country_df = pd.read_csv(country_file)
        print(f" Country data rows: {len(country_df)}")
        # Find the country column in the main data (e.g. "Country (COUNTRY)")
        country_col = None
        for col in df.columns:
            if 'country' in col.lower() and 'COUNTRY' in col:
                country_col = col
                break
        if country_col is None:
            print("⚠️ No country column found, skipping country merge")
            return df
        print(f" Using country column: {country_col}")
        # Get unique country codes in the data
        unique_country_codes = df[country_col].dropna().unique()
        print(f" Unique country codes in data: {len(unique_country_codes)}")
        print(f" Country codes: {sorted(unique_country_codes)}")
        # Since the data already has country names, we can work directly with them
        print(f" Working with country names directly from data")
        # Create a mapping from country names to country data
        country_mapping = {}
        for country_name in unique_country_codes:
            # regex=False: country names can contain regex metacharacters
            # (e.g. "Congo (Brazzaville)"), which would otherwise be
            # interpreted as a pattern and mis-match or raise.
            matching_countries = country_df[
                country_df['Country or Area'].str.contains(
                    str(country_name), case=False, na=False, regex=False)
            ]
            if len(matching_countries) > 0:
                # Take the first match for basic info
                row = matching_countries.iloc[0]
                # A country may belong to several groupings (one CSV row each)
                all_groupings = matching_countries['country_grouping'].dropna().unique().tolist()
                country_mapping[country_name] = {
                    'Country_Name': country_name,
                    'M49_Code': row['M49 Code'],
                    'ISO2': row['iso2'],
                    'ISO3': row['iso3'],
                    'Country_Grouping': all_groupings  # List of all groupings
                }
        print(f" Countries matched: {len(country_mapping)}")
        matched_countries_preview = [f"{code}: {info['Country_Name']}" for code, info in list(country_mapping.items())[:5]]
        print(f" Matched countries: {matched_countries_preview}")
        # Add country information columns
        df_merged = df.copy()

        def _lookup(field):
            # Per-row lookup; countries with no match yield None.
            return df_merged[country_col].map(lambda x: country_mapping.get(x, {}).get(field, None))

        df_merged['Country_Name'] = _lookup('Country_Name')
        df_merged['M49_Code'] = _lookup('M49_Code')
        df_merged['ISO2'] = _lookup('ISO2')
        df_merged['ISO3'] = _lookup('ISO3')
        # For Country_Grouping, create a semicolon-separated string of all groupings
        df_merged['Country_Grouping'] = df_merged[country_col].map(
            lambda x: ';'.join(country_mapping.get(x, {}).get('Country_Grouping', []))
            if country_mapping.get(x, {}).get('Country_Grouping') else None)
        print(f"✅ Country data merged successfully!")
        print(f" New columns added: 5")
        print(f" Final shape: {df_merged.shape}")
        return df_merged
    except Exception as e:
        print(f"❌ Error merging country data: {e}")
        import traceback
        traceback.print_exc()
        return df
def clean_data_for_streamlit(df: pd.DataFrame) -> pd.DataFrame:
    """Clean data for Streamlit compatibility.

    Casts object columns to str (avoids Arrow serialization issues) and
    normalizes missing-value placeholders ('nan', 'None', 'NaT', '') to None.
    Non-object columns are left untouched.
    """
    print(f"🧹 Cleaning data for Streamlit compatibility...")
    df_clean = df.copy()
    # Placeholder strings produced by astype(str) that should become real nulls.
    null_placeholders = {'nan': None, 'None': None, 'NaT': None, '': None}
    for col in df_clean.columns:
        if df_clean[col].dtype == 'object':
            # Dict form of replace is required: the scalar form
            # replace('nan', None) historically means method='pad'
            # (forward fill) in pandas, silently corrupting the data.
            df_clean[col] = df_clean[col].astype(str).replace(null_placeholders)
    print(f"✅ Data cleaned successfully!")
    return df_clean
def save_optimized_dataset(df: pd.DataFrame, output_file: str, metadata: Dict[str, Any]) -> None:
    """Save the final dataset in space-efficient format.

    Writes the frame with three candidate compressors, keeps whichever
    produced the smallest file, deletes the losers, and saves `metadata`
    as a JSON sibling of `output_file`.
    """
    print(f"💾 Saving optimized dataset...")
    # Test different compression methods.
    # NOTE(review): the bz2 CSV keeps a plain .csv extension — confirm any
    # downstream reader passes compression='bz2' (pandas would infer 'infer'
    # -> no compression from the extension).
    compression_methods = [
        ('csv_bz2', lambda: df.to_csv(output_file.replace('.parquet', '.csv'), index=False, compression='bz2')),
        ('parquet_snappy', lambda: df.to_parquet(output_file, index=False, compression='snappy', engine='pyarrow')),
        ('parquet_gzip', lambda: df.to_parquet(output_file.replace('.parquet', '_gzip.parquet'), index=False, compression='gzip', engine='pyarrow'))
    ]
    best_method = None
    best_size = float('inf')
    for method_name, save_func in compression_methods:
        try:
            start_time = time.time()
            save_func()
            save_time = time.time() - start_time
            # Resolve the path this method wrote to before stat-ing it.
            # For the parquet methods the replace is a no-op (yields
            # output_file), which is correct for snappy only...
            file_size = Path(output_file.replace('.parquet', '.csv' if 'csv' in method_name else '.parquet')).stat().st_size
            if 'gzip' in method_name:
                # ...so re-resolve: gzip wrote the _gzip.parquet sibling.
                file_size = Path(output_file.replace('.parquet', '_gzip.parquet')).stat().st_size
            print(f" {method_name}: {file_size / (1024*1024):.1f} MB ({save_time:.2f}s)")
            if file_size < best_size:
                best_size = file_size
                best_method = method_name
        except Exception as e:
            # A failed writer (e.g. pyarrow missing) just drops out of the race.
            print(f" {method_name}: Failed - {e}")
    # Keep the best method and clean up others
    if best_method:
        print(f"✅ Best method: {best_method} ({best_size / (1024*1024):.1f} MB)")
        # Clean up other files (same path-derivation rules as above)
        for method_name, _ in compression_methods:
            if method_name != best_method:
                if 'csv' in method_name:
                    cleanup_file = output_file.replace('.parquet', '.csv')
                elif 'gzip' in method_name:
                    cleanup_file = output_file.replace('.parquet', '_gzip.parquet')
                else:
                    # parquet_snappy wrote output_file itself
                    cleanup_file = output_file
                if Path(cleanup_file).exists():
                    Path(cleanup_file).unlink()
    # Save metadata
    metadata_file = output_file.replace('.parquet', '_metadata.json')
    with open(metadata_file, 'w', encoding='utf-8') as f:
        # default=str stringifies non-JSON-native values (e.g. shape tuples
        # and any numpy scalars carried in the label maps)
        json.dump(metadata, f, indent=2, ensure_ascii=False, default=str)
    print(f"✅ Metadata saved: {metadata_file}")
def main():
    """Main preprocessing pipeline.

    Runs read -> parse codebook -> label -> merge countries -> clean -> save.
    Returns True on success, False on any failure (traceback is printed).
    """
    print("🚀 Afrobarometer Data Preprocessing Pipeline")
    print("=" * 60)
    # File paths
    sav_file = "data/raw_data/R9.Merge_39ctry.20Nov23.final_.release_Updated.4Jun25-3.sav"
    codebook_file = "data/reference/afrobarometer_round9_codelist.xlsx"
    country_file = "data/reference/Country_grouping.csv"
    output_file = "data/processed/afrobarometer_processed_data.parquet"
    # Create output directory. parents=True so a missing "data/" ancestor
    # doesn't make mkdir raise FileNotFoundError.
    Path("data/processed").mkdir(parents=True, exist_ok=True)
    try:
        # Step 1: Read .sav data
        df, meta = read_sav_data(sav_file)
        # Step 2: Parse codebook
        codebook_data = parse_afrobarometer_codebook(codebook_file)
        # Step 3: Apply labels
        df_labeled = apply_labels_to_data(df, codebook_data)
        # Step 4: Merge country data
        df_merged = merge_country_data(df_labeled, country_file, codebook_data)
        # Step 5: Clean data
        df_clean = clean_data_for_streamlit(df_merged)
        # Step 6: Prepare metadata (persisted as a JSON sibling of the dataset)
        metadata = {
            'original_shape': df.shape,
            'processed_shape': df_clean.shape,
            'var_labels': codebook_data['var_labels'],
            'value_labels': codebook_data['value_labels'],
            'variable_types': codebook_data['variable_types'],
            'processing_timestamp': time.strftime('%Y-%m-%d %H:%M:%S'),
            'source_files': {
                'sav_file': sav_file,
                'codebook_file': codebook_file,
                'country_file': country_file
            }
        }
        # Step 7: Save optimized dataset
        save_optimized_dataset(df_clean, output_file, metadata)
        print(f"\n🎉 Preprocessing completed successfully!")
        print(f"✅ Original data: {df.shape}")
        print(f"✅ Processed data: {df_clean.shape}")
        print(f"✅ Variable labels: {len(codebook_data['var_labels'])}")
        print(f"✅ Value label sets: {len(codebook_data['value_labels'])}")
        print(f"✅ Output file: {output_file}")
        return True
    except Exception as e:
        print(f"\n❌ Preprocessing failed: {e}")
        import traceback
        traceback.print_exc()
        return False
if __name__ == "__main__":
    # Propagate the pipeline outcome as the process exit status (0 = success).
    raise SystemExit(0 if main() else 1)