File indexing completed on 2025-02-22 10:47:20
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025
0026
0027
0028
0029
0030
0031
0032
0033 #ifndef PMIX_IOF_H
0034 #define PMIX_IOF_H
0035
0036 #include "src/include/pmix_config.h"
0037
0038 #ifdef HAVE_SYS_TYPES_H
0039 # include <sys/types.h>
0040 #endif
0041 #ifdef HAVE_SYS_UIO_H
0042 # include <sys/uio.h>
0043 #endif
0044 #ifdef HAVE_NET_UIO_H
0045 # include <net/uio.h>
0046 #endif
0047 #ifdef HAVE_UNISTD_H
0048 # include <unistd.h>
0049 #endif
0050 #include <signal.h>
0051
0052 #include "src/class/pmix_list.h"
0053 #include "src/include/pmix_globals.h"
0054 #include "src/util/pmix_fd.h"
0055
0056 BEGIN_C_DECLS
0057
0058
0059
0060
/* Sizing limits used by the I/O forwarding code.
 * NOTE(review): the consumers of these values are not visible in this
 * header. The names suggest a per-message size cap, a tag size cap, a cap
 * on queued input buffers and a write retry cap -- confirm in pmix_iof.c. */
#define PMIX_IOF_BASE_MSG_MAX      8192
#define PMIX_IOF_BASE_TAG_MAX      1024
#define PMIX_IOF_MAX_INPUT_BUFFERS 50
#define PMIX_IOF_MAX_RETRIES       4
0065
0066 typedef struct {
0067 pmix_list_item_t super;
0068 bool pending;
0069 bool always_writable;
0070 int numtries;
0071 pmix_event_t *ev;
0072 struct timeval tv;
0073 int fd;
0074 pmix_list_t outputs;
0075 } pmix_iof_write_event_t;
0076 PMIX_EXPORT PMIX_CLASS_DECLARATION(pmix_iof_write_event_t);
0077 #define PMIX_IOF_WRITE_EVENT_STATIC_INIT \
0078 { \
0079 .super = PMIX_LIST_ITEM_STATIC_INIT, \
0080 .pending = false, \
0081 .always_writable = false, \
0082 .numtries = 0, \
0083 .ev = NULL, \
0084 .tv = {0, 0}, \
0085 .fd = 0, \
0086 .outputs = PMIX_LIST_STATIC_INIT \
0087 }
0088
0089 typedef struct {
0090 pmix_list_item_t super;
0091 pmix_proc_t name;
0092 pmix_iof_channel_t tag;
0093 pmix_iof_write_event_t wev;
0094 bool xoff;
0095 bool exclusive;
0096 bool closed;
0097 } pmix_iof_sink_t;
0098 PMIX_EXPORT PMIX_CLASS_DECLARATION(pmix_iof_sink_t);
0099 #define PMIX_IOF_SINK_STATIC_INIT \
0100 { \
0101 .super = PMIX_LIST_ITEM_STATIC_INIT, \
0102 .name = {{0}, 0}, \
0103 .tag = PMIX_FWD_NO_CHANNELS, \
0104 .wev = PMIX_IOF_WRITE_EVENT_STATIC_INIT, \
0105 .xoff = false, \
0106 .exclusive = false, \
0107 .closed = false \
0108 }
0109
0110 typedef struct {
0111 pmix_list_item_t super;
0112 char *data;
0113 int numbytes;
0114 } pmix_iof_write_output_t;
0115 PMIX_EXPORT PMIX_CLASS_DECLARATION(pmix_iof_write_output_t);
0116
0117 typedef struct {
0118 pmix_object_t super;
0119 pmix_event_t ev;
0120 struct timeval tv;
0121 int fd;
0122 bool active;
0123 void *childproc;
0124 bool always_readable;
0125 pmix_proc_t name;
0126 pmix_iof_channel_t channel;
0127 pmix_proc_t *targets;
0128 size_t ntargets;
0129 pmix_info_t *directives;
0130 size_t ndirs;
0131 } pmix_iof_read_event_t;
0132 PMIX_EXPORT PMIX_CLASS_DECLARATION(pmix_iof_read_event_t);
0133
0134 typedef struct {
0135 pmix_list_item_t super;
0136 pmix_proc_t name;
0137 pmix_iof_write_event_t *channel;
0138 pmix_iof_flags_t flags;
0139 pmix_iof_channel_t stream;
0140 bool copystdout;
0141 bool copystderr;
0142 pmix_byte_object_t bo;
0143 } pmix_iof_residual_t;
0144 PMIX_EXPORT PMIX_CLASS_DECLARATION(pmix_iof_residual_t);
0145
0146
0147
/* Report whether `fd` should be treated as always ready.
 *
 * Returns true when the descriptor is a regular file, a character device
 * that is not a terminal, or a block device (as reported by the
 * pmix_fd_is_* helpers and isatty). The macros in this header use the
 * result to arm a timer event instead of a descriptor event. */
static inline bool pmix_iof_fd_always_ready(int fd)
{
    if (pmix_fd_is_regular(fd)) {
        return true;
    }
    if (pmix_fd_is_chardev(fd) && !isatty(fd)) {
        return true;
    }
    return pmix_fd_is_blkdev(fd);
}
0153
/* NOTE(review): presumably the chunk size for sink writes -- the consumer
 * is not visible in this header, confirm. */
#define PMIX_IOF_SINK_BLOCKSIZE (1024)

/* Arm the write event `w` (a pmix_iof_write_event_t pointer).
 *
 * Marks the event pending, posts the object, then adds the event. The
 * event's own timeout is supplied when it is flagged always_writable and
 * no timeout otherwise. A non-zero return from pmix_event_add is logged as
 * PMIX_ERR_BAD_PARAM.
 *
 * `w` is evaluated several times, so it must have no side effects. The
 * expansion already ends in ';' -- kept so existing call sites written
 * without a trailing semicolon continue to compile. */
#define PMIX_IOF_SINK_ACTIVATE(w)                                              \
    do {                                                                       \
        (w)->pending = true;                                                   \
        PMIX_POST_OBJECT((w));                                                 \
        if (0 != pmix_event_add((w)->ev,                                       \
                                (w)->always_writable ? &(w)->tv : NULL)) {     \
            PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);                                \
        }                                                                      \
    } while (0);
0169
0170
0171
/* Construct and configure the sink `snk` (a pmix_iof_sink_t pointer).
 *
 *   snk      - storage to construct as a pmix_iof_sink_t
 *   nm       - pointer to the process name copied into the sink
 *   fid      - descriptor to write to; a negative value leaves the write
 *              event unconfigured
 *   tg       - channel tag stored in the sink
 *   wrthndlr - callback registered on the write event, with `snk` as its
 *              callback data
 *
 * For a non-negative descriptor the write event becomes a timer event when
 * pmix_iof_fd_always_ready() reports true, and a PMIX_EV_WRITE event on
 * the descriptor otherwise. The event is only set up here, not added.
 *
 * Arguments are evaluated more than once. The expansion already ends in
 * ';' -- kept for compatibility with existing call sites. */
#define PMIX_IOF_SINK_DEFINE(snk, nm, fid, tg, wrthndlr)                                     \
    do {                                                                                     \
        pmix_output_verbose(1, pmix_client_globals.iof_output,                               \
                            "defining endpt: file %s line %d fd %d", __FILE__, __LINE__,     \
                            (fid));                                                          \
        PMIX_CONSTRUCT((snk), pmix_iof_sink_t);                                              \
        pmix_strncpy((snk)->name.nspace, (nm)->nspace, PMIX_MAX_NSLEN);                      \
        (snk)->name.rank = (nm)->rank;                                                       \
        (snk)->tag = (tg);                                                                   \
        if ((fid) >= 0) {                                                                    \
            (snk)->wev.fd = (fid);                                                           \
            (snk)->wev.always_writable = pmix_iof_fd_always_ready(fid);                      \
            if (!(snk)->wev.always_writable) {                                               \
                pmix_event_set(pmix_globals.evbase, (snk)->wev.ev, (snk)->wev.fd,            \
                               PMIX_EV_WRITE, (wrthndlr), (snk));                            \
            } else {                                                                         \
                pmix_event_evtimer_set(pmix_globals.evbase, (snk)->wev.ev, (wrthndlr),       \
                                       (snk));                                               \
            }                                                                                \
        }                                                                                    \
        PMIX_POST_OBJECT(snk);                                                               \
    } while (0);
0192
0193
/* Add the event embedded in the read event `rev` to its event base.
 *
 * The read event's own timeout is supplied when it is flagged
 * always_readable, and no timeout otherwise. A non-zero return from
 * pmix_event_add is logged as PMIX_ERR_BAD_PARAM.
 *
 * `rev` is evaluated several times. The expansion already ends in ';' --
 * kept because other macros in this header invoke it without one. */
#define PMIX_IOF_READ_ADDEV(rev)                                                   \
    do {                                                                           \
        if (0 != pmix_event_add(&(rev)->ev,                                        \
                                (rev)->always_readable ? &(rev)->tv : NULL)) {     \
            PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);                                    \
        }                                                                          \
    } while (0);
0204
/* Mark the read event `rev` active, post the object, then add its event
 * through PMIX_IOF_READ_ADDEV.
 *
 * `rev` is evaluated several times. The expansion already ends in ';' --
 * kept because PMIX_IOF_READ_EVENT and PMIX_IOF_READ_EVENT_LOCAL invoke
 * this macro without a trailing semicolon. */
#define PMIX_IOF_READ_ACTIVATE(rev)      \
    do {                                 \
        (rev)->active = true;            \
        PMIX_POST_OBJECT(rev);           \
        PMIX_IOF_READ_ADDEV(rev);        \
    } while (0);
0211
/* Create a read event for descriptor `fid` and return it through `rv`.
 *
 *   rv     - address of a pmix_iof_read_event_t pointer that receives the
 *            new object
 *   p, np  - optional array of target processes and its length; copied
 *            into the new object when `p` is non-NULL
 *   d, nd  - optional array of directives and its length; copied when `d`
 *            is non-NULL and `nd` is positive
 *   fid    - descriptor to read from
 *   cbfunc - callback registered on the event, with the new object as its
 *            callback data
 *   actv   - when true the event is activated via PMIX_IOF_READ_ACTIVATE
 *
 * The event is a timer event when pmix_iof_fd_always_ready(fid) reports
 * true, and a PMIX_EV_READ event on the descriptor otherwise.
 *
 * Macro-local variables use an `_iof_` prefix so that an argument such as
 * `&rev` cannot be captured by a local of the same name, and `nd` is
 * parenthesized under the size_t cast so that expressions such as
 * `cond ? a : b` produce the intended loop bound.
 *
 * Arguments are evaluated more than once. The result of PMIX_NEW is not
 * checked here. The expansion already ends in ';' -- kept for
 * compatibility with existing call sites. */
#define PMIX_IOF_READ_EVENT(rv, p, np, d, nd, fid, cbfunc, actv)                                \
    do {                                                                                        \
        size_t _iof_n;                                                                          \
        pmix_iof_read_event_t *_iof_rev;                                                        \
        pmix_output_verbose(1, pmix_client_globals.iof_output, "defining read event at: %s %d", \
                            __FILE__, __LINE__);                                                \
        _iof_rev = PMIX_NEW(pmix_iof_read_event_t);                                             \
        if (NULL != (p)) {                                                                      \
            _iof_rev->ntargets = (np);                                                          \
            PMIX_PROC_CREATE(_iof_rev->targets, _iof_rev->ntargets);                            \
            memcpy(_iof_rev->targets, (p), (np) * sizeof(pmix_proc_t));                         \
        }                                                                                       \
        if (NULL != (d) && 0 < (nd)) {                                                          \
            PMIX_INFO_CREATE(_iof_rev->directives, (nd));                                       \
            _iof_rev->ndirs = (nd);                                                             \
            for (_iof_n = 0; _iof_n < (size_t) (nd); _iof_n++) {                                \
                PMIX_INFO_XFER(&(_iof_rev->directives[_iof_n]), &((d)[_iof_n]));                \
            }                                                                                   \
        }                                                                                       \
        _iof_rev->fd = (fid);                                                                   \
        _iof_rev->always_readable = pmix_iof_fd_always_ready(fid);                              \
        *(rv) = _iof_rev;                                                                       \
        if (_iof_rev->always_readable) {                                                        \
            pmix_event_evtimer_set(pmix_globals.evbase, &_iof_rev->ev, (cbfunc), _iof_rev);     \
        } else {                                                                                \
            pmix_event_set(pmix_globals.evbase, &_iof_rev->ev, (fid), PMIX_EV_READ, (cbfunc),   \
                           _iof_rev);                                                           \
        }                                                                                       \
        if ((actv)) {                                                                           \
            PMIX_IOF_READ_ACTIVATE(_iof_rev);                                                   \
        }                                                                                       \
    } while (0);
0243
/* Create a read event for descriptor `fid`, with no targets or directives,
 * and return it through `rv`.
 *
 *   rv     - address of a pmix_iof_read_event_t pointer that receives the
 *            new object
 *   fid    - descriptor to read from
 *   cbfunc - callback registered on the event, with the new object as its
 *            callback data
 *   actv   - when true the event is activated via PMIX_IOF_READ_ACTIVATE
 *
 * The event is a timer event when pmix_iof_fd_always_ready(fid) reports
 * true, and a PMIX_EV_READ event on the descriptor otherwise.
 *
 * The macro-local variable uses an `_iof_` prefix so that an argument such
 * as `&rev` cannot be captured by a local of the same name (which would
 * leave the caller's pointer unset).
 *
 * Arguments are evaluated more than once. The result of PMIX_NEW is not
 * checked here. The expansion already ends in ';' -- kept for
 * compatibility with existing call sites. */
#define PMIX_IOF_READ_EVENT_LOCAL(rv, fid, cbfunc, actv)                                        \
    do {                                                                                        \
        pmix_iof_read_event_t *_iof_rev;                                                        \
        pmix_output_verbose(1, pmix_client_globals.iof_output, "defining read event at: %s %d", \
                            __FILE__, __LINE__);                                                \
        _iof_rev = PMIX_NEW(pmix_iof_read_event_t);                                             \
        _iof_rev->fd = (fid);                                                                   \
        _iof_rev->always_readable = pmix_iof_fd_always_ready(fid);                              \
        *(rv) = _iof_rev;                                                                       \
        if (_iof_rev->always_readable) {                                                        \
            pmix_event_evtimer_set(pmix_globals.evbase, &_iof_rev->ev, (cbfunc), _iof_rev);     \
        } else {                                                                                \
            pmix_event_set(pmix_globals.evbase, &_iof_rev->ev, (fid), PMIX_EV_READ, (cbfunc),   \
                           _iof_rev);                                                           \
        }                                                                                       \
        if ((actv)) {                                                                           \
            PMIX_IOF_READ_ACTIVATE(_iof_rev);                                                   \
        }                                                                                       \
    } while (0);
0262
0263
0264 PMIX_EXPORT pmix_status_t pmix_iof_flush(void);
0265
0266 PMIX_EXPORT pmix_status_t pmix_iof_write_output(const pmix_proc_t *name, pmix_iof_channel_t stream,
0267 const pmix_byte_object_t *bo);
0268 PMIX_EXPORT void pmix_iof_static_dump_output(pmix_iof_sink_t *sink);
0269 PMIX_EXPORT void pmix_iof_write_handler(int fd, short event, void *cbdata);
0270 PMIX_EXPORT bool pmix_iof_stdin_check(int fd);
0271 PMIX_EXPORT void pmix_iof_read_local_handler(int unusedfd, short event, void *cbdata);
0272 PMIX_EXPORT void pmix_iof_stdin_cb(int fd, short event, void *cbdata);
0273 PMIX_EXPORT pmix_status_t pmix_iof_process_iof(pmix_iof_channel_t channels,
0274 const pmix_proc_t *source,
0275 const pmix_byte_object_t *bo,
0276 const pmix_info_t *info, size_t ninfo,
0277 const pmix_iof_req_t *req);
0278 PMIX_EXPORT void pmix_iof_check_flags(pmix_info_t *info, pmix_iof_flags_t *flags);
0279 PMIX_EXPORT void pmix_iof_flush_residuals(void);
0280
0281 END_C_DECLS
0282
0283 #endif