LAM/MPI logo

LAM/MPI General User's Mailing List Archives

  |   Home   |   Download   |   Documentation   |   FAQ   |   all just in this list

From: Jeff Squyres (jsquyres) (jsquyres_at_[hidden])
Date: 2006-05-25 08:03:46


LAM/MPI is not thread safe -- specifically, LAM does not support
MPI_THREAD_MULTIPLE.
 
Check out the FAQ in the section "Information about LAM itself", the
questions "Is LAM thread safe?" and "Is LAM multi-threaded?"
 
    http://www.lam-mpi.org/faq/

________________________________

        From: lam-bounces_at_[hidden] [mailto:lam-bounces_at_[hidden]]
On Behalf Of imran shaik
        Sent: Thursday, May 25, 2006 7:56 AM
        To: lam_at_[hidden]
        Subject: LAM: Threads or Recv?
        
        
        Hi i have a strange problem.
        My processes are like this.
        
        Each process has a thread to recv messages(MPI non blocking
recv). The thread loops infinitely.
        The main thread searches for a value in its array of known
        values. If the value is not found, it tries to get it from the other
        processes: it sends a request for it using a non-blocking MPI send.
        
        I am using 6 such processes.
        
        My problem is that I get proper results some of the time, but
        sometimes it just hangs.
        
        Is it that if many messages are sent to a process by the others at
        the same time, the receiver process can't handle them and misses
        something?
        
        Is it problems with threads? I use Posix threads.
        
        I am attaching the program...though other functions are not that
important for others.
        
        
        Any idea, or suggestion regarding implementation of concurrent
threads is welcome.
        
        
        
        Thanks in advance
        
        
        Regards,
        Imran
        
        
        #include<iostream>
        #include "mpi.h"
        #include<pthread.h>
        #include<signal.h>
        #include<unistd.h>
        // Number of rows in the forwarded-request cache; add_cache() wraps its
        // write position back to row 0 once it reaches this value.
        #define CACHESIZE 10
        // Tag main() attaches to its initial request. listen() treats tags in
        // 1..HOPCOUNT as requests and decrements the tag before passing one on.
        #define HOPCOUNT 5
        // listen() treats tags above this value as replies; a reply is sent with
        // tag RESULTLIMIT + number of matches.
        #define RESULTLIMIT 30
        // Divided by the world size in main() to give the number of values each
        // rank owns.
        #define MAXRESOURCES 30
        // Tag listen() uses to tell a request's origin that the request was dropped.
        // It is tested before the "tag > RESULTLIMIT" reply check.
        #define DISCARDTAG 9999
        using namespace std;
        using namespace MPI;
        // Forward declarations of the functions defined after main().
        int search(int);
        void* listen(void*);
        int get_best_neighbor(int);
        int getindex(int);
        void sig_handler(int);
        void add_cache(int,int);
        int check_cache(int);
        // me            - this process's rank in MPI::COMM_WORLD (set in main()).
        // nbrcnt        - number of neighbors of `me` in the graph communicator.
        // res_dist_size - number of entries in myint (MAXRESOURCES / world size).
        // reqcnt        - bumped when main() issues its request.
        // fwdcnt        - bumped when main() sends its request on and each time
        //                 listen() handles a request-tagged message.
        // respcnt       - bumped when a request is satisfied locally in main() or a
        //                 reply for a cached request arrives in listen().
        // cachecnt      - row of `cache` that add_cache() writes next.
        // NOTE(review): these are read and written by both the main thread and the
        // listener thread without any locking visible in this file.
        int me,nbrcnt,res_dist_size,reqcnt,fwdcnt,respcnt,cachecnt;
        // nbrs    - ranks of this process's graph neighbors (nbrcnt entries).
        // expbase - one counter per neighbor, incremented in listen() when a reply
        //           arrives for a request that was sent via that neighbor;
        //           get_best_neighbor() prefers the neighbor with the largest value.
        // myint   - the values this rank owns (res_dist_size entries).
        int *nbrs,*expbase,*myint;
        // Each row is {requested value, neighbor the request was sent to}.
        int cache[CACHESIZE][2];
        // Graph-topology communicator created in main(); all messages use it.
        MPI::Graphcomm gcomm;
        // Id of the listener thread started in main().
        pthread_t tid;
        
        int main(int argc, char**argv)
        {
        int i,wsize,membershipkey;
        int result;
        const int index[6]={1,4,6,9,10,12};
        const int edges[12]={1,0,2,5,1,3,2,4,5,3,1,3};
        int nnodes;
        reqcnt=fwdcnt=respcnt=cachecnt=0;
        MPI::Init_thread(argc,argv,2);
        me=MPI::COMM_WORLD.Get_rank();
        wsize=MPI::COMM_WORLD.Get_size();
        nnodes=6;
        gcomm=MPI::COMM_WORLD.Create_graph(nnodes,index,edges,false);
        nbrcnt=gcomm.Get_neighbors_count(me);
        nbrs=new int[nbrcnt];
        expbase=new int[nbrcnt];
        gcomm.Get_neighbors(me,nbrcnt,nbrs);
        
        for(int i=0;i<nbrcnt;i++)
        {
        expbase[i]=0;
        }
        
        res_dist_size=MAXRESOURCES/wsize;
        
        myint=new int[res_dist_size];
        
            for(int i=res_dist_size*me;i<res_dist_size*(me+1);i++)
            {
            myint[i-(res_dist_size)*me]=i;
            }
            cout<<endl;
            
        
        gcomm.Barrier();
        
        if(pthread_create(&tid,0,listen,(void*)&me)<0)
            {
                cout<<"Thread cant be created"<<endl;
            }
        
        {
            int randnum;
            switch(me)
            {
            case 0:
            randnum=4;
            break;
            case 1:
            randnum=10;
            break;
            case 2:
            randnum=8;
            break;
            case 3:
            randnum=10;
            break;
            case 4:
            randnum=15;
            break;
            case 5:
            randnum=8;
            break;
            default :
            randnum=27;
            }
            int sendbuf[2];
            reqcnt++;
            cout<<"P="<<me<<" RN="<<randnum<<endl;
            int resrc_cnt=search(randnum);
                if(resrc_cnt>0)
                {
                    
                    respcnt++;
                    cout<<"Yahoo! P="<<me<<" A="<<randnum<<"
Respcnt="<<respcnt<<endl;
                }
                else
                {
                    sendbuf[0]=randnum;
                    sendbuf[1]=me;
                    int bestneighbor=get_best_neighbor(me);
                    add_cache(randnum,bestneighbor);
                    MPI::Request request;
                    MPI::Status status;
        
request=gcomm.Isend(&sendbuf,2,MPI::INT,bestneighbor,HOPCOUNT);
                    fwdcnt++;
                    cout<<"P="<<me<<" NA="<<randnum<<"
RF-->"<<bestneighbor<<endl;
                    request.Wait(status);
                }
        }
        
        
        
        int ret;
        int *retval=&ret;
        pthread_join(tid,(void**)&retval);
        gcomm.Free();
        MPI::Finalize();
        
        
        }
        
        
        
        int search(int keyval) //returns the number of matches found
        {
        int resultcnt=0;
            for(int k=0;k<res_dist_size;k++)
            {
                if(keyval==myint[k])
                {
                //pos=k;
                resultcnt++;
                }
            }
        return resultcnt;
        }
        
        
        void * listen(void *t)
        {
        int recvbuf[2];
        MPI::Status status;
        MPI::Request request;
        static int exp_update_cnt=0;
        int recvcnt=0;
        int deepcntr=1;
        int itrcnt=0;
        while(1)
        {
        itrcnt++;
        
request=gcomm.Irecv(&recvbuf,2,MPI::INT,MPI::ANY_SOURCE,MPI::ANY_TAG);
        recvcnt++;
        request.Wait(status);
        
        int source,tag;
        source=status.Get_source();
        tag=status.Get_tag();
        
        cout<<"P= "<<me<<"Recvd: Source= "<<source<<" tag= "<<tag<<endl;
        
        if(tag==DISCARDTAG)
        {
            if(nbrcnt==1)
            {
            //increment TTL and send to same neighbor
            MPI::Request request;
            MPI::Status status;
        
request=gcomm.Isend(&recvbuf,2,MPI::INT,nbrs[0],(HOPCOUNT+deepcntr));
        
cout<<"AP="<<me<<"NA="<<recvbuf[0]<<"RF->"<<nbrs[0]<<"TTL="<<HOPCOUNT+de
epcntr<<endl;
            deepcntr++;
            request.Wait(status);
            }
            else //many other neighbors are there
            {
            int fwdnbr=check_cache(recvbuf[0]);
            int bestnextnode;
                if(fwdnbr>=0) //entry in cache not timed out.
                bestnextnode=get_best_neighbor(fwdnbr);
                else //entry in cache is not available
                bestnextnode=get_best_neighbor(me);
                
            MPI::Request request;
            MPI::Status status;
        
request=gcomm.Isend(&recvbuf,2,MPI::INT,bestnextnode,HOPCOUNT);
        
cout<<"AP="<<me<<"NA="<<recvbuf[0]<<"RF->"<<bestnextnode<<"TTL="<<HOPCOU
NT<<endl;
            request.Wait(status);
        
            }
        }
        
        else if(tag>RESULTLIMIT) //results are returned.
            {
            if(deepcntr>1)
            deepcntr--;
            cout<<"Yahoo! FP="<<me<<" Node="<<source<<" has
"<<tag-RESULTLIMIT<<" matches"<< " for "<<recvbuf[0]<<"
Hopcount="<<recvbuf[1]<<endl;
            //check cache for details of forwarded request
            int fwdnbr=check_cache(recvbuf[0]);
                if(fwdnbr>=0)
                {
                int index=getindex(fwdnbr);
                respcnt++;
                    if(index<0)
                    cout<<"Neighbor index error!"<<endl;
                    else
                    {
                    ++expbase[index];
                    cout<<"P="<<me<<"
expbase["<<index<<"]="<<expbase[index]<<endl;
                    ++exp_update_cnt;
                    }
                }
                else cout<<"P="<<me<<" Request timed out in
cache"<<endl;
            }
        else if (tag>0&&tag<=HOPCOUNT)
            {
            tag=tag-1;
            int resrc_cnt;
            if(recvbuf[1]==me)
            resrc_cnt=0;
            resrc_cnt=search(recvbuf[0]);
            fwdcnt++;
            
                if(resrc_cnt>0)
                {
                int *a=(int*)t;
                int sendbuf[2];
                sendbuf[0]=recvbuf[0];
                sendbuf[1]=HOPCOUNT-tag; //for hop count calculation
                MPI::Request request;
                MPI::Status status;
        
request=gcomm.Isend(&sendbuf,2,MPI::INT,recvbuf[1],RESULTLIMIT+resrc_cnt
);
                cout<<"P="<<me<<" Response sent for "<<recvbuf[0]<<" to
node "<<recvbuf[1]<<endl;
                request.Wait(status);
                }
                else
                {
                
                int bestneighbor=get_best_neighbor(source);
                    //still tag value is positive then send.Else discard
packet.
                    if(tag>0)
                    {
                    MPI::Request request;
                    MPI::Status status;
        
request=gcomm.Isend(&recvbuf,2,MPI::INT,bestneighbor,tag);
                    cout<<"RF-> P= "<<me<<" resrc= "<<recvbuf[0]<<"
origin= "<<recvbuf[1]<<" Prev node= "<<source<<". "<<me<<" ->
"<<bestneighbor<<endl;
                    request.Wait(status);
                    }
                    else
                    //send a notice to other matchmaker saying his
result is discarded
                    {
                    MPI::Request request;
                    MPI::Status status;
        
request=gcomm.Isend(&recvbuf,2,MPI::INT,recvbuf[1],DISCARDTAG);
                    cout<<"P="<<me<<"Request
discarded"<<endl;//discarded request
                    request.Wait(status);
                    }
                
                }
            }
        
        else
            {
            MPI::Request request;
                MPI::Status status;
        
request=gcomm.Isend(&recvbuf,2,MPI::INT,recvbuf[1],DISCARDTAG);
                cout<<"P="<<me<<" Invalid tag !! Request discarded
:-("<<endl;
                request.Wait(status);
            
            }
        }
        
        
        }
        
        
        int get_best_neighbor(int source)
        {
        int max=0;
        int index=-1;
        if(nbrcnt>1)
        {
        
            for(int i=0;i<nbrcnt;i++) //search who has good expereince
            {
        
                if(expbase[i]>max &&(nbrs[i]!=source)) //take care that
it is not forwarded back.
                {
                max=expbase[i];
                index=i;
                }
        
            }
        
            if(index>=0) //best neighbor found
            return nbrs[index];
        
            else //cant find who is best,randomly select one
            {
                int temp=me;
                int loopcntr=0;
                while(1)
                {
                loopcntr++;
                int newindex=((random()%(7+temp++))*17)%nbrcnt;
                if(nbrs[newindex]!=source)
                {
                return nbrs[newindex];
                }
                
                }
        
            }
        }
        
        return nbrs[0];
        
        }
        
        
        int getindex(int rank)
        {
            for(int i=0;i<nbrcnt;i++)
            {
                if(nbrs[i]==rank)
                return i;
            }
            cout<<" P= "<<rank<<" is not a neighbor of P= "<<me<<endl;
            return -1;
        }
        
        
        void sig_handler(int signal)
        {
        cout<<"Received signal "<<signal<<endl;
        cout<<"Performing cleanup..."<<endl;
        pthread_cancel(tid);
        MPI::Finalize();
        //system("lamclean");
        }
        
        void add_cache(int num,int node)
        {
        
        if(cachecnt>=CACHESIZE)
        cachecnt=0;
        cache[cachecnt][0]=num;
        cache[cachecnt][1]=node;
        cachecnt++;
        
        }
        
        int check_cache(int num)
        {
        
        for(int i=0;i<CACHESIZE;i++)
        {
        if(cache[i][0]==num)
        return (cache[i][1]);
        }
        return -1;
        }
        
        

        __________________________________________________
        Do You Yahoo!?
        Tired of spam? Yahoo! Mail has the best spam protection around
        http://mail.yahoo.com