app.memos.cron_job
import os
import json
from collections import defaultdict, Counter
from datetime import datetime, timedelta

from django.conf import settings
from django.utils import timezone
from django.db.models import OuterRef, Subquery, Min

from .models import Memo, MemoSnapshot
from .serializers import MemoSerializer, MemoSnapshotSerializer

# ---------------------------
# Dashboard Data Computation
# ---------------------------

def get_card_metrics(data):
    """
    Compute card metrics for dashboard:
    - totalMemos: Total number of memos.
    - completedMemos: Number of completed memos.
    - completionRate: Percentage of completed memos.
    - avgResolution: Average resolution time (hours).
    - fastestResponder: Role with fastest average resolution.
    - fastestAvgTime: Fastest average resolution time (hours).
    """
    total = len(data)
    completed = [m for m in data if m["info"]["status"] == "completed" and m["worker_status"]]
    completion_rate = round(len(completed) / total * 100) if total else 0

    total_res_time = 0
    responder_times = defaultdict(list)

    for m in completed:
        start = datetime.fromisoformat(m["timestamp"])
        end = datetime.fromisoformat(m["worker_status"][0]["timestamp"])
        res_time = (end - start).total_seconds() / 3600
        total_res_time += res_time
        responder_times[m["info"]["tagged_role_name"]].append(res_time)

    avg_res = round(total_res_time / len(completed), 1) if completed else 0
    fastest = min(((sum(times)/len(times), role) for role, times in responder_times.items()), default=(None, "N/A"))

    return {
        "totalMemos": total,
        "completedMemos": len(completed),
        "completionRate": completion_rate,
        "avgResolution": avg_res,
        "fastestResponder": fastest[1],
        "fastestAvgTime": round(fastest[0], 1) if fastest[0] is not None else None
    }
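
# Illustrative only: a minimal snapshot dict carrying the fields the metric
# helpers in this module read. The field names come from the lookups above;
# the values (and the exact MemoSnapshotSerializer output shape) are assumptions.
def _example_snapshot():
    return {
        "timestamp": "2025-01-06T09:00:00",
        "info": {
            "status": "completed",
            "tagged_role_name": "Electrician",
            "current_block_name": "Block A",
            "current_ward_name": "Ward 3",
            "department": "Maintenance",
        },
        # The first worker_status entry is treated as the completion event.
        "worker_status": [{"timestamp": "2025-01-06T12:30:00"}],
        "hierarchy": [{"approved": True}, {"approved": True}, {"approved": True}],
    }

# get_card_metrics([_example_snapshot()]) would yield, for example:
# {"totalMemos": 1, "completedMemos": 1, "completionRate": 100,
#  "avgResolution": 3.5, "fastestResponder": "Electrician", "fastestAvgTime": 3.5}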

def get_block_ward_heatmap(data):
    """
    Compute block and ward distribution for dashboard heatmap.
    Returns:
        block_counts: Dict of block name to count.
        ward_counts: Dict of ward name to count.
    """
    block_counts = Counter(m["info"].get("current_block_name", "Unknown") for m in data)
    ward_counts = Counter(m["info"].get("current_ward_name", "Unknown") for m in data)
    return dict(block_counts), dict(ward_counts)

def get_responder_resolution(data):
    """
    Compute average resolution time per responder role.
    Returns:
        labels: List of role names.
        values: List of average resolution times (hours).
    """
    completed = [m for m in data if m["info"]["status"] == "completed" and m["worker_status"]]
    responder_stats = defaultdict(list)
    for m in completed:
        start = datetime.fromisoformat(m["timestamp"])
        end = datetime.fromisoformat(m["worker_status"][0]["timestamp"])
        res_time = (end - start).total_seconds() / 3600
        responder_stats[m["info"].get("tagged_role_name", "Unknown")].append(res_time)
    return {
        "labels": list(responder_stats.keys()),
        "values": [round(sum(v)/len(v), 1) for v in responder_stats.values()]
    }

def get_status_pie(data):
    """
    Compute memo status pie chart data.
    Returns:
        completed: Number of completed memos.
        rejected: Number of rejected memos.
        ongoing: Number of ongoing memos.
    """
    completed = 0
    rejected = 0
    ongoing = 0
    for m in data:
        if m["info"]["status"] == "completed":
            completed += 1
        elif any(h.get("rejected") for h in m.get("hierarchy", [])):
            rejected += 1
        else:
            ongoing += 1
    return {"completed": completed, "rejected": rejected, "ongoing": ongoing}

def get_approval_flow(data):
    """
    Compute approval flow chart data for dashboard.
    Returns:
        result: Dict of approval levels to approved/rejected/pending counts.
    """
    levels = ["Ward Superintendent", "RMO", "DEAN"]
    result = {lvl: {"approved": 0, "rejected": 0, "pending": 0} for lvl in levels}

    for m in data:
        for i, h in enumerate(m.get("hierarchy", [])):
            lvl = levels[i] if i < len(levels) else f"Level {i+1}"
            # Levels beyond the three named ones are added on demand so the
            # lookups below cannot raise a KeyError.
            result.setdefault(lvl, {"approved": 0, "rejected": 0, "pending": 0})
            if h.get("approved"):
                result[lvl]["approved"] += 1
            elif h.get("rejected"):
                result[lvl]["rejected"] += 1
            else:
                result[lvl]["pending"] += 1
    return result

def get_department_distribution(data):
    """
    Compute department distribution for dashboard.
    Returns:
        Dict of department name to count.
    """
    return dict(Counter(m["info"].get("department", "Unknown") for m in data))

def get_completion_timeline(data):
    """
    Compute completion timeline chart data for dashboard.
    Returns:
        labels: List of time buckets.
        values: List of counts per bucket.
    """
    buckets = [
        ("0-2h", 0, 2),
        ("2-4h", 2, 4),
        ("4-6h", 4, 6),
        ("6-8h", 6, 8),
        ("8-12h", 8, 12),
        ("12-24h", 12, 24),
        ("24+h", 24, float("inf"))
    ]
    labels = [b[0] for b in buckets]
    counts = [0] * len(buckets)

    completed = [m for m in data if m["info"]["status"] == "completed" and m["worker_status"]]
    for m in completed:
        start = datetime.fromisoformat(m["timestamp"])
        end = datetime.fromisoformat(m["worker_status"][0]["timestamp"])
        res_time = (end - start).total_seconds() / 3600
        for i, (_, lo, hi) in enumerate(buckets):
            if lo < res_time <= hi:
                counts[i] += 1
                break
    return {"labels": labels, "values": counts}
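
# Worked example (illustrative, reusing _example_snapshot() defined above):
#   get_status_pie([_example_snapshot()])
#   -> {"completed": 1, "rejected": 0, "ongoing": 0}
#   get_approval_flow([_example_snapshot()])
#   -> {"Ward Superintendent": {"approved": 1, "rejected": 0, "pending": 0},
#       "RMO": {"approved": 1, "rejected": 0, "pending": 0},
#       "DEAN": {"approved": 1, "rejected": 0, "pending": 0}}
#   get_completion_timeline([_example_snapshot()])   # 3.5 h falls in the "2-4h" bucket
#   -> {"labels": ["0-2h", "2-4h", "4-6h", "6-8h", "8-12h", "12-24h", "24+h"],
#       "values": [0, 1, 0, 0, 0, 0, 0]}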

def precompute_dashboard_data_and_save(hospital_id=None):
    """
    Generate dashboard snapshots for a specific hospital or all hospitals.

    - Computes metrics for today, this week, and this month.
    - Serializes latest memo snapshots for each period.
    - Saves dashboard data as JSON files in media/dashboard.
    - Returns a summary of saved files.
    """
    now = timezone.now()
    local_today = timezone.localtime(now).date()
    week_start_local = local_today - timedelta(days=local_today.weekday())

    # Create time period strings with hospital prefix if provided
    prefix = f"{hospital_id}_" if hospital_id else ""
    week_number = local_today.isocalendar()[1]
    year = local_today.year
    month = local_today.strftime('%Y-%m')

    today_str = f"{prefix}{local_today.strftime('%Y-%m-%d')}"
    week_str = f"{prefix}{year}-W{week_number:02d}"
    month_str = f"{prefix}{month}"

    # Base memo queryset - filter by hospital if provided
    base_memo_qs = Memo.objects.filter(is_deleted=False)
    if hospital_id:
        base_memo_qs = base_memo_qs.filter(hospital_id=hospital_id)

    # Time range helpers
    def get_range_for_day(date):
        start = timezone.make_aware(datetime.combine(date, datetime.min.time()))
        end = timezone.make_aware(datetime.combine(date + timedelta(days=1), datetime.min.time()))
        return start, end

    def get_range_for_week(start_date):
        start = timezone.make_aware(datetime.combine(start_date, datetime.min.time()))
        end = timezone.make_aware(datetime.combine(start_date + timedelta(days=7), datetime.min.time()))
        return start, end

    def get_range_for_month(date):
        start = timezone.make_aware(datetime.combine(date.replace(day=1), datetime.min.time()))
        if date.month == 12:
            end_date = date.replace(year=date.year + 1, month=1, day=1)
        else:
            end_date = date.replace(month=date.month + 1, day=1)
        end = timezone.make_aware(datetime.combine(end_date, datetime.min.time()))
        return start, end

    # Filter memos using date ranges
    today_start, today_end = get_range_for_day(local_today)
    today_memos = base_memo_qs.filter(created_at__gte=today_start, created_at__lt=today_end)

    week_start, week_end = get_range_for_week(week_start_local)
    week_memos = base_memo_qs.filter(created_at__gte=week_start, created_at__lt=week_end)

    month_start, month_end = get_range_for_month(local_today)
    month_memos = base_memo_qs.filter(created_at__gte=month_start, created_at__lt=month_end)

    def get_latest_snapshots_data(memo_qs):
        """
        For each memo in queryset, get its latest snapshot.
        Returns a list of latest snapshots.
        """
        snapshots = []
        for memo in memo_qs:
            latest_snapshot = memo.latest_snapshot()
            if latest_snapshot:
                snapshots.append(latest_snapshot)
        return snapshots

    # Get latest snapshots for each time period
    filtered_snapshots = {
        today_str: get_latest_snapshots_data(today_memos),
        week_str: get_latest_snapshots_data(week_memos),
        month_str: get_latest_snapshots_data(month_memos),
    }

    # Create output directory
    output_dir = os.path.join(settings.MEDIA_ROOT, "dashboard")
    os.makedirs(output_dir, exist_ok=True)

    saved_files = []

    for key, snapshots_list in filtered_snapshots.items():
        memo_json_data = MemoSnapshotSerializer(snapshots_list, many=True).data if snapshots_list else []

        dashboard_data = {
            "cardMetrics": get_card_metrics(memo_json_data),
            "blockWard": get_block_ward_heatmap(memo_json_data),
            "responderResolution": get_responder_resolution(memo_json_data),
            "statusPie": get_status_pie(memo_json_data),
            "approvalFlow": get_approval_flow(memo_json_data),
            "departmentDistribution": get_department_distribution(memo_json_data),
            "completionTimeline": get_completion_timeline(memo_json_data)
        }

        file_path = os.path.join(output_dir, f"{key}.json")
        with open(file_path, "w", encoding='utf-8') as f:
            json.dump(dashboard_data, f, indent=2, default=str, ensure_ascii=False)

        saved_files.append(key)

    return f"Saved dashboard files for: {', '.join(saved_files)}"
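
# Example of the files this produces (dates illustrative): running
# precompute_dashboard_data_and_save(hospital_id=3) on 2025-01-06 writes
#   MEDIA_ROOT/dashboard/3_2025-01-06.json   (today)
#   MEDIA_ROOT/dashboard/3_2025-W02.json     (this week)
#   MEDIA_ROOT/dashboard/3_2025-01.json      (this month)
# With hospital_id=None the same three files are written without the "3_" prefix.
# Note: the week key combines the calendar year with the ISO week number, so a few
# dates around New Year can map to the same key as the other year's week 1.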

def precompute_all_hospitals_dashboard_data():
    """
    Generate dashboard data for each hospital individually.

    - Iterates over all hospitals and generates dashboard data for each.
    - Combined (unfiltered) dashboard data can be generated by re-enabling
      the commented block below.
    - Returns a summary string of results.
    """
    from accounts.models import Hospital

    results = []

    # Generate for each hospital
    hospitals = Hospital.objects.all()
    for hospital in hospitals:
        try:
            result = precompute_dashboard_data_and_save(hospital_id=hospital.id)
            results.append(f"Hospital {hospital.id}: {result}")
        except Exception as e:
            results.append(f"Hospital {hospital.id}: Error - {str(e)}")

    # Generate combined data (no hospital filter)
    # try:
    #     combined_result = precompute_dashboard_data_and_save()
    #     results.append(f"Combined: {combined_result}")
    # except Exception as e:
    #     results.append(f"Combined: Error - {str(e)}")

    return "\n".join(results)
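
# Scheduling sketch (an assumption, not part of the existing code): if the project
# uses django-crontab, the periodic refresh could be wired up in settings.py as
# below; adapt to whatever scheduler (Celery beat, system cron, a management
# command) is actually in use.
#
#   CRONJOBS = [
#       ("*/30 * * * *", "app.memos.cron_job.precompute_all_hospitals_dashboard_data"),
#   ]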

def get_hospital_dashboard_data(hospital_id, period='today'):
    """
    Get specific dashboard data for a hospital and period.

    Args:
        hospital_id: ID of the hospital
        period: 'today', 'week', or 'month'

    Returns:
        Dictionary with dashboard data or None if file doesn't exist or is invalid.
    """
    now = timezone.now()
    today = timezone.localtime(now).date()  # use the same local date as when saving
    week_number = today.isocalendar()[1]
    year = today.year
    month = today.strftime('%Y-%m')

    # Generate filename based on period; must mirror the keys built in
    # precompute_dashboard_data_and_save
    if period == 'today':
        filename = f"{hospital_id}_{today.strftime('%Y-%m-%d')}.json"
    elif period == 'week':
        filename = f"{hospital_id}_{year}-W{week_number:02d}.json"
    elif period == 'month':
        filename = f"{hospital_id}_{month}.json"
    else:
        raise ValueError("Period must be 'today', 'week', or 'month'")

    file_path = os.path.join(settings.MEDIA_ROOT, "dashboard", filename)

    try:
        with open(file_path, "r", encoding='utf-8') as f:
            return json.load(f)
    except FileNotFoundError:
        return None
    except json.JSONDecodeError:
        return None
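
# Usage sketch (hypothetical view, not part of this module): an API endpoint could
# serve the precomputed JSON straight from disk. Assumes Django REST Framework,
# which the serializers imported above already imply.
#
#   from rest_framework.decorators import api_view
#   from rest_framework.response import Response
#
#   @api_view(["GET"])
#   def hospital_dashboard(request, hospital_id):
#       period = request.query_params.get("period", "today")
#       try:
#           data = get_hospital_dashboard_data(hospital_id, period=period)
#       except ValueError as exc:
#           return Response({"detail": str(exc)}, status=400)
#       if data is None:
#           return Response({"detail": "Dashboard data not generated yet."}, status=404)
#       return Response(data)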