1 module jin.go.go;
2 
3 import std.range;
4 import std.traits;
5 import std.algorithm;
6 
7 import vibe.core.core;
8 
9 public import jin.go.channel;
10 public import jin.go.await;
11 
/// Run function asynchronously.
///
/// This overload accepts tasks that return `void` and take exactly as many
/// parameters as there are arguments. Each argument type is checked with
/// `IsSafeToTransfer` at compile time; the task address and the arguments
/// are then handed to `runWorkerTask`.
auto go(alias task, Args...)(Args args)
        if (is(ReturnType!task : void) && (Parameters!task.length == Args.length))
{
    foreach (Arg; Args)
    {
        // Built per argument type so the failing type is named in the error.
        enum complaint = "Value of type (" ~ Arg.stringof
            ~ ") is not safe to pass between threads. "
            ~ "Make it immutable, non-copyable or shared!";
        static assert(IsSafeToTransfer!Arg, complaint);
    }

    return runWorkerTask(&task, args);
}
24 
/// Run function asynchronously and return the Queue connected with the range
/// returned by the function.
///
/// A static wrapper calls `task` with `args` and copies the resulting range
/// into the output side of the pair; the input side is returned to the caller.
auto go(alias task, Args...)(Args args)
        if (isInputRange!(ReturnType!task))
{
    alias Produced = ReturnType!task;
    alias Item = ElementType!Produced;

    // Started through `go`: calls the producer and copies its range into `sink`.
    static void pump(Output!Item sink, Produced function(Args) producer, Args params)
    {
        producer(params).copy(&sink);
    }

    Input!Item future;
    go!pump(future.pair, &task, args);

    return future;
}
43 
/// Run function with an automatically created result Queue and return this Queue.
///
/// Selected when `task` takes one parameter more than the supplied arguments
/// and its first parameter is an `Output!Message`. That output is created
/// here, its pair is kept as the return value, and the output itself is moved
/// into the started task ahead of `args`.
auto go(alias task, Args...)(Args args)
        if ((Parameters!task.length == Args.length + 1)
            && (is(Parameters!task[0] == Output!Message, Message)))
{
    alias Sink = Parameters!task[0];

    Sink results;
    auto future = results.pair; // counterpart of `results`, returned to the caller

    go!task(results.move, args);

    return future;
}
54 
/// Safe to transfer between threads: shared, immutable, non-copyable.
///
/// Evaluates to `true` when `Value` either has no unshared aliasing
/// (per `hasUnsharedAliasing`) or cannot be copied.
template IsSafeToTransfer(Value)
{
    // True when declaring a value and copy-initialising another from it compiles.
    enum bool copyable = __traits(compiles, { Value x, y = x; });
    enum bool aliased = hasUnsharedAliasing!Value;

    // Unsafe only when the value is both copyable and carries unshared aliasing.
    enum bool IsSafeToTransfer = !(aliased && copyable);
}
61 
/// Bidirectional : start , put*2 , take
unittest
{
    import jin.go;

    // Reads two numbers from `feed` and publishes their sum to `sums`.
    static void summing(Output!int sums, Input!int feed)
    {
        auto first = feed.next;
        auto second = feed.next;
        sums.put(first + second);
    }

    Output!int numbers;
    Input!int totals;
    go!summing(totals.pair, numbers.pair);

    foreach (value; [3, 4])
    {
        numbers.put(value);
    }

    assert(totals.next == 7);
}
80 
/// Bidirectional : put*2 , start , take
unittest
{
    import jin.go;

    // Reads two numbers from `feed` and publishes their sum to `sums`.
    static void summing(Output!int sums, Input!int feed)
    {
        auto first = feed.next;
        auto second = feed.next;
        sums.put(first + second);
    }

    // Fill the queue and destroy the output before the consumer is started.
    Output!int numbers;
    auto pending = numbers.pair;
    foreach (value; [3, 4])
    {
        numbers.put(value);
    }
    numbers.destroy();

    Input!int totals;
    go!summing(totals.pair, pending.move);

    assert(totals.next == 7);
}
102 
/// Round robin : start*2 , put*4 , take*2
unittest
{
    import jin.go;

    // Reads two numbers from `feed` and publishes their sum to `sums`.
    static void summing(Output!int sums, Input!int feed)
    {
        sums.put(feed.next + feed.next);
    }

    Output!int feed;
    Input!int sums;

    // Two workers, each with its own pair of queues.
    foreach (_; 0 .. 2)
    {
        go!summing(sums.pair, feed.pair);
    }

    feed.put(3); // 1
    feed.put(4); // 2
    feed.put(5); // 1
    feed.put(6); // 2

    auto totals = sums[].sort().array;
    assert(totals == [8, 10]);
}
126 
/// Event loop on multiple queues
unittest
{
    import jin.go;

    static void generating1(Output!int numbs)
    {
        foreach (value; [2, 3])
        {
            numbs.put(value);
        }
    }

    static void generating2(Output!long numbs)
    {
        foreach (value; [4L, 5L])
        {
            numbs.put(value);
        }
    }

    auto ints = go!generating1;
    auto longs = go!generating2;

    int[] gotInts;
    long[] gotLongs;

    // Poll both queues until each of them reports empty.
    while (!(ints.empty && longs.empty))
    {
        if (ints.pending > 0)
        {
            gotInts ~= ints.next;
        }
        if (longs.pending > 0)
        {
            gotLongs ~= longs.next;
        }
    }

    assert(gotInts == [2, 3]);
    assert(gotLongs == [4, 5]);
}
166 
/// Blocking on buffer overflow
unittest
{
    import core.time;
    import jin.go;

    // Produces the value 1 two hundred times.
    static auto generating()
    {
        return repeat(1).take(200);
    }

    auto ones = go!generating;
    sleep(10.msecs);

    auto total = ones[].sum;
    assert(total == 200);
}
183 
/// https://tour.golang.org/concurrency/1
/// The "go" template starts a function in a new asynchronous coroutine.
/// Coroutines start in a thread pool and may be executed in parallel threads.
/// Only thread-safe values can be passed to the function.
unittest
{
    import core.time;
    import std.range;
    import jin.go;

    // Puts `message` into `log` three times, sleeping 200 ms before each put.
    static void saying(Output!string log, string message)
    {
        foreach (_; 0 .. 3)
        {
            sleep(200.msecs);
            log.put(message);
        }
    }

    Input!string journal;

    go!saying(journal.pair, "hello");
    saying(journal.pair, "world"); // runs in the calling context

    auto lines = journal[];
    assert(lines.length == 6);
}
210 
/// https://tour.golang.org/concurrency/3
/// Queue is a one-consumer-one-provider wait-free typed queue with InputRange and OutputRange interfaces support.
/// Use "put" to send and the "next" property to receive messages.
unittest
{
    import jin.go;

    Output!int sender;
    auto receiver = sender.pair;

    foreach (value; [1, 2])
    {
        sender.put(value);
    }

    // Messages come out in the order they were put.
    foreach (expected; [1, 2])
    {
        assert(receiver.next == expected);
    }
}
225 
/// https://tour.golang.org/concurrency/2
/// Inputs is a round robin input Queue list with InputRange and Queue interfaces support.
/// Method "pair" creates a new Queue for every coroutine.
unittest
{
    import std.algorithm;
    import std.range;
    import jin.go;

    // Publishes the sum of `numbers` to `sums`.
    static auto summing(Output!int sums, const int[] numbers)
    {
        auto total = numbers.sum;
        sums.put(total);
    }

    immutable int[] numbers = [7, 2, 8, -9, 4, 0];
    immutable middle = numbers.length / 2;

    Input!int sums;
    go!summing(sums.pair(1), numbers[0 .. middle]);
    go!summing(sums.pair(1), numbers[middle .. $]);
    auto res = (&sums).take(2).array;

    // Both partial sums plus their total, ordered for a stable comparison.
    auto everything = res ~ res.sum;
    sort(everything);
    assert(everything == [-5, 12, 17]);
}
249 
/// https://tour.golang.org/concurrency/4
/// You can iterate over a Queue with "foreach" like an InputRange, and all standard algorithms support this.
/// Use "close" method to notify about no more data.
unittest
{
    import std.range;
    import jin.go;

    // Puts the first `count` Fibonacci numbers into `numbers`.
    static auto fibonacci(Output!int numbers, size_t count)
    {
        auto series = recurrence!q{ a[n-1] + a[n-2] }(0, 1);
        foreach (value; series.take(count))
        {
            numbers.put(value);
        }
    }

    Input!int received;
    go!fibonacci(received.pair(10), 10);

    auto expected = [0, 1, 1, 2, 3, 5, 8, 13, 21, 34];
    assert(received[] == expected);
}
270 
/// https://tour.golang.org/concurrency/4
/// A function can return an InputRange and it will be automatically converted to an input Queue.
unittest
{
    import std.range;
    import jin.go;

    // Returns a range of the first `limit` Fibonacci numbers.
    static auto fibonacci(int limit)
    {
        auto series = recurrence!q{ a[n-1] + a[n-2] }(0, 1);
        return series.take(limit);
    }

    auto expected = [0, 1, 1, 2, 3, 5, 8, 13, 21, 34];

    // The direct call and the call through `go` must yield the same sequence.
    assert(fibonacci(10).array == expected);
    assert(go!fibonacci(10).array == expected);
}
286 
/// https://tour.golang.org/concurrency/5
/// Use a custom loop to watch multiple Queues as you want.
/// The provider stops producing once "available" reports -1.
/// Use "yield" to allow other coroutines to execute between cycles.
unittest
{
    import std.range;
    import jin.go;

    __gshared int[] log;

    // Keeps putting Fibonacci numbers until `available` becomes -1.
    static auto fibonacci(Output!int numbers)
    {
        foreach (num; recurrence!q{ a[n-1] + a[n-2] }(0, 1))
        {
            numbers.put(num);

            if (numbers.available == -1)
            {
                break;
            }
        }
    }

    // Appends ten received numbers to `log`; `controls` is never written to.
    static void printing(Output!bool controls, Input!int numbers)
    {
        foreach (_; 0 .. 10)
        {
            log ~= numbers.next;
        }
    }

    Output!int numbers;
    Input!bool controls;

    go!printing(controls.pair(1), numbers.pair(1));
    go!fibonacci(numbers.move);

    controls.pending.await;

    auto expected = [0, 1, 1, 2, 3, 5, 8, 13, 21, 34];
    assert(log == expected);
}
332 
/// https://tour.golang.org/concurrency/6
/// You can omit the first argument of Queue type, and it will be autogenerated and returned.
unittest
{
    import core.time;
    import jin.go;

    // Sleeps for `dur`, then puts a single signal.
    static void after(Output!bool signals, Duration dur)
    {
        sleep(dur);
        signals.put(true);
    }

    // Puts a signal every `dur` while `available` is not negative.
    static auto tick(Output!bool signals, Duration dur)
    {
        while (signals.available >= 0)
        {
            sleep(dur);
            signals.put(true);
        }
    }

    auto ticks = go!tick(100.msecs);
    auto booms = go!after(450.msecs);

    string log;

    // Ticks take priority over the boom; sleep only when neither is pending.
    while (true)
    {
        if (ticks.pending > 0)
        {
            log ~= "tick,";
            ticks.popFront;
        }
        else if (booms.pending > 0)
        {
            log ~= "BOOM!";
            break;
        }
        else
        {
            sleep(10.msecs);
        }
    }

    // unstable
    // assert( log == "tick,tick,tick,tick,BOOM!");
}