--- zzzz-none-000/linux-3.10.107/Documentation/circular-buffers.txt	2017-06-27 09:49:32.000000000 +0000
+++ scorpion-7490-727/linux-3.10.107/Documentation/circular-buffers.txt	2021-02-04 17:41:59.000000000 +0000
@@ -160,6 +160,7 @@
 	spin_lock(&producer_lock);
 
 	unsigned long head = buffer->head;
+	/* The spin_unlock() and next spin_lock() provide needed ordering. */
 	unsigned long tail = ACCESS_ONCE(buffer->tail);
 
 	if (CIRC_SPACE(head, tail, buffer->size) >= 1) {
@@ -168,9 +169,8 @@
 
 		produce_item(item);
 
-		smp_wmb(); /* commit the item before incrementing the head */
-
-		buffer->head = (head + 1) & (buffer->size - 1);
+		smp_store_release(buffer->head,
+				  (head + 1) & (buffer->size - 1));
 
 		/* wake_up() will make sure that the head is committed before
 		 * waking anyone up */
@@ -183,9 +183,14 @@
 before the head index makes it available to the consumer and then instructs the
 CPU that the revised head index must be written before the consumer is woken.
 
-Note that wake_up() doesn't have to be the exact mechanism used, but whatever
-is used must guarantee a (write) memory barrier between the update of the head
-index and the change of state of the consumer, if a change of state occurs.
+Note that wake_up() does not guarantee any sort of barrier unless something
+is actually awakened. We therefore cannot rely on it for ordering. However,
+there is always one element of the array left empty. Therefore, the
+producer must produce two elements before it could possibly corrupt the
+element currently being read by the consumer. Therefore, the unlock-lock
+pair between consecutive invocations of the consumer provides the necessary
+ordering between the read of the index indicating that the consumer has
+vacated a given element and the write by the producer to that same element.
 
 
 THE CONSUMER
@@ -195,21 +200,20 @@
 
 	spin_lock(&consumer_lock);
 
-	unsigned long head = ACCESS_ONCE(buffer->head);
+	/* Read index before reading contents at that index. */
+	unsigned long head = smp_load_acquire(buffer->head);
 	unsigned long tail = buffer->tail;
 
 	if (CIRC_CNT(head, tail, buffer->size) >= 1) {
-		/* read index before reading contents at that index */
-		smp_read_barrier_depends();
 
 		/* extract one item from the buffer */
 		struct item *item = buffer[tail];
 
 		consume_item(item);
 
-		smp_mb(); /* finish reading descriptor before incrementing tail */
-
-		buffer->tail = (tail + 1) & (buffer->size - 1);
+		/* Finish reading descriptor before incrementing tail. */
+		smp_store_release(buffer->tail,
+				  (tail + 1) & (buffer->size - 1));
 	}
 
 	spin_unlock(&consumer_lock);
@@ -218,12 +222,17 @@
 the new item, and then it shall make sure the CPU has finished reading the
 item before it writes the new tail pointer, which will erase the item.
 
-
-Note the use of ACCESS_ONCE() in both algorithms to read the opposition index.
-This prevents the compiler from discarding and reloading its cached value -
-which some compilers will do across smp_read_barrier_depends(). This isn't
-strictly needed if you can be sure that the opposition index will _only_ be
-used the once.
+Note the use of ACCESS_ONCE() and smp_load_acquire() to read the
+opposition index. This prevents the compiler from discarding and
+reloading its cached value - which some compilers will do across
+smp_read_barrier_depends(). This isn't strictly needed if you can
+be sure that the opposition index will _only_ be used the once.
+The smp_load_acquire() additionally forces the CPU to order against
+subsequent memory references. Similarly, smp_store_release() is used
+in both algorithms to write the thread's index. This documents the
+fact that we are writing to something that can be read concurrently,
+prevents the compiler from tearing the store, and enforces ordering
+against previous accesses.
 
 
 ===============
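
As a rough illustration of the acquire/release pattern this patch adopts (not
part of the patch or of circular-buffers.txt), here is a minimal userspace
sketch using C11 atomics in place of the kernel's smp_load_acquire() and
smp_store_release(). The ring size, the int item type, the thread driver and
all names below are invented for the example.

	/* ring_sketch.c - illustrative only; build with: cc -std=c11 -pthread ring_sketch.c */
	#include <pthread.h>
	#include <stdatomic.h>
	#include <stdio.h>

	#define RING_SIZE 16	/* must be a power of two, like buffer->size */
	#define CNT(head, tail)   (((head) - (tail)) & (RING_SIZE - 1))
	#define SPACE(head, tail) CNT((tail), (head) + 1)

	static int slots[RING_SIZE];
	static atomic_ulong head;	/* written by the producer only */
	static atomic_ulong tail;	/* written by the consumer only */

	static void *producer(void *arg)
	{
		for (unsigned long i = 0; i < 1000; ) {
			/* Plain read of our own index, acquire-read of the consumer's. */
			unsigned long h = atomic_load_explicit(&head, memory_order_relaxed);
			unsigned long t = atomic_load_explicit(&tail, memory_order_acquire);

			if (SPACE(h, t) >= 1) {
				slots[h] = (int)i;	/* fill the slot first... */
				/* ...then publish it: the release store plays the role of
				 * smp_store_release() on the head index. */
				atomic_store_explicit(&head, (h + 1) & (RING_SIZE - 1),
						      memory_order_release);
				i++;
			}
		}
		return arg;
	}

	static void *consumer(void *arg)
	{
		for (unsigned long i = 0; i < 1000; ) {
			/* Acquire-read of the producer's index before touching the slot,
			 * the analogue of smp_load_acquire() on the head index. */
			unsigned long h = atomic_load_explicit(&head, memory_order_acquire);
			unsigned long t = atomic_load_explicit(&tail, memory_order_relaxed);

			if (CNT(h, t) >= 1) {
				int item = slots[t];	/* read the slot first... */
				if (item != (int)i)
					fprintf(stderr, "lost ordering at %lu\n", i);
				/* ...then free it: the release store plays the role of
				 * smp_store_release() on the tail index. */
				atomic_store_explicit(&tail, (t + 1) & (RING_SIZE - 1),
						      memory_order_release);
				i++;
			}
		}
		return arg;
	}

	int main(void)
	{
		pthread_t p, c;

		pthread_create(&p, NULL, producer, NULL);
		pthread_create(&c, NULL, consumer, NULL);
		pthread_join(p, NULL);
		pthread_join(c, NULL);
		puts("1000 items transferred in order");
		return 0;
	}

The producer stores into the slot before the release store that publishes the
new head, and the consumer's acquire load of head happens before it reads the
slot; the mirror-image pair on tail keeps the producer from overwriting a slot
the consumer is still reading. In real kernel code the primitives are passed
the address of the index, for example smp_store_release(&buffer->head, ...).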