Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-25 08:29:10

0001 """
0002 Common utility functions for the monitor application.
0003 """
0004 from datetime import timedelta
0005 from django.utils import timezone
0006 from django.http import JsonResponse
0007 from django.db.models import Q
0008 
0009 
0010 def format_duration(delta, is_ongoing=False):
0011     """
0012     Format a timedelta to a human-readable duration string.
0013     
0014     Args:
0015         delta: timedelta object representing the duration
0016         is_ongoing: bool, whether to append "(ongoing)" to the result
0017         
0018     Returns:
0019         str: Formatted duration string
0020         - For durations < 24 hours: "HH:MM:SS"
0021         - For durations >= 24 hours: "Nd HH:MM:SS"
0022         - Appends " (ongoing)" if is_ongoing=True
0023         
0024     Examples:
0025         format_duration(timedelta(hours=2, minutes=15, seconds=30))
0026         # Returns: "02:15:30"
0027         
0028         format_duration(timedelta(days=5, hours=8, minutes=42, seconds=15))
0029         # Returns: "5d 08:42:15"
0030         
0031         format_duration(timedelta(days=1, hours=6), is_ongoing=True)
0032         # Returns: "1d 06:00:00 (ongoing)"
0033     """
0034     if not isinstance(delta, timedelta):
0035         return 'N/A'
0036     
0037     total_seconds = delta.total_seconds()
0038     if total_seconds < 0:
0039         return 'N/A'
0040     
0041     days, remainder = divmod(total_seconds, 86400)  # 86400 seconds in a day
0042     hours, remainder = divmod(remainder, 3600)
0043     minutes, seconds = divmod(remainder, 60)
0044     
0045     if days > 0:
0046         duration_str = f"{int(days)}d {int(hours):02d}:{int(minutes):02d}:{int(seconds):02d}"
0047     else:
0048         duration_str = f"{int(hours):02d}:{int(minutes):02d}:{int(seconds):02d}"
0049     
0050     if is_ongoing:
0051         duration_str += " (ongoing)"
0052     
0053     return duration_str
0054 
0055 
0056 def format_run_duration(start_time, end_time=None):
0057     """
0058     Format the duration of a run, handling both completed and ongoing runs.
0059     
0060     Args:
0061         start_time: datetime when the run started
0062         end_time: datetime when the run ended (None for ongoing runs)
0063         
0064     Returns:
0065         str: Formatted duration string
0066     """
0067     if not start_time:
0068         return 'N/A'
0069     
0070     if end_time:
0071         # Completed run
0072         duration = end_time - start_time
0073         return format_duration(duration)
0074     else:
0075         # Ongoing run
0076         now = timezone.now()
0077         elapsed = now - start_time
0078         return format_duration(elapsed, is_ongoing=True)
0079 
0080 
0081 def format_elapsed_time(start_time, reference_time=None):
0082     """
0083     Format elapsed time from a start time to now (or reference time).
0084     
0085     Args:
0086         start_time: datetime to calculate elapsed time from
0087         reference_time: datetime to calculate to (defaults to now)
0088         
0089     Returns:
0090         str: Formatted elapsed time string
0091     """
0092     if not start_time:
0093         return 'N/A'
0094     
0095     if reference_time is None:
0096         reference_time = timezone.now()
0097     
0098     elapsed = reference_time - start_time
0099     return format_duration(elapsed)
0100 
0101 
0102 def format_datetime(dt):
0103     """
0104     Standard datetime formatting for all views in the monitor application.
0105     Converts UTC timestamps to Eastern time for display.
0106     
0107     Args:
0108         dt: datetime object to format (expected to be timezone-aware UTC)
0109         
0110     Returns:
0111         str: Formatted datetime string in YYYYMMDD HH:MM:SS format (Eastern time), or 'N/A' if None
0112     """
0113     if not dt:
0114         return 'N/A'
0115     
0116     # Convert to Eastern time using Django's timezone support
0117     from zoneinfo import ZoneInfo
0118     eastern_tz = ZoneInfo('America/New_York')
0119     dt_eastern = dt.astimezone(eastern_tz)
0120     
0121     return dt_eastern.strftime('%Y%m%d %H:%M:%S')
0122 
0123 
0124 class DataTablesProcessor:
0125     """
0126     Common processor for server-side DataTables AJAX requests.
0127     Handles pagination, searching, ordering, and filtering consistently.
0128     """
0129     
0130     def __init__(self, request, columns, default_order_column=0, default_order_direction='desc'):
0131         """
0132         Initialize DataTables processor with request parameters.
0133         
0134         Args:
0135             request: Django request object
0136             columns: List of column names that match template column order
0137             default_order_column: Default column index for ordering (0-based)
0138             default_order_direction: 'asc' or 'desc'
0139         """
0140         self.request = request
0141         self.columns = columns
0142         
0143         # Extract DataTables parameters
0144         self.draw = int(request.GET.get('draw', 1))
0145         self.start = int(request.GET.get('start', 0))
0146         self.length = int(request.GET.get('length', 100))
0147         self.search_value = request.GET.get('search[value]', '').strip()
0148         
0149         # Order parameters
0150         self.order_column_idx = int(request.GET.get('order[0][column]', default_order_column))
0151         self.order_direction = request.GET.get('order[0][dir]', default_order_direction)
0152         self.order_column = self.columns[self.order_column_idx] if 0 <= self.order_column_idx < len(self.columns) else self.columns[default_order_column]
0153     
0154     def get_order_by(self, special_cases=None):
0155         """
0156         Get the order_by string for queryset ordering.
0157         
0158         Args:
0159             special_cases: Dict mapping column names to custom order_by strings
0160             
0161         Returns:
0162             str: Order by string for queryset
0163         """
0164         if special_cases and self.order_column in special_cases:
0165             order_by = special_cases[self.order_column]
0166         else:
0167             order_by = self.order_column
0168         
0169         if self.order_direction == 'desc' and not order_by.startswith('-'):
0170             order_by = f'-{order_by}'
0171         elif self.order_direction == 'asc' and order_by.startswith('-'):
0172             order_by = order_by[1:]
0173             
0174         return order_by
0175     
0176     def apply_search(self, queryset, search_fields):
0177         """
0178         Apply search filtering to queryset.
0179         
0180         Args:
0181             queryset: Django queryset to filter
0182             search_fields: List of field names to search in
0183             
0184         Returns:
0185             Filtered queryset
0186         """
0187         if not self.search_value:
0188             return queryset
0189             
0190         search_q = Q()
0191         for field in search_fields:
0192             search_q |= Q(**{f'{field}__icontains': self.search_value})
0193         
0194         return queryset.filter(search_q)
0195     
0196     def apply_pagination(self, queryset):
0197         """
0198         Apply pagination to queryset.
0199         
0200         Args:
0201             queryset: Django queryset to paginate
0202             
0203         Returns:
0204             Paginated queryset slice
0205         """
0206         return queryset[self.start:self.start + self.length]
0207     
0208     def create_response(self, data, records_total, records_filtered):
0209         """
0210         Create standardized DataTables JSON response.
0211         
0212         Args:
0213             data: List of data rows for the table
0214             records_total: Total number of records before filtering
0215             records_filtered: Number of records after filtering
0216             
0217         Returns:
0218             JsonResponse object
0219         """
0220         return JsonResponse({
0221             'draw': self.draw,
0222             'recordsTotal': records_total,
0223             'recordsFiltered': records_filtered,
0224             'data': data
0225         })
0226 
0227 
0228 def get_filter_params(request, param_names):
0229     """
0230     Extract filter parameters from request GET params.
0231     
0232     Args:
0233         request: Django request object
0234         param_names: List of parameter names to extract
0235         
0236     Returns:
0237         Dict of parameter_name: value pairs
0238     """
0239     return {param: request.GET.get(param) for param in param_names}
0240 
0241 
0242 def apply_filters(queryset, filters):
0243     """
0244     Apply multiple filters to a queryset.
0245     Handles boolean field conversion from string values.
0246     
0247     Args:
0248         queryset: Django queryset to filter
0249         filters: Dict of field_name: value pairs
0250         
0251     Returns:
0252         Filtered queryset
0253     """
0254     for field, value in filters.items():
0255         # Apply filter for any non-None, non-empty value (including 'false' string)
0256         if value is not None and value != '':
0257             # Convert string boolean values to actual booleans for database filtering
0258             if value == 'true':
0259                 filter_value = True
0260             elif value == 'false':
0261                 filter_value = False
0262             else:
0263                 filter_value = value
0264             
0265             queryset = queryset.filter(**{field: filter_value})
0266     return queryset
0267 
0268 
0269 def format_timestamp_fields(data_dict):
0270     """
0271     Format timestamp fields in a dictionary for display.
0272     
0273     Automatically detects timestamp fields by name (containing 'time' or 'timestamp')
0274     and formats them using the standard format_datetime function.
0275     
0276     Args:
0277         data_dict: Dictionary with potential timestamp string values
0278         
0279     Returns:
0280         New dictionary with formatted timestamp strings
0281     """
0282     from datetime import datetime
0283     
0284     formatted_dict = {}
0285     for key, value in data_dict.items():
0286         if isinstance(value, str) and ('time' in key.lower() or 'timestamp' in key.lower()):
0287             # Try to parse and format timestamp strings
0288             try:
0289                 # Handle various timestamp formats
0290                 if 'T' in value:  # ISO format
0291                     dt = datetime.fromisoformat(value.replace('Z', '+00:00'))
0292                 else:
0293                     dt = datetime.strptime(value, '%Y-%m-%d %H:%M:%S.%f')
0294                 formatted_dict[key] = format_datetime(dt)
0295             except:
0296                 # If parsing fails, keep original value
0297                 formatted_dict[key] = value
0298         else:
0299             formatted_dict[key] = value
0300     
0301     return formatted_dict
0302 
0303 
0304 def get_filter_counts(queryset, filter_fields, current_filters=None):
0305     """
0306     Calculate counts for each possible filter value, considering current filters.
0307     Only returns options that have >0 matches in the current filtered dataset.
0308     
0309     Args:
0310         queryset: Base queryset to calculate counts from
0311         filter_fields: List of field names to calculate counts for
0312         current_filters: Dict of currently active filters to consider
0313         
0314     Returns:
0315         Dict of filter_field: [(value, count), ...] pairs, sorted by count desc
0316     """
0317     from django.db.models import Count
0318     
0319     if current_filters is None:
0320         current_filters = {}
0321     
0322     filter_counts = {}
0323     
0324     for field in filter_fields:
0325         # Start with base queryset
0326         field_queryset = queryset
0327         
0328         # Apply all current filters EXCEPT the one we're calculating counts for
0329         temp_filters = {k: v for k, v in current_filters.items() if k != field and v}
0330         field_queryset = apply_filters(field_queryset, temp_filters)
0331         
0332         # Get distinct values and their counts
0333         # Use the model's primary key field name instead of assuming 'id'
0334         pk_field = field_queryset.model._meta.pk.name
0335         counts = (field_queryset
0336                  .values(field)
0337                  .annotate(count=Count(pk_field))
0338                  .filter(count__gt=0)  # Only include options with >0 matches
0339                  .order_by('-count', field))
0340         
0341         # Convert to list of tuples: (value, count)
0342         # Include all values, including False for boolean fields (don't filter out falsy values)
0343         filter_counts[field] = [(item[field], item['count']) for item in counts if item[field] is not None]
0344     
0345     return filter_counts