Split logs into info, warning, errors.

This commit is contained in:
aforge
2020-12-24 00:00:07 +03:00
parent 3c1c3b1a9e
commit ac154abfff
33 changed files with 419 additions and 388 deletions

View File

@ -13,7 +13,8 @@ import (
"crypto/hmac"
"crypto/md5"
"encoding/base64"
"log"
"github.com/tinode/chat/server/logs"
)
// Singned AppID. Composition:
@ -46,11 +47,11 @@ func checkAPIKey(apikey string) (isValid, isRoot bool) {
data, err := base64.URLEncoding.DecodeString(apikey)
if err != nil {
log.Println("failed to decode.base64 appid ", err)
logs.Warning.Println("failed to decode.base64 appid ", err)
return
}
if data[0] != 1 {
log.Println("unknown appid signature algorithm ", data[0])
logs.Warning.Println("unknown appid signature algorithm ", data[0])
return
}
@ -58,7 +59,7 @@ func checkAPIKey(apikey string) (isValid, isRoot bool) {
hasher.Write(data[:apikeyVersion+apikeyAppID+apikeySequence+apikeyWho])
check := hasher.Sum(nil)
if !bytes.Equal(data[apikeyVersion+apikeyAppID+apikeySequence+apikeyWho:], check) {
log.Println("invalid apikey signature")
logs.Warning.Println("invalid apikey signature")
return
}

View File

@ -6,7 +6,6 @@ import (
"encoding/json"
"errors"
"io/ioutil"
"log"
"net/http"
"net/url"
"regexp"
@ -14,6 +13,7 @@ import (
"time"
"github.com/tinode/chat/server/auth"
"github.com/tinode/chat/server/logs"
"github.com/tinode/chat/server/store"
"github.com/tinode/chat/server/store/types"
)
@ -179,7 +179,7 @@ func (a *authenticator) Authenticate(secret []byte, remoteAddr string) (*auth.Re
// Auth record not found.
if resp.Record == nil {
log.Println("rest_auth: invalid response: missing Record")
logs.Warning.Println("rest_auth: invalid response: missing Record")
return nil, nil, types.ErrInternal
}
@ -276,7 +276,7 @@ func (a *authenticator) RestrictedTags() ([]string, error) {
if len(resp.ByteVal) > 0 {
a.reToken, err = regexp.Compile(string(resp.ByteVal))
if err != nil {
log.Println("rest_auth: invalid token regexp", string(resp.ByteVal))
logs.Warning.Println("rest_auth: invalid token regexp", string(resp.ByteVal))
}
}
return resp.StrSliceVal, nil

View File

@ -4,7 +4,6 @@ import (
"encoding/gob"
"encoding/json"
"errors"
"log"
"net"
"net/rpc"
"reflect"
@ -14,6 +13,7 @@ import (
"time"
"github.com/tinode/chat/server/auth"
"github.com/tinode/chat/server/logs"
"github.com/tinode/chat/server/push"
rh "github.com/tinode/chat/server/ringhash"
"github.com/tinode/chat/server/store/types"
@ -256,7 +256,7 @@ func (n *ClusterNode) reconnect() {
n.reconnecting = false
n.lock.Unlock()
statsInc("LiveClusterNodes", 1)
log.Println("cluster: connected to", n.name)
logs.Info.Println("cluster: connected to", n.name)
// Send this node credentials to the new node.
var unused bool
n.call("Cluster.Ping", &ClusterPing{
@ -274,7 +274,7 @@ func (n *ClusterNode) reconnect() {
// Wait for timer to try to reconnect again. Do nothing if the timer is inactive.
case <-n.done:
// Shutting down
log.Println("cluster: shutdown started at node", n.name)
logs.Info.Println("cluster: shutdown started at node", n.name)
reconnTicker.Stop()
if n.endpoint != nil {
n.endpoint.Close()
@ -283,7 +283,7 @@ func (n *ClusterNode) reconnect() {
n.connected = false
n.reconnecting = false
n.lock.Unlock()
log.Println("cluster: shut down completed at node", n.name)
logs.Info.Println("cluster: shut down completed at node", n.name)
return
}
}
@ -295,7 +295,7 @@ func (n *ClusterNode) call(proc string, req, resp interface{}) error {
}
if err := n.endpoint.Call(proc, req, resp); err != nil {
log.Println("cluster: call failed", n.name, err)
logs.Warning.Println("cluster: call failed", n.name, err)
n.lock.Lock()
if n.connected {
@ -313,7 +313,7 @@ func (n *ClusterNode) call(proc string, req, resp interface{}) error {
func (n *ClusterNode) handleRpcResponse(call *rpc.Call) {
if call.Error != nil {
log.Printf("cluster: %s call failed: %s", call.ServiceMethod, call.Error)
logs.Warning.Printf("cluster: %s call failed: %s", call.ServiceMethod, call.Error)
n.lock.Lock()
if n.connected {
n.endpoint.Close()
@ -327,7 +327,7 @@ func (n *ClusterNode) handleRpcResponse(call *rpc.Call) {
func (n *ClusterNode) callAsync(proc string, req, resp interface{}, done chan *rpc.Call) *rpc.Call {
if done != nil && cap(done) == 0 {
log.Panic("cluster: RPC done channel is unbuffered")
logs.Error.Panic("cluster: RPC done channel is unbuffered")
}
if !n.connected {
@ -425,7 +425,7 @@ func (c *Cluster) TopicMaster(msg *ClusterReq, rejected *bool) error {
node := c.nodes[msg.Node]
if node == nil {
log.Println("cluster TopicMaster: request from an unknown node", msg.Node)
logs.Warning.Println("cluster TopicMaster: request from an unknown node", msg.Node)
return nil
}
@ -447,7 +447,7 @@ func (c *Cluster) TopicMaster(msg *ClusterReq, rejected *bool) error {
}
if msg.Signature != c.ring.Signature() {
log.Println("cluster TopicMaster: session signature mismatch", msg.RcptTo)
logs.Warning.Println("cluster TopicMaster: session signature mismatch", msg.RcptTo)
*rejected = true
return nil
}
@ -461,7 +461,7 @@ func (c *Cluster) TopicMaster(msg *ClusterReq, rejected *bool) error {
node.msess[msid] = struct{}{}
node.lock.Unlock()
log.Println("cluster: multiplexing session started", msid, count)
logs.Info.Println("cluster: multiplexing session started", msid, count)
}
// This is a local copy of a remote session.
@ -496,7 +496,7 @@ func (c *Cluster) TopicMaster(msg *ClusterReq, rejected *bool) error {
default:
// Reply with a 500 to the user.
sess.queueOut(ErrUnknownReply(msg.CliMsg, msg.CliMsg.Timestamp))
log.Println("cluster: join req failed - hub.join queue full, topic ", msg.CliMsg.RcptTo, "; orig sid ", sess.sid)
logs.Warning.Println("cluster: join req failed - hub.join queue full, topic ", msg.CliMsg.RcptTo, "; orig sid ", sess.sid)
}
case ProxyReqLeave:
@ -506,7 +506,7 @@ func (c *Cluster) TopicMaster(msg *ClusterReq, rejected *bool) error {
sess: sess,
}
} else {
log.Println("cluster: leave request for unknown topic", msg.RcptTo)
logs.Warning.Println("cluster: leave request for unknown topic", msg.RcptTo)
}
case ProxyReqMeta:
@ -518,10 +518,10 @@ func (c *Cluster) TopicMaster(msg *ClusterReq, rejected *bool) error {
}:
default:
sess.queueOut(ErrUnknownReply(msg.CliMsg, msg.CliMsg.Timestamp))
log.Println("cluster: meta req failed - topic.meta queue full, topic ", msg.CliMsg.RcptTo, "; orig sid ", sess.sid)
logs.Warning.Println("cluster: meta req failed - topic.meta queue full, topic ", msg.CliMsg.RcptTo, "; orig sid ", sess.sid)
}
} else {
log.Println("cluster: meta request for unknown topic", msg.RcptTo)
logs.Warning.Println("cluster: meta request for unknown topic", msg.RcptTo)
}
case ProxyReqBroadcast:
@ -530,13 +530,13 @@ func (c *Cluster) TopicMaster(msg *ClusterReq, rejected *bool) error {
select {
case globals.hub.route <- msg.SrvMsg:
default:
log.Println("cluster: route req failed - hub.route queue full")
logs.Error.Println("cluster: route req failed - hub.route queue full")
}
case ProxyReqBgSession, ProxyReqMeUserAgent:
if t := globals.hub.topicGet(msg.RcptTo); t != nil {
if t.supd == nil {
log.Panicln("cluster: invalid topic category in session update", t.name, msg.ReqType)
logs.Error.Panicln("cluster: invalid topic category in session update", t.name, msg.ReqType)
}
su := &sessionUpdate{}
if msg.ReqType == ProxyReqBgSession {
@ -546,11 +546,11 @@ func (c *Cluster) TopicMaster(msg *ClusterReq, rejected *bool) error {
}
t.supd <- su
} else {
log.Println("cluster: session update for unknown topic", msg.RcptTo, msg.ReqType)
logs.Warning.Println("cluster: session update for unknown topic", msg.RcptTo, msg.ReqType)
}
default:
log.Println("cluster: unknown request type", msg.ReqType, msg.RcptTo)
logs.Warning.Println("cluster: unknown request type", msg.ReqType, msg.RcptTo)
*rejected = true
}
@ -565,7 +565,7 @@ func (Cluster) TopicProxy(msg *ClusterResp, unused *bool) error {
msg.SrvMsg.uid = types.ParseUserId(msg.SrvMsg.AsUser)
t.proxy <- msg
} else {
log.Println("cluster: master response for unknown topic", msg.RcptTo)
logs.Warning.Println("cluster: master response for unknown topic", msg.RcptTo)
}
return nil
@ -580,7 +580,7 @@ func (c *Cluster) Route(msg *ClusterRoute, rejected *bool) error {
if msg.Sess != nil {
sid = msg.Sess.Sid
}
log.Println("cluster Route: session signature mismatch", sid)
logs.Warning.Println("cluster Route: session signature mismatch", sid)
*rejected = true
return nil
}
@ -590,7 +590,7 @@ func (c *Cluster) Route(msg *ClusterRoute, rejected *bool) error {
sid = msg.Sess.Sid
}
// TODO: maybe panic here.
log.Println("cluster Route: nil server message", sid)
logs.Warning.Println("cluster Route: nil server message", sid)
*rejected = true
return nil
}
@ -621,7 +621,7 @@ func (c *Cluster) UserCacheUpdate(msg *UserCacheReq, rejected *bool) error {
func (c *Cluster) Ping(ping *ClusterPing, unused *bool) error {
node := c.nodes[ping.Node]
if node == nil {
log.Println("cluster Ping from unknown node", ping.Node)
logs.Warning.Println("cluster Ping from unknown node", ping.Node)
return nil
}
@ -717,14 +717,14 @@ func (c *Cluster) routeUserReq(req *UserCacheReq) error {
func (c *Cluster) nodeForTopic(topic string) *ClusterNode {
key := c.ring.Get(topic)
if key == c.thisNodeName {
log.Println("cluster: request to route to self")
logs.Error.Println("cluster: request to route to self")
// Do not route to self
return nil
}
node := c.nodes[key]
if node == nil {
log.Println("cluster: no node for topic", topic, key)
logs.Warning.Println("cluster: no node for topic", topic, key)
}
return node
}
@ -901,7 +901,7 @@ func (c *Cluster) topicProxyGone(topicName string) error {
// Returns snowflake worker id
func clusterInit(configString json.RawMessage, self *string) int {
if globals.cluster != nil {
log.Fatal("Cluster already initialized.")
logs.Error.Fatal("Cluster already initialized.")
}
// Registering variables even if it's a standalone server. Otherwise monitoring software will
@ -916,13 +916,13 @@ func clusterInit(configString json.RawMessage, self *string) int {
// This is a standalone server, not initializing
if len(configString) == 0 {
log.Println("Cluster: running as a standalone server.")
logs.Info.Println("Cluster: running as a standalone server.")
return 1
}
var config clusterConfig
if err := json.Unmarshal(configString, &config); err != nil {
log.Fatal(err)
logs.Error.Fatal(err)
}
thisName := *self
@ -932,7 +932,7 @@ func clusterInit(configString json.RawMessage, self *string) int {
// Name of the current node is not specified: clustering disabled.
if thisName == "" {
log.Println("Cluster: running as a standalone server.")
logs.Info.Println("Cluster: running as a standalone server.")
return 1
}
@ -943,7 +943,7 @@ func clusterInit(configString json.RawMessage, self *string) int {
gob.Register(MsgAccessMode{})
if config.NumProxyEventGoRoutines != 0 {
log.Println("Cluster config: field num_proxy_event_goroutines is deprecated.")
logs.Warning.Println("Cluster config: field num_proxy_event_goroutines is deprecated.")
}
globals.cluster = &Cluster{
@ -970,7 +970,7 @@ func clusterInit(configString json.RawMessage, self *string) int {
if len(globals.cluster.nodes) == 0 {
// Cluster needs at least two nodes.
log.Fatal("Cluster: invalid cluster size: 1")
logs.Error.Fatal("Cluster: invalid cluster size: 1")
}
if !globals.cluster.failoverInit(config.Failover) {
@ -988,7 +988,7 @@ func clusterInit(configString json.RawMessage, self *string) int {
// Proxied session is being closed at the Master node
func (sess *Session) closeRPC() {
if sess.isMultiplex() {
log.Println("cluster: session proxy closed", sess.sid)
logs.Info.Println("cluster: session proxy closed", sess.sid)
}
}
@ -996,13 +996,13 @@ func (sess *Session) closeRPC() {
func (c *Cluster) start() {
addr, err := net.ResolveTCPAddr("tcp", c.listenOn)
if err != nil {
log.Fatal(err)
logs.Error.Fatal(err)
}
c.inbound, err = net.ListenTCP("tcp", addr)
if err != nil {
log.Fatal(err)
logs.Error.Fatal(err)
}
for _, n := range c.nodes {
@ -1017,12 +1017,12 @@ func (c *Cluster) start() {
err = rpc.Register(c)
if err != nil {
log.Fatal(err)
logs.Error.Fatal(err)
}
go rpc.Accept(c.inbound)
log.Printf("Cluster of %d nodes initialized, node '%s' is listening on [%s]", len(globals.cluster.nodes)+1,
logs.Info.Printf("Cluster of %d nodes initialized, node '%s' is listening on [%s]", len(globals.cluster.nodes)+1,
globals.cluster.thisNodeName, c.listenOn)
}
@ -1046,7 +1046,7 @@ func (c *Cluster) shutdown() {
n.done <- true
}
log.Println("Cluster shut down")
logs.Info.Println("Cluster shut down")
}
// Recalculate the ring hash using provided list of nodes or only nodes in a non-failed state.
@ -1144,7 +1144,7 @@ func (t *Topic) clusterSelectProxyEvent() (event ProxyEventType, s *Session, val
}
chosen, value, ok := reflect.Select(t.proxiedChannels)
if !ok {
log.Printf("topic[%s]: clusterWriteLoop EOF - quitting", t.name)
logs.Warning.Printf("topic[%s]: clusterWriteLoop EOF - quitting", t.name)
return EventAbort, nil, nil
}
if chosen == 0 {
@ -1152,14 +1152,14 @@ func (t *Topic) clusterSelectProxyEvent() (event ProxyEventType, s *Session, val
return EventContinue, nil, nil
}
if len(t.proxiedSessions) == 0 {
log.Printf("topic[%s]: clusterWriteLoop - no more proxied sessions (num proxied channels: %d). Quitting.",
logs.Info.Printf("topic[%s]: clusterWriteLoop - no more proxied sessions (num proxied channels: %d). Quitting.",
t.name, len(t.proxiedChannels))
return EventAbort, nil, nil
}
chosen--
sessionIdx := chosen / 3
if sessionIdx >= len(t.proxiedSessions) {
log.Printf("topic[%s]: clusterWriteLoop - invalid proxiedSessions index %d (num proxied sessions %d)", t.name, chosen, len(t.proxiedSessions))
logs.Error.Printf("topic[%s]: clusterWriteLoop - invalid proxiedSessions index %d (num proxied sessions %d)", t.name, chosen, len(t.proxiedSessions))
return EventAbort, nil, nil
}
sess := t.proxiedSessions[sessionIdx]
@ -1188,7 +1188,7 @@ func (t *Topic) clusterWriteLoop() {
}
}()
log.Printf("topic[%s]: starting cluster write loop", t.name)
logs.Info.Printf("topic[%s]: starting cluster write loop", t.name)
for {
// t.m
event, sess, value := t.clusterSelectProxyEvent()
@ -1214,11 +1214,11 @@ func (t *Topic) clusterWriteLoop() {
if srvMsg.Data != nil || srvMsg.Pres != nil || srvMsg.Info != nil {
response.OrigSid = "*"
} else if srvMsg.Ctrl == nil {
log.Println("cluster: request type not set in clusterWriteLoop", sess.sid,
logs.Warning.Println("cluster: request type not set in clusterWriteLoop", sess.sid,
srvMsg.describe(), "src_sid:", srvMsg.sess.sid)
}
default:
log.Panicln("cluster: unknown request type in clusterWriteLoop", srvMsg.sess.proxyReq)
logs.Error.Panicln("cluster: unknown request type in clusterWriteLoop", srvMsg.sess.proxyReq)
}
}
@ -1226,7 +1226,7 @@ func (t *Topic) clusterWriteLoop() {
response.RcptTo = t.name
if err := sess.clnode.masterToProxyAsync(response); err != nil {
log.Printf("cluster: response to proxy failed \"%s\": %s", sess.sid, err.Error())
logs.Warning.Printf("cluster: response to proxy failed \"%s\": %s", sess.sid, err.Error())
return
}
case EventStop: // sess.stop

View File

@ -1,11 +1,12 @@
package main
import (
"log"
"math/rand"
"net/rpc"
"sync"
"time"
"github.com/tinode/chat/server/logs"
)
// Cluster methods related to leader node election. Based on ideas from Raft protocol.
@ -89,7 +90,7 @@ func (c *Cluster) failoverInit(config *clusterFailoverConfig) bool {
return false
}
if len(c.nodes) < 2 {
log.Printf("cluster: failover disabled; need at least 3 nodes, got %d", len(c.nodes)+1)
logs.Error.Printf("cluster: failover disabled; need at least 3 nodes, got %d", len(c.nodes)+1)
return false
}
@ -116,7 +117,7 @@ func (c *Cluster) failoverInit(config *clusterFailoverConfig) bool {
electionVote: make(chan *ClusterVote, len(c.nodes)),
done: make(chan bool, 1)}
log.Println("cluster: failover mode enabled")
logs.Info.Println("cluster: failover mode enabled")
return true
}
@ -185,7 +186,7 @@ func (c *Cluster) sendHealthChecks() {
c.invalidateProxySubs("")
c.gcProxySessions(activeNodes)
log.Println("cluster: initiating failover rehash for nodes", activeNodes)
logs.Info.Println("cluster: initiating failover rehash for nodes", activeNodes)
globals.hub.rehash <- true
}
}
@ -198,7 +199,7 @@ func (c *Cluster) electLeader() {
// Make sure the current node does not report itself as a leader.
statsSet("ClusterLeader", 0)
log.Println("cluster: leading new election for term", c.fo.term)
logs.Info.Println("cluster: leading new election for term", c.fo.term)
nodeCount := len(c.nodes)
// Number of votes needed to elect the leader
@ -245,7 +246,7 @@ func (c *Cluster) electLeader() {
// Current node elected as the leader.
c.fo.leader = c.thisNodeName
statsSet("ClusterLeader", 1)
log.Printf("'%s' elected self as a new leader", c.thisNodeName)
logs.Info.Printf("'%s' elected self as a new leader", c.thisNodeName)
}
}
@ -281,21 +282,21 @@ func (c *Cluster) run() {
if health.Term < c.fo.term {
// This is a health check from a stale leader. Ignore.
log.Println("cluster: health check from a stale leader", health.Term, c.fo.term, health.Leader, c.fo.leader)
logs.Warning.Println("cluster: health check from a stale leader", health.Term, c.fo.term, health.Leader, c.fo.leader)
continue
}
if health.Term > c.fo.term {
c.fo.term = health.Term
c.fo.leader = health.Leader
log.Printf("cluster: leader '%s' elected", c.fo.leader)
logs.Info.Printf("cluster: leader '%s' elected", c.fo.leader)
} else if health.Leader != c.fo.leader {
if c.fo.leader != "" {
// Wrong leader. It's a bug, should never happen!
log.Printf("cluster: wrong leader '%s' while expecting '%s'; term %d",
logs.Error.Printf("cluster: wrong leader '%s' while expecting '%s'; term %d",
health.Leader, c.fo.leader, health.Term)
} else {
log.Printf("cluster: leader set to '%s'", health.Leader)
logs.Info.Printf("cluster: leader set to '%s'", health.Leader)
}
c.fo.leader = health.Leader
}
@ -306,7 +307,7 @@ func (c *Cluster) run() {
missed = 0
if health.Signature != c.ring.Signature() {
if rehashSkipped {
log.Println("cluster: rehashing at a request of",
logs.Info.Println("cluster: rehashing at a request of",
health.Leader, health.Nodes, health.Signature, c.ring.Signature())
c.rehash(health.Nodes)
c.invalidateProxySubs("")
@ -323,7 +324,7 @@ func (c *Cluster) run() {
if c.fo.term < vreq.req.Term {
// This is a new election. This node has not voted yet. Vote for the requestor and
// clear the current leader.
log.Printf("Voting YES for %s, my term %d, vote term %d", vreq.req.Node, c.fo.term, vreq.req.Term)
logs.Info.Printf("Voting YES for %s, my term %d, vote term %d", vreq.req.Node, c.fo.term, vreq.req.Term)
c.fo.term = vreq.req.Term
c.fo.leader = ""
// Election means these is no leader yet.
@ -331,7 +332,7 @@ func (c *Cluster) run() {
vreq.resp <- ClusterVoteResponse{Result: true, Term: c.fo.term}
} else {
// This node has voted already or stale election, reject.
log.Printf("Voting NO for %s, my term %d, vote term %d", vreq.req.Node, c.fo.term, vreq.req.Term)
logs.Info.Printf("Voting NO for %s, my term %d, vote term %d", vreq.req.Node, c.fo.term, vreq.req.Term)
vreq.resp <- ClusterVoteResponse{Result: false, Term: c.fo.term}
}
case <-c.fo.done:

View File

@ -8,12 +8,12 @@ import (
"crypto/tls"
"encoding/json"
"errors"
"log"
"strconv"
"strings"
"time"
"github.com/tinode/chat/server/auth"
"github.com/tinode/chat/server/logs"
"github.com/tinode/chat/server/store"
t "github.com/tinode/chat/server/store/types"
b "go.mongodb.org/mongo-driver/bson"
@ -114,7 +114,7 @@ func (a *adapter) Open(jsonconfig json.RawMessage) error {
}
if config.ReplicaSet == "" {
log.Println("MongoDB configured as standalone or replica_set option not set. Transaction support is disabled.")
logs.Info.Println("MongoDB configured as standalone or replica_set option not set. Transaction support is disabled.")
} else {
opts.SetReplicaSet(config.ReplicaSet)
a.useTransactions = true
@ -253,7 +253,7 @@ func (a *adapter) SetMaxResults(val int) error {
// CreateDb creates the database optionally dropping an existing database first.
func (a *adapter) CreateDb(reset bool) error {
if reset {
log.Print("Dropping database...")
logs.Info.Print("Dropping database...")
if err := a.db.Drop(a.ctx); err != nil {
return err
}

View File

@ -27,9 +27,8 @@ import (
mdb "go.mongodb.org/mongo-driver/mongo"
mdbopts "go.mongodb.org/mongo-driver/mongo/options"
//backend "github.com/tinode/chat/server/db/rethinkdb"
//backend "github.com/tinode/chat/server/db/mysql"
backend "github.com/tinode/chat/server/db/mongodb"
"github.com/tinode/chat/server/logs"
"github.com/tinode/chat/server/store/types"
)
@ -1223,6 +1222,7 @@ func initConnectionToDb() {
}
func init() {
logs.Init()
adp = backend.GetAdapter()
conffile := flag.String("config", "./test.conf", "config of the database connection")

View File

@ -13,11 +13,11 @@ import (
"encoding/json"
"errors"
"io"
"log"
"net/http"
"strings"
"time"
"github.com/tinode/chat/server/logs"
"github.com/tinode/chat/server/store"
"github.com/tinode/chat/server/store/types"
)
@ -34,7 +34,7 @@ func largeFileServe(wrt http.ResponseWriter, req *http.Request) {
wrt.WriteHeader(msg.Ctrl.Code)
enc.Encode(msg)
if err != nil {
log.Println("media serve", err)
logs.Warning.Println("media serve", err)
}
}
@ -67,7 +67,7 @@ func largeFileServe(wrt http.ResponseWriter, req *http.Request) {
wrt.Header().Set("Cache-Control", "no-cache, no-store, must-revalidate")
wrt.WriteHeader(http.StatusTemporaryRedirect)
enc.Encode(InfoFound("", "", now))
log.Println("media serve redirected", redirTo)
logs.Info.Println("media serve redirected", redirTo)
return
} else if err != nil {
writeHttpResponse(decodeStoreError(err, "", "", now, nil), err)
@ -86,12 +86,12 @@ func largeFileServe(wrt http.ResponseWriter, req *http.Request) {
wrt.Header().Set("Content-Disposition", "attachment")
http.ServeContent(wrt, req, "", fd.UpdatedAt, rsc)
log.Println("media served OK")
logs.Info.Println("media served OK")
}
// largeFileReceive receives files from client over HTTP(S) and passes them to the configured media handler.
func largeFileReceive(wrt http.ResponseWriter, req *http.Request) {
log.Println("Upload request", req.RequestURI)
logs.Info.Println("Upload request", req.RequestURI)
now := types.TimeNow()
enc := json.NewEncoder(wrt)
@ -104,7 +104,7 @@ func largeFileReceive(wrt http.ResponseWriter, req *http.Request) {
wrt.WriteHeader(msg.Ctrl.Code)
enc.Encode(msg)
log.Println("media upload:", msg.Ctrl.Code, msg.Ctrl.Text, "/", err)
logs.Info.Println("media upload:", msg.Ctrl.Code, msg.Ctrl.Text, "/", err)
}
// Check if this is a POST or a PUT request.
@ -148,7 +148,7 @@ func largeFileReceive(wrt http.ResponseWriter, req *http.Request) {
wrt.WriteHeader(http.StatusTemporaryRedirect)
enc.Encode(InfoFound("", "", now))
log.Println("media upload redirected", redirTo)
logs.Info.Println("media upload redirected", redirTo)
return
} else if err != nil {
writeHttpResponse(decodeStoreError(err, "", "", now, nil), err)
@ -199,7 +199,7 @@ func largeFileRunGarbageCollection(period time.Duration, block int) chan<- bool
select {
case <-gcTimer:
if err := store.Files.DeleteUnused(time.Now().Add(-time.Hour), block); err != nil {
log.Println("media gc:", err)
logs.Warning.Println("media gc:", err)
}
case <-stop:
return

View File

@ -12,10 +12,10 @@ package main
import (
"crypto/tls"
"io"
"log"
"time"
"github.com/tinode/chat/pbx"
"github.com/tinode/chat/server/logs"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
@ -39,7 +39,7 @@ func (*grpcNodeServer) MessageLoop(stream pbx.Node_MessageLoopServer) error {
if p, ok := peer.FromContext(stream.Context()); ok {
sess.remoteAddr = p.Addr.String()
}
log.Println("grpc: session started", sess.sid, sess.remoteAddr, count)
logs.Info.Println("grpc: session started", sess.sid, sess.remoteAddr, count)
defer func() {
sess.closeGrpc()
@ -54,10 +54,10 @@ func (*grpcNodeServer) MessageLoop(stream pbx.Node_MessageLoopServer) error {
return nil
}
if err != nil {
log.Println("grpc: recv", sess.sid, err)
logs.Error.Println("grpc: recv", sess.sid, err)
return err
}
log.Println("grpc in:", truncateStringIfTooLong(in.String()), sess.sid)
logs.Info.Println("grpc in:", truncateStringIfTooLong(in.String()), sess.sid)
statsInc("IncomingMessagesGrpcTotal", 1)
sess.dispatch(pbCliDeserialize(in))
@ -86,12 +86,12 @@ func (sess *Session) writeGrpcLoop() {
return
}
if len(sess.send) > sendQueueLimit {
log.Println("grpc: outbound queue limit exceeded", sess.sid)
logs.Error.Println("grpc: outbound queue limit exceeded", sess.sid)
return
}
statsInc("OutgoingMessagesGrpcTotal", 1)
if err := grpcWrite(sess, msg); err != nil {
log.Println("grpc: write", sess.sid, err)
logs.Error.Println("grpc: write", sess.sid, err)
return
}
@ -157,11 +157,11 @@ func serveGrpc(addr string, kaEnabled bool, tlsConf *tls.Config) (*grpc.Server,
srv := grpc.NewServer(opts...)
pbx.RegisterNodeServer(srv, &grpcNodeServer{})
log.Printf("gRPC/%s%s server is registered at [%s]", grpc.Version, secure, addr)
logs.Info.Printf("gRPC/%s%s server is registered at [%s]", grpc.Version, secure, addr)
go func() {
if err := srv.Serve(lis); err != nil {
log.Println("gRPC server failed:", err)
logs.Error.Println("gRPC server failed:", err)
}
}()

View File

@ -13,9 +13,10 @@ import (
"encoding/json"
"errors"
"io/ioutil"
"log"
"net/http"
"time"
"github.com/tinode/chat/server/logs"
)
func (sess *Session) writeOnce(wrt http.ResponseWriter, req *http.Request) {
@ -25,11 +26,11 @@ func (sess *Session) writeOnce(wrt http.ResponseWriter, req *http.Request) {
case msg, ok := <-sess.send:
if ok {
if len(sess.send) > sendQueueLimit {
log.Println("longPoll: outbound queue limit exceeded", sess.sid)
logs.Error.Println("longPoll: outbound queue limit exceeded", sess.sid)
} else {
statsInc("OutgoingMessagesLongpollTotal", 1)
if err := lpWrite(wrt, msg); err != nil {
log.Println("longPoll: writeOnce failed", sess.sid, err)
logs.Error.Println("longPoll: writeOnce failed", sess.sid, err)
}
}
}
@ -58,7 +59,7 @@ func (sess *Session) writeOnce(wrt http.ResponseWriter, req *http.Request) {
case <-time.After(pingPeriod):
// just write an empty packet on timeout
if _, err := wrt.Write([]byte{}); err != nil {
log.Println("longPoll: writeOnce: timout", sess.sid, err)
logs.Error.Println("longPoll: writeOnce: timout", sess.sid, err)
}
return
@ -141,7 +142,7 @@ func serveLongPoll(wrt http.ResponseWriter, req *http.Request) {
var count int
sess, count = globals.sessionStore.NewSession(wrt, "")
sess.remoteAddr = getRemoteAddr(req)
log.Println("longPoll: session started", sess.sid, sess.remoteAddr, count)
logs.Info.Println("longPoll: session started", sess.sid, sess.remoteAddr, count)
wrt.WriteHeader(http.StatusCreated)
pkt := NoErrCreated(req.FormValue("id"), "", now)
@ -156,7 +157,7 @@ func serveLongPoll(wrt http.ResponseWriter, req *http.Request) {
// Existing session
sess = globals.sessionStore.Get(sid)
if sess == nil {
log.Println("longPoll: invalid or expired session id", sid)
logs.Warning.Println("longPoll: invalid or expired session id", sid)
wrt.WriteHeader(http.StatusForbidden)
enc.Encode(ErrSessionNotFound(now))
return
@ -165,13 +166,13 @@ func serveLongPoll(wrt http.ResponseWriter, req *http.Request) {
addr := getRemoteAddr(req)
if sess.remoteAddr != addr {
sess.remoteAddr = addr
log.Println("longPoll: remote address changed", sid, addr)
logs.Warning.Println("longPoll: remote address changed", sid, addr)
}
if req.ContentLength != 0 {
// Read payload and send it for processing.
if code, err := sess.readOnce(wrt, req); err != nil {
log.Println("longPoll: readOnce failed", sess.sid, err)
logs.Warning.Println("longPoll: readOnce failed", sess.sid, err)
// Failed to read request, report an error, if possible
if code != 0 {
wrt.WriteHeader(code)

View File

@ -11,11 +11,11 @@ package main
import (
"encoding/json"
"log"
"net/http"
"time"
"github.com/gorilla/websocket"
"github.com/tinode/chat/server/logs"
)
const (
@ -54,7 +54,7 @@ func (sess *Session) readLoop() {
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure,
websocket.CloseNormalClosure) {
log.Println("ws: readLoop", sess.sid, err)
logs.Error.Println("ws: readLoop", sess.sid, err)
}
return
}
@ -80,14 +80,14 @@ func (sess *Session) writeLoop() {
return
}
if len(sess.send) > sendQueueLimit {
log.Println("ws: outbound queue limit exceeded", sess.sid)
logs.Error.Println("ws: outbound queue limit exceeded", sess.sid)
return
}
statsInc("OutgoingMessagesWebsockTotal", 1)
if err := wsWrite(sess.ws, websocket.TextMessage, msg); err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure,
websocket.CloseNormalClosure) {
log.Println("ws: writeLoop", sess.sid, err)
logs.Error.Println("ws: writeLoop", sess.sid, err)
}
return
}
@ -112,7 +112,7 @@ func (sess *Session) writeLoop() {
if err := wsWrite(sess.ws, websocket.PingMessage, nil); err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure,
websocket.CloseNormalClosure) {
log.Println("ws: writeLoop ping", sess.sid, err)
logs.Error.Println("ws: writeLoop ping", sess.sid, err)
}
return
}
@ -146,23 +146,23 @@ func serveWebSocket(wrt http.ResponseWriter, req *http.Request) {
if isValid, _ := checkAPIKey(getAPIKey(req)); !isValid {
wrt.WriteHeader(http.StatusForbidden)
json.NewEncoder(wrt).Encode(ErrAPIKeyRequired(now))
log.Println("ws: Missing, invalid or expired API key")
logs.Error.Println("ws: Missing, invalid or expired API key")
return
}
if req.Method != http.MethodGet {
wrt.WriteHeader(http.StatusMethodNotAllowed)
json.NewEncoder(wrt).Encode(ErrOperationNotAllowed("", "", now))
log.Println("ws: Invalid HTTP method", req.Method)
logs.Error.Println("ws: Invalid HTTP method", req.Method)
return
}
ws, err := upgrader.Upgrade(wrt, req, nil)
if _, ok := err.(websocket.HandshakeError); ok {
log.Println("ws: Not a websocket handshake")
logs.Error.Println("ws: Not a websocket handshake")
return
} else if err != nil {
log.Println("ws: failed to Upgrade ", err)
logs.Error.Println("ws: failed to Upgrade ", err)
return
}
@ -174,7 +174,7 @@ func serveWebSocket(wrt http.ResponseWriter, req *http.Request) {
sess.remoteAddr = req.RemoteAddr
}
log.Println("ws: session started", sess.sid, sess.remoteAddr, count)
logs.Info.Println("ws: session started", sess.sid, sess.remoteAddr, count)
// Do work in goroutines to return from serveWebSocket() to release file pointers.
// Otherwise "too many open files" will happen.

View File

@ -14,7 +14,6 @@ import (
"encoding/base64"
"encoding/json"
"errors"
"log"
"net"
"net/http"
"net/url"
@ -25,6 +24,7 @@ import (
"syscall"
"time"
"github.com/tinode/chat/server/logs"
"github.com/tinode/chat/server/store"
"github.com/tinode/chat/server/store/types"
)
@ -54,7 +54,7 @@ func listenAndServe(addr string, mux *http.ServeMux, tlfConf *tls.Config, stop <
if isUnixAddr(globals.tlsRedirectHTTP) || isUnixAddr(addr) {
err = errors.New("HTTP to HTTPS redirect: unix sockets not supported.")
} else {
log.Printf("Redirecting connections from HTTP at [%s] to HTTPS at [%s]",
logs.Info.Printf("Redirecting connections from HTTP at [%s] to HTTPS at [%s]",
globals.tlsRedirectHTTP, addr)
// This is a second HTTP server listenning on a different port.
@ -63,7 +63,7 @@ func listenAndServe(addr string, mux *http.ServeMux, tlfConf *tls.Config, stop <
}
if err == nil {
log.Printf("Listening for client HTTPS connections on [%s]", addr)
logs.Info.Printf("Listening for client HTTPS connections on [%s]", addr)
var lis net.Listener
lis, err = netListener(addr)
if err == nil {
@ -71,7 +71,7 @@ func listenAndServe(addr string, mux *http.ServeMux, tlfConf *tls.Config, stop <
}
}
} else {
log.Printf("Listening for client HTTP connections on [%s]", addr)
logs.Info.Printf("Listening for client HTTP connections on [%s]", addr)
var lis net.Listener
lis, err = netListener(addr)
if err == nil {
@ -81,9 +81,9 @@ func listenAndServe(addr string, mux *http.ServeMux, tlfConf *tls.Config, stop <
if err != nil {
if globals.shuttingDown {
log.Println("HTTP server: stopped")
logs.Info.Println("HTTP server: stopped")
} else {
log.Println("HTTP server: failed", err)
logs.Error.Println("HTTP server: failed", err)
}
}
httpdone <- true
@ -100,7 +100,7 @@ Loop:
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
if err := server.Shutdown(ctx); err != nil {
// failure/timeout shutting down the server gracefully
log.Println("HTTP server failed to terminate gracefully", err)
logs.Error.Println("HTTP server failed to terminate gracefully", err)
}
// While the server shuts down, termianate all sessions.
@ -153,7 +153,7 @@ func signalHandler() <-chan bool {
go func() {
// Wait for a signal. Don't care which signal it is
sig := <-signchan
log.Printf("Signal received: '%s', shutting down", sig)
logs.Info.Printf("Signal received: '%s', shutting down", sig)
stop <- true
}()
@ -349,7 +349,7 @@ func authHttpRequest(req *http.Request) (types.Uid, []byte, error) {
}
uid = rec.Uid
} else {
log.Println("fileUpload: auth data is present but handler is not found", authMethod)
logs.Info.Println("fileUpload: auth data is present but handler is not found", authMethod)
}
} else {
// Find the session, make sure it's appropriately authenticated.

View File

@ -6,11 +6,12 @@ package main
import (
"fmt"
"log"
"net/http"
"path"
"runtime/pprof"
"strings"
"github.com/tinode/chat/server/logs"
)
var pprofHttpRoot string
@ -24,7 +25,7 @@ func servePprof(mux *http.ServeMux, serveAt string) {
pprofHttpRoot = path.Clean("/"+serveAt) + "/"
mux.HandleFunc(pprofHttpRoot, profileHandler)
log.Printf("pprof: profiling info exposed at '%s'", pprofHttpRoot)
logs.Info.Printf("pprof: profiling info exposed at '%s'", pprofHttpRoot)
}
func profileHandler(wrt http.ResponseWriter, req *http.Request) {

View File

@ -10,12 +10,13 @@
package main
import (
"log"
//"log"
"strings"
"sync"
"time"
"github.com/tinode/chat/server/auth"
"github.com/tinode/chat/server/logs"
"github.com/tinode/chat/server/store"
"github.com/tinode/chat/server/store/types"
)
@ -215,7 +216,7 @@ func (h *Hub) run() {
join.sess.inflightReqs.Done()
}
join.sess.queueOut(ErrServiceUnavailableReply(join.pkt, join.pkt.Timestamp))
log.Println("hub.join loop: topic's reg queue full", join.pkt.RcptTo, join.sess.sid, " - total queue len:", len(t.reg))
logs.Error.Println("hub.join loop: topic's reg queue full", join.pkt.RcptTo, join.sess.sid, " - total queue len:", len(t.reg))
}
}
@ -229,16 +230,16 @@ func (h *Hub) run() {
select {
case dst.broadcast <- msg:
default:
log.Println("hub: topic's broadcast queue is full", dst.name)
logs.Error.Println("hub: topic's broadcast queue is full", dst.name)
}
} else {
log.Println("hub: invalid topic category for broadcast", dst.name)
logs.Warning.Println("hub: invalid topic category for broadcast", dst.name)
}
} else if (strings.HasPrefix(msg.RcptTo, "usr") || strings.HasPrefix(msg.RcptTo, "grp")) &&
globals.cluster.isRemoteTopic(msg.RcptTo) {
// It is a remote topic.
if err := globals.cluster.routeToTopicIntraCluster(msg.RcptTo, msg, msg.sess); err != nil {
log.Printf("hub: routing to '%s' failed", msg.RcptTo)
logs.Warning.Printf("hub: routing to '%s' failed", msg.RcptTo)
}
} else if msg.Pres == nil && msg.Info == nil {
// Topic is unknown or offline.
@ -246,7 +247,7 @@ func (h *Hub) run() {
// TODO(gene): validate topic name, discarding invalid topics
log.Printf("Hub. Topic[%s] is unknown or offline", msg.RcptTo)
logs.Info.Printf("Hub. Topic[%s] is unknown or offline", msg.RcptTo)
msg.sess.queueOut(NoErrAcceptedExplicitTs(msg.Id, msg.RcptTo, types.TimeNow(), msg.Timestamp))
}
@ -276,7 +277,7 @@ func (h *Hub) run() {
if unreg.forUser.IsZero() {
// The topic is being garbage collected or deleted.
if err := h.topicUnreg(unreg.sess, unreg.rcptTo, unreg.pkt, reason); err != nil {
log.Println("hub.topicUnreg failed:", err)
logs.Error.Println("hub.topicUnreg failed:", err)
}
} else {
go h.stopTopicsForUser(unreg.forUser, reason, unreg.done)
@ -321,7 +322,7 @@ func (h *Hub) run() {
<-topicsdone
}
log.Printf("Hub shutdown completed with %d topics", topicCount)
logs.Info.Printf("Hub shutdown completed with %d topics", topicCount)
// let the main goroutine know we are done with the cleanup
hubdone <- true
@ -567,7 +568,7 @@ func replyOfflineTopicGetDesc(sess *Session, msg *ClientComMessage) {
if strings.HasPrefix(topic, "grp") {
stopic, err := store.Topics.Get(topic)
if err != nil {
log.Println("replyOfflineTopicGetDesc", err)
logs.Info.Println("replyOfflineTopicGetDesc", err)
sess.queueOut(decodeStoreErrorExplicitTs(err, msg.Id, msg.Original, now, msg.Timestamp, nil))
return
}
@ -603,7 +604,7 @@ func replyOfflineTopicGetDesc(sess *Session, msg *ClientComMessage) {
}
if uid.IsZero() {
log.Println("replyOfflineTopicGetDesc: malformed p2p topic name")
logs.Warning.Println("replyOfflineTopicGetDesc: malformed p2p topic name")
sess.queueOut(ErrMalformedReply(msg, now))
return
}
@ -627,7 +628,7 @@ func replyOfflineTopicGetDesc(sess *Session, msg *ClientComMessage) {
sub, err := store.Subs.Get(topic, asUid)
if err != nil {
log.Println("replyOfflineTopicGetDesc:", err)
logs.Warning.Println("replyOfflineTopicGetDesc:", err)
sess.queueOut(decodeStoreErrorExplicitTs(err, msg.Id, msg.Original, now, msg.Timestamp, nil))
return
}
@ -658,7 +659,7 @@ func replyOfflineTopicGetSub(sess *Session, msg *ClientComMessage) {
ssub, err := store.Subs.Get(msg.RcptTo, types.ParseUserId(msg.AsUser))
if err != nil {
log.Println("replyOfflineTopicGetSub:", err)
logs.Warning.Println("replyOfflineTopicGetSub:", err)
sess.queueOut(decodeStoreErrorExplicitTs(err, msg.Id, msg.Original, now, msg.Timestamp, nil))
return
}
@ -712,7 +713,7 @@ func replyOfflineTopicSetSub(sess *Session, msg *ClientComMessage) {
sub, err := store.Subs.Get(msg.RcptTo, asUid)
if err != nil {
log.Println("replyOfflineTopicSetSub get sub:", err)
logs.Warning.Println("replyOfflineTopicSetSub get sub:", err)
sess.queueOut(decodeStoreErrorExplicitTs(err, msg.Id, msg.Original, now, msg.Timestamp, nil))
return
}
@ -735,7 +736,7 @@ func replyOfflineTopicSetSub(sess *Session, msg *ClientComMessage) {
if msg.Set.Sub != nil && msg.Set.Sub.Mode != "" {
var modeWant types.AccessMode
if err = modeWant.UnmarshalText([]byte(msg.Set.Sub.Mode)); err != nil {
log.Println("replyOfflineTopicSetSub mode:", err)
logs.Warning.Println("replyOfflineTopicSetSub mode:", err)
sess.queueOut(decodeStoreErrorExplicitTs(err, msg.Id, msg.Original, now, msg.Timestamp, nil))
return
}
@ -762,7 +763,7 @@ func replyOfflineTopicSetSub(sess *Session, msg *ClientComMessage) {
if len(update) > 0 {
err = store.Subs.Update(msg.RcptTo, asUid, update, true)
if err != nil {
log.Println("replyOfflineTopicSetSub update:", err)
logs.Warning.Println("replyOfflineTopicSetSub update:", err)
sess.queueOut(decodeStoreErrorExplicitTs(err, msg.Id, msg.Original, now, msg.Timestamp, nil))
} else {
var params interface{}

View File

@ -9,10 +9,10 @@
package main
import (
"log"
"strings"
"github.com/tinode/chat/server/auth"
"github.com/tinode/chat/server/logs"
"github.com/tinode/chat/server/store"
"github.com/tinode/chat/server/store/types"
)
@ -62,7 +62,7 @@ func topicInit(t *Topic, join *sessionJoin, h *Hub) {
// Remove topic from cache to prevent hub from forwarding more messages to it.
h.topicDel(join.pkt.RcptTo)
log.Println("init_topic: failed to load or create topic:", join.pkt.RcptTo, err)
logs.Error.Println("init_topic: failed to load or create topic:", join.pkt.RcptTo, err)
join.sess.queueOut(decodeStoreErrorExplicitTs(err, join.pkt.Id, t.xoriginal, timestamp, join.pkt.Timestamp, nil))
// Re-queue pending requests to join the topic.
@ -247,7 +247,7 @@ func initTopicP2P(t *Topic, sreg *sessionJoin) error {
// Case 3, fail
if len(subs) == 0 {
log.Println("hub: missing both subscriptions for '" + t.name + "' (SHOULD NEVER HAPPEN!)")
logs.Error.Println("hub: missing both subscriptions for '" + t.name + "' (SHOULD NEVER HAPPEN!)")
return types.ErrInternal
}
@ -346,7 +346,7 @@ func initTopicP2P(t *Topic, sreg *sessionJoin) error {
// The other user is assumed to have auth level "Auth".
sub2.ModeGiven = users[u1].Access.Auth
if err := sub2.ModeGiven.UnmarshalText([]byte(pktsub.Set.Desc.DefaultAcs.Auth)); err != nil {
log.Println("hub: invalid access mode", t.xoriginal, pktsub.Set.Desc.DefaultAcs.Auth)
logs.Error.Println("hub: invalid access mode", t.xoriginal, pktsub.Set.Desc.DefaultAcs.Auth)
}
} else {
// Use user1.Auth as modeGiven for the other user
@ -385,11 +385,11 @@ func initTopicP2P(t *Topic, sreg *sessionJoin) error {
if uid != userID1 {
// Report the error and ignore the value
log.Println("hub: setting mode for another user is not supported '" + t.name + "'")
logs.Error.Println("hub: setting mode for another user is not supported '" + t.name + "'")
} else {
// user1 is setting non-default modeWant
if err := userData.modeWant.UnmarshalText([]byte(pktsub.Set.Sub.Mode)); err != nil {
log.Println("hub: invalid access mode", t.xoriginal, pktsub.Set.Sub.Mode)
logs.Error.Println("hub: invalid access mode", t.xoriginal, pktsub.Set.Sub.Mode)
}
// Ensure sanity
userData.modeWant = userData.modeWant&types.ModeCP2P | types.ModeApprove
@ -531,9 +531,9 @@ func initTopicNewGrp(t *Topic, sreg *sessionJoin, isChan bool) error {
} else {
t.accessAnon = anonMode
}
log.Println("hub: invalid access mode for topic '" + t.name + "': '" + err.Error() + "'")
logs.Error.Println("hub: invalid access mode for topic '" + t.name + "': '" + err.Error() + "'")
} else if authMode.IsOwner() || anonMode.IsOwner() {
log.Println("hub: OWNER default access in topic '" + t.name)
logs.Error.Println("hub: OWNER default access in topic '" + t.name)
t.accessAuth, t.accessAnon = authMode & ^types.ModeOwner, anonMode & ^types.ModeOwner
} else {
t.accessAuth, t.accessAnon = authMode, anonMode
@ -545,7 +545,7 @@ func initTopicNewGrp(t *Topic, sreg *sessionJoin, isChan bool) error {
if pktsub.Set.Sub != nil && pktsub.Set.Sub.Mode != "" {
userData.modeWant = types.ModeCFull
if err := userData.modeWant.UnmarshalText([]byte(pktsub.Set.Sub.Mode)); err != nil {
log.Println("hub: invalid access mode", t.xoriginal, pktsub.Set.Sub.Mode)
logs.Error.Println("hub: invalid access mode", t.xoriginal, pktsub.Set.Sub.Mode)
}
// User must not unset ModeJoin or the owner flags
userData.modeWant |= types.ModeJoin | types.ModeOwner

24
server/logs/logs.go Normal file
View File

@ -0,0 +1,24 @@
/******************************************************************************
*
* Description :
* Package exposes info, warning and error loggers.
*
*****************************************************************************/
package logs
import (
"log"
"os"
)
var (
Info *log.Logger
Warning *log.Logger
Error *log.Logger
)
func Init() {
Info = log.New(os.Stdout, "I", log.LstdFlags | log.Lshortfile)
Warning = log.New(os.Stdout, "W", log.LstdFlags | log.Lshortfile)
Error = log.New(os.Stdout, "E", log.LstdFlags | log.Lshortfile)
}

View File

@ -13,7 +13,6 @@ package main
import (
"encoding/json"
"flag"
"log"
"net/http"
"os"
"path/filepath"
@ -39,6 +38,8 @@ import (
_ "github.com/tinode/chat/server/db/mysql"
_ "github.com/tinode/chat/server/db/rethinkdb"
"github.com/tinode/chat/server/logs"
// Push notifications
"github.com/tinode/chat/server/push"
_ "github.com/tinode/chat/server/push/fcm"
@ -255,14 +256,13 @@ type configType struct {
func main() {
executable, _ := os.Executable()
// Prepend log lines with file name and line number.
log.SetFlags(log.LstdFlags | log.Lshortfile)
logs.Init()
// All relative paths are resolved against the executable path, not against current working directory.
// Absolute paths are left unchanged.
rootpath, _ := filepath.Split(executable)
log.Printf("Server v%s:%s:%s; pid %d; %d process(es)",
logs.Info.Printf("Server v%s:%s:%s; pid %d; %d process(es)",
currentVersion, executable, buildstamp,
os.Getpid(), runtime.GOMAXPROCS(runtime.NumCPU()))
@ -280,25 +280,25 @@ func main() {
flag.Parse()
*configfile = toAbsolutePath(rootpath, *configfile)
log.Printf("Using config from '%s'", *configfile)
logs.Info.Printf("Using config from '%s'", *configfile)
var config configType
if file, err := os.Open(*configfile); err != nil {
log.Fatal("Failed to read config file: ", err)
logs.Error.Fatal("Failed to read config file: ", err)
} else {
jr := jcr.New(file)
if err = json.NewDecoder(jr).Decode(&config); err != nil {
switch jerr := err.(type) {
case *json.UnmarshalTypeError:
lnum, cnum, _ := jr.LineAndChar(jerr.Offset)
log.Fatalf("Unmarshall error in config file in %s at %d:%d (offset %d bytes): %s",
logs.Error.Fatalf("Unmarshall error in config file in %s at %d:%d (offset %d bytes): %s",
jerr.Field, lnum, cnum, jerr.Offset, jerr.Error())
case *json.SyntaxError:
lnum, cnum, _ := jr.LineAndChar(jerr.Offset)
log.Fatalf("Syntax error in config file at %d:%d (offset %d bytes): %s",
logs.Error.Fatalf("Syntax error in config file at %d:%d (offset %d bytes): %s",
lnum, cnum, jerr.Offset, jerr.Error())
default:
log.Fatal("Failed to parse config file: ", err)
logs.Error.Fatal("Failed to parse config file: ", err)
}
}
file.Close()
@ -336,13 +336,13 @@ func main() {
cpuf, err := os.Create(*pprofFile + ".cpu")
if err != nil {
log.Fatal("Failed to create CPU pprof file: ", err)
logs.Error.Fatal("Failed to create CPU pprof file: ", err)
}
defer cpuf.Close()
memf, err := os.Create(*pprofFile + ".mem")
if err != nil {
log.Fatal("Failed to create Mem pprof file: ", err)
logs.Error.Fatal("Failed to create Mem pprof file: ", err)
}
defer memf.Close()
@ -350,18 +350,18 @@ func main() {
defer pprof.StopCPUProfile()
defer pprof.WriteHeapProfile(memf)
log.Printf("Profiling info saved to '%s.(cpu|mem)'", *pprofFile)
logs.Info.Printf("Profiling info saved to '%s.(cpu|mem)'", *pprofFile)
}
err := store.Open(workerId, config.Store)
if err != nil {
log.Fatal("Failed to connect to DB: ", err)
logs.Error.Fatal("Failed to connect to DB: ", err)
}
log.Println("DB adapter", store.GetAdapterName())
logs.Info.Println("DB adapter", store.GetAdapterName())
defer func() {
store.Close()
log.Println("Closed database connection(s)")
log.Println("All done, good bye")
logs.Info.Println("Closed database connection(s)")
logs.Info.Println("All done, good bye")
}()
// API key signing secret
@ -369,7 +369,7 @@ func main() {
err = store.InitAuthLogicalNames(config.Auth["logical_names"])
if err != nil {
log.Fatal(err)
logs.Error.Fatal(err)
}
// List of tag namespaces for user discovery which cannot be changed directly
@ -379,18 +379,18 @@ func main() {
authNames := store.GetAuthNames()
for _, name := range authNames {
if authhdl := store.GetLogicalAuthHandler(name); authhdl == nil {
log.Fatalln("Unknown authenticator", name)
logs.Error.Fatalln("Unknown authenticator", name)
} else if jsconf := config.Auth[name]; jsconf != nil {
if err := authhdl.Init(jsconf, name); err != nil {
log.Fatalln("Failed to init auth scheme", name+":", err)
logs.Error.Fatalln("Failed to init auth scheme", name+":", err)
}
tags, err := authhdl.RestrictedTags()
if err != nil {
log.Fatalln("Failed get restricted tag namespaces (prefixes)", name+":", err)
logs.Error.Fatalln("Failed get restricted tag namespaces (prefixes)", name+":", err)
}
for _, tag := range tags {
if strings.Contains(tag, ":") {
log.Fatalln("tags restricted by auth handler should not contain character ':'", tag)
logs.Error.Fatalln("tags restricted by auth handler should not contain character ':'", tag)
}
globals.immutableTagNS[tag] = true
}
@ -403,7 +403,7 @@ func main() {
// The namespace can be restricted even if the validator is disabled.
if vconf.AddToTags {
if strings.Contains(name, ":") {
log.Fatalln("acc_validation names should not contain character ':'", name)
logs.Error.Fatalln("acc_validation names should not contain character ':'", name)
}
globals.immutableTagNS[name] = true
}
@ -418,7 +418,7 @@ func main() {
lvl := auth.ParseAuthLevel(req)
if lvl == auth.LevelNone {
if req != "" {
log.Fatalf("Invalid required AuthLevel '%s' in validator '%s'", req, name)
logs.Error.Fatalf("Invalid required AuthLevel '%s' in validator '%s'", req, name)
}
// Skip empty string
continue
@ -436,9 +436,9 @@ func main() {
}
if val := store.GetValidator(name); val == nil {
log.Fatal("Config provided for an unknown validator '" + name + "'")
logs.Error.Fatal("Config provided for an unknown validator '" + name + "'")
} else if err = val.Init(string(vconf.Config)); err != nil {
log.Fatal("Failed to init validator '"+name+"': ", err)
logs.Error.Fatal("Failed to init validator '"+name+"': ", err)
}
if globals.validators == nil {
globals.validators = make(map[string]credValidator)
@ -452,7 +452,7 @@ func main() {
globals.maskedTagNS = make(map[string]bool, len(config.MaskedTagNamespaces))
for _, tag := range config.MaskedTagNamespaces {
if strings.Contains(tag, ":") {
log.Fatal("masked_tags namespaces should not contain character ':'", tag)
logs.Error.Fatal("masked_tags namespaces should not contain character ':'", tag)
}
globals.maskedTagNS[tag] = true
}
@ -462,14 +462,14 @@ func main() {
tags = append(tags, "'"+tag+"'")
}
if len(tags) > 0 {
log.Println("Restricted tags:", tags)
logs.Info.Println("Restricted tags:", tags)
}
tags = nil
for tag := range globals.maskedTagNS {
tags = append(tags, "'"+tag+"'")
}
if len(tags) > 0 {
log.Println("Masked tags:", tags)
logs.Info.Println("Masked tags:", tags)
}
// Maximum message size
@ -505,7 +505,7 @@ func main() {
conf = string(params)
}
if err = store.UseMediaHandler(config.Media.UseHandler, conf); err != nil {
log.Fatalf("Failed to init media handler '%s': %s", config.Media.UseHandler, err)
logs.Error.Fatalf("Failed to init media handler '%s': %s", config.Media.UseHandler, err)
}
}
if config.Media.GcPeriod > 0 && config.Media.GcBlockSize > 0 {
@ -513,7 +513,7 @@ func main() {
config.Media.GcBlockSize)
defer func() {
stopFilesGc <- true
log.Println("Stopped files garbage collector")
logs.Info.Println("Stopped files garbage collector")
}()
}
}
@ -521,11 +521,11 @@ func main() {
err = push.Init(string(config.Push))
if err != nil {
log.Fatal("Failed to initialize push notifications:", err)
logs.Error.Fatal("Failed to initialize push notifications:", err)
}
defer func() {
push.Stop()
log.Println("Stopped push notifications")
logs.Info.Println("Stopped push notifications")
}()
// Keep inactive LP sessions for 15 seconds
@ -540,7 +540,7 @@ func main() {
tlsConfig, err := parseTLSConfig(*tlsEnabled, config.TLS)
if err != nil {
log.Fatalln(err)
logs.Error.Fatalln(err)
}
// Intialize plugins
@ -554,7 +554,7 @@ func main() {
*listenGrpc = config.GrpcListen
}
if globals.grpcServer, err = serveGrpc(*listenGrpc, config.GrpcKeepalive, tlsConfig); err != nil {
log.Fatal(err)
logs.Error.Fatal(err)
}
// Serve static content from the directory in -static_data flag if that's
@ -566,7 +566,7 @@ func main() {
// Resolve path to static content.
*staticPath = toAbsolutePath(rootpath, *staticPath)
if _, err = os.Stat(*staticPath); os.IsNotExist(err) {
log.Fatal("Static content directory is not found", *staticPath)
logs.Error.Fatal("Static content directory is not found", *staticPath)
}
staticMountPoint = config.StaticMount
@ -592,9 +592,9 @@ func main() {
// Remove mount point prefix
http.StripPrefix(staticMountPoint,
http.FileServer(http.Dir(*staticPath))))))))
log.Printf("Serving static content from '%s' at '%s'", *staticPath, staticMountPoint)
logs.Info.Printf("Serving static content from '%s' at '%s'", *staticPath, staticMountPoint)
} else {
log.Println("Static content is disabled")
logs.Info.Println("Static content is disabled")
}
// Configure root path for serving API calls.
@ -611,7 +611,7 @@ func main() {
config.ApiPath += "/"
}
}
log.Printf("API served from root URL path '%s'", config.ApiPath)
logs.Info.Printf("API served from root URL path '%s'", config.ApiPath)
// Handle websocket clients.
mux.HandleFunc(config.ApiPath+"v0/channels", serveWebSocket)
@ -622,7 +622,7 @@ func main() {
mux.Handle(config.ApiPath+"v0/file/u/", gh.CompressHandler(http.HandlerFunc(largeFileReceive)))
// Serve large files.
mux.Handle(config.ApiPath+"v0/file/s/", gh.CompressHandler(http.HandlerFunc(largeFileServe)))
log.Println("Large media handling enabled", config.Media.UseHandler)
logs.Info.Println("Large media handling enabled", config.Media.UseHandler)
}
if staticMountPoint != "/" {
@ -631,6 +631,6 @@ func main() {
}
if err = listenAndServe(config.Listen, mux, tlsConfig, signalHandler()); err != nil {
log.Fatal(err)
logs.Error.Fatal(err)
}
}

View File

@ -7,11 +7,11 @@ import (
"encoding/json"
"errors"
"io"
"log"
"mime"
"os"
"path/filepath"
"github.com/tinode/chat/server/logs"
"github.com/tinode/chat/server/media"
"github.com/tinode/chat/server/store"
"github.com/tinode/chat/server/store/types"
@ -72,14 +72,14 @@ func (fh *fshandler) Upload(fdef *types.FileDef, file io.ReadSeeker) (string, er
outfile, err := os.Create(fdef.Location)
if err != nil {
log.Println("Upload: failed to create file", fdef.Location, err)
logs.Warning.Println("Upload: failed to create file", fdef.Location, err)
return "", err
}
if err = store.Files.StartUpload(fdef); err != nil {
outfile.Close()
os.Remove(fdef.Location)
log.Println("failed to create file record", fdef.Id, err)
logs.Warning.Println("failed to create file record", fdef.Id, err)
return "", err
}
@ -116,7 +116,7 @@ func (fh *fshandler) Download(url string) (*types.FileDef, media.ReadSeekCloser,
fd, err := fh.getFileRecord(fid)
if err != nil {
log.Println("Download: file not found", fid)
logs.Warning.Println("Download: file not found", fid)
return nil, nil, err
}
@ -137,7 +137,7 @@ func (fh *fshandler) Delete(locations []string) error {
for _, loc := range locations {
if err, _ := os.Remove(loc).(*os.PathError); err != nil {
if err != os.ErrNotExist {
log.Println("fs: error deleting file", loc, err)
logs.Warning.Println("fs: error deleting file", loc, err)
}
}
}

View File

@ -5,7 +5,6 @@ import (
"encoding/json"
"errors"
"io"
"log"
"mime"
"net/http"
"sync/atomic"
@ -19,6 +18,7 @@ import (
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/tinode/chat/server/logs"
"github.com/tinode/chat/server/media"
"github.com/tinode/chat/server/store"
"github.com/tinode/chat/server/store/types"
@ -189,7 +189,7 @@ func (ah *awshandler) Upload(fdef *types.FileDef, file io.ReadSeeker) (string, e
uploader := s3manager.NewUploaderWithClient(ah.svc)
if err = store.Files.StartUpload(fdef); err != nil {
log.Println("failed to create file record", fdef.Id, err)
logs.Warning.Println("failed to create file record", fdef.Id, err)
return "", err
}
@ -221,7 +221,7 @@ func (ah *awshandler) Upload(fdef *types.FileDef, file io.ReadSeeker) (string, e
fname += ext[0]
}
log.Println("aws upload success ", fname, "key", key, "id", fdef.Id)
logs.Info.Println("aws upload success ", fname, "key", key, "id", fdef.Id)
return ah.conf.ServeURL + fname, nil
}

View File

@ -4,10 +4,10 @@ package main
import (
"encoding/json"
"log"
"time"
"github.com/tinode/chat/pbx"
"github.com/tinode/chat/server/logs"
"github.com/tinode/chat/server/store/types"
)
@ -66,7 +66,7 @@ func pbServPresSerialize(pres *MsgServerPres) *pbx.ServerMsg_Pres {
case "tags":
what = pbx.ServerPres_TAGS
default:
log.Fatal("Unknown pres.what value", pres.What)
logs.Error.Fatal("Unknown pres.what value", pres.What)
}
return &pbx.ServerMsg_Pres{Pres: &pbx.ServerPres{
Topic: pres.Topic,
@ -448,7 +448,7 @@ func bytesToInterface(in []byte) interface{} {
if len(in) > 0 {
err := json.Unmarshal(in, &out)
if err != nil {
log.Println("pbx: failed to parse bytes", string(in), err)
logs.Warning.Println("pbx: failed to parse bytes", string(in), err)
}
}
return out
@ -639,7 +639,7 @@ func pbInfoNoteWhatSerialize(what string) pbx.InfoNote {
case "recv":
out = pbx.InfoNote_RECV
default:
log.Fatal("unknown info-note.what", what)
logs.Error.Fatal("unknown info-note.what", what)
}
return out
}
@ -654,7 +654,7 @@ func pbInfoNoteWhatDeserialize(what pbx.InfoNote) string {
case pbx.InfoNote_RECV:
out = "recv"
default:
log.Fatal("unknown info-note.what", what)
logs.Error.Fatal("unknown info-note.what", what)
}
return out
}

View File

@ -4,11 +4,11 @@ package main
import (
"encoding/json"
"errors"
"log"
"strings"
"time"
"github.com/tinode/chat/pbx"
"github.com/tinode/chat/server/logs"
"github.com/tinode/chat/server/store/types"
"golang.org/x/net/context"
"google.golang.org/grpc"
@ -234,7 +234,7 @@ func pluginsInit(configString json.RawMessage) {
var config []pluginConfig
if err := json.Unmarshal(configString, &config); err != nil {
log.Fatal(err)
logs.Error.Fatal(err)
}
nameIndex := make(map[string]bool)
@ -247,7 +247,7 @@ func pluginsInit(configString json.RawMessage) {
}
if nameIndex[conf.Name] {
log.Fatalf("plugins: duplicate name '%s'", conf.Name)
logs.Error.Fatalf("plugins: duplicate name '%s'", conf.Name)
}
globals.plugins[count] = Plugin{
@ -259,29 +259,29 @@ func pluginsInit(configString json.RawMessage) {
var err error
if globals.plugins[count].filterFireHose, err =
ParsePluginFilter(conf.Filters.FireHose, plgFilterByTopicType|plgFilterByPacket); err != nil {
log.Fatal("plugins: bad FireHose filter", err)
logs.Error.Fatal("plugins: bad FireHose filter", err)
}
if globals.plugins[count].filterAccount, err =
ParsePluginFilter(conf.Filters.Account, plgFilterByAction); err != nil {
log.Fatal("plugins: bad Account filter", err)
logs.Error.Fatal("plugins: bad Account filter", err)
}
if globals.plugins[count].filterTopic, err =
ParsePluginFilter(conf.Filters.Topic, plgFilterByTopicType|plgFilterByAction); err != nil {
log.Fatal("plugins: bad FireHose filter", err)
logs.Error.Fatal("plugins: bad FireHose filter", err)
}
if globals.plugins[count].filterSubscription, err =
ParsePluginFilter(conf.Filters.Subscription, plgFilterByTopicType|plgFilterByAction); err != nil {
log.Fatal("plugins: bad Subscription filter", err)
logs.Error.Fatal("plugins: bad Subscription filter", err)
}
if globals.plugins[count].filterMessage, err =
ParsePluginFilter(conf.Filters.Message, plgFilterByTopicType|plgFilterByAction); err != nil {
log.Fatal("plugins: bad Message filter", err)
logs.Error.Fatal("plugins: bad Message filter", err)
}
globals.plugins[count].filterFind = conf.Filters.Find
if parts := strings.SplitN(conf.ServiceAddr, "://", 2); len(parts) < 2 {
log.Fatal("plugins: invalid server address format", conf.ServiceAddr)
logs.Error.Fatal("plugins: invalid server address format", conf.ServiceAddr)
} else {
globals.plugins[count].network = parts[0]
globals.plugins[count].addr = parts[1]
@ -289,7 +289,7 @@ func pluginsInit(configString json.RawMessage) {
globals.plugins[count].conn, err = grpc.Dial(globals.plugins[count].addr, grpc.WithInsecure())
if err != nil {
log.Fatalf("plugins: connection failure %v", err)
logs.Error.Fatalf("plugins: connection failure %v", err)
}
globals.plugins[count].client = pbx.NewPluginClient(globals.plugins[count].conn)
@ -300,7 +300,7 @@ func pluginsInit(configString json.RawMessage) {
globals.plugins = globals.plugins[:count]
if len(globals.plugins) == 0 {
log.Println("plugins: no active plugins found")
logs.Info.Println("plugins: no active plugins found")
globals.plugins = nil
} else {
var names []string
@ -308,7 +308,7 @@ func pluginsInit(configString json.RawMessage) {
names = append(names, globals.plugins[i].name+"("+globals.plugins[i].addr+")")
}
log.Println("plugins: active", "'"+strings.Join(names, "', '")+"'")
logs.Info.Println("plugins: active", "'"+strings.Join(names, "', '")+"'")
}
}
@ -393,7 +393,7 @@ func pluginFireHose(sess *Session, msg *ClientComMessage) (*ClientComMessage, *S
} else if p.failureCode != 0 {
// Plugin failed and it's configured to stop further processing.
log.Println("plugin: failed,", p.name, err)
logs.Error.Println("plugin: failed,", p.name, err)
return nil, &ServerComMessage{Ctrl: &MsgServerCtrl{
Id: id,
Code: p.failureCode,
@ -402,7 +402,7 @@ func pluginFireHose(sess *Session, msg *ClientComMessage) (*ClientComMessage, *S
Timestamp: ts}}
} else {
// Plugin failed but configured to ignore failure.
log.Println("plugin: failure ignored,", p.name, err)
logs.Warning.Println("plugin: failure ignored,", p.name, err)
}
}
@ -436,7 +436,7 @@ func pluginFind(user types.Uid, query string) (string, []types.Subscription, err
}
resp, err := p.client.Find(ctx, find)
if err != nil {
log.Println("plugins: Find call failed", p.name, err)
logs.Warning.Println("plugins: Find call failed", p.name, err)
return "", nil, err
}
respStatus := resp.GetStatus()
@ -493,7 +493,7 @@ func pluginAccount(user *types.User, action int) {
ctx = context.Background()
}
if _, err := p.client.Account(ctx, event); err != nil {
log.Println("plugins: Account call failed", p.name, err)
logs.Warning.Println("plugins: Account call failed", p.name, err)
}
}
}
@ -528,7 +528,7 @@ func pluginTopic(topic *Topic, action int) {
ctx = context.Background()
}
if _, err := p.client.Topic(ctx, event); err != nil {
log.Println("plugins: Topic call failed", p.name, err)
logs.Warning.Println("plugins: Topic call failed", p.name, err)
}
}
}
@ -574,7 +574,7 @@ func pluginSubscription(sub *types.Subscription, action int) {
ctx = context.Background()
}
if _, err := p.client.Subscription(ctx, event); err != nil {
log.Println("plugins: Subscription call failed", p.name, err)
logs.Warning.Println("plugins: Subscription call failed", p.name, err)
}
}
}
@ -609,7 +609,7 @@ func pluginMessage(data *MsgServerData, action int) {
ctx = context.Background()
}
if _, err := p.client.Message(ctx, event); err != nil {
log.Println("plugins: Message call failed", p.name, err)
logs.Warning.Println("plugins: Message call failed", p.name, err)
}
}
}

View File

@ -1,9 +1,9 @@
package main
import (
"log"
"strings"
"github.com/tinode/chat/server/logs"
"github.com/tinode/chat/server/store"
"github.com/tinode/chat/server/store/types"
)
@ -535,7 +535,7 @@ func (t *Topic) presPubMessageCount(uid types.Uid, mode types.AccessMode, recv,
// Cases V.1, V.2
func (t *Topic) presPubMessageDelete(uid types.Uid, mode types.AccessMode, delID int, list []MsgDelRange, skip string) {
if len(list) == 0 && delID <= 0 {
log.Printf("Case V.1, V.2: topic[%s] invalid request - missing payload", t.name)
logs.Warning.Printf("Case V.1, V.2: topic[%s] invalid request - missing payload", t.name)
return
}

View File

@ -2,13 +2,13 @@ package fcm
import (
"errors"
"log"
"strconv"
"time"
fcm "firebase.google.com/go/messaging"
"github.com/tinode/chat/server/drafty"
"github.com/tinode/chat/server/logs"
"github.com/tinode/chat/server/push"
"github.com/tinode/chat/server/store"
t "github.com/tinode/chat/server/store/types"
@ -186,7 +186,7 @@ func clonePayload(src map[string]string) map[string]string {
func PrepareNotifications(rcpt *push.Receipt, config *AndroidConfig) []MessageData {
data, err := payloadToData(&rcpt.Payload)
if err != nil {
log.Println("fcm push: could not parse payload;", err)
logs.Warning.Println("fcm push: could not parse payload;", err)
return nil
}
@ -211,7 +211,7 @@ func PrepareNotifications(rcpt *push.Receipt, config *AndroidConfig) []MessageDa
}
devices, count, err = store.Devices.GetAll(uids...)
if err != nil {
log.Println("fcm push: db error", err)
logs.Warning.Println("fcm push: db error", err)
return nil
}
}
@ -337,7 +337,7 @@ func PrepareNotifications(rcpt *push.Receipt, config *AndroidConfig) []MessageDa
func DevicesForUser(uid t.Uid) []string {
ddef, count, err := store.Devices.GetAll(uid)
if err != nil {
log.Println("fcm devices for user: db error", err)
logs.Warning.Println("fcm devices for user: db error", err)
return nil
}

View File

@ -8,11 +8,11 @@ import (
"context"
"encoding/json"
"errors"
"log"
fbase "firebase.google.com/go"
fcm "firebase.google.com/go/messaging"
"github.com/tinode/chat/server/logs"
"github.com/tinode/chat/server/push"
"github.com/tinode/chat/server/store"
"github.com/tinode/chat/server/store/types"
@ -132,7 +132,7 @@ func sendNotifications(rcpt *push.Receipt, config *configType) {
resp, err := handler.client.SendAll(ctx, batch)
if err != nil {
// Complete failure.
log.Println("fcm SendAll failed", err)
logs.Warning.Println("fcm SendAll failed", err)
break
}
@ -152,7 +152,7 @@ func processSubscription(req *push.ChannelReq) {
if len(devices) > subBatchSize {
// It's extremely unlikely for a single user to have this many devices.
devices = devices[0:subBatchSize]
log.Println("fcm: user", req.Uid.UserId(), "has more than", subBatchSize, "devices")
logs.Warning.Println("fcm: user", req.Uid.UserId(), "has more than", subBatchSize, "devices")
}
var err error
@ -164,7 +164,7 @@ func processSubscription(req *push.ChannelReq) {
}
if err != nil {
// Complete failure.
log.Println("fcm: sub or upsub failed", req.Unsub, err)
logs.Warning.Println("fcm: sub or upsub failed", req.Unsub, err)
} else {
// Check for partial failure.
handleSubErrors(resp, req.Uid, devices)
@ -193,7 +193,7 @@ func handleSubErrors(response *fcm.TopicManagementResponse, uid types.Uid, devic
for _, errinfo := range response.Errors {
// FCM documentation sucks. There is no list of possible errors so no action can be taken but logging.
log.Println("fcm sub/unsub error", errinfo.Reason, uid, devices[errinfo.Index])
logs.Warning.Println("fcm sub/unsub error", errinfo.Reason, uid, devices[errinfo.Index])
}
}
@ -203,24 +203,24 @@ func handleFcmError(err error, uid types.Uid, deviceId string) bool {
fcm.IsInternal(err) ||
fcm.IsUnknown(err) {
// Transient errors. Stop sending this batch.
log.Println("fcm transient failure", err)
logs.Warning.Println("fcm transient failure", err)
return false
}
if fcm.IsMismatchedCredential(err) || fcm.IsInvalidArgument(err) {
// Config errors
log.Println("fcm: request failed", err)
logs.Warning.Println("fcm: request failed", err)
return false
}
if fcm.IsRegistrationTokenNotRegistered(err) {
// Token is no longer valid.
log.Println("fcm: invalid token", uid, err)
logs.Warning.Println("fcm: invalid token", uid, err)
if err := store.Devices.Delete(uid, deviceId); err != nil {
log.Println("fcm: failed to delete invalid token", err)
logs.Warning.Println("fcm: failed to delete invalid token", err)
}
} else {
// All other errors are treated as non-fatal.
log.Println("fcm error:", err)
logs.Warning.Println("fcm error:", err)
}
return true
}

View File

@ -7,10 +7,10 @@ import (
"encoding/json"
"errors"
"io"
"log"
"net/http"
"strings"
"github.com/tinode/chat/server/logs"
"github.com/tinode/chat/server/push"
"github.com/tinode/chat/server/push/fcm"
"github.com/tinode/chat/server/store"
@ -173,7 +173,7 @@ func postMessage(endpoint string, body interface{}, config *configType) (*batchR
if err != nil {
// Just log the error, but don't report it to caller. The push succeeded.
log.Println("tnpg failed to decode response", err)
logs.Warning.Println("tnpg failed to decode response", err)
}
batch.httpCode = resp.StatusCode
@ -197,15 +197,15 @@ func sendPushes(rcpt *push.Receipt, config *configType) {
}
resp, err := postMessage(handler.pushUrl, payloads, config)
if err != nil {
log.Println("tnpg push request failed:", err)
logs.Warning.Println("tnpg push request failed:", err)
break
}
if resp.httpCode >= 300 {
log.Println("tnpg push rejected:", resp.httpStatus)
logs.Warning.Println("tnpg push rejected:", resp.httpStatus)
break
}
if resp.FatalCode != "" {
log.Println("tnpg push failed:", resp.FatalMessage)
logs.Error.Println("tnpg push failed:", resp.FatalMessage)
break
}
// Check for expired tokens and other errors.
@ -225,20 +225,20 @@ func processSubscription(req *push.ChannelReq, config *configType) {
if len(su.Devices) > subBatchSize {
// It's extremely unlikely for a single user to have this many devices.
su.Devices = su.Devices[0:subBatchSize]
log.Println("tnpg: user", req.Uid.UserId(), "has more than", subBatchSize, "devices")
logs.Warning.Println("tnpg: user", req.Uid.UserId(), "has more than", subBatchSize, "devices")
}
resp, err := postMessage(handler.subUrl, &su, config)
if err != nil {
log.Println("tnpg channel sub request failed:", err)
logs.Warning.Println("tnpg channel sub request failed:", err)
return
}
if resp.httpCode >= 300 {
log.Println("tnpg channel sub rejected:", resp.httpStatus)
logs.Warning.Println("tnpg channel sub rejected:", resp.httpStatus)
return
}
if resp.FatalCode != "" {
log.Println("tnpg channel sub failed:", resp.FatalMessage)
logs.Error.Println("tnpg channel sub failed:", resp.FatalMessage)
return
}
// Check for expired tokens and other errors.
@ -255,20 +255,20 @@ func handlePushResponse(batch *batchResponse, messages []fcm.MessageData) {
case "": // no error
case messageRateExceeded, quotaExceeded, serverUnavailable, unavailableError, internalError, unknownError:
// Transient errors. Stop sending this batch.
log.Println("tnpg: transient failure", resp.ErrorMessage)
logs.Warning.Println("tnpg: transient failure", resp.ErrorMessage)
return
case mismatchedCredential, invalidArgument, senderIDMismatch, thirdPartyAuthError, invalidAPNSCredentials:
// Config errors
log.Println("tnpg: invalid config", resp.ErrorMessage)
logs.Warning.Println("tnpg: invalid config", resp.ErrorMessage)
return
case registrationTokenNotRegistered, unregisteredError:
// Token is no longer valid.
log.Println("tnpg: invalid token", resp.ErrorMessage)
logs.Warning.Println("tnpg: invalid token", resp.ErrorMessage)
if err := store.Devices.Delete(messages[i].Uid, messages[i].DeviceId); err != nil {
log.Println("tnpg: failed to delete invalid token", err)
logs.Warning.Println("tnpg: failed to delete invalid token", err)
}
default:
log.Println("tnpg: unrecognized error", resp.ErrorMessage)
logs.Warning.Println("tnpg: unrecognized error", resp.ErrorMessage)
}
}
}
@ -280,7 +280,7 @@ func handleSubResponse(batch *batchResponse, req *push.ChannelReq, devices []str
for _, resp := range batch.Responses {
// FCM documentation sucks. There is no list of possible errors so no action can be taken but logging.
log.Println("fcm sub/unsub error", resp.ErrorCode, req.Uid, devices[resp.Index])
logs.Warning.Println("fcm sub/unsub error", resp.ErrorCode, req.Uid, devices[resp.Index])
}
}

View File

@ -6,9 +6,10 @@ import (
"encoding/ascii85"
"hash/crc32"
"hash/fnv"
"log"
"sort"
"strconv"
"github.com/tinode/chat/server/logs"
)
// Hash is a signature of a hash function used by the package.
@ -128,6 +129,6 @@ func (ring *Ring) Signature() string {
func (ring *Ring) dump() {
for _, e := range ring.keys {
log.Printf("key: '%s', hash=%d", e.key, e.hash)
logs.Info.Printf("key: '%s', hash=%d", e.key, e.hash)
}
}

View File

@ -13,7 +13,6 @@ import (
"container/list"
"encoding/json"
"fmt"
"log"
"net/http"
"strings"
"sync"
@ -23,6 +22,7 @@ import (
"github.com/gorilla/websocket"
"github.com/tinode/chat/pbx"
"github.com/tinode/chat/server/auth"
"github.com/tinode/chat/server/logs"
"github.com/tinode/chat/server/store"
"github.com/tinode/chat/server/store/types"
@ -284,7 +284,7 @@ func (s *Session) queueOut(msg *ServerComMessage) bool {
if 200 <= msg.Ctrl.Code && msg.Ctrl.Code < 600 {
statsInc(fmt.Sprintf("CtrlCodesTotal%dxx", msg.Ctrl.Code/100), 1)
} else {
log.Println("Invalid response code: ", msg.Ctrl.Code)
logs.Warning.Println("Invalid response code: ", msg.Ctrl.Code)
}
}
@ -296,7 +296,7 @@ func (s *Session) queueOut(msg *ServerComMessage) bool {
case s.send <- data:
default:
// Never block here since it may also block the topic's run() goroutine.
log.Println("s.queueOut: session's send queue full", s.sid)
logs.Error.Println("s.queueOut: session's send queue full", s.sid)
return false
}
return true
@ -312,7 +312,7 @@ func (s *Session) queueOutBytes(data []byte) bool {
select {
case s.send <- data:
default:
log.Println("s.queueOutBytes: session's send queue full", s.sid)
logs.Error.Println("s.queueOutBytes: session's send queue full", s.sid)
return false
}
return true
@ -364,7 +364,7 @@ func (s *Session) dispatchRaw(raw []byte) {
var msg ClientComMessage
if atomic.LoadInt32(&s.terminating) > 0 {
log.Println("s.dispatch: message received on a terminating session", s.sid)
logs.Warning.Println("s.dispatch: message received on a terminating session", s.sid)
s.queueOut(ErrLocked("", "", now))
return
}
@ -381,11 +381,11 @@ func (s *Session) dispatchRaw(raw []byte) {
toLog = raw[:512]
truncated = "<...>"
}
log.Printf("in: '%s%s' sid='%s' uid='%s'", toLog, truncated, s.sid, s.uid)
logs.Info.Printf("in: '%s%s' sid='%s' uid='%s'", toLog, truncated, s.sid, s.uid)
if err := json.Unmarshal(raw, &msg); err != nil {
// Malformed message
log.Println("s.dispatch", err, s.sid)
logs.Warning.Println("s.dispatch", err, s.sid)
s.queueOut(ErrMalformed("", "", now))
return
}
@ -404,11 +404,11 @@ func (s *Session) dispatch(msg *ClientComMessage) {
} else if s.authLvl != auth.LevelRoot {
// Only root user can set non-default msg.from && msg.authLvl values.
s.queueOut(ErrPermissionDenied("", "", msg.Timestamp))
log.Println("s.dispatch: non-root asigned msg.from", s.sid)
logs.Warning.Println("s.dispatch: non-root asigned msg.from", s.sid)
return
} else if fromUid := types.ParseUserId(msg.AsUser); fromUid.IsZero() {
s.queueOut(ErrMalformed("", "", msg.Timestamp))
log.Println("s.dispatch: malformed msg.from: ", msg.AsUser, s.sid)
logs.Warning.Println("s.dispatch: malformed msg.from: ", msg.AsUser, s.sid)
return
} else if auth.Level(msg.AuthLvl) == auth.LevelNone {
// AuthLvl is not set by caller, assign default LevelAuth.
@ -432,7 +432,7 @@ func (s *Session) dispatch(msg *ClientComMessage) {
checkVers := func(m *ClientComMessage, handler func(*ClientComMessage)) func(*ClientComMessage) {
return func(m *ClientComMessage) {
if s.ver == 0 {
log.Println("s.dispatch: {hi} is missing", s.sid)
logs.Warning.Println("s.dispatch: {hi} is missing", s.sid)
s.queueOut(ErrCommandOutOfSequence(m.Id, m.Original, msg.Timestamp))
return
}
@ -444,7 +444,7 @@ func (s *Session) dispatch(msg *ClientComMessage) {
checkUser := func(m *ClientComMessage, handler func(*ClientComMessage)) func(*ClientComMessage) {
return func(m *ClientComMessage) {
if msg.AsUser == "" {
log.Println("s.dispatch: authentication required", s.sid)
logs.Warning.Println("s.dispatch: authentication required", s.sid)
s.queueOut(ErrAuthRequiredReply(m, m.Timestamp))
return
}
@ -507,7 +507,7 @@ func (s *Session) dispatch(msg *ClientComMessage) {
default:
// Unknown message
s.queueOut(ErrMalformed("", "", msg.Timestamp))
log.Println("s.dispatch: unknown message", s.sid)
logs.Warning.Println("s.dispatch: unknown message", s.sid)
return
}
@ -557,7 +557,7 @@ func (s *Session) subscribe(msg *ClientComMessage) {
// Reply with a 500 to the user.
s.queueOut(ErrUnknownReply(msg, msg.Timestamp))
s.inflightReqs.Done()
log.Println("s.subscribe: hub.join queue full, topic ", msg.RcptTo, s.sid)
logs.Error.Println("s.subscribe: hub.join queue full, topic ", msg.RcptTo, s.sid)
}
// Hub will send Ctrl success/failure packets back to session
}
@ -592,7 +592,7 @@ func (s *Session) leave(msg *ClientComMessage) {
} else {
// Session wants to unsubscribe from the topic it did not join
// FIXME(gene): allow topic to unsubscribe without joining first; send to hub to unsub
log.Println("s.leave:", "must attach first", s.sid)
logs.Warning.Println("s.leave:", "must attach first", s.sid)
s.queueOut(ErrAttachFirst(msg, msg.Timestamp))
}
}
@ -643,7 +643,7 @@ func (s *Session) publish(msg *ClientComMessage) {
default:
// Reply with a 500 to the user.
s.queueOut(ErrUnknownReply(msg, msg.Timestamp))
log.Println("s.publish: sub.broadcast channel full, topic ", msg.RcptTo, s.sid)
logs.Error.Println("s.publish: sub.broadcast channel full, topic ", msg.RcptTo, s.sid)
}
} else if msg.RcptTo == "sys" {
// Publishing to "sys" topic requires no subsription.
@ -652,12 +652,12 @@ func (s *Session) publish(msg *ClientComMessage) {
default:
// Reply with a 500 to the user.
s.queueOut(ErrUnknownReply(msg, msg.Timestamp))
log.Println("s.publish: hub.route channel full", s.sid)
logs.Error.Println("s.publish: hub.route channel full", s.sid)
}
} else {
// Publish request received without attaching to topic first.
s.queueOut(ErrAttachFirst(msg, msg.Timestamp))
log.Println("s.publish:", "must attach first", s.sid)
logs.Warning.Println("s.publish:", "must attach first", s.sid)
}
}
@ -669,7 +669,7 @@ func (s *Session) hello(msg *ClientComMessage) {
if s.ver == 0 {
s.ver = parseVersion(msg.Hi.Version)
if s.ver == 0 {
log.Println("s.hello:", "failed to parse version", s.sid)
logs.Warning.Println("s.hello:", "failed to parse version", s.sid)
s.queueOut(ErrMalformed(msg.Id, "", msg.Timestamp))
return
}
@ -677,7 +677,7 @@ func (s *Session) hello(msg *ClientComMessage) {
if versionCompare(s.ver, minSupportedVersionValue) < 0 {
s.ver = 0
s.queueOut(ErrVersionNotSupported(msg.Id, msg.Timestamp))
log.Println("s.hello:", "unsupported version", s.sid)
logs.Warning.Println("s.hello:", "unsupported version", s.sid)
return
}
@ -722,7 +722,7 @@ func (s *Session) hello(msg *ClientComMessage) {
}
if err != nil {
log.Println("s.hello:", "device ID", err, s.sid)
logs.Warning.Println("s.hello:", "device ID", err, s.sid)
s.queueOut(ErrUnknown(msg.Id, "", msg.Timestamp))
return
}
@ -730,7 +730,7 @@ func (s *Session) hello(msg *ClientComMessage) {
} else {
// Version cannot be changed mid-session.
s.queueOut(ErrCommandOutOfSequence(msg.Id, "", msg.Timestamp))
log.Println("s.hello:", "version cannot be changed", s.sid)
logs.Warning.Println("s.hello:", "version cannot be changed", s.sid)
return
}
@ -749,7 +749,7 @@ func (s *Session) hello(msg *ClientComMessage) {
if len(s.lang) > 2 {
// Logging strings longer than 2 b/c language.Parse(XX) always succeeds
// returning confidence Low.
log.Println("s.hello:", "could not parse locale ", s.lang)
logs.Warning.Println("s.hello:", "could not parse locale ", s.lang)
}
s.countryCode = globals.defaultCountryCode
}
@ -782,7 +782,7 @@ func (s *Session) acc(msg *ClientComMessage) {
if msg.Acc.Token != nil {
if !s.uid.IsZero() {
s.queueOut(ErrAlreadyAuthenticated(msg.Acc.Id, "", msg.Timestamp))
log.Println("s.acc: got token while already authenticated", s.sid)
logs.Warning.Println("s.acc: got token while already authenticated", s.sid)
return
}
@ -791,7 +791,7 @@ func (s *Session) acc(msg *ClientComMessage) {
if err != nil {
s.queueOut(decodeStoreError(err, msg.Acc.Id, "", msg.Timestamp,
map[string]interface{}{"what": "auth"}))
log.Println("s.acc: invalid token", err, s.sid)
logs.Warning.Println("s.acc: invalid token", err, s.sid)
return
}
}
@ -827,7 +827,7 @@ func (s *Session) login(msg *ClientComMessage) {
handler := store.GetLogicalAuthHandler(msg.Login.Scheme)
if handler == nil {
log.Println("s.login: unknown authentication scheme", msg.Login.Scheme, s.sid)
logs.Warning.Println("s.login: unknown authentication scheme", msg.Login.Scheme, s.sid)
s.queueOut(ErrAuthUnknownScheme(msg.Id, "", msg.Timestamp))
return
}
@ -837,7 +837,7 @@ func (s *Session) login(msg *ClientComMessage) {
resp := decodeStoreError(err, msg.Id, "", msg.Timestamp, nil)
if resp.Ctrl.Code >= 500 {
// Log internal errors
log.Println("s.login: internal", err, s.sid)
logs.Warning.Println("s.login: internal", err, s.sid)
}
s.queueOut(resp)
return
@ -852,7 +852,7 @@ func (s *Session) login(msg *ClientComMessage) {
}
if err != nil {
log.Println("s.login: user state check failed", rec.Uid, err, s.sid)
logs.Warning.Println("s.login: user state check failed", rec.Uid, err, s.sid)
s.queueOut(decodeStoreError(err, msg.Id, "", msg.Timestamp, nil))
return
}
@ -873,7 +873,7 @@ func (s *Session) login(msg *ClientComMessage) {
}
}
if err != nil {
log.Println("s.login: failed to validate credentials:", err, s.sid)
logs.Warning.Println("s.login: failed to validate credentials:", err, s.sid)
s.queueOut(decodeStoreError(err, msg.Id, "", msg.Timestamp, nil))
} else {
s.queueOut(s.onLogin(msg.Id, msg.Timestamp, rec, missing))
@ -965,7 +965,7 @@ func (s *Session) onLogin(msgID string, timestamp time.Time, rec *auth.Rec, miss
LastSeen: timestamp,
Lang: s.lang,
}); err != nil {
log.Println("failed to update device record", err)
logs.Warning.Println("failed to update device record", err)
}
}
}
@ -997,14 +997,14 @@ func (s *Session) get(msg *ClientComMessage) {
if meta.pkt.MetaWhat == 0 {
s.queueOut(ErrMalformedReply(msg, msg.Timestamp))
log.Println("s.get: invalid Get message action", msg.Get.What)
logs.Warning.Println("s.get: invalid Get message action", msg.Get.What)
} else if sub != nil {
select {
case sub.meta <- meta:
default:
// Reply with a 500 to the user.
s.queueOut(ErrUnknownReply(msg, msg.Timestamp))
log.Println("s.get: sub.meta channel full, topic ", msg.RcptTo, s.sid)
logs.Error.Println("s.get: sub.meta channel full, topic ", msg.RcptTo, s.sid)
}
} else if meta.pkt.MetaWhat&(constMsgMetaDesc|constMsgMetaSub) != 0 {
// Request some minimal info from a topic not currently attached to.
@ -1013,10 +1013,10 @@ func (s *Session) get(msg *ClientComMessage) {
default:
// Reply with a 500 to the user.
s.queueOut(ErrUnknownReply(msg, msg.Timestamp))
log.Println("s.get: hub.meta channel full", s.sid)
logs.Error.Println("s.get: hub.meta channel full", s.sid)
}
} else {
log.Println("s.get: subscribe first to get=", msg.Get.What)
logs.Warning.Println("s.get: subscribe first to get=", msg.Get.What)
s.queueOut(ErrPermissionDeniedReply(msg, msg.Timestamp))
}
}
@ -1049,17 +1049,17 @@ func (s *Session) set(msg *ClientComMessage) {
if meta.pkt.MetaWhat == 0 {
s.queueOut(ErrMalformedReply(msg, msg.Timestamp))
log.Println("s.set: nil Set action")
logs.Warning.Println("s.set: nil Set action")
} else if sub := s.getSub(msg.RcptTo); sub != nil {
select {
case sub.meta <- meta:
default:
// Reply with a 500 to the user.
s.queueOut(ErrUnknownReply(msg, msg.Timestamp))
log.Println("s.set: sub.meta channel full, topic ", msg.RcptTo, s.sid)
logs.Error.Println("s.set: sub.meta channel full, topic ", msg.RcptTo, s.sid)
}
} else if meta.pkt.MetaWhat&(constMsgMetaTags|constMsgMetaCred) != 0 {
log.Println("s.set: can Set tags/creds for subscribed topics only", meta.pkt.MetaWhat)
logs.Warning.Println("s.set: can Set tags/creds for subscribed topics only", meta.pkt.MetaWhat)
s.queueOut(ErrPermissionDeniedReply(msg, msg.Timestamp))
} else {
// Desc.Private and Sub updates are possible without the subscription.
@ -1068,7 +1068,7 @@ func (s *Session) set(msg *ClientComMessage) {
default:
// Reply with a 500 to the user.
s.queueOut(ErrUnknownReply(msg, msg.Timestamp))
log.Println("s.set: hub.meta channel full", s.sid)
logs.Error.Println("s.set: hub.meta channel full", s.sid)
}
}
}
@ -1094,7 +1094,7 @@ func (s *Session) del(msg *ClientComMessage) {
if msg.MetaWhat == 0 {
s.queueOut(ErrMalformedReply(msg, msg.Timestamp))
log.Println("s.del: invalid Del action", msg.Del.What, s.sid)
logs.Warning.Println("s.del: invalid Del action", msg.Del.What, s.sid)
return
}
sub := s.getSub(msg.RcptTo)
@ -1107,7 +1107,7 @@ func (s *Session) del(msg *ClientComMessage) {
default:
// Reply with a 500 to the user.
s.queueOut(ErrUnknownReply(msg, msg.Timestamp))
log.Println("s.del: sub.meta channel full, topic ", msg.RcptTo, s.sid)
logs.Error.Println("s.del: sub.meta channel full, topic ", msg.RcptTo, s.sid)
}
} else if msg.MetaWhat == constMsgDelTopic {
// Deleting topic: for sessions attached or not attached, send request to hub first.
@ -1121,12 +1121,12 @@ func (s *Session) del(msg *ClientComMessage) {
default:
// Reply with a 500 to the user.
s.queueOut(ErrUnknownReply(msg, msg.Timestamp))
log.Println("s.del: hub.unreg channel full", s.sid)
logs.Error.Println("s.del: hub.unreg channel full", s.sid)
}
} else {
// Must join the topic to delete messages or subscriptions.
s.queueOut(ErrAttachFirst(msg, msg.Timestamp))
log.Println("s.del: invalid Del action while unsubbed", msg.Del.What, s.sid)
logs.Warning.Println("s.del: invalid Del action while unsubbed", msg.Del.What, s.sid)
}
}
@ -1178,7 +1178,7 @@ func (s *Session) note(msg *ClientComMessage) {
default:
// Reply with a 500 to the user.
s.queueOut(ErrUnknownReply(msg, msg.Timestamp))
log.Println("s.note: sub.broacast channel full, topic ", msg.RcptTo, s.sid)
logs.Error.Println("s.note: sub.broacast channel full, topic ", msg.RcptTo, s.sid)
}
} else if msg.Note.What == "recv" {
// Client received a pres notification about a new message, initiated a fetch
@ -1189,11 +1189,11 @@ func (s *Session) note(msg *ClientComMessage) {
default:
// Reply with a 500 to the user.
s.queueOut(ErrUnknownReply(msg, msg.Timestamp))
log.Println("s.note: hub.route channel full", s.sid)
logs.Error.Println("s.note: hub.route channel full", s.sid)
}
} else {
s.queueOut(ErrAttachFirst(msg, msg.Timestamp))
log.Println("s.note: note to invalid topic - must subscribe first", msg.Note.What, s.sid)
logs.Warning.Println("s.note: note to invalid topic - must subscribe first", msg.Note.What, s.sid)
}
}
@ -1205,7 +1205,7 @@ func (s *Session) note(msg *ClientComMessage) {
func (s *Session) expandTopicName(msg *ClientComMessage) (string, *ServerComMessage) {
if msg.Original == "" {
log.Println("s.etn: empty topic name", s.sid)
logs.Warning.Println("s.etn: empty topic name", s.sid)
return "", ErrMalformed(msg.Id, "", msg.Timestamp)
}
@ -1221,11 +1221,11 @@ func (s *Session) expandTopicName(msg *ClientComMessage) (string, *ServerComMess
uid2 := types.ParseUserId(msg.Original)
if uid2.IsZero() {
// Ensure the user id is valid.
log.Println("s.etn: failed to parse p2p topic name", s.sid)
logs.Warning.Println("s.etn: failed to parse p2p topic name", s.sid)
return "", ErrMalformed(msg.Id, msg.Original, msg.Timestamp)
} else if uid2 == uid1 {
// Use 'me' to access self-topic.
log.Println("s.etn: invalid p2p self-subscription", s.sid)
logs.Warning.Println("s.etn: invalid p2p self-subscription", s.sid)
return "", ErrPermissionDeniedReply(msg, msg.Timestamp)
}
routeTo = uid1.P2PName(uid2)

View File

@ -10,13 +10,13 @@ package main
import (
"container/list"
"log"
"net/http"
"sync"
"time"
"github.com/gorilla/websocket"
"github.com/tinode/chat/pbx"
"github.com/tinode/chat/server/logs"
"github.com/tinode/chat/server/store"
"github.com/tinode/chat/server/store/types"
)
@ -47,7 +47,7 @@ func (ss *SessionStore) NewSession(conn interface{}, sid string) (*Session, int)
ss.lock.Lock()
if _, found := ss.sessCache[s.sid]; found {
log.Fatalln("ERROR! duplicate session ID", s.sid)
logs.Error.Fatalln("ERROR! duplicate session ID", s.sid)
}
ss.lock.Unlock()
@ -65,7 +65,7 @@ func (ss *SessionStore) NewSession(conn interface{}, sid string) (*Session, int)
s.proto = GRPC
s.grpcnode = c
default:
log.Panicln("session: unknown connection type", conn)
logs.Error.Panicln("session: unknown connection type", conn)
}
s.subs = make(map[string]*Subscription)
@ -166,7 +166,7 @@ func (ss *SessionStore) Shutdown() {
// TODO: Consider broadcasting shutdown to other cluster nodes.
log.Println("SessionStore shut down, sessions terminated:", len(ss.sessCache))
logs.Info.Println("SessionStore shut down, sessions terminated:", len(ss.sessCache))
}
// EvictUser terminates all sessions of a given user.

View File

@ -8,11 +8,12 @@ package main
import (
"encoding/json"
"expvar"
"log"
"net/http"
"runtime"
"sort"
"time"
"github.com/tinode/chat/server/logs"
)
// A simple implementation of histogram expvar.Var.
@ -70,7 +71,7 @@ func statsInit(mux *http.ServeMux, path string) {
go statsUpdater()
log.Printf("stats: variables exposed at '%s'", path)
logs.Info.Printf("stats: variables exposed at '%s'", path)
}
// Register integer variable. Don't check for initialization.
@ -148,12 +149,12 @@ func statsUpdater() {
val := upd.value.(float64)
v.addSample(val)
default:
log.Panicf("stats: unsupported expvar type %T", ev)
logs.Error.Panicf("stats: unsupported expvar type %T", ev)
}
} else {
panic("stats: update to unknown variable " + upd.varname)
}
}
log.Println("stats: shutdown")
logs.Info.Println("stats: shutdown")
}

View File

@ -10,7 +10,6 @@ package main
import (
"errors"
"log"
"reflect"
"sort"
"strings"
@ -19,6 +18,7 @@ import (
"github.com/tinode/chat/server/auth"
"github.com/tinode/chat/server/concurrency"
"github.com/tinode/chat/server/logs"
"github.com/tinode/chat/server/push"
"github.com/tinode/chat/server/store"
"github.com/tinode/chat/server/store/types"
@ -281,7 +281,7 @@ func (t *Topic) fixUpUserCounts(userCounts map[types.Uid]int) {
pud.online -= decrementBy
t.perUser[uid] = pud
if pud.online < 0 {
log.Printf("topic[%s]: invalid online count for user %s", t.name, uid)
logs.Warning.Printf("topic[%s]: invalid online count for user %s", t.name, uid)
}
}
}
@ -321,7 +321,7 @@ func (t *Topic) runLocal(hub *Hub) {
// Failed to subscribe, the topic is still inactive
killTimer.Reset(keepAlive)
}
log.Printf("topic[%s] subscription failed %v, sid=%s", t.name, err, join.sess.sid)
logs.Warning.Printf("topic[%s] subscription failed %v, sid=%s", t.name, err, join.sess.sid)
}
}
if join.sess.inflightReqs != nil {
@ -352,33 +352,33 @@ func (t *Topic) runLocal(hub *Hub) {
// Get request
if meta.pkt.MetaWhat&constMsgMetaDesc != 0 {
if err := t.replyGetDesc(meta.sess, asUid, meta.pkt.Get.Desc, meta.pkt); err != nil {
log.Printf("topic[%s] meta.Get.Desc failed: %s", t.name, err)
logs.Warning.Printf("topic[%s] meta.Get.Desc failed: %s", t.name, err)
}
}
if meta.pkt.MetaWhat&constMsgMetaSub != 0 {
if err := t.replyGetSub(meta.sess, asUid, authLevel, meta.pkt); err != nil {
log.Printf("topic[%s] meta.Get.Sub failed: %s", t.name, err)
logs.Warning.Printf("topic[%s] meta.Get.Sub failed: %s", t.name, err)
}
}
if meta.pkt.MetaWhat&constMsgMetaData != 0 {
if err := t.replyGetData(meta.sess, asUid, meta.pkt.Get.Data, meta.pkt); err != nil {
log.Printf("topic[%s] meta.Get.Data failed: %s", t.name, err)
logs.Warning.Printf("topic[%s] meta.Get.Data failed: %s", t.name, err)
}
}
if meta.pkt.MetaWhat&constMsgMetaDel != 0 {
if err := t.replyGetDel(meta.sess, asUid, meta.pkt.Get.Del, meta.pkt); err != nil {
log.Printf("topic[%s] meta.Get.Del failed: %s", t.name, err)
logs.Warning.Printf("topic[%s] meta.Get.Del failed: %s", t.name, err)
}
}
if meta.pkt.MetaWhat&constMsgMetaTags != 0 {
if err := t.replyGetTags(meta.sess, asUid, meta.pkt); err != nil {
log.Printf("topic[%s] meta.Get.Tags failed: %s", t.name, err)
logs.Warning.Printf("topic[%s] meta.Get.Tags failed: %s", t.name, err)
}
}
if meta.pkt.MetaWhat&constMsgMetaCred != 0 {
log.Printf("topic[%s] handle getCred", t.name)
logs.Warning.Printf("topic[%s] handle getCred", t.name)
if err := t.replyGetCreds(meta.sess, asUid, meta.pkt); err != nil {
log.Printf("topic[%s] meta.Get.Creds failed: %s", t.name, err)
logs.Warning.Printf("topic[%s] meta.Get.Creds failed: %s", t.name, err)
}
}
@ -389,22 +389,22 @@ func (t *Topic) runLocal(hub *Hub) {
// Notify plugins of the update
pluginTopic(t, plgActUpd)
} else {
log.Printf("topic[%s] meta.Set.Desc failed: %v", t.name, err)
logs.Warning.Printf("topic[%s] meta.Set.Desc failed: %v", t.name, err)
}
}
if meta.pkt.MetaWhat&constMsgMetaSub != 0 {
if err := t.replySetSub(hub, meta.sess, meta.pkt); err != nil {
log.Printf("topic[%s] meta.Set.Sub failed: %v", t.name, err)
logs.Warning.Printf("topic[%s] meta.Set.Sub failed: %v", t.name, err)
}
}
if meta.pkt.MetaWhat&constMsgMetaTags != 0 {
if err := t.replySetTags(meta.sess, asUid, meta.pkt); err != nil {
log.Printf("topic[%s] meta.Set.Tags failed: %v", t.name, err)
logs.Warning.Printf("topic[%s] meta.Set.Tags failed: %v", t.name, err)
}
}
if meta.pkt.MetaWhat&constMsgMetaCred != 0 {
if err := t.replySetCred(meta.sess, asUid, authLevel, meta.pkt); err != nil {
log.Printf("topic[%s] meta.Set.Cred failed: %v", t.name, err)
logs.Warning.Printf("topic[%s] meta.Set.Cred failed: %v", t.name, err)
}
}
@ -423,7 +423,7 @@ func (t *Topic) runLocal(hub *Hub) {
}
if err != nil {
log.Printf("topic[%s] meta.Del failed: %v", t.name, err)
logs.Warning.Printf("topic[%s] meta.Del failed: %v", t.name, err)
}
}
case upd := <-t.supd:
@ -432,7 +432,7 @@ func (t *Topic) runLocal(hub *Hub) {
t.sessToForeground(upd.sess)
} else if currentUA != upd.userAgent {
if t.cat != types.TopicCatMe {
log.Panicln("invalid topic category in UA update", t.name)
logs.Warning.Panicln("invalid topic category in UA update", t.name)
}
// 'me' only. Process an update to user agent from one of the sessions.
currentUA = upd.userAgent
@ -516,42 +516,42 @@ func (t *Topic) handleSubscription(h *Hub, join *sessionJoin) error {
if getWhat&constMsgMetaDesc != 0 {
// Send get.desc as a {meta} packet.
if err := t.replyGetDesc(join.sess, asUid, msgsub.Get.Desc, join.pkt); err != nil {
log.Printf("topic[%s] handleSubscription Get.Desc failed: %v sid=%s", t.name, err, join.sess.sid)
logs.Warning.Printf("topic[%s] handleSubscription Get.Desc failed: %v sid=%s", t.name, err, join.sess.sid)
}
}
if getWhat&constMsgMetaSub != 0 {
// Send get.sub response as a separate {meta} packet
if err := t.replyGetSub(join.sess, asUid, authLevel, join.pkt); err != nil {
log.Printf("topic[%s] handleSubscription Get.Sub failed: %v sid=%s", t.name, err, join.sess.sid)
logs.Warning.Printf("topic[%s] handleSubscription Get.Sub failed: %v sid=%s", t.name, err, join.sess.sid)
}
}
if getWhat&constMsgMetaTags != 0 {
// Send get.tags response as a separate {meta} packet
if err := t.replyGetTags(join.sess, asUid, join.pkt); err != nil {
log.Printf("topic[%s] handleSubscription Get.Tags failed: %v sid=%s", t.name, err, join.sess.sid)
logs.Warning.Printf("topic[%s] handleSubscription Get.Tags failed: %v sid=%s", t.name, err, join.sess.sid)
}
}
if getWhat&constMsgMetaCred != 0 {
// Send get.tags response as a separate {meta} packet
if err := t.replyGetCreds(join.sess, asUid, join.pkt); err != nil {
log.Printf("topic[%s] handleSubscription Get.Cred failed: %v sid=%s", t.name, err, join.sess.sid)
logs.Warning.Printf("topic[%s] handleSubscription Get.Cred failed: %v sid=%s", t.name, err, join.sess.sid)
}
}
if getWhat&constMsgMetaData != 0 {
// Send get.data response as {data} packets
if err := t.replyGetData(join.sess, asUid, msgsub.Get.Data, join.pkt); err != nil {
log.Printf("topic[%s] handleSubscription Get.Data failed: %v sid=%s", t.name, err, join.sess.sid)
logs.Warning.Printf("topic[%s] handleSubscription Get.Data failed: %v sid=%s", t.name, err, join.sess.sid)
}
}
if getWhat&constMsgMetaDel != 0 {
// Send get.del response as a separate {meta} packet
if err := t.replyGetDel(join.sess, asUid, msgsub.Get.Del, join.pkt); err != nil {
log.Printf("topic[%s] handleSubscription Get.Del failed: %v sid=%s", t.name, err, join.sess.sid)
logs.Warning.Printf("topic[%s] handleSubscription Get.Del failed: %v sid=%s", t.name, err, join.sess.sid)
}
}
@ -591,7 +591,7 @@ func (t *Topic) handleLeaveRequest(hub *Hub, leave *sessionLeave) {
// User wants to leave and unsubscribe.
// asUid must not be Zero.
if err := t.replyLeaveUnsub(hub, leave.sess, leave.pkt, asUid); err != nil {
log.Println("failed to unsub", err, leave.sess.sid)
logs.Error.Println("failed to unsub", err, leave.sess.sid)
return
}
} else if pssd, _ := t.remSession(leave.sess, asUid); pssd != nil {
@ -637,7 +637,7 @@ func (t *Topic) handleLeaveRequest(hub *Hub, leave *sessionLeave) {
t.perUser[uid] = pud
}
} else if !leave.sess.isCluster() {
log.Panic("cannot determine uid: leave req=", leave)
logs.Warning.Panic("cannot determine uid: leave req=", leave)
}
switch t.cat {
@ -663,7 +663,7 @@ func (t *Topic) handleLeaveRequest(hub *Hub, leave *sessionLeave) {
if !meUid.IsZero() {
// Update user's last online timestamp & user agent. Only one user can be subscribed to 'me' topic.
if err := store.Users.UpdateLastSeen(meUid, mrs.userAgent, now); err != nil {
log.Println(err)
logs.Warning.Println(err)
}
}
case types.TopicCatFnd:
@ -802,7 +802,7 @@ func (t *Topic) sendSubNotifications(asUid types.Uid, sid, userAgent string) {
if !t.isLoaded() {
t.markLoaded()
if err := t.loadContacts(asUid); err != nil {
log.Println("topic: failed to load contacts", t.name, err.Error())
logs.Error.Println("topic: failed to load contacts", t.name, err.Error())
}
// User online: notify users of interest without forcing response (no +en here).
t.presUsersOfInterest("on", userAgent)
@ -871,7 +871,7 @@ func (t *Topic) handleBroadcast(msg *ServerComMessage) {
Head: msg.Data.Head,
Content: msg.Data.Content}, (userData.modeGiven & userData.modeWant).IsReader()); err != nil {
log.Printf("topic[%s]: failed to save message: %v", t.name, err)
logs.Warning.Printf("topic[%s]: failed to save message: %v", t.name, err)
msg.sess.queueOut(ErrUnknown(msg.Id, t.original(asUid), msg.Timestamp))
return
@ -970,7 +970,7 @@ func (t *Topic) handleBroadcast(msg *ServerComMessage) {
"ReadSeqId": pud.readID},
false); err != nil {
log.Printf("topic[%s]: failed to update SeqRead/Recv counter: %v", t.name, err)
logs.Warning.Printf("topic[%s]: failed to update SeqRead/Recv counter: %v", t.name, err)
return
}
@ -984,7 +984,7 @@ func (t *Topic) handleBroadcast(msg *ServerComMessage) {
}
} else {
// TODO(gene): remove this
log.Panic("topic: wrong message type for broadcasting", t.name)
logs.Error.Panic("topic: wrong message type for broadcasting", t.name)
}
// Broadcast the message. Only {data}, {pres}, {info} are broadcastable.
@ -1043,13 +1043,13 @@ func (t *Topic) handleBroadcast(msg *ServerComMessage) {
}
// Send message to session.
if !sess.queueOut(msg) {
log.Printf("topic[%s]: connection stuck, detaching - %s", t.name, sess.sid)
logs.Warning.Printf("topic[%s]: connection stuck, detaching - %s", t.name, sess.sid)
// The whole session is being dropped, so sessionLeave.pkt is not set.
// Must not block here: it may lead to a deadlock.
select {
case t.unreg <- &sessionLeave{sess: sess}:
default:
log.Printf("topic[%s]: unreg queue full - %s", t.name, sess.sid)
logs.Error.Printf("topic[%s]: unreg queue full - %s", t.name, sess.sid)
}
}
}
@ -3416,7 +3416,7 @@ func (t *Topic) remProxiedSession(sess *Session) bool {
numChans := len(t.proxiedChannels) - 3
t.proxiedChannels = t.proxiedChannels[:numChans]
if len(t.proxiedSessions)*3+1 != len(t.proxiedChannels) {
log.Panicf("topic[%s]: #proxied sessions (%d) vs #proxied channels mismatch (%d)",
logs.Error.Panicf("topic[%s]: #proxied sessions (%d) vs #proxied channels mismatch (%d)",
t.name, len(t.proxiedSessions), len(t.proxiedChannels))
}
}
@ -3477,7 +3477,7 @@ func (t *Topic) remSession(sess *Session, asUid types.Uid) (*perSessionData, boo
if pssd.uid == asUid || asUid.IsZero() {
delete(t.sessions, s)
if s.isMultiplex() && !t.remProxiedSession(s) {
log.Printf("topic[%s]: multiplex session %s not removed from the event loop", t.name, s.sid)
logs.Error.Printf("topic[%s]: multiplex session %s not removed from the event loop", t.name, s.sid)
}
return &pssd, true
}
@ -3490,7 +3490,7 @@ func (t *Topic) remSession(sess *Session, asUid types.Uid) (*perSessionData, boo
if len(pssd.muids) == 0 {
delete(t.sessions, s)
if s.isMultiplex() && !t.remProxiedSession(s) {
log.Printf("topic[%s]: multiplex session %s not removed from the event loop: no more attached uids", t.name, s.sid)
logs.Error.Printf("topic[%s]: multiplex session %s not removed from the event loop: no more attached uids", t.name, s.sid)
}
return &pssd, true
} else {

View File

@ -7,10 +7,10 @@
package main
import (
"log"
"net/http"
"time"
"github.com/tinode/chat/server/logs"
"github.com/tinode/chat/server/store/types"
)
@ -27,7 +27,7 @@ func (t *Topic) runProxy(hub *Hub) {
} else {
// Response (ctrl message) will be handled when it's received via the proxy channel.
if err := globals.cluster.routeToTopicMaster(ProxyReqJoin, join.pkt, t.name, join.sess); err != nil {
log.Println("proxy topic: route join request from proxy to master failed:", err)
logs.Warning.Println("proxy topic: route join request from proxy to master failed:", err)
}
}
if join.sess.inflightReqs != nil {
@ -36,7 +36,7 @@ func (t *Topic) runProxy(hub *Hub) {
case leave := <-t.unreg:
if !t.handleProxyLeaveRequest(leave, killTimer) {
log.Println("Failed to update proxy topic state for leave request", leave.sess.sid)
logs.Warning.Println("Failed to update proxy topic state for leave request", leave.sess.sid)
}
if leave.pkt != nil && leave.sess.inflightReqs != nil {
// If it's a client initiated request.
@ -46,13 +46,13 @@ func (t *Topic) runProxy(hub *Hub) {
case msg := <-t.broadcast:
// Content message intended for broadcasting to recipients
if err := globals.cluster.routeToTopicMaster(ProxyReqBroadcast, msg, t.name, msg.sess); err != nil {
log.Println("proxy topic: route broadcast request from proxy to master failed:", err)
logs.Warning.Println("proxy topic: route broadcast request from proxy to master failed:", err)
}
case meta := <-t.meta:
// Request to get/set topic metadata
if err := globals.cluster.routeToTopicMaster(ProxyReqMeta, meta.pkt, t.name, meta.sess); err != nil {
log.Println("proxy topic: route meta request from proxy to master failed:", err)
logs.Warning.Println("proxy topic: route meta request from proxy to master failed:", err)
}
case upd := <-t.supd:
@ -64,7 +64,7 @@ func (t *Topic) runProxy(hub *Hub) {
// Subscribed user may not match session user. Find out who is subscribed
pssd, ok := t.sessions[upd.sess]
if !ok {
log.Println("proxy topic: sess update request from detached session")
logs.Warning.Println("proxy topic: sess update request from detached session")
continue
}
req = ProxyReqBgSession
@ -73,7 +73,7 @@ func (t *Topic) runProxy(hub *Hub) {
tmpSess.userAgent = upd.sess.userAgent
}
if err := globals.cluster.routeToTopicMaster(req, nil, t.name, tmpSess); err != nil {
log.Println("proxy topic: route sess update request from proxy to master failed:", err)
logs.Warning.Println("proxy topic: route sess update request from proxy to master failed:", err)
}
case msg := <-t.proxy:
@ -86,7 +86,7 @@ func (t *Topic) runProxy(hub *Hub) {
}
if err := globals.cluster.topicProxyGone(t.name); err != nil {
log.Printf("topic proxy shutdown [%s]: failed to notify master - %s", t.name, err)
logs.Warning.Printf("topic proxy shutdown [%s]: failed to notify master - %s", t.name, err)
}
// Report completion back to sender, if 'done' is not nil.
@ -116,7 +116,7 @@ func (t *Topic) handleProxyLeaveRequest(leave *sessionLeave, killTimer *time.Tim
if pssd, ok := t.sessions[leave.sess]; ok {
asUid = pssd.uid
} else {
log.Println("proxy topic: leave request sent for unknown session")
logs.Warning.Println("proxy topic: leave request sent for unknown session")
return false
}
}
@ -136,7 +136,7 @@ func (t *Topic) handleProxyLeaveRequest(leave *sessionLeave, killTimer *time.Tim
pkt = leave.pkt
}
if err := globals.cluster.routeToTopicMaster(ProxyReqLeave, pkt, t.name, leave.sess); err != nil {
log.Println("proxy topic: route broadcast request from proxy to master failed:", err)
logs.Warning.Println("proxy topic: route broadcast request from proxy to master failed:", err)
}
if len(t.sessions) == 0 {
// No more sessions attached. Start the countdown.
@ -169,7 +169,7 @@ func (t *Topic) proxyMasterResponse(msg *ClusterResp, killTimer *time.Timer) {
} else {
sess := globals.sessionStore.Get(msg.OrigSid)
if sess == nil {
log.Println("topic_proxy: session not found; already terminated?", msg.OrigSid)
logs.Warning.Println("topic_proxy: session not found; already terminated?", msg.OrigSid)
}
switch msg.OrigReqType {
case ProxyReqJoin:
@ -213,12 +213,12 @@ func (t *Topic) proxyMasterResponse(msg *ClusterResp, killTimer *time.Timer) {
}
default:
log.Printf("proxy topic [%s] received response referencing unexpected request type %d",
logs.Error.Printf("proxy topic [%s] received response referencing unexpected request type %d",
t.name, msg.OrigReqType)
}
if sess != nil && !sess.queueOut(msg.SrvMsg) {
log.Println("topic proxy: timeout")
logs.Error.Println("topic proxy: timeout")
}
}
}
@ -228,7 +228,7 @@ func (t *Topic) proxyCtrlBroadcast(msg *ServerComMessage) {
if msg.Ctrl.Code == http.StatusResetContent && msg.Ctrl.Text == "evicted" {
// We received a ctrl command for evicting a user.
if msg.uid.IsZero() {
log.Panicf("topic[%s]: proxy received evict message with empty uid", t.name)
logs.Error.Panicf("topic[%s]: proxy received evict message with empty uid", t.name)
}
for sess := range t.sessions {
// Proxy topic may only have ordinary sessions. No multiplexing or proxy sessions here.
@ -247,18 +247,18 @@ func (t *Topic) updateAcsFromPresMsg(pres *MsgServerPres) {
uid := types.ParseUserId(pres.Src)
dacs := pres.Acs
if uid.IsZero() {
log.Printf("proxy topic[%s]: received acs change for invalid user id '%s'", t.name, pres.Src)
logs.Warning.Printf("proxy topic[%s]: received acs change for invalid user id '%s'", t.name, pres.Src)
return
}
// If t.perUser[uid] does not exist, pud is initialized with blanks, otherwise it gets existing values.
pud := t.perUser[uid]
if err := pud.modeWant.ApplyMutation(dacs.Want); err != nil {
log.Printf("proxy topic[%s]: could not process acs change - want: %+v", t.name, err)
logs.Warning.Printf("proxy topic[%s]: could not process acs change - want: %+v", t.name, err)
return
}
if err := pud.modeGiven.ApplyMutation(dacs.Given); err != nil {
log.Printf("proxy topic[%s]: could not process acs change - given: %+v", t.name, err)
logs.Warning.Printf("proxy topic[%s]: could not process acs change - given: %+v", t.name, err)
return
}
// Update existing or add new.

View File

@ -1,11 +1,11 @@
package main
import (
"log"
"time"
"github.com/tinode/chat/server/auth"
"github.com/tinode/chat/server/push"
"github.com/tinode/chat/server/logs"
"github.com/tinode/chat/server/store"
"github.com/tinode/chat/server/store/types"
)
@ -15,7 +15,7 @@ func replyCreateUser(s *Session, msg *ClientComMessage, rec *auth.Rec) {
// The session cannot authenticate with the new account because it's already authenticated.
if msg.Acc.Login && (!s.uid.IsZero() || rec != nil) {
s.queueOut(ErrAlreadyAuthenticated(msg.Id, "", msg.Timestamp))
log.Println("create user: login requested while authenticated", s.sid)
logs.Warning.Println("create user: login requested while authenticated", s.sid)
return
}
@ -24,13 +24,13 @@ func replyCreateUser(s *Session, msg *ClientComMessage, rec *auth.Rec) {
if authhdl == nil {
// New accounts must have an authentication scheme
s.queueOut(ErrMalformed(msg.Id, "", msg.Timestamp))
log.Println("create user: unknown auth handler", s.sid)
logs.Warning.Println("create user: unknown auth handler", s.sid)
return
}
// Check if login is unique.
if ok, err := authhdl.IsUnique(msg.Acc.Secret, s.remoteAddr); !ok {
log.Println("create user: auth secret is not unique", err, s.sid)
logs.Warning.Println("create user: auth secret is not unique", err, s.sid)
s.queueOut(decodeStoreError(err, msg.Id, "", msg.Timestamp,
map[string]interface{}{"what": "auth"}))
return
@ -42,7 +42,7 @@ func replyCreateUser(s *Session, msg *ClientComMessage, rec *auth.Rec) {
// If account state is being assigned, make sure the sender is a root user.
if msg.Acc.State != "" {
if auth.Level(msg.AuthLvl) != auth.LevelRoot {
log.Println("create user: attempt to set account state by non-root", s.sid)
logs.Warning.Println("create user: attempt to set account state by non-root", s.sid)
msg := ErrPermissionDenied(msg.Id, "", msg.Timestamp)
msg.Ctrl.Params = map[string]interface{}{"what": "state"}
s.queueOut(msg)
@ -51,7 +51,7 @@ func replyCreateUser(s *Session, msg *ClientComMessage, rec *auth.Rec) {
state, err := types.NewObjState(msg.Acc.State)
if err != nil || state == types.StateUndefined || state == types.StateDeleted {
log.Println("create user: invalid account state", err, s.sid)
logs.Warning.Println("create user: invalid account state", err, s.sid)
s.queueOut(ErrMalformed(msg.Id, "", msg.Timestamp))
return
}
@ -61,7 +61,7 @@ func replyCreateUser(s *Session, msg *ClientComMessage, rec *auth.Rec) {
// Ensure tags are unique and not restricted.
if tags := normalizeTags(msg.Acc.Tags); tags != nil {
if !restrictedTagsEqual(tags, nil, globals.immutableTagNS) {
log.Println("create user: attempt to directly assign restricted tags", s.sid)
logs.Warning.Println("create user: attempt to directly assign restricted tags", s.sid)
msg := ErrPermissionDenied(msg.Id, "", msg.Timestamp)
msg.Ctrl.Params = map[string]interface{}{"what": "tags"}
s.queueOut(msg)
@ -77,7 +77,7 @@ func replyCreateUser(s *Session, msg *ClientComMessage, rec *auth.Rec) {
cr := &creds[i]
vld := store.GetValidator(cr.Method)
if _, err := vld.PreCheck(cr.Value, cr.Params); err != nil {
log.Println("create user: failed credential pre-check", cr, err, s.sid)
logs.Warning.Println("create user: failed credential pre-check", cr, err, s.sid)
s.queueOut(decodeStoreError(err, msg.Id, "", msg.Timestamp,
map[string]interface{}{"what": cr.Method}))
return
@ -118,7 +118,7 @@ func replyCreateUser(s *Session, msg *ClientComMessage, rec *auth.Rec) {
// Create user record in the database.
if _, err := store.Users.Create(&user, private); err != nil {
log.Println("create user: failed to create user", err, s.sid)
logs.Warning.Println("create user: failed to create user", err, s.sid)
s.queueOut(ErrUnknown(msg.Id, "", msg.Timestamp))
return
}
@ -126,7 +126,7 @@ func replyCreateUser(s *Session, msg *ClientComMessage, rec *auth.Rec) {
// Add authentication record. The authhdl.AddRecord may change tags.
rec, err := authhdl.AddRecord(&auth.Rec{Uid: user.Uid(), Tags: user.Tags}, msg.Acc.Secret, s.remoteAddr)
if err != nil {
log.Println("create user: add auth record failed", err, s.sid)
logs.Warning.Println("create user: add auth record failed", err, s.sid)
// Attempt to delete incomplete user record
store.Users.Delete(user.Uid(), false)
s.queueOut(decodeStoreError(err, msg.Id, "", msg.Timestamp, nil))
@ -136,7 +136,7 @@ func replyCreateUser(s *Session, msg *ClientComMessage, rec *auth.Rec) {
// When creating an account, the user must provide all required credentials.
// If any are missing, reject the request.
if len(creds) < len(globals.authValidators[rec.AuthLevel]) {
log.Println("create user: missing credentials; have:", creds, "want:", globals.authValidators[rec.AuthLevel], s.sid)
logs.Warning.Println("create user: missing credentials; have:", creds, "want:", globals.authValidators[rec.AuthLevel], s.sid)
// Attempt to delete incomplete user record
store.Users.Delete(user.Uid(), false)
_, missing := stringSliceDelta(globals.authValidators[rec.AuthLevel], credentialMethods(creds))
@ -155,7 +155,7 @@ func replyCreateUser(s *Session, msg *ClientComMessage, rec *auth.Rec) {
if err != nil {
// Delete incomplete user record.
store.Users.Delete(user.Uid(), false)
log.Println("create user: failed to save or validate credential", err, s.sid)
logs.Warning.Println("create user: failed to save or validate credential", err, s.sid)
s.queueOut(decodeStoreError(err, msg.Id, "", msg.Timestamp, nil))
return
}
@ -194,12 +194,12 @@ func replyCreateUser(s *Session, msg *ClientComMessage, rec *auth.Rec) {
func replyUpdateUser(s *Session, msg *ClientComMessage, rec *auth.Rec) {
if s.uid.IsZero() && rec == nil {
// Session is not authenticated and no token provided.
log.Println("replyUpdateUser: not a new account and not authenticated", s.sid)
logs.Warning.Println("replyUpdateUser: not a new account and not authenticated", s.sid)
s.queueOut(ErrPermissionDenied(msg.Id, "", msg.Timestamp))
return
} else if msg.AsUser != "" && rec != nil {
// Two UIDs: one from msg.from, one from token. Ambigous, reject.
log.Println("replyUpdateUser: got both authenticated session and token", s.sid)
logs.Warning.Println("replyUpdateUser: got both authenticated session and token", s.sid)
s.queueOut(ErrMalformed(msg.Id, "", msg.Timestamp))
return
}
@ -213,7 +213,7 @@ func replyUpdateUser(s *Session, msg *ClientComMessage, rec *auth.Rec) {
if msg.Acc.User != "" && msg.Acc.User != userId {
if s.authLvl != auth.LevelRoot {
log.Println("replyUpdateUser: attempt to change another's account by non-root", s.sid)
logs.Warning.Println("replyUpdateUser: attempt to change another's account by non-root", s.sid)
s.queueOut(ErrPermissionDenied(msg.Id, "", msg.Timestamp))
return
}
@ -226,14 +226,14 @@ func replyUpdateUser(s *Session, msg *ClientComMessage, rec *auth.Rec) {
if uid.IsZero() {
// msg.Acc.User contains invalid data.
s.queueOut(ErrMalformed(msg.Id, "", msg.Timestamp))
log.Println("replyUpdateUser: user id is invalid or missing", s.sid)
logs.Warning.Println("replyUpdateUser: user id is invalid or missing", s.sid)
return
}
// Only root can suspend accounts, including own account.
if msg.Acc.State != "" && s.authLvl != auth.LevelRoot {
s.queueOut(ErrPermissionDenied(msg.Id, "", msg.Timestamp))
log.Println("replyUpdateUser: attempt to change account state by non-root", s.sid)
logs.Warning.Println("replyUpdateUser: attempt to change account state by non-root", s.sid)
return
}
@ -242,7 +242,7 @@ func replyUpdateUser(s *Session, msg *ClientComMessage, rec *auth.Rec) {
err = types.ErrNotFound
}
if err != nil {
log.Println("replyUpdateUser: failed to fetch user from DB", err, s.sid)
logs.Warning.Println("replyUpdateUser: failed to fetch user from DB", err, s.sid)
s.queueOut(decodeStoreError(err, msg.Id, "", msg.Timestamp, nil))
return
}
@ -254,7 +254,7 @@ func replyUpdateUser(s *Session, msg *ClientComMessage, rec *auth.Rec) {
if authLvl == auth.LevelNone {
// msg.Acc.AuthLevel contains invalid data.
s.queueOut(ErrMalformed(msg.Id, "", msg.Timestamp))
log.Println("replyUpdateUser: auth level is missing", s.sid)
logs.Warning.Println("replyUpdateUser: auth level is missing", s.sid)
return
}
// Handle request to update credentials.
@ -288,7 +288,7 @@ func replyUpdateUser(s *Session, msg *ClientComMessage, rec *auth.Rec) {
}
if err != nil {
log.Println("replyUpdateUser: failed to update user", err, s.sid)
logs.Warning.Println("replyUpdateUser: failed to update user", err, s.sid)
s.queueOut(decodeStoreError(err, msg.Id, "", msg.Timestamp, nil))
return
}
@ -315,7 +315,7 @@ func updateUserAuth(msg *ClientComMessage, user *types.User, rec *auth.Rec, remo
// Tags may have been changed by authhdl.UpdateRecord, reset them.
// Can't do much with the error here, logging it but not returning.
if _, err = store.Users.UpdateTags(user.Uid(), nil, nil, rec.Tags); err != nil {
log.Println("updateUserAuth tags update failed:", err)
logs.Warning.Println("updateUserAuth tags update failed:", err)
}
return nil
}
@ -359,7 +359,7 @@ func addCreds(uid types.Uid, creds []MsgCredClient, extraTags []string, lang str
if utags, err := store.Users.UpdateTags(uid, extraTags, nil, nil); err == nil {
extraTags = utags
} else {
log.Println("add cred tags update failed:", err)
logs.Warning.Println("add cred tags update failed:", err)
}
} else {
extraTags = nil
@ -430,7 +430,7 @@ func validatedCreds(uid types.Uid, authLvl auth.Level, creds []MsgCredClient, er
if utags, err := store.Users.UpdateTags(uid, tagsToAdd, nil, nil); err == nil {
tags = utags
} else {
log.Println("validated creds tags update failed:", err)
logs.Warning.Println("validated creds tags update failed:", err)
tags = nil
}
} else {
@ -506,7 +506,7 @@ func deleteCred(uid types.Uid, authLvl auth.Level, cred *MsgCredClient) ([]strin
if utags, err := store.Users.UpdateTags(uid, nil, []string{cred.Method + ":" + cred.Value}, nil); err == nil {
tags = utags
} else {
log.Println("delete cred: failed to update tags:", err)
logs.Warning.Println("delete cred: failed to update tags:", err)
tags = nil
}
} else {
@ -525,7 +525,7 @@ func deleteCred(uid types.Uid, authLvl auth.Level, cred *MsgCredClient) ([]strin
func changeUserState(s *Session, uid types.Uid, user *types.User, msg *ClientComMessage) (bool, error) {
state, err := types.NewObjState(msg.Acc.State)
if err != nil || state == types.StateUndefined {
log.Println("replyUpdateUser: invalid account state", s.sid)
logs.Warning.Println("replyUpdateUser: invalid account state", s.sid)
return false, types.ErrMalformed
}
@ -569,12 +569,12 @@ func replyDelUser(s *Session, msg *ClientComMessage) {
// Delete another user.
uid = types.ParseUserId(msg.Del.User)
if uid.IsZero() {
log.Println("replyDelUser: invalid user ID", msg.Del.User, s.sid)
logs.Warning.Println("replyDelUser: invalid user ID", msg.Del.User, s.sid)
s.queueOut(ErrMalformed(msg.Id, "", msg.Timestamp))
return
}
} else {
log.Println("replyDelUser: illegal attempt to delete another user", msg.Del.User, s.sid)
logs.Warning.Println("replyDelUser: illegal attempt to delete another user", msg.Del.User, s.sid)
s.queueOut(ErrPermissionDenied(msg.Id, "", msg.Timestamp))
return
}
@ -584,7 +584,7 @@ func replyDelUser(s *Session, msg *ClientComMessage) {
for _, name := range authnames {
if err := store.GetAuthHandler(name).DelRecords(uid); err != nil {
// This could be completely benign, i.e. authenticator exists but not used.
log.Println("replyDelUser: failed to delete auth record", uid.UserId(), name, err, s.sid)
logs.Warning.Println("replyDelUser: failed to delete auth record", uid.UserId(), name, err, s.sid)
if storeErr, ok := err.(types.StoreError); ok && storeErr == types.ErrUnsupported {
// Authenticator refused to delete record: user account cannot be deleted.
s.queueOut(ErrOperationNotAllowed(msg.Id, "", msg.Timestamp))
@ -607,7 +607,7 @@ func replyDelUser(s *Session, msg *ClientComMessage) {
if uoi, err := store.Users.GetSubs(uid, nil); err == nil {
presUsersOfInterestOffline(uid, uoi, "gone")
} else {
log.Println("replyDelUser: failed to send notifications to users", err, s.sid)
logs.Warning.Println("replyDelUser: failed to send notifications to users", err, s.sid)
}
// Notify subscribers of the group topics where the user was the owner that the topics were deleted.
@ -616,18 +616,18 @@ func replyDelUser(s *Session, msg *ClientComMessage) {
if subs, err := store.Topics.GetSubs(topicName, nil); err == nil {
presSubsOfflineOffline(topicName, types.TopicCatGrp, subs, "gone", &presParams{}, s.sid)
} else {
log.Println("replyDelUser: failed to notify topic subscribers", err, topicName, s.sid)
logs.Warning.Println("replyDelUser: failed to notify topic subscribers", err, topicName, s.sid)
}
}
} else {
log.Println("replyDelUser: failed to send notifications to owned topics", err, s.sid)
logs.Warning.Println("replyDelUser: failed to send notifications to owned topics", err, s.sid)
}
// TODO: suspend all P2P topics with the user.
// Delete user's records from the database.
if err := store.Users.Delete(uid, msg.Del.Hard); err != nil {
log.Println("replyDelUser: failed to delete user", err, s.sid)
logs.Warning.Println("replyDelUser: failed to delete user", err, s.sid)
s.queueOut(decodeStoreError(err, msg.Id, "", msg.Timestamp, nil))
return
}
@ -858,14 +858,14 @@ func userUpdater() {
unreadUpdater := func(uid types.Uid, val int, inc bool) int {
uce, ok := usersCache[uid]
if !ok {
log.Println("ERROR: attempt to update unread count for user who has not been loaded")
logs.Error.Println("ERROR: attempt to update unread count for user who has not been loaded")
return -1
}
if uce.unread < 0 {
count, err := store.Users.GetUnreadCount(uid)
if err != nil {
log.Println("users: failed to load unread count", err)
logs.Warning.Println("users: failed to load unread count", err)
return -1
}
uce.unread = count
@ -930,7 +930,7 @@ func userUpdater() {
}
} else {
// BUG!
log.Println("ERROR: request to unregister user which has not been registered", uid)
logs.Error.Println("ERROR: request to unregister user which has not been registered", uid)
}
}
continue
@ -946,5 +946,5 @@ func userUpdater() {
unreadUpdater(upd.UserId, upd.Unread, upd.Inc)
}
log.Println("users: shutdown")
logs.Info.Println("users: shutdown")
}

View File

@ -7,7 +7,6 @@ import (
"encoding/json"
"errors"
"fmt"
"log"
"net"
"path/filepath"
"reflect"
@ -20,6 +19,7 @@ import (
"unicode/utf8"
"github.com/tinode/chat/server/auth"
"github.com/tinode/chat/server/logs"
"github.com/tinode/chat/server/store"
"github.com/tinode/chat/server/store/types"
@ -457,7 +457,7 @@ func rewriteTag(orig, countryCode string, withLogin bool) string {
return orig
}
log.Printf("invalid generic tag '%s'", orig)
logs.Warning.Printf("invalid generic tag '%s'", orig)
return ""
}

View File

@ -8,7 +8,6 @@ import (
"encoding/json"
"errors"
"fmt"
"log"
"math/rand"
qp "mime/quotedprintable"
"net/mail"
@ -21,6 +20,7 @@ import (
textt "text/template"
"time"
"github.com/tinode/chat/server/logs"
"github.com/tinode/chat/server/store"
t "github.com/tinode/chat/server/store/types"
i18n "golang.org/x/text/language"
@ -545,7 +545,7 @@ func (v *validator) send(to string, content *emailContent) error {
err := v.sendMail([]string{to}, message.Bytes())
if err != nil {
log.Println("SMTP error", to, err)
logs.Warning.Println("SMTP error", to, err)
}
return err