1 module jin.go.queue;
2 
3 import std.container;
4 import std.exception;
5 
6 import jin.go.mem;
7 import jin.go.cursor;
8 
/// Wait-free one input one output queue.
///
/// A fixed-size ring buffer with two cursors: `provider` is advanced by `put`,
/// `consumer` is advanced by `popFront`. One slot of the buffer is always kept
/// unused so that "empty" (cursors equal) and "full" (provider right behind
/// consumer) can be told apart.
align(Line) class Queue(Message)
{
	/// Cursor to the next free slot for a message (advanced by `put`).
	align(Line) Cursor provider;

	/// Cursor to the next not yet received message (advanced by `popFront`).
	align(Line) Cursor consumer;

	/// Ring buffer of transferring messages. Its length is `size + 1`.
	align(Line) Array!Message messages;

	/// Creates a queue able to hold `size` messages at once.
	///
	/// By default the buffer is sized to fit one memory page. When a single
	/// message is as large as (or larger than) a page, the default falls back
	/// to a capacity of 1 instead of underflowing.
	///
	/// Params:
	///   size = maximum count of simultaneously stored messages, must be > 0.
	///
	/// Throws: `Exception` (via `enforce`) when `size` is 0 or `size_t.max`.
	this(size_t size = (Page / Message.sizeof > 1 ? Page / Message.sizeof - 1 : 1))
	{
		enforce(size > 0, "Queue size must be greater then 0");

		// `size + 1` below would wrap around to 0 and every `% length`
		// in this class would then divide by zero.
		enforce(size < size_t.max, "Queue size is too large");

		// Ring buffer uses one additional element to differentiate empty and full.
		this.messages.length = size + 1;
	}

	/// Maximum count of transferring messages.
	@property size_t size()
	{
		return this.messages.length - 1;
	}

	/// Count of provided but not yet consumed messages.
	///
	/// When there are no such messages, the provider's `finalized` value is
	/// returned instead; a negative value means new messages will never be
	/// provided.
	ptrdiff_t pending() const
	{
		// Snapshot the finalization flag BEFORE reading the offsets. Reading it
		// afterwards lets a producer `put` + `finalize` slip in between the two
		// reads, so the consumer would observe "nothing pending and finalized"
		// and lose the last message.
		// NOTE(review): relies on Cursor accessors having acquire/release
		// ordering - confirm in jin.go.cursor.
		const ptrdiff_t finalized = this.provider.finalized;

		const len = this.messages.length;
		const count = (len - this.consumer.offset + this.provider.offset) % len;

		if (count > 0)
		{
			return count;
		}

		return finalized;
	}

	/// Count of messages that can still be put before the buffer is full.
	///
	/// When the buffer is full, the consumer's `finalized` value is returned
	/// instead; a negative value means messages will never be consumed.
	ptrdiff_t available() const
	{
		const len = this.messages.length;

		// One slot is always left unused, hence the `- 1`.
		const free = (len - this.provider.offset + this.consumer.offset - 1) % len;

		if (free > 0)
		{
			return free;
		}

		return this.consumer.finalized;
	}

	/// True when no more messages can ever be provided
	/// (`available` is negative).
	bool ignore()
	{
		return this.available < 0;
	}

	/// Put message without locking.
	/// `available` must be checked before.
	void put(Value)(Value value)
	{
		assert(this.available > 0, "Queue is full");

		const offset = this.provider.offset;
		const len = this.messages.length;

		// Store the message first, publish the new offset afterwards,
		// so the slot is written before the cursor moves past it.
		this.messages[offset] = value;

		this.provider.offset = (offset + 1) % len;
	}

	/// Create a `Value` from `args` and put it.
	/// `available` must be checked before.
	void put(Value, Args...)(Args args)
	{
		this.put(Value(args));
	}

	/// True when no more messages can ever be consumed
	/// (`pending` is negative).
	auto empty()
	{
		return this.pending < 0;
	}

	/// Get current pending message without removing it.
	/// `pending` must be checked before.
	Message front()
	{
		assert(this.pending > 0, "Queue is empty");

		return this.messages[this.consumer.offset];
	}

	/// Consume current pending message by advancing the consumer cursor.
	/// `pending` must be checked before.
	void popFront()
	{
		assert(this.pending > 0, "Queue is empty");

		const offset = (this.consumer.offset + 1) % this.messages.length;
		this.consumer.offset = offset;
	}
}
118 
/// Default capacity is derived from the memory page size and the message size.
unittest
{
	auto ints = new Queue!int;
	auto longs = new Queue!long;

	// The expected values correspond to a 4096-byte page minus the one spare slot.
	assert(ints.size == 1023);
	assert(longs.size == 511);
}
128 
/// `pending` and `available` track every put and pop.
unittest
{
	auto queue = new Queue!int(3);

	// Checks both counters at once.
	void expect(ptrdiff_t pending, ptrdiff_t available)
	{
		assert(queue.pending == pending);
		assert(queue.available == available);
	}

	expect(0, 3);

	// Fill the queue up to its capacity.
	queue.put(7);
	expect(1, 2);

	queue.put(77);
	expect(2, 1);

	queue.put(777);
	expect(3, 0);

	// Drain it in the same order the messages were put.
	assert(queue.front == 7);
	queue.popFront();
	expect(2, 1);

	assert(queue.front == 77);
	queue.popFront();
	expect(1, 2);

	assert(queue.front == 777);
	queue.popFront();
	expect(0, 3);
}
163 
/// Reading from or popping an empty queue trips an assertion.
unittest
{
	import core.exception : AssertError;

	auto queue = new Queue!int(1);
	queue.provider.finalize();

	assertThrown!AssertError(queue.front());
	assertThrown!AssertError(queue.popFront());
}
175 
/// Putting into a full queue trips an assertion.
unittest
{
	import core.exception : AssertError;

	auto queue = new Queue!int(1);
	queue.consumer.finalize();

	// The single slot is still free, so the first put succeeds.
	queue.put(7);

	assertThrown!AssertError(queue.put(77));
}
187 
/// `put!T(args...)` constructs the message in place from its arguments.
unittest
{
	struct Pair
	{
		int a;
		int b;
	}

	auto queue = new Queue!Pair;
	queue.put!Pair(7, 13);

	const received = queue.front;
	assert(received == Pair(7, 13));
}