File indexing completed on 2025-01-30 10:29:38
0001
0002
0003
0004
0005 #include <JANA/JApplication.h>
0006 #include <JANA/JObject.h>
0007 #include <JANA/JEventSource.h>
0008 #include <JANA/JEventProcessor.h>
0009 #include "JANA/Engine/JTopologyBuilder.h"
0010 #include <JANA/Topology/JSubeventArrow.h>
0011
0012
0013 struct MyInput : public JObject {
0014 int x;
0015 float y;
0016 int evt = 0;
0017 int sub = 0;
0018
0019 MyInput(int x, float y, int evt, int sub) : x(x), y(y), evt(evt), sub(sub) {}
0020 };
0021
0022 struct MyOutput : public JObject {
0023 float z;
0024 int evt = 0;
0025 int sub = 0;
0026
0027 explicit MyOutput(float z, int evt, int sub) : z(z), evt(evt), sub(sub) {}
0028 };
0029
0030 __global__ void myKernel(MyInput *in, MyOutput *out, int n) {
0031
0032 int id = blockIdx.x * blockDim.x + threadIdx.x;
0033
0034
0035 if (id < n) {
0036 out[id].z = in[id].x + in[id].y;
0037 out[id].evt = in[id].evt;
0038 out[id].sub = in[id].sub;
0039 }
0040 }
0041
0042 void myKernelWrapper(const MyInput *h_in, MyOutput *h_out) {
0043 MyInput *d_in;
0044 MyOutput *d_out;
0045 cudaMalloc((void **) &d_in, sizeof(MyInput));
0046 cudaMalloc((void **) &d_out, sizeof(MyOutput));
0047
0048 cudaMemcpy(d_in, h_in, sizeof(MyInput), cudaMemcpyHostToDevice);
0049 cudaMemcpy(d_out, h_out, sizeof(MyOutput), cudaMemcpyHostToDevice);
0050
0051 myKernel<<<1, 1>>>(d_in, d_out, 1);
0052
0053 cudaMemcpy(h_out, d_out, sizeof(MyOutput), cudaMemcpyDeviceToHost);
0054
0055 cudaFree(d_in);
0056 cudaFree(d_out);
0057 }
0058
0059 struct MyProcessor : public JSubeventProcessor<MyInput, MyOutput> {
0060 MyProcessor() {
0061 inputTag = "";
0062 outputTag = "subeventted";
0063 }
0064
0065 MyOutput *ProcessSubevent(MyInput *input) override {
0066 LOG << "Processing subevent " << input->evt << ":" << input->sub << LOG_END;
0067
0068
0069 MyOutput *output = new MyOutput(0.0, -1, -1);
0070 LOG << " Before CUDA, evt:sub=" << output->evt << ":" << output->sub << LOG_END;
0071 myKernelWrapper(input, output);
0072 LOG << " After CUDA, evt:sub=" << output->evt << ":" << output->sub << LOG_END;
0073 return output;
0074 }
0075 };
0076
0077
0078 struct SimpleSource : public JEventSource {
0079 SimpleSource() {
0080 SetCallbackStyle(CallbackStyle::ExpertMode);
0081 };
0082
0083 Result Emit(JEvent& event) override {
0084 auto evt = event.GetEventNumber();
0085 std::vector < MyInput * > inputs;
0086 inputs.push_back(new MyInput(22, 3.6, evt, 0));
0087 inputs.push_back(new MyInput(23, 3.5, evt, 1));
0088 inputs.push_back(new MyInput(24, 3.4, evt, 2));
0089 inputs.push_back(new MyInput(25, 3.3, evt, 3));
0090 inputs.push_back(new MyInput(26, 3.2, evt, 4));
0091 event.Insert(inputs);
0092 LOG << "Emitting event " << event->GetEventNumber() << LOG_END;
0093 return Result::Success;
0094 }
0095 };
0096
0097 struct SimpleProcessor : public JEventProcessor {
0098
0099 SimpleProcessor() {
0100 SetCallbackStyle(CallbackStyle::ExpertMode);
0101 }
0102
0103 void Process(const JEvent& event) {
0104
0105 std::lock_guard <std::mutex> guard(m_mutex);
0106
0107 auto outputs = event.Get<MyOutput>();
0108
0109
0110
0111
0112
0113 LOG << " Contents of event " << event.GetEventNumber() << LOG_END;
0114 for (auto output: outputs) {
0115 LOG << " " << output->evt << ":" << output->sub << " " << output->z << LOG_END;
0116 }
0117 LOG << " DONE with contents of event " << event.GetEventNumber() << LOG_END;
0118 }
0119 };
0120
0121
0122 int main() {
0123
0124 MyProcessor processor;
0125 JMailbox <std::shared_ptr<JEvent>> events_in;
0126 JMailbox <std::shared_ptr<JEvent>> events_out;
0127 JMailbox <SubeventWrapper<MyInput>> subevents_in;
0128 JMailbox <SubeventWrapper<MyOutput>> subevents_out;
0129
0130 auto split_arrow = new JSplitArrow<MyInput, MyOutput>("split", &processor, &events_in, &subevents_in);
0131 auto subprocess_arrow = new JSubeventArrow<MyInput, MyOutput>("subprocess", &processor, &subevents_in, &subevents_out);
0132 auto merge_arrow = new JMergeArrow<MyInput, MyOutput>("merge", &processor, &subevents_out, &events_out);
0133
0134 JApplication app;
0135 app.SetTimeoutEnabled(false);
0136 app.SetTicker(false);
0137
0138 auto source = new SimpleSource("simpleSource");
0139 source->SetNEvents(10);
0140
0141
0142 auto topology = app.GetService<JTopologyBuilder>();
0143 auto source_arrow = new JEventSourceArrow("simpleSource", {source});
0144 source_arrow->set_input(topology->event_pool);
0145 source_arrow->set_output(&events_in);
0146
0147 auto proc_arrow = new JEventMapArrow("simpleProcessor");
0148 proc_arrow->set_input(&events_out);
0149 proc_arrow->set_output(topology->event_pool);
0150 proc_arrow->add_processor(new SimpleProcessor);
0151
0152 topology->arrows.push_back(source_arrow);
0153 topology->sources.push_back(source_arrow);
0154 topology->arrows.push_back(split_arrow);
0155 topology->arrows.push_back(subprocess_arrow);
0156 topology->arrows.push_back(merge_arrow);
0157 topology->arrows.push_back(proc_arrow);
0158 topology->sinks.push_back(proc_arrow);
0159
0160 source_arrow->attach(split_arrow);
0161 split_arrow->attach(subprocess_arrow);
0162 subprocess_arrow->attach(merge_arrow);
0163 merge_arrow->attach(proc_arrow);
0164
0165 app.Run(true);
0166
0167 }
0168