Question:
Input: a fixed-size vector of elements (a ring buffer), plus an atomic integer holding the current write position in that vector.
const int MAXIMUM = 100;        // buffer capacity
QVector<Data> _buffer(MAXIMUM); // the ring buffer itself
QAtomicInt _atomic_index;       // current write position
Here is a method that lets multiple threads insert data into the vector:
void insert(const Data &data) {
    _buffer[_atomic_index++] = data;
}
The obvious problem is that when the index reaches the end of the vector, it has to wrap back to zero:
void insert(const Data &data) {
    _atomic_index.testAndSetOrdered(MAXIMUM, 0);
    _buffer[_atomic_index++] = data;
}
This is not a solution: if two or more threads pass the test on the first line simultaneously while _atomic_index is close to the vector boundary, running past the end of the buffer (and the resulting segfault) is practically guaranteed.
So the situation seems to call for a single atomic operation that combines the check against the maximum with the index increment:
void insert(const Data &data) {
    mutex.lock();                    // mutex: a QMutex member
    if(index == MAXIMUM) index = 0;  // index: a plain int guarded by the mutex
    _buffer[index++] = data;
    mutex.unlock();
}
But this option negates the whole benefit of multi-threaded insertion.
I tried to solve the problem myself, but I am not entirely sure my solution is correct:
void insert(const Data &data) {
    int index = _atomic_index++;
    if(index >= MAXIMUM) {
        const int new_index = index - MAXIMUM;
        _atomic_index.testAndSetOrdered(index + 1, new_index);
        index = new_index;
    }
    _buffer[index++] = data;
}
The idea is to use a local variable, individual to each thread, that holds the current index value:
int index = _atomic_index++;
It is this variable that we check against the vector bounds:
if(index >= MAXIMUM) {...}
And inside this condition, _atomic_index should, as I see it, end up with the value from the most recent increment. Am I right? Or is there perhaps another way to solve the problem?
Update
To focus only on the check for the index exceeding the vector bounds, let us assume that the number of writing threads in this example is many times smaller than the number of elements in the vector, which lets us set aside the possibility of newly inserted elements overwriting elements that have not yet been read. For instance:
- writing threads: 10
- reading threads: 20
- vector length: 100
The reading threads keep up with the writing threads, so every newly filled cell is read almost immediately. Or, more radically: written cells may be overwritten even immediately, since the data being read does not matter here.
We are only interested in the moment when the current write index is reset during multi-threaded insertion: for example, all 10 threads simultaneously try to increment it by one while the current index equals 98 out of 100 possible elements. Correct handling should assign index 99 to one thread and indices 0 through 8 to the remaining nine. The value of the current index (_atomic_index in the code) should then equal the last index assigned, i.e. also 8.
Update 2
The last statement of the previous update is an error. The current index should become equal to 9, i.e. one more than the last index assigned, because the line:
int index = _atomic_index++;
uses post-increment.
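A two-line illustration of that post-increment semantics, using the numbers from the update above (my own sketch, not code from the question):

QAtomicInt counter(8);  // the last index assigned was 8
int index = counter++;  // index == 8: the value before the increment
// counter now holds 9, one more than the last inserted index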
Answer:
My answer is based on my own experience with Java; I have practically no experience with C++, but the semantics should be the same.
I would like to take the suggested example apart. Let's run a thought experiment on the following situation:
void insert(const Data &data) {
    int index = _atomic_index++; // Thread B
    if(index >= MAXIMUM) {
        const int new_index = index - MAXIMUM;
        _atomic_index.testAndSetOrdered(index + 1, new_index); // Thread A
        index = new_index;
    }
    _buffer[index++] = data;
}
Thread A got several expressions ahead of Thread B, and at the moment Thread A tries to reset the index to zero, Thread B increments it. The reset therefore fails, and since nothing forces the operation to be retried, _atomic_index keeps growing if this situation repeats. Once _atomic_index grows to MAXIMUM * 2, new_index evaluates to MAXIMUM, which results in an attempt to write outside the buffer bounds.
The first solution that comes to mind, using the modulo operator instead of subtracting MAXIMUM once, reduces the likelihood of this situation but does not eliminate it: when the int overflows, _atomic_index ends up with an unexpected value.
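For illustration, here is a minimal sketch of that modulo variant (my reconstruction, not code from the question):

void insert(const Data &data) {
    // Wrap only the local copy; _atomic_index itself keeps growing forever.
    int index = _atomic_index++ % MAXIMUM;
    _buffer[index] = data;
    // Signed int overflow is undefined behaviour in C++; in practice the
    // counter eventually goes negative, and then index does too.
}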
One classic approach to this kind of situation is to hand-roll a spinlock on top of optimistic locking, i.e. a compare-and-set retry loop. What follows is pseudocode, since I don't know the actual methods of the library in use, nor the C++ syntax:
void insert(const Data &data) {
    // all of this can easily be moved into a separate method
    int current;
    int next;
    do {
        current = _atomic_index.get();
        next = (current + 1) % MAXIMUM;
    } while (!_atomic_index.compareAndSet(current, next));
    _buffer[current] = data;
}
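For reference, the same loop can be written against Qt's actual QAtomicInt API; this is my sketch, where loadAcquire() and testAndSetOrdered() play the roles of get() and compareAndSet() from the pseudocode:

void insert(const Data &data) {
    int current;
    int next;
    do {
        current = _atomic_index.loadAcquire();  // read the current index
        next = (current + 1) % MAXIMUM;         // compute its successor
        // retry if another thread changed the index in the meantime
    } while (!_atomic_index.testAndSetOrdered(current, next));
    _buffer[current] = data;                    // write into the reserved cell
}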
In this case, we can talk about the following invariant:
- next value = (current value + 1) % MAXIMUM
- The next value may replace the current value only if the current value is still valid (i.e. has not changed since it was read)
The algorithm has exactly two paths:
- Set the next index value if it hasn't changed
- Start a new round if the index changed between the two operations
If we have competing threads, it would look something like this:
- Index: 1
- Maximum: 3
- T1: tries to set 2 in place of 1
- T2: tries to set 2 in place of 1
- T3: tries to set 2 in place of 1
(dramatic pause, backstage magic)
- T1: success, sets 2
- T2: fails, tries to set 0 in place of 2
- T3: fails, tries to set 0 in place of 2
(dramatic pause, backstage magic)
- T1: already done
- T2: fails, tries to set 1 in place of 0
- T3: success, sets 0
(dramatic pause, backstage magic)
- T1: already done
- T2: success, sets 1
- T3: already done
What happens if a thread "falls asleep" and the counter runs through a full cycle while it sleeps?
If I haven't messed up anything, nothing terrible. Let's replay the algorithm:
- Index: 1
- Maximum: 4
- T1: reads 1 and goes to sleep
- T2: reads 1 and updates it to 2
- T2: reads 2 and updates it to 3
- T2: reads 3 and updates it to 0
- T2: reads 0 and updates it to 1
- T2: reads 1
- T1: wakes up and updates 1 to 2
- T2: fails to update and starts a new round
Is this approach safe? In this particular case, no.
This answer solves the problem of incrementing the counter correctly, but not the algorithm as a whole:
- Reserving a new index and actually storing the value are two separate operations, and the insert method as a whole is not guaranteed to execute atomically. The OS is free to suspend a thread right after it has "reserved" an index and wake it up an arbitrary time later, so data can be written to the same cell in the wrong order:
- T1: reserves the cell with index 1
- T1: falls asleep
- (the algorithm runs for a while, the index wraps to zero and starts a new round)
- T2: reserves the cell with index 1
- T2: stores its data
- T1: wakes up
- T1: stores its now-stale data
- In addition, the index is incremented before the data actually appears at that address, which means a reader of the index can access a cell that still holds old data and treat it as current (see the sketch after this list)
- Finally, the whole algorithm cannot be considered correct until the position of the tail is taken into account; otherwise the tail may catch up with the head, in which case (!) up to MAXIMUM elements will be lost.
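To illustrate the publication point from the list above, here is my own sketch (not part of the original answer): each cell carries an atomic "ready" flag that the writer raises only after the payload is stored. Slot and _slots are invented names, and this does not fix the stale-write race described earlier; it only shows the publish-after-write idea:

struct Slot {
    Data data;          // the payload itself
    QAtomicInt ready;   // 0 = empty or being written, 1 = published
};

QVector<Slot> _slots(MAXIMUM);

void insert(const Data &data) {
    int current, next;
    do {
        current = _atomic_index.loadAcquire();
        next = (current + 1) % MAXIMUM;
    } while (!_atomic_index.testAndSetOrdered(current, next));
    _slots[current].ready.storeRelease(0); // unpublish before overwriting
    _slots[current].data = data;           // store the payload first...
    _slots[current].ready.storeRelease(1); // ...then publish it to readers
}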
If you want a good implementation of such an algorithm, search for lock-free circular buffer or a similar phrase. To be honest, the only implementations that come to mind are a linked list with length tracking (where it would be very hard to guarantee correctness without pessimistic locking) and plain pessimistic locking. The latter may not be such a bad option and will probably even outperform the spinlock example above under heavy thread contention; a minimal sketch follows.
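For completeness, a minimal sketch of that pessimistic variant using Qt's QMutex (my illustration; the class and member names are invented, Data is the type from the question):

#include <QMutex>
#include <QMutexLocker>
#include <QVector>

class RingBuffer {
public:
    explicit RingBuffer(int capacity) : _buffer(capacity) {}

    void insert(const Data &data) {
        QMutexLocker locker(&_mutex);            // unlocked on scope exit
        _buffer[_index] = data;                  // store under the lock...
        _index = (_index + 1) % _buffer.size();  // ...and advance the index
    }

private:
    QVector<Data> _buffer;
    int _index = 0;
    QMutex _mutex;
};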