Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-30 10:29:38

0001 
0002 // Copyright 2020, Jefferson Science Associates, LLC.
0003 // Subject to the terms in the LICENSE file found in the top-level directory.
0004 
#include <stdexcept>
#include <string>

#include <JANA/JApplication.h>
#include <JANA/JObject.h>
#include <JANA/JEventSource.h>
#include <JANA/JEventProcessor.h>
#include "JANA/Engine/JTopologyBuilder.h"
#include <JANA/Topology/JSubeventArrow.h>
0011 
0012 
/// Subevent input record: two operands plus the (event, subevent) pair that
/// identifies where the record came from.
///
/// The member order is left untouched because instances are copied bytewise
/// between host and device elsewhere in this file.
struct MyInput : public JObject {
    int x;        ///< Integer operand.
    float y;      ///< Floating-point operand.
    int evt = 0;  ///< Number of the event this record belongs to.
    int sub = 0;  ///< Index of this record within its event.

    /// Builds a record from both operands and its (event, subevent) identifiers.
    MyInput(int x, float y, int evt, int sub)
        : x{x},
          y{y},
          evt{evt},
          sub{sub} {}
};
0021 
/// Subevent output record: one result value plus the (event, subevent) pair
/// identifying the input it was derived from.
///
/// The member order is left untouched because instances are copied bytewise
/// between host and device elsewhere in this file.
struct MyOutput : public JObject {
    float z;      ///< Result value.
    int evt = 0;  ///< Number of the event this record belongs to.
    int sub = 0;  ///< Index of this record within its event.

    /// Builds a record from a result value and its (event, subevent) identifiers.
    explicit MyOutput(float z, int evt, int sub)
        : z{z},
          evt{evt},
          sub{sub} {}
};
0029 
/// Element-wise kernel: for every index below `n`, stores `in[i].x + in[i].y`
/// into `out[i].z` and copies the `evt` / `sub` identifiers across.
///
/// Launch layout: 1D grid of 1D blocks, one thread per element. Threads whose
/// global index is `n` or larger do nothing, so the grid may be larger than `n`.
/// No shared memory is used.
///
/// @param in   Device pointer to at least `n` inputs; only read, hence `const`.
/// @param out  Device pointer to at least `n` outputs; only `z`, `evt` and `sub`
///             of each element are written.
/// @param n    Number of elements to process.
__global__ void myKernel(const MyInput *in, MyOutput *out, int n) {
    // Flat global thread index for a 1D launch.
    const int id = blockIdx.x * blockDim.x + threadIdx.x;

    // Threads past the end of the data have nothing to do.
    if (id >= n) {
        return;
    }

    const MyInput &src = in[id];
    MyOutput &dst = out[id];

    // Same int -> float promotion the implicit `int + float` would perform.
    dst.z = static_cast<float>(src.x) + src.y;
    dst.evt = src.evt;
    dst.sub = src.sub;
}
0041 
// Throws std::runtime_error naming the failed step when `status` is not cudaSuccess.
static void checkCuda(cudaError_t status, const char *step) {
    if (status != cudaSuccess) {
        throw std::runtime_error(std::string("CUDA failure in ") + step + ": " +
                                 cudaGetErrorString(status));
    }
}

/// Runs myKernel on a single input/output pair.
///
/// Allocates device copies of `*h_in` and `*h_out`, launches the kernel with one
/// block of one thread, copies the device output back over `*h_out`, and frees
/// the device buffers.
///
/// @param h_in   Host pointer to the input record (read only).
/// @param h_out  Host pointer to the output record; its full `sizeof(MyOutput)`
///               bytes are overwritten with the device copy on success.
/// @throws std::runtime_error if any CUDA runtime call or the kernel launch
///         reports an error. Device buffers are released before the exception
///         propagates.
void myKernelWrapper(const MyInput *h_in, MyOutput *h_out) {
    // Start from nullptr so the cleanup path can free unconditionally:
    // cudaFree on a null pointer performs no operation.
    MyInput *d_in = nullptr;
    MyOutput *d_out = nullptr;

    try {
        checkCuda(cudaMalloc((void **) &d_in, sizeof(MyInput)), "cudaMalloc(d_in)");
        checkCuda(cudaMalloc((void **) &d_out, sizeof(MyOutput)), "cudaMalloc(d_out)");

        checkCuda(cudaMemcpy(d_in, h_in, sizeof(MyInput), cudaMemcpyHostToDevice),
                  "cudaMemcpy(h_in -> d_in)");
        // The kernel only writes z/evt/sub, while the copy back below replaces
        // every byte of *h_out. Seeding the device buffer with the host object
        // makes the bytes the kernel does not touch come back unchanged.
        checkCuda(cudaMemcpy(d_out, h_out, sizeof(MyOutput), cudaMemcpyHostToDevice),
                  "cudaMemcpy(h_out -> d_out)");

        myKernel<<<1, 1>>>(d_in, d_out, 1); // launch with only 1 GPU thread
        // A launch does not return a status; configuration errors are only
        // visible through cudaGetLastError().
        checkCuda(cudaGetLastError(), "myKernel launch");

        // Blocking copy: also the point where a fault raised while the kernel
        // was executing gets reported.
        checkCuda(cudaMemcpy(h_out, d_out, sizeof(MyOutput), cudaMemcpyDeviceToHost),
                  "cudaMemcpy(d_out -> h_out)");
    } catch (...) {
        // Best-effort cleanup; the original error is the one worth reporting.
        cudaFree(d_in);
        cudaFree(d_out);
        throw;
    }

    // Attempt both frees before checking either, so a failure of the first
    // does not leave the second buffer allocated.
    const cudaError_t freeInStatus = cudaFree(d_in);
    const cudaError_t freeOutStatus = cudaFree(d_out);
    checkCuda(freeInStatus, "cudaFree(d_in)");
    checkCuda(freeOutStatus, "cudaFree(d_out)");
}
0058 
/// Subevent processor that turns one MyInput into one MyOutput by handing the
/// pair to myKernelWrapper.
struct MyProcessor : public JSubeventProcessor<MyInput, MyOutput> {

    /// Sets the tags: empty for inputs, "subeventted" for outputs.
    MyProcessor() {
        outputTag = "subeventted";
        inputTag = "";
    }

    /// Produces the output for a single subevent.
    ///
    /// A placeholder output (z = 0, evt = sub = -1) is allocated first and then
    /// passed to myKernelWrapper together with the input; the identifiers are
    /// logged before and after that call. The newly allocated object is
    /// returned to the caller as a raw pointer.
    MyOutput *ProcessSubevent(MyInput *input) override {
        LOG << "Processing subevent " << input->evt << ":" << input->sub << LOG_END;

        // Host-only equivalent of the CUDA path below:
        //   return new MyOutput(input->y + (float) input->x, input->evt, input->sub);
        auto *result = new MyOutput(0.0, -1, -1);

        LOG << "    Before CUDA, evt:sub=" << result->evt << ":" << result->sub << LOG_END;
        myKernelWrapper(input, result);
        LOG << "    After CUDA, evt:sub=" << result->evt << ":" << result->sub << LOG_END;

        return result;
    }
};
0076 
0077 
/// Event source that attaches five hard-coded MyInput objects to every event.
struct SimpleSource : public JEventSource {

    /// Selects the ExpertMode callback style.
    SimpleSource() {
        SetCallbackStyle(CallbackStyle::ExpertMode);
    };

    /// Fills `event` with five MyInput objects (subevent indices 0..4) tagged
    /// with the event's number, logs the event number, and reports success.
    ///
    /// @param event  Event to populate.
    /// @return Always Result::Success.
    Result Emit(JEvent& event) override {
        auto evt = event.GetEventNumber();
        std::vector < MyInput * > inputs;
        inputs.push_back(new MyInput(22, 3.6, evt, 0));
        inputs.push_back(new MyInput(23, 3.5, evt, 1));
        inputs.push_back(new MyInput(24, 3.4, evt, 2));
        inputs.push_back(new MyInput(25, 3.3, evt, 3));
        inputs.push_back(new MyInput(26, 3.2, evt, 4));
        event.Insert(inputs);
        // `event` is a reference, so members are reached with '.', not '->'.
        LOG << "Emitting event " << event.GetEventNumber() << LOG_END;
        return Result::Success;
    }
};
0096 
/// Event processor that logs every MyOutput attached to an event.
struct SimpleProcessor : public JEventProcessor {

    /// Selects the ExpertMode callback style.
    SimpleProcessor() {
        SetCallbackStyle(CallbackStyle::ExpertMode);
    }

    /// Logs a header line, one "evt:sub z" line per MyOutput found in `event`,
    /// and a trailer line. The whole call runs while holding m_mutex.
    void Process(const JEvent& event) {

        std::lock_guard <std::mutex> lock(m_mutex);

        auto results = event.Get<MyOutput>();

        // Disabled sanity checks kept from the original example:
        // assert(outputs.size() == 4);
        // assert(outputs[0]->z == 25.6f);
        // assert(outputs[1]->z == 26.5f);
        // assert(outputs[2]->z == 27.4f);
        // assert(outputs[3]->z == 28.3f);

        LOG << " Contents of event " << event.GetEventNumber() << LOG_END;
        for (auto item : results) {
            LOG << " " << item->evt << ":" << item->sub << " " << item->z << LOG_END;
        }
        LOG << " DONE with contents of event " << event.GetEventNumber() << LOG_END;
    }
};
0120 
0121 
/// Wires up the example by hand and runs it:
/// source -> split -> subprocess -> merge -> processor.
///
/// The mailboxes connecting the arrows and the subevent processor live on this
/// function's stack; the arrows are heap-allocated and registered with the
/// application's JTopologyBuilder.
int main() {

    MyProcessor processor;
    JMailbox <std::shared_ptr<JEvent>> events_in;
    JMailbox <std::shared_ptr<JEvent>> events_out;
    JMailbox <SubeventWrapper<MyInput>> subevents_in;
    JMailbox <SubeventWrapper<MyOutput>> subevents_out;

    auto split_arrow = new JSplitArrow<MyInput, MyOutput>("split", &processor, &events_in, &subevents_in);
    auto subprocess_arrow = new JSubeventArrow<MyInput, MyOutput>("subprocess", &processor, &subevents_in, &subevents_out);
    auto merge_arrow = new JMergeArrow<MyInput, MyOutput>("merge", &processor, &subevents_out, &events_out);

    JApplication app;
    app.SetTimeoutEnabled(false);
    app.SetTicker(false);

    // SimpleSource only declares a constructor without parameters, so it is
    // default-constructed; the "simpleSource" name is carried by the arrow below.
    auto source = new SimpleSource;
    source->SetNEvents(10); // limit ourselves to 10 events. Note that the 'jana:nevents' param won't work
    // here because we aren't using JComponentManager to manage the EventSource

    auto topology = app.GetService<JTopologyBuilder>();
    auto source_arrow = new JEventSourceArrow("simpleSource", {source});
    source_arrow->set_input(topology->event_pool);
    source_arrow->set_output(&events_in);

    auto proc_arrow = new JEventMapArrow("simpleProcessor");
    proc_arrow->set_input(&events_out);
    proc_arrow->set_output(topology->event_pool);
    proc_arrow->add_processor(new SimpleProcessor);

    // Register every arrow; the source arrow is also listed as a source and
    // the processor arrow as a sink.
    topology->arrows.push_back(source_arrow);
    topology->sources.push_back(source_arrow);
    topology->arrows.push_back(split_arrow);
    topology->arrows.push_back(subprocess_arrow);
    topology->arrows.push_back(merge_arrow);
    topology->arrows.push_back(proc_arrow);
    topology->sinks.push_back(proc_arrow);

    // Chain the arrows in pipeline order.
    source_arrow->attach(split_arrow);
    split_arrow->attach(subprocess_arrow);
    subprocess_arrow->attach(merge_arrow);
    merge_arrow->attach(proc_arrow);

    app.Run(true);

}
0168