1 module jin.go.input; 2 3 import jin.go.channel; 4 import jin.go.output; 5 import jin.go.await; 6 7 /// Round robin input channel. 8 /// Implements InputRange. 9 struct Input(Message) 10 { 11 alias Complement = Output; 12 13 mixin Channel!Message; 14 15 /// Minimum count of pending messages. 16 /// Negative value - new messages will never provided. 17 ptrdiff_t pending() 18 { 19 ptrdiff_t pending = -1; 20 const ways = this.queues.length; 21 22 if (ways == 0) 23 { 24 return pending; 25 } 26 27 const start = this.current; 28 do 29 { 30 const queue = this.queues[this.current]; 31 32 auto pending2 = queue.pending; 33 if (pending2 > 0) 34 { 35 return pending2; 36 } 37 38 if (pending2 == 0) 39 { 40 pending = 0; 41 } 42 43 this.current = (this.current + 1) % ways; 44 } 45 while (this.current != start); 46 47 return pending; 48 } 49 50 /// True when no more messages will be consumed. 51 bool empty() 52 { 53 return this.pending == -1; 54 } 55 56 /// Get message fromm current non clear Queue or wait. 57 /// `pending` must be checked before. 58 Message front() 59 { 60 const pending = this.pending.await; 61 assert(pending != -1, "Message will never provided"); 62 63 return this.queues[this.current].front; 64 } 65 66 /// Consume current pending message and switch to another Queue. 67 /// `pending` must be checked before. 68 void popFront() 69 { 70 assert(this.pending > 0, "Channel is empty"); 71 72 const current = this.current; 73 this.queues[current].popFront; 74 this.current = (current + 1) % this.queues.length; 75 } 76 77 /// Consumes current message; 78 Message next() 79 { 80 auto value = this.front; 81 this.popFront; 82 return value; 83 } 84 85 /// Iterates over all messages. 86 /// Example: `foreach(msg : chan) {...}` 87 int opApply(int delegate(Message) proceed) 88 { 89 for (;;) 90 { 91 const pending = this.pending.await; 92 if (pending == -1) 93 { 94 return -1; 95 } 96 97 auto result = proceed(this.front); 98 this.popFront(); 99 100 if (result) 101 { 102 return result; 103 } 104 } 105 } 106 107 /// Collects all messages to array. 108 /// Example: `chan[]` 109 Message[] opSlice() 110 { 111 Message[] list; 112 foreach (msg; this) 113 list ~= msg; 114 return list; 115 } 116 117 /// Fix all cursors on destroy. 118 ~this() 119 { 120 foreach (queue; this.queues) 121 { 122 queue.consumer.finalize(); 123 } 124 } 125 126 }