LAM/MPI logo

LAM/MPI General User's Mailing List Archives

  |   Home   |   Download   |   Documentation   |   FAQ   |   all just in this list

From: imran shaik (sk.imran_at_[hidden])
Date: 2006-05-26 00:41:13


Hi,
  Which other MPI implementations support MPI_THREAD_MULTIPLE? Please let me know; I need to make MPI calls from threads.
  
  Regards,
  Imran

"Jeff Squyres (jsquyres)" <jsquyres_at_[hidden]> wrote: LAM/MPI is not thread safe -- specifically, LAM does not support MPI_THREAD_MULTIPLE.
   
  Check out the FAQ in the section "Information about LAM itself", the questions "Is LAM thread safe?" and "Is LAM multi-threaded?"
   
      http://www.lam-mpi.org/faq/

          
---------------------------------
    From: lam-bounces_at_[hidden] [mailto:lam-bounces_at_[hidden]] On Behalf Of imran shaik
Sent: Thursday, May 25, 2006 7:56 AM
To: lam_at_[hidden]
Subject: LAM: Threads or Recv?

    
Hi, I have a strange problem.
My processes work like this:

Each process has a thread that receives messages (using MPI non-blocking receive). The thread loops infinitely.
The main thread searches for a value in its array of known values. If the value is not found, it tries to get it from other processes by sending a request for it with a non-blocking MPI send call.

I am using 6 such processes.

My problem is that I sometimes get proper results, and sometimes the program just hangs.

Is it the case that if many messages are sent to a process by the others at the same time, the receiving process can't handle them and misses something?

Is it a problem with threads? I use POSIX threads.

I am attaching the program...though other functions are not that important for others.

Any ideas or suggestions regarding the implementation of concurrent threads are welcome.

Thanks in advance

Regards,
Imran

#include<iostream>
#include "mpi.h"
#include<pthread.h>
#include<signal.h>
#include<unistd.h>
// Number of rows in the forwarded-request cache (`cache` below).
#define CACHESIZE 10
// Tag attached to a freshly issued request; listen() treats tags in
// 1..HOPCOUNT as the remaining hop budget and decrements it per forward.
#define HOPCOUNT 5
// Tags above this value carry results: (tag - RESULTLIMIT) is the match count.
#define RESULTLIMIT 30
// Total number of resources, split evenly across the processes in main().
#define MAXRESOURCES 30
// Tag used to tell a node that its request was discarded.
#define DISCARDTAG 9999

using namespace std;
using namespace MPI;

// Forward declarations of the functions defined later in this file.
int search(int);
void* listen(void*);
int get_best_neighbor(int);
int getindex(int);
void sig_handler(int);
void add_cache(int,int);
int check_cache(int);

// Per-process state, set up in main().
int me;             // rank of this process in MPI::COMM_WORLD
int nbrcnt;         // number of graph neighbors of this process
int res_dist_size;  // number of resources held by each process
int reqcnt;         // requests issued by main()
int fwdcnt;         // requests sent on / handled (updated by main() and listen())
int respcnt;        // successful lookups (updated by main() and listen())
int cachecnt;       // next slot add_cache() will write

int *nbrs;          // ranks of the graph neighbors (nbrcnt entries)
int *expbase;       // per-neighbor "experience" score, parallel to nbrs
int *myint;         // resources owned by this process (res_dist_size entries)

// Each row is {requested value, node the request was forwarded to}.
int cache[CACHESIZE][2];

MPI::Graphcomm gcomm;   // graph-topology communicator created in main()
pthread_t tid;          // listener thread started in main()

int main(int argc, char**argv)
{
int i,wsize,membershipkey;
int result;
const int index[6]={1,4,6,9,10,12};
const int edges[12]={1,0,2,5,1,3,2,4,5,3,1,3};
int nnodes;
reqcnt=fwdcnt=respcnt=cachecnt=0;
MPI::Init_thread(argc,argv,2);
me=MPI::COMM_WORLD.Get_rank();
wsize=MPI::COMM_WORLD.Get_size();
nnodes=6;
gcomm=MPI::COMM_WORLD.Create_graph(nnodes,index,edges,false);
nbrcnt=gcomm.Get_neighbors_count(me);
nbrs=new int[nbrcnt];
expbase=new int[nbrcnt];
gcomm.Get_neighbors(me,nbrcnt,nbrs);

for(int i=0;i<nbrcnt;i++)
{
expbase[i]=0;
}

res_dist_size=MAXRESOURCES/wsize;

myint=new int[res_dist_size];

    for(int i=res_dist_size*me;i<res_dist_size*(me+1);i++)
        {
    myint[i-(res_dist_size)*me]=i;
        }
    cout<<endl;
        

gcomm.Barrier();

if(pthread_create(&tid,0,listen,(void*)&me)<0)
        {
        cout<<"Thread cant be created"<<endl;
    }

{
        int randnum;
    switch(me)
        {
    case 0:
        randnum=4;
    break;
    case 1:
    randnum=10;
        break;
    case 2:
        randnum=8;
    break;
    case 3:
    randnum=10;
        break;
    case 4:
        randnum=15;
    break;
    case 5:
    randnum=8;
        break;
    default :
        randnum=27;
    }
    int sendbuf[2];
    reqcnt++;
        cout<<"P="<<me<<" RN="<<randnum<<endl;
    int resrc_cnt=search(randnum);
            if(resrc_cnt>0)
            {
                
                respcnt++;
                cout<<"Yahoo! P="<<me<<" A="<<randnum<<" Respcnt="<<respcnt<<endl;
            }
            else
        {
                    sendbuf[0]=randnum;
                sendbuf[1]=me;
                int bestneighbor=get_best_neighbor(me);
                add_cache(randnum,bestneighbor);
                MPI::Request request;
                MPI::Status status;
                    request=gcomm.Isend(&sendbuf,2,MPI::INT,bestneighbor,HOPCOUNT);
                fwdcnt++;
                cout<<"P="<<me<<" NA="<<randnum<<" RF-->"<<bestneighbor<<endl;
                    request.Wait(status);
            }
}

int ret;
int *retval=&ret;
pthread_join(tid,(void**)&retval);
gcomm.Free();
MPI::Finalize();

}

int search(int keyval) //returns the number of matches found
{
int resultcnt=0;
    for(int k=0;k<res_dist_size;k++)
    {
            if(keyval==myint[k])
            {
            //pos=k;
            resultcnt++;
        }
        }
return resultcnt;
}

void * listen(void *t)
{
int recvbuf[2];
MPI::Status status;
MPI::Request request;
static int exp_update_cnt=0;
int recvcnt=0;
int deepcntr=1;
int itrcnt=0;
while(1)
{
itrcnt++;
request=gcomm.Irecv(&recvbuf,2,MPI::INT,MPI::ANY_SOURCE,MPI::ANY_TAG);
recvcnt++;
request.Wait(status);

int source,tag;
source=status.Get_source();
tag=status.Get_tag();

cout<<"P= "<<me<<"Recvd: Source= "<<source<<" tag= "<<tag<<endl;

if(tag==DISCARDTAG)
{
        if(nbrcnt==1)
    {
    //increment TTL and send to same neighbor
    MPI::Request request;
    MPI::Status status;
        request=gcomm.Isend(&recvbuf,2,MPI::INT,nbrs[0],(HOPCOUNT+deepcntr));
     cout<<"AP="<<me<<"NA="<<recvbuf[0]<<"RF->"<<nbrs[0]<<"TTL="<<HOPCOUNT+deepcntr<<endl;
    deepcntr++;
        request.Wait(status);
    }
    else //many other neighbors are there
    {
        int fwdnbr=check_cache(recvbuf[0]);
    int bestnextnode;
        if(fwdnbr>=0) //entry in cache not timed out.
            bestnextnode=get_best_neighbor(fwdnbr);
            else //entry in cache is not available
            bestnextnode=get_best_neighbor(me);
            
    MPI::Request request;
    MPI::Status status;
        request=gcomm.Isend(&recvbuf,2,MPI::INT,bestnextnode,HOPCOUNT);
     cout<<"AP="<<me<<"NA="<<recvbuf[0]<<"RF->"<<bestnextnode<<"TTL="<<HOPCOUNT<<endl;
        request.Wait(status);

    }
}

else if(tag>RESULTLIMIT) //results are returned.
        {
    if(deepcntr>1)
        deepcntr--;
    cout<<"Yahoo! FP="<<me<<" Node="<<source<<" has "<<tag-RESULTLIMIT<<" matches"<< " for "<<recvbuf[0]<<" Hopcount="<<recvbuf[1]<<endl;
    //check cache for details of forwarded request
    int fwdnbr=check_cache(recvbuf[0]);
            if(fwdnbr>=0)
            {
        int index=getindex(fwdnbr);
            respcnt++;
                if(index<0)
                cout<<"Neighbor index error!"<<endl;
                else
                {
                ++expbase[index];
                cout<<"P="<<me<<" expbase["<<index<<"]="<<expbase[index]<<endl;
                ++exp_update_cnt;
                }
            }
        else cout<<"P="<<me<<" Request timed out in cache"<<endl;
    }
else if (tag>0&&tag<=HOPCOUNT)
        {
    tag=tag-1;
    int resrc_cnt;
    if(recvbuf[1]==me)
        resrc_cnt=0;
        resrc_cnt=search(recvbuf[0]);
        fwdcnt++;
    
            if(resrc_cnt>0)
            {
        int *a=(int*)t;
        int sendbuf[2];
            sendbuf[0]=recvbuf[0];
            sendbuf[1]=HOPCOUNT-tag; //for hop count calculation
            MPI::Request request;
            MPI::Status status;
                request=gcomm.Isend(&sendbuf,2,MPI::INT,recvbuf[1],RESULTLIMIT+resrc_cnt);
            cout<<"P="<<me<<" Response sent for "<<recvbuf[0]<<" to node "<<recvbuf[1]<<endl;
            request.Wait(status);
            }
        else
            {
            
        int bestneighbor=get_best_neighbor(source);
                //still tag value is positive then send.Else discard packet.
                if(tag>0)
                {
                MPI::Request request;
                MPI::Status status;
                request=gcomm.Isend(&recvbuf,2,MPI::INT,bestneighbor,tag);
                cout<<"RF-> P= "<<me<<" resrc= "<<recvbuf[0]<<" origin= "<<recvbuf[1]<<" Prev node= "<<source<<". "<<me<<" -> "<<bestneighbor<<endl;
                request.Wait(status);
                }
                else
                //send a notice to other matchmaker saying his result is discarded
                {
                MPI::Request request;
                MPI::Status status;
                request=gcomm.Isend(&recvbuf,2,MPI::INT,recvbuf[1],DISCARDTAG);
                    cout<<"P="<<me<<"Request discarded"<<endl;//discarded request
                request.Wait(status);
                }
            
        }
        }

else
    {
    MPI::Request request;
        MPI::Status status;
            request=gcomm.Isend(&recvbuf,2,MPI::INT,recvbuf[1],DISCARDTAG);
            cout<<"P="<<me<<" Invalid tag !! Request discarded :-("<<endl;
            request.Wait(status);
        
    }
}

}

int get_best_neighbor(int source)
{
int max=0;
int index=-1;
if(nbrcnt>1)
{

    for(int i=0;i<nbrcnt;i++) //search who has good expereince
        {

        if(expbase[i]>max &&(nbrs[i]!=source)) //take care that it is not forwarded back.
        {
            max=expbase[i];
            index=i;
        }

        }

    if(index>=0) //best neighbor found
    return nbrs[index];

    else //cant find who is best,randomly select one
        {
        int temp=me;
            int loopcntr=0;
            while(1)
        {
            loopcntr++;
        int newindex=((random()%(7+temp++))*17)%nbrcnt;
            if(nbrs[newindex]!=source)
            {
        return nbrs[newindex];
            }
        
            }

    }
}

return nbrs[0];

}

int getindex(int rank)
{
        for(int i=0;i<nbrcnt;i++)
    {
            if(nbrs[i]==rank)
            return i;
    }
    cout<<" P= "<<rank<<" is not a neighbor of P= "<<me<<endl;
    return -1;
}

// Cleanup routine written with a signal-handler signature: reports the
// signal number, cancels the listener thread and finalizes MPI.
//
// NOTE(review): stream output, pthread_cancel() and MPI::Finalize() are not
// on the POSIX async-signal-safe list; if this is ever installed as a real
// signal handler, confirm that is acceptable.
void sig_handler(int signal)
{
    cout<<"Received signal "<<signal<<endl
        <<"Performing cleanup..."<<endl;
    pthread_cancel(tid);
    MPI::Finalize();
    //system("lamclean");
}

// Records that a request for `num` was forwarded to `node`. The cache is
// used as a ring: once cachecnt reaches CACHESIZE, writing restarts at
// slot 0 and overwrites the oldest entries.
void add_cache(int num,int node)
{
    const int slot=(cachecnt<CACHESIZE)?cachecnt:0;
    int *entry=cache[slot];
    entry[0]=num;
    entry[1]=node;
    cachecnt=slot+1;
}

// Looks up `num` in the forwarded-request cache and returns the node stored
// in the first matching row, or -1 when no row matches.
//
// NOTE(review): all CACHESIZE rows are scanned, including rows never written
// by add_cache(); those hold zeros, so a lookup of 0 can match an unused row
// and return node 0 - confirm whether 0 can be a requested value.
int check_cache(int num)
{
    int row=0;
    while(row<CACHESIZE && cache[row][0]!=num)
        ++row;

    return (row<CACHESIZE)?cache[row][1]:-1;
}

    __________________________________________________
Do You Yahoo!?
Tired of spam? Yahoo! Mail has the best spam protection around
http://mail.yahoo.com
_______________________________________________
This list is archived at http://www.lam-mpi.org/MailArchives/lam/

                
---------------------------------
Do you Yahoo!?
 Everyone is raving about the all-new Yahoo! Mail Beta.