1 module jin.go.go; 2 3 import std.range; 4 import std.traits; 5 import std.algorithm; 6 7 import vibe.core.core; 8 9 public import jin.go.channel; 10 public import jin.go.await; 11 12 /// Run function asynchronously 13 auto go(alias task, Args...)(Args args) 14 if (is(ReturnType!task : void) && (Parameters!task.length == Args.length)) 15 { 16 foreach (i, Arg; Args) 17 { 18 static assert(IsSafeToTransfer!Arg, "Value of type (" ~ Arg.stringof 19 ~ ") is not safe to pass between threads. " 20 ~ "Make it immutable, non-copyable or shared!"); 21 } 22 return runWorkerTask(&task, args); 23 } 24 25 /// Run function asynchronously and return Queue connectetd with range returned by function 26 auto go(alias task, Args...)(Args args) 27 if (isInputRange!(ReturnType!task)) 28 { 29 alias Result = ReturnType!task; 30 alias Message = ElementType!Result; 31 32 Input!Message future; 33 34 static void wrapper(Output!Message future, Result function(Args) task, Args args) 35 { 36 task(args).copy(&future); 37 } 38 39 go!wrapper(future.pair, &task, args); 40 41 return future; 42 } 43 44 /// Run function with autocreated result Queue and return this Queue 45 auto go(alias task, Args...)(Args args) 46 if ((Parameters!task.length == Args.length + 1) 47 && (is(Parameters!task[0] == Output!Message, Message))) 48 { 49 Parameters!task[0] results; 50 auto future = results.pair; 51 go!task(results.move, args); 52 return future; 53 } 54 55 /// Safe to transfer between threads: shared, immutable, non-copiable 56 template IsSafeToTransfer(Value) 57 { 58 enum IsCopyable(Value) = __traits(compiles, { Value x, y = x; }); 59 enum IsSafeToTransfer = !hasUnsharedAliasing!Value || !IsCopyable!Value; 60 } 61 62 /// Bidirection : start , put*2 , take 63 unittest 64 { 65 import jin.go; 66 67 static void summing(Output!int sums, Input!int feed) 68 { 69 sums.put(feed.next + feed.next); 70 } 71 72 Output!int feed; 73 Input!int sums; 74 go!summing(sums.pair, feed.pair); 75 76 feed.put(3); 77 feed.put(4); 78 assert(sums.next == 3 + 4); 79 } 80 81 /// Bidirection : put*2 , start , take 82 unittest 83 { 84 import jin.go; 85 86 static void summing(Output!int sums, Input!int feed) 87 { 88 sums.put(feed.next + feed.next); 89 } 90 91 Output!int feed; 92 auto ifeed = feed.pair; 93 feed.put(3); 94 feed.put(4); 95 feed.destroy(); 96 97 Input!int sums; 98 go!summing(sums.pair, ifeed.move); 99 100 assert(sums.next == 3 + 4); 101 } 102 103 /// Round robin : start*2 , put*4 , take*2 104 unittest 105 { 106 import jin.go; 107 108 Output!int feed; 109 Input!int sums; 110 111 static void summing(Output!int sums, Input!int feed) 112 { 113 sums.put(feed.next + feed.next); 114 } 115 116 go!summing(sums.pair, feed.pair); 117 go!summing(sums.pair, feed.pair); 118 119 feed.put(3); // 1 120 feed.put(4); // 2 121 feed.put(5); // 1 122 feed.put(6); // 2 123 124 assert(sums[].sort().array == [3 + 5, 4 + 6]); 125 } 126 127 /// Event loop on multiple queues 128 unittest 129 { 130 import jin.go; 131 132 static void generating1(Output!int numbs) 133 { 134 numbs.put(2); 135 numbs.put(3); 136 } 137 138 static void generating2(Output!long numbs) 139 { 140 numbs.put(4); 141 numbs.put(5); 142 } 143 144 auto numbs1 = go!generating1; 145 auto numbs2 = go!generating2; 146 147 int[] results1; 148 long[] results2; 149 150 while (!numbs1.empty || !numbs2.empty) 151 { 152 if (numbs1.pending > 0) 153 { 154 results1 ~= numbs1.next; 155 } 156 if (numbs2.pending > 0) 157 { 158 results2 ~= numbs2.next; 159 continue; 160 } 161 } 162 163 assert(results1 == [2, 3]); 164 assert(results2 == [4, 5]); 165 } 166 167 /// Blocking on buffer overflow 168 unittest 169 { 170 import core.time; 171 import jin.go; 172 173 static auto generating() 174 { 175 return 1.repeat.take(200); 176 } 177 178 auto numbs = go!generating; 179 10.msecs.sleep; 180 181 assert(numbs[].sum == 200); 182 } 183 184 /// https://tour.golang.org/concurrency/1 185 /// "go" template starts function in new asynchronous coroutine 186 /// Coroutines starts in thread pool and may be executed in parallel threads. 187 /// Only thread safe values can be passed to function. 188 unittest 189 { 190 import core.time; 191 import std.range; 192 import jin.go; 193 194 Input!string log; 195 196 static void saying(Output!string log, string message) 197 { 198 foreach (_; 3.iota) 199 { 200 200.msecs.sleep; 201 log.put(message); 202 } 203 } 204 205 go!saying(log.pair, "hello"); 206 saying(log.pair, "world"); 207 208 assert(log[].length == 6); 209 } 210 211 /// https://tour.golang.org/concurrency/3 212 /// Queue is one-consumer-one-provider wait-free typed queue with InputRange and OutputRange interfaces support. 213 /// Use "next" property to send and receive messages; 214 unittest 215 { 216 import jin.go; 217 218 Output!int output; 219 auto input = output.pair; 220 output.put(1); 221 output.put(2); 222 assert(input.next == 1); 223 assert(input.next == 2); 224 } 225 226 /// https://tour.golang.org/concurrency/2 227 /// Inputs is round robin input Queue list with InputRange and Queue interfaces support. 228 /// Method "pair" creates new Queue for every coroutine 229 unittest 230 { 231 import std.algorithm; 232 import std.range; 233 import jin.go; 234 235 static auto summing(Output!int sums, const int[] numbers) 236 { 237 sums.put(numbers.sum); 238 } 239 240 immutable int[] numbers = [7, 2, 8, -9, 4, 0]; 241 242 Input!int sums; 243 go!summing(sums.pair(1), numbers[0 .. $ / 2]); 244 go!summing(sums.pair(1), numbers[$ / 2 .. $]); 245 auto res = (&sums).take(2).array; 246 247 assert((res ~ res.sum).sort.array == [-5, 12, 17]); 248 } 249 250 /// https://tour.golang.org/concurrency/4 251 /// You can iterate over Queue by "foreach" like InputRange, and all standart algorithms support this. 252 /// Use "close" method to notify about no more data. 253 unittest 254 { 255 import std.range; 256 import jin.go; 257 258 static auto fibonacci(Output!int numbers, size_t count) 259 { 260 auto range = recurrence!q{ a[n-1] + a[n-2] }(0, 1).take(count); 261 foreach (x; range) 262 numbers.put(x); 263 } 264 265 Input!int numbers; 266 go!fibonacci(numbers.pair(10), 10); 267 268 assert(numbers[] == [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]); 269 } 270 271 /// https://tour.golang.org/concurrency/4 272 /// Function can return InputRange and it will be automatically converted to input Queue. 273 unittest 274 { 275 import std.range; 276 import jin.go; 277 278 static auto fibonacci(int limit) 279 { 280 return recurrence!q{ a[n-1] + a[n-2] }(0, 1).take(limit); 281 } 282 283 assert(fibonacci(10).array == [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]); 284 assert(go!fibonacci(10).array == [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]); 285 } 286 287 /// https://tour.golang.org/concurrency/5 288 /// Use custom loop to watch multiple Queues as you want. 289 /// Provider can be slave by using "needed" property. 290 /// Use "yield" to allow other coroutines executed between cycles. 291 unittest 292 { 293 import std.range; 294 import jin.go; 295 296 __gshared int[] log; 297 298 static auto fibonacci(Output!int numbers) 299 { 300 auto range = recurrence!q{ a[n-1] + a[n-2] }(0, 1); 301 302 foreach (num; range) 303 { 304 numbers.put(num); 305 306 if (numbers.available == -1) 307 { 308 break; 309 } 310 } 311 312 } 313 314 static void printing(Output!bool controls, Input!int numbers) 315 { 316 foreach (i; 10.iota) 317 { 318 log ~= numbers.next; 319 } 320 } 321 322 Output!int numbers; 323 Input!bool controls; 324 325 go!printing(controls.pair(1), numbers.pair(1)); 326 go!fibonacci(numbers.move); 327 328 controls.pending.await; 329 330 assert(log == [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]); 331 } 332 333 /// https://tour.golang.org/concurrency/6 334 /// You can ommit first argument of Queue type, and it will be autogenerated and returned. 335 unittest 336 { 337 import core.time; 338 import jin.go; 339 340 static void after(Output!bool signals, Duration dur) 341 { 342 dur.sleep; 343 signals.put(true); 344 } 345 346 static auto tick(Output!bool signals, Duration dur) 347 { 348 while (signals.available >= 0) 349 { 350 dur.sleep; 351 signals.put(true); 352 } 353 } 354 355 auto ticks = go!tick(100.msecs); 356 auto booms = go!after(450.msecs); 357 358 string log; 359 360 for (;;) 361 { 362 if (ticks.pending > 0) 363 { 364 log ~= "tick,"; 365 ticks.popFront; 366 continue; 367 } 368 if (booms.pending > 0) 369 { 370 log ~= "BOOM!"; 371 break; 372 } 373 10.msecs.sleep; 374 } 375 376 // unstable 377 // assert( log == "tick,tick,tick,tick,BOOM!"); 378 }