Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-25 08:29:08

0001 #!/usr/bin/env python
0002 
0003 from pathlib import Path
0004 import matplotlib as mpl # type: ignore
0005 import matplotlib.pyplot as plt # type: ignore
0006 from matplotlib.colors import LogNorm # type: ignore
0007 import numpy as np # type: ignore
0008 import collections
0009 import pickle
0010 import json
0011 
0012 import pprint # noqa F401
0013 
0014 from argparsing import monitor_args
0015 from sphenixdbutils import test_mode as dbutils_test_mode
0016 from simpleLogger import slogger, CHATTY, DEBUG, INFO, WARN, ERROR, CRITICAL  # noqa: F401
0017 from sphenixmisc import setup_rot_handler
0018 from sphenixcondortools import base_batchname_from_args, monitor_condor_jobs
0019 import htcondor2 as htcondor # type: ignore
0020 
0021 def plot_memory_distribution(memory_usage, request_memory, output_file):
0022     """Generates and saves a histogram of memory usage vs. requested memory."""
0023     if not memory_usage and not request_memory:
0024         INFO("No memory data to plot for distribution.")
0025         return
0026 
0027     mpl.rcParams['axes.formatter.useoffset'] = False
0028 
0029     plt.style.use('seaborn-v0_8-deep')
0030     fig, ax = plt.subplots(figsize=(12, 7))
0031 
0032     max_val = max(np.max(memory_usage) if memory_usage else 0, np.max(request_memory) if request_memory else 0)
0033     bins = np.linspace(0, max_val, 50)
0034 
0035     if memory_usage:
0036         ax.hist(memory_usage, bins=bins, alpha=0.7, label=f'Memory Usage (Avg: {np.mean(memory_usage):.0f} MB)')
0037     if request_memory:
0038         ax.hist(request_memory, bins=bins, alpha=0.7, label=f'Requested Memory (Avg: {np.mean(request_memory):.0f} MB)')
0039 
0040     ax.set_title('Distribution of Memory Usage and Request for Held Jobs')
0041     ax.set_xlabel('Memory (MB)')
0042     ax.set_ylabel('Number of Jobs')
0043     ax.legend()
0044     ax.grid(True, which='both', linestyle='--', linewidth=0.5)
0045 
0046     plt.tight_layout()
0047     plt.savefig(output_file)
0048     plt.close(fig)
0049     INFO(f"Saved memory distribution plot to {output_file}")
0050 
0051 def plot_memory_boxplot(memory_usage, request_memory, output_file):
0052     """Generates and saves a boxplot of memory usage vs. requested memory."""
0053     if not memory_usage and not request_memory:
0054         INFO("No memory data to plot for boxplot.")
0055         return
0056 
0057     plt.style.use('seaborn-v0_8-deep')
0058     fig, ax = plt.subplots(figsize=(10, 7))
0059 
0060     data_to_plot = []
0061     labels = []
0062     if memory_usage:
0063         data_to_plot.append(memory_usage)
0064         labels.append('Memory Usage')
0065     if request_memory:
0066         data_to_plot.append(request_memory)
0067         labels.append('Requested Memory')
0068 
0069     ax.boxplot(data_to_plot, patch_artist=True)
0070 
0071     ax.set_title('Box Plot of Memory Usage and Request for Held Jobs')
0072     ax.set_ylabel('Memory (MB)')
0073     ax.set_xticklabels(labels)
0074     ax.yaxis.grid(True, linestyle='--', which='major', color='grey', alpha=0.5)
0075 
0076     plt.tight_layout()
0077     plt.savefig(output_file)
0078     plt.close(fig)
0079     INFO(f"Saved memory box plot to {output_file}")
0080 
0081 def plot_memory_scatterplot(memory_usage, request_memory, output_file):
0082     """Generates and saves a 2D histogram of memory usage vs. requested memory."""
0083     if not memory_usage or not request_memory:
0084         INFO("Not enough data to plot 2D histogram.")
0085         return
0086 
0087     plt.style.use('seaborn-v0_8-deep')
0088     fig, ax = plt.subplots(figsize=(11, 10))
0089 
0090     x_data = np.array(request_memory)
0091     y_data = np.array(memory_usage)
0092 
0093     # Create the 2D histogram. A logarithmic color scale is used to handle skewed distributions.
0094     counts, xedges, yedges, im = ax.hist2d(x_data, y_data, bins=50, cmap='viridis', norm=LogNorm())
0095 
0096     # Add a color bar to show the number of jobs in each bin
0097     fig.colorbar(im, ax=ax, label='Number of Jobs')
0098 
0099     # Add a y=x reference line for easy comparison
0100     max_val = max(np.max(x_data) if len(x_data) > 0 else 0, np.max(y_data) if len(y_data) > 0 else 0)
0101     ax.plot([0, max_val], [0, max_val], 'r--', label='Requested = Used (y=x)')
0102 
0103     ax.set_title('Memory Usage vs. Requested Memory for Held Jobs (2D Histogram)')
0104     ax.set_xlabel('Requested Memory (MB)')
0105     ax.set_ylabel('Actual Memory Usage (MB)')
0106     ax.legend()
0107     ax.grid(True, which='both', linestyle='--', linewidth=0.5)
0108     ax.set_aspect('equal', adjustable='box')
0109 
0110     plt.tight_layout()
0111     plt.savefig(output_file)
0112     plt.close(fig)
0113     INFO(f"Saved memory 2D histogram to {output_file}")
0114 
0115 def main():
0116     args = monitor_args()
0117     #################### Test mode?
0118     test_mode = (
0119             dbutils_test_mode
0120             or args.test_mode
0121             # or ( hasattr(rule, 'test_mode') and rule.test_mode ) ## allow in the yaml file?
0122         )
0123 
0124     # Set up submission logging before going any further
0125     sublogdir=setup_rot_handler(args)
0126     slogger.setLevel(args.loglevel)
0127     INFO(f"Logging to {sublogdir}, level {args.loglevel}")
0128 
0129     if test_mode:
0130         INFO("Running in testbed mode.")
0131         args.mangle_dirpath = 'production-testbed'
0132     else:
0133         INFO("Running in production mode.")
0134 
0135     batch_name=base_batchname_from_args(args)
0136     jobs=monitor_condor_jobs(batch_name=batch_name, dryrun=args.dryrun)
0137 
0138     # Filter for held jobs (JobStatus == 5)
0139     held_jobs_ads = [ad for ad in jobs.values() if ad.get('JobStatus') == 5]
0140 
0141     if not held_jobs_ads:
0142         INFO(f"Found {len(jobs)} total jobs, but none are currently held.")
0143         return
0144 
0145     INFO(f"Found {len(jobs)} total jobs, {len(held_jobs_ads)} of which are held.")
0146 
0147     held_memory_usage = []
0148     held_request_memory = []
0149     kill_suggestion = []
0150     under_memory_hold_reasons = collections.Counter()
0151     for job_ad in held_jobs_ads:
0152         # MemoryUsage and RequestMemory are in MB
0153         mu = int(job_ad.get('ResidentSetSize', 0))/1024  # Convert from KB to MB
0154         rm = int(job_ad.get('MemoryProvisioned', 0))
0155         held_memory_usage.append(mu)
0156         held_request_memory.append(rm)
0157         # If memory usage is below request, it's interesting to see why it's held.
0158         if mu < rm:
0159             hold_reason = job_ad.get('HoldReason', 'Not Available')
0160             job_id = f"{job_ad.get('ClusterId')}.{job_ad.get('ProcId')}"
0161             DEBUG(f"Job {job_id} held with mu ({mu:.0f}MB) < rm ({rm}MB). Reason: {hold_reason}")
0162             reason_code = job_ad.get('LastHoldReasonCode', 0) # Default to 0 (None)
0163             if reason_code !=26 :
0164                 WARN(f'Job {job_id} held with mu ({mu:.0f}MB) < rm ({rm}MB). Reason Code {reason_code}:\n\t"{hold_reason}"')
0165             under_memory_hold_reasons[reason_code] += 1
0166 
0167         # # Now let's kill and resubmit this job
0168         # # Fix difference between Submit object and ClassAd keys
0169         # job_ad['output']=job_ad.pop('Out')
0170         # job_ad['error']=job_ad.pop('Err')
0171         # # adjust memory request
0172         # new_submit_ad = htcondor.Submit(dict(job_ad))
0173         # if args.memory:
0174         #     new_rm=int(args.memory)
0175         # else:
0176         #     new_rm=int(rm)
0177         #     new_rm=int(new_rm * 1.5)  # Increase request by 50%
0178         #     if new_rm > args.max_memory:
0179         #         WARN(f"Calculated new memory request {new_rm}MB exceeds maximum of {args.max_memory}MB. Skipping.")
0180         #         #kill_suggestion.append(f"{job_ad['ClusterId']}.{job_ad['ProcId']}")
0181         #         kill_suggestion.append(job_ad)
0182         #         continue
0183         # new_submit_ad['RequestMemory'] = str(new_rm)
0184         # if args.resubmit:
0185         #     if not args.dryrun:
0186         #         schedd = htcondor.Schedd()
0187         #         try:
0188         #             # The transaction context manager is deprecated. The following replacement operations are not atomic.
0189         #             schedd.act(htcondor.JobAction.Remove, [f"{job_ad['ClusterId']}.{job_ad['ProcId']}"])
0190         #             INFO(f"Removed held job {job_ad['ClusterId']}.{job_ad['ProcId']} from queue.")
0191         #             submit_result = schedd.submit(new_submit_ad)
0192         #             new_queue_id = submit_result.cluster()
0193         #             INFO(f"Resubmitted job with increased memory request ({rm}MB -> {new_rm}MB) as {new_queue_id}.")                    
0194         #         except Exception as e:
0195         #             ERROR(f"Failed to remove and resubmit job {job_ad['ClusterId']}.{job_ad['ProcId']}: {e}")
0196         #     else:
0197         #         INFO(f"(Dry Run) Would remove held job {job_ad['ClusterId']}.{job_ad['ProcId']} and resubmit with RequestMemory={new_rm}MB.")
0198 
0199     if args.plot:
0200         if held_memory_usage or held_request_memory:
0201             dist_plot_file = f"{batch_name}_memory_distribution.png"
0202             box_plot_file  = f"{batch_name}_memory_boxplot.png"
0203             scatter_plot_file = f"{batch_name}_memory_scatterplot.png"
0204             plot_memory_distribution(held_memory_usage, held_request_memory, dist_plot_file)
0205             plot_memory_boxplot(held_memory_usage, held_request_memory, box_plot_file)
0206             plot_memory_scatterplot(held_memory_usage, held_request_memory, scatter_plot_file)
0207 
0208         if under_memory_hold_reasons:
0209             INFO("Frequency of hold reason codes for jobs held while under memory request:")
0210             pprint.pprint(dict(under_memory_hold_reasons))
0211         # /if args.plot
0212 
0213     if kill_suggestion:
0214         INFO(f"There were {len(kill_suggestion)} jobs that could not be resubmitted due to exceeding max memory.")
0215         if args.kill:
0216             INFO(f"Killing them now as per --kill option.")
0217             if not args.dryrun or True:
0218                 schedd = htcondor.Schedd()
0219                 with open(f"{batch_name}_killed_jobs.pkl", "wb") as f:
0220                     pickle.dump(kill_suggestion, f)
0221                 with open(f"{batch_name}_killed_jobs.json", "w") as f:
0222                     for job_ad in kill_suggestion:
0223                         json.dump(dict(job_ad), f, indent=4)
0224                 try:
0225 
0226                     #schedd.act(htcondor.JobAction.Remove, kill_procs)
0227                     INFO(f"Killed {len(kill_suggestion)} jobs that exceeded max memory limit of {args.max_memory}MB.")
0228                 except Exception as e:
0229                     ERROR(f"Failed to kill jobs: {e}")
0230         else:
0231             kill_procs=[f"{job_ad['ClusterId']}.{job_ad['ProcId']}" for job_ad in kill_suggestion]
0232             INFO(f"You may want to kill them manually: \n{', '.join(kill_procs)}")
0233 
0234 
0235     INFO(f"{Path(__file__).name} DONE.")
0236 
0237 if __name__ == '__main__':
0238     main()
0239     exit(0)