JEMRIS 2.9.2
open-source MRI simulations
Loading...
Searching...
No Matches
mpi_Model.h
Go to the documentation of this file.
1
5/*
6 * JEMRIS Copyright (C)
7 * 2006-2025 Tony Stoecker
8 * 2007-2018 Kaveh Vahedipour
9 * 2009-2019 Daniel Pflugfelder
10 *
11 *
12 * This program is free software; you can redistribute it and/or modify
13 * it under the terms of the GNU General Public License as published by
14 * the Free Software Foundation; either version 2 of the License, or
15 * (at your option) any later version.
16 *
17 * This program is distributed in the hope that it will be useful,
18 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 * GNU General Public License for more details.
21 *
22 * You should have received a copy of the GNU General Public License
23 * along with this program; if not, write to the Free Software
24 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
25 */
26
27
28#ifndef _MPI_MODEL_H_
29#define _MPI_MODEL_H_
30
31#include<iostream>
32#include "config.h"
33
34using namespace std;
35#define MPICH_IGNORE_CXX_SEEK
36
37#include <mpi.h>
38
39#include "Signal.h"
40#include "Declarations.h"
41#include "Sample.h"
42#include "MultiPoolSample.h"
43
44#ifdef HAVE_MPI_THREADS
45 #include <pthread.h>
46#endif
47
48void mpi_send_paket_signal(Signal* pSig,int CoilID);
49void mpi_recv_paket_signal(Signal* pSig,int SlaveID,int CoilID);
50
51/*****************************************************************************/
52inline MPI_Datatype MPIspindata () {
53
54 int NUM_DATA = World::instance()->GetNoOfSpinProps();
55
56 MPI_Datatype MPI_SPINDATA ;
57 MPI_Datatype type = MPI_DOUBLE;
58 MPI_Aint disp = 0;
59#if MPI_VERSION < 3
60 MPI_Type_struct( 1, &NUM_DATA, &disp, &type, &MPI_SPINDATA);
61#else
62 MPI_Type_create_struct( 1, &NUM_DATA, &disp, &type, &MPI_SPINDATA);
63#endif
64 MPI_Type_commit( &MPI_SPINDATA);
65
66 return MPI_SPINDATA;
67
68}
69
70/*****************************************************************************/
71/* function that does the progress counting; called by an extra thread from the master process */
72#ifdef HAVE_MPI_THREADS
73/*************************************************************************/
74
75void* CountProgress(void * arg) {
76
77 int TotalNoSpins = *((int *) arg);
78 int SpinsDone = TotalNoSpins - *(((int *) arg )+1);
79 int NowDone=0;
80 int progress_percent = -1;
81 static string bars = "***************************************************";
82 static string blancs = " ";
83
84 MPI_Status status;
85
86 while (SpinsDone != TotalNoSpins) {
87 MPI_Recv(&NowDone,1,MPI_INT,MPI_ANY_SOURCE,SPINS_PROGRESS,MPI_COMM_WORLD,&status);
88 SpinsDone += NowDone;
89
90 //update progress counter
91 int progr = (100*(SpinsDone+1)/TotalNoSpins);
92 if ( progr != progress_percent) {
93 progress_percent = progr;
94 ofstream fout(".jemris_progress.out" , ios::out);
95 fout << progr;
96 fout.close();
97
98 cout << "\rSimulating | ";
99 cout << bars.substr(0, progr/2) << " " << blancs.substr(0, 50-progr/2) << "| " <<setw(3) << setfill(' ') << progr << "% done";
100
101 flush(cout);
102
103 }
104 }
105
106 return new void*;
107
108}
109#endif
110
111/*****************************************************************************/
/**
 * Master-side work distribution (rank 0 only).
 *
 * Splits the sample across all slaves via MPI_Scatterv, then serves further
 * spin packets on request until every slave has reported completion.
 * The MPI call sequence here must stay in lock-step with the slave side
 * (mpi_receive_sample / mpi_recieve_sample_paket) — do not reorder.
 *
 * @param pSam  the full MR sample to distribute
 * @param RxCA  receive coil array; partial signals are accumulated into it
 */
void mpi_devide_and_send_sample (Sample* pSam, CoilArray* RxCA ) {

#ifdef HAVE_MPI_THREADS
	// copy + paste thread example
	// Spawn the progress-bar thread; it receives SPINS_PROGRESS messages.
	pthread_t counter_thread;
	int errcode; /* holds pthread error code */

	int buffer[2];
	buffer[0] = pSam->GetSize();
	buffer[1] = pSam->SpinsLeft();

	errcode=pthread_create(&counter_thread, /* thread struct */
	NULL, /* default thread attributes */
	CountProgress, /* start routine */
	(void *) &buffer);

	if (errcode) { /* arg to routine */
		cout << "thread creation failed !! exit."<< endl;
		exit (-1);
	}
	// end copy + paste thread example
#endif

	int size, ilen;
	std::string hm (MPI_MAX_PROCESSOR_NAME,' ');

	MPI_Comm_size(MPI_COMM_WORLD, &size);
	MPI_Get_processor_name(&hm[0], &ilen);

	cout << endl << hm.c_str() << " -> Master Process: send MR sample (" << pSam->GetSize()
	     << " spins) to " << size-1 << " slave(s)"<< endl << endl << flush;

	std::vector<int> sendcount (size); // array with NumberOfSpins[slaveid]
	std::vector<int> displs (size);
	int recvbuf;
	double* recvdummy = 0; // dummy buffer

	// no spin data for master:
	displs[0] = 0;
	sendcount[0] = 0;

	pSam->GetScatterVectors(sendcount.data(), displs.data(), size);

	// scatter sendcounts:
	MPI_Scatter (sendcount.data(), 1, MPI_INT, &recvbuf, 1, MPI_INT, 0, MPI_COMM_WORLD);


	//MODIF
	/*int ierr,my_id,num_procs;
	ierr = MPI_Comm_rank(MPI_COMM_WORLD, &my_id);
	ierr = MPI_Comm_size(MPI_COMM_WORLD, &num_procs);

	printf("I'm process %i out of %i processes\n", my_id, num_procs);*/

	// Tell each slave the total spin count and its starting spin index
	// (matched by the two MPI_Recv calls in mpi_receive_sample).
	int ii;
	long TotalSpinNumber=pSam->GetSize(),nextSpinMem=-1;
	for(ii=0;ii<size;ii++)
	{
		//cout<<ii<<" Sencount "<<sendcount[ii]<<" Displs "<<displs[ii]<<endl;
		if(ii>0) {
			MPI_Send(&TotalSpinNumber,1,MPI_LONG,ii,SEND_TOTAL_NO_SPINS,MPI_COMM_WORLD);
			MPI_Send(&displs[ii],1,MPI_LONG,ii,SEND_BEGIN_SPIN,MPI_COMM_WORLD);
			//cout<<0<<" Sent spins index information to "<<ii<<" : "<<TotalSpinNumber<<" "<<displs[ii]<<endl;
		}
	}
	//MODIF***


	// broadcast number of individual spin properties:
	long NProps = pSam->GetNProps();
	MPI_Bcast (&NProps, 1, MPI_LONG, 0, MPI_COMM_WORLD);
//*
	// helper array + sample dimensions + compartment count:
	// required for multi-pool samples with exchange.
	long hsize = pSam->GetHelperSize();
	MPI_Bcast (&hsize, 1, MPI_LONG, 0, MPI_COMM_WORLD);
	MPI_Bcast (pSam->GetHelper(), hsize, MPI_DOUBLE, 0, MPI_COMM_WORLD);

	long dsize = pSam->GetSampleDimsSize();
	MPI_Bcast (&dsize, 1, MPI_LONG, 0, MPI_COMM_WORLD);
	MPI_Bcast (&(pSam->GetSampleDims()[0]), dsize, MPI_INT, 0, MPI_COMM_WORLD);
// */
	int csize = pSam->GetNoSpinCompartments();
	MPI_Bcast (&csize , 1, MPI_INT, 0, MPI_COMM_WORLD);
// */
	MPI_Datatype MPI_SPINDATA = MPIspindata();

	//scatter sendcounts:
	MPI_Scatterv (pSam->GetSpinsData(), sendcount.data(), displs.data(),
	              MPI_SPINDATA, &recvdummy, 0, MPI_SPINDATA, 0, MPI_COMM_WORLD);
	// broadcast resolution:
	MPI_Bcast (pSam->GetResolution(),3,MPI_DOUBLE,0, MPI_COMM_WORLD);

	// now listen for paket requests:
	int SlavesDone=0;
	while (SlavesDone < size - 1) {

		int SlaveID;
		int NoSpins;
		int NextSpinToSend;
		MPI_Status status;
		MPI_Recv(&SlaveID, 1, MPI_INT, MPI_ANY_SOURCE,
		         REQUEST_SPINS, MPI_COMM_WORLD, &status);

		//get next spin paket to send:
		pSam->GetNextPacket(NoSpins,NextSpinToSend,SlaveID);


		//MODIF
		// NOTE(review): this loop iterates over all ranks but always sends to
		// SlaveID; after the first send nextSpinMem==NextSpinToSend suppresses
		// repeats, except when NextSpinToSend<0 where nextSpinMem is decremented
		// to force a resend — confirm intended message count against the single
		// SEND_BEGIN_SPIN recv on the slave side.
		for(ii=0;ii<size;ii++)
		{
			//cout<<ii<<" sencount "<<NoSpins<<" displs "<<NextSpinToSend<<endl;
			if(ii>0 && NextSpinToSend!=nextSpinMem) {
				MPI_Send(&NextSpinToSend,1,MPI_LONG,SlaveID,SEND_BEGIN_SPIN,MPI_COMM_WORLD);
				//cout<<endl<<0<<" sent spins index information to "<<SlaveID<<" : "<<"--- "<<NextSpinToSend<<endl;
				nextSpinMem=NextSpinToSend;
				if(nextSpinMem<0) nextSpinMem-=1;
			}

		}
		//MODIF***


		// an empty packet is the "no work left" signal for this slave
		if (NoSpins == 0)
			SlavesDone++;

		// receive signal
		for (unsigned int i=0; i < RxCA->GetSize(); i++)
			mpi_recv_paket_signal(RxCA->GetCoil(i)->GetSignal(),SlaveID,i);

		// dump temp signal
		pSam->DumpRestartInfo(RxCA);

		// now send NoSpins
		MPI_Send(&NoSpins,1,MPI_INT,SlaveID,SEND_NO_SPINS, MPI_COMM_WORLD);
		if (NoSpins > 0)
			MPI_Send(&(pSam->GetSpinsData())[NextSpinToSend*pSam->GetNProps()],
			         NoSpins, MPI_SPINDATA,SlaveID,SEND_SAMPLE, MPI_COMM_WORLD);

#ifndef HAVE_MPI_THREADS

		// without threads: write progress bar each time new spins are sent:
		static int progress_percent = -1;
		static int spinsdone = 0;
		spinsdone += sendcount[SlaveID];
		sendcount[SlaveID] = NoSpins;

		//update progress counter (pjemris without threads support)
		World* pW = World::instance();
		int progr = (100*(spinsdone+1)/pW->TotalSpinNumber);
		// case of restart: set progress to 100 at end:
		if (SlavesDone==(size-1)) progr=100;

		if (progr != progress_percent) {
			progress_percent = progr;
			ofstream fout(".jemris_progress.out" , ios::out);
			fout << progr;
			fout.close();
		}
#endif

	} // end while (SlavesDone < size -1)

	flush (cout);

#ifdef HAVE_MPI_THREADS
	/* join threads: */
	int *status; /* holds return code */
	errcode = pthread_join(counter_thread,(void **) &status);
	if (errcode) {
		cout << "Joining of threads failed! (so what... ;) )"<<endl;
		// exit(-1);
	}
#endif

	return;

}
288
/*
 * Receive first portion of sample from MPI master process.
 * Call sequence must mirror mpi_devide_and_send_sample exactly.
 * @param sender  unused (rank 0 is hard-coded in all calls below)
 * @param tag     unused
 * @return pointer to the new (sub)sample.
 * Deletion of the (sub)sample has to be done elsewhere!
 * (This is e.g. done by the Simulator class)
 */
Sample* mpi_receive_sample(int sender, int tag){

	long NPoints;
	World* pW = World::instance();

	// get number of spins:
	int nospins;
	MPI_Scatter(NULL,1,MPI_INT,&nospins,1,MPI_INT,0, MPI_COMM_WORLD);

	//MODIF
	// receive total spin count and this slave's first spin index
	// (matched by the per-rank MPI_Send pair on the master).
	long TotalSpinNumber=0,beginTraj=0;
	//cout<<World::instance()->m_myRank<<" Query spins index information: "<<endl;
	MPI_Status status;
	MPI_Recv(&TotalSpinNumber,1,MPI_LONG,0,SEND_TOTAL_NO_SPINS,MPI_COMM_WORLD,&status);
	MPI_Recv(&beginTraj,1,MPI_LONG,0,SEND_BEGIN_SPIN,MPI_COMM_WORLD,&status);
	pW->setTrajLoading(beginTraj,TotalSpinNumber);
	int num_traj=beginTraj;   // NOTE(review): unused below
	//cout<<World::instance()->m_myRank<<" Received spins index information: "<<TotalSpinNumber<<" "<<num_traj<<endl;
	//MODIF***

	NPoints= (long) nospins;
	//get number of physical properties per spin
	long NProps;
	MPI_Bcast (&NProps,1,MPI_LONG,0, MPI_COMM_WORLD);

	// allocate the local (sub)sample; ownership passes to the caller
	Sample* pSam = new Sample();
	pSam->CreateSpins(NProps, NPoints);
	pW->SetNoOfSpinProps(pSam->GetNProps());

/* >> required for multi-pool samples with exchange*/
	long hsize = 0;
	MPI_Bcast (&hsize, 1, MPI_LONG,0, MPI_COMM_WORLD);
	pSam->CreateHelper(hsize);

	MPI_Bcast (pSam->GetHelper(),(int)hsize,MPI_DOUBLE,0, MPI_COMM_WORLD);
	pW->InitHelper(pSam->GetHelperSize());
	pSam->GetHelper( pW->Helper() );

	long dsize = 0;
	MPI_Bcast (&dsize, 1, MPI_LONG,0, MPI_COMM_WORLD);
	pSam->CreateDims(dsize);

	MPI_Bcast (&(pSam->GetSampleDims()[0]),(int)dsize,MPI_INT,0, MPI_COMM_WORLD);
	int csize = 0;
	MPI_Bcast (&csize, 1, MPI_INT,0, MPI_COMM_WORLD);
	pSam->SetNoSpinCompartments(csize);
	/* << required for multi-pool samples with exchange*/


	MPI_Datatype MPI_SPINDATA = MPIspindata();
	// get sample:
	MPI_Scatterv (NULL,NULL,NULL,MPI_SPINDATA,pSam->GetSpinsData(),nospins,MPI_SPINDATA,0, MPI_COMM_WORLD);
	//get resolution (needed for position randomness)
	MPI_Bcast (pSam->GetResolution(),3,MPI_DOUBLE,0, MPI_COMM_WORLD);

	int ilen; std::string hm (MPI_MAX_PROCESSOR_NAME, ' ');
	MPI_Get_processor_name(&hm[0],&ilen);
	//MODIF
	//cout <<endl<< "hostname"<< " -> slave " << setw(2) << MPI::COMM_WORLD.Get_rank() << ": received "
	//     << NPoints << " spins for simulation ..." << endl << flush;
	//MODIF***

	return pSam;

}
360
361/*****************************************************************************/
367
368 World* pW = World::instance();
369 int myRank = pW->m_myRank;
370 int NoSpins = 0;
371
372 //backup dimensions
373 vector<size_t> d = samp->GetSampleDims();
374
375 // request new spins:
376 MPI_Send(&myRank,1,MPI_INT,0,REQUEST_SPINS,MPI_COMM_WORLD);
377
378 // send signal
379 for (unsigned int i=0; i < RxCA->GetSize(); i++)
380 mpi_send_paket_signal(RxCA->GetCoil(i)->GetSignal(),i);
381
382 // Receive number of spins
383 MPI_Status status;
384 MPI_Recv(&NoSpins,1,MPI_INT,0,SEND_NO_SPINS,MPI_COMM_WORLD,&status);
385
386 //MODIF
387 long TotalSpinNumber=World::instance()->getTrajNumber(),beginTraj=0;
388 //cout<<World::instance()->m_myRank<<" query spins index information: "<<endl;
389 //MPI_Recv(&TotalSpinNumber,1,MPI_LONG,0,SEND_TOTAL_NO_SPINS,MPI_COMM_WORLD,&status);
390 MPI_Recv(&beginTraj,1,MPI_LONG,0,SEND_BEGIN_SPIN,MPI_COMM_WORLD,&status);
391 World::instance()->setTrajLoading(beginTraj,TotalSpinNumber);
392 //cout<<endl<<World::instance()->m_myRank<<" received spins index information: "<<TotalSpinNumber<<" "<<beginTraj<<endl;
393 //MODIF***
394
395 // Spins left?
396 if (NoSpins == 0) return false;
397
398 // prepare sample structure
399 samp->ClearSpins();
400 samp->CreateSpins(NoSpins);
401 samp->SetSampleDims(d);
402
403
404
405 // Receive
406 MPI_Datatype MPI_SPINDATA = MPIspindata();
407 MPI_Recv(samp->GetSpinsData(),NoSpins,MPI_SPINDATA,0,SEND_SAMPLE,MPI_COMM_WORLD,&status);
408 int ilen; std::string hm (MPI_MAX_PROCESSOR_NAME, ' ');
409 MPI_Get_processor_name(&hm[0],&ilen);
410 //MODIF
411 //cout <<endl<< "hostname" << " -> slave " << setw(2) << MPI::COMM_WORLD.Get_rank() << ": received "
412 // << NoSpins << " spins for simulation ..." << endl << flush;
413 //pW->setTrajLoading(myRank,NoSpins);
414 //MODIF***
415/*
416 cout << endl << "SLAVE: " << setw(2) << MPI::COMM_WORLD.Get_rank()
417 << " , #Pools= " << samp->GetNoSpinCompartments() << " , SampleDims=" << samp->GetSampleDimsSize() << " , SampleProps= " << samp->GetNProps() << " , NextPackageSize=" << samp->GetSize()
418 << " , ExMatDim=" << samp->GetHelperSize() << endl << endl;
419// */
420 return true;
421
422}
423/*****************************************************************************/
424void mpi_send_paket_signal (Signal* pSig, const int CoilID) {
425
426 int tsize = (int) pSig->Repo()->Size();
427
428 if (World::instance()->m_myRank == 1)
429 MPI_Send(pSig->Repo()->Times(), (int) pSig->Repo()->Samples(), MPI_DOUBLE, 0, SIG_TP + (CoilID)*4,MPI_COMM_WORLD);
430
431 MPI_Send(pSig->Repo()->Data(), tsize, MPI_DOUBLE, 0, SIG_MX + (CoilID)*4,MPI_COMM_WORLD);
432
433
434}
435
436/*****************************************************************************/
437void mpi_recv_paket_signal(Signal *pSig, const int SlaveId, const int CoilID) {
438
439 int tsize = (int) pSig->Repo()->Size();
440 double* data = pSig->Repo()->Data();
441 std::vector<double> tmp (tsize);
442 MPI_Status status;
443
444 if (SlaveId == 1)
445 MPI_Recv(pSig->Repo()->Times(), (int) pSig->Repo()->Samples(), MPI_DOUBLE, SlaveId, SIG_TP + (CoilID)*4,MPI_COMM_WORLD,&status);
446
447 MPI_Recv(&tmp[0], tsize, MPI_DOUBLE, SlaveId, SIG_MX + (CoilID)*4,MPI_COMM_WORLD,&status);
448
449 for (int i = 0; i < tsize; i++)
450 data[i] += tmp[i];
451
452}
453
454#endif
Implementation of JEMRIS Declarations.
Implementation of JEMRIS Sample.
Implementation of JEMRIS Signal.
Coil configuration and sensitivities.
Definition CoilArray.h:40
unsigned int GetSize()
Get the number of channels.
Definition CoilArray.h:93
Coil * GetCoil(unsigned channel)
Get a particular coil.
Definition CoilArray.cpp:390
Signal * GetSignal()
Get the received signal of this coil.
Definition Coil.h:116
The Sample is the object to simulate. It contains the spins.
Definition Sample.h:301
double * GetResolution()
Get grid resolution.
Definition Sample.h:417
void CreateSpins(const size_t size)
create the spin structure
Definition Sample.cpp:88
vector< size_t > GetSampleDims() const
Definition Sample.h:378
void ClearSpins()
delete the spin structure
Definition Sample.cpp:80
void SetSampleDims(vector< size_t > d)
Definition Sample.h:385
int SpinsLeft()
Returns the number of spins which still need to be calculated.
Definition Sample.cpp:582
const int GetSampleDimsSize() const
Dimensions.
Definition Sample.h:392
double * GetSpinsData()
Definition Sample.h:462
void DumpRestartInfo(CoilArray *RxCA)
Utility function for restart: dump information to restart jemris after crash.
Definition Sample.cpp:522
size_t GetSize() const
Definition Sample.cpp:320
void GetScatterVectors(int *sendcount, int *displ, int size)
utility function to send sample in parallel mode: initial samples are sent via mpi_scatterv; get need...
Definition Sample.cpp:366
void GetNextPacket(int &noSpins, int &NextSpinToSend, int SlaveId)
utility function to send sample in parallel mode: get next spin packet to be sent....
Definition Sample.cpp:458
size_t GetNProps() const
Definition Sample.h:371
The signal store and IO.
Definition Signal.h:249
Repository * Repo()
Definition Signal.h:306
Singleton with information about the simulation of the current spin.
Definition World.h:51
void InitHelper(long size)
Initialize helper array.
Definition World.cpp:127
long TotalSpinNumber
Total number of spins.
Definition World.h:163
void setTrajLoading(int firstSpin, int paketSize)
Set trajectory parameters for MPI current sample paket.
Definition World.h:71
int GetNoOfSpinProps()
Get number of spin properties.
Definition World.h:113
double * Helper()
Reference to helper array.
Definition World.h:119
int m_myRank
MPI rank of this process. if m_myRank<0 process is serial jemris.
Definition World.h:188
static World * instance()
Get sole instance of the sequence tree.
Definition World.cpp:34
void SetNoOfSpinProps(int n)
Set number of spin properties.
Definition World.cpp:109
bool mpi_recieve_sample_paket(Sample *samp, CoilArray *RxCA)
Definition mpi_Model.h:366
long Size()
Size of bulk data (i.e. number of elements of m_data)
Definition Signal.h:71
double * Data()
Reference to data repository.
Definition Signal.h:83
const long Samples() const
Number of samples.
Definition Signal.h:107
double * Times()
Reference to time point repository.
Definition Signal.h:95

-- last change 03.01.2025 | Tony Stoecker | Imprint | Data Protection --