]> git.scottworley.com Git - reliable-chat/blob - server/server.go
a8b1c5334f3a778370d4ed58f00a1503fe3bb23e
[reliable-chat] / server / server.go
1 package main
2
3 import "container/list"
4 import "encoding/json"
5 import "log"
6 import "net/http"
7 import "time"
8
9 type Message struct {
10 Time time.Time
11 Text string
12 }
13
14 type FetchResponse struct {
15 Messages []Message
16 Time time.Time
17 }
18
19 type StoreRequest struct {
20 StartTime time.Time
21 Messages chan<- []Message
22 }
23
24 type Store struct {
25 Add chan Message
26 Get chan StoreRequest
27 }
28
29 // TODO: Monotonic clock
30
31 func manage_store(store Store) {
32 messages := list.New()
33 message_count := 0
34 max_messages := 1000
35 waiting := list.New()
36 for {
37 select {
38 case new_message := <-store.Add:
39 messages.PushBack(new_message)
40 for waiter := waiting.Front(); waiter != nil; waiter = waiter.Next() {
41 waiter.Value.(StoreRequest).Messages <- []Message{new_message}
42 close(waiter.Value.(StoreRequest).Messages)
43 }
44 waiting.Init()
45 if message_count < max_messages {
46 message_count++
47 } else {
48 messages.Remove(messages.Front())
49 }
50 case request := <-store.Get:
51 if messages.Back() == nil || request.StartTime.After(messages.Back().Value.(Message).Time) {
52 waiting.PushBack(request)
53 } else {
54 start := messages.Back()
55 response_size := 1
56 if messages.Front().Value.(Message).Time.After(request.StartTime) {
57 start = messages.Front()
58 response_size = message_count
59 } else {
60 for start.Prev().Value.(Message).Time.After(request.StartTime) {
61 start = start.Prev()
62 response_size++
63 }
64 }
65 response_messages := make([]Message, 0, response_size)
66 for m := start; m != nil; m = m.Next() {
67 response_messages = append(response_messages, m.Value.(Message))
68 }
69 request.Messages <- response_messages
70 }
71 }
72 }
73 }
74
75 func start_store() Store {
76 store := Store{make(chan Message, 20), make(chan StoreRequest, 20)}
77 go manage_store(store)
78 return store
79 }
80
81 func start_server(store Store) {
82 http.HandleFunc("/fetch", func(w http.ResponseWriter, r *http.Request) {
83 var since time.Time // TODO: Get start time from URL
84 messages_from_store := make(chan []Message, 1)
85 store.Get <- StoreRequest{since, messages_from_store}
86
87 json_encoded, err := json.Marshal(FetchResponse{<-messages_from_store, time.Now()})
88 if err != nil {
89 log.Print("json encode: ", err)
90 w.WriteHeader(http.StatusInternalServerError)
91 return
92 }
93 w.Header().Add("Content-Type", "application/json")
94 w.Write(json_encoded)
95 })
96
97 http.HandleFunc("/speak", func(w http.ResponseWriter, r *http.Request) {
98 text := "woof" // TODO: Get text from URL
99 store.Add <- Message{time.Now(), text}
100 })
101
102 log.Fatal(http.ListenAndServe(":8080", nil))
103 }
104
105 func main() {
106 store := start_store()
107 start_server(store)
108 }