1 module jin.go.output;
2 
3 import jin.go.channel;
4 import jin.go.input;
5 import jin.go.await;
6 
7 /// Round robin output channel.
8 /// Implements OutputRange.
9 struct Output(Message)
10 {
11     alias Complement = Input;
12 
13     mixin Channel!Message;
14 
15     /// Count of messages that can be privided now.
16     /// Negative value - new messages will never provided.
17     ptrdiff_t available()
18     {
19         ptrdiff_t available = -1;
20         const ways = this.queues.length;
21 
22         if (ways == 0)
23         {
24             return available;
25         }
26 
27         const start = this.current;
28         do
29         {
30             const queue = this.queues[this.current];
31 
32             const available2 = queue.available;
33             if (available2 > 0)
34             {
35                 return available2;
36             }
37 
38             if (available2 == 0)
39             {
40                 available = 0;
41             }
42 
43             this.current = (this.current + 1) % ways;
44         }
45         while (this.current != start);
46 
47         return available;
48     }
49 
50     /// True when no more messages will be consumed.
51     bool ignore()
52     {
53         return this.available == -1;
54     }
55 
56     /// Put message to current non full Queue and switch Queue
57     /// `available` must be checked before.
58     void put(Value)(Value value)
59     {
60         const available = this.available.await;
61         assert(available != -1, "Message will never consumed");
62 
63         const current = this.current;
64         this.queues[current].put(value);
65         this.current = (current + 1) % this.queues.length;
66     }
67 
68     /// Create and put message.
69     /// `available` must be checked before.
70     void put(Value, Args...)(Args args)
71     {
72         this.put(Value(args));
73     }
74 
75     /// Finalizes all cursors on destroy.
76     ~this()
77     {
78         foreach (queue; this.queues)
79         {
80             queue.provider.finalize();
81         }
82     }
83 
84 }