Wednesday, August 28, 2013

Deadlock - an interesting picture & My implementation of Double-Lock Queue


A picture is worth a thousand words:-)

Circular dependency graph A->B->C->D->A

Question: Can a single thread deadlock?
Answer: Yes. Just try to acquire a lock twice.

The following is a concurrent double-lock queue that I implemented on Linux.

The difference between my implementation and the one in the famous paper "Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms" is that mine uses a circular doubly linked list as the underlying data structure. The advantage is that a single dummy node replaces the two variables head and tail.

#ifndef Q_H_
#define Q_H_

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

// One element of the circular doubly linked list that backs the queue.
struct node {
    int   d;     // payload stored in this element
    node* prev;  // neighbour towards the head of the list
    node* next;  // neighbour towards the tail of the list
};

// Alternative spelling of the same type.
typedef node qnode;

// Sentinel node of the circular list; null until initQ() allocates it.
node* pdummy = 0;

// Despite its name ("head lock"), hlk is the mutex taken by enqueue(),
// which inserts at the tail.
static pthread_mutex_t hlk = PTHREAD_MUTEX_INITIALIZER;

// Despite its name ("tail lock"), tlk is the mutex taken by dequeue(),
// which removes from the head.
static pthread_mutex_t tlk = PTHREAD_MUTEX_INITIALIZER;

// Shorthand for positions in the circular list rooted at pdummy.
// Every macro dereferences pdummy, so initQ() must have run first.
#define Qbegin (pdummy->next)   // first element; equals Qend when the list is empty
#define Qend   (pdummy)         // the sentinel itself, one past the last element
#define Qhead  (pdummy->next)   // element that dequeue() removes next
#define Qtail  (pdummy->prev)   // element most recently added by enqueue()
// True only when both sentinel links point back at the sentinel.
#define Qempty ((pdummy->next == pdummy) && (pdummy->prev == pdummy))

// Scoped lock: locks the given pthread mutex in the constructor and unlocks
// it in the destructor, so the mutex is released on every exit path of the
// enclosing scope.
//
// x must point to an initialised mutex that outlives the guard. The return
// codes of pthread_mutex_lock/pthread_mutex_unlock are not checked.
struct guard{
    pthread_mutex_t* p; // mutex held for the lifetime of this object

    guard(pthread_mutex_t* x):p(x){pthread_mutex_lock(p);}
    ~guard(){pthread_mutex_unlock(p);}

private:
    // A copy would make two destructors unlock the same mutex, so the copy
    // operations are declared private and intentionally never defined.
    guard(const guard&);
    guard& operator=(const guard&);
};

void initQ(){
pdummy=new node;
pdummy->d=0;
pdummy->next=pdummy;
pdummy->prev=pdummy;
}

void DestroyQ(){
if(Qempty){
//pdummy->next=pdummy->prev=NULL;
delete pdummy;
return;
}
node* tmp=pdummy;
node* tmp2=tmp;
do{
tmp2=tmp->next;
if(tmp!=pdummy)delete tmp;
tmp=tmp2;
}while(tmp!=pdummy);
//pdummy->next=pdummy->prev=NULL;
delete pdummy;
}

//add to the tail
void enqueue(int i){
node* newnode=new node;
newnode->d=i;

newnode->next=pdummy;
{
 guard g(&hlk);
 node* tmp=pdummy->prev;
 pdummy->prev=newnode;//

 newnode->prev=tmp;
 tmp->next=newnode;
}
}

//pop from the head - caller deallocate the memory
void dequeue(){
guard g(&tlk);
if(Qempty){
return;
}else{
node* r=pdummy->next;
node* tmp=pdummy->next->next;
delete r;
pdummy->next=tmp;
tmp->prev=pdummy;//
}
}

// Producer thread body: enqueues the loop counter on each iteration, sleeping
// 0 or 1 seconds (rand()%2) before each enqueue.
//
// param is unused. Returns NULL so the non-void function always returns a
// value.
void* TFunc(void* param){
    (void)param;
    // NOTE(review): the loop header was damaged in the published listing; only
    // the leading "2" of the bound is visible, so the original count may have
    // been larger -- confirm.
    for(int i=0;i<2;i++){
        sleep(rand()%2);
        enqueue(i);
    }
    return NULL;
}

// Consumer thread body: sleeps one second, then calls dequeue(), on each
// iteration.
//
// param is unused. Returns NULL so the non-void function always returns a
// value.
void* TFunc2(void* param){
    (void)param;
    // NOTE(review): the loop header was damaged in the published listing; only
    // the leading "2" of the bound is visible, so the original count may have
    // been larger -- confirm.
    for(int i=0;i<2;i++){
        sleep(1);
        //sleep(1+rand()%3);
        dequeue();
    }
    return NULL;
}

bool GoodQ(){
if(Qempty){
return true;
}
node* tmp=pdummy;
node* tmp2=tmp;
do{
tmp2=tmp->next;
if(tmp2->prev!=tmp)
return false;
tmp=tmp2;
}while(tmp!=pdummy);
return true;
}

#endif /* Q_H_ */


No comments: