diff --git a/server/api_key.go b/server/api_key.go index 795b7f8c..a24845f5 100644 --- a/server/api_key.go +++ b/server/api_key.go @@ -13,7 +13,8 @@ import ( "crypto/hmac" "crypto/md5" "encoding/base64" - "log" + + "github.com/tinode/chat/server/logs" ) // Singned AppID. Composition: @@ -46,11 +47,11 @@ func checkAPIKey(apikey string) (isValid, isRoot bool) { data, err := base64.URLEncoding.DecodeString(apikey) if err != nil { - log.Println("failed to decode.base64 appid ", err) + logs.Warning.Println("failed to decode.base64 appid ", err) return } if data[0] != 1 { - log.Println("unknown appid signature algorithm ", data[0]) + logs.Warning.Println("unknown appid signature algorithm ", data[0]) return } @@ -58,7 +59,7 @@ func checkAPIKey(apikey string) (isValid, isRoot bool) { hasher.Write(data[:apikeyVersion+apikeyAppID+apikeySequence+apikeyWho]) check := hasher.Sum(nil) if !bytes.Equal(data[apikeyVersion+apikeyAppID+apikeySequence+apikeyWho:], check) { - log.Println("invalid apikey signature") + logs.Warning.Println("invalid apikey signature") return } diff --git a/server/auth/rest/auth_rest.go b/server/auth/rest/auth_rest.go index f690b4a2..c9f204ca 100644 --- a/server/auth/rest/auth_rest.go +++ b/server/auth/rest/auth_rest.go @@ -6,7 +6,6 @@ import ( "encoding/json" "errors" "io/ioutil" - "log" "net/http" "net/url" "regexp" @@ -14,6 +13,7 @@ import ( "time" "github.com/tinode/chat/server/auth" + "github.com/tinode/chat/server/logs" "github.com/tinode/chat/server/store" "github.com/tinode/chat/server/store/types" ) @@ -179,7 +179,7 @@ func (a *authenticator) Authenticate(secret []byte, remoteAddr string) (*auth.Re // Auth record not found. if resp.Record == nil { - log.Println("rest_auth: invalid response: missing Record") + logs.Warning.Println("rest_auth: invalid response: missing Record") return nil, nil, types.ErrInternal } @@ -276,7 +276,7 @@ func (a *authenticator) RestrictedTags() ([]string, error) { if len(resp.ByteVal) > 0 { a.reToken, err = regexp.Compile(string(resp.ByteVal)) if err != nil { - log.Println("rest_auth: invalid token regexp", string(resp.ByteVal)) + logs.Warning.Println("rest_auth: invalid token regexp", string(resp.ByteVal)) } } return resp.StrSliceVal, nil diff --git a/server/cluster.go b/server/cluster.go index a96b7596..a0ac1568 100644 --- a/server/cluster.go +++ b/server/cluster.go @@ -4,7 +4,6 @@ import ( "encoding/gob" "encoding/json" "errors" - "log" "net" "net/rpc" "reflect" @@ -14,6 +13,7 @@ import ( "time" "github.com/tinode/chat/server/auth" + "github.com/tinode/chat/server/logs" "github.com/tinode/chat/server/push" rh "github.com/tinode/chat/server/ringhash" "github.com/tinode/chat/server/store/types" @@ -256,7 +256,7 @@ func (n *ClusterNode) reconnect() { n.reconnecting = false n.lock.Unlock() statsInc("LiveClusterNodes", 1) - log.Println("cluster: connected to", n.name) + logs.Info.Println("cluster: connected to", n.name) // Send this node credentials to the new node. var unused bool n.call("Cluster.Ping", &ClusterPing{ @@ -274,7 +274,7 @@ func (n *ClusterNode) reconnect() { // Wait for timer to try to reconnect again. Do nothing if the timer is inactive. case <-n.done: // Shutting down - log.Println("cluster: shutdown started at node", n.name) + logs.Info.Println("cluster: shutdown started at node", n.name) reconnTicker.Stop() if n.endpoint != nil { n.endpoint.Close() @@ -283,7 +283,7 @@ func (n *ClusterNode) reconnect() { n.connected = false n.reconnecting = false n.lock.Unlock() - log.Println("cluster: shut down completed at node", n.name) + logs.Info.Println("cluster: shut down completed at node", n.name) return } } @@ -295,7 +295,7 @@ func (n *ClusterNode) call(proc string, req, resp interface{}) error { } if err := n.endpoint.Call(proc, req, resp); err != nil { - log.Println("cluster: call failed", n.name, err) + logs.Warning.Println("cluster: call failed", n.name, err) n.lock.Lock() if n.connected { @@ -313,7 +313,7 @@ func (n *ClusterNode) call(proc string, req, resp interface{}) error { func (n *ClusterNode) handleRpcResponse(call *rpc.Call) { if call.Error != nil { - log.Printf("cluster: %s call failed: %s", call.ServiceMethod, call.Error) + logs.Warning.Printf("cluster: %s call failed: %s", call.ServiceMethod, call.Error) n.lock.Lock() if n.connected { n.endpoint.Close() @@ -327,7 +327,7 @@ func (n *ClusterNode) handleRpcResponse(call *rpc.Call) { func (n *ClusterNode) callAsync(proc string, req, resp interface{}, done chan *rpc.Call) *rpc.Call { if done != nil && cap(done) == 0 { - log.Panic("cluster: RPC done channel is unbuffered") + logs.Error.Panic("cluster: RPC done channel is unbuffered") } if !n.connected { @@ -425,7 +425,7 @@ func (c *Cluster) TopicMaster(msg *ClusterReq, rejected *bool) error { node := c.nodes[msg.Node] if node == nil { - log.Println("cluster TopicMaster: request from an unknown node", msg.Node) + logs.Warning.Println("cluster TopicMaster: request from an unknown node", msg.Node) return nil } @@ -447,7 +447,7 @@ func (c *Cluster) TopicMaster(msg *ClusterReq, rejected *bool) error { } if msg.Signature != c.ring.Signature() { - log.Println("cluster TopicMaster: session signature mismatch", msg.RcptTo) + logs.Warning.Println("cluster TopicMaster: session signature mismatch", msg.RcptTo) *rejected = true return nil } @@ -461,7 +461,7 @@ func (c *Cluster) TopicMaster(msg *ClusterReq, rejected *bool) error { node.msess[msid] = struct{}{} node.lock.Unlock() - log.Println("cluster: multiplexing session started", msid, count) + logs.Info.Println("cluster: multiplexing session started", msid, count) } // This is a local copy of a remote session. @@ -496,7 +496,7 @@ func (c *Cluster) TopicMaster(msg *ClusterReq, rejected *bool) error { default: // Reply with a 500 to the user. sess.queueOut(ErrUnknownReply(msg.CliMsg, msg.CliMsg.Timestamp)) - log.Println("cluster: join req failed - hub.join queue full, topic ", msg.CliMsg.RcptTo, "; orig sid ", sess.sid) + logs.Warning.Println("cluster: join req failed - hub.join queue full, topic ", msg.CliMsg.RcptTo, "; orig sid ", sess.sid) } case ProxyReqLeave: @@ -506,7 +506,7 @@ func (c *Cluster) TopicMaster(msg *ClusterReq, rejected *bool) error { sess: sess, } } else { - log.Println("cluster: leave request for unknown topic", msg.RcptTo) + logs.Warning.Println("cluster: leave request for unknown topic", msg.RcptTo) } case ProxyReqMeta: @@ -518,10 +518,10 @@ func (c *Cluster) TopicMaster(msg *ClusterReq, rejected *bool) error { }: default: sess.queueOut(ErrUnknownReply(msg.CliMsg, msg.CliMsg.Timestamp)) - log.Println("cluster: meta req failed - topic.meta queue full, topic ", msg.CliMsg.RcptTo, "; orig sid ", sess.sid) + logs.Warning.Println("cluster: meta req failed - topic.meta queue full, topic ", msg.CliMsg.RcptTo, "; orig sid ", sess.sid) } } else { - log.Println("cluster: meta request for unknown topic", msg.RcptTo) + logs.Warning.Println("cluster: meta request for unknown topic", msg.RcptTo) } case ProxyReqBroadcast: @@ -530,13 +530,13 @@ func (c *Cluster) TopicMaster(msg *ClusterReq, rejected *bool) error { select { case globals.hub.route <- msg.SrvMsg: default: - log.Println("cluster: route req failed - hub.route queue full") + logs.Error.Println("cluster: route req failed - hub.route queue full") } case ProxyReqBgSession, ProxyReqMeUserAgent: if t := globals.hub.topicGet(msg.RcptTo); t != nil { if t.supd == nil { - log.Panicln("cluster: invalid topic category in session update", t.name, msg.ReqType) + logs.Error.Panicln("cluster: invalid topic category in session update", t.name, msg.ReqType) } su := &sessionUpdate{} if msg.ReqType == ProxyReqBgSession { @@ -546,11 +546,11 @@ func (c *Cluster) TopicMaster(msg *ClusterReq, rejected *bool) error { } t.supd <- su } else { - log.Println("cluster: session update for unknown topic", msg.RcptTo, msg.ReqType) + logs.Warning.Println("cluster: session update for unknown topic", msg.RcptTo, msg.ReqType) } default: - log.Println("cluster: unknown request type", msg.ReqType, msg.RcptTo) + logs.Warning.Println("cluster: unknown request type", msg.ReqType, msg.RcptTo) *rejected = true } @@ -565,7 +565,7 @@ func (Cluster) TopicProxy(msg *ClusterResp, unused *bool) error { msg.SrvMsg.uid = types.ParseUserId(msg.SrvMsg.AsUser) t.proxy <- msg } else { - log.Println("cluster: master response for unknown topic", msg.RcptTo) + logs.Warning.Println("cluster: master response for unknown topic", msg.RcptTo) } return nil @@ -580,7 +580,7 @@ func (c *Cluster) Route(msg *ClusterRoute, rejected *bool) error { if msg.Sess != nil { sid = msg.Sess.Sid } - log.Println("cluster Route: session signature mismatch", sid) + logs.Warning.Println("cluster Route: session signature mismatch", sid) *rejected = true return nil } @@ -590,7 +590,7 @@ func (c *Cluster) Route(msg *ClusterRoute, rejected *bool) error { sid = msg.Sess.Sid } // TODO: maybe panic here. - log.Println("cluster Route: nil server message", sid) + logs.Warning.Println("cluster Route: nil server message", sid) *rejected = true return nil } @@ -621,7 +621,7 @@ func (c *Cluster) UserCacheUpdate(msg *UserCacheReq, rejected *bool) error { func (c *Cluster) Ping(ping *ClusterPing, unused *bool) error { node := c.nodes[ping.Node] if node == nil { - log.Println("cluster Ping from unknown node", ping.Node) + logs.Warning.Println("cluster Ping from unknown node", ping.Node) return nil } @@ -717,14 +717,14 @@ func (c *Cluster) routeUserReq(req *UserCacheReq) error { func (c *Cluster) nodeForTopic(topic string) *ClusterNode { key := c.ring.Get(topic) if key == c.thisNodeName { - log.Println("cluster: request to route to self") + logs.Error.Println("cluster: request to route to self") // Do not route to self return nil } node := c.nodes[key] if node == nil { - log.Println("cluster: no node for topic", topic, key) + logs.Warning.Println("cluster: no node for topic", topic, key) } return node } @@ -901,7 +901,7 @@ func (c *Cluster) topicProxyGone(topicName string) error { // Returns snowflake worker id func clusterInit(configString json.RawMessage, self *string) int { if globals.cluster != nil { - log.Fatal("Cluster already initialized.") + logs.Error.Fatal("Cluster already initialized.") } // Registering variables even if it's a standalone server. Otherwise monitoring software will @@ -916,13 +916,13 @@ func clusterInit(configString json.RawMessage, self *string) int { // This is a standalone server, not initializing if len(configString) == 0 { - log.Println("Cluster: running as a standalone server.") + logs.Info.Println("Cluster: running as a standalone server.") return 1 } var config clusterConfig if err := json.Unmarshal(configString, &config); err != nil { - log.Fatal(err) + logs.Error.Fatal(err) } thisName := *self @@ -932,7 +932,7 @@ func clusterInit(configString json.RawMessage, self *string) int { // Name of the current node is not specified: clustering disabled. if thisName == "" { - log.Println("Cluster: running as a standalone server.") + logs.Info.Println("Cluster: running as a standalone server.") return 1 } @@ -943,7 +943,7 @@ func clusterInit(configString json.RawMessage, self *string) int { gob.Register(MsgAccessMode{}) if config.NumProxyEventGoRoutines != 0 { - log.Println("Cluster config: field num_proxy_event_goroutines is deprecated.") + logs.Warning.Println("Cluster config: field num_proxy_event_goroutines is deprecated.") } globals.cluster = &Cluster{ @@ -970,7 +970,7 @@ func clusterInit(configString json.RawMessage, self *string) int { if len(globals.cluster.nodes) == 0 { // Cluster needs at least two nodes. - log.Fatal("Cluster: invalid cluster size: 1") + logs.Error.Fatal("Cluster: invalid cluster size: 1") } if !globals.cluster.failoverInit(config.Failover) { @@ -988,7 +988,7 @@ func clusterInit(configString json.RawMessage, self *string) int { // Proxied session is being closed at the Master node func (sess *Session) closeRPC() { if sess.isMultiplex() { - log.Println("cluster: session proxy closed", sess.sid) + logs.Info.Println("cluster: session proxy closed", sess.sid) } } @@ -996,13 +996,13 @@ func (sess *Session) closeRPC() { func (c *Cluster) start() { addr, err := net.ResolveTCPAddr("tcp", c.listenOn) if err != nil { - log.Fatal(err) + logs.Error.Fatal(err) } c.inbound, err = net.ListenTCP("tcp", addr) if err != nil { - log.Fatal(err) + logs.Error.Fatal(err) } for _, n := range c.nodes { @@ -1017,12 +1017,12 @@ func (c *Cluster) start() { err = rpc.Register(c) if err != nil { - log.Fatal(err) + logs.Error.Fatal(err) } go rpc.Accept(c.inbound) - log.Printf("Cluster of %d nodes initialized, node '%s' is listening on [%s]", len(globals.cluster.nodes)+1, + logs.Info.Printf("Cluster of %d nodes initialized, node '%s' is listening on [%s]", len(globals.cluster.nodes)+1, globals.cluster.thisNodeName, c.listenOn) } @@ -1046,7 +1046,7 @@ func (c *Cluster) shutdown() { n.done <- true } - log.Println("Cluster shut down") + logs.Info.Println("Cluster shut down") } // Recalculate the ring hash using provided list of nodes or only nodes in a non-failed state. @@ -1144,7 +1144,7 @@ func (t *Topic) clusterSelectProxyEvent() (event ProxyEventType, s *Session, val } chosen, value, ok := reflect.Select(t.proxiedChannels) if !ok { - log.Printf("topic[%s]: clusterWriteLoop EOF - quitting", t.name) + logs.Warning.Printf("topic[%s]: clusterWriteLoop EOF - quitting", t.name) return EventAbort, nil, nil } if chosen == 0 { @@ -1152,14 +1152,14 @@ func (t *Topic) clusterSelectProxyEvent() (event ProxyEventType, s *Session, val return EventContinue, nil, nil } if len(t.proxiedSessions) == 0 { - log.Printf("topic[%s]: clusterWriteLoop - no more proxied sessions (num proxied channels: %d). Quitting.", + logs.Info.Printf("topic[%s]: clusterWriteLoop - no more proxied sessions (num proxied channels: %d). Quitting.", t.name, len(t.proxiedChannels)) return EventAbort, nil, nil } chosen-- sessionIdx := chosen / 3 if sessionIdx >= len(t.proxiedSessions) { - log.Printf("topic[%s]: clusterWriteLoop - invalid proxiedSessions index %d (num proxied sessions %d)", t.name, chosen, len(t.proxiedSessions)) + logs.Error.Printf("topic[%s]: clusterWriteLoop - invalid proxiedSessions index %d (num proxied sessions %d)", t.name, chosen, len(t.proxiedSessions)) return EventAbort, nil, nil } sess := t.proxiedSessions[sessionIdx] @@ -1188,7 +1188,7 @@ func (t *Topic) clusterWriteLoop() { } }() - log.Printf("topic[%s]: starting cluster write loop", t.name) + logs.Info.Printf("topic[%s]: starting cluster write loop", t.name) for { // t.m event, sess, value := t.clusterSelectProxyEvent() @@ -1214,11 +1214,11 @@ func (t *Topic) clusterWriteLoop() { if srvMsg.Data != nil || srvMsg.Pres != nil || srvMsg.Info != nil { response.OrigSid = "*" } else if srvMsg.Ctrl == nil { - log.Println("cluster: request type not set in clusterWriteLoop", sess.sid, + logs.Warning.Println("cluster: request type not set in clusterWriteLoop", sess.sid, srvMsg.describe(), "src_sid:", srvMsg.sess.sid) } default: - log.Panicln("cluster: unknown request type in clusterWriteLoop", srvMsg.sess.proxyReq) + logs.Error.Panicln("cluster: unknown request type in clusterWriteLoop", srvMsg.sess.proxyReq) } } @@ -1226,7 +1226,7 @@ func (t *Topic) clusterWriteLoop() { response.RcptTo = t.name if err := sess.clnode.masterToProxyAsync(response); err != nil { - log.Printf("cluster: response to proxy failed \"%s\": %s", sess.sid, err.Error()) + logs.Warning.Printf("cluster: response to proxy failed \"%s\": %s", sess.sid, err.Error()) return } case EventStop: // sess.stop diff --git a/server/cluster_leader.go b/server/cluster_leader.go index c26ae049..630bdac3 100644 --- a/server/cluster_leader.go +++ b/server/cluster_leader.go @@ -1,11 +1,12 @@ package main import ( - "log" "math/rand" "net/rpc" "sync" "time" + + "github.com/tinode/chat/server/logs" ) // Cluster methods related to leader node election. Based on ideas from Raft protocol. @@ -89,7 +90,7 @@ func (c *Cluster) failoverInit(config *clusterFailoverConfig) bool { return false } if len(c.nodes) < 2 { - log.Printf("cluster: failover disabled; need at least 3 nodes, got %d", len(c.nodes)+1) + logs.Error.Printf("cluster: failover disabled; need at least 3 nodes, got %d", len(c.nodes)+1) return false } @@ -116,7 +117,7 @@ func (c *Cluster) failoverInit(config *clusterFailoverConfig) bool { electionVote: make(chan *ClusterVote, len(c.nodes)), done: make(chan bool, 1)} - log.Println("cluster: failover mode enabled") + logs.Info.Println("cluster: failover mode enabled") return true } @@ -185,7 +186,7 @@ func (c *Cluster) sendHealthChecks() { c.invalidateProxySubs("") c.gcProxySessions(activeNodes) - log.Println("cluster: initiating failover rehash for nodes", activeNodes) + logs.Info.Println("cluster: initiating failover rehash for nodes", activeNodes) globals.hub.rehash <- true } } @@ -198,7 +199,7 @@ func (c *Cluster) electLeader() { // Make sure the current node does not report itself as a leader. statsSet("ClusterLeader", 0) - log.Println("cluster: leading new election for term", c.fo.term) + logs.Info.Println("cluster: leading new election for term", c.fo.term) nodeCount := len(c.nodes) // Number of votes needed to elect the leader @@ -245,7 +246,7 @@ func (c *Cluster) electLeader() { // Current node elected as the leader. c.fo.leader = c.thisNodeName statsSet("ClusterLeader", 1) - log.Printf("'%s' elected self as a new leader", c.thisNodeName) + logs.Info.Printf("'%s' elected self as a new leader", c.thisNodeName) } } @@ -281,21 +282,21 @@ func (c *Cluster) run() { if health.Term < c.fo.term { // This is a health check from a stale leader. Ignore. - log.Println("cluster: health check from a stale leader", health.Term, c.fo.term, health.Leader, c.fo.leader) + logs.Warning.Println("cluster: health check from a stale leader", health.Term, c.fo.term, health.Leader, c.fo.leader) continue } if health.Term > c.fo.term { c.fo.term = health.Term c.fo.leader = health.Leader - log.Printf("cluster: leader '%s' elected", c.fo.leader) + logs.Info.Printf("cluster: leader '%s' elected", c.fo.leader) } else if health.Leader != c.fo.leader { if c.fo.leader != "" { // Wrong leader. It's a bug, should never happen! - log.Printf("cluster: wrong leader '%s' while expecting '%s'; term %d", + logs.Error.Printf("cluster: wrong leader '%s' while expecting '%s'; term %d", health.Leader, c.fo.leader, health.Term) } else { - log.Printf("cluster: leader set to '%s'", health.Leader) + logs.Info.Printf("cluster: leader set to '%s'", health.Leader) } c.fo.leader = health.Leader } @@ -306,7 +307,7 @@ func (c *Cluster) run() { missed = 0 if health.Signature != c.ring.Signature() { if rehashSkipped { - log.Println("cluster: rehashing at a request of", + logs.Info.Println("cluster: rehashing at a request of", health.Leader, health.Nodes, health.Signature, c.ring.Signature()) c.rehash(health.Nodes) c.invalidateProxySubs("") @@ -323,7 +324,7 @@ func (c *Cluster) run() { if c.fo.term < vreq.req.Term { // This is a new election. This node has not voted yet. Vote for the requestor and // clear the current leader. - log.Printf("Voting YES for %s, my term %d, vote term %d", vreq.req.Node, c.fo.term, vreq.req.Term) + logs.Info.Printf("Voting YES for %s, my term %d, vote term %d", vreq.req.Node, c.fo.term, vreq.req.Term) c.fo.term = vreq.req.Term c.fo.leader = "" // Election means these is no leader yet. @@ -331,7 +332,7 @@ func (c *Cluster) run() { vreq.resp <- ClusterVoteResponse{Result: true, Term: c.fo.term} } else { // This node has voted already or stale election, reject. - log.Printf("Voting NO for %s, my term %d, vote term %d", vreq.req.Node, c.fo.term, vreq.req.Term) + logs.Info.Printf("Voting NO for %s, my term %d, vote term %d", vreq.req.Node, c.fo.term, vreq.req.Term) vreq.resp <- ClusterVoteResponse{Result: false, Term: c.fo.term} } case <-c.fo.done: diff --git a/server/db/mongodb/adapter.go b/server/db/mongodb/adapter.go index a238b9e4..d15a67a2 100644 --- a/server/db/mongodb/adapter.go +++ b/server/db/mongodb/adapter.go @@ -8,12 +8,12 @@ import ( "crypto/tls" "encoding/json" "errors" - "log" "strconv" "strings" "time" "github.com/tinode/chat/server/auth" + "github.com/tinode/chat/server/logs" "github.com/tinode/chat/server/store" t "github.com/tinode/chat/server/store/types" b "go.mongodb.org/mongo-driver/bson" @@ -114,7 +114,7 @@ func (a *adapter) Open(jsonconfig json.RawMessage) error { } if config.ReplicaSet == "" { - log.Println("MongoDB configured as standalone or replica_set option not set. Transaction support is disabled.") + logs.Info.Println("MongoDB configured as standalone or replica_set option not set. Transaction support is disabled.") } else { opts.SetReplicaSet(config.ReplicaSet) a.useTransactions = true @@ -253,7 +253,7 @@ func (a *adapter) SetMaxResults(val int) error { // CreateDb creates the database optionally dropping an existing database first. func (a *adapter) CreateDb(reset bool) error { if reset { - log.Print("Dropping database...") + logs.Info.Print("Dropping database...") if err := a.db.Drop(a.ctx); err != nil { return err } diff --git a/server/db/mongodb/tests/mongo_test.go b/server/db/mongodb/tests/mongo_test.go index 4b561209..c22a76ec 100644 --- a/server/db/mongodb/tests/mongo_test.go +++ b/server/db/mongodb/tests/mongo_test.go @@ -27,9 +27,8 @@ import ( mdb "go.mongodb.org/mongo-driver/mongo" mdbopts "go.mongodb.org/mongo-driver/mongo/options" - //backend "github.com/tinode/chat/server/db/rethinkdb" - //backend "github.com/tinode/chat/server/db/mysql" backend "github.com/tinode/chat/server/db/mongodb" + "github.com/tinode/chat/server/logs" "github.com/tinode/chat/server/store/types" ) @@ -1223,6 +1222,7 @@ func initConnectionToDb() { } func init() { + logs.Init() adp = backend.GetAdapter() conffile := flag.String("config", "./test.conf", "config of the database connection") diff --git a/server/hdl_files.go b/server/hdl_files.go index 5e75683a..40b2b78c 100644 --- a/server/hdl_files.go +++ b/server/hdl_files.go @@ -13,11 +13,11 @@ import ( "encoding/json" "errors" "io" - "log" "net/http" "strings" "time" + "github.com/tinode/chat/server/logs" "github.com/tinode/chat/server/store" "github.com/tinode/chat/server/store/types" ) @@ -34,7 +34,7 @@ func largeFileServe(wrt http.ResponseWriter, req *http.Request) { wrt.WriteHeader(msg.Ctrl.Code) enc.Encode(msg) if err != nil { - log.Println("media serve", err) + logs.Warning.Println("media serve", err) } } @@ -67,7 +67,7 @@ func largeFileServe(wrt http.ResponseWriter, req *http.Request) { wrt.Header().Set("Cache-Control", "no-cache, no-store, must-revalidate") wrt.WriteHeader(http.StatusTemporaryRedirect) enc.Encode(InfoFound("", "", now)) - log.Println("media serve redirected", redirTo) + logs.Info.Println("media serve redirected", redirTo) return } else if err != nil { writeHttpResponse(decodeStoreError(err, "", "", now, nil), err) @@ -86,12 +86,12 @@ func largeFileServe(wrt http.ResponseWriter, req *http.Request) { wrt.Header().Set("Content-Disposition", "attachment") http.ServeContent(wrt, req, "", fd.UpdatedAt, rsc) - log.Println("media served OK") + logs.Info.Println("media served OK") } // largeFileReceive receives files from client over HTTP(S) and passes them to the configured media handler. func largeFileReceive(wrt http.ResponseWriter, req *http.Request) { - log.Println("Upload request", req.RequestURI) + logs.Info.Println("Upload request", req.RequestURI) now := types.TimeNow() enc := json.NewEncoder(wrt) @@ -104,7 +104,7 @@ func largeFileReceive(wrt http.ResponseWriter, req *http.Request) { wrt.WriteHeader(msg.Ctrl.Code) enc.Encode(msg) - log.Println("media upload:", msg.Ctrl.Code, msg.Ctrl.Text, "/", err) + logs.Info.Println("media upload:", msg.Ctrl.Code, msg.Ctrl.Text, "/", err) } // Check if this is a POST or a PUT request. @@ -148,7 +148,7 @@ func largeFileReceive(wrt http.ResponseWriter, req *http.Request) { wrt.WriteHeader(http.StatusTemporaryRedirect) enc.Encode(InfoFound("", "", now)) - log.Println("media upload redirected", redirTo) + logs.Info.Println("media upload redirected", redirTo) return } else if err != nil { writeHttpResponse(decodeStoreError(err, "", "", now, nil), err) @@ -199,7 +199,7 @@ func largeFileRunGarbageCollection(period time.Duration, block int) chan<- bool select { case <-gcTimer: if err := store.Files.DeleteUnused(time.Now().Add(-time.Hour), block); err != nil { - log.Println("media gc:", err) + logs.Warning.Println("media gc:", err) } case <-stop: return diff --git a/server/hdl_grpc.go b/server/hdl_grpc.go index 65e16cbe..ec459eaa 100644 --- a/server/hdl_grpc.go +++ b/server/hdl_grpc.go @@ -12,10 +12,10 @@ package main import ( "crypto/tls" "io" - "log" "time" "github.com/tinode/chat/pbx" + "github.com/tinode/chat/server/logs" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/keepalive" @@ -39,7 +39,7 @@ func (*grpcNodeServer) MessageLoop(stream pbx.Node_MessageLoopServer) error { if p, ok := peer.FromContext(stream.Context()); ok { sess.remoteAddr = p.Addr.String() } - log.Println("grpc: session started", sess.sid, sess.remoteAddr, count) + logs.Info.Println("grpc: session started", sess.sid, sess.remoteAddr, count) defer func() { sess.closeGrpc() @@ -54,10 +54,10 @@ func (*grpcNodeServer) MessageLoop(stream pbx.Node_MessageLoopServer) error { return nil } if err != nil { - log.Println("grpc: recv", sess.sid, err) + logs.Error.Println("grpc: recv", sess.sid, err) return err } - log.Println("grpc in:", truncateStringIfTooLong(in.String()), sess.sid) + logs.Info.Println("grpc in:", truncateStringIfTooLong(in.String()), sess.sid) statsInc("IncomingMessagesGrpcTotal", 1) sess.dispatch(pbCliDeserialize(in)) @@ -86,12 +86,12 @@ func (sess *Session) writeGrpcLoop() { return } if len(sess.send) > sendQueueLimit { - log.Println("grpc: outbound queue limit exceeded", sess.sid) + logs.Error.Println("grpc: outbound queue limit exceeded", sess.sid) return } statsInc("OutgoingMessagesGrpcTotal", 1) if err := grpcWrite(sess, msg); err != nil { - log.Println("grpc: write", sess.sid, err) + logs.Error.Println("grpc: write", sess.sid, err) return } @@ -157,11 +157,11 @@ func serveGrpc(addr string, kaEnabled bool, tlsConf *tls.Config) (*grpc.Server, srv := grpc.NewServer(opts...) pbx.RegisterNodeServer(srv, &grpcNodeServer{}) - log.Printf("gRPC/%s%s server is registered at [%s]", grpc.Version, secure, addr) + logs.Info.Printf("gRPC/%s%s server is registered at [%s]", grpc.Version, secure, addr) go func() { if err := srv.Serve(lis); err != nil { - log.Println("gRPC server failed:", err) + logs.Error.Println("gRPC server failed:", err) } }() diff --git a/server/hdl_longpoll.go b/server/hdl_longpoll.go index f3f89358..81b434d5 100644 --- a/server/hdl_longpoll.go +++ b/server/hdl_longpoll.go @@ -13,9 +13,10 @@ import ( "encoding/json" "errors" "io/ioutil" - "log" "net/http" "time" + + "github.com/tinode/chat/server/logs" ) func (sess *Session) writeOnce(wrt http.ResponseWriter, req *http.Request) { @@ -25,11 +26,11 @@ func (sess *Session) writeOnce(wrt http.ResponseWriter, req *http.Request) { case msg, ok := <-sess.send: if ok { if len(sess.send) > sendQueueLimit { - log.Println("longPoll: outbound queue limit exceeded", sess.sid) + logs.Error.Println("longPoll: outbound queue limit exceeded", sess.sid) } else { statsInc("OutgoingMessagesLongpollTotal", 1) if err := lpWrite(wrt, msg); err != nil { - log.Println("longPoll: writeOnce failed", sess.sid, err) + logs.Error.Println("longPoll: writeOnce failed", sess.sid, err) } } } @@ -58,7 +59,7 @@ func (sess *Session) writeOnce(wrt http.ResponseWriter, req *http.Request) { case <-time.After(pingPeriod): // just write an empty packet on timeout if _, err := wrt.Write([]byte{}); err != nil { - log.Println("longPoll: writeOnce: timout", sess.sid, err) + logs.Error.Println("longPoll: writeOnce: timout", sess.sid, err) } return @@ -141,7 +142,7 @@ func serveLongPoll(wrt http.ResponseWriter, req *http.Request) { var count int sess, count = globals.sessionStore.NewSession(wrt, "") sess.remoteAddr = getRemoteAddr(req) - log.Println("longPoll: session started", sess.sid, sess.remoteAddr, count) + logs.Info.Println("longPoll: session started", sess.sid, sess.remoteAddr, count) wrt.WriteHeader(http.StatusCreated) pkt := NoErrCreated(req.FormValue("id"), "", now) @@ -156,7 +157,7 @@ func serveLongPoll(wrt http.ResponseWriter, req *http.Request) { // Existing session sess = globals.sessionStore.Get(sid) if sess == nil { - log.Println("longPoll: invalid or expired session id", sid) + logs.Warning.Println("longPoll: invalid or expired session id", sid) wrt.WriteHeader(http.StatusForbidden) enc.Encode(ErrSessionNotFound(now)) return @@ -165,13 +166,13 @@ func serveLongPoll(wrt http.ResponseWriter, req *http.Request) { addr := getRemoteAddr(req) if sess.remoteAddr != addr { sess.remoteAddr = addr - log.Println("longPoll: remote address changed", sid, addr) + logs.Warning.Println("longPoll: remote address changed", sid, addr) } if req.ContentLength != 0 { // Read payload and send it for processing. if code, err := sess.readOnce(wrt, req); err != nil { - log.Println("longPoll: readOnce failed", sess.sid, err) + logs.Warning.Println("longPoll: readOnce failed", sess.sid, err) // Failed to read request, report an error, if possible if code != 0 { wrt.WriteHeader(code) diff --git a/server/hdl_websock.go b/server/hdl_websock.go index 8ac89c18..992665b2 100644 --- a/server/hdl_websock.go +++ b/server/hdl_websock.go @@ -11,11 +11,11 @@ package main import ( "encoding/json" - "log" "net/http" "time" "github.com/gorilla/websocket" + "github.com/tinode/chat/server/logs" ) const ( @@ -54,7 +54,7 @@ func (sess *Session) readLoop() { if err != nil { if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure, websocket.CloseNormalClosure) { - log.Println("ws: readLoop", sess.sid, err) + logs.Error.Println("ws: readLoop", sess.sid, err) } return } @@ -80,14 +80,14 @@ func (sess *Session) writeLoop() { return } if len(sess.send) > sendQueueLimit { - log.Println("ws: outbound queue limit exceeded", sess.sid) + logs.Error.Println("ws: outbound queue limit exceeded", sess.sid) return } statsInc("OutgoingMessagesWebsockTotal", 1) if err := wsWrite(sess.ws, websocket.TextMessage, msg); err != nil { if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure, websocket.CloseNormalClosure) { - log.Println("ws: writeLoop", sess.sid, err) + logs.Error.Println("ws: writeLoop", sess.sid, err) } return } @@ -112,7 +112,7 @@ func (sess *Session) writeLoop() { if err := wsWrite(sess.ws, websocket.PingMessage, nil); err != nil { if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure, websocket.CloseNormalClosure) { - log.Println("ws: writeLoop ping", sess.sid, err) + logs.Error.Println("ws: writeLoop ping", sess.sid, err) } return } @@ -146,23 +146,23 @@ func serveWebSocket(wrt http.ResponseWriter, req *http.Request) { if isValid, _ := checkAPIKey(getAPIKey(req)); !isValid { wrt.WriteHeader(http.StatusForbidden) json.NewEncoder(wrt).Encode(ErrAPIKeyRequired(now)) - log.Println("ws: Missing, invalid or expired API key") + logs.Error.Println("ws: Missing, invalid or expired API key") return } if req.Method != http.MethodGet { wrt.WriteHeader(http.StatusMethodNotAllowed) json.NewEncoder(wrt).Encode(ErrOperationNotAllowed("", "", now)) - log.Println("ws: Invalid HTTP method", req.Method) + logs.Error.Println("ws: Invalid HTTP method", req.Method) return } ws, err := upgrader.Upgrade(wrt, req, nil) if _, ok := err.(websocket.HandshakeError); ok { - log.Println("ws: Not a websocket handshake") + logs.Error.Println("ws: Not a websocket handshake") return } else if err != nil { - log.Println("ws: failed to Upgrade ", err) + logs.Error.Println("ws: failed to Upgrade ", err) return } @@ -174,7 +174,7 @@ func serveWebSocket(wrt http.ResponseWriter, req *http.Request) { sess.remoteAddr = req.RemoteAddr } - log.Println("ws: session started", sess.sid, sess.remoteAddr, count) + logs.Info.Println("ws: session started", sess.sid, sess.remoteAddr, count) // Do work in goroutines to return from serveWebSocket() to release file pointers. // Otherwise "too many open files" will happen. diff --git a/server/http.go b/server/http.go index 90d8290c..98c93afe 100644 --- a/server/http.go +++ b/server/http.go @@ -14,7 +14,6 @@ import ( "encoding/base64" "encoding/json" "errors" - "log" "net" "net/http" "net/url" @@ -25,6 +24,7 @@ import ( "syscall" "time" + "github.com/tinode/chat/server/logs" "github.com/tinode/chat/server/store" "github.com/tinode/chat/server/store/types" ) @@ -54,7 +54,7 @@ func listenAndServe(addr string, mux *http.ServeMux, tlfConf *tls.Config, stop < if isUnixAddr(globals.tlsRedirectHTTP) || isUnixAddr(addr) { err = errors.New("HTTP to HTTPS redirect: unix sockets not supported.") } else { - log.Printf("Redirecting connections from HTTP at [%s] to HTTPS at [%s]", + logs.Info.Printf("Redirecting connections from HTTP at [%s] to HTTPS at [%s]", globals.tlsRedirectHTTP, addr) // This is a second HTTP server listenning on a different port. @@ -63,7 +63,7 @@ func listenAndServe(addr string, mux *http.ServeMux, tlfConf *tls.Config, stop < } if err == nil { - log.Printf("Listening for client HTTPS connections on [%s]", addr) + logs.Info.Printf("Listening for client HTTPS connections on [%s]", addr) var lis net.Listener lis, err = netListener(addr) if err == nil { @@ -71,7 +71,7 @@ func listenAndServe(addr string, mux *http.ServeMux, tlfConf *tls.Config, stop < } } } else { - log.Printf("Listening for client HTTP connections on [%s]", addr) + logs.Info.Printf("Listening for client HTTP connections on [%s]", addr) var lis net.Listener lis, err = netListener(addr) if err == nil { @@ -81,9 +81,9 @@ func listenAndServe(addr string, mux *http.ServeMux, tlfConf *tls.Config, stop < if err != nil { if globals.shuttingDown { - log.Println("HTTP server: stopped") + logs.Info.Println("HTTP server: stopped") } else { - log.Println("HTTP server: failed", err) + logs.Error.Println("HTTP server: failed", err) } } httpdone <- true @@ -100,7 +100,7 @@ Loop: ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) if err := server.Shutdown(ctx); err != nil { // failure/timeout shutting down the server gracefully - log.Println("HTTP server failed to terminate gracefully", err) + logs.Error.Println("HTTP server failed to terminate gracefully", err) } // While the server shuts down, termianate all sessions. @@ -153,7 +153,7 @@ func signalHandler() <-chan bool { go func() { // Wait for a signal. Don't care which signal it is sig := <-signchan - log.Printf("Signal received: '%s', shutting down", sig) + logs.Info.Printf("Signal received: '%s', shutting down", sig) stop <- true }() @@ -349,7 +349,7 @@ func authHttpRequest(req *http.Request) (types.Uid, []byte, error) { } uid = rec.Uid } else { - log.Println("fileUpload: auth data is present but handler is not found", authMethod) + logs.Info.Println("fileUpload: auth data is present but handler is not found", authMethod) } } else { // Find the session, make sure it's appropriately authenticated. diff --git a/server/http_pprof.go b/server/http_pprof.go index 8cdcf217..d12a4c80 100644 --- a/server/http_pprof.go +++ b/server/http_pprof.go @@ -6,11 +6,12 @@ package main import ( "fmt" - "log" "net/http" "path" "runtime/pprof" "strings" + + "github.com/tinode/chat/server/logs" ) var pprofHttpRoot string @@ -24,7 +25,7 @@ func servePprof(mux *http.ServeMux, serveAt string) { pprofHttpRoot = path.Clean("/"+serveAt) + "/" mux.HandleFunc(pprofHttpRoot, profileHandler) - log.Printf("pprof: profiling info exposed at '%s'", pprofHttpRoot) + logs.Info.Printf("pprof: profiling info exposed at '%s'", pprofHttpRoot) } func profileHandler(wrt http.ResponseWriter, req *http.Request) { diff --git a/server/hub.go b/server/hub.go index 9d25d0f0..fb723692 100644 --- a/server/hub.go +++ b/server/hub.go @@ -10,12 +10,13 @@ package main import ( - "log" + //"log" "strings" "sync" "time" "github.com/tinode/chat/server/auth" + "github.com/tinode/chat/server/logs" "github.com/tinode/chat/server/store" "github.com/tinode/chat/server/store/types" ) @@ -215,7 +216,7 @@ func (h *Hub) run() { join.sess.inflightReqs.Done() } join.sess.queueOut(ErrServiceUnavailableReply(join.pkt, join.pkt.Timestamp)) - log.Println("hub.join loop: topic's reg queue full", join.pkt.RcptTo, join.sess.sid, " - total queue len:", len(t.reg)) + logs.Error.Println("hub.join loop: topic's reg queue full", join.pkt.RcptTo, join.sess.sid, " - total queue len:", len(t.reg)) } } @@ -229,16 +230,16 @@ func (h *Hub) run() { select { case dst.broadcast <- msg: default: - log.Println("hub: topic's broadcast queue is full", dst.name) + logs.Error.Println("hub: topic's broadcast queue is full", dst.name) } } else { - log.Println("hub: invalid topic category for broadcast", dst.name) + logs.Warning.Println("hub: invalid topic category for broadcast", dst.name) } } else if (strings.HasPrefix(msg.RcptTo, "usr") || strings.HasPrefix(msg.RcptTo, "grp")) && globals.cluster.isRemoteTopic(msg.RcptTo) { // It is a remote topic. if err := globals.cluster.routeToTopicIntraCluster(msg.RcptTo, msg, msg.sess); err != nil { - log.Printf("hub: routing to '%s' failed", msg.RcptTo) + logs.Warning.Printf("hub: routing to '%s' failed", msg.RcptTo) } } else if msg.Pres == nil && msg.Info == nil { // Topic is unknown or offline. @@ -246,7 +247,7 @@ func (h *Hub) run() { // TODO(gene): validate topic name, discarding invalid topics - log.Printf("Hub. Topic[%s] is unknown or offline", msg.RcptTo) + logs.Info.Printf("Hub. Topic[%s] is unknown or offline", msg.RcptTo) msg.sess.queueOut(NoErrAcceptedExplicitTs(msg.Id, msg.RcptTo, types.TimeNow(), msg.Timestamp)) } @@ -276,7 +277,7 @@ func (h *Hub) run() { if unreg.forUser.IsZero() { // The topic is being garbage collected or deleted. if err := h.topicUnreg(unreg.sess, unreg.rcptTo, unreg.pkt, reason); err != nil { - log.Println("hub.topicUnreg failed:", err) + logs.Error.Println("hub.topicUnreg failed:", err) } } else { go h.stopTopicsForUser(unreg.forUser, reason, unreg.done) @@ -321,7 +322,7 @@ func (h *Hub) run() { <-topicsdone } - log.Printf("Hub shutdown completed with %d topics", topicCount) + logs.Info.Printf("Hub shutdown completed with %d topics", topicCount) // let the main goroutine know we are done with the cleanup hubdone <- true @@ -567,7 +568,7 @@ func replyOfflineTopicGetDesc(sess *Session, msg *ClientComMessage) { if strings.HasPrefix(topic, "grp") { stopic, err := store.Topics.Get(topic) if err != nil { - log.Println("replyOfflineTopicGetDesc", err) + logs.Info.Println("replyOfflineTopicGetDesc", err) sess.queueOut(decodeStoreErrorExplicitTs(err, msg.Id, msg.Original, now, msg.Timestamp, nil)) return } @@ -603,7 +604,7 @@ func replyOfflineTopicGetDesc(sess *Session, msg *ClientComMessage) { } if uid.IsZero() { - log.Println("replyOfflineTopicGetDesc: malformed p2p topic name") + logs.Warning.Println("replyOfflineTopicGetDesc: malformed p2p topic name") sess.queueOut(ErrMalformedReply(msg, now)) return } @@ -627,7 +628,7 @@ func replyOfflineTopicGetDesc(sess *Session, msg *ClientComMessage) { sub, err := store.Subs.Get(topic, asUid) if err != nil { - log.Println("replyOfflineTopicGetDesc:", err) + logs.Warning.Println("replyOfflineTopicGetDesc:", err) sess.queueOut(decodeStoreErrorExplicitTs(err, msg.Id, msg.Original, now, msg.Timestamp, nil)) return } @@ -658,7 +659,7 @@ func replyOfflineTopicGetSub(sess *Session, msg *ClientComMessage) { ssub, err := store.Subs.Get(msg.RcptTo, types.ParseUserId(msg.AsUser)) if err != nil { - log.Println("replyOfflineTopicGetSub:", err) + logs.Warning.Println("replyOfflineTopicGetSub:", err) sess.queueOut(decodeStoreErrorExplicitTs(err, msg.Id, msg.Original, now, msg.Timestamp, nil)) return } @@ -712,7 +713,7 @@ func replyOfflineTopicSetSub(sess *Session, msg *ClientComMessage) { sub, err := store.Subs.Get(msg.RcptTo, asUid) if err != nil { - log.Println("replyOfflineTopicSetSub get sub:", err) + logs.Warning.Println("replyOfflineTopicSetSub get sub:", err) sess.queueOut(decodeStoreErrorExplicitTs(err, msg.Id, msg.Original, now, msg.Timestamp, nil)) return } @@ -735,7 +736,7 @@ func replyOfflineTopicSetSub(sess *Session, msg *ClientComMessage) { if msg.Set.Sub != nil && msg.Set.Sub.Mode != "" { var modeWant types.AccessMode if err = modeWant.UnmarshalText([]byte(msg.Set.Sub.Mode)); err != nil { - log.Println("replyOfflineTopicSetSub mode:", err) + logs.Warning.Println("replyOfflineTopicSetSub mode:", err) sess.queueOut(decodeStoreErrorExplicitTs(err, msg.Id, msg.Original, now, msg.Timestamp, nil)) return } @@ -762,7 +763,7 @@ func replyOfflineTopicSetSub(sess *Session, msg *ClientComMessage) { if len(update) > 0 { err = store.Subs.Update(msg.RcptTo, asUid, update, true) if err != nil { - log.Println("replyOfflineTopicSetSub update:", err) + logs.Warning.Println("replyOfflineTopicSetSub update:", err) sess.queueOut(decodeStoreErrorExplicitTs(err, msg.Id, msg.Original, now, msg.Timestamp, nil)) } else { var params interface{} diff --git a/server/init_topic.go b/server/init_topic.go index f19c6357..8c48eb0c 100644 --- a/server/init_topic.go +++ b/server/init_topic.go @@ -9,10 +9,10 @@ package main import ( - "log" "strings" "github.com/tinode/chat/server/auth" + "github.com/tinode/chat/server/logs" "github.com/tinode/chat/server/store" "github.com/tinode/chat/server/store/types" ) @@ -62,7 +62,7 @@ func topicInit(t *Topic, join *sessionJoin, h *Hub) { // Remove topic from cache to prevent hub from forwarding more messages to it. h.topicDel(join.pkt.RcptTo) - log.Println("init_topic: failed to load or create topic:", join.pkt.RcptTo, err) + logs.Error.Println("init_topic: failed to load or create topic:", join.pkt.RcptTo, err) join.sess.queueOut(decodeStoreErrorExplicitTs(err, join.pkt.Id, t.xoriginal, timestamp, join.pkt.Timestamp, nil)) // Re-queue pending requests to join the topic. @@ -247,7 +247,7 @@ func initTopicP2P(t *Topic, sreg *sessionJoin) error { // Case 3, fail if len(subs) == 0 { - log.Println("hub: missing both subscriptions for '" + t.name + "' (SHOULD NEVER HAPPEN!)") + logs.Error.Println("hub: missing both subscriptions for '" + t.name + "' (SHOULD NEVER HAPPEN!)") return types.ErrInternal } @@ -346,7 +346,7 @@ func initTopicP2P(t *Topic, sreg *sessionJoin) error { // The other user is assumed to have auth level "Auth". sub2.ModeGiven = users[u1].Access.Auth if err := sub2.ModeGiven.UnmarshalText([]byte(pktsub.Set.Desc.DefaultAcs.Auth)); err != nil { - log.Println("hub: invalid access mode", t.xoriginal, pktsub.Set.Desc.DefaultAcs.Auth) + logs.Error.Println("hub: invalid access mode", t.xoriginal, pktsub.Set.Desc.DefaultAcs.Auth) } } else { // Use user1.Auth as modeGiven for the other user @@ -385,11 +385,11 @@ func initTopicP2P(t *Topic, sreg *sessionJoin) error { if uid != userID1 { // Report the error and ignore the value - log.Println("hub: setting mode for another user is not supported '" + t.name + "'") + logs.Error.Println("hub: setting mode for another user is not supported '" + t.name + "'") } else { // user1 is setting non-default modeWant if err := userData.modeWant.UnmarshalText([]byte(pktsub.Set.Sub.Mode)); err != nil { - log.Println("hub: invalid access mode", t.xoriginal, pktsub.Set.Sub.Mode) + logs.Error.Println("hub: invalid access mode", t.xoriginal, pktsub.Set.Sub.Mode) } // Ensure sanity userData.modeWant = userData.modeWant&types.ModeCP2P | types.ModeApprove @@ -531,9 +531,9 @@ func initTopicNewGrp(t *Topic, sreg *sessionJoin, isChan bool) error { } else { t.accessAnon = anonMode } - log.Println("hub: invalid access mode for topic '" + t.name + "': '" + err.Error() + "'") + logs.Error.Println("hub: invalid access mode for topic '" + t.name + "': '" + err.Error() + "'") } else if authMode.IsOwner() || anonMode.IsOwner() { - log.Println("hub: OWNER default access in topic '" + t.name) + logs.Error.Println("hub: OWNER default access in topic '" + t.name) t.accessAuth, t.accessAnon = authMode & ^types.ModeOwner, anonMode & ^types.ModeOwner } else { t.accessAuth, t.accessAnon = authMode, anonMode @@ -545,7 +545,7 @@ func initTopicNewGrp(t *Topic, sreg *sessionJoin, isChan bool) error { if pktsub.Set.Sub != nil && pktsub.Set.Sub.Mode != "" { userData.modeWant = types.ModeCFull if err := userData.modeWant.UnmarshalText([]byte(pktsub.Set.Sub.Mode)); err != nil { - log.Println("hub: invalid access mode", t.xoriginal, pktsub.Set.Sub.Mode) + logs.Error.Println("hub: invalid access mode", t.xoriginal, pktsub.Set.Sub.Mode) } // User must not unset ModeJoin or the owner flags userData.modeWant |= types.ModeJoin | types.ModeOwner diff --git a/server/logs/logs.go b/server/logs/logs.go new file mode 100644 index 00000000..b2ce0950 --- /dev/null +++ b/server/logs/logs.go @@ -0,0 +1,24 @@ +/****************************************************************************** + * + * Description : + * Package exposes info, warning and error loggers. + * + *****************************************************************************/ +package logs + +import ( + "log" + "os" +) + +var ( + Info *log.Logger + Warning *log.Logger + Error *log.Logger +) + +func Init() { + Info = log.New(os.Stdout, "I", log.LstdFlags | log.Lshortfile) + Warning = log.New(os.Stdout, "W", log.LstdFlags | log.Lshortfile) + Error = log.New(os.Stdout, "E", log.LstdFlags | log.Lshortfile) +} diff --git a/server/main.go b/server/main.go index 00ee918d..2e7f8352 100644 --- a/server/main.go +++ b/server/main.go @@ -13,7 +13,6 @@ package main import ( "encoding/json" "flag" - "log" "net/http" "os" "path/filepath" @@ -39,6 +38,8 @@ import ( _ "github.com/tinode/chat/server/db/mysql" _ "github.com/tinode/chat/server/db/rethinkdb" + "github.com/tinode/chat/server/logs" + // Push notifications "github.com/tinode/chat/server/push" _ "github.com/tinode/chat/server/push/fcm" @@ -255,14 +256,13 @@ type configType struct { func main() { executable, _ := os.Executable() - // Prepend log lines with file name and line number. - log.SetFlags(log.LstdFlags | log.Lshortfile) + logs.Init() // All relative paths are resolved against the executable path, not against current working directory. // Absolute paths are left unchanged. rootpath, _ := filepath.Split(executable) - log.Printf("Server v%s:%s:%s; pid %d; %d process(es)", + logs.Info.Printf("Server v%s:%s:%s; pid %d; %d process(es)", currentVersion, executable, buildstamp, os.Getpid(), runtime.GOMAXPROCS(runtime.NumCPU())) @@ -280,25 +280,25 @@ func main() { flag.Parse() *configfile = toAbsolutePath(rootpath, *configfile) - log.Printf("Using config from '%s'", *configfile) + logs.Info.Printf("Using config from '%s'", *configfile) var config configType if file, err := os.Open(*configfile); err != nil { - log.Fatal("Failed to read config file: ", err) + logs.Error.Fatal("Failed to read config file: ", err) } else { jr := jcr.New(file) if err = json.NewDecoder(jr).Decode(&config); err != nil { switch jerr := err.(type) { case *json.UnmarshalTypeError: lnum, cnum, _ := jr.LineAndChar(jerr.Offset) - log.Fatalf("Unmarshall error in config file in %s at %d:%d (offset %d bytes): %s", + logs.Error.Fatalf("Unmarshall error in config file in %s at %d:%d (offset %d bytes): %s", jerr.Field, lnum, cnum, jerr.Offset, jerr.Error()) case *json.SyntaxError: lnum, cnum, _ := jr.LineAndChar(jerr.Offset) - log.Fatalf("Syntax error in config file at %d:%d (offset %d bytes): %s", + logs.Error.Fatalf("Syntax error in config file at %d:%d (offset %d bytes): %s", lnum, cnum, jerr.Offset, jerr.Error()) default: - log.Fatal("Failed to parse config file: ", err) + logs.Error.Fatal("Failed to parse config file: ", err) } } file.Close() @@ -336,13 +336,13 @@ func main() { cpuf, err := os.Create(*pprofFile + ".cpu") if err != nil { - log.Fatal("Failed to create CPU pprof file: ", err) + logs.Error.Fatal("Failed to create CPU pprof file: ", err) } defer cpuf.Close() memf, err := os.Create(*pprofFile + ".mem") if err != nil { - log.Fatal("Failed to create Mem pprof file: ", err) + logs.Error.Fatal("Failed to create Mem pprof file: ", err) } defer memf.Close() @@ -350,18 +350,18 @@ func main() { defer pprof.StopCPUProfile() defer pprof.WriteHeapProfile(memf) - log.Printf("Profiling info saved to '%s.(cpu|mem)'", *pprofFile) + logs.Info.Printf("Profiling info saved to '%s.(cpu|mem)'", *pprofFile) } err := store.Open(workerId, config.Store) if err != nil { - log.Fatal("Failed to connect to DB: ", err) + logs.Error.Fatal("Failed to connect to DB: ", err) } - log.Println("DB adapter", store.GetAdapterName()) + logs.Info.Println("DB adapter", store.GetAdapterName()) defer func() { store.Close() - log.Println("Closed database connection(s)") - log.Println("All done, good bye") + logs.Info.Println("Closed database connection(s)") + logs.Info.Println("All done, good bye") }() // API key signing secret @@ -369,7 +369,7 @@ func main() { err = store.InitAuthLogicalNames(config.Auth["logical_names"]) if err != nil { - log.Fatal(err) + logs.Error.Fatal(err) } // List of tag namespaces for user discovery which cannot be changed directly @@ -379,18 +379,18 @@ func main() { authNames := store.GetAuthNames() for _, name := range authNames { if authhdl := store.GetLogicalAuthHandler(name); authhdl == nil { - log.Fatalln("Unknown authenticator", name) + logs.Error.Fatalln("Unknown authenticator", name) } else if jsconf := config.Auth[name]; jsconf != nil { if err := authhdl.Init(jsconf, name); err != nil { - log.Fatalln("Failed to init auth scheme", name+":", err) + logs.Error.Fatalln("Failed to init auth scheme", name+":", err) } tags, err := authhdl.RestrictedTags() if err != nil { - log.Fatalln("Failed get restricted tag namespaces (prefixes)", name+":", err) + logs.Error.Fatalln("Failed get restricted tag namespaces (prefixes)", name+":", err) } for _, tag := range tags { if strings.Contains(tag, ":") { - log.Fatalln("tags restricted by auth handler should not contain character ':'", tag) + logs.Error.Fatalln("tags restricted by auth handler should not contain character ':'", tag) } globals.immutableTagNS[tag] = true } @@ -403,7 +403,7 @@ func main() { // The namespace can be restricted even if the validator is disabled. if vconf.AddToTags { if strings.Contains(name, ":") { - log.Fatalln("acc_validation names should not contain character ':'", name) + logs.Error.Fatalln("acc_validation names should not contain character ':'", name) } globals.immutableTagNS[name] = true } @@ -418,7 +418,7 @@ func main() { lvl := auth.ParseAuthLevel(req) if lvl == auth.LevelNone { if req != "" { - log.Fatalf("Invalid required AuthLevel '%s' in validator '%s'", req, name) + logs.Error.Fatalf("Invalid required AuthLevel '%s' in validator '%s'", req, name) } // Skip empty string continue @@ -436,9 +436,9 @@ func main() { } if val := store.GetValidator(name); val == nil { - log.Fatal("Config provided for an unknown validator '" + name + "'") + logs.Error.Fatal("Config provided for an unknown validator '" + name + "'") } else if err = val.Init(string(vconf.Config)); err != nil { - log.Fatal("Failed to init validator '"+name+"': ", err) + logs.Error.Fatal("Failed to init validator '"+name+"': ", err) } if globals.validators == nil { globals.validators = make(map[string]credValidator) @@ -452,7 +452,7 @@ func main() { globals.maskedTagNS = make(map[string]bool, len(config.MaskedTagNamespaces)) for _, tag := range config.MaskedTagNamespaces { if strings.Contains(tag, ":") { - log.Fatal("masked_tags namespaces should not contain character ':'", tag) + logs.Error.Fatal("masked_tags namespaces should not contain character ':'", tag) } globals.maskedTagNS[tag] = true } @@ -462,14 +462,14 @@ func main() { tags = append(tags, "'"+tag+"'") } if len(tags) > 0 { - log.Println("Restricted tags:", tags) + logs.Info.Println("Restricted tags:", tags) } tags = nil for tag := range globals.maskedTagNS { tags = append(tags, "'"+tag+"'") } if len(tags) > 0 { - log.Println("Masked tags:", tags) + logs.Info.Println("Masked tags:", tags) } // Maximum message size @@ -505,7 +505,7 @@ func main() { conf = string(params) } if err = store.UseMediaHandler(config.Media.UseHandler, conf); err != nil { - log.Fatalf("Failed to init media handler '%s': %s", config.Media.UseHandler, err) + logs.Error.Fatalf("Failed to init media handler '%s': %s", config.Media.UseHandler, err) } } if config.Media.GcPeriod > 0 && config.Media.GcBlockSize > 0 { @@ -513,7 +513,7 @@ func main() { config.Media.GcBlockSize) defer func() { stopFilesGc <- true - log.Println("Stopped files garbage collector") + logs.Info.Println("Stopped files garbage collector") }() } } @@ -521,11 +521,11 @@ func main() { err = push.Init(string(config.Push)) if err != nil { - log.Fatal("Failed to initialize push notifications:", err) + logs.Error.Fatal("Failed to initialize push notifications:", err) } defer func() { push.Stop() - log.Println("Stopped push notifications") + logs.Info.Println("Stopped push notifications") }() // Keep inactive LP sessions for 15 seconds @@ -540,7 +540,7 @@ func main() { tlsConfig, err := parseTLSConfig(*tlsEnabled, config.TLS) if err != nil { - log.Fatalln(err) + logs.Error.Fatalln(err) } // Intialize plugins @@ -554,7 +554,7 @@ func main() { *listenGrpc = config.GrpcListen } if globals.grpcServer, err = serveGrpc(*listenGrpc, config.GrpcKeepalive, tlsConfig); err != nil { - log.Fatal(err) + logs.Error.Fatal(err) } // Serve static content from the directory in -static_data flag if that's @@ -566,7 +566,7 @@ func main() { // Resolve path to static content. *staticPath = toAbsolutePath(rootpath, *staticPath) if _, err = os.Stat(*staticPath); os.IsNotExist(err) { - log.Fatal("Static content directory is not found", *staticPath) + logs.Error.Fatal("Static content directory is not found", *staticPath) } staticMountPoint = config.StaticMount @@ -592,9 +592,9 @@ func main() { // Remove mount point prefix http.StripPrefix(staticMountPoint, http.FileServer(http.Dir(*staticPath)))))))) - log.Printf("Serving static content from '%s' at '%s'", *staticPath, staticMountPoint) + logs.Info.Printf("Serving static content from '%s' at '%s'", *staticPath, staticMountPoint) } else { - log.Println("Static content is disabled") + logs.Info.Println("Static content is disabled") } // Configure root path for serving API calls. @@ -611,7 +611,7 @@ func main() { config.ApiPath += "/" } } - log.Printf("API served from root URL path '%s'", config.ApiPath) + logs.Info.Printf("API served from root URL path '%s'", config.ApiPath) // Handle websocket clients. mux.HandleFunc(config.ApiPath+"v0/channels", serveWebSocket) @@ -622,7 +622,7 @@ func main() { mux.Handle(config.ApiPath+"v0/file/u/", gh.CompressHandler(http.HandlerFunc(largeFileReceive))) // Serve large files. mux.Handle(config.ApiPath+"v0/file/s/", gh.CompressHandler(http.HandlerFunc(largeFileServe))) - log.Println("Large media handling enabled", config.Media.UseHandler) + logs.Info.Println("Large media handling enabled", config.Media.UseHandler) } if staticMountPoint != "/" { @@ -631,6 +631,6 @@ func main() { } if err = listenAndServe(config.Listen, mux, tlsConfig, signalHandler()); err != nil { - log.Fatal(err) + logs.Error.Fatal(err) } } diff --git a/server/media/fs/filesys.go b/server/media/fs/filesys.go index 1311238d..75982710 100644 --- a/server/media/fs/filesys.go +++ b/server/media/fs/filesys.go @@ -7,11 +7,11 @@ import ( "encoding/json" "errors" "io" - "log" "mime" "os" "path/filepath" + "github.com/tinode/chat/server/logs" "github.com/tinode/chat/server/media" "github.com/tinode/chat/server/store" "github.com/tinode/chat/server/store/types" @@ -72,14 +72,14 @@ func (fh *fshandler) Upload(fdef *types.FileDef, file io.ReadSeeker) (string, er outfile, err := os.Create(fdef.Location) if err != nil { - log.Println("Upload: failed to create file", fdef.Location, err) + logs.Warning.Println("Upload: failed to create file", fdef.Location, err) return "", err } if err = store.Files.StartUpload(fdef); err != nil { outfile.Close() os.Remove(fdef.Location) - log.Println("failed to create file record", fdef.Id, err) + logs.Warning.Println("failed to create file record", fdef.Id, err) return "", err } @@ -116,7 +116,7 @@ func (fh *fshandler) Download(url string) (*types.FileDef, media.ReadSeekCloser, fd, err := fh.getFileRecord(fid) if err != nil { - log.Println("Download: file not found", fid) + logs.Warning.Println("Download: file not found", fid) return nil, nil, err } @@ -137,7 +137,7 @@ func (fh *fshandler) Delete(locations []string) error { for _, loc := range locations { if err, _ := os.Remove(loc).(*os.PathError); err != nil { if err != os.ErrNotExist { - log.Println("fs: error deleting file", loc, err) + logs.Warning.Println("fs: error deleting file", loc, err) } } } diff --git a/server/media/s3/s3.go b/server/media/s3/s3.go index 2d27d8dd..871bf403 100644 --- a/server/media/s3/s3.go +++ b/server/media/s3/s3.go @@ -5,7 +5,6 @@ import ( "encoding/json" "errors" "io" - "log" "mime" "net/http" "sync/atomic" @@ -19,6 +18,7 @@ import ( "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3/s3manager" + "github.com/tinode/chat/server/logs" "github.com/tinode/chat/server/media" "github.com/tinode/chat/server/store" "github.com/tinode/chat/server/store/types" @@ -189,7 +189,7 @@ func (ah *awshandler) Upload(fdef *types.FileDef, file io.ReadSeeker) (string, e uploader := s3manager.NewUploaderWithClient(ah.svc) if err = store.Files.StartUpload(fdef); err != nil { - log.Println("failed to create file record", fdef.Id, err) + logs.Warning.Println("failed to create file record", fdef.Id, err) return "", err } @@ -221,7 +221,7 @@ func (ah *awshandler) Upload(fdef *types.FileDef, file io.ReadSeeker) (string, e fname += ext[0] } - log.Println("aws upload success ", fname, "key", key, "id", fdef.Id) + logs.Info.Println("aws upload success ", fname, "key", key, "id", fdef.Id) return ah.conf.ServeURL + fname, nil } diff --git a/server/pbconverter.go b/server/pbconverter.go index 44be727d..6bd582dc 100644 --- a/server/pbconverter.go +++ b/server/pbconverter.go @@ -4,10 +4,10 @@ package main import ( "encoding/json" - "log" "time" "github.com/tinode/chat/pbx" + "github.com/tinode/chat/server/logs" "github.com/tinode/chat/server/store/types" ) @@ -66,7 +66,7 @@ func pbServPresSerialize(pres *MsgServerPres) *pbx.ServerMsg_Pres { case "tags": what = pbx.ServerPres_TAGS default: - log.Fatal("Unknown pres.what value", pres.What) + logs.Error.Fatal("Unknown pres.what value", pres.What) } return &pbx.ServerMsg_Pres{Pres: &pbx.ServerPres{ Topic: pres.Topic, @@ -448,7 +448,7 @@ func bytesToInterface(in []byte) interface{} { if len(in) > 0 { err := json.Unmarshal(in, &out) if err != nil { - log.Println("pbx: failed to parse bytes", string(in), err) + logs.Warning.Println("pbx: failed to parse bytes", string(in), err) } } return out @@ -639,7 +639,7 @@ func pbInfoNoteWhatSerialize(what string) pbx.InfoNote { case "recv": out = pbx.InfoNote_RECV default: - log.Fatal("unknown info-note.what", what) + logs.Error.Fatal("unknown info-note.what", what) } return out } @@ -654,7 +654,7 @@ func pbInfoNoteWhatDeserialize(what pbx.InfoNote) string { case pbx.InfoNote_RECV: out = "recv" default: - log.Fatal("unknown info-note.what", what) + logs.Error.Fatal("unknown info-note.what", what) } return out } diff --git a/server/plugins.go b/server/plugins.go index ceb34671..30f9f730 100644 --- a/server/plugins.go +++ b/server/plugins.go @@ -4,11 +4,11 @@ package main import ( "encoding/json" "errors" - "log" "strings" "time" "github.com/tinode/chat/pbx" + "github.com/tinode/chat/server/logs" "github.com/tinode/chat/server/store/types" "golang.org/x/net/context" "google.golang.org/grpc" @@ -234,7 +234,7 @@ func pluginsInit(configString json.RawMessage) { var config []pluginConfig if err := json.Unmarshal(configString, &config); err != nil { - log.Fatal(err) + logs.Error.Fatal(err) } nameIndex := make(map[string]bool) @@ -247,7 +247,7 @@ func pluginsInit(configString json.RawMessage) { } if nameIndex[conf.Name] { - log.Fatalf("plugins: duplicate name '%s'", conf.Name) + logs.Error.Fatalf("plugins: duplicate name '%s'", conf.Name) } globals.plugins[count] = Plugin{ @@ -259,29 +259,29 @@ func pluginsInit(configString json.RawMessage) { var err error if globals.plugins[count].filterFireHose, err = ParsePluginFilter(conf.Filters.FireHose, plgFilterByTopicType|plgFilterByPacket); err != nil { - log.Fatal("plugins: bad FireHose filter", err) + logs.Error.Fatal("plugins: bad FireHose filter", err) } if globals.plugins[count].filterAccount, err = ParsePluginFilter(conf.Filters.Account, plgFilterByAction); err != nil { - log.Fatal("plugins: bad Account filter", err) + logs.Error.Fatal("plugins: bad Account filter", err) } if globals.plugins[count].filterTopic, err = ParsePluginFilter(conf.Filters.Topic, plgFilterByTopicType|plgFilterByAction); err != nil { - log.Fatal("plugins: bad FireHose filter", err) + logs.Error.Fatal("plugins: bad FireHose filter", err) } if globals.plugins[count].filterSubscription, err = ParsePluginFilter(conf.Filters.Subscription, plgFilterByTopicType|plgFilterByAction); err != nil { - log.Fatal("plugins: bad Subscription filter", err) + logs.Error.Fatal("plugins: bad Subscription filter", err) } if globals.plugins[count].filterMessage, err = ParsePluginFilter(conf.Filters.Message, plgFilterByTopicType|plgFilterByAction); err != nil { - log.Fatal("plugins: bad Message filter", err) + logs.Error.Fatal("plugins: bad Message filter", err) } globals.plugins[count].filterFind = conf.Filters.Find if parts := strings.SplitN(conf.ServiceAddr, "://", 2); len(parts) < 2 { - log.Fatal("plugins: invalid server address format", conf.ServiceAddr) + logs.Error.Fatal("plugins: invalid server address format", conf.ServiceAddr) } else { globals.plugins[count].network = parts[0] globals.plugins[count].addr = parts[1] @@ -289,7 +289,7 @@ func pluginsInit(configString json.RawMessage) { globals.plugins[count].conn, err = grpc.Dial(globals.plugins[count].addr, grpc.WithInsecure()) if err != nil { - log.Fatalf("plugins: connection failure %v", err) + logs.Error.Fatalf("plugins: connection failure %v", err) } globals.plugins[count].client = pbx.NewPluginClient(globals.plugins[count].conn) @@ -300,7 +300,7 @@ func pluginsInit(configString json.RawMessage) { globals.plugins = globals.plugins[:count] if len(globals.plugins) == 0 { - log.Println("plugins: no active plugins found") + logs.Info.Println("plugins: no active plugins found") globals.plugins = nil } else { var names []string @@ -308,7 +308,7 @@ func pluginsInit(configString json.RawMessage) { names = append(names, globals.plugins[i].name+"("+globals.plugins[i].addr+")") } - log.Println("plugins: active", "'"+strings.Join(names, "', '")+"'") + logs.Info.Println("plugins: active", "'"+strings.Join(names, "', '")+"'") } } @@ -393,7 +393,7 @@ func pluginFireHose(sess *Session, msg *ClientComMessage) (*ClientComMessage, *S } else if p.failureCode != 0 { // Plugin failed and it's configured to stop further processing. - log.Println("plugin: failed,", p.name, err) + logs.Error.Println("plugin: failed,", p.name, err) return nil, &ServerComMessage{Ctrl: &MsgServerCtrl{ Id: id, Code: p.failureCode, @@ -402,7 +402,7 @@ func pluginFireHose(sess *Session, msg *ClientComMessage) (*ClientComMessage, *S Timestamp: ts}} } else { // Plugin failed but configured to ignore failure. - log.Println("plugin: failure ignored,", p.name, err) + logs.Warning.Println("plugin: failure ignored,", p.name, err) } } @@ -436,7 +436,7 @@ func pluginFind(user types.Uid, query string) (string, []types.Subscription, err } resp, err := p.client.Find(ctx, find) if err != nil { - log.Println("plugins: Find call failed", p.name, err) + logs.Warning.Println("plugins: Find call failed", p.name, err) return "", nil, err } respStatus := resp.GetStatus() @@ -493,7 +493,7 @@ func pluginAccount(user *types.User, action int) { ctx = context.Background() } if _, err := p.client.Account(ctx, event); err != nil { - log.Println("plugins: Account call failed", p.name, err) + logs.Warning.Println("plugins: Account call failed", p.name, err) } } } @@ -528,7 +528,7 @@ func pluginTopic(topic *Topic, action int) { ctx = context.Background() } if _, err := p.client.Topic(ctx, event); err != nil { - log.Println("plugins: Topic call failed", p.name, err) + logs.Warning.Println("plugins: Topic call failed", p.name, err) } } } @@ -574,7 +574,7 @@ func pluginSubscription(sub *types.Subscription, action int) { ctx = context.Background() } if _, err := p.client.Subscription(ctx, event); err != nil { - log.Println("plugins: Subscription call failed", p.name, err) + logs.Warning.Println("plugins: Subscription call failed", p.name, err) } } } @@ -609,7 +609,7 @@ func pluginMessage(data *MsgServerData, action int) { ctx = context.Background() } if _, err := p.client.Message(ctx, event); err != nil { - log.Println("plugins: Message call failed", p.name, err) + logs.Warning.Println("plugins: Message call failed", p.name, err) } } } diff --git a/server/pres.go b/server/pres.go index 392aad1f..ddf53c4b 100644 --- a/server/pres.go +++ b/server/pres.go @@ -1,9 +1,9 @@ package main import ( - "log" "strings" + "github.com/tinode/chat/server/logs" "github.com/tinode/chat/server/store" "github.com/tinode/chat/server/store/types" ) @@ -535,7 +535,7 @@ func (t *Topic) presPubMessageCount(uid types.Uid, mode types.AccessMode, recv, // Cases V.1, V.2 func (t *Topic) presPubMessageDelete(uid types.Uid, mode types.AccessMode, delID int, list []MsgDelRange, skip string) { if len(list) == 0 && delID <= 0 { - log.Printf("Case V.1, V.2: topic[%s] invalid request - missing payload", t.name) + logs.Warning.Printf("Case V.1, V.2: topic[%s] invalid request - missing payload", t.name) return } diff --git a/server/push/fcm/payload.go b/server/push/fcm/payload.go index 94291763..bfe26e2e 100644 --- a/server/push/fcm/payload.go +++ b/server/push/fcm/payload.go @@ -2,13 +2,13 @@ package fcm import ( "errors" - "log" "strconv" "time" fcm "firebase.google.com/go/messaging" "github.com/tinode/chat/server/drafty" + "github.com/tinode/chat/server/logs" "github.com/tinode/chat/server/push" "github.com/tinode/chat/server/store" t "github.com/tinode/chat/server/store/types" @@ -186,7 +186,7 @@ func clonePayload(src map[string]string) map[string]string { func PrepareNotifications(rcpt *push.Receipt, config *AndroidConfig) []MessageData { data, err := payloadToData(&rcpt.Payload) if err != nil { - log.Println("fcm push: could not parse payload;", err) + logs.Warning.Println("fcm push: could not parse payload;", err) return nil } @@ -211,7 +211,7 @@ func PrepareNotifications(rcpt *push.Receipt, config *AndroidConfig) []MessageDa } devices, count, err = store.Devices.GetAll(uids...) if err != nil { - log.Println("fcm push: db error", err) + logs.Warning.Println("fcm push: db error", err) return nil } } @@ -337,7 +337,7 @@ func PrepareNotifications(rcpt *push.Receipt, config *AndroidConfig) []MessageDa func DevicesForUser(uid t.Uid) []string { ddef, count, err := store.Devices.GetAll(uid) if err != nil { - log.Println("fcm devices for user: db error", err) + logs.Warning.Println("fcm devices for user: db error", err) return nil } diff --git a/server/push/fcm/push_fcm.go b/server/push/fcm/push_fcm.go index 7dff60a7..93281d40 100644 --- a/server/push/fcm/push_fcm.go +++ b/server/push/fcm/push_fcm.go @@ -8,11 +8,11 @@ import ( "context" "encoding/json" "errors" - "log" fbase "firebase.google.com/go" fcm "firebase.google.com/go/messaging" + "github.com/tinode/chat/server/logs" "github.com/tinode/chat/server/push" "github.com/tinode/chat/server/store" "github.com/tinode/chat/server/store/types" @@ -132,7 +132,7 @@ func sendNotifications(rcpt *push.Receipt, config *configType) { resp, err := handler.client.SendAll(ctx, batch) if err != nil { // Complete failure. - log.Println("fcm SendAll failed", err) + logs.Warning.Println("fcm SendAll failed", err) break } @@ -152,7 +152,7 @@ func processSubscription(req *push.ChannelReq) { if len(devices) > subBatchSize { // It's extremely unlikely for a single user to have this many devices. devices = devices[0:subBatchSize] - log.Println("fcm: user", req.Uid.UserId(), "has more than", subBatchSize, "devices") + logs.Warning.Println("fcm: user", req.Uid.UserId(), "has more than", subBatchSize, "devices") } var err error @@ -164,7 +164,7 @@ func processSubscription(req *push.ChannelReq) { } if err != nil { // Complete failure. - log.Println("fcm: sub or upsub failed", req.Unsub, err) + logs.Warning.Println("fcm: sub or upsub failed", req.Unsub, err) } else { // Check for partial failure. handleSubErrors(resp, req.Uid, devices) @@ -193,7 +193,7 @@ func handleSubErrors(response *fcm.TopicManagementResponse, uid types.Uid, devic for _, errinfo := range response.Errors { // FCM documentation sucks. There is no list of possible errors so no action can be taken but logging. - log.Println("fcm sub/unsub error", errinfo.Reason, uid, devices[errinfo.Index]) + logs.Warning.Println("fcm sub/unsub error", errinfo.Reason, uid, devices[errinfo.Index]) } } @@ -203,24 +203,24 @@ func handleFcmError(err error, uid types.Uid, deviceId string) bool { fcm.IsInternal(err) || fcm.IsUnknown(err) { // Transient errors. Stop sending this batch. - log.Println("fcm transient failure", err) + logs.Warning.Println("fcm transient failure", err) return false } if fcm.IsMismatchedCredential(err) || fcm.IsInvalidArgument(err) { // Config errors - log.Println("fcm: request failed", err) + logs.Warning.Println("fcm: request failed", err) return false } if fcm.IsRegistrationTokenNotRegistered(err) { // Token is no longer valid. - log.Println("fcm: invalid token", uid, err) + logs.Warning.Println("fcm: invalid token", uid, err) if err := store.Devices.Delete(uid, deviceId); err != nil { - log.Println("fcm: failed to delete invalid token", err) + logs.Warning.Println("fcm: failed to delete invalid token", err) } } else { // All other errors are treated as non-fatal. - log.Println("fcm error:", err) + logs.Warning.Println("fcm error:", err) } return true } diff --git a/server/push/tnpg/push_tnpg.go b/server/push/tnpg/push_tnpg.go index 7b590fcf..b3cb066f 100644 --- a/server/push/tnpg/push_tnpg.go +++ b/server/push/tnpg/push_tnpg.go @@ -7,10 +7,10 @@ import ( "encoding/json" "errors" "io" - "log" "net/http" "strings" + "github.com/tinode/chat/server/logs" "github.com/tinode/chat/server/push" "github.com/tinode/chat/server/push/fcm" "github.com/tinode/chat/server/store" @@ -173,7 +173,7 @@ func postMessage(endpoint string, body interface{}, config *configType) (*batchR if err != nil { // Just log the error, but don't report it to caller. The push succeeded. - log.Println("tnpg failed to decode response", err) + logs.Warning.Println("tnpg failed to decode response", err) } batch.httpCode = resp.StatusCode @@ -197,15 +197,15 @@ func sendPushes(rcpt *push.Receipt, config *configType) { } resp, err := postMessage(handler.pushUrl, payloads, config) if err != nil { - log.Println("tnpg push request failed:", err) + logs.Warning.Println("tnpg push request failed:", err) break } if resp.httpCode >= 300 { - log.Println("tnpg push rejected:", resp.httpStatus) + logs.Warning.Println("tnpg push rejected:", resp.httpStatus) break } if resp.FatalCode != "" { - log.Println("tnpg push failed:", resp.FatalMessage) + logs.Error.Println("tnpg push failed:", resp.FatalMessage) break } // Check for expired tokens and other errors. @@ -225,20 +225,20 @@ func processSubscription(req *push.ChannelReq, config *configType) { if len(su.Devices) > subBatchSize { // It's extremely unlikely for a single user to have this many devices. su.Devices = su.Devices[0:subBatchSize] - log.Println("tnpg: user", req.Uid.UserId(), "has more than", subBatchSize, "devices") + logs.Warning.Println("tnpg: user", req.Uid.UserId(), "has more than", subBatchSize, "devices") } resp, err := postMessage(handler.subUrl, &su, config) if err != nil { - log.Println("tnpg channel sub request failed:", err) + logs.Warning.Println("tnpg channel sub request failed:", err) return } if resp.httpCode >= 300 { - log.Println("tnpg channel sub rejected:", resp.httpStatus) + logs.Warning.Println("tnpg channel sub rejected:", resp.httpStatus) return } if resp.FatalCode != "" { - log.Println("tnpg channel sub failed:", resp.FatalMessage) + logs.Error.Println("tnpg channel sub failed:", resp.FatalMessage) return } // Check for expired tokens and other errors. @@ -255,20 +255,20 @@ func handlePushResponse(batch *batchResponse, messages []fcm.MessageData) { case "": // no error case messageRateExceeded, quotaExceeded, serverUnavailable, unavailableError, internalError, unknownError: // Transient errors. Stop sending this batch. - log.Println("tnpg: transient failure", resp.ErrorMessage) + logs.Warning.Println("tnpg: transient failure", resp.ErrorMessage) return case mismatchedCredential, invalidArgument, senderIDMismatch, thirdPartyAuthError, invalidAPNSCredentials: // Config errors - log.Println("tnpg: invalid config", resp.ErrorMessage) + logs.Warning.Println("tnpg: invalid config", resp.ErrorMessage) return case registrationTokenNotRegistered, unregisteredError: // Token is no longer valid. - log.Println("tnpg: invalid token", resp.ErrorMessage) + logs.Warning.Println("tnpg: invalid token", resp.ErrorMessage) if err := store.Devices.Delete(messages[i].Uid, messages[i].DeviceId); err != nil { - log.Println("tnpg: failed to delete invalid token", err) + logs.Warning.Println("tnpg: failed to delete invalid token", err) } default: - log.Println("tnpg: unrecognized error", resp.ErrorMessage) + logs.Warning.Println("tnpg: unrecognized error", resp.ErrorMessage) } } } @@ -280,7 +280,7 @@ func handleSubResponse(batch *batchResponse, req *push.ChannelReq, devices []str for _, resp := range batch.Responses { // FCM documentation sucks. There is no list of possible errors so no action can be taken but logging. - log.Println("fcm sub/unsub error", resp.ErrorCode, req.Uid, devices[resp.Index]) + logs.Warning.Println("fcm sub/unsub error", resp.ErrorCode, req.Uid, devices[resp.Index]) } } diff --git a/server/ringhash/ringhash.go b/server/ringhash/ringhash.go index 124d7265..d7d037ae 100644 --- a/server/ringhash/ringhash.go +++ b/server/ringhash/ringhash.go @@ -6,9 +6,10 @@ import ( "encoding/ascii85" "hash/crc32" "hash/fnv" - "log" "sort" "strconv" + + "github.com/tinode/chat/server/logs" ) // Hash is a signature of a hash function used by the package. @@ -128,6 +129,6 @@ func (ring *Ring) Signature() string { func (ring *Ring) dump() { for _, e := range ring.keys { - log.Printf("key: '%s', hash=%d", e.key, e.hash) + logs.Info.Printf("key: '%s', hash=%d", e.key, e.hash) } } diff --git a/server/session.go b/server/session.go index 850885d0..46d4a40f 100644 --- a/server/session.go +++ b/server/session.go @@ -13,7 +13,6 @@ import ( "container/list" "encoding/json" "fmt" - "log" "net/http" "strings" "sync" @@ -23,6 +22,7 @@ import ( "github.com/gorilla/websocket" "github.com/tinode/chat/pbx" "github.com/tinode/chat/server/auth" + "github.com/tinode/chat/server/logs" "github.com/tinode/chat/server/store" "github.com/tinode/chat/server/store/types" @@ -284,7 +284,7 @@ func (s *Session) queueOut(msg *ServerComMessage) bool { if 200 <= msg.Ctrl.Code && msg.Ctrl.Code < 600 { statsInc(fmt.Sprintf("CtrlCodesTotal%dxx", msg.Ctrl.Code/100), 1) } else { - log.Println("Invalid response code: ", msg.Ctrl.Code) + logs.Warning.Println("Invalid response code: ", msg.Ctrl.Code) } } @@ -296,7 +296,7 @@ func (s *Session) queueOut(msg *ServerComMessage) bool { case s.send <- data: default: // Never block here since it may also block the topic's run() goroutine. - log.Println("s.queueOut: session's send queue full", s.sid) + logs.Error.Println("s.queueOut: session's send queue full", s.sid) return false } return true @@ -312,7 +312,7 @@ func (s *Session) queueOutBytes(data []byte) bool { select { case s.send <- data: default: - log.Println("s.queueOutBytes: session's send queue full", s.sid) + logs.Error.Println("s.queueOutBytes: session's send queue full", s.sid) return false } return true @@ -364,7 +364,7 @@ func (s *Session) dispatchRaw(raw []byte) { var msg ClientComMessage if atomic.LoadInt32(&s.terminating) > 0 { - log.Println("s.dispatch: message received on a terminating session", s.sid) + logs.Warning.Println("s.dispatch: message received on a terminating session", s.sid) s.queueOut(ErrLocked("", "", now)) return } @@ -381,11 +381,11 @@ func (s *Session) dispatchRaw(raw []byte) { toLog = raw[:512] truncated = "<...>" } - log.Printf("in: '%s%s' sid='%s' uid='%s'", toLog, truncated, s.sid, s.uid) + logs.Info.Printf("in: '%s%s' sid='%s' uid='%s'", toLog, truncated, s.sid, s.uid) if err := json.Unmarshal(raw, &msg); err != nil { // Malformed message - log.Println("s.dispatch", err, s.sid) + logs.Warning.Println("s.dispatch", err, s.sid) s.queueOut(ErrMalformed("", "", now)) return } @@ -404,11 +404,11 @@ func (s *Session) dispatch(msg *ClientComMessage) { } else if s.authLvl != auth.LevelRoot { // Only root user can set non-default msg.from && msg.authLvl values. s.queueOut(ErrPermissionDenied("", "", msg.Timestamp)) - log.Println("s.dispatch: non-root asigned msg.from", s.sid) + logs.Warning.Println("s.dispatch: non-root asigned msg.from", s.sid) return } else if fromUid := types.ParseUserId(msg.AsUser); fromUid.IsZero() { s.queueOut(ErrMalformed("", "", msg.Timestamp)) - log.Println("s.dispatch: malformed msg.from: ", msg.AsUser, s.sid) + logs.Warning.Println("s.dispatch: malformed msg.from: ", msg.AsUser, s.sid) return } else if auth.Level(msg.AuthLvl) == auth.LevelNone { // AuthLvl is not set by caller, assign default LevelAuth. @@ -432,7 +432,7 @@ func (s *Session) dispatch(msg *ClientComMessage) { checkVers := func(m *ClientComMessage, handler func(*ClientComMessage)) func(*ClientComMessage) { return func(m *ClientComMessage) { if s.ver == 0 { - log.Println("s.dispatch: {hi} is missing", s.sid) + logs.Warning.Println("s.dispatch: {hi} is missing", s.sid) s.queueOut(ErrCommandOutOfSequence(m.Id, m.Original, msg.Timestamp)) return } @@ -444,7 +444,7 @@ func (s *Session) dispatch(msg *ClientComMessage) { checkUser := func(m *ClientComMessage, handler func(*ClientComMessage)) func(*ClientComMessage) { return func(m *ClientComMessage) { if msg.AsUser == "" { - log.Println("s.dispatch: authentication required", s.sid) + logs.Warning.Println("s.dispatch: authentication required", s.sid) s.queueOut(ErrAuthRequiredReply(m, m.Timestamp)) return } @@ -507,7 +507,7 @@ func (s *Session) dispatch(msg *ClientComMessage) { default: // Unknown message s.queueOut(ErrMalformed("", "", msg.Timestamp)) - log.Println("s.dispatch: unknown message", s.sid) + logs.Warning.Println("s.dispatch: unknown message", s.sid) return } @@ -557,7 +557,7 @@ func (s *Session) subscribe(msg *ClientComMessage) { // Reply with a 500 to the user. s.queueOut(ErrUnknownReply(msg, msg.Timestamp)) s.inflightReqs.Done() - log.Println("s.subscribe: hub.join queue full, topic ", msg.RcptTo, s.sid) + logs.Error.Println("s.subscribe: hub.join queue full, topic ", msg.RcptTo, s.sid) } // Hub will send Ctrl success/failure packets back to session } @@ -592,7 +592,7 @@ func (s *Session) leave(msg *ClientComMessage) { } else { // Session wants to unsubscribe from the topic it did not join // FIXME(gene): allow topic to unsubscribe without joining first; send to hub to unsub - log.Println("s.leave:", "must attach first", s.sid) + logs.Warning.Println("s.leave:", "must attach first", s.sid) s.queueOut(ErrAttachFirst(msg, msg.Timestamp)) } } @@ -643,7 +643,7 @@ func (s *Session) publish(msg *ClientComMessage) { default: // Reply with a 500 to the user. s.queueOut(ErrUnknownReply(msg, msg.Timestamp)) - log.Println("s.publish: sub.broadcast channel full, topic ", msg.RcptTo, s.sid) + logs.Error.Println("s.publish: sub.broadcast channel full, topic ", msg.RcptTo, s.sid) } } else if msg.RcptTo == "sys" { // Publishing to "sys" topic requires no subsription. @@ -652,12 +652,12 @@ func (s *Session) publish(msg *ClientComMessage) { default: // Reply with a 500 to the user. s.queueOut(ErrUnknownReply(msg, msg.Timestamp)) - log.Println("s.publish: hub.route channel full", s.sid) + logs.Error.Println("s.publish: hub.route channel full", s.sid) } } else { // Publish request received without attaching to topic first. s.queueOut(ErrAttachFirst(msg, msg.Timestamp)) - log.Println("s.publish:", "must attach first", s.sid) + logs.Warning.Println("s.publish:", "must attach first", s.sid) } } @@ -669,7 +669,7 @@ func (s *Session) hello(msg *ClientComMessage) { if s.ver == 0 { s.ver = parseVersion(msg.Hi.Version) if s.ver == 0 { - log.Println("s.hello:", "failed to parse version", s.sid) + logs.Warning.Println("s.hello:", "failed to parse version", s.sid) s.queueOut(ErrMalformed(msg.Id, "", msg.Timestamp)) return } @@ -677,7 +677,7 @@ func (s *Session) hello(msg *ClientComMessage) { if versionCompare(s.ver, minSupportedVersionValue) < 0 { s.ver = 0 s.queueOut(ErrVersionNotSupported(msg.Id, msg.Timestamp)) - log.Println("s.hello:", "unsupported version", s.sid) + logs.Warning.Println("s.hello:", "unsupported version", s.sid) return } @@ -722,7 +722,7 @@ func (s *Session) hello(msg *ClientComMessage) { } if err != nil { - log.Println("s.hello:", "device ID", err, s.sid) + logs.Warning.Println("s.hello:", "device ID", err, s.sid) s.queueOut(ErrUnknown(msg.Id, "", msg.Timestamp)) return } @@ -730,7 +730,7 @@ func (s *Session) hello(msg *ClientComMessage) { } else { // Version cannot be changed mid-session. s.queueOut(ErrCommandOutOfSequence(msg.Id, "", msg.Timestamp)) - log.Println("s.hello:", "version cannot be changed", s.sid) + logs.Warning.Println("s.hello:", "version cannot be changed", s.sid) return } @@ -749,7 +749,7 @@ func (s *Session) hello(msg *ClientComMessage) { if len(s.lang) > 2 { // Logging strings longer than 2 b/c language.Parse(XX) always succeeds // returning confidence Low. - log.Println("s.hello:", "could not parse locale ", s.lang) + logs.Warning.Println("s.hello:", "could not parse locale ", s.lang) } s.countryCode = globals.defaultCountryCode } @@ -782,7 +782,7 @@ func (s *Session) acc(msg *ClientComMessage) { if msg.Acc.Token != nil { if !s.uid.IsZero() { s.queueOut(ErrAlreadyAuthenticated(msg.Acc.Id, "", msg.Timestamp)) - log.Println("s.acc: got token while already authenticated", s.sid) + logs.Warning.Println("s.acc: got token while already authenticated", s.sid) return } @@ -791,7 +791,7 @@ func (s *Session) acc(msg *ClientComMessage) { if err != nil { s.queueOut(decodeStoreError(err, msg.Acc.Id, "", msg.Timestamp, map[string]interface{}{"what": "auth"})) - log.Println("s.acc: invalid token", err, s.sid) + logs.Warning.Println("s.acc: invalid token", err, s.sid) return } } @@ -827,7 +827,7 @@ func (s *Session) login(msg *ClientComMessage) { handler := store.GetLogicalAuthHandler(msg.Login.Scheme) if handler == nil { - log.Println("s.login: unknown authentication scheme", msg.Login.Scheme, s.sid) + logs.Warning.Println("s.login: unknown authentication scheme", msg.Login.Scheme, s.sid) s.queueOut(ErrAuthUnknownScheme(msg.Id, "", msg.Timestamp)) return } @@ -837,7 +837,7 @@ func (s *Session) login(msg *ClientComMessage) { resp := decodeStoreError(err, msg.Id, "", msg.Timestamp, nil) if resp.Ctrl.Code >= 500 { // Log internal errors - log.Println("s.login: internal", err, s.sid) + logs.Warning.Println("s.login: internal", err, s.sid) } s.queueOut(resp) return @@ -852,7 +852,7 @@ func (s *Session) login(msg *ClientComMessage) { } if err != nil { - log.Println("s.login: user state check failed", rec.Uid, err, s.sid) + logs.Warning.Println("s.login: user state check failed", rec.Uid, err, s.sid) s.queueOut(decodeStoreError(err, msg.Id, "", msg.Timestamp, nil)) return } @@ -873,7 +873,7 @@ func (s *Session) login(msg *ClientComMessage) { } } if err != nil { - log.Println("s.login: failed to validate credentials:", err, s.sid) + logs.Warning.Println("s.login: failed to validate credentials:", err, s.sid) s.queueOut(decodeStoreError(err, msg.Id, "", msg.Timestamp, nil)) } else { s.queueOut(s.onLogin(msg.Id, msg.Timestamp, rec, missing)) @@ -965,7 +965,7 @@ func (s *Session) onLogin(msgID string, timestamp time.Time, rec *auth.Rec, miss LastSeen: timestamp, Lang: s.lang, }); err != nil { - log.Println("failed to update device record", err) + logs.Warning.Println("failed to update device record", err) } } } @@ -997,14 +997,14 @@ func (s *Session) get(msg *ClientComMessage) { if meta.pkt.MetaWhat == 0 { s.queueOut(ErrMalformedReply(msg, msg.Timestamp)) - log.Println("s.get: invalid Get message action", msg.Get.What) + logs.Warning.Println("s.get: invalid Get message action", msg.Get.What) } else if sub != nil { select { case sub.meta <- meta: default: // Reply with a 500 to the user. s.queueOut(ErrUnknownReply(msg, msg.Timestamp)) - log.Println("s.get: sub.meta channel full, topic ", msg.RcptTo, s.sid) + logs.Error.Println("s.get: sub.meta channel full, topic ", msg.RcptTo, s.sid) } } else if meta.pkt.MetaWhat&(constMsgMetaDesc|constMsgMetaSub) != 0 { // Request some minimal info from a topic not currently attached to. @@ -1013,10 +1013,10 @@ func (s *Session) get(msg *ClientComMessage) { default: // Reply with a 500 to the user. s.queueOut(ErrUnknownReply(msg, msg.Timestamp)) - log.Println("s.get: hub.meta channel full", s.sid) + logs.Error.Println("s.get: hub.meta channel full", s.sid) } } else { - log.Println("s.get: subscribe first to get=", msg.Get.What) + logs.Warning.Println("s.get: subscribe first to get=", msg.Get.What) s.queueOut(ErrPermissionDeniedReply(msg, msg.Timestamp)) } } @@ -1049,17 +1049,17 @@ func (s *Session) set(msg *ClientComMessage) { if meta.pkt.MetaWhat == 0 { s.queueOut(ErrMalformedReply(msg, msg.Timestamp)) - log.Println("s.set: nil Set action") + logs.Warning.Println("s.set: nil Set action") } else if sub := s.getSub(msg.RcptTo); sub != nil { select { case sub.meta <- meta: default: // Reply with a 500 to the user. s.queueOut(ErrUnknownReply(msg, msg.Timestamp)) - log.Println("s.set: sub.meta channel full, topic ", msg.RcptTo, s.sid) + logs.Error.Println("s.set: sub.meta channel full, topic ", msg.RcptTo, s.sid) } } else if meta.pkt.MetaWhat&(constMsgMetaTags|constMsgMetaCred) != 0 { - log.Println("s.set: can Set tags/creds for subscribed topics only", meta.pkt.MetaWhat) + logs.Warning.Println("s.set: can Set tags/creds for subscribed topics only", meta.pkt.MetaWhat) s.queueOut(ErrPermissionDeniedReply(msg, msg.Timestamp)) } else { // Desc.Private and Sub updates are possible without the subscription. @@ -1068,7 +1068,7 @@ func (s *Session) set(msg *ClientComMessage) { default: // Reply with a 500 to the user. s.queueOut(ErrUnknownReply(msg, msg.Timestamp)) - log.Println("s.set: hub.meta channel full", s.sid) + logs.Error.Println("s.set: hub.meta channel full", s.sid) } } } @@ -1094,7 +1094,7 @@ func (s *Session) del(msg *ClientComMessage) { if msg.MetaWhat == 0 { s.queueOut(ErrMalformedReply(msg, msg.Timestamp)) - log.Println("s.del: invalid Del action", msg.Del.What, s.sid) + logs.Warning.Println("s.del: invalid Del action", msg.Del.What, s.sid) return } sub := s.getSub(msg.RcptTo) @@ -1107,7 +1107,7 @@ func (s *Session) del(msg *ClientComMessage) { default: // Reply with a 500 to the user. s.queueOut(ErrUnknownReply(msg, msg.Timestamp)) - log.Println("s.del: sub.meta channel full, topic ", msg.RcptTo, s.sid) + logs.Error.Println("s.del: sub.meta channel full, topic ", msg.RcptTo, s.sid) } } else if msg.MetaWhat == constMsgDelTopic { // Deleting topic: for sessions attached or not attached, send request to hub first. @@ -1121,12 +1121,12 @@ func (s *Session) del(msg *ClientComMessage) { default: // Reply with a 500 to the user. s.queueOut(ErrUnknownReply(msg, msg.Timestamp)) - log.Println("s.del: hub.unreg channel full", s.sid) + logs.Error.Println("s.del: hub.unreg channel full", s.sid) } } else { // Must join the topic to delete messages or subscriptions. s.queueOut(ErrAttachFirst(msg, msg.Timestamp)) - log.Println("s.del: invalid Del action while unsubbed", msg.Del.What, s.sid) + logs.Warning.Println("s.del: invalid Del action while unsubbed", msg.Del.What, s.sid) } } @@ -1178,7 +1178,7 @@ func (s *Session) note(msg *ClientComMessage) { default: // Reply with a 500 to the user. s.queueOut(ErrUnknownReply(msg, msg.Timestamp)) - log.Println("s.note: sub.broacast channel full, topic ", msg.RcptTo, s.sid) + logs.Error.Println("s.note: sub.broacast channel full, topic ", msg.RcptTo, s.sid) } } else if msg.Note.What == "recv" { // Client received a pres notification about a new message, initiated a fetch @@ -1189,11 +1189,11 @@ func (s *Session) note(msg *ClientComMessage) { default: // Reply with a 500 to the user. s.queueOut(ErrUnknownReply(msg, msg.Timestamp)) - log.Println("s.note: hub.route channel full", s.sid) + logs.Error.Println("s.note: hub.route channel full", s.sid) } } else { s.queueOut(ErrAttachFirst(msg, msg.Timestamp)) - log.Println("s.note: note to invalid topic - must subscribe first", msg.Note.What, s.sid) + logs.Warning.Println("s.note: note to invalid topic - must subscribe first", msg.Note.What, s.sid) } } @@ -1205,7 +1205,7 @@ func (s *Session) note(msg *ClientComMessage) { func (s *Session) expandTopicName(msg *ClientComMessage) (string, *ServerComMessage) { if msg.Original == "" { - log.Println("s.etn: empty topic name", s.sid) + logs.Warning.Println("s.etn: empty topic name", s.sid) return "", ErrMalformed(msg.Id, "", msg.Timestamp) } @@ -1221,11 +1221,11 @@ func (s *Session) expandTopicName(msg *ClientComMessage) (string, *ServerComMess uid2 := types.ParseUserId(msg.Original) if uid2.IsZero() { // Ensure the user id is valid. - log.Println("s.etn: failed to parse p2p topic name", s.sid) + logs.Warning.Println("s.etn: failed to parse p2p topic name", s.sid) return "", ErrMalformed(msg.Id, msg.Original, msg.Timestamp) } else if uid2 == uid1 { // Use 'me' to access self-topic. - log.Println("s.etn: invalid p2p self-subscription", s.sid) + logs.Warning.Println("s.etn: invalid p2p self-subscription", s.sid) return "", ErrPermissionDeniedReply(msg, msg.Timestamp) } routeTo = uid1.P2PName(uid2) diff --git a/server/sessionstore.go b/server/sessionstore.go index 0bdcccad..b060afa7 100644 --- a/server/sessionstore.go +++ b/server/sessionstore.go @@ -10,13 +10,13 @@ package main import ( "container/list" - "log" "net/http" "sync" "time" "github.com/gorilla/websocket" "github.com/tinode/chat/pbx" + "github.com/tinode/chat/server/logs" "github.com/tinode/chat/server/store" "github.com/tinode/chat/server/store/types" ) @@ -47,7 +47,7 @@ func (ss *SessionStore) NewSession(conn interface{}, sid string) (*Session, int) ss.lock.Lock() if _, found := ss.sessCache[s.sid]; found { - log.Fatalln("ERROR! duplicate session ID", s.sid) + logs.Error.Fatalln("ERROR! duplicate session ID", s.sid) } ss.lock.Unlock() @@ -65,7 +65,7 @@ func (ss *SessionStore) NewSession(conn interface{}, sid string) (*Session, int) s.proto = GRPC s.grpcnode = c default: - log.Panicln("session: unknown connection type", conn) + logs.Error.Panicln("session: unknown connection type", conn) } s.subs = make(map[string]*Subscription) @@ -166,7 +166,7 @@ func (ss *SessionStore) Shutdown() { // TODO: Consider broadcasting shutdown to other cluster nodes. - log.Println("SessionStore shut down, sessions terminated:", len(ss.sessCache)) + logs.Info.Println("SessionStore shut down, sessions terminated:", len(ss.sessCache)) } // EvictUser terminates all sessions of a given user. diff --git a/server/stats.go b/server/stats.go index 33d31112..f4fe68c5 100644 --- a/server/stats.go +++ b/server/stats.go @@ -8,11 +8,12 @@ package main import ( "encoding/json" "expvar" - "log" "net/http" "runtime" "sort" "time" + + "github.com/tinode/chat/server/logs" ) // A simple implementation of histogram expvar.Var. @@ -70,7 +71,7 @@ func statsInit(mux *http.ServeMux, path string) { go statsUpdater() - log.Printf("stats: variables exposed at '%s'", path) + logs.Info.Printf("stats: variables exposed at '%s'", path) } // Register integer variable. Don't check for initialization. @@ -148,12 +149,12 @@ func statsUpdater() { val := upd.value.(float64) v.addSample(val) default: - log.Panicf("stats: unsupported expvar type %T", ev) + logs.Error.Panicf("stats: unsupported expvar type %T", ev) } } else { panic("stats: update to unknown variable " + upd.varname) } } - log.Println("stats: shutdown") + logs.Info.Println("stats: shutdown") } diff --git a/server/topic.go b/server/topic.go index e7f80eef..964a6fe3 100644 --- a/server/topic.go +++ b/server/topic.go @@ -10,7 +10,6 @@ package main import ( "errors" - "log" "reflect" "sort" "strings" @@ -19,6 +18,7 @@ import ( "github.com/tinode/chat/server/auth" "github.com/tinode/chat/server/concurrency" + "github.com/tinode/chat/server/logs" "github.com/tinode/chat/server/push" "github.com/tinode/chat/server/store" "github.com/tinode/chat/server/store/types" @@ -281,7 +281,7 @@ func (t *Topic) fixUpUserCounts(userCounts map[types.Uid]int) { pud.online -= decrementBy t.perUser[uid] = pud if pud.online < 0 { - log.Printf("topic[%s]: invalid online count for user %s", t.name, uid) + logs.Warning.Printf("topic[%s]: invalid online count for user %s", t.name, uid) } } } @@ -321,7 +321,7 @@ func (t *Topic) runLocal(hub *Hub) { // Failed to subscribe, the topic is still inactive killTimer.Reset(keepAlive) } - log.Printf("topic[%s] subscription failed %v, sid=%s", t.name, err, join.sess.sid) + logs.Warning.Printf("topic[%s] subscription failed %v, sid=%s", t.name, err, join.sess.sid) } } if join.sess.inflightReqs != nil { @@ -352,33 +352,33 @@ func (t *Topic) runLocal(hub *Hub) { // Get request if meta.pkt.MetaWhat&constMsgMetaDesc != 0 { if err := t.replyGetDesc(meta.sess, asUid, meta.pkt.Get.Desc, meta.pkt); err != nil { - log.Printf("topic[%s] meta.Get.Desc failed: %s", t.name, err) + logs.Warning.Printf("topic[%s] meta.Get.Desc failed: %s", t.name, err) } } if meta.pkt.MetaWhat&constMsgMetaSub != 0 { if err := t.replyGetSub(meta.sess, asUid, authLevel, meta.pkt); err != nil { - log.Printf("topic[%s] meta.Get.Sub failed: %s", t.name, err) + logs.Warning.Printf("topic[%s] meta.Get.Sub failed: %s", t.name, err) } } if meta.pkt.MetaWhat&constMsgMetaData != 0 { if err := t.replyGetData(meta.sess, asUid, meta.pkt.Get.Data, meta.pkt); err != nil { - log.Printf("topic[%s] meta.Get.Data failed: %s", t.name, err) + logs.Warning.Printf("topic[%s] meta.Get.Data failed: %s", t.name, err) } } if meta.pkt.MetaWhat&constMsgMetaDel != 0 { if err := t.replyGetDel(meta.sess, asUid, meta.pkt.Get.Del, meta.pkt); err != nil { - log.Printf("topic[%s] meta.Get.Del failed: %s", t.name, err) + logs.Warning.Printf("topic[%s] meta.Get.Del failed: %s", t.name, err) } } if meta.pkt.MetaWhat&constMsgMetaTags != 0 { if err := t.replyGetTags(meta.sess, asUid, meta.pkt); err != nil { - log.Printf("topic[%s] meta.Get.Tags failed: %s", t.name, err) + logs.Warning.Printf("topic[%s] meta.Get.Tags failed: %s", t.name, err) } } if meta.pkt.MetaWhat&constMsgMetaCred != 0 { - log.Printf("topic[%s] handle getCred", t.name) + logs.Warning.Printf("topic[%s] handle getCred", t.name) if err := t.replyGetCreds(meta.sess, asUid, meta.pkt); err != nil { - log.Printf("topic[%s] meta.Get.Creds failed: %s", t.name, err) + logs.Warning.Printf("topic[%s] meta.Get.Creds failed: %s", t.name, err) } } @@ -389,22 +389,22 @@ func (t *Topic) runLocal(hub *Hub) { // Notify plugins of the update pluginTopic(t, plgActUpd) } else { - log.Printf("topic[%s] meta.Set.Desc failed: %v", t.name, err) + logs.Warning.Printf("topic[%s] meta.Set.Desc failed: %v", t.name, err) } } if meta.pkt.MetaWhat&constMsgMetaSub != 0 { if err := t.replySetSub(hub, meta.sess, meta.pkt); err != nil { - log.Printf("topic[%s] meta.Set.Sub failed: %v", t.name, err) + logs.Warning.Printf("topic[%s] meta.Set.Sub failed: %v", t.name, err) } } if meta.pkt.MetaWhat&constMsgMetaTags != 0 { if err := t.replySetTags(meta.sess, asUid, meta.pkt); err != nil { - log.Printf("topic[%s] meta.Set.Tags failed: %v", t.name, err) + logs.Warning.Printf("topic[%s] meta.Set.Tags failed: %v", t.name, err) } } if meta.pkt.MetaWhat&constMsgMetaCred != 0 { if err := t.replySetCred(meta.sess, asUid, authLevel, meta.pkt); err != nil { - log.Printf("topic[%s] meta.Set.Cred failed: %v", t.name, err) + logs.Warning.Printf("topic[%s] meta.Set.Cred failed: %v", t.name, err) } } @@ -423,7 +423,7 @@ func (t *Topic) runLocal(hub *Hub) { } if err != nil { - log.Printf("topic[%s] meta.Del failed: %v", t.name, err) + logs.Warning.Printf("topic[%s] meta.Del failed: %v", t.name, err) } } case upd := <-t.supd: @@ -432,7 +432,7 @@ func (t *Topic) runLocal(hub *Hub) { t.sessToForeground(upd.sess) } else if currentUA != upd.userAgent { if t.cat != types.TopicCatMe { - log.Panicln("invalid topic category in UA update", t.name) + logs.Warning.Panicln("invalid topic category in UA update", t.name) } // 'me' only. Process an update to user agent from one of the sessions. currentUA = upd.userAgent @@ -516,42 +516,42 @@ func (t *Topic) handleSubscription(h *Hub, join *sessionJoin) error { if getWhat&constMsgMetaDesc != 0 { // Send get.desc as a {meta} packet. if err := t.replyGetDesc(join.sess, asUid, msgsub.Get.Desc, join.pkt); err != nil { - log.Printf("topic[%s] handleSubscription Get.Desc failed: %v sid=%s", t.name, err, join.sess.sid) + logs.Warning.Printf("topic[%s] handleSubscription Get.Desc failed: %v sid=%s", t.name, err, join.sess.sid) } } if getWhat&constMsgMetaSub != 0 { // Send get.sub response as a separate {meta} packet if err := t.replyGetSub(join.sess, asUid, authLevel, join.pkt); err != nil { - log.Printf("topic[%s] handleSubscription Get.Sub failed: %v sid=%s", t.name, err, join.sess.sid) + logs.Warning.Printf("topic[%s] handleSubscription Get.Sub failed: %v sid=%s", t.name, err, join.sess.sid) } } if getWhat&constMsgMetaTags != 0 { // Send get.tags response as a separate {meta} packet if err := t.replyGetTags(join.sess, asUid, join.pkt); err != nil { - log.Printf("topic[%s] handleSubscription Get.Tags failed: %v sid=%s", t.name, err, join.sess.sid) + logs.Warning.Printf("topic[%s] handleSubscription Get.Tags failed: %v sid=%s", t.name, err, join.sess.sid) } } if getWhat&constMsgMetaCred != 0 { // Send get.tags response as a separate {meta} packet if err := t.replyGetCreds(join.sess, asUid, join.pkt); err != nil { - log.Printf("topic[%s] handleSubscription Get.Cred failed: %v sid=%s", t.name, err, join.sess.sid) + logs.Warning.Printf("topic[%s] handleSubscription Get.Cred failed: %v sid=%s", t.name, err, join.sess.sid) } } if getWhat&constMsgMetaData != 0 { // Send get.data response as {data} packets if err := t.replyGetData(join.sess, asUid, msgsub.Get.Data, join.pkt); err != nil { - log.Printf("topic[%s] handleSubscription Get.Data failed: %v sid=%s", t.name, err, join.sess.sid) + logs.Warning.Printf("topic[%s] handleSubscription Get.Data failed: %v sid=%s", t.name, err, join.sess.sid) } } if getWhat&constMsgMetaDel != 0 { // Send get.del response as a separate {meta} packet if err := t.replyGetDel(join.sess, asUid, msgsub.Get.Del, join.pkt); err != nil { - log.Printf("topic[%s] handleSubscription Get.Del failed: %v sid=%s", t.name, err, join.sess.sid) + logs.Warning.Printf("topic[%s] handleSubscription Get.Del failed: %v sid=%s", t.name, err, join.sess.sid) } } @@ -591,7 +591,7 @@ func (t *Topic) handleLeaveRequest(hub *Hub, leave *sessionLeave) { // User wants to leave and unsubscribe. // asUid must not be Zero. if err := t.replyLeaveUnsub(hub, leave.sess, leave.pkt, asUid); err != nil { - log.Println("failed to unsub", err, leave.sess.sid) + logs.Error.Println("failed to unsub", err, leave.sess.sid) return } } else if pssd, _ := t.remSession(leave.sess, asUid); pssd != nil { @@ -637,7 +637,7 @@ func (t *Topic) handleLeaveRequest(hub *Hub, leave *sessionLeave) { t.perUser[uid] = pud } } else if !leave.sess.isCluster() { - log.Panic("cannot determine uid: leave req=", leave) + logs.Warning.Panic("cannot determine uid: leave req=", leave) } switch t.cat { @@ -663,7 +663,7 @@ func (t *Topic) handleLeaveRequest(hub *Hub, leave *sessionLeave) { if !meUid.IsZero() { // Update user's last online timestamp & user agent. Only one user can be subscribed to 'me' topic. if err := store.Users.UpdateLastSeen(meUid, mrs.userAgent, now); err != nil { - log.Println(err) + logs.Warning.Println(err) } } case types.TopicCatFnd: @@ -802,7 +802,7 @@ func (t *Topic) sendSubNotifications(asUid types.Uid, sid, userAgent string) { if !t.isLoaded() { t.markLoaded() if err := t.loadContacts(asUid); err != nil { - log.Println("topic: failed to load contacts", t.name, err.Error()) + logs.Error.Println("topic: failed to load contacts", t.name, err.Error()) } // User online: notify users of interest without forcing response (no +en here). t.presUsersOfInterest("on", userAgent) @@ -871,7 +871,7 @@ func (t *Topic) handleBroadcast(msg *ServerComMessage) { Head: msg.Data.Head, Content: msg.Data.Content}, (userData.modeGiven & userData.modeWant).IsReader()); err != nil { - log.Printf("topic[%s]: failed to save message: %v", t.name, err) + logs.Warning.Printf("topic[%s]: failed to save message: %v", t.name, err) msg.sess.queueOut(ErrUnknown(msg.Id, t.original(asUid), msg.Timestamp)) return @@ -970,7 +970,7 @@ func (t *Topic) handleBroadcast(msg *ServerComMessage) { "ReadSeqId": pud.readID}, false); err != nil { - log.Printf("topic[%s]: failed to update SeqRead/Recv counter: %v", t.name, err) + logs.Warning.Printf("topic[%s]: failed to update SeqRead/Recv counter: %v", t.name, err) return } @@ -984,7 +984,7 @@ func (t *Topic) handleBroadcast(msg *ServerComMessage) { } } else { // TODO(gene): remove this - log.Panic("topic: wrong message type for broadcasting", t.name) + logs.Error.Panic("topic: wrong message type for broadcasting", t.name) } // Broadcast the message. Only {data}, {pres}, {info} are broadcastable. @@ -1043,13 +1043,13 @@ func (t *Topic) handleBroadcast(msg *ServerComMessage) { } // Send message to session. if !sess.queueOut(msg) { - log.Printf("topic[%s]: connection stuck, detaching - %s", t.name, sess.sid) + logs.Warning.Printf("topic[%s]: connection stuck, detaching - %s", t.name, sess.sid) // The whole session is being dropped, so sessionLeave.pkt is not set. // Must not block here: it may lead to a deadlock. select { case t.unreg <- &sessionLeave{sess: sess}: default: - log.Printf("topic[%s]: unreg queue full - %s", t.name, sess.sid) + logs.Error.Printf("topic[%s]: unreg queue full - %s", t.name, sess.sid) } } } @@ -3416,7 +3416,7 @@ func (t *Topic) remProxiedSession(sess *Session) bool { numChans := len(t.proxiedChannels) - 3 t.proxiedChannels = t.proxiedChannels[:numChans] if len(t.proxiedSessions)*3+1 != len(t.proxiedChannels) { - log.Panicf("topic[%s]: #proxied sessions (%d) vs #proxied channels mismatch (%d)", + logs.Error.Panicf("topic[%s]: #proxied sessions (%d) vs #proxied channels mismatch (%d)", t.name, len(t.proxiedSessions), len(t.proxiedChannels)) } } @@ -3477,7 +3477,7 @@ func (t *Topic) remSession(sess *Session, asUid types.Uid) (*perSessionData, boo if pssd.uid == asUid || asUid.IsZero() { delete(t.sessions, s) if s.isMultiplex() && !t.remProxiedSession(s) { - log.Printf("topic[%s]: multiplex session %s not removed from the event loop", t.name, s.sid) + logs.Error.Printf("topic[%s]: multiplex session %s not removed from the event loop", t.name, s.sid) } return &pssd, true } @@ -3490,7 +3490,7 @@ func (t *Topic) remSession(sess *Session, asUid types.Uid) (*perSessionData, boo if len(pssd.muids) == 0 { delete(t.sessions, s) if s.isMultiplex() && !t.remProxiedSession(s) { - log.Printf("topic[%s]: multiplex session %s not removed from the event loop: no more attached uids", t.name, s.sid) + logs.Error.Printf("topic[%s]: multiplex session %s not removed from the event loop: no more attached uids", t.name, s.sid) } return &pssd, true } else { diff --git a/server/topic_proxy.go b/server/topic_proxy.go index 1c2c8556..eeaea764 100644 --- a/server/topic_proxy.go +++ b/server/topic_proxy.go @@ -7,10 +7,10 @@ package main import ( - "log" "net/http" "time" + "github.com/tinode/chat/server/logs" "github.com/tinode/chat/server/store/types" ) @@ -27,7 +27,7 @@ func (t *Topic) runProxy(hub *Hub) { } else { // Response (ctrl message) will be handled when it's received via the proxy channel. if err := globals.cluster.routeToTopicMaster(ProxyReqJoin, join.pkt, t.name, join.sess); err != nil { - log.Println("proxy topic: route join request from proxy to master failed:", err) + logs.Warning.Println("proxy topic: route join request from proxy to master failed:", err) } } if join.sess.inflightReqs != nil { @@ -36,7 +36,7 @@ func (t *Topic) runProxy(hub *Hub) { case leave := <-t.unreg: if !t.handleProxyLeaveRequest(leave, killTimer) { - log.Println("Failed to update proxy topic state for leave request", leave.sess.sid) + logs.Warning.Println("Failed to update proxy topic state for leave request", leave.sess.sid) } if leave.pkt != nil && leave.sess.inflightReqs != nil { // If it's a client initiated request. @@ -46,13 +46,13 @@ func (t *Topic) runProxy(hub *Hub) { case msg := <-t.broadcast: // Content message intended for broadcasting to recipients if err := globals.cluster.routeToTopicMaster(ProxyReqBroadcast, msg, t.name, msg.sess); err != nil { - log.Println("proxy topic: route broadcast request from proxy to master failed:", err) + logs.Warning.Println("proxy topic: route broadcast request from proxy to master failed:", err) } case meta := <-t.meta: // Request to get/set topic metadata if err := globals.cluster.routeToTopicMaster(ProxyReqMeta, meta.pkt, t.name, meta.sess); err != nil { - log.Println("proxy topic: route meta request from proxy to master failed:", err) + logs.Warning.Println("proxy topic: route meta request from proxy to master failed:", err) } case upd := <-t.supd: @@ -64,7 +64,7 @@ func (t *Topic) runProxy(hub *Hub) { // Subscribed user may not match session user. Find out who is subscribed pssd, ok := t.sessions[upd.sess] if !ok { - log.Println("proxy topic: sess update request from detached session") + logs.Warning.Println("proxy topic: sess update request from detached session") continue } req = ProxyReqBgSession @@ -73,7 +73,7 @@ func (t *Topic) runProxy(hub *Hub) { tmpSess.userAgent = upd.sess.userAgent } if err := globals.cluster.routeToTopicMaster(req, nil, t.name, tmpSess); err != nil { - log.Println("proxy topic: route sess update request from proxy to master failed:", err) + logs.Warning.Println("proxy topic: route sess update request from proxy to master failed:", err) } case msg := <-t.proxy: @@ -86,7 +86,7 @@ func (t *Topic) runProxy(hub *Hub) { } if err := globals.cluster.topicProxyGone(t.name); err != nil { - log.Printf("topic proxy shutdown [%s]: failed to notify master - %s", t.name, err) + logs.Warning.Printf("topic proxy shutdown [%s]: failed to notify master - %s", t.name, err) } // Report completion back to sender, if 'done' is not nil. @@ -116,7 +116,7 @@ func (t *Topic) handleProxyLeaveRequest(leave *sessionLeave, killTimer *time.Tim if pssd, ok := t.sessions[leave.sess]; ok { asUid = pssd.uid } else { - log.Println("proxy topic: leave request sent for unknown session") + logs.Warning.Println("proxy topic: leave request sent for unknown session") return false } } @@ -136,7 +136,7 @@ func (t *Topic) handleProxyLeaveRequest(leave *sessionLeave, killTimer *time.Tim pkt = leave.pkt } if err := globals.cluster.routeToTopicMaster(ProxyReqLeave, pkt, t.name, leave.sess); err != nil { - log.Println("proxy topic: route broadcast request from proxy to master failed:", err) + logs.Warning.Println("proxy topic: route broadcast request from proxy to master failed:", err) } if len(t.sessions) == 0 { // No more sessions attached. Start the countdown. @@ -169,7 +169,7 @@ func (t *Topic) proxyMasterResponse(msg *ClusterResp, killTimer *time.Timer) { } else { sess := globals.sessionStore.Get(msg.OrigSid) if sess == nil { - log.Println("topic_proxy: session not found; already terminated?", msg.OrigSid) + logs.Warning.Println("topic_proxy: session not found; already terminated?", msg.OrigSid) } switch msg.OrigReqType { case ProxyReqJoin: @@ -213,12 +213,12 @@ func (t *Topic) proxyMasterResponse(msg *ClusterResp, killTimer *time.Timer) { } default: - log.Printf("proxy topic [%s] received response referencing unexpected request type %d", + logs.Error.Printf("proxy topic [%s] received response referencing unexpected request type %d", t.name, msg.OrigReqType) } if sess != nil && !sess.queueOut(msg.SrvMsg) { - log.Println("topic proxy: timeout") + logs.Error.Println("topic proxy: timeout") } } } @@ -228,7 +228,7 @@ func (t *Topic) proxyCtrlBroadcast(msg *ServerComMessage) { if msg.Ctrl.Code == http.StatusResetContent && msg.Ctrl.Text == "evicted" { // We received a ctrl command for evicting a user. if msg.uid.IsZero() { - log.Panicf("topic[%s]: proxy received evict message with empty uid", t.name) + logs.Error.Panicf("topic[%s]: proxy received evict message with empty uid", t.name) } for sess := range t.sessions { // Proxy topic may only have ordinary sessions. No multiplexing or proxy sessions here. @@ -247,18 +247,18 @@ func (t *Topic) updateAcsFromPresMsg(pres *MsgServerPres) { uid := types.ParseUserId(pres.Src) dacs := pres.Acs if uid.IsZero() { - log.Printf("proxy topic[%s]: received acs change for invalid user id '%s'", t.name, pres.Src) + logs.Warning.Printf("proxy topic[%s]: received acs change for invalid user id '%s'", t.name, pres.Src) return } // If t.perUser[uid] does not exist, pud is initialized with blanks, otherwise it gets existing values. pud := t.perUser[uid] if err := pud.modeWant.ApplyMutation(dacs.Want); err != nil { - log.Printf("proxy topic[%s]: could not process acs change - want: %+v", t.name, err) + logs.Warning.Printf("proxy topic[%s]: could not process acs change - want: %+v", t.name, err) return } if err := pud.modeGiven.ApplyMutation(dacs.Given); err != nil { - log.Printf("proxy topic[%s]: could not process acs change - given: %+v", t.name, err) + logs.Warning.Printf("proxy topic[%s]: could not process acs change - given: %+v", t.name, err) return } // Update existing or add new. diff --git a/server/user.go b/server/user.go index d25752e6..245125bf 100644 --- a/server/user.go +++ b/server/user.go @@ -1,11 +1,11 @@ package main import ( - "log" "time" "github.com/tinode/chat/server/auth" "github.com/tinode/chat/server/push" + "github.com/tinode/chat/server/logs" "github.com/tinode/chat/server/store" "github.com/tinode/chat/server/store/types" ) @@ -15,7 +15,7 @@ func replyCreateUser(s *Session, msg *ClientComMessage, rec *auth.Rec) { // The session cannot authenticate with the new account because it's already authenticated. if msg.Acc.Login && (!s.uid.IsZero() || rec != nil) { s.queueOut(ErrAlreadyAuthenticated(msg.Id, "", msg.Timestamp)) - log.Println("create user: login requested while authenticated", s.sid) + logs.Warning.Println("create user: login requested while authenticated", s.sid) return } @@ -24,13 +24,13 @@ func replyCreateUser(s *Session, msg *ClientComMessage, rec *auth.Rec) { if authhdl == nil { // New accounts must have an authentication scheme s.queueOut(ErrMalformed(msg.Id, "", msg.Timestamp)) - log.Println("create user: unknown auth handler", s.sid) + logs.Warning.Println("create user: unknown auth handler", s.sid) return } // Check if login is unique. if ok, err := authhdl.IsUnique(msg.Acc.Secret, s.remoteAddr); !ok { - log.Println("create user: auth secret is not unique", err, s.sid) + logs.Warning.Println("create user: auth secret is not unique", err, s.sid) s.queueOut(decodeStoreError(err, msg.Id, "", msg.Timestamp, map[string]interface{}{"what": "auth"})) return @@ -42,7 +42,7 @@ func replyCreateUser(s *Session, msg *ClientComMessage, rec *auth.Rec) { // If account state is being assigned, make sure the sender is a root user. if msg.Acc.State != "" { if auth.Level(msg.AuthLvl) != auth.LevelRoot { - log.Println("create user: attempt to set account state by non-root", s.sid) + logs.Warning.Println("create user: attempt to set account state by non-root", s.sid) msg := ErrPermissionDenied(msg.Id, "", msg.Timestamp) msg.Ctrl.Params = map[string]interface{}{"what": "state"} s.queueOut(msg) @@ -51,7 +51,7 @@ func replyCreateUser(s *Session, msg *ClientComMessage, rec *auth.Rec) { state, err := types.NewObjState(msg.Acc.State) if err != nil || state == types.StateUndefined || state == types.StateDeleted { - log.Println("create user: invalid account state", err, s.sid) + logs.Warning.Println("create user: invalid account state", err, s.sid) s.queueOut(ErrMalformed(msg.Id, "", msg.Timestamp)) return } @@ -61,7 +61,7 @@ func replyCreateUser(s *Session, msg *ClientComMessage, rec *auth.Rec) { // Ensure tags are unique and not restricted. if tags := normalizeTags(msg.Acc.Tags); tags != nil { if !restrictedTagsEqual(tags, nil, globals.immutableTagNS) { - log.Println("create user: attempt to directly assign restricted tags", s.sid) + logs.Warning.Println("create user: attempt to directly assign restricted tags", s.sid) msg := ErrPermissionDenied(msg.Id, "", msg.Timestamp) msg.Ctrl.Params = map[string]interface{}{"what": "tags"} s.queueOut(msg) @@ -77,7 +77,7 @@ func replyCreateUser(s *Session, msg *ClientComMessage, rec *auth.Rec) { cr := &creds[i] vld := store.GetValidator(cr.Method) if _, err := vld.PreCheck(cr.Value, cr.Params); err != nil { - log.Println("create user: failed credential pre-check", cr, err, s.sid) + logs.Warning.Println("create user: failed credential pre-check", cr, err, s.sid) s.queueOut(decodeStoreError(err, msg.Id, "", msg.Timestamp, map[string]interface{}{"what": cr.Method})) return @@ -118,7 +118,7 @@ func replyCreateUser(s *Session, msg *ClientComMessage, rec *auth.Rec) { // Create user record in the database. if _, err := store.Users.Create(&user, private); err != nil { - log.Println("create user: failed to create user", err, s.sid) + logs.Warning.Println("create user: failed to create user", err, s.sid) s.queueOut(ErrUnknown(msg.Id, "", msg.Timestamp)) return } @@ -126,7 +126,7 @@ func replyCreateUser(s *Session, msg *ClientComMessage, rec *auth.Rec) { // Add authentication record. The authhdl.AddRecord may change tags. rec, err := authhdl.AddRecord(&auth.Rec{Uid: user.Uid(), Tags: user.Tags}, msg.Acc.Secret, s.remoteAddr) if err != nil { - log.Println("create user: add auth record failed", err, s.sid) + logs.Warning.Println("create user: add auth record failed", err, s.sid) // Attempt to delete incomplete user record store.Users.Delete(user.Uid(), false) s.queueOut(decodeStoreError(err, msg.Id, "", msg.Timestamp, nil)) @@ -136,7 +136,7 @@ func replyCreateUser(s *Session, msg *ClientComMessage, rec *auth.Rec) { // When creating an account, the user must provide all required credentials. // If any are missing, reject the request. if len(creds) < len(globals.authValidators[rec.AuthLevel]) { - log.Println("create user: missing credentials; have:", creds, "want:", globals.authValidators[rec.AuthLevel], s.sid) + logs.Warning.Println("create user: missing credentials; have:", creds, "want:", globals.authValidators[rec.AuthLevel], s.sid) // Attempt to delete incomplete user record store.Users.Delete(user.Uid(), false) _, missing := stringSliceDelta(globals.authValidators[rec.AuthLevel], credentialMethods(creds)) @@ -155,7 +155,7 @@ func replyCreateUser(s *Session, msg *ClientComMessage, rec *auth.Rec) { if err != nil { // Delete incomplete user record. store.Users.Delete(user.Uid(), false) - log.Println("create user: failed to save or validate credential", err, s.sid) + logs.Warning.Println("create user: failed to save or validate credential", err, s.sid) s.queueOut(decodeStoreError(err, msg.Id, "", msg.Timestamp, nil)) return } @@ -194,12 +194,12 @@ func replyCreateUser(s *Session, msg *ClientComMessage, rec *auth.Rec) { func replyUpdateUser(s *Session, msg *ClientComMessage, rec *auth.Rec) { if s.uid.IsZero() && rec == nil { // Session is not authenticated and no token provided. - log.Println("replyUpdateUser: not a new account and not authenticated", s.sid) + logs.Warning.Println("replyUpdateUser: not a new account and not authenticated", s.sid) s.queueOut(ErrPermissionDenied(msg.Id, "", msg.Timestamp)) return } else if msg.AsUser != "" && rec != nil { // Two UIDs: one from msg.from, one from token. Ambigous, reject. - log.Println("replyUpdateUser: got both authenticated session and token", s.sid) + logs.Warning.Println("replyUpdateUser: got both authenticated session and token", s.sid) s.queueOut(ErrMalformed(msg.Id, "", msg.Timestamp)) return } @@ -213,7 +213,7 @@ func replyUpdateUser(s *Session, msg *ClientComMessage, rec *auth.Rec) { if msg.Acc.User != "" && msg.Acc.User != userId { if s.authLvl != auth.LevelRoot { - log.Println("replyUpdateUser: attempt to change another's account by non-root", s.sid) + logs.Warning.Println("replyUpdateUser: attempt to change another's account by non-root", s.sid) s.queueOut(ErrPermissionDenied(msg.Id, "", msg.Timestamp)) return } @@ -226,14 +226,14 @@ func replyUpdateUser(s *Session, msg *ClientComMessage, rec *auth.Rec) { if uid.IsZero() { // msg.Acc.User contains invalid data. s.queueOut(ErrMalformed(msg.Id, "", msg.Timestamp)) - log.Println("replyUpdateUser: user id is invalid or missing", s.sid) + logs.Warning.Println("replyUpdateUser: user id is invalid or missing", s.sid) return } // Only root can suspend accounts, including own account. if msg.Acc.State != "" && s.authLvl != auth.LevelRoot { s.queueOut(ErrPermissionDenied(msg.Id, "", msg.Timestamp)) - log.Println("replyUpdateUser: attempt to change account state by non-root", s.sid) + logs.Warning.Println("replyUpdateUser: attempt to change account state by non-root", s.sid) return } @@ -242,7 +242,7 @@ func replyUpdateUser(s *Session, msg *ClientComMessage, rec *auth.Rec) { err = types.ErrNotFound } if err != nil { - log.Println("replyUpdateUser: failed to fetch user from DB", err, s.sid) + logs.Warning.Println("replyUpdateUser: failed to fetch user from DB", err, s.sid) s.queueOut(decodeStoreError(err, msg.Id, "", msg.Timestamp, nil)) return } @@ -254,7 +254,7 @@ func replyUpdateUser(s *Session, msg *ClientComMessage, rec *auth.Rec) { if authLvl == auth.LevelNone { // msg.Acc.AuthLevel contains invalid data. s.queueOut(ErrMalformed(msg.Id, "", msg.Timestamp)) - log.Println("replyUpdateUser: auth level is missing", s.sid) + logs.Warning.Println("replyUpdateUser: auth level is missing", s.sid) return } // Handle request to update credentials. @@ -288,7 +288,7 @@ func replyUpdateUser(s *Session, msg *ClientComMessage, rec *auth.Rec) { } if err != nil { - log.Println("replyUpdateUser: failed to update user", err, s.sid) + logs.Warning.Println("replyUpdateUser: failed to update user", err, s.sid) s.queueOut(decodeStoreError(err, msg.Id, "", msg.Timestamp, nil)) return } @@ -315,7 +315,7 @@ func updateUserAuth(msg *ClientComMessage, user *types.User, rec *auth.Rec, remo // Tags may have been changed by authhdl.UpdateRecord, reset them. // Can't do much with the error here, logging it but not returning. if _, err = store.Users.UpdateTags(user.Uid(), nil, nil, rec.Tags); err != nil { - log.Println("updateUserAuth tags update failed:", err) + logs.Warning.Println("updateUserAuth tags update failed:", err) } return nil } @@ -359,7 +359,7 @@ func addCreds(uid types.Uid, creds []MsgCredClient, extraTags []string, lang str if utags, err := store.Users.UpdateTags(uid, extraTags, nil, nil); err == nil { extraTags = utags } else { - log.Println("add cred tags update failed:", err) + logs.Warning.Println("add cred tags update failed:", err) } } else { extraTags = nil @@ -430,7 +430,7 @@ func validatedCreds(uid types.Uid, authLvl auth.Level, creds []MsgCredClient, er if utags, err := store.Users.UpdateTags(uid, tagsToAdd, nil, nil); err == nil { tags = utags } else { - log.Println("validated creds tags update failed:", err) + logs.Warning.Println("validated creds tags update failed:", err) tags = nil } } else { @@ -506,7 +506,7 @@ func deleteCred(uid types.Uid, authLvl auth.Level, cred *MsgCredClient) ([]strin if utags, err := store.Users.UpdateTags(uid, nil, []string{cred.Method + ":" + cred.Value}, nil); err == nil { tags = utags } else { - log.Println("delete cred: failed to update tags:", err) + logs.Warning.Println("delete cred: failed to update tags:", err) tags = nil } } else { @@ -525,7 +525,7 @@ func deleteCred(uid types.Uid, authLvl auth.Level, cred *MsgCredClient) ([]strin func changeUserState(s *Session, uid types.Uid, user *types.User, msg *ClientComMessage) (bool, error) { state, err := types.NewObjState(msg.Acc.State) if err != nil || state == types.StateUndefined { - log.Println("replyUpdateUser: invalid account state", s.sid) + logs.Warning.Println("replyUpdateUser: invalid account state", s.sid) return false, types.ErrMalformed } @@ -569,12 +569,12 @@ func replyDelUser(s *Session, msg *ClientComMessage) { // Delete another user. uid = types.ParseUserId(msg.Del.User) if uid.IsZero() { - log.Println("replyDelUser: invalid user ID", msg.Del.User, s.sid) + logs.Warning.Println("replyDelUser: invalid user ID", msg.Del.User, s.sid) s.queueOut(ErrMalformed(msg.Id, "", msg.Timestamp)) return } } else { - log.Println("replyDelUser: illegal attempt to delete another user", msg.Del.User, s.sid) + logs.Warning.Println("replyDelUser: illegal attempt to delete another user", msg.Del.User, s.sid) s.queueOut(ErrPermissionDenied(msg.Id, "", msg.Timestamp)) return } @@ -584,7 +584,7 @@ func replyDelUser(s *Session, msg *ClientComMessage) { for _, name := range authnames { if err := store.GetAuthHandler(name).DelRecords(uid); err != nil { // This could be completely benign, i.e. authenticator exists but not used. - log.Println("replyDelUser: failed to delete auth record", uid.UserId(), name, err, s.sid) + logs.Warning.Println("replyDelUser: failed to delete auth record", uid.UserId(), name, err, s.sid) if storeErr, ok := err.(types.StoreError); ok && storeErr == types.ErrUnsupported { // Authenticator refused to delete record: user account cannot be deleted. s.queueOut(ErrOperationNotAllowed(msg.Id, "", msg.Timestamp)) @@ -607,7 +607,7 @@ func replyDelUser(s *Session, msg *ClientComMessage) { if uoi, err := store.Users.GetSubs(uid, nil); err == nil { presUsersOfInterestOffline(uid, uoi, "gone") } else { - log.Println("replyDelUser: failed to send notifications to users", err, s.sid) + logs.Warning.Println("replyDelUser: failed to send notifications to users", err, s.sid) } // Notify subscribers of the group topics where the user was the owner that the topics were deleted. @@ -616,18 +616,18 @@ func replyDelUser(s *Session, msg *ClientComMessage) { if subs, err := store.Topics.GetSubs(topicName, nil); err == nil { presSubsOfflineOffline(topicName, types.TopicCatGrp, subs, "gone", &presParams{}, s.sid) } else { - log.Println("replyDelUser: failed to notify topic subscribers", err, topicName, s.sid) + logs.Warning.Println("replyDelUser: failed to notify topic subscribers", err, topicName, s.sid) } } } else { - log.Println("replyDelUser: failed to send notifications to owned topics", err, s.sid) + logs.Warning.Println("replyDelUser: failed to send notifications to owned topics", err, s.sid) } // TODO: suspend all P2P topics with the user. // Delete user's records from the database. if err := store.Users.Delete(uid, msg.Del.Hard); err != nil { - log.Println("replyDelUser: failed to delete user", err, s.sid) + logs.Warning.Println("replyDelUser: failed to delete user", err, s.sid) s.queueOut(decodeStoreError(err, msg.Id, "", msg.Timestamp, nil)) return } @@ -858,14 +858,14 @@ func userUpdater() { unreadUpdater := func(uid types.Uid, val int, inc bool) int { uce, ok := usersCache[uid] if !ok { - log.Println("ERROR: attempt to update unread count for user who has not been loaded") + logs.Error.Println("ERROR: attempt to update unread count for user who has not been loaded") return -1 } if uce.unread < 0 { count, err := store.Users.GetUnreadCount(uid) if err != nil { - log.Println("users: failed to load unread count", err) + logs.Warning.Println("users: failed to load unread count", err) return -1 } uce.unread = count @@ -930,7 +930,7 @@ func userUpdater() { } } else { // BUG! - log.Println("ERROR: request to unregister user which has not been registered", uid) + logs.Error.Println("ERROR: request to unregister user which has not been registered", uid) } } continue @@ -946,5 +946,5 @@ func userUpdater() { unreadUpdater(upd.UserId, upd.Unread, upd.Inc) } - log.Println("users: shutdown") + logs.Info.Println("users: shutdown") } diff --git a/server/utils.go b/server/utils.go index 5099941b..d43314e4 100644 --- a/server/utils.go +++ b/server/utils.go @@ -7,7 +7,6 @@ import ( "encoding/json" "errors" "fmt" - "log" "net" "path/filepath" "reflect" @@ -20,6 +19,7 @@ import ( "unicode/utf8" "github.com/tinode/chat/server/auth" + "github.com/tinode/chat/server/logs" "github.com/tinode/chat/server/store" "github.com/tinode/chat/server/store/types" @@ -457,7 +457,7 @@ func rewriteTag(orig, countryCode string, withLogin bool) string { return orig } - log.Printf("invalid generic tag '%s'", orig) + logs.Warning.Printf("invalid generic tag '%s'", orig) return "" } diff --git a/server/validate/email/validate.go b/server/validate/email/validate.go index 4b064fb0..bec1cfa8 100644 --- a/server/validate/email/validate.go +++ b/server/validate/email/validate.go @@ -8,7 +8,6 @@ import ( "encoding/json" "errors" "fmt" - "log" "math/rand" qp "mime/quotedprintable" "net/mail" @@ -21,6 +20,7 @@ import ( textt "text/template" "time" + "github.com/tinode/chat/server/logs" "github.com/tinode/chat/server/store" t "github.com/tinode/chat/server/store/types" i18n "golang.org/x/text/language" @@ -545,7 +545,7 @@ func (v *validator) send(to string, content *emailContent) error { err := v.sendMail([]string{to}, message.Bytes()) if err != nil { - log.Println("SMTP error", to, err) + logs.Warning.Println("SMTP error", to, err) } return err