File indexing completed on 2026-04-10 08:39:07
0001 import sys
0002 import time
0003 import uuid
0004
0005 from pandaserver.taskbuffer.FileSpec import FileSpec
0006 from pandaserver.taskbuffer.JobSpec import JobSpec
0007 from pandaserver.userinterface import Client
0008
# Owner assigned to the job when neither -DP_USER nor -PIPELINE_USER is given.
prodUserNameDefault = "jschovan"
prodUserName = None

# Site used when --site is absent or is given without a value.
_DEFAULT_SITE = "ANALY_BNL-LSST-mergetest"

# Command-line flag -> (key used in lsstJobParams, converter for the raw value).
_LSST_OPTIONS = {
    "-DP_USER": ("DP_USER", str),
    "-PIPELINE_USER": ("PIPELINE_USER", str),
    "-PIPELINE_TASK": ("PIPELINE_TASK", str),
    "-PIPELINE_PROCESSINSTANCE": ("PIPELINE_PROCESSINSTANCE", int),
    "-PIPELINE_EXECUTIONNUMBER": ("PIPELINE_EXECUTIONNUMBER", int),
    "-PIPELINE_STREAM": ("PIPELINE_STREAM", int),
}


def parse_lsst_args(argv):
    """Extract the site and the LSST pipeline options from *argv*.

    Every element of *argv* is compared against the known flags; the element
    that follows a flag is taken as its value.

    Args:
        argv: Sequence of command-line tokens (normally ``sys.argv``).

    Returns:
        A ``(values, job_params)`` tuple.  ``values`` maps ``"site"`` and each
        option key (``"DP_USER"``, ``"PIPELINE_TASK"``, ...) to the parsed
        value, or to ``None`` when the option is absent or unusable.
        ``job_params`` is the ``KEY=value`` pairs of the successfully parsed
        options, in command-line order, joined with ``|``.

    A flag whose value is missing, or is not an integer where one is
    required, is reported on stderr and its entry is reset (``None`` for
    options, the default site for ``--site``); parsing then continues.
    """
    values = {key: None for key, _ in _LSST_OPTIONS.values()}
    values["site"] = _DEFAULT_SITE
    pairs = []

    for idx, arg in enumerate(argv):
        if arg == "--site":
            try:
                values["site"] = argv[idx + 1]
            except IndexError:
                print(f"WARNING: {arg} given without a value; using {_DEFAULT_SITE}", file=sys.stderr)
                values["site"] = _DEFAULT_SITE
            continue

        option = _LSST_OPTIONS.get(arg)
        if option is None:
            continue

        key, convert = option
        try:
            value = convert(argv[idx + 1])
        except (IndexError, ValueError):
            # Best effort: an unusable option is dropped rather than aborting the submission.
            print(f"WARNING: ignoring {arg}: missing or invalid value", file=sys.stderr)
            values[key] = None
            continue

        values[key] = value
        pairs.append(f"{key}={value}")

    return values, "|".join(pairs)


_cli_values, lsstJobParams = parse_lsst_args(sys.argv)

site = _cli_values["site"]
prodUserNameDP = _cli_values["DP_USER"]
prodUserNamePipeline = _cli_values["PIPELINE_USER"]
PIPELINE_TASK = _cli_values["PIPELINE_TASK"]
PIPELINE_PROCESSINSTANCE = _cli_values["PIPELINE_PROCESSINSTANCE"]
PIPELINE_EXECUTIONNUMBER = _cli_values["PIPELINE_EXECUTIONNUMBER"]
PIPELINE_STREAM = _cli_values["PIPELINE_STREAM"]
0076
0077
# -DP_USER takes precedence over -PIPELINE_USER; when neither was supplied
# prodUserName keeps its current value.
if prodUserNameDP is not None:
    prodUserName = prodUserNameDP
elif prodUserNamePipeline is not None:
    prodUserName = prodUserNamePipeline

destName = None

# Destination dataset: built from the pipeline identifiers when all of them
# are known, otherwise a unique name under the default user.
if prodUserName is not None and PIPELINE_TASK is not None and PIPELINE_PROCESSINSTANCE is not None:
    datasetName = f"panda.lsst.user.{PIPELINE_PROCESSINSTANCE}.{PIPELINE_TASK}.{prodUserName}"
else:
    datasetName = f"panda.lsst.user.{prodUserNameDefault}.{uuid.uuid4()}"

# Job name: every value interpolated into the name must be known, including
# PIPELINE_PROCESSINSTANCE; otherwise fall back to a random UUID instead of
# producing a name that contains the text "None".
if (
    prodUserName is not None
    and PIPELINE_TASK is not None
    and PIPELINE_PROCESSINSTANCE is not None
    and PIPELINE_EXECUTIONNUMBER is not None
    and PIPELINE_STREAM is not None
):
    jobName = f"job.{PIPELINE_PROCESSINSTANCE}.{PIPELINE_TASK}.{PIPELINE_EXECUTIONNUMBER}.{prodUserName}.{PIPELINE_STREAM}"
else:
    jobName = f"{uuid.uuid4()}"

# The stream number doubles as the job definition ID; without it, derive a
# four-digit value from the current time.
jobDefinitionID = PIPELINE_STREAM if PIPELINE_STREAM is not None else int(time.time()) % 10000
# Describe the job to submit, using the identifiers resolved earlier in the script.
job = JobSpec()
job.jobDefinitionID = jobDefinitionID
job.jobName = jobName
job.transformation = "http://rpm-test.pandawms.org/pandawms-jobcache/lsst-trf.sh"
job.destinationDBlock = datasetName
job.destinationSE = "local"
job.currentPriority = 1000
job.prodSourceLabel = "panda"
job.jobParameters = f' --lsstJobParams="{lsstJobParams}" '
# Fall back to the default owner when no user was given on the command line.
job.prodUserName = prodUserName if prodUserName is not None else prodUserNameDefault
# Optional pipeline attributes are only set when the matching option was parsed.
for attr_name, attr_value in (
    ("taskID", PIPELINE_PROCESSINSTANCE),
    ("attemptNr", PIPELINE_EXECUTIONNUMBER),
    ("processingType", PIPELINE_TASK),
):
    if attr_value is not None:
        setattr(job, attr_name, attr_value)
job.computingSite = site
job.VO = "lsst"

# Attach the log file entry, named after the job and placed in the job's destination dataset.
log_spec = FileSpec()
log_spec.lfn = f"{job.jobName}.job.log.tgz"
log_spec.destinationDBlock = job.destinationDBlock
log_spec.destinationSE = job.destinationSE
log_spec.dataset = job.destinationDBlock
log_spec.type = "log"
job.addFile(log_spec)
0136
0137
# Submit the single job, then print the returned status followed by the
# first field of every returned entry.
# NOTE(review): the entries are presumably per-job tuples whose first element
# is the PandaID, and the second return value may differ on failure; confirm
# against Client.submit_jobs.
status, submitted = Client.submit_jobs([job])
print(status)
for entry in submitted:
    print(f"PandaID={entry[0]}")