1 module jin.go.input;
2 
3 import jin.go.channel;
4 import jin.go.output;
5 import jin.go.await;
6 
7 /// Round robin input channel.
8 /// Implements InputRange.
struct Input(Message)
{
    /// Complementary channel type.
    alias Complement = Output;

    // NOTE(review): `queues` and `current` are not declared in this struct;
    // presumably they are provided by the `Channel` mixin - confirm.
    mixin Channel!Message;

    /// Count of pending messages of the first non-empty queue, searching
    /// round robin from the current queue.
    ///
    /// Side effect: when a non-empty queue is found, `current` is left
    /// pointing at it. Otherwise `current` ends where it started.
    ///
    /// Returns:
    ///     positive value - messages are ready in the queue at `current`;
    ///     `0` - nothing is ready now, but at least one queue reported `0`;
    ///     `-1` - there are no queues, or every queue reported a negative
    ///     value: new messages will never be provided.
    ptrdiff_t pending()
    {
        ptrdiff_t summary = -1;
        const ways = this.queues.length;

        if (ways == 0)
        {
            return summary;
        }

        // Visit every queue exactly once. A counted loop always terminates,
        // and after `ways` steps the cursor is back at its starting position.
        foreach (_; 0 .. ways)
        {
            const queue = this.queues[this.current];

            auto count = queue.pending;
            if (count > 0)
            {
                // Keep the cursor here so `front`/`popFront` use this queue.
                return count;
            }

            if (count == 0)
            {
                summary = 0;
            }

            this.current = (this.current + 1) % ways;
        }

        return summary;
    }

    /// True when no more messages will be consumed.
    /// Does not wait: while `pending` reports `0` this returns `false`.
    bool empty()
    {
        return this.pending == -1;
    }

    /// Get message from the current non-empty queue or wait for one.
    /// `pending` must be checked before.
    // NOTE(review): `await` comes from `jin.go.await`; presumably it waits
    // until `pending` becomes non-zero - confirm.
    Message front()
    {
        const available = this.pending.await;
        assert(available != -1, "Message will never provided");

        return this.queues[this.current].front;
    }

    /// Consume current pending message and switch to another queue.
    /// `pending` must be checked before.
    void popFront()
    {
        // `pending` may move the cursor, so it must not live inside the
        // `assert`: asserts are removed in release builds, which would make
        // debug and release builds pop from different queues.
        const available = this.pending;
        assert(available > 0, "Channel is empty");

        const index = this.current;
        this.queues[index].popFront;
        this.current = (index + 1) % this.queues.length;
    }

    /// Consumes current message and returns it.
    Message next()
    {
        auto value = this.front;
        this.popFront;
        return value;
    }

    /// Iterates over all messages.
    /// Example: `foreach (msg; chan) {...}`
    ///
    /// Returns: `0` when the channel is exhausted, otherwise the non-zero
    /// code returned by `proceed` (iteration stops on it).
    int opApply(int delegate(Message) proceed)
    {
        for (;;)
        {
            const available = this.pending.await;
            if (available == -1)
            {
                // Normal completion must be reported as 0: non-zero values
                // are reserved for break/return/goto codes of the loop body.
                return 0;
            }

            auto result = proceed(this.front);
            this.popFront();

            if (result)
            {
                return result;
            }
        }
    }

    /// Collects all messages to array.
    /// Example: `chan[]`
    Message[] opSlice()
    {
        Message[] list;
        foreach (msg; this)
            list ~= msg;
        return list;
    }

    /// Finalizes the consumer cursor of every queue on destroy.
    ~this()
    {
        foreach (queue; this.queues)
        {
            queue.consumer.finalize();
        }
    }

}