module jin.go.output;

import jin.go.channel;
import jin.go.input;
import jin.go.await;

/// Round-robin output channel: every message goes to one of the registered
/// queues, and the target queue rotates after each `put`.
/// Implements OutputRange.
struct Output(Message)
{
    alias Complement = Input;

    mixin Channel!Message;

    /// Number of messages that can be put right now.
    ///
    /// Scans the queues starting at `current` and stops at the first queue
    /// that reports a positive count; `current` is left pointing at that queue.
    ///
    /// Returns:
    ///   - a positive count taken from the first queue that has room;
    ///   - `0` when no queue has room but at least one reports exactly `0`;
    ///   - `-1` when there are no queues, or every queue reports a negative
    ///     value (messages will never be accepted).
    ptrdiff_t available()
    {
        const ways = this.queues.length;
        if (ways == 0)
            return -1;

        const origin = this.current;

        // Set once any queue reports exactly 0 (full for now, but not closed).
        bool sawOpen = false;

        for (;;)
        {
            const lane = this.queues[this.current];
            const room = lane.available;

            // First queue with room wins; `current` stays on it for `put`.
            if (room > 0)
                return room;

            if (room == 0)
                sawOpen = true;

            // Step to the next queue; stop after one full lap.
            this.current = (this.current + 1) % ways;
            if (this.current == origin)
                break;
        }

        return sawOpen ? 0 : -1;
    }

    /// True when `available` reports `-1`, i.e. no more messages will be consumed.
    /// Note: evaluating this runs the `available` scan, which may move `current`.
    bool ignore()
    {
        const state = this.available;
        return state == -1;
    }

    /// Puts a message into the current queue, then switches to the next queue.
    ///
    /// The result of `available` is passed through `await` first
    /// (presumably suspending until it is non-zero - confirm in `jin.go.await`),
    /// and the awaited value is asserted to differ from `-1`.
    void put(Value)(Value value)
    {
        const ready = this.available.await;
        assert(ready != -1, "Message will never consumed");

        // `available` left `current` on the queue chosen for this message.
        const slot = this.current;
        this.queues[slot].put(value);

        // Rotate so the next message targets the following queue.
        this.current = (slot + 1) % this.queues.length;
    }

    /// Constructs a `Value` from `args` and puts it as a message.
    void put(Value, Args...)(Args args)
    {
        this.put(Value(args));
    }

    /// On destruction, calls `finalize` on the provider of every registered queue.
    ~this()
    {
        foreach (lane; this.queues)
            lane.provider.finalize();
    }

}