컨텐츠를 불러오는 중...
Web Worker
와 Node.js의 Worker Threads
가 등장하면서 JavaScript에서도 또한 동기화 문제, race condition
이 발생할 수 있게 되었고 이를 처리하기 위한 개념이 등장했습니다.Event Loop
를 사용 했습니다. 이로인해 Promise, callback, async/await을 통해 가능한 시스템 커널에 작업을 오프로딩함으로써 Node.js의 비동기 특성과 non-blocking I/O
에 중요한 역할을 수행할 수 있게 되었습니다.libuv
라이브러리를 통해 내부적으로 이미 I/O 작업을 위한 스레드 풀을 관리하므로, 파일 I/O를 위해 별도로 worker threads를 사용하는 것은 중복된 오버헤드를 발생시킬 수 있기 때문에 Worker Thread에서는 I/O 작업을 수행하지 않는 것이 좋습니다.C++
코드를 활용하여 이 문제를 자세하게 다루어 보겠습니다.sum
을 증가 시키는 함수이지만 위의 로직을 동작 시키면 결과는 저희가 기대한 결과와 굉장히 다른 결과가 출력 되는 것을 확인할 수 있습니다.sum
변수가 바로 그런 공유 자원입니다.sum++
명령을 실행합니다. 소비자 스레드가 sum--
명령을 실행합니다. 이 작업은 실제로 다음과 같은 세 단계로 이루어져 있습니다.sum
값을 읽기9
가 되게 되는 문제가 발생합니다. 이러한 상황은 공유 자원에 대한 접근을 모두 공정하게 갖고 있기 때문에 값의 업데이트에 대한 타이밍이 겹쳐서 발생하게 되는 경쟁 상태, (race condition)
문제로 인해 Dead Lock, 교착 상태가 발생하게 됩니다.C++
와 JavaScript
에서 작성한 로직의 차이는 없는데 결과는 어째서 큰 차이가 발생 했을 까요?문맥 교환
이 발생하지 않기 때문에 공용 변수인sum
변수를 순서대로 접근하여 값을 변경 시키기 때문에 C++
와 같은 Dead Lock, 교착 상태 문제가 발생하지 않았습니다.race condition
이 발생하게끔 코드를 작성 해보겠습니다.Web Worker
와 Node.js의 Worker
는 메인 스레드와 분리된 독립적인 실행 환경을 제공하기 위해 파일 또는 파일과 유사한 URL을 기반으로 동작합니다. 각 워커들은 파일과 URL을 기반으로 동작함으로 메인 스레드와 별도의 전역 컨텍스트와 이벤트 루프 인스턴스를 가지며, 이는 실행 중인 코드의 상태나 변수들이 서로 간섭하지 않도록 보장합니다.Shared Array
를 사용하여 공용 전역 변수를 사용하고 있고 이 작업은 교착 상태를 만들고 이로 인해 위와 같은 결과가 만들어지게 됩니다.Atomics
객체를 활용하여 load
후 store
를 수행하여 공용 변수의 값을 변경하는 작업을 수행합니다. 이러한 작업을 앞 서 작업한 C++
와 동일하게 수행할 경우 동일한 문제가 발생합니다.C++
에서 동기화 문제를 해결하는 방법은 아래와 같습니다.lock
, unlock
메서드를 활용하여 변경 시킴으로써 임계 구역에 접근할 수 있는 순서를 통제하여 문제를 해결하는 방법입니다.lock
으로 인해서 변경된 제어 값을 변경할 수 없으므로 생길 수 있는 문제를 해결하기 위하여 이진 값을 통해 자원의 접근을 제어하는 것이 아닌 임계 구역에 진입할 수 있는 프로세스의 개수(사용 가능한 공유 자원의 개수)를 나타내는 전역 변수 S 와 P 연산, 혹은 wait 을 통해 증가 시키고 대기중인 작업을 수행할 수 있게 신호를 주는 V 연산, 혹은 signal 을 사용하여 문제를 해결합니다.Fast Userspace muTEX
, Futex
를 이용하여 C++에서 문제를 해결하는 방법에 대해서 다루어 보려고 합니다.futex
는 사용자 공간과 커널 공간의 장점을 결합한 하이브리드 동기화 메커니즘입니다. 전동적인 mutex와는 달리 futex는 다음과 같은 핵심 원칙에 기반합니다.SharedBuffer
는 C++의 condition_variable
를 사용하여 스레드간의 통신을 통해서 현재 버퍼의 조건에 따라 다른 스레드로 작업을 기다려야 하는지, 작업을 수행해도 되는지 신호를 전송합니다.unidque_lock
을 이용하여 {}
구역 내에서 수행하는 작업들이 자동으로 unlock
을 하게 mutex를 사용하고 있습니다. 대부분의 현대적인 C++는 내부적으로 앞서 이야기 했던 조건하에 작동하고 있습니다.condition_variable
를 이용하여 스레드간 조건을 확인 하고 사용자 공간에서 원자적 연산을 통해 정말 필요한 경우에 시스템 콜이 발생하기 때문에 가짜 깨움(spurious wakeup) 없이 작업을 처리할 수 있기 때문에 효율적으로 작업을 수행할 수 있습니다. 전체 코드는 아래와 같습니다.counter[0]
: 실제 카운터 값 (예: 현재 공유 자원의 값)counter[1]
: 생산된 항목의 총 개수 (생산자가 증가시킴)counter[2]
: 소비된 항목의 총 개수 (소비자가 증가시킴)wait
, notify
와 역할이 동일합니다. 다른 스레드의 통신을 대기하고 사용이 모두 종료 되었을 경우 다른 스레드에 알림을 보냄으로써 깨우는 역할을 수행합니다.Atomics.wait
을 이용하여 2번 인덱스에서 작업이 모두 수행될 때 까지 기다립니다.Atmoics.wait
을 1번 인덱스로 대상으로 하여 notify()
를 통해 작업 수행이 가능하기 이전까지 대기합니다.Atomics
객체를 이용하여 작업을 수행할 경우 앞서 겪었던 경쟁 상태 문제를 해결하여 결과 값이 기대한대로 나온 것을 알 수 있습니다. 이 작업을 수행한 전체 코드는 아래와 같습니다.Web Worker
, Worker
를 사용하여 작업의 효율성을 높이고자 하였습니다.Futex
로 대체되어서 사용되고 있고 JavaScript 또한 동일하지는 않지만 Futex의 많은 부분을 차용하여 Atomic
객체가 만들어졌고 동기화 문제를 효율적으로 작업을 수행할 수 있게 되었음을 알 수 있었습니다.초기 합계: 10
producer, consumer 스레드 실행 이후 합계: -38292
프로세스A 프로세스B 현재 sum 현재 r1 현재 r2
-------------------------------------------------------------
r1 = sum | 10 10
r1 = r1 + 1 | 10 11
문맥교환 | 10 11
r2 = sum | 10 11 10
r2 = r2 -1 | 10 11 9
문맥교환 | 10 11 9
sum = r1 | 11 11 9
문맥 교환 | 11 11 9
sum = r2 | 9 11 9
초기 합계: 10
producer, consumer 스레드 실행 이후 합계: 10
순서: 1
실행 내용: let sum = 10;
변수 상태 (sum): sum = 10
순서: 2
실행 내용: main() 함수 호출
변수 상태 (sum): sum = 10
순서: 3
실행 내용: console.log("초기 합계: ", sum); → 초기값 10 출력
변수 상태 (sum): sum = 10
순서: 4
실행 내용: producer() 실행
- for 루프: 반복마다 sum += 1 수행
- 최종적으로 1,000,000번 반복하여 10 + 1,000,000 = 1,000,010
변수 상태 (sum): sum = 1,000,010
순서: 5
실행 내용: consumer() 실행
- for 루프: 반복마다 sum -= 1 수행
- 최종적으로 1,000,010 - 1,000,000 = 10
변수 상태 (sum): sum = 10
순서: 6
실행 내용: console.log("producer, consumer 스레드 실행 이후 합계: ", sum); → 최종값 10 출력
변수 상태 (sum): sum = 10
초기 합계: 10
Consumer 작업 완료
Producer 작업 완료
producer, consumer 스레드 실행 이후 합계: -844
예상 합계(10)와 실제 합계의 차이: -854
Atomic.wat(typedArray, index, value);
Atomic.nofity(typedArray, index, count);
소비: 카운터 = 11, 소비 = 999
소비: 카운터 = 10, 소비 = 1000
Consumer 작업 완료
producer, consumer 스레드 실행 이후 합계: 10
예상 합계(10)와 실제 합계의 차이: 0
#include <iostream>
#include <queue>
#include <thread>
void produce();
void consume();
int sum = 10;
void produce() {
for(int i = 0; i < 100000; i++) {
sum++;
}
}
void consume() {
for(int i = 0; i < 100000; i++) {
sum--;
}
}
int main() {
std::cout << "초기 합계: " << sum << std::endl;
std::thread producer(produce);
std::thread consumer(consume);
producer.join();
consumer.join();
std::cout << "producer, consumer 스레드 실행 이후 합계: " << sum << std::endl;
return 0;
}
let sum = 10;
function producer() {
for (let i = 0; i < 1000000; i++) {
sum += 1;
}
}
function consumer() {
for (let i = 0; i < 1000000; i++) {
sum -= 1;
}
}
function main() {
console.log("초기 합계: ", sum);
producer();
consumer();
console.log("producer, consumer 스레드 실행 이후 합계: ", sum);
}
main();
//worker.js
import { Worker } from "node:worker_threads";
import path from "node:path";
const INIT_VALUE = 10;
console.log("초기 합계: ", INIT_VALUE);
const sharedBuffer = new SharedArrayBuffer(4);
const counter = new Int32Array(sharedBuffer);
counter[0] = INIT_VALUE;
const producer = new Worker(path.resolve(__dirname, "producer.js"), {
workerData: { sharedBuffer },
});
const consumer = new Worker(path.resolve(__dirname, "consumer.js"), {
workerData: { sharedBuffer },
});
let completedWorkers = 0;
function checkCompletion() {
completedWorkers++;
if (completedWorkers === 2) {
console.log("producer, consumer 스레드 실행 이후 합계: ", counter[0]);
console.log("예상 합계(10)와 실제 합계의 차이:", counter[0] - 10);
}
}
producer.on("message", (message) => {
console.log(message);
checkCompletion();
});
consumer.on("message", (message) => {
console.log(message);
checkCompletion();
});
import { parentPort, workerData } from "worker_threads";
const sharedBuffer = workerData.sharedBuffer;
const counter = new Int32Array(sharedBuffer);
function consumer() {
for (let i = 0; i < 1000; i++) {
const currentValue = Atomics.load(counter, 0);
const start = Date.now();
while (Date.now() - start < 1) {}
Atomics.store(counter, 0, currentValue - 1);
}
}
consumer();
parentPort.postMessage("Consumer 작업 완료");
struct SharedBuffer
{
std::queue<int> buffer;
std::mutex mutex;
std::condition_variable not_full;
std::condition_variable not_empty;
const int capacity = 10;
int counter = 0;
};
std::unique_lock<std::mutex> lock(buffer.mutex);
buffer.not_full.wait(lock, [&buffer]() {
return buffer.buffer.size() < buffer.capacity;
});
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
struct SharedBuffer
{
std::queue<int> buffer;
std::mutex mutex;
std::condition_variable not_full;
std::condition_variable not_empty;
const int capacity = 10;
int counter = 0;
};
void producer(SharedBuffer& buffer, int items) {
for (int i = 0; i < items; i++) {
// 임계 영역 진입
std::unique_lock<std::mutex> lock(buffer.mutex);
// 버퍼가 가득 찼으면 대기
buffer.not_full.wait(lock, [&buffer]() {
return buffer.buffer.size() < buffer.capacity;
});
// 버퍼에 아이템 추가 및 카운터 증가
buffer.buffer.push(i);
buffer.counter++;
std::cout << "생산: 아이템 " << i << ", 카운터 = " << buffer.counter << std::endl;
// 소비자에게 버퍼가 비어있지 않음을 알림
buffer.not_empty.notify_one();
// 임계 영역 나가기 (lock 소멸자에서 자동으로 발생)
}
}
void consumer(SharedBuffer& buffer, int items) {
for (int i = 0; i < items; i++) {
// 임계 영역 진입
std::unique_lock<std::mutex> lock(buffer.mutex);
// 버퍼가 비었으면 대기
buffer.not_empty.wait(lock, [&buffer]() {
return !buffer.buffer.empty();
});
// 버퍼에서 아이템 제거 및 카운터 감소
int item = buffer.buffer.front();
buffer.buffer.pop();
buffer.counter--;
std::cout << "소비: 아이템 " << item << ", 카운터 = " << buffer.counter << std::endl;
// 생산자에게 버퍼가 가득 차지 않았음을 알림
buffer.not_full.notify_one();
// 임계 영역 나가기 (lock 소멸자에서 자동으로 발생)
}
}
int main() {
SharedBuffer buffer;
std::thread producerThread(producer, std::ref(buffer), 100);
std::thread consumerThread(consumer, std::ref(buffer), 100);
producerThread.join();
consumerThread.join();
std::cout << "최종 카운터 값: " << buffer.counter << std::endl;
return 0;
}
//worker.js
const sharedBuffer = new SharedArrayBuffer(4 * 3);
const counter = new Int32Array(sharedBuffer);
// 초기화: 카운터 = 0, 생산된 아이템 = 0, 소비된 아이템 = 0
counter[0] = 0;
counter[1] = 0;
counter[2] = 0;
// producer.js
while (Atomics.load(counter, 1) - Atomics.load(counter, 2) >= MAX_BUFFER_SIZE) {
Atomics.wait(counter, 2, Atomics.load(counter, 1));
}
// consumer.js
while (Atomics.load(counter, 1) <= Atomics.load(counter, 2)) {
Atomics.wait(counter, 1, Atomics.load(counter, 1));
}
//producer.js
import { parentPort, workerData } from "worker_threads";
const sharedBuffer = workerData.sharedBuffer;
const counter = new Int32Array(sharedBuffer);
const MAX_BUFFER_SIZE = 10;
function producer() {
for (let i = 0; i < 1000; i++) {
while (
Atomics.load(counter, 1) - Atomics.load(counter, 2) >=
MAX_BUFFER_SIZE
) {
Atomics.wait(counter, 2, Atomics.load(counter, 1));
}
const start = Date.now();
while (Date.now() - start < 1) {}
Atomics.add(counter, 0, 1);
const producedItems = Atomics.add(counter, 1, 1) + 1;
console.log(
`생산: 카운터 = ${Atomics.load(counter, 0)}, 생산 = ${producedItems}`
);
Atomics.notify(counter, 1, 1);
}
}
producer();
parentPort.postMessage("Producer 작업 완료");
// consumer.js
import { parentPort, workerData } from "worker_threads";
const sharedBuffer = workerData.sharedBuffer;
const counter = new Int32Array(sharedBuffer);
function consumer() {
for (let i = 0; i < 1000; i++) {
while (Atomics.load(counter, 1) <= Atomics.load(counter, 2)) {
Atomics.wait(counter, 1, Atomics.load(counter, 1));
}
const start = Date.now();
while (Date.now() - start < 1) {}
Atomics.sub(counter, 0, 1);
const consumedItems = Atomics.add(counter, 2, 1) + 1;
console.log(
`소비: 카운터 = ${Atomics.load(counter, 0)}, 소비 = ${consumedItems}`
);
Atomics.notify(counter, 2, 1);
}
}
consumer();
parentPort.postMessage("Consumer 작업 완료");