Merge branch 'main' into socket_service

This commit is contained in:
link 2023-03-06 08:22:23 +00:00
parent 191e07fdfe
commit f7b1ab71b0
6 changed files with 272 additions and 611 deletions

View File

@ -1,268 +0,0 @@
package mount
import (
"context"
"fmt"
"io"
"os"
"syscall"
"time"
"bazil.org/fuse"
fusefs "bazil.org/fuse/fs"
"github.com/rclone/rclone/cmd/mountlib"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/log"
"github.com/rclone/rclone/vfs"
)
// Dir represents a directory entry
type Dir struct {
*vfs.Dir
fsys *FS
}
// Check interface satisfied
var _ fusefs.Node = (*Dir)(nil)
// Attr updates the attributes of a directory
func (d *Dir) Attr(ctx context.Context, a *fuse.Attr) (err error) {
defer log.Trace(d, "")("attr=%+v, err=%v", a, &err)
a.Valid = d.fsys.opt.AttrTimeout
a.Gid = d.VFS().Opt.GID
a.Uid = d.VFS().Opt.UID
a.Mode = os.ModeDir | d.VFS().Opt.DirPerms
modTime := d.ModTime()
a.Atime = modTime
a.Mtime = modTime
a.Ctime = modTime
// FIXME include Valid so get some caching?
// FIXME fs.Debugf(d.path, "Dir.Attr %+v", a)
return nil
}
// Check interface satisfied
var _ fusefs.NodeSetattrer = (*Dir)(nil)
// Setattr handles attribute changes from FUSE. Currently supports ModTime only.
func (d *Dir) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) (err error) {
defer log.Trace(d, "stat=%+v", req)("err=%v", &err)
if d.VFS().Opt.NoModTime {
return nil
}
if req.Valid.MtimeNow() {
err = d.SetModTime(time.Now())
} else if req.Valid.Mtime() {
err = d.SetModTime(req.Mtime)
}
return err
}
// Check interface satisfied
var _ fusefs.NodeRequestLookuper = (*Dir)(nil)
// Lookup looks up a specific entry in the receiver.
//
// Lookup should return a Node corresponding to the entry. If the
// name does not exist in the directory, Lookup should return ENOENT.
//
// Lookup need not to handle the names "." and "..".
func (d *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.LookupResponse) (node fusefs.Node, err error) {
defer log.Trace(d, "name=%q", req.Name)("node=%+v, err=%v", &node, &err)
mnode, err := d.Dir.Stat(req.Name)
if err != nil {
return nil, err
}
resp.EntryValid = d.fsys.opt.AttrTimeout
// Check the mnode to see if it has a fuse Node cached
// We must return the same fuse nodes for vfs Nodes
node, ok := mnode.Sys().(fusefs.Node)
if ok {
return node, nil
}
switch x := mnode.(type) {
case *vfs.File:
node = &File{x, d.fsys}
case *vfs.Dir:
node = &Dir{x, d.fsys}
default:
panic("bad type")
}
// Cache the node for later
mnode.SetSys(node)
return node, nil
}
// Check interface satisfied
var _ fusefs.HandleReadDirAller = (*Dir)(nil)
// ReadDirAll reads the contents of the directory
func (d *Dir) ReadDirAll(ctx context.Context) (dirents []fuse.Dirent, err error) {
itemsRead := -1
defer log.Trace(d, "")("item=%d, err=%v", &itemsRead, &err)
items, err := d.Dir.ReadDirAll()
if err != nil {
return nil, (err)
}
dirents = append(dirents, fuse.Dirent{
Type: fuse.DT_Dir,
Name: ".",
}, fuse.Dirent{
Type: fuse.DT_Dir,
Name: "..",
})
for _, node := range items {
name := node.Name()
if len(name) > mountlib.MaxLeafSize {
fs.Errorf(d, "Name too long (%d bytes) for FUSE, skipping: %s", len(name), name)
continue
}
var dirent = fuse.Dirent{
// Inode FIXME ???
Type: fuse.DT_File,
Name: name,
}
if node.IsDir() {
dirent.Type = fuse.DT_Dir
}
dirents = append(dirents, dirent)
}
itemsRead = len(dirents)
return dirents, nil
}
var _ fusefs.NodeCreater = (*Dir)(nil)
// Create makes a new file
func (d *Dir) Create(ctx context.Context, req *fuse.CreateRequest, resp *fuse.CreateResponse) (node fusefs.Node, handle fusefs.Handle, err error) {
defer log.Trace(d, "name=%q", req.Name)("node=%v, handle=%v, err=%v", &node, &handle, &err)
file, err := d.Dir.Create(req.Name, int(req.Flags))
if err != nil {
return nil, nil, (err)
}
fh, err := file.Open(int(req.Flags) | os.O_CREATE)
if err != nil {
return nil, nil, (err)
}
node = &File{file, d.fsys}
file.SetSys(node) // cache the FUSE node for later
return node, &FileHandle{fh}, err
}
var _ fusefs.NodeMkdirer = (*Dir)(nil)
// Mkdir creates a new directory
func (d *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (node fusefs.Node, err error) {
defer log.Trace(d, "name=%q", req.Name)("node=%+v, err=%v", &node, &err)
dir, err := d.Dir.Mkdir(req.Name)
if err != nil {
return nil, (err)
}
node = &Dir{dir, d.fsys}
dir.SetSys(node) // cache the FUSE node for later
return node, nil
}
var _ fusefs.NodeRemover = (*Dir)(nil)
// Remove removes the entry with the given name from
// the receiver, which must be a directory. The entry to be removed
// may correspond to a file (unlink) or to a directory (rmdir).
func (d *Dir) Remove(ctx context.Context, req *fuse.RemoveRequest) (err error) {
defer log.Trace(d, "name=%q", req.Name)("err=%v", &err)
err = d.Dir.RemoveName(req.Name)
if err != nil {
return (err)
}
return nil
}
// Invalidate a leaf in a directory
func (d *Dir) invalidateEntry(dirNode fusefs.Node, leaf string) {
fs.Debugf(dirNode, "Invalidating %q", leaf)
err := d.fsys.server.InvalidateEntry(dirNode, leaf)
if err != nil {
fs.Debugf(dirNode, "Failed to invalidate %q: %v", leaf, err)
}
}
// Check interface satisfied
var _ fusefs.NodeRenamer = (*Dir)(nil)
// Rename the file
func (d *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDir fusefs.Node) (err error) {
defer log.Trace(d, "oldName=%q, newName=%q, newDir=%+v", req.OldName, req.NewName, newDir)("err=%v", &err)
destDir, ok := newDir.(*Dir)
if !ok {
return fmt.Errorf("unknown Dir type %T", newDir)
}
err = d.Dir.Rename(req.OldName, req.NewName, destDir.Dir)
if err != nil {
return (err)
}
// Invalidate the new directory entry so it gets re-read (in
// the background otherwise we cause a deadlock)
//
// See https://github.com/rclone/rclone/issues/4977 for why
go d.invalidateEntry(newDir, req.NewName)
//go d.invalidateEntry(d, req.OldName)
return nil
}
// Check interface satisfied
var _ fusefs.NodeFsyncer = (*Dir)(nil)
// Fsync the directory
func (d *Dir) Fsync(ctx context.Context, req *fuse.FsyncRequest) (err error) {
defer log.Trace(d, "")("err=%v", &err)
err = d.Dir.Sync()
if err != nil {
return (err)
}
return nil
}
// Check interface satisfied
var _ fusefs.NodeLinker = (*Dir)(nil)
// Link creates a new directory entry in the receiver based on an
// existing Node. Receiver must be a directory.
func (d *Dir) Link(ctx context.Context, req *fuse.LinkRequest, old fusefs.Node) (newNode fusefs.Node, err error) {
defer log.Trace(d, "req=%v, old=%v", req, old)("new=%v, err=%v", &newNode, &err)
return nil, syscall.ENOSYS
}
// Check interface satisfied
var _ fusefs.NodeMknoder = (*Dir)(nil)
// Mknod is called to create a file. Since we define create this will
// be called in preference, however NFS likes to call it for some
// reason. We don't actually create a file here just the Node.
func (d *Dir) Mknod(ctx context.Context, req *fuse.MknodRequest) (node fusefs.Node, err error) {
defer log.Trace(d, "name=%v, mode=%d, rdev=%d", req.Name, req.Mode, req.Rdev)("node=%v, err=%v", &node, &err)
if req.Rdev != 0 {
fs.Errorf(d, "Can't create device node %q", req.Name)
return nil, fuse.EIO
}
var cReq = fuse.CreateRequest{
Name: req.Name,
Flags: fuse.OpenFlags(os.O_CREATE | os.O_WRONLY),
Mode: req.Mode,
Umask: req.Umask,
}
var cResp fuse.CreateResponse
node, handle, err := d.Create(ctx, &cReq, &cResp)
if err != nil {
return nil, err
}
err = handle.(io.Closer).Close()
if err != nil {
return nil, err
}
return node, nil
}

View File

@ -1,125 +0,0 @@
package mount
import (
"context"
"syscall"
"time"
"bazil.org/fuse"
fusefs "bazil.org/fuse/fs"
"github.com/rclone/rclone/fs/log"
"github.com/rclone/rclone/vfs"
)
// File represents a file
type File struct {
*vfs.File
fsys *FS
}
// Check interface satisfied
var _ fusefs.Node = (*File)(nil)
// Attr fills out the attributes for the file
func (f *File) Attr(ctx context.Context, a *fuse.Attr) (err error) {
defer log.Trace(f, "")("a=%+v, err=%v", a, &err)
a.Valid = f.fsys.opt.AttrTimeout
modTime := f.File.ModTime()
Size := uint64(f.File.Size())
Blocks := (Size + 511) / 512
a.Gid = f.VFS().Opt.GID
a.Uid = f.VFS().Opt.UID
a.Mode = f.VFS().Opt.FilePerms
a.Size = Size
a.Atime = modTime
a.Mtime = modTime
a.Ctime = modTime
a.Blocks = Blocks
return nil
}
// Check interface satisfied
var _ fusefs.NodeSetattrer = (*File)(nil)
// Setattr handles attribute changes from FUSE. Currently supports ModTime and Size only
func (f *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) (err error) {
defer log.Trace(f, "a=%+v", req)("err=%v", &err)
if !f.VFS().Opt.NoModTime {
if req.Valid.Mtime() {
err = f.File.SetModTime(req.Mtime)
} else if req.Valid.MtimeNow() {
err = f.File.SetModTime(time.Now())
}
}
if req.Valid.Size() {
err = f.File.Truncate(int64(req.Size))
}
return (err)
}
// Check interface satisfied
var _ fusefs.NodeOpener = (*File)(nil)
// Open the file for read or write
func (f *File) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenResponse) (fh fusefs.Handle, err error) {
defer log.Trace(f, "flags=%v", req.Flags)("fh=%v, err=%v", &fh, &err)
// fuse flags are based off syscall flags as are os flags, so
// should be compatible
handle, err := f.File.Open(int(req.Flags))
if err != nil {
return nil, (err)
}
// If size unknown then use direct io to read
if entry := handle.Node().DirEntry(); entry != nil && entry.Size() < 0 {
resp.Flags |= fuse.OpenDirectIO
}
return &FileHandle{handle}, nil
}
// Check interface satisfied
var _ fusefs.NodeFsyncer = (*File)(nil)
// Fsync the file
//
// Note that we don't do anything except return OK
func (f *File) Fsync(ctx context.Context, req *fuse.FsyncRequest) (err error) {
defer log.Trace(f, "")("err=%v", &err)
return nil
}
// Getxattr gets an extended attribute by the given name from the
// node.
//
// If there is no xattr by that name, returns fuse.ErrNoXattr.
func (f *File) Getxattr(ctx context.Context, req *fuse.GetxattrRequest, resp *fuse.GetxattrResponse) error {
return syscall.ENOSYS // we never implement this
}
var _ fusefs.NodeGetxattrer = (*File)(nil)
// Listxattr lists the extended attributes recorded for the node.
func (f *File) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, resp *fuse.ListxattrResponse) error {
return syscall.ENOSYS // we never implement this
}
var _ fusefs.NodeListxattrer = (*File)(nil)
// Setxattr sets an extended attribute with the given name and
// value for the node.
func (f *File) Setxattr(ctx context.Context, req *fuse.SetxattrRequest) error {
return syscall.ENOSYS // we never implement this
}
var _ fusefs.NodeSetxattrer = (*File)(nil)
// Removexattr removes an extended attribute for the name.
//
// If there is no xattr by that name, returns fuse.ErrNoXattr.
func (f *File) Removexattr(ctx context.Context, req *fuse.RemovexattrRequest) error {
return syscall.ENOSYS // we never implement this
}
var _ fusefs.NodeRemovexattrer = (*File)(nil)

View File

@ -1,82 +0,0 @@
package mount
import (
"context"
"io"
"bazil.org/fuse"
fusefs "bazil.org/fuse/fs"
"github.com/rclone/rclone/fs/log"
"github.com/rclone/rclone/vfs"
)
// FileHandle is an open for read file handle on a File
type FileHandle struct {
vfs.Handle
}
// Check interface satisfied
var _ fusefs.HandleReader = (*FileHandle)(nil)
// Read from the file handle
func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) (err error) {
var n int
defer log.Trace(fh, "len=%d, offset=%d", req.Size, req.Offset)("read=%d, err=%v", &n, &err)
data := make([]byte, req.Size)
n, err = fh.Handle.ReadAt(data, req.Offset)
if err == io.EOF {
err = nil
} else if err != nil {
return (err)
}
resp.Data = data[:n]
return nil
}
// Check interface satisfied
var _ fusefs.HandleWriter = (*FileHandle)(nil)
// Write data to the file handle
func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.WriteResponse) (err error) {
defer log.Trace(fh, "len=%d, offset=%d", len(req.Data), req.Offset)("written=%d, err=%v", &resp.Size, &err)
n, err := fh.Handle.WriteAt(req.Data, req.Offset)
if err != nil {
return (err)
}
resp.Size = n
return nil
}
// Check interface satisfied
var _ fusefs.HandleFlusher = (*FileHandle)(nil)
// Flush is called on each close() of a file descriptor. So if a
// filesystem wants to return write errors in close() and the file has
// cached dirty data, this is a good place to write back data and
// return any errors. Since many applications ignore close() errors
// this is not always useful.
//
// NOTE: The flush() method may be called more than once for each
// open(). This happens if more than one file descriptor refers to an
// opened file due to dup(), dup2() or fork() calls. It is not
// possible to determine if a flush is final, so each flush should be
// treated equally. Multiple write-flush sequences are relatively
// rare, so this shouldn't be a problem.
//
// Filesystems shouldn't assume that flush will always be called after
// some writes, or that if will be called at all.
func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) (err error) {
defer log.Trace(fh, "")("err=%v", &err)
return (fh.Handle.Flush())
}
var _ fusefs.HandleReleaser = (*FileHandle)(nil)
// Release is called when we are finished with the file handle
//
// It isn't called directly from userspace so the error is ignored by
// the kernel
func (fh *FileHandle) Release(ctx context.Context, req *fuse.ReleaseRequest) (err error) {
defer log.Trace(fh, "")("err=%v", &err)
return (fh.Handle.Release())
}

View File

@ -1,113 +0,0 @@
package mount
import (
"fmt"
"bazil.org/fuse"
fusefs "bazil.org/fuse/fs"
"github.com/rclone/rclone/cmd/mountlib"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/log"
"github.com/rclone/rclone/vfs"
)
func MountFn(VFS *vfs.VFS, mountpoint string, opt *mountlib.Options) (<-chan error, func() error, error) {
f := VFS.Fs()
fs.Debugf(f, "Mounting on %q", mountpoint)
c, err := fuse.Mount(mountpoint, mountOptions(VFS, opt.DeviceName, opt)...)
if err != nil {
return nil, nil, err
}
filesys := NewFS(VFS, opt)
filesys.server = fusefs.New(c, nil)
// Serve the mount point in the background returning error to errChan
errChan := make(chan error, 1)
go func() {
err := filesys.server.Serve(filesys)
closeErr := c.Close()
if err == nil {
err = closeErr
}
errChan <- err
}()
unmount := func() error {
// Shutdown the VFS
filesys.VFS.Shutdown()
return fuse.Unmount(mountpoint)
}
return errChan, unmount, nil
}
func NewFS(VFS *vfs.VFS, opt *mountlib.Options) *FS {
fsys := &FS{
VFS: VFS,
f: VFS.Fs(),
opt: opt,
}
return fsys
}
type FS struct {
*vfs.VFS
f fs.Fs
opt *mountlib.Options
server *fusefs.Server
}
// mountOptions configures the options from the command line flags
func mountOptions(VFS *vfs.VFS, device string, opt *mountlib.Options) (options []fuse.MountOption) {
options = []fuse.MountOption{
fuse.MaxReadahead(uint32(opt.MaxReadAhead)),
fuse.Subtype("rclone"),
fuse.FSName(device),
// Options from benchmarking in the fuse module
//fuse.MaxReadahead(64 * 1024 * 1024),
//fuse.WritebackCache(),
}
if opt.AsyncRead {
options = append(options, fuse.AsyncRead())
}
if opt.AllowNonEmpty {
options = append(options, fuse.AllowNonEmptyMount())
}
if opt.AllowOther {
options = append(options, fuse.AllowOther())
}
if opt.AllowRoot {
// options = append(options, fuse.AllowRoot())
fs.Errorf(nil, "Ignoring --allow-root. Support has been removed upstream - see https://github.com/bazil/fuse/issues/144 for more info")
}
if opt.DefaultPermissions {
options = append(options, fuse.DefaultPermissions())
}
if VFS.Opt.ReadOnly {
options = append(options, fuse.ReadOnly())
}
if opt.WritebackCache {
options = append(options, fuse.WritebackCache())
}
if opt.DaemonTimeout != 0 {
options = append(options, fuse.DaemonTimeout(fmt.Sprint(int(opt.DaemonTimeout.Seconds()))))
}
if len(opt.ExtraOptions) > 0 {
fs.Errorf(nil, "-o/--option not supported with this FUSE backend")
}
if len(opt.ExtraFlags) > 0 {
fs.Errorf(nil, "--fuse-flag not supported with this FUSE backend")
}
return options
}
// Root returns the root node
func (f *FS) Root() (node fusefs.Node, err error) {
defer log.Trace("", "")("node=%+v, err=%v", &node, &err)
root, err := f.VFS.Root()
if err != nil {
return nil, err
}
return &Dir{root, f}, nil
}

View File

@ -1,6 +1,7 @@
package v1
import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
@ -61,6 +62,18 @@ type FsListResp struct {
Size int `json:"size"`
}
var (
// 升级成 WebSocket 协议
upgraderFile = websocket.Upgrader{
// 允许CORS跨域请求
CheckOrigin: func(r *http.Request) bool {
return true
},
}
conn *websocket.Conn
err error
)
// @Summary 读取文件
// @Produce application/json
// @Accept application/json
@ -793,3 +806,262 @@ func GetSize(c *gin.Context) {
}
c.JSON(common_err.SUCCESS, model.Result{Success: common_err.SUCCESS, Message: common_err.GetMsg(common_err.SUCCESS), Data: size})
}
type CenterHandler struct {
// 广播通道,有数据则循环每个用户广播出去
broadcast chan []byte
// 注册通道,有用户进来 则推到用户集合map中
register chan *Client
// 注销通道,有用户关闭连接 则将该用户剔出集合map中
unregister chan *Client
// 用户集合,每个用户本身也在跑两个协程,监听用户的读、写的状态
clients map[string]*Client
}
type Client struct {
handler *CenterHandler
conn *websocket.Conn
// 每个用户自己的循环跑起来的状态监控
send chan []byte
ID string `json:"id"`
IP string `json:"ip"`
Name service.Name `json:"name"`
RtcSupported bool `json:"rtcSupported"`
TimerId int `json:"timerId"`
LastBeat time.Time `json:"lastBeat"`
Offline bool `json:"offline"`
}
func ConnectWebSocket(c *gin.Context) {
peerId := c.Query("peer")
writer := c.Writer
request := c.Request
key := uuid.NewV4().String()
//peerModel := service.MyService.Peer().GetPeerByUserAgent(c.Request.UserAgent())
peerModel := model2.PeerDriveDBModel{}
name := service.GetName(request)
if conn, err = upgraderFile.Upgrade(writer, request, writer.Header()); err != nil {
log.Println(err)
return
}
client := &Client{handler: &handler, conn: conn, send: make(chan []byte, 256), ID: service.GetPeerId(request, key), IP: service.GetIP(request), Name: name, RtcSupported: true, TimerId: 0, LastBeat: time.Now()}
if peerId != "" || len(peerModel.ID) > 0 {
if len(peerModel.ID) == 0 {
peerModel = service.MyService.Peer().GetPeerByID(peerId)
}
if len(peerModel.ID) > 0 {
key = peerId
client.ID = peerModel.ID
client.Name = service.GetNameByDB(peerModel)
}
}
var list = service.MyService.Peer().GetPeers()
if len(peerModel.ID) == 0 {
peerModel.ID = key
peerModel.DisplayName = name.DisplayName
peerModel.DeviceName = name.DeviceName
peerModel.OS = name.OS
peerModel.Browser = name.Browser
peerModel.UserAgent = c.Request.UserAgent()
peerModel.IP = client.IP
service.MyService.Peer().CreatePeer(&peerModel)
list = append(list, peerModel)
}
cookie := http.Cookie{
Name: "peerid",
Value: key,
Path: "/",
}
http.SetCookie(writer, &cookie)
// 推给监控中心注册到用户集合中
handler.register <- client
if len(list) > 10 {
fmt.Println("有溢出", list)
}
if len(list) > 10 {
kickoutList := []Client{}
count := len(list) - 10
for i := len(list) - 1; count > 0 && i > -1; i-- {
if _, ok := handler.clients[list[i].ID]; !ok {
count--
kickoutList = append(kickoutList, Client{ID: list[i].ID, Name: service.GetNameByDB(list[i]), IP: list[i].IP, Offline: true})
service.MyService.Peer().DeletePeer(list[i].ID)
}
}
if len(kickoutList) > 0 {
other := make(map[string]interface{})
other["type"] = "kickout"
other["peers"] = kickoutList
otherBy, err := json.Marshal(other)
fmt.Println(err)
client.handler.broadcast <- otherBy
}
}
list = service.MyService.Peer().GetPeers()
if len(list) > 10 {
fmt.Println("解决完后依然有溢出", list)
}
clients := []Client{}
for _, v := range client.handler.clients {
if _, ok := handler.clients[v.ID]; ok {
clients = append(clients, *handler.clients[v.ID])
}
}
other := make(map[string]interface{})
other["type"] = "peers"
other["peers"] = clients
otherBy, err := json.Marshal(other)
fmt.Println(err)
client.handler.broadcast <- otherBy
pmsg := make(map[string]interface{})
pmsg["type"] = "peer-joined"
pmsg["peer"] = client
pby, err := json.Marshal(pmsg)
fmt.Println(err)
client.handler.broadcast <- pby
data := make(map[string]string)
data["displayName"] = client.Name.DisplayName
data["deviceName"] = client.Name.DeviceName
data["id"] = client.ID
msg := make(map[string]interface{})
msg["type"] = "display-name"
msg["message"] = data
by, _ := json.Marshal(msg)
client.send <- by
// 每个 client 都挂起 2 个新的协程,监控读、写状态
go client.writePump()
go client.readPump()
c.JSON(common_err.SUCCESS, model.Result{Success: common_err.SUCCESS, Message: common_err.GetMsg(common_err.SUCCESS)})
}
var handler = CenterHandler{broadcast: make(chan []byte),
register: make(chan *Client),
unregister: make(chan *Client),
clients: make(map[string]*Client)}
func init() {
// 起个协程跑起来,监听注册、注销、消息 3 个 channel
go handler.monitoring()
crontab := cron.New(cron.WithSeconds()) //精确到秒
//定义定时器调用的任务函数
task := func() {
handler.broadcast <- []byte(`{"type":"ping"}`)
}
//定时任务
spec := "*/20 * * * * ?" //cron表达式每五秒一次
// 添加定时任务,
crontab.AddFunc(spec, task)
// 启动定时器
crontab.Start()
}
func (c *Client) writePump() {
defer func() {
c.handler.unregister <- c
c.conn.Close()
}()
for {
// 广播推过来的新消息马上通过websocket推给自己
message, _ := <-c.send
if err := c.conn.WriteMessage(websocket.TextMessage, message); err != nil {
return
}
}
}
// 读,监听客户端是否有推送内容过来服务端
func (c *Client) readPump() {
defer func() {
c.handler.unregister <- c
c.conn.Close()
}()
for {
// 循环监听是否该用户是否要发言
_, message, err := c.conn.ReadMessage()
if err != nil {
// 异常关闭的处理
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
log.Printf("error: %v", err)
}
break
}
// 要的话,推给广播中心,广播中心再推给每个用户
t := gjson.GetBytes(message, "type")
if t.String() == "disconnect" {
c.handler.unregister <- c
c.conn.Close()
// clients := []Client{}
// list := service.MyService.Peer().GetPeers()
// for _, v := range list {
// if _, ok := handler.clients[v.ID]; ok {
// clients = append(clients, *handler.clients[v.ID])
// } else {
// clients = append(clients, Client{ID: v.ID, Name: service.GetNameByDB(v), IP: v.IP, Offline: true})
// }
// }
// other := make(map[string]interface{})
// other["type"] = "peers"
// other["peers"] = clients
// otherBy, err := json.Marshal(other)
// fmt.Println(err)
c.handler.broadcast <- []byte(`{"type":"peer-left","peerId":"` + c.ID + `"}`)
//c.handler.broadcast <- otherBy
break
} else if t.String() == "pong" {
c.LastBeat = time.Now()
continue
}
to := gjson.GetBytes(message, "to")
if len(to.String()) > 0 {
toC := c.handler.clients[to.String()]
if toC == nil {
continue
}
data := map[string]interface{}{}
json.Unmarshal(message, &data)
data["sender"] = c.ID
delete(data, "to")
message, err = json.Marshal(data)
toC.send <- message
}
c.handler.broadcast <- message
}
}
func (ch *CenterHandler) monitoring() {
for {
select {
// 注册,新用户连接过来会推进注册通道,这里接收推进来的用户指针
case client := <-ch.register:
ch.clients[client.ID] = client
// 注销,关闭连接或连接异常会将用户推出群聊
case client := <-ch.unregister:
delete(ch.clients, client.ID)
// 消息,监听到有新消息到来
case message := <-ch.broadcast:
println("消息来了message" + string(message))
// 推送给每个用户的通道每个用户都有跑协程起了writePump的监听
for _, client := range ch.clients {
client.send <- message
}
}
}
}
func GetPeers(c *gin.Context) {
peers := service.MyService.Peer().GetPeers()
for i := 0; i < len(peers); i++ {
if _, ok := handler.clients[peers[i].ID]; ok {
peers[i].Online = true
}
}
c.JSON(common_err.SUCCESS, model.Result{Success: common_err.SUCCESS, Message: common_err.GetMsg(common_err.SUCCESS), Data: peers})
}

View File

@ -60,37 +60,14 @@ func NewService(db *gorm.DB, RuntimePath string) Repository {
system: NewSystemService(),
health: NewHealthService(),
shares: NewSharesService(db),
storage: NewStorageService(),
peer: NewPeerService(db),
other: NewOtherService(),
casa: NewCasaService(),
gateway: gatewayManagement,
notify: NewNotifyService(db),
rely: NewRelyService(db),
system: NewSystemService(),
health: NewHealthService(),
shares: NewSharesService(db),
other: NewOtherService(),
}
}
type store struct {
db *gorm.DB
casa CasaService
notify NotifyServer
rely RelyService
system SystemService
shares SharesService
connections ConnectionsService
gateway external.ManagementService
storage StorageService
health HealthService
peer PeerService
other OtherService
db *gorm.DB
casa CasaService
notify NotifyServer