diff --git a/pkg/mount/dir.go b/pkg/mount/dir.go deleted file mode 100644 index 3a484c1..0000000 --- a/pkg/mount/dir.go +++ /dev/null @@ -1,268 +0,0 @@ -package mount - -import ( - "context" - "fmt" - "io" - "os" - "syscall" - "time" - - "bazil.org/fuse" - fusefs "bazil.org/fuse/fs" - "github.com/rclone/rclone/cmd/mountlib" - "github.com/rclone/rclone/fs" - "github.com/rclone/rclone/fs/log" - "github.com/rclone/rclone/vfs" -) - -// Dir represents a directory entry -type Dir struct { - *vfs.Dir - fsys *FS -} - -// Check interface satisfied -var _ fusefs.Node = (*Dir)(nil) - -// Attr updates the attributes of a directory -func (d *Dir) Attr(ctx context.Context, a *fuse.Attr) (err error) { - defer log.Trace(d, "")("attr=%+v, err=%v", a, &err) - a.Valid = d.fsys.opt.AttrTimeout - a.Gid = d.VFS().Opt.GID - a.Uid = d.VFS().Opt.UID - a.Mode = os.ModeDir | d.VFS().Opt.DirPerms - modTime := d.ModTime() - a.Atime = modTime - a.Mtime = modTime - a.Ctime = modTime - // FIXME include Valid so get some caching? - // FIXME fs.Debugf(d.path, "Dir.Attr %+v", a) - return nil -} - -// Check interface satisfied -var _ fusefs.NodeSetattrer = (*Dir)(nil) - -// Setattr handles attribute changes from FUSE. Currently supports ModTime only. -func (d *Dir) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) (err error) { - defer log.Trace(d, "stat=%+v", req)("err=%v", &err) - if d.VFS().Opt.NoModTime { - return nil - } - - if req.Valid.MtimeNow() { - err = d.SetModTime(time.Now()) - } else if req.Valid.Mtime() { - err = d.SetModTime(req.Mtime) - } - - return err -} - -// Check interface satisfied -var _ fusefs.NodeRequestLookuper = (*Dir)(nil) - -// Lookup looks up a specific entry in the receiver. -// -// Lookup should return a Node corresponding to the entry. If the -// name does not exist in the directory, Lookup should return ENOENT. -// -// Lookup need not to handle the names "." and "..". -func (d *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.LookupResponse) (node fusefs.Node, err error) { - defer log.Trace(d, "name=%q", req.Name)("node=%+v, err=%v", &node, &err) - mnode, err := d.Dir.Stat(req.Name) - if err != nil { - return nil, err - } - resp.EntryValid = d.fsys.opt.AttrTimeout - // Check the mnode to see if it has a fuse Node cached - // We must return the same fuse nodes for vfs Nodes - node, ok := mnode.Sys().(fusefs.Node) - if ok { - return node, nil - } - switch x := mnode.(type) { - case *vfs.File: - node = &File{x, d.fsys} - case *vfs.Dir: - node = &Dir{x, d.fsys} - default: - panic("bad type") - } - // Cache the node for later - mnode.SetSys(node) - return node, nil -} - -// Check interface satisfied -var _ fusefs.HandleReadDirAller = (*Dir)(nil) - -// ReadDirAll reads the contents of the directory -func (d *Dir) ReadDirAll(ctx context.Context) (dirents []fuse.Dirent, err error) { - itemsRead := -1 - defer log.Trace(d, "")("item=%d, err=%v", &itemsRead, &err) - items, err := d.Dir.ReadDirAll() - if err != nil { - return nil, (err) - } - dirents = append(dirents, fuse.Dirent{ - Type: fuse.DT_Dir, - Name: ".", - }, fuse.Dirent{ - Type: fuse.DT_Dir, - Name: "..", - }) - for _, node := range items { - name := node.Name() - if len(name) > mountlib.MaxLeafSize { - fs.Errorf(d, "Name too long (%d bytes) for FUSE, skipping: %s", len(name), name) - continue - } - var dirent = fuse.Dirent{ - // Inode FIXME ??? - Type: fuse.DT_File, - Name: name, - } - if node.IsDir() { - dirent.Type = fuse.DT_Dir - } - dirents = append(dirents, dirent) - } - itemsRead = len(dirents) - return dirents, nil -} - -var _ fusefs.NodeCreater = (*Dir)(nil) - -// Create makes a new file -func (d *Dir) Create(ctx context.Context, req *fuse.CreateRequest, resp *fuse.CreateResponse) (node fusefs.Node, handle fusefs.Handle, err error) { - defer log.Trace(d, "name=%q", req.Name)("node=%v, handle=%v, err=%v", &node, &handle, &err) - file, err := d.Dir.Create(req.Name, int(req.Flags)) - if err != nil { - return nil, nil, (err) - } - fh, err := file.Open(int(req.Flags) | os.O_CREATE) - if err != nil { - return nil, nil, (err) - } - node = &File{file, d.fsys} - file.SetSys(node) // cache the FUSE node for later - return node, &FileHandle{fh}, err -} - -var _ fusefs.NodeMkdirer = (*Dir)(nil) - -// Mkdir creates a new directory -func (d *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (node fusefs.Node, err error) { - defer log.Trace(d, "name=%q", req.Name)("node=%+v, err=%v", &node, &err) - dir, err := d.Dir.Mkdir(req.Name) - if err != nil { - return nil, (err) - } - node = &Dir{dir, d.fsys} - dir.SetSys(node) // cache the FUSE node for later - return node, nil -} - -var _ fusefs.NodeRemover = (*Dir)(nil) - -// Remove removes the entry with the given name from -// the receiver, which must be a directory. The entry to be removed -// may correspond to a file (unlink) or to a directory (rmdir). -func (d *Dir) Remove(ctx context.Context, req *fuse.RemoveRequest) (err error) { - defer log.Trace(d, "name=%q", req.Name)("err=%v", &err) - err = d.Dir.RemoveName(req.Name) - if err != nil { - return (err) - } - return nil -} - -// Invalidate a leaf in a directory -func (d *Dir) invalidateEntry(dirNode fusefs.Node, leaf string) { - fs.Debugf(dirNode, "Invalidating %q", leaf) - err := d.fsys.server.InvalidateEntry(dirNode, leaf) - if err != nil { - fs.Debugf(dirNode, "Failed to invalidate %q: %v", leaf, err) - } -} - -// Check interface satisfied -var _ fusefs.NodeRenamer = (*Dir)(nil) - -// Rename the file -func (d *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDir fusefs.Node) (err error) { - defer log.Trace(d, "oldName=%q, newName=%q, newDir=%+v", req.OldName, req.NewName, newDir)("err=%v", &err) - destDir, ok := newDir.(*Dir) - if !ok { - return fmt.Errorf("unknown Dir type %T", newDir) - } - - err = d.Dir.Rename(req.OldName, req.NewName, destDir.Dir) - if err != nil { - return (err) - } - - // Invalidate the new directory entry so it gets re-read (in - // the background otherwise we cause a deadlock) - // - // See https://github.com/rclone/rclone/issues/4977 for why - go d.invalidateEntry(newDir, req.NewName) - //go d.invalidateEntry(d, req.OldName) - - return nil -} - -// Check interface satisfied -var _ fusefs.NodeFsyncer = (*Dir)(nil) - -// Fsync the directory -func (d *Dir) Fsync(ctx context.Context, req *fuse.FsyncRequest) (err error) { - defer log.Trace(d, "")("err=%v", &err) - err = d.Dir.Sync() - if err != nil { - return (err) - } - return nil -} - -// Check interface satisfied -var _ fusefs.NodeLinker = (*Dir)(nil) - -// Link creates a new directory entry in the receiver based on an -// existing Node. Receiver must be a directory. -func (d *Dir) Link(ctx context.Context, req *fuse.LinkRequest, old fusefs.Node) (newNode fusefs.Node, err error) { - defer log.Trace(d, "req=%v, old=%v", req, old)("new=%v, err=%v", &newNode, &err) - return nil, syscall.ENOSYS -} - -// Check interface satisfied -var _ fusefs.NodeMknoder = (*Dir)(nil) - -// Mknod is called to create a file. Since we define create this will -// be called in preference, however NFS likes to call it for some -// reason. We don't actually create a file here just the Node. -func (d *Dir) Mknod(ctx context.Context, req *fuse.MknodRequest) (node fusefs.Node, err error) { - defer log.Trace(d, "name=%v, mode=%d, rdev=%d", req.Name, req.Mode, req.Rdev)("node=%v, err=%v", &node, &err) - if req.Rdev != 0 { - fs.Errorf(d, "Can't create device node %q", req.Name) - return nil, fuse.EIO - } - var cReq = fuse.CreateRequest{ - Name: req.Name, - Flags: fuse.OpenFlags(os.O_CREATE | os.O_WRONLY), - Mode: req.Mode, - Umask: req.Umask, - } - var cResp fuse.CreateResponse - node, handle, err := d.Create(ctx, &cReq, &cResp) - if err != nil { - return nil, err - } - err = handle.(io.Closer).Close() - if err != nil { - return nil, err - } - return node, nil -} diff --git a/pkg/mount/file.go b/pkg/mount/file.go deleted file mode 100644 index ca09697..0000000 --- a/pkg/mount/file.go +++ /dev/null @@ -1,125 +0,0 @@ -package mount - -import ( - "context" - "syscall" - "time" - - "bazil.org/fuse" - fusefs "bazil.org/fuse/fs" - "github.com/rclone/rclone/fs/log" - "github.com/rclone/rclone/vfs" -) - -// File represents a file -type File struct { - *vfs.File - fsys *FS -} - -// Check interface satisfied -var _ fusefs.Node = (*File)(nil) - -// Attr fills out the attributes for the file -func (f *File) Attr(ctx context.Context, a *fuse.Attr) (err error) { - defer log.Trace(f, "")("a=%+v, err=%v", a, &err) - a.Valid = f.fsys.opt.AttrTimeout - modTime := f.File.ModTime() - Size := uint64(f.File.Size()) - Blocks := (Size + 511) / 512 - a.Gid = f.VFS().Opt.GID - a.Uid = f.VFS().Opt.UID - a.Mode = f.VFS().Opt.FilePerms - a.Size = Size - a.Atime = modTime - a.Mtime = modTime - a.Ctime = modTime - a.Blocks = Blocks - return nil -} - -// Check interface satisfied -var _ fusefs.NodeSetattrer = (*File)(nil) - -// Setattr handles attribute changes from FUSE. Currently supports ModTime and Size only -func (f *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) (err error) { - defer log.Trace(f, "a=%+v", req)("err=%v", &err) - if !f.VFS().Opt.NoModTime { - if req.Valid.Mtime() { - err = f.File.SetModTime(req.Mtime) - } else if req.Valid.MtimeNow() { - err = f.File.SetModTime(time.Now()) - } - } - if req.Valid.Size() { - err = f.File.Truncate(int64(req.Size)) - } - return (err) -} - -// Check interface satisfied -var _ fusefs.NodeOpener = (*File)(nil) - -// Open the file for read or write -func (f *File) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenResponse) (fh fusefs.Handle, err error) { - defer log.Trace(f, "flags=%v", req.Flags)("fh=%v, err=%v", &fh, &err) - - // fuse flags are based off syscall flags as are os flags, so - // should be compatible - handle, err := f.File.Open(int(req.Flags)) - if err != nil { - return nil, (err) - } - - // If size unknown then use direct io to read - if entry := handle.Node().DirEntry(); entry != nil && entry.Size() < 0 { - resp.Flags |= fuse.OpenDirectIO - } - - return &FileHandle{handle}, nil -} - -// Check interface satisfied -var _ fusefs.NodeFsyncer = (*File)(nil) - -// Fsync the file -// -// Note that we don't do anything except return OK -func (f *File) Fsync(ctx context.Context, req *fuse.FsyncRequest) (err error) { - defer log.Trace(f, "")("err=%v", &err) - return nil -} - -// Getxattr gets an extended attribute by the given name from the -// node. -// -// If there is no xattr by that name, returns fuse.ErrNoXattr. -func (f *File) Getxattr(ctx context.Context, req *fuse.GetxattrRequest, resp *fuse.GetxattrResponse) error { - return syscall.ENOSYS // we never implement this -} - -var _ fusefs.NodeGetxattrer = (*File)(nil) - -// Listxattr lists the extended attributes recorded for the node. -func (f *File) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, resp *fuse.ListxattrResponse) error { - return syscall.ENOSYS // we never implement this -} - -var _ fusefs.NodeListxattrer = (*File)(nil) - -// Setxattr sets an extended attribute with the given name and -// value for the node. -func (f *File) Setxattr(ctx context.Context, req *fuse.SetxattrRequest) error { - return syscall.ENOSYS // we never implement this -} - -var _ fusefs.NodeSetxattrer = (*File)(nil) - -// Removexattr removes an extended attribute for the name. -// -// If there is no xattr by that name, returns fuse.ErrNoXattr. -func (f *File) Removexattr(ctx context.Context, req *fuse.RemovexattrRequest) error { - return syscall.ENOSYS // we never implement this -} - -var _ fusefs.NodeRemovexattrer = (*File)(nil) diff --git a/pkg/mount/handle.go b/pkg/mount/handle.go deleted file mode 100644 index fce31cc..0000000 --- a/pkg/mount/handle.go +++ /dev/null @@ -1,82 +0,0 @@ -package mount - -import ( - "context" - "io" - - "bazil.org/fuse" - fusefs "bazil.org/fuse/fs" - "github.com/rclone/rclone/fs/log" - "github.com/rclone/rclone/vfs" -) - -// FileHandle is an open for read file handle on a File -type FileHandle struct { - vfs.Handle -} - -// Check interface satisfied -var _ fusefs.HandleReader = (*FileHandle)(nil) - -// Read from the file handle -func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) (err error) { - var n int - defer log.Trace(fh, "len=%d, offset=%d", req.Size, req.Offset)("read=%d, err=%v", &n, &err) - data := make([]byte, req.Size) - n, err = fh.Handle.ReadAt(data, req.Offset) - if err == io.EOF { - err = nil - } else if err != nil { - return (err) - } - resp.Data = data[:n] - return nil -} - -// Check interface satisfied -var _ fusefs.HandleWriter = (*FileHandle)(nil) - -// Write data to the file handle -func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.WriteResponse) (err error) { - defer log.Trace(fh, "len=%d, offset=%d", len(req.Data), req.Offset)("written=%d, err=%v", &resp.Size, &err) - n, err := fh.Handle.WriteAt(req.Data, req.Offset) - if err != nil { - return (err) - } - resp.Size = n - return nil -} - -// Check interface satisfied -var _ fusefs.HandleFlusher = (*FileHandle)(nil) - -// Flush is called on each close() of a file descriptor. So if a -// filesystem wants to return write errors in close() and the file has -// cached dirty data, this is a good place to write back data and -// return any errors. Since many applications ignore close() errors -// this is not always useful. -// -// NOTE: The flush() method may be called more than once for each -// open(). This happens if more than one file descriptor refers to an -// opened file due to dup(), dup2() or fork() calls. It is not -// possible to determine if a flush is final, so each flush should be -// treated equally. Multiple write-flush sequences are relatively -// rare, so this shouldn't be a problem. -// -// Filesystems shouldn't assume that flush will always be called after -// some writes, or that if will be called at all. -func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) (err error) { - defer log.Trace(fh, "")("err=%v", &err) - return (fh.Handle.Flush()) -} - -var _ fusefs.HandleReleaser = (*FileHandle)(nil) - -// Release is called when we are finished with the file handle -// -// It isn't called directly from userspace so the error is ignored by -// the kernel -func (fh *FileHandle) Release(ctx context.Context, req *fuse.ReleaseRequest) (err error) { - defer log.Trace(fh, "")("err=%v", &err) - return (fh.Handle.Release()) -} diff --git a/pkg/mount/mount.go b/pkg/mount/mount.go deleted file mode 100644 index 9a14154..0000000 --- a/pkg/mount/mount.go +++ /dev/null @@ -1,113 +0,0 @@ -package mount - -import ( - "fmt" - - "bazil.org/fuse" - fusefs "bazil.org/fuse/fs" - "github.com/rclone/rclone/cmd/mountlib" - "github.com/rclone/rclone/fs" - "github.com/rclone/rclone/fs/log" - "github.com/rclone/rclone/vfs" -) - -func MountFn(VFS *vfs.VFS, mountpoint string, opt *mountlib.Options) (<-chan error, func() error, error) { - - f := VFS.Fs() - fs.Debugf(f, "Mounting on %q", mountpoint) - c, err := fuse.Mount(mountpoint, mountOptions(VFS, opt.DeviceName, opt)...) - if err != nil { - return nil, nil, err - } - filesys := NewFS(VFS, opt) - filesys.server = fusefs.New(c, nil) - - // Serve the mount point in the background returning error to errChan - errChan := make(chan error, 1) - go func() { - err := filesys.server.Serve(filesys) - closeErr := c.Close() - if err == nil { - err = closeErr - } - errChan <- err - }() - - unmount := func() error { - // Shutdown the VFS - filesys.VFS.Shutdown() - return fuse.Unmount(mountpoint) - } - return errChan, unmount, nil - -} -func NewFS(VFS *vfs.VFS, opt *mountlib.Options) *FS { - fsys := &FS{ - VFS: VFS, - f: VFS.Fs(), - opt: opt, - } - return fsys -} - -type FS struct { - *vfs.VFS - f fs.Fs - opt *mountlib.Options - server *fusefs.Server -} - -// mountOptions configures the options from the command line flags -func mountOptions(VFS *vfs.VFS, device string, opt *mountlib.Options) (options []fuse.MountOption) { - options = []fuse.MountOption{ - fuse.MaxReadahead(uint32(opt.MaxReadAhead)), - fuse.Subtype("rclone"), - fuse.FSName(device), - - // Options from benchmarking in the fuse module - //fuse.MaxReadahead(64 * 1024 * 1024), - //fuse.WritebackCache(), - } - if opt.AsyncRead { - options = append(options, fuse.AsyncRead()) - } - if opt.AllowNonEmpty { - options = append(options, fuse.AllowNonEmptyMount()) - } - if opt.AllowOther { - options = append(options, fuse.AllowOther()) - } - if opt.AllowRoot { - // options = append(options, fuse.AllowRoot()) - fs.Errorf(nil, "Ignoring --allow-root. Support has been removed upstream - see https://github.com/bazil/fuse/issues/144 for more info") - } - if opt.DefaultPermissions { - options = append(options, fuse.DefaultPermissions()) - } - if VFS.Opt.ReadOnly { - options = append(options, fuse.ReadOnly()) - } - if opt.WritebackCache { - options = append(options, fuse.WritebackCache()) - } - if opt.DaemonTimeout != 0 { - options = append(options, fuse.DaemonTimeout(fmt.Sprint(int(opt.DaemonTimeout.Seconds())))) - } - if len(opt.ExtraOptions) > 0 { - fs.Errorf(nil, "-o/--option not supported with this FUSE backend") - } - if len(opt.ExtraFlags) > 0 { - fs.Errorf(nil, "--fuse-flag not supported with this FUSE backend") - } - return options -} - -// Root returns the root node -func (f *FS) Root() (node fusefs.Node, err error) { - defer log.Trace("", "")("node=%+v, err=%v", &node, &err) - root, err := f.VFS.Root() - if err != nil { - return nil, err - } - return &Dir{root, f}, nil -} diff --git a/route/v1/file.go b/route/v1/file.go index a366d31..a8c8f1b 100644 --- a/route/v1/file.go +++ b/route/v1/file.go @@ -1,6 +1,7 @@ package v1 import ( + "encoding/json" "fmt" "io" "io/ioutil" @@ -61,6 +62,18 @@ type FsListResp struct { Size int `json:"size"` } +var ( + // 升级成 WebSocket 协议 + upgraderFile = websocket.Upgrader{ + // 允许CORS跨域请求 + CheckOrigin: func(r *http.Request) bool { + return true + }, + } + conn *websocket.Conn + err error +) + // @Summary 读取文件 // @Produce application/json // @Accept application/json @@ -793,3 +806,262 @@ func GetSize(c *gin.Context) { } c.JSON(common_err.SUCCESS, model.Result{Success: common_err.SUCCESS, Message: common_err.GetMsg(common_err.SUCCESS), Data: size}) } + +type CenterHandler struct { + // 广播通道,有数据则循环每个用户广播出去 + broadcast chan []byte + // 注册通道,有用户进来 则推到用户集合map中 + register chan *Client + // 注销通道,有用户关闭连接 则将该用户剔出集合map中 + unregister chan *Client + // 用户集合,每个用户本身也在跑两个协程,监听用户的读、写的状态 + clients map[string]*Client +} + +type Client struct { + handler *CenterHandler + conn *websocket.Conn + // 每个用户自己的循环跑起来的状态监控 + send chan []byte + ID string `json:"id"` + IP string `json:"ip"` + Name service.Name `json:"name"` + RtcSupported bool `json:"rtcSupported"` + TimerId int `json:"timerId"` + LastBeat time.Time `json:"lastBeat"` + Offline bool `json:"offline"` +} + +func ConnectWebSocket(c *gin.Context) { + peerId := c.Query("peer") + writer := c.Writer + request := c.Request + key := uuid.NewV4().String() + //peerModel := service.MyService.Peer().GetPeerByUserAgent(c.Request.UserAgent()) + peerModel := model2.PeerDriveDBModel{} + name := service.GetName(request) + if conn, err = upgraderFile.Upgrade(writer, request, writer.Header()); err != nil { + log.Println(err) + return + } + client := &Client{handler: &handler, conn: conn, send: make(chan []byte, 256), ID: service.GetPeerId(request, key), IP: service.GetIP(request), Name: name, RtcSupported: true, TimerId: 0, LastBeat: time.Now()} + if peerId != "" || len(peerModel.ID) > 0 { + if len(peerModel.ID) == 0 { + peerModel = service.MyService.Peer().GetPeerByID(peerId) + } + if len(peerModel.ID) > 0 { + key = peerId + client.ID = peerModel.ID + client.Name = service.GetNameByDB(peerModel) + } + } + var list = service.MyService.Peer().GetPeers() + if len(peerModel.ID) == 0 { + peerModel.ID = key + peerModel.DisplayName = name.DisplayName + peerModel.DeviceName = name.DeviceName + peerModel.OS = name.OS + peerModel.Browser = name.Browser + peerModel.UserAgent = c.Request.UserAgent() + peerModel.IP = client.IP + service.MyService.Peer().CreatePeer(&peerModel) + list = append(list, peerModel) + } + + cookie := http.Cookie{ + Name: "peerid", + Value: key, + Path: "/", + } + http.SetCookie(writer, &cookie) + + // 推给监控中心注册到用户集合中 + handler.register <- client + if len(list) > 10 { + fmt.Println("有溢出", list) + } + if len(list) > 10 { + kickoutList := []Client{} + count := len(list) - 10 + for i := len(list) - 1; count > 0 && i > -1; i-- { + if _, ok := handler.clients[list[i].ID]; !ok { + count-- + kickoutList = append(kickoutList, Client{ID: list[i].ID, Name: service.GetNameByDB(list[i]), IP: list[i].IP, Offline: true}) + service.MyService.Peer().DeletePeer(list[i].ID) + } + } + if len(kickoutList) > 0 { + other := make(map[string]interface{}) + other["type"] = "kickout" + other["peers"] = kickoutList + otherBy, err := json.Marshal(other) + fmt.Println(err) + client.handler.broadcast <- otherBy + } + } + list = service.MyService.Peer().GetPeers() + if len(list) > 10 { + fmt.Println("解决完后依然有溢出", list) + } + clients := []Client{} + for _, v := range client.handler.clients { + if _, ok := handler.clients[v.ID]; ok { + clients = append(clients, *handler.clients[v.ID]) + } + } + + other := make(map[string]interface{}) + other["type"] = "peers" + other["peers"] = clients + otherBy, err := json.Marshal(other) + fmt.Println(err) + client.handler.broadcast <- otherBy + + pmsg := make(map[string]interface{}) + pmsg["type"] = "peer-joined" + pmsg["peer"] = client + pby, err := json.Marshal(pmsg) + fmt.Println(err) + client.handler.broadcast <- pby + + data := make(map[string]string) + data["displayName"] = client.Name.DisplayName + data["deviceName"] = client.Name.DeviceName + data["id"] = client.ID + msg := make(map[string]interface{}) + msg["type"] = "display-name" + msg["message"] = data + by, _ := json.Marshal(msg) + client.send <- by + + // 每个 client 都挂起 2 个新的协程,监控读、写状态 + go client.writePump() + go client.readPump() + c.JSON(common_err.SUCCESS, model.Result{Success: common_err.SUCCESS, Message: common_err.GetMsg(common_err.SUCCESS)}) +} + +var handler = CenterHandler{broadcast: make(chan []byte), + register: make(chan *Client), + unregister: make(chan *Client), + clients: make(map[string]*Client)} + +func init() { + // 起个协程跑起来,监听注册、注销、消息 3 个 channel + go handler.monitoring() + + crontab := cron.New(cron.WithSeconds()) //精确到秒 + //定义定时器调用的任务函数 + + task := func() { + handler.broadcast <- []byte(`{"type":"ping"}`) + } + //定时任务 + spec := "*/20 * * * * ?" //cron表达式,每五秒一次 + // 添加定时任务, + crontab.AddFunc(spec, task) + // 启动定时器 + crontab.Start() +} +func (c *Client) writePump() { + defer func() { + c.handler.unregister <- c + c.conn.Close() + }() + for { + // 广播推过来的新消息,马上通过websocket推给自己 + message, _ := <-c.send + if err := c.conn.WriteMessage(websocket.TextMessage, message); err != nil { + return + } + } +} + +// 读,监听客户端是否有推送内容过来服务端 +func (c *Client) readPump() { + defer func() { + c.handler.unregister <- c + c.conn.Close() + }() + for { + // 循环监听是否该用户是否要发言 + _, message, err := c.conn.ReadMessage() + if err != nil { + // 异常关闭的处理 + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { + log.Printf("error: %v", err) + } + break + } + // 要的话,推给广播中心,广播中心再推给每个用户 + + t := gjson.GetBytes(message, "type") + if t.String() == "disconnect" { + c.handler.unregister <- c + c.conn.Close() + // clients := []Client{} + // list := service.MyService.Peer().GetPeers() + // for _, v := range list { + // if _, ok := handler.clients[v.ID]; ok { + // clients = append(clients, *handler.clients[v.ID]) + // } else { + // clients = append(clients, Client{ID: v.ID, Name: service.GetNameByDB(v), IP: v.IP, Offline: true}) + // } + // } + // other := make(map[string]interface{}) + // other["type"] = "peers" + // other["peers"] = clients + // otherBy, err := json.Marshal(other) + // fmt.Println(err) + c.handler.broadcast <- []byte(`{"type":"peer-left","peerId":"` + c.ID + `"}`) + //c.handler.broadcast <- otherBy + break + } else if t.String() == "pong" { + c.LastBeat = time.Now() + continue + } + to := gjson.GetBytes(message, "to") + + if len(to.String()) > 0 { + toC := c.handler.clients[to.String()] + if toC == nil { + continue + } + data := map[string]interface{}{} + json.Unmarshal(message, &data) + data["sender"] = c.ID + delete(data, "to") + message, err = json.Marshal(data) + toC.send <- message + } + + c.handler.broadcast <- message + } +} +func (ch *CenterHandler) monitoring() { + for { + select { + // 注册,新用户连接过来会推进注册通道,这里接收推进来的用户指针 + case client := <-ch.register: + ch.clients[client.ID] = client + // 注销,关闭连接或连接异常会将用户推出群聊 + case client := <-ch.unregister: + delete(ch.clients, client.ID) + // 消息,监听到有新消息到来 + case message := <-ch.broadcast: + println("消息来了,message:" + string(message)) + // 推送给每个用户的通道,每个用户都有跑协程起了writePump的监听 + for _, client := range ch.clients { + client.send <- message + } + } + } +} +func GetPeers(c *gin.Context) { + peers := service.MyService.Peer().GetPeers() + for i := 0; i < len(peers); i++ { + if _, ok := handler.clients[peers[i].ID]; ok { + peers[i].Online = true + } + } + c.JSON(common_err.SUCCESS, model.Result{Success: common_err.SUCCESS, Message: common_err.GetMsg(common_err.SUCCESS), Data: peers}) +} diff --git a/service/service.go b/service/service.go index 003c839..1bb9171 100644 --- a/service/service.go +++ b/service/service.go @@ -60,37 +60,14 @@ func NewService(db *gorm.DB, RuntimePath string) Repository { system: NewSystemService(), health: NewHealthService(), shares: NewSharesService(db), - storage: NewStorageService(), peer: NewPeerService(db), other: NewOtherService(), - casa: NewCasaService(), - - gateway: gatewayManagement, - notify: NewNotifyService(db), - rely: NewRelyService(db), - system: NewSystemService(), - health: NewHealthService(), - shares: NewSharesService(db), - - other: NewOtherService(), } } type store struct { - db *gorm.DB - casa CasaService - notify NotifyServer - rely RelyService - system SystemService - shares SharesService - connections ConnectionsService - gateway external.ManagementService - storage StorageService - - health HealthService peer PeerService - other OtherService db *gorm.DB casa CasaService notify NotifyServer