link attachments to messages where they are used

This commit is contained in:
or-else
2018-06-13 22:15:01 +03:00
parent 49945a62e2
commit fff22a62ef
9 changed files with 292 additions and 81 deletions

View File

@ -14,6 +14,12 @@ The promise of [XMPP](http://xmpp.org/) was to deliver federated instant messagi
The goal of this project is to actually deliver on XMPP's original vision: create a modern open platform for federated instant messaging with emphasis on mobile communication. A secondary goal is to create a decentralized IM platform which is much harder to track and block by the governments.
## Getting support
* For support, general questions, discussion post to [https://groups.google.com/d/forum/tinode](https://groups.google.com/d/forum/tinode).
* For bugs [open an issue](https://github.com/tinode/chat/issues/new).
## Demo
### Web

View File

@ -30,7 +30,7 @@ const (
defaultDSN = "root:@tcp(localhost:3306)/tinode?parseTime=true"
defaultDatabase = "tinode"
dbVersion = 102
dbVersion = 103
adapterName = "mysql"
)
@ -384,12 +384,16 @@ func (a *adapter) CreateDb(reset bool) error {
createdat DATETIME(3) NOT NULL,
updatedat DATETIME(3) NOT NULL,
userid BIGINT NOT NULL,
seqid INT,
topic CHAR(25) NOT NULL,
status INT NOT NULL,
mimetype VARCHAR(255) NOT NULL,
size BIGINT NOT NULL,
location VARCHAR(2048) NOT NULL,
PRIMARY KEY(id),
FOREIGN KEY(userid) REFERENCES users(id)
FOREIGN KEY(userid) REFERENCES users(id),
FOREIGN KEY(topic) REFERENCES topics(name),
INDEX messages_topic_seqid (topic, seqid)
)`); err != nil {
return err
}
@ -1428,8 +1432,8 @@ func (a *adapter) MessageGetDeleted(topic string, forUser t.Uid, opts *t.QueryOp
if opts.Since > 0 {
lower = opts.Since
}
if opts.Before > 0 {
upper = opts.Before
if opts.Before > 1 {
upper = opts.Before - 1
}
if opts.Limit > 0 && opts.Limit < limit {
@ -1684,8 +1688,8 @@ func (a *adapter) CredDel(uid t.Uid, method string) error {
func (a *adapter) CredConfirm(uid t.Uid, method string) error {
res, err := a.db.Exec(
"UPDATE credentials SET done=1,synthetic=CONCAT(method,':',value) WHERE userid=? AND method=?",
store.DecodeUid(uid), method)
"UPDATE credentials SET updatedat=?,done=1,synthetic=CONCAT(method,':',value) WHERE userid=? AND method=?",
t.TimeNow(), store.DecodeUid(uid), method)
if err != nil {
if isDupe(err) {
return t.ErrDuplicate
@ -1699,8 +1703,8 @@ func (a *adapter) CredConfirm(uid t.Uid, method string) error {
}
func (a *adapter) CredFail(uid t.Uid, method string) error {
_, err := a.db.Exec("UPDATE credentials SET retries=retries+1 WHERE userid=? AND method=?",
store.DecodeUid(uid), method)
_, err := a.db.Exec("UPDATE credentials SET updatedat=?,retries=retries+1 WHERE userid=? AND method=?",
t.TimeNow(), store.DecodeUid(uid), method)
return err
}
@ -1738,12 +1742,12 @@ func (a *adapter) FileStartUpload(fd *t.FileDef) error {
_, err := a.db.Exec("INSERT INTO fileuploads(id,createdat,updatedat,userid,status,mimetype,size,location)"+
" VALUES(?,?,?,?,?,?,?,?)",
store.DecodeUid(fd.Uid()), fd.CreatedAt, fd.UpdatedAt,
store.DecodeUid(t.ParseUid(fd.User)), fd.Status, fd.MimeType, fd.Size, fd.Location)
store.DecodeUid(t.ParseUid(fd.User)), fd.Topic, fd.SeqId, fd.Status, fd.MimeType, fd.Size, fd.Location)
return err
}
// FileFinishUpload markes file upload as completed, successfully or otherwise
func (a *adapter) FileFinishUpload(fid string, status int) (*t.FileDef, error) {
func (a *adapter) FileFinishUpload(fid string, status int, size int64) (*t.FileDef, error) {
id := t.ParseUid(fid)
if id.IsZero() {
return nil, t.ErrMalformed
@ -1756,34 +1760,100 @@ func (a *adapter) FileFinishUpload(fid string, status int) (*t.FileDef, error) {
if fd == nil {
return nil, t.ErrNotFound
}
_, err = a.db.Exec("UPDATE fileuploads SET status=? WHERE id=?", status, store.DecodeUid(id))
fd.UpdatedAt = t.TimeNow()
_, err = a.db.Exec("UPDATE fileuploads SET updatedat=?, status=?, size=? WHERE id=?",
fd.UpdatedAt, status, size, store.DecodeUid(id))
if err == nil {
fd.Status = status
fd.Size = size
} else {
fd = nil
}
return fd, err
}
// FilesForUser returns all file records for a given user. Query is currently ignored.
// FIXME: use opts.
func (a *adapter) FilesForUser(uid t.Uid, opts *t.QueryOpt) ([]t.FileDef, error) {
rows, err := a.db.Queryx("SELECT id,createdat,updatedat,status,mimetype,size,location "+
"FROM fileuploads WHERE userid=?", store.DecodeUid(uid))
// FilePosted creates a relationship between the file and a message it was posted in.
// TODO: create a separate table for linking attachments to messages.
func (a *adapter) FilePosted(fid string, topic string, seqid int) error {
id := t.ParseUid(fid)
if id.IsZero() {
return t.ErrMalformed
}
_, err := a.db.Exec("UPDATE fileuploads SET topic=?, seqid=? WHERE id=? AND seqid=0",
topic, seqid, store.DecodeUid(id))
return err
}
func optsToQuery(opts *t.QueryOpt) (string, []interface{}) {
var params []string
var args []interface{}
var query string
if !opts.User.IsZero() {
params = append(params, "userid=?")
args = append(args, store.DecodeUid(opts.User))
}
if opts.Topic != "" {
params = append(params, "topic=?")
args = append(args, opts.Topic)
if opts.Before > 1 {
params = append(params, "seqid<?")
args = append(args, opts.Before)
}
if opts.Since > 0 {
params = append(params, "seqid>=?")
args = append(args, opts.Since)
}
}
if len(params) > 0 {
query = strings.Join(params, " AND ")
} else {
return "", nil
}
if opts.Limit > 0 {
var limit = opts.Limit
if limit > maxResults {
limit = maxResults
}
query += " LIMIT ?"
args = append(args, limit)
}
return query, args
}
// FilesForUser returns file records for a given user.
func (a *adapter) FilesGetAll(opts *t.QueryOpt) ([]t.FileDef, error) {
query := "SELECT id,createdat,updatedat,userid AS user,topic,seqid,status,mimetype,size,location " +
"FROM fileuploads WHERE "
var args []interface{}
var where string
if opts != nil {
where, args = optsToQuery(opts)
query += where
}
if len(args) == 0 {
// Must provide some query parameters
return nil, t.ErrMalformed
}
rows, err := a.db.Queryx(query, args...)
if err != nil {
return nil, err
}
var result []t.FileDef
user := uid.String()
for rows.Next() {
var fd t.FileDef
if err = rows.StructScan(&fd); err != nil {
break
}
fd.Id = encodeString(fd.Id).String()
fd.User = user
fd.User = encodeString(fd.User).String()
result = append(result, fd)
}
rows.Close()
@ -1799,7 +1869,7 @@ func (a *adapter) FileGet(fid string) (*t.FileDef, error) {
}
var fd t.FileDef
err := a.db.Get(&fd, "SELECT id,createdat,updatedat,userid AS user,status,mimetype,size,location "+
err := a.db.Get(&fd, "SELECT id,createdat,updatedat,userid AS user,topic,seqid,status,mimetype,size,location "+
"FROM fileuploads WHERE id=?", store.DecodeUid(id))
if err == sql.ErrNoRows {
return nil, nil
@ -1818,29 +1888,20 @@ func (a *adapter) FileGet(fid string) (*t.FileDef, error) {
// FileDelete deletes records of a files if uid matches the owner.
// If uid is zero, delete the records regardless of the owner.
// If fids is not provided, delete all record of a given user.
func (a *adapter) FileDelete(uid t.Uid, fids ...string) error {
var args []interface{}
func (a *adapter) FileDelete(opts *t.QueryOpt) error {
query := "DELETE FROM fileuploads WHERE "
if len(fids) > 0 {
query += "id IN (?" + strings.Repeat(",?", len(fids)-1) + ") "
for _, fid := range fids {
id := t.ParseUid(fid)
if id.IsZero() {
return t.ErrMalformed
}
args = append(args, id)
}
}
if !uid.IsZero() {
if len(args) > 0 {
query += "AND "
}
query += "userid=?"
args = append(args, store.DecodeUid(uid))
var args []interface{}
var where string
if opts != nil {
where, args = optsToQuery(opts)
query += where
}
if len(args) == 0 {
return errors.New("FileDelete: no arguments provided")
// Must provide some query parameters
return t.ErrMalformed
}
_, err := a.db.Exec(query, args...)
return err
}

View File

@ -184,11 +184,16 @@ CREATE TABLE fileuploads(
createdat DATETIME(3) NOT NULL,
updatedat DATETIME(3) NOT NULL,
userid BIGINT NOT NULL,
seqid INT,
topic CHAR(25) NOT NULL,
status INT NOT NULL,
mimetype VARCHAR(255) NOT NULL,
size BIGINT NOT NULL,
location VARCHAR(2048) NOT NULL,
PRIMARY KEY(id),
FOREIGN KEY(userid) REFERENCES users(id)
FOREIGN KEY(userid) REFERENCES users(id),
FOREIGN KEY(topic) REFERENCES topics(name),
# This index should not be unique: one message may have multiple attachments.
INDEX messages_topic_seqid (topic, seqid)
)

View File

@ -27,7 +27,7 @@ const (
defaultHost = "localhost:28015"
defaultDatabase = "tinode"
dbVersion = 102
dbVersion = 103
adapterName = "rethinkdb"
)
@ -282,11 +282,17 @@ func (a *adapter) CreateDb(reset bool) error {
if _, err := rdb.DB(a.dbName).TableCreate("fileuploads", rdb.TableCreateOpts{PrimaryKey: "Id"}).RunWrite(a.conn); err != nil {
return err
}
// Create secondary index on fileuploads.User to be able to get records by user id.
// A secondary index on fileuploads.User to be able to get records by user id.
if _, err := rdb.DB(a.dbName).Table("fileuploads").IndexCreate("User").RunWrite(a.conn); err != nil {
return err
}
// Another secondary index for linking uploaded files to messages.
if _, err := rdb.DB(a.dbName).Table("fileuploads").IndexCreateFunc("Topic_SeqId",
func(row rdb.Term) interface{} {
return []interface{}{row.Field("Topic"), row.Field("SeqId")}
}).RunWrite(a.conn); err != nil {
return err
}
return nil
}
@ -1413,6 +1419,7 @@ func (a *adapter) CredConfirm(uid t.Uid, method string) error {
}
creds[0].Done = true
creds[0].UpdatedAt = t.TimeNow()
if err = a.CredAdd(creds[0]); err != nil {
if rdb.IsConflictErr(err) {
return t.ErrDuplicate
@ -1434,7 +1441,8 @@ func (a *adapter) CredFail(uid t.Uid, method string) error {
GetAllByIndex("User", uid.String()).
Filter(map[string]interface{}{"Method": method}).
Update(map[string]interface{}{
"Retries": rdb.Row.Field("Retries").Add(1).Default(0),
"Retries": rdb.Row.Field("Retries").Add(1).Default(0),
"UpdatedAt": t.TimeNow(),
}).RunWrite(a.conn)
return err
}
@ -1467,21 +1475,82 @@ func (a *adapter) FileStartUpload(fd *t.FileDef) error {
}
// FileFinishUpload markes file upload as completed, successfully or otherwise
func (a *adapter) FileFinishUpload(fid string, status int) (*t.FileDef, error) {
func (a *adapter) FileFinishUpload(fid string, status int, size int64) (*t.FileDef, error) {
if _, err := rdb.DB(a.dbName).Table("fileuploads").Get(fid).
Update(map[string]interface{}{"Status": status}).RunWrite(a.conn); err != nil {
Update(map[string]interface{}{
"Status": status,
"Size": size,
}).RunWrite(a.conn); err != nil {
return nil, err
}
return a.FileGet(fid)
}
// FilePosted creates a relationship between the file and a message it was posted in.
func (a *adapter) FilePosted(fid string, topic string, seqid int) error {
_, err := rdb.DB(a.dbName).Table("fileuploads").Get(fid).
Update(map[string]interface{}{
"Topic": topic,
"SeqId": seqid,
}).RunWrite(a.conn)
return err
}
func optsToQuery(q rdb.Term, opts *t.QueryOpt) *rdb.Term {
if !opts.User.IsZero() {
// Select all user uploads by User index, then filter Topic and SeqId.
q = q.GetAllByIndex("User", opts.User.String())
if opts.Topic != "" {
q = q.Filter(rdb.Row.Field("Topic").Eq(opts.Topic))
if opts.Before > 1 {
q = q.Filter(rdb.Row.Field("SeqId").Lt(opts.Before))
}
if opts.Since > 0 {
q = q.Filter(rdb.Row.Field("SeqId").Ge(opts.Since))
}
}
} else if opts.Topic != "" {
// Select by Topic and SeqId
var lower, upper interface{}
upper = rdb.MaxVal
lower = rdb.MinVal
if opts.Since > 0 {
lower = opts.Since
}
if opts.Before > 1 {
upper = opts.Before
}
q = q.Between([]interface{}{opts.Topic, lower},
[]interface{}{opts.Topic, upper},
rdb.BetweenOpts{Index: "Topic_SeqId"})
} else {
return nil
}
if opts.Limit > 0 && opts.Limit < maxResults {
q = q.Limit(opts.Limit)
}
return &q
}
// FilesForUser returns all file records for a given user. Query is currently ignored.
// FIXME: use opts.
func (a *adapter) FilesForUser(uid t.Uid, opts *t.QueryOpt) ([]t.FileDef, error) {
rows, err := rdb.DB(a.dbName).Table("fileuploads").
GetAllByIndex("User", uid.String()).Run(a.conn)
func (a *adapter) FilesGetAll(opts *t.QueryOpt) ([]t.FileDef, error) {
var q *rdb.Term
if opts != nil {
q = optsToQuery(rdb.DB(a.dbName).Table("fileuploads"), opts)
}
if q == nil {
return nil, t.ErrMalformed
}
rows, err := q.Run(a.conn)
if err != nil || rows.IsNil() {
return nil, err
}
@ -1513,17 +1582,14 @@ func (a *adapter) FileGet(fid string) (*t.FileDef, error) {
// FileDelete deletes records of a files if uid matches the owner.
// If uid is zero, delete the records regardless of the owner.
// If fids is not provided, delete all record of a given user.
func (a *adapter) FileDelete(uid t.Uid, fids ...string) error {
q := rdb.DB(a.dbName).Table("fileuploads")
if len(fids) > 0 {
q = q.GetAll(fids)
if !uid.IsZero() {
q = q.Filter(map[string]interface{}{"User": uid.String()})
}
} else if !uid.IsZero() {
q = q.GetAllByIndex("User", uid.String())
} else {
return errors.New("FileDelete: no arguments provided")
func (a *adapter) FileDelete(opts *t.QueryOpt) error {
var q *rdb.Term
if opts != nil {
q = optsToQuery(rdb.DB(a.dbName).Table("fileuploads"), opts)
}
if q == nil {
return t.ErrMalformed
}
_, err := q.Delete().RunWrite(a.conn)

View File

@ -254,10 +254,11 @@ Indexes:
Sample:
```js
{
"CreatedAt": Tue Dec 05 2017 01:51:38 GMT+00:00 ,
"DelId": 18 ,
"DeletedFor": "xY-YHx09-WI" ,
"Id": "9LfrjW349Rc" ,
"Id": "9LfrjW349Rc",
"CreatedAt": Tue Dec 05 2017 01:51:38 GMT+00:00,
"DelId": 18,
"DeletedFor": "xY-YHx09-WI" ,
"SeqIdRanges": [
{
"Low": 20,
@ -272,4 +273,61 @@ Sample:
### Table `credentials`
The tables stores user credentials used for validation.
### Table `fileuploads`
* `Id` unique credential, primary key
* `CreatedAt` timestamp when the record was created
* `UpdatedAt` timestamp when the last validation attempt was performed (successful or not).
* `Method` validation method
* `Done` indicator if the credential is validated
* `Resp` expected validation response
* `Retries` number of failed attempts at validation
* `User` id of the user who owns this credential
* `Value` value of the credential
Indexes:
* `Id` Primary key composed either as `User`:`Method`:`Value` for unconfirmed credentials or as `Method`:`Value` for confirmed.
* `User` Index
Sample:
```js
{
"Id": "tel:17025550001",
"CreatedAt": Sun Jun 10 2018 16:37:27 GMT+00:00 ,
"Method": "tel" ,
"Done": true ,
"Resp": "123456" ,
"Retries": 0 ,
"UpdatedAt": Sun Jun 10 2018 16:37:27 GMT+00:00 ,
"User": "k3srBRk9RYw" ,
"Value": "17025550001"
}
```
### Table `fileuploads`
The table stores records of uploaded files. The files themselves are stored outside of the database.
* `Id` unique user-visible file name, primary key
* `CreatedAt` timestamp when the record was created
* `UpdatedAt` timestamp of when th upload has cmpleted or failed
* `User` id of the user who uploaded this file.
* `Location` actual location of the file on the server.
* `MimeType` file content type as a [Mime](https://en.wikipedia.org/wiki/MIME) string.
* `Size` size of the file in bytes. Could be 0 if upload has not completed yet.
* `Status` upload status: 0 pending, 1 completed, -1 failed.
Indexes:
* `Id` file name, primary key
* `User` index
* `Topic_SeqId` compound index `["Topic", "SeqId"]`
Sample:
```js
{
"CreatedAt": Sun Jun 10 2018 16:38:45 GMT+00:00 ,
"Id": "sFmjlQ_kA6A" ,
"Location": "uploads/sFmjlQ_kA6A" ,
"MimeType": "image/jpeg" ,
"Size": 54961090 ,
"Status": 1 ,
"UpdatedAt": Sun Jun 10 2018 16:38:45 GMT+00:00 ,
"User": "7j-RR1V7O3Y"
}
```

View File

@ -206,17 +206,17 @@ func largeFileUpload(wrt http.ResponseWriter, req *http.Request) {
return
}
_, err = io.Copy(outfile, file)
size, err := io.Copy(outfile, file)
log.Println("Finished upload", fdef.Location)
outfile.Close()
if err != nil {
store.Files.FinishUpload(fdef.Id, false)
store.Files.FinishUpload(fdef.Id, false, 0)
os.Remove(fdef.Location)
writeHttpResponse(nil)
return
}
store.Files.FinishUpload(fdef.Id, true)
store.Files.FinishUpload(fdef.Id, true, size)
fname := fdef.Id
ext, _ := mime.ExtensionsByType(fdef.MimeType)

View File

@ -134,13 +134,14 @@ type Adapter interface {
// FileStartUpload initializes a file upload
FileStartUpload(fd *t.FileDef) error
// FileFinishUpload markes file upload as completed, successfully or otherwise
FileFinishUpload(fid string, status int) (*t.FileDef, error)
// FilesForUser returns all file records for a given user.
FilesForUser(uid t.Uid, opts *t.QueryOpt) ([]t.FileDef, error)
// FileFinishUpload markes file upload as completed, successfully or otherwise.
FileFinishUpload(fid string, status int, size int64) (*t.FileDef, error)
// FilePosted markes file as an attachment in a specific message
FilePosted(fid string, topic string, seqid int) error
// FileGet fetches a record of a specific file
FileGet(fid string) (*t.FileDef, error)
// FileDelete deletes records of a file if file owner matched the uid.
// If uid is zero, the ownership is not checked.
FileDelete(uid t.Uid, fid ...string) error
// FilesGetAll returns all file records for a given query.
FilesGetAll(opts *t.QueryOpt) ([]t.FileDef, error)
// FileDelete deletes file records by query.
FileDelete(opts *t.QueryOpt) error
}

View File

@ -397,7 +397,7 @@ func (TopicsObjMapper) Update(topic string, update map[string]interface{}) error
return adp.TopicUpdate(topic, update)
}
// Delete deletes topic, messages and subscriptions.
// Delete deletes topic, messages, attachments, and subscriptions.
func (TopicsObjMapper) Delete(topic string) error {
if err := adp.SubsDelForTopic(topic); err != nil {
return err
@ -405,6 +405,9 @@ func (TopicsObjMapper) Delete(topic string) error {
if err := adp.MessageDeleteList(topic, nil); err != nil {
return err
}
if err := adp.FileDelete(&types.QueryOpt{Topic: topic}); err != nil {
return err
}
return adp.TopicDelete(topic)
}
@ -485,6 +488,8 @@ func (MessagesObjMapper) DeleteList(topic string, delID int, forUser types.Uid,
return err
}
// TODO: delete file attachments
// Soft-deleting will update one subscription, hard-deleting will ipdate all.
// Soft- or hard- is defined by the forUSer being defined.
return adp.SubsUpdate(topic, forUser, map[string]interface{}{"DelId": delID})
@ -614,17 +619,17 @@ func (FileMapper) StartUpload(fd *types.FileDef) error {
}
// FinishUpload marks started upload as successfully finished.
func (FileMapper) FinishUpload(fid string, success bool) (*types.FileDef, error) {
func (FileMapper) FinishUpload(fid string, success bool, size int64) (*types.FileDef, error) {
status := types.UploadCompleted
if !success {
status = types.UploadFailed
}
return adp.FileFinishUpload(fid, status)
return adp.FileFinishUpload(fid, status, size)
}
// GetForUser fetches all file records for a given user
func (FileMapper) GetForUser(uid types.Uid) ([]types.FileDef, error) {
return adp.FilesForUser(uid, nil)
func (FileMapper) GetAll(opts *types.QueryOpt) ([]types.FileDef, error) {
return adp.FilesGetAll(opts)
}
// Get fetches a file record for a unique file id.
@ -634,6 +639,11 @@ func (FileMapper) Get(fid string) (*types.FileDef, error) {
// Delete deletes file records by a list of file IDs. If uid is not zero, only
// files owned by the given user are deleted. If fids are missing, delete all files for the given uid.
func (FileMapper) Delete(uid types.Uid, fid ...string) error {
return adp.FileDelete(uid, fid...)
func (FileMapper) Delete(opts *types.QueryOpt) error {
return adp.FileDelete(opts)
}
// Posted links file with a message it was sent in.
func (FileMapper) Posted(fid string, topic string, seqid int) error {
return adp.FilePosted(fid, topic, seqid)
}

View File

@ -974,6 +974,10 @@ type FileDef struct {
Status int
// User who created the file
User string
// Topic where this file was first posted
Topic string
// SeqId of the message where the file was first attached.
SeqId int
// Type of the file.
MimeType string
// Size of the file in bytes.