LAM/MPI logo

LAM/MPI General User's Mailing List Archives

  |   Home   |   Download   |   Documentation   |   FAQ   |   all just in this list

From: imran shaik (sk.imran_at_[hidden])
Date: 2006-05-25 07:55:48


Hi, I have a strange problem.
  My processes are structured like this:
  
  Each process has a thread that receives messages (MPI non-blocking receive). The thread loops forever.
  The main thread searches for a value in its array of known values. If the value is not found, it requests it from other processes with an MPI send call (non-blocking MPI send).
  
  I am using 6 such processes.
  
  My problem is that I sometimes get correct results, and sometimes it just hangs.
  
  If many messages are sent to a process by others at the same time, can the receiving process fail to handle them? Could it miss something?
  
  Is it a problem with threads? I use POSIX threads.
  
  I am attaching the program, although the other functions are probably not important for you.
  
  
  Any ideas or suggestions regarding the implementation of concurrent threads are welcome.
  
  
  
  Thanks in advance
  
  
  Regards,
  Imran
  
  
  #include<iostream>
  #include "mpi.h"
  #include<pthread.h>
  #include<signal.h>
  #include<unistd.h>
  #define CACHESIZE 10
  #define HOPCOUNT 5
  #define RESULTLIMIT 30
  #define MAXRESOURCES 30
  #define DISCARDTAG 9999
  using namespace std;
  using namespace MPI;
  int search(int);
  void* listen(void*);
  int get_best_neighbor(int);
  int getindex(int);
  void sig_handler(int);
  void add_cache(int,int);
  int check_cache(int);
  int me,nbrcnt,res_dist_size,reqcnt,fwdcnt,respcnt,cachecnt;
  int *nbrs,*expbase,*myint;
  int cache[CACHESIZE][2];
  MPI::Graphcomm gcomm;
  pthread_t tid;
  
  int main(int argc, char**argv)
  {
  int i,wsize,membershipkey;
  int result;
  const int index[6]={1,4,6,9,10,12};
  const int edges[12]={1,0,2,5,1,3,2,4,5,3,1,3};
  int nnodes;
  reqcnt=fwdcnt=respcnt=cachecnt=0;
  MPI::Init_thread(argc,argv,2);
  me=MPI::COMM_WORLD.Get_rank();
  wsize=MPI::COMM_WORLD.Get_size();
  nnodes=6;
  gcomm=MPI::COMM_WORLD.Create_graph(nnodes,index,edges,false);
  nbrcnt=gcomm.Get_neighbors_count(me);
  nbrs=new int[nbrcnt];
  expbase=new int[nbrcnt];
  gcomm.Get_neighbors(me,nbrcnt,nbrs);
  
  for(int i=0;i<nbrcnt;i++)
  {
  expbase[i]=0;
  }
  
  res_dist_size=MAXRESOURCES/wsize;
  
  myint=new int[res_dist_size];
  
      for(int i=res_dist_size*me;i<res_dist_size*(me+1);i++)
      {
      myint[i-(res_dist_size)*me]=i;
      }
      cout<<endl;
      
  
  gcomm.Barrier();
  
  if(pthread_create(&tid,0,listen,(void*)&me)<0)
      {
          cout<<"Thread cant be created"<<endl;
      }
  
  {
      int randnum;
      switch(me)
      {
      case 0:
      randnum=4;
      break;
      case 1:
      randnum=10;
      break;
      case 2:
      randnum=8;
      break;
      case 3:
      randnum=10;
      break;
      case 4:
      randnum=15;
      break;
      case 5:
      randnum=8;
      break;
      default :
      randnum=27;
      }
      int sendbuf[2];
      reqcnt++;
      cout<<"P="<<me<<" RN="<<randnum<<endl;
      int resrc_cnt=search(randnum);
          if(resrc_cnt>0)
          {
              
              respcnt++;
               cout<<"Yahoo! P="<<me<<" A="<<randnum<<" Respcnt="<<respcnt<<endl;
          }
          else
          {
              sendbuf[0]=randnum;
              sendbuf[1]=me;
              int bestneighbor=get_best_neighbor(me);
              add_cache(randnum,bestneighbor);
              MPI::Request request;
              MPI::Status status;
              request=gcomm.Isend(&sendbuf,2,MPI::INT,bestneighbor,HOPCOUNT);
              fwdcnt++;
               cout<<"P="<<me<<" NA="<<randnum<<" RF-->"<<bestneighbor<<endl;
              request.Wait(status);
          }
  }
  
  
  
  int ret;
  int *retval=&ret;
  pthread_join(tid,(void**)&retval);
  gcomm.Free();
  MPI::Finalize();
  
  
  }
  
  
  
  int search(int keyval) //returns the number of matches found
  {
  int resultcnt=0;
      for(int k=0;k<res_dist_size;k++)
      {
          if(keyval==myint[k])
          {
          //pos=k;
          resultcnt++;
          }
      }
  return resultcnt;
  }
  
  
  void * listen(void *t)
  {
  int recvbuf[2];
  MPI::Status status;
  MPI::Request request;
  static int exp_update_cnt=0;
  int recvcnt=0;
  int deepcntr=1;
  int itrcnt=0;
  while(1)
  {
  itrcnt++;
  request=gcomm.Irecv(&recvbuf,2,MPI::INT,MPI::ANY_SOURCE,MPI::ANY_TAG);
  recvcnt++;
  request.Wait(status);
  
  int source,tag;
  source=status.Get_source();
  tag=status.Get_tag();
  
  cout<<"P= "<<me<<"Recvd: Source= "<<source<<" tag= "<<tag<<endl;
  
  if(tag==DISCARDTAG)
  {
      if(nbrcnt==1)
      {
      //increment TTL and send to same neighbor
      MPI::Request request;
      MPI::Status status;
      request=gcomm.Isend(&recvbuf,2,MPI::INT,nbrs[0],(HOPCOUNT+deepcntr));
       cout<<"AP="<<me<<"NA="<<recvbuf[0]<<"RF->"<<nbrs[0]<<"TTL="<<HOPCOUNT+deepcntr<<endl;
      deepcntr++;
      request.Wait(status);
      }
      else //many other neighbors are there
      {
      int fwdnbr=check_cache(recvbuf[0]);
      int bestnextnode;
          if(fwdnbr>=0) //entry in cache not timed out.
          bestnextnode=get_best_neighbor(fwdnbr);
          else //entry in cache is not available
          bestnextnode=get_best_neighbor(me);
          
      MPI::Request request;
      MPI::Status status;
      request=gcomm.Isend(&recvbuf,2,MPI::INT,bestnextnode,HOPCOUNT);
       cout<<"AP="<<me<<"NA="<<recvbuf[0]<<"RF->"<<bestnextnode<<"TTL="<<HOPCOUNT<<endl;
      request.Wait(status);
  
      }
  }
  
  else if(tag>RESULTLIMIT) //results are returned.
      {
      if(deepcntr>1)
      deepcntr--;
      cout<<"Yahoo! FP="<<me<<" Node="<<source<<" has "<<tag-RESULTLIMIT<<" matches"<< " for "<<recvbuf[0]<<" Hopcount="<<recvbuf[1]<<endl;
      //check cache for details of forwarded request
      int fwdnbr=check_cache(recvbuf[0]);
          if(fwdnbr>=0)
          {
          int index=getindex(fwdnbr);
          respcnt++;
              if(index<0)
              cout<<"Neighbor index error!"<<endl;
              else
              {
              ++expbase[index];
               cout<<"P="<<me<<" expbase["<<index<<"]="<<expbase[index]<<endl;
              ++exp_update_cnt;
              }
          }
          else cout<<"P="<<me<<" Request timed out in cache"<<endl;
      }
  else if (tag>0&&tag<=HOPCOUNT)
      {
      tag=tag-1;
      int resrc_cnt;
      if(recvbuf[1]==me)
      resrc_cnt=0;
      resrc_cnt=search(recvbuf[0]);
      fwdcnt++;
      
          if(resrc_cnt>0)
          {
          int *a=(int*)t;
          int sendbuf[2];
          sendbuf[0]=recvbuf[0];
          sendbuf[1]=HOPCOUNT-tag; //for hop count calculation
          MPI::Request request;
          MPI::Status status;
          request=gcomm.Isend(&sendbuf,2,MPI::INT,recvbuf[1],RESULTLIMIT+resrc_cnt);
           cout<<"P="<<me<<" Response sent for "<<recvbuf[0]<<" to node "<<recvbuf[1]<<endl;
          request.Wait(status);
          }
          else
          {
          
          int bestneighbor=get_best_neighbor(source);
              //still tag value is positive then send.Else discard packet.
              if(tag>0)
              {
              MPI::Request request;
              MPI::Status status;
              request=gcomm.Isend(&recvbuf,2,MPI::INT,bestneighbor,tag);
               cout<<"RF-> P= "<<me<<" resrc= "<<recvbuf[0]<<" origin= "<<recvbuf[1]<<" Prev node= "<<source<<". "<<me<<" -> "<<bestneighbor<<endl;
              request.Wait(status);
              }
              else
              //send a notice to other matchmaker saying his result is discarded
              {
              MPI::Request request;
              MPI::Status status;
              request=gcomm.Isend(&recvbuf,2,MPI::INT,recvbuf[1],DISCARDTAG);
               cout<<"P="<<me<<"Request discarded"<<endl;//discarded request
              request.Wait(status);
              }
          
          }
      }
  
  else
      {
      MPI::Request request;
          MPI::Status status;
          request=gcomm.Isend(&recvbuf,2,MPI::INT,recvbuf[1],DISCARDTAG);
           cout<<"P="<<me<<" Invalid tag !! Request discarded :-("<<endl;
          request.Wait(status);
      
      }
  }
  
  
  }
  
  
  int get_best_neighbor(int source)
  {
  int max=0;
  int index=-1;
  if(nbrcnt>1)
  {
  
      for(int i=0;i<nbrcnt;i++) //search who has good expereince
      {
  
          if(expbase[i]>max &&(nbrs[i]!=source)) //take care that it is not forwarded back.
          {
          max=expbase[i];
          index=i;
          }
  
      }
  
      if(index>=0) //best neighbor found
      return nbrs[index];
  
      else //cant find who is best,randomly select one
      {
          int temp=me;
          int loopcntr=0;
          while(1)
          {
          loopcntr++;
          int newindex=((random()%(7+temp++))*17)%nbrcnt;
          if(nbrs[newindex]!=source)
          {
          return nbrs[newindex];
          }
          
          }
  
      }
  }
  
  return nbrs[0];
  
  }
  
  
  int getindex(int rank)
  {
      for(int i=0;i<nbrcnt;i++)
      {
          if(nbrs[i]==rank)
          return i;
      }
      cout<<" P= "<<rank<<" is not a neighbor of P= "<<me<<endl;
      return -1;
  }
  
  
  void sig_handler(int signal)
  {
  cout<<"Received signal "<<signal<<endl;
  cout<<"Performing cleanup..."<<endl;
  pthread_cancel(tid);
  MPI::Finalize();
  //system("lamclean");
  }
  
  void add_cache(int num,int node)
  {
  
  if(cachecnt>=CACHESIZE)
  cachecnt=0;
  cache[cachecnt][0]=num;
  cache[cachecnt][1]=node;
  cachecnt++;
  
  }
  
  int check_cache(int num)
  {
  
  for(int i=0;i<CACHESIZE;i++)
  {
  if(cache[i][0]==num)
  return (cache[i][1]);
  }
  return -1;
  }
  
  
__________________________________________________
Do You Yahoo!?
Tired of spam? Yahoo! Mail has the best spam protection around
http://mail.yahoo.com