Hi, I have a strange problem.
My processes are structured as follows.
Each process has a thread that receives messages (using a non-blocking MPI receive); this thread loops indefinitely.
The main thread searches for a value in its array of known values. If the value is not found, it tries to get it from the other processes by sending them a request with a non-blocking MPI send.
I am using 6 such processes.
My problem is that sometimes I get correct results, and sometimes the program just hangs.
Could it be that when many processes send messages to one process at the same time, the receiving process cannot handle them and misses something?
Or is it a problem with the threads? I use POSIX threads.
I am attaching the program, although the other functions are not that important for the question.
Any ideas or suggestions regarding the implementation of concurrent threads are welcome.
Thanks in advance
Regards,
Imran
#include<iostream>
#include "mpi.h"
#include<pthread.h>
#include<signal.h>
#include<unistd.h>
#define CACHESIZE 10
#define HOPCOUNT 5
#define RESULTLIMIT 30
#define MAXRESOURCES 30
#define DISCARDTAG 9999
using namespace std;
using namespace MPI;
// Forward declarations of the helper functions defined after main().
int search(int);
void* listen(void*);
int get_best_neighbor(int);
int getindex(int);
void sig_handler(int);
void add_cache(int,int);
int check_cache(int);
// Process-wide state shared by main() and the listen() receiver thread.
// me: this process's rank in MPI::COMM_WORLD.
// nbrcnt: number of neighbours of this rank in the graph communicator.
// res_dist_size: number of resource values held locally in myint.
// reqcnt/fwdcnt/respcnt: request/forward/response counters.
// cachecnt: index of the next cache slot add_cache() writes.
// NOTE(review): fwdcnt and respcnt are incremented from both main() and
// listen() without any locking; expbase is likewise written by listen()
// while main() may read it through get_best_neighbor().
int me,nbrcnt,res_dist_size,reqcnt,fwdcnt,respcnt,cachecnt;
// nbrs: neighbour ranks (nbrcnt entries).
// expbase: per-neighbour counter parallel to nbrs, incremented by listen()
//          when a result arrives for a value cached as forwarded to that neighbour.
// myint: the resource values owned by this process (res_dist_size entries).
int *nbrs,*expbase,*myint;
// Forwarding cache: cache[i][0] = requested value, cache[i][1] = neighbour
// the request was sent to; overwritten round-robin by add_cache().
int cache[CACHESIZE][2];
// Graph-topology communicator created in main().
MPI::Graphcomm gcomm;
// Handle of the listen() receiver thread.
pthread_t tid;
int main(int argc, char**argv)
{
int i,wsize,membershipkey;
int result;
const int index[6]={1,4,6,9,10,12};
const int edges[12]={1,0,2,5,1,3,2,4,5,3,1,3};
int nnodes;
reqcnt=fwdcnt=respcnt=cachecnt=0;
MPI::Init_thread(argc,argv,2);
me=MPI::COMM_WORLD.Get_rank();
wsize=MPI::COMM_WORLD.Get_size();
nnodes=6;
gcomm=MPI::COMM_WORLD.Create_graph(nnodes,index,edges,false);
nbrcnt=gcomm.Get_neighbors_count(me);
nbrs=new int[nbrcnt];
expbase=new int[nbrcnt];
gcomm.Get_neighbors(me,nbrcnt,nbrs);
for(int i=0;i<nbrcnt;i++)
{
expbase[i]=0;
}
res_dist_size=MAXRESOURCES/wsize;
myint=new int[res_dist_size];
for(int i=res_dist_size*me;i<res_dist_size*(me+1);i++)
{
myint[i-(res_dist_size)*me]=i;
}
cout<<endl;
gcomm.Barrier();
if(pthread_create(&tid,0,listen,(void*)&me)<0)
{
cout<<"Thread cant be created"<<endl;
}
{
int randnum;
switch(me)
{
case 0:
randnum=4;
break;
case 1:
randnum=10;
break;
case 2:
randnum=8;
break;
case 3:
randnum=10;
break;
case 4:
randnum=15;
break;
case 5:
randnum=8;
break;
default :
randnum=27;
}
int sendbuf[2];
reqcnt++;
cout<<"P="<<me<<" RN="<<randnum<<endl;
int resrc_cnt=search(randnum);
if(resrc_cnt>0)
{
respcnt++;
cout<<"Yahoo! P="<<me<<" A="<<randnum<<" Respcnt="<<respcnt<<endl;
}
else
{
sendbuf[0]=randnum;
sendbuf[1]=me;
int bestneighbor=get_best_neighbor(me);
add_cache(randnum,bestneighbor);
MPI::Request request;
MPI::Status status;
request=gcomm.Isend(&sendbuf,2,MPI::INT,bestneighbor,HOPCOUNT);
fwdcnt++;
cout<<"P="<<me<<" NA="<<randnum<<" RF-->"<<bestneighbor<<endl;
request.Wait(status);
}
}
int ret;
int *retval=&ret;
pthread_join(tid,(void**)&retval);
gcomm.Free();
MPI::Finalize();
}
// Counts how many of this process's local resource values (myint, which
// holds res_dist_size entries) are equal to keyval.
// Returns the number of matches (0 when the value is not held locally).
int search(int keyval)
{
    int hits=0;
    for(int pos=res_dist_size-1;pos>=0;--pos)
    {
        if(myint[pos]!=keyval)
            continue;
        ++hits;
    }
    return hits;
}
void * listen(void *t)
{
int recvbuf[2];
MPI::Status status;
MPI::Request request;
static int exp_update_cnt=0;
int recvcnt=0;
int deepcntr=1;
int itrcnt=0;
while(1)
{
itrcnt++;
request=gcomm.Irecv(&recvbuf,2,MPI::INT,MPI::ANY_SOURCE,MPI::ANY_TAG);
recvcnt++;
request.Wait(status);
int source,tag;
source=status.Get_source();
tag=status.Get_tag();
cout<<"P= "<<me<<"Recvd: Source= "<<source<<" tag= "<<tag<<endl;
if(tag==DISCARDTAG)
{
if(nbrcnt==1)
{
//increment TTL and send to same neighbor
MPI::Request request;
MPI::Status status;
request=gcomm.Isend(&recvbuf,2,MPI::INT,nbrs[0],(HOPCOUNT+deepcntr));
cout<<"AP="<<me<<"NA="<<recvbuf[0]<<"RF->"<<nbrs[0]<<"TTL="<<HOPCOUNT+deepcntr<<endl;
deepcntr++;
request.Wait(status);
}
else //many other neighbors are there
{
int fwdnbr=check_cache(recvbuf[0]);
int bestnextnode;
if(fwdnbr>=0) //entry in cache not timed out.
bestnextnode=get_best_neighbor(fwdnbr);
else //entry in cache is not available
bestnextnode=get_best_neighbor(me);
MPI::Request request;
MPI::Status status;
request=gcomm.Isend(&recvbuf,2,MPI::INT,bestnextnode,HOPCOUNT);
cout<<"AP="<<me<<"NA="<<recvbuf[0]<<"RF->"<<bestnextnode<<"TTL="<<HOPCOUNT<<endl;
request.Wait(status);
}
}
else if(tag>RESULTLIMIT) //results are returned.
{
if(deepcntr>1)
deepcntr--;
cout<<"Yahoo! FP="<<me<<" Node="<<source<<" has "<<tag-RESULTLIMIT<<" matches"<< " for "<<recvbuf[0]<<" Hopcount="<<recvbuf[1]<<endl;
//check cache for details of forwarded request
int fwdnbr=check_cache(recvbuf[0]);
if(fwdnbr>=0)
{
int index=getindex(fwdnbr);
respcnt++;
if(index<0)
cout<<"Neighbor index error!"<<endl;
else
{
++expbase[index];
cout<<"P="<<me<<" expbase["<<index<<"]="<<expbase[index]<<endl;
++exp_update_cnt;
}
}
else cout<<"P="<<me<<" Request timed out in cache"<<endl;
}
else if (tag>0&&tag<=HOPCOUNT)
{
tag=tag-1;
int resrc_cnt;
if(recvbuf[1]==me)
resrc_cnt=0;
resrc_cnt=search(recvbuf[0]);
fwdcnt++;
if(resrc_cnt>0)
{
int *a=(int*)t;
int sendbuf[2];
sendbuf[0]=recvbuf[0];
sendbuf[1]=HOPCOUNT-tag; //for hop count calculation
MPI::Request request;
MPI::Status status;
request=gcomm.Isend(&sendbuf,2,MPI::INT,recvbuf[1],RESULTLIMIT+resrc_cnt);
cout<<"P="<<me<<" Response sent for "<<recvbuf[0]<<" to node "<<recvbuf[1]<<endl;
request.Wait(status);
}
else
{
int bestneighbor=get_best_neighbor(source);
//still tag value is positive then send.Else discard packet.
if(tag>0)
{
MPI::Request request;
MPI::Status status;
request=gcomm.Isend(&recvbuf,2,MPI::INT,bestneighbor,tag);
cout<<"RF-> P= "<<me<<" resrc= "<<recvbuf[0]<<" origin= "<<recvbuf[1]<<" Prev node= "<<source<<". "<<me<<" -> "<<bestneighbor<<endl;
request.Wait(status);
}
else
//send a notice to other matchmaker saying his result is discarded
{
MPI::Request request;
MPI::Status status;
request=gcomm.Isend(&recvbuf,2,MPI::INT,recvbuf[1],DISCARDTAG);
cout<<"P="<<me<<"Request discarded"<<endl;//discarded request
request.Wait(status);
}
}
}
else
{
MPI::Request request;
MPI::Status status;
request=gcomm.Isend(&recvbuf,2,MPI::INT,recvbuf[1],DISCARDTAG);
cout<<"P="<<me<<" Invalid tag !! Request discarded :-("<<endl;
request.Wait(status);
}
}
}
// Chooses the neighbour to forward a request to. It avoids `source`
// (normally the node the request came from), so the request is not sent
// straight back. Preference order:
//   1. the neighbour other than `source` with the largest positive expbase count;
//   2. otherwise a random neighbour other than `source`;
//   3. with a single neighbour, or when every neighbour entry equals `source`, nbrs[0].
// Precondition: nbrcnt >= 1. With no neighbours, nbrs[0] would be read out of bounds.
int get_best_neighbor(int source)
{
    if(nbrcnt<=1)
        return nbrs[0];

    int bestidx=-1;
    int bestscore=0;
    int candidates=0;
    for(int i=0;i<nbrcnt;i++)
    {
        if(nbrs[i]==source) //take care that it is not forwarded back.
            continue;
        ++candidates;
        if(expbase[i]>bestscore)
        {
            bestscore=expbase[i];
            bestidx=i;
        }
    }
    if(bestidx>=0) //best neighbor found
        return nbrs[bestidx];

    // No experience yet: pick a random valid neighbour directly. The original
    // retried random() until an entry differed from `source`. That loop never
    // terminated when every neighbour entry equals `source` (duplicate edges).
    if(candidates==0)
        return nbrs[0];
    long pick=random()%candidates;
    for(int i=0;i<nbrcnt;i++)
    {
        if(nbrs[i]!=source&&pick--==0)
            return nbrs[i];
    }
    return nbrs[0]; // not reached: pick < candidates
}
// Maps a neighbour's rank to its position in nbrs (and so in expbase).
// Returns that position, or -1 after printing a diagnostic when `rank`
// is not a neighbour of this process.
int getindex(int rank)
{
    int slot=0;
    while(slot<nbrcnt&&nbrs[slot]!=rank)
        ++slot;
    if(slot<nbrcnt)
        return slot;
    cout<<" P= "<<rank<<" is not a neighbor of P= "<<me<<endl;
    return -1;
}
// Signal handler for abnormal termination.
// NOTE(review): nothing in this file installs it (no signal()/sigaction() call).
// A signal handler may only call async-signal-safe functions. The original
// used cout, pthread_cancel() and MPI::Finalize(), none of which are
// async-signal-safe. It then returned, so the interrupted code kept making
// MPI calls after MPI::Finalize(). This version prints the same two lines
// with write(2) and terminates the process with _exit(2), which is the only
// safe form of "cleanup" available inside a handler.
void sig_handler(int signal)
{
    char msg[48];
    int len=0;
    const char prefix[]="Received signal ";
    for(const char *p=prefix;*p!='\0';++p)
        msg[len++]=*p;
    // Format the number by hand: snprintf is not async-signal-safe.
    unsigned int value=(signal<0)?0u-static_cast<unsigned int>(signal):static_cast<unsigned int>(signal);
    if(signal<0)
        msg[len++]='-';
    char digits[12];
    int ndigits=0;
    do
    {
        digits[ndigits++]=static_cast<char>('0'+value%10);
        value/=10;
    } while(value!=0);
    while(ndigits>0)
        msg[len++]=digits[--ndigits];
    msg[len++]='\n';
    ssize_t written=write(STDOUT_FILENO,msg,len);
    const char cleanup[]="Performing cleanup...\n";
    written=write(STDOUT_FILENO,cleanup,sizeof(cleanup)-1);
    (void)written;
    _exit(128+signal);
}
// Records that value `num` was forwarded to neighbour `node`.
// Entries are written round-robin: once cachecnt reaches CACHESIZE, writing
// restarts at slot 0 and overwrites the oldest entries.
void add_cache(int num,int node)
{
    const int slot=(cachecnt>=CACHESIZE)?0:cachecnt;
    cache[slot][0]=num;
    cache[slot][1]=node;
    cachecnt=slot+1;
}
// Looks up the neighbour that value `num` was forwarded to.
// Scans all CACHESIZE slots from slot 0 and returns the node of the first
// entry whose key equals `num`, or -1 if there is none.
// NOTE(review): cache is a zero-initialised global, so before the table fills
// up, a lookup of key 0 matches an unused slot and returns node 0.
int check_cache(int num)
{
    int slot=0;
    while(slot<CACHESIZE&&cache[slot][0]!=num)
        ++slot;
    return (slot<CACHESIZE)?cache[slot][1]:-1;
}
__________________________________________________
Do You Yahoo!?
Tired of spam? Yahoo! Mail has the best spam protection around
http://mail.yahoo.com
|