]> git.scottworley.com Git - reliable-chat/blob - server/server.go
8c8a9893d61360dd00f3ea643871fe7fb33339d2
[reliable-chat] / server / server.go
1 package main
2
3 import "container/list"
4 import "encoding/json"
5 import "log"
6 import "net/http"
7 import "time"
8
9 type Message struct {
10 Time time.Time
11 Text string
12 }
13
14 type FetchResponse struct {
15 Messages []Message
16 Time time.Time
17 }
18
19 type StoreRequest struct {
20 StartTime time.Time
21 Messages chan<- []Message
22 }
23
24 type Store struct {
25 Add chan Message
26 Get chan StoreRequest
27 }
28
29 // TODO: Monotonic clock
30
31 func manage_store(store Store) {
32 messages := list.New()
33 message_count := 0
34 max_messages := 1000
35 waiting := list.New()
36 for {
37 select {
38 case new_message := <-store.Add:
39 messages.PushBack(new_message)
40 for waiter := waiting.Front(); waiter != nil; waiter = waiter.Next() {
41 waiter.Value.(StoreRequest).Messages <- []Message{new_message}
42 close(waiter.Value.(StoreRequest).Messages)
43 }
44 waiting.Init()
45 if message_count < max_messages {
46 message_count++
47 } else {
48 messages.Remove(messages.Front())
49 }
50 case request := <-store.Get:
51 if messages.Back() == nil || request.StartTime.After(messages.Back().Value.(Message).Time) {
52 waiting.PushBack(request)
53 } else {
54 start := messages.Back()
55 response_size := 1
56 if messages.Front().Value.(Message).Time.After(request.StartTime) {
57 start = messages.Front()
58 response_size = message_count
59 } else {
60 for start.Prev().Value.(Message).Time.After(request.StartTime) {
61 start = start.Prev()
62 response_size++
63 }
64 }
65 response_messages := make([]Message, 0, response_size)
66 for m := start; m != nil; m = m.Next() {
67 response_messages = append(response_messages, m.Value.(Message))
68 }
69 request.Messages <- response_messages
70 }
71 }
72 }
73 }
74
75 func start_store() Store {
76 store := Store{make(chan Message, 20), make(chan StoreRequest, 20)}
77 go manage_store(store)
78 return store
79 }
80
81 func start_server(store Store) {
82 http.HandleFunc("/fetch", func(w http.ResponseWriter, r *http.Request) {
83 var since time.Time
84 url_since := r.FormValue("since")
85 if url_since != "" {
86 err := json.Unmarshal([]byte(url_since), &since)
87 if err != nil {
88 log.Print("fetch: parse since: ", err)
89 w.WriteHeader(http.StatusBadRequest)
90 w.Write([]byte("Could not parse since as date"))
91 return
92 }
93 }
94 messages_from_store := make(chan []Message, 1)
95 store.Get <- StoreRequest{since, messages_from_store}
96
97 json_encoded, err := json.Marshal(FetchResponse{<-messages_from_store, time.Now()})
98 if err != nil {
99 log.Print("json encode: ", err)
100 w.WriteHeader(http.StatusInternalServerError)
101 return
102 }
103 w.Header().Add("Content-Type", "application/json")
104 w.Write(json_encoded)
105 })
106
107 http.HandleFunc("/speak", func(w http.ResponseWriter, r *http.Request) {
108 store.Add <- Message{time.Now(), r.FormValue("text")}
109 })
110
111 log.Fatal(http.ListenAndServe(":8080", nil))
112 }
113
114 func main() {
115 store := start_store()
116 start_server(store)
117 }