module jin.go.queue;

import std.container;
import std.exception;

import jin.go.mem;
import jin.go.cursor;

/// Wait-free one input one output queue.
///
/// Messages live in a ring buffer that holds one slot more than the usable
/// capacity, so that "empty" (both cursors equal) and "full" (provider one
/// slot behind consumer) can be told apart.
align(Line) class Queue(Message)
{
	/// Cursor to the next free slot for a message.
	align(Line) Cursor provider;

	/// Cursor to the next not yet received message.
	align(Line) Cursor consumer;

	/// Ring buffer of transferring messages.
	align(Line) Array!Message messages;

	/// Creates a queue able to hold `size` messages.
	///
	/// By default the buffer fits into one memory page. When a message is so
	/// large that fewer than two of them fit into a page, the default falls
	/// back to a capacity of 1 instead of underflowing `size_t`.
	///
	/// Throws: `Exception` when `size` is 0 or so large that the extra ring
	/// buffer slot would overflow `size_t`.
	this(size_t size = (Page / Message.sizeof > 1 ? Page / Message.sizeof - 1 : 1))
	{
		enforce(size > 0, "Queue size must be greater then 0");

		// `size + 1` below would wrap around to 0 and leave a zero-length buffer.
		enforce(size < size_t.max, "Queue size is too large");

		// Ring buffer uses one additional element to differentiate empty and full.
		this.messages.length = size + 1;
	}

	/// Maximum count of transferring messages.
	@property size_t size()
	{
		return this.messages.length - 1;
	}

	/// Count of provided messages that are waiting to be consumed.
	/// When there are none, the provider's `finalized` value is returned;
	/// a negative value means new messages will never be provided.
	ptrdiff_t pending() const
	{
		// Sample the finalization flag BEFORE the offsets. Reading it after
		// them lets a provider that puts its last message and then finalizes
		// in between make us report "closed" while a message is still queued.
		// NOTE(review): relies on the provider advancing its offset before
		// finalizing and on Cursor reads being ordered - confirm in jin.go.cursor.
		const finalized = this.provider.finalized;

		const len = this.messages.length;
		const count = (len - this.consumer.offset + this.provider.offset) % len;

		if (count > 0)
		{
			return count;
		}

		return finalized;
	}

	/// Count of messages that can still be put before the buffer is full.
	/// When there is no room, the consumer's `finalized` value is returned;
	/// a negative value means messages will never be consumed again.
	ptrdiff_t available() const
	{
		const len = this.messages.length;

		// One slot is always kept free, hence the `- 1`.
		const available = (len - this.provider.offset + this.consumer.offset - 1) % len;

		if (available > 0)
		{
			return available;
		}

		return this.consumer.finalized;
	}

	/// True when `available` is negative, i.e. no more messages can ever be provided.
	bool ignore()
	{
		return this.available < 0;
	}

	/// Put message without locking.
	/// `available` must be checked before.
	void put(Value)(Value value)
	{
		assert(this.available > 0, "Queue is full");

		const offset = this.provider.offset;
		const len = this.messages.length;

		// Store the message first, then advance the cursor to publish it.
		this.messages[offset] = value;

		this.provider.offset = (offset + 1) % len;
	}

	/// Create message of type `Value` from `args` and put it.
	/// `available` must be checked before.
	void put(Value, Args...)(Args args)
	{
		this.put(Value(args));
	}

	/// True when `pending` is negative, i.e. no more messages can ever be consumed.
	auto empty()
	{
		return this.pending < 0;
	}

	/// Get current pending message without consuming it.
	/// `pending` must be checked before.
	Message front()
	{
		assert(this.pending > 0, "Queue is empty");

		return this.messages[this.consumer.offset];
	}

	/// Consume current pending message.
	/// `pending` must be checked before.
	void popFront()
	{
		assert(this.pending > 0, "Queue is empty");

		const offset = (this.consumer.offset + 1) % this.messages.length;
		this.consumer.offset = offset;
	}
}

/// Automatic fit buffer size to memory page size.
unittest
{
	auto q1 = new Queue!int;
	assert(q1.size == 1023);

	auto q2 = new Queue!long;
	assert(q2.size == 511);
}

/// Default size is clamped to 1 for messages larger than a memory page.
unittest
{
	static struct Big
	{
		ubyte[Page * 2] payload;
	}

	auto q = new Queue!Big;
	assert(q.size == 1);
	assert(q.pending == 0);
	assert(q.available == 1);
}

/// Pending and available.
unittest
{
	auto q = new Queue!int(3);
	assert(q.pending == 0);
	assert(q.available == 3);

	q.put(7);
	assert(q.pending == 1);
	assert(q.available == 2);

	q.put(77);
	assert(q.pending == 2);
	assert(q.available == 1);

	q.put(777);
	assert(q.pending == 3);
	assert(q.available == 0);

	assert(q.front == 7);
	q.popFront;
	assert(q.pending == 2);
	assert(q.available == 1);

	assert(q.front == 77);
	q.popFront;
	assert(q.pending == 1);
	assert(q.available == 2);

	assert(q.front == 777);
	q.popFront;
	assert(q.pending == 0);
	assert(q.available == 3);
}

/// Consume from empty is forbidden.
unittest
{
	import core.exception : AssertError;

	// Nothing was ever put and the provider is finalized,
	// so both read operations must trip their assertions.
	auto queue = new Queue!int(1);
	queue.provider.finalize();

	assertThrown!AssertError(queue.front());
	assertThrown!AssertError(queue.popFront());
}

/// Provide to full is forbidden.
unittest
{
	import core.exception : AssertError;

	// Capacity is one: the first put fills the queue, the second must assert.
	auto queue = new Queue!int(1);
	queue.consumer.finalize();
	queue.put(7);

	assertThrown!AssertError(queue.put(77));
}

/// Make struct inside put.
unittest
{
	struct Foo
	{
		int a;
		int b;
	}

	auto queue = new Queue!Foo;

	// The message is constructed from the forwarded arguments.
	queue.put!Foo(7, 13);

	const expected = Foo(7, 13);
	assert(queue.front == expected);
}