본문 바로가기
MultiThread

기초적인 Lock Free Stack 구현

by W00gie 2021. 12. 3.

앞서 공부했던 CAS(Compare And Swap) 을 기반으로 Lock Free Stack을 실습해보았다.

Lock Free는 커널세션의 높은 오버헤드를 가지는 mutex와 Lock을 이용하지않고 Atomic 연산을 통해 멀티스레드 환경에서 이용가능한 오브젝트를 구현하는 방법이다. Lock Free에 있어서 여러 구현 방법이 있지만 가장 기초적이고 직관적인 방법으로 만들어진 Lock Free Stack을 소개한다,

 


구현한 Lock Free Stack은 Push, Pop 두 가지 메인 함수를 기반으로 부수적인 헬퍼 함수들을 구현하였다. Push의 경우 크게 어려운 부분이 없으나.Pop 함수에서 멀티스레드 환경을 고려해 작업중인 타 스레드들을 고려해 할당되었던 메모리를 해제하는데 많은 고려가 필요하다.

 

template<typename T>
class LockFreeStack
{
	struct Node
	{
		Node(const T& value) : data(value), next(nullptr);
		{

		}

		T data;
		Node* next;

	};
public:

	void Push(const T& value)
	{
		Node* node = new Node(value);
		node->next = _head;

		while (_head.compare_exchange_weak(node->next, node) == false)
		{

		}
	}
    
	bool TryPop(T& value)
	{
		++_popCount;

		Node* oldHead = _head;

		while (oldHead && _head.compare_exchange_weak(oldHead, oldHead->next) == false)
		{

		}
		if (oldHead == nullptr)
		{
			--_popCount;
			return false;
		}

		value = oldHead->data;
		TryDelete(oldHead);

		return true;
	}


	//1)데이터분리
	//2)Count 체크
	//3)나 혼자면 삭제
	void TryDelete(Node* oldHead)
	{
		if (_popCount == 1)
		{
			//Trypop의 cas에서 혼자인것 보증

			//이왕 혼자인거, 삭제 예약된 다른 데이터들도 삭제해보자
			Node* node = _pendingList.exchange(nullptr);

			if (--_popCount == 0) //atomic으로 래핑되어서 ==으로 비교되는부분까지 원자성 보증됨
			{
				//끼어든 애가 없음 -> 삭제 진행
				//이제와서 끼어들어도 데이터는 분리해둔 상태
				DeleteNodes(node);
			}
			else
			{
				//누가 끼어들었으니 다시 갖다 놓기
				ChainPendingNodeList(node);
			}

			//내 데이터는 삭제 
			delete oldHead;
		}
		else
		{
			//누가 있으니 삭제 예약만.
			chainPendingNode(oldHead);
			--_popCount;
		}
	}


	//누가 끼어들었으니 다시 갖다 놓기
	void ChainPendingNodeList(Node* first, Node* last)
	{
		//다시 갖다놓을 목록을 만들어주는데. first는 리스트의 처음 last는 리스트이 마지막
		
		//추가해되는애들을 원래 있던애들(pending List) 와 연결.
		last->next = _pendingList;

		while (_pendingList.compare_exchange_weak(last->next, first) == false) //CAS 에서 while문 조건 false 하는것 습관하시킬것. false일 경우 뺑뺑이돌면서 대기한다.
		{

		}
	}

	// 오버로딩된 헬프 함수 
	void ChainPendingNodeList(Node* node)
	{
		Node* last = node;
		while (last->next)
			last = last->next;

		ChainPendingNodeList(node, last);
	}


	void ChainPendingNode(Node* node)
	{
		ChainPendingNodeList(node, node);
	}

	
	static void DeleteNodes(Node* node)
	{
		while (node)
		{
			node* next = node->next;
			delete node;
			node = next;
		}
	}

private:
	atomic<Node*> _head;

	atomic<uint32 >> _popCount = 0; // pop을 실행중인 스레드 개수
	atomic<Node*> _pendingList; // 삭제되어야할 노드들 ( 첫번째 노드)
};

 

Pop 함수의 전체적인 과정은 다음과 같다.

1) head 읽기
2) head->next 읽기

3) head  = head->next 를 통해 head 옮기기
4) data 추출해서 반환
5) 추출한 노드를 삭제  

 

처음 TryPop 함수에 스레드가 진입하였을때 _popCount를 증가시켜

현재 해당 함수에서 진입하여 작업중인 스레드의 수를 카운트한다. ( _popCount를 atomic 클래스로 랩핑되어 코드실행중 원자적인 카운트가 가능하다.)

 

5) 과정에서 할당된 메모리를 해제할 때 다른 스레드가 참조중인 메모리를 삭제하면 오류가 발생할 수 있기에

_popCount가 0일때만 메모리를 해제하고 이외에는 pendingList에 추가하여 삭제를 예약해놓는 방식이다.

 

CAS의 false 조건을 이용한 루프를 통해 다른 스레드들이 spin하며 대기하도록하여 독립된 환경에서 스레드가 작업할 수 있도록 구현하는것이 핵심이다.