Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >专栏 >自动监控文件并上传S3对象存储服务器 | Golang

自动监控文件并上传S3对象存储服务器 | Golang

作者头像
ZGGSONG
发布于 2022-09-23 03:21:15
发布于 2022-09-23 03:21:15
1.2K00
代码可运行
举报
文章被收录于专栏:日志日志
运行总次数:0
代码可运行

前言

需求:

  • 监控目录下文件变动
  • 上传文件至S3服务器

本地平台:Windows 10 专业版 21H2 (19044.1826)、开发语言:go1.18.3 windows/amd64

监控目录下文件变动使用 github.com/fsnotify/fsnotify 上传测试服务器使用 Minio 进行测试

实现

监控文件生成

根据仓库中的示例代码也可以实现

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package main

import (
    "log"

    "github.com/fsnotify/fsnotify"
)

func main() {
    // Create new watcher.
    watcher, err := fsnotify.NewWatcher()
    if err != nil {
        log.Fatal(err)
    }
    defer watcher.Close()

    // Start listening for events.
    go func() {
        for {
            select {
            case event, ok := <-watcher.Events:
                if !ok {
                    return
                }
                log.Println("event:", event)
                if event.Op == fsnotify.Write {
                    log.Println("modified file:", event.Name)
                }
            case err, ok := <-watcher.Errors:
                if !ok {
                    return
                }
                log.Println("error:", err)
            }
        }
    }()

    // Add a path.
    err = watcher.Add("/tmp")
    if err != nil {
        log.Fatal(err)
    }

    // Block main goroutine forever.
    <-make(chan struct{})
}

监控文件生成时有两种监测结果(winodws):

  • 复制文件是触发一次Create事件,两次Write事件
  • 直接创建文件(cmd&代码)触发一次Create事件,一次Write事件

经过测试Windows上是这样,Linux就比较一致,不论怎么创建文件,都是一次Create事件+一次Write事件

这个示例只能解决监控当前目录下的内容,子目录下的内容无法监控

解决:在监控到创建了目录以后,把新创建的目录加入到监控目录中去

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
func StartWatch(dir string) {
    watch, _ := fsnotify.NewWatcher()
    w := Watch{
        watch: watch,
    }
    w.watchEx(dir)
    log.Println("开始监控目录: ", dir, "...")
    select {}
}

func (w *Watch) watchEx(dir string) {
    //通过Walk来遍历目录下的所有子目录
    err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
        //这里判断是否为目录,只需监控目录即可 || 目录下的文件也在监控范围内,不需要我们一个一个加
        if info.IsDir() {
            path, err := filepath.Abs(path)
            if err != nil {
                return err
            }
            err = w.watch.Add(path)
            if err != nil {
                return err
            }
        }
        return nil
    })
    if err != nil {
        log.Println("监控失败 : ", err.Error())
        return
    }
    go w.watchExec()
}

func (w *Watch) watchExec() {
    for {
        select {
        case ev := <-w.watch.Events:
            {
                if ev.Op&fsnotify.Create == fsnotify.Create {
                    fmt.Println("创建文件 : ", ev.Name)
                    //获取新创建文件的信息,如果是目录,则加入监控中
                    file, err := os.Stat(ev.Name)
                    if err == nil && file.IsDir() {
                        w.watch.Add(ev.Name)
                        fmt.Println("添加监控 : ", ev.Name)
                    }
                }
                if ev.Op&fsnotify.Write == fsnotify.Write {
                    fmt.Println("写入文件 : ", ev.Name)
                }
                if ev.Op&fsnotify.Remove == fsnotify.Remove {
                    fmt.Println("删除文件 : ", ev.Name)
                    //如果删除文件是目录,则移除监控
                    fi, err := os.Stat(ev.Name)
                    if err == nil && fi.IsDir() {
                        w.watch.Remove(ev.Name)
                        fmt.Println("删除监控 : ", ev.Name)
                    }
                }
                if ev.Op&fsnotify.Rename == fsnotify.Rename {
                    //如果重命名文件是目录,则移除监控 ,注意这里无法使用os.Stat来判断是否是目录了
                    //因为重命名后,go已经无法找到原文件来获取信息了,所以简单粗爆直接remove
                    fmt.Println("重命名文件 : ", ev.Name)
                    w.watch.Remove(ev.Name)
                }
                if ev.Op&fsnotify.Chmod == fsnotify.Chmod {
                    fmt.Println("修改权限 : ", ev.Name)
                }
            }
        case err := <-w.watch.Errors:
            {
                log.Errorln("监控目录出错: ", err)
                return
            }
        }
    }
}

type Watch struct {
    watch *fsnotify.Watcher
}

上传S3服务器

  1. 上传之前得先有一台S3对象存储服务器,这里我直接就使用 Minio 镜像进行搭建

Minio新版和旧版还是有出入的,搭建以及后续维护和旧版差别比较大,而且网上各类教程主要针对旧版,方便后续排错,我是直接安装旧版

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
docker pull minio/minio:RELEASE.2021-06-17T00-10-46Z

docker run -d -p 9000:9000 --restart=always --name minioDemo\
  -e "MINIO_ACCESS_KEY=admin" \
  -e "MINIO_SECRET_KEY=admin123." \
  -v D:/docker/minio/data:/data \
  -v D:/docker/minio/config:/root/.minio \
  minio/minio:RELEASE.2021-06-17T00-10-46Z server /data
  1. 我这边使用的是 https://github.com/aws/aws-sdk-go-v2SDK,基本参照 官方文档 进行开发,整体难度也不高,主要是在创建私有S3服务的Client 创建上有点坑,我也记录写下来过 详情查看

过了段时间再看,发现官方的文档整理了,好多东西都没了,主要的代码Github也没有了 另一个官方文档

如下代码片段仅供参考(仅实现了上传对象及Tag)

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
func uploadHandler(ctx context.Context, path, keyName, tags string) error {
    endpoint := global.GLO_CONF.S3EndPoint
    accessKey := global.GLO_CONF.S3AccessKey
    secretKey := global.GLO_CONF.S3SecretKey
    bucket := global.GLO_CONF.S3Bucket

    client, err := util.GetClient(ctx, endpoint, accessKey, secretKey)
    if err != nil {
        return errors.New("创建S3连接请求失败, " + err.Error())
    }

    if err = util.UPutObject(ctx, client, path, bucket, keyName); err != nil {
        return err
    }
    if err = util.UPutTag(ctx, client, bucket, keyName, tags); err != nil {
        return err
    }
    return nil
}
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
func GetClient(ctx context.Context, endpoint, accessKey, secretKey string) (*s3.Client, error) {
    cfg, err := config.LoadDefaultConfig(
        ctx,
        config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(accessKey, secretKey, "")),
        config.WithEndpointResolverWithOptions(
            aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {
                return aws.Endpoint{URL: endpoint}, nil
            })),
        config.WithRegion("us-east-1"),
    )
    if err != nil {
        return nil, err
    }

    client := s3.NewFromConfig(cfg, func(o *s3.Options) {
        o.UsePathStyle = true
        o.EndpointOptions.DisableHTTPS = true
    })
    return client, nil
}

type S3PutObjectAPI interface {
    PutObject(ctx context.Context,
        params *s3.PutObjectInput,
        optFns ...func(*s3.Options)) (*s3.PutObjectOutput, error)
}

func PutFile(c context.Context, api S3PutObjectAPI, input *s3.PutObjectInput) (*s3.PutObjectOutput, error) {
    return api.PutObject(c, input)
}

type S3PutTaggingAPI interface {
    PutObjectTagging(ctx context.Context,
        params *s3.PutObjectTaggingInput,
        optFns ...func(*s3.Options)) (*s3.PutObjectTaggingOutput, error)
}

func putTag(c context.Context, api S3PutTaggingAPI, input *s3.PutObjectTaggingInput) (*s3.PutObjectTaggingOutput, error) {
    return api.PutObjectTagging(c, input)
}

// UPutObject 上传对象
func UPutObject(ctx context.Context, client *s3.Client, path, bucket, key string) error {
    file, err := os.Open(path)
    if err != nil {
        return errors.New("上传对象时打开文件失败, " + err.Error())
    }
    defer file.Close()

    input := &s3.PutObjectInput{
        Bucket: aws.String(bucket),
        Key:    aws.String(key),
        Body:   file,
    }
    _, err = PutFile(ctx, client, input)
    if err != nil {
        return errors.New("上传对象时发生错误, " + err.Error())
    }
    return nil
}

// UPutTag 上传标签
func UPutTag(ctx context.Context, client *s3.Client, bucket, key string, tags map[string]string) error {
    input := &s3.PutObjectTaggingInput{
        Bucket: aws.String(bucket),
        Key:    aws.String(key),
    }
    index := 0
    for k, v := range tags {
        input.Tagging.TagSet[index] = types.Tag{Key: aws.String(k), Value: aws.String(v)}
        index++
    }
    _, err := putTag(ctx, client, input)
    if err != nil {
        return errors.New("上传标签时发生错误, " + err.Error())
    }
    return nil
}

后续

在实际实现过程中发现大文件在生成是比较耗时,而且创建文件的方式多种多样,一方面可以从监控的角度去解决,另一方面(我的方式),通过数据库的方式,这样刚好解决了上传超时或失败后的重传问题,多个协程之间通过 channel 来解决通信问题,最后代码主体结构变成了如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
//init
...
  
//开启文件监控
go util.InitWatch(listeningPath)

//每隔INTERVAL 自动检测上传给对象存储服务器
go func() {
  for {
    timer := time.NewTimer(time.Second * time.Duration(global.GLO_CONF.Interval))
    <-timer.C
    core.UploadServe(global.GLO_DB)
  }
}()

//每隔INTERVAL 转发完成信息
go func() {
  for {
    timer := time.NewTimer(time.Second * time.Duration(global.GLO_CONF.Interval))
    <-timer.C
    core.ScadaMesServe(global.GLO_DB)
  }
}()

//删除数据库废弃数据, 超时时间: 30天
go func() {
  for {
    // 检查频率: 24小时
    timer := time.NewTimer(time.Hour * time.Duration(24))
    <-timer.C
    core.CleanServe(global.GLO_DB)
  }
}()

//阻塞等待文件变化并保存至数据库
for {
  select {
    case path := <-global.GLO_CH_FILE:
    go core.Add2Cache(path, kmap)
    case reqByte := <-global.GLO_CH_REQ:
    go core.UpdateCache(reqByte)
  }
}

当然也有别的方式,只是我没想到

参考

本文作者:ZGGSONG

本文链接:https://www.zggsong.cn/archives/listen_file_upload_s3_with_golang.html

版权声明:本站所有未注明转载的文章均为原创,并采用CC BY-NV-SA 4.0授权协议,转载请注明来源

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
S3对象存储获取预签名URL | Golang
最近学习使用对象存储,自然要学习一下 Amazon S3,同时最近学了一下Golang,简单记录一下学习使用 AWS SDK for Go V2 生成文件预签名URL,
ZGGSONG
2022/07/21
3.2K0
S3对象存储获取预签名URL | Golang
最近学习使用对象存储,自然要学习一下 Amazon S3,同时最近学了一下Golang,简单记录一下学习使用 AWS SDK for Go V2 生成文件预签名URL,
ZGGSONG
2022/09/09
2.4K1
使用DaemonSet实现heapdump文件自动化管理
heapdump文件是Java应用遭遇OOM后的诊断报告,记录了某一时刻 JVM 堆中对象的详细使用情况,是 JVM 堆内存的一个快照。通过分析 heapdump 文件,我们可以深入了解到内存中究竟存在哪些对象,它们占用了多少内存空间,以及对象之间的引用关系如何。这对于定位内存泄漏问题至关重要。
没有故事的陈师傅
2025/02/07
950
使用DaemonSet实现heapdump文件自动化管理
MinIO 分片上传
MinIO 简单易用。简单性是 EB 级数据基础设施的基础 - 无论是在技术上还是在操作上。MinIO 使用和部署非常简单,没有其他对象存储可以让您在最快的时间内实现下载到生产环境的部署。
恋喵大鲤鱼
2023/10/12
4.4K0
MinIO 分片上传
golang 源码分析:mc,minio-go
对象存储服务(Object Storage Service,OSS)是一种海量、安全、低成本、高可靠的云存储服务,适合存放任意类型的文件。容量和处理能力弹性扩展,多种存储类型供选择,全面优化存储成本。Minio 除了直接作为对象存储使用,还可以作为云上对象存储服务的网关层,无缝对接到 Amazon S3、MicroSoft Azure。 在学习minio的源码之前,先阅读下minio的客户端mc和golang sdk minio-go
golangLeetcode
2022/08/03
1.5K0
S3上传代码用例-golang
S3上传用例-golang 使用AWS-SDk-golang实现文件上传,支持大文件并发,注意只支持AWS4签名,因此ceph的jewel以上版本才可以用。 package main import ( "fmt" "os" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/service/s3/s3manager"
用户1260683
2018/03/26
6.8K0
Golang语言 监控文件变化小程序.
package main import ( "log" "github.com/go-fsnotify/fsnotify" ) func main() { watcher, err := fsnotify.NewWatcher() if err != nil { log.Fatal(err) } defer watcher.Close() done := make(chan bool) go func() { for { select { case event :=
李海彬
2018/03/20
1.7K0
golang 源码分析:minio(part II)文件的操作
在分析完minio请求的路由后golang 源码分析:minio(part I)路由,我们看下一个文件是如何落盘的,不考虑gateway情况,我们从serverMain开始:
golangLeetcode
2022/08/03
1.4K0
golang 源码分析:minio(part II)文件的操作
开发一个 etcd 备份的 operator
前面我们已经实现了一个简单的 etcd operator,要实现 etcd 集群的完整运维,备份和恢复肯定也是必不可少的,本文主要和大家介绍如何编写一个用于 etcd 备份的 Operator。
我是阳明
2021/01/04
1.9K0
开发一个 etcd 备份的 operator
通过Go写一个简易版的上传文件到存储的工具
一、场景说明在设计CICD流程中,需要将安卓的APK包上传到文件存储中去,然后将对应的下载链接发给产研、运营人员进行内部使用测试;之前我是通过Groovy实现将构建好的成品上传到私服中去的,后面业务全部迁移上云之后,就想着在将APK包直接上传到OSS存储中去,然后将生成的下载链接通过企业微信或者钉钉自动推送给相应的人员。项目地址:https://github.com/dqzboy二、实现方式创建子账号,获取子账号AccessKey创建Bucket,并授权OSS读写权限通过Go调用OSS-SDK进行实现文件的
浅时光博客
2022/09/12
1K0
通过Go写一个简易版的上传文件到存储的工具
GO 中 ETCD 的编码案例分享
要是对 服务注册与发现,ETCD 还有点兴趣的话,欢迎查看文章 服务注册与发现之ETCD
阿兵云原生
2023/02/16
3340
聊聊dapr的fswatcher
github.com/fsnotify/fsnotify@v1.4.9/kqueue.go
code4it
2021/03/08
3460
聊聊dapr的fswatcher
在 K8S 中 Java OOM dump 文件存储方案
本文试图解决在 k8s 环境下 java 内存溢出时候 dump 文件的存储问题。
谢正伟
2021/05/27
10.1K1
在 K8S 中 Java OOM dump 文件存储方案
grpc reslover源码分析
1.程序启动时,客户端是如何从一个域名/服务名,获取到其对应的实例ip,然后与之建立连接的呢?
golangLeetcode
2022/08/02
4630
通过S3协议实现通用的文件存储服务中间件
在日常开发文件上传相关服务时,通常都会选择腾讯云,阿里云,七牛云等提供的oss服务作为文件存储系统,如果需要自行搭建文件存储系统,通常则会采用minio等开源项目。
大忽悠爱学习
2022/09/28
5.5K0
通过S3协议实现通用的文件存储服务中间件
记对象存储服务——Minio的使用
上面实例中,采用此方法上传的文件,不论图片还是文本,在存储桶中展示均为灰色的文件,这样会导致外联会直接下载文件,从而起不到预览的效果,如下图标红所示:
吐吐吐吐吐葡萄皮
2019/04/04
15.3K3
记对象存储服务——Minio的使用
MinIO对象存储
MinIO 是一个基于Apache License v2.0开源协议的对象存储服务。它兼容亚马逊S3云存储服务接口,非常适合于存储大容量非结构化的数据,例如图片、视频、日志文件、备份数据和容器/虚拟机镜像等,而一个对象文件可以是任意大小,从几kb到最大5T不等。
别团等shy哥发育
2023/03/30
7K0
MinIO对象存储
K8s源码分析(12)-资源的数据访问层
上一篇文章中,我们主要介绍了在 kubernetes 中不同版本的资源是如何注册到 schema 对象之中,包括内部版本资源,所有外部版本资源。以及资源的 model 类型的注册,资源的初始化函数(即默认值函数)的注册,资源的 label 转换函数的注册,和内外部版本相互转换函数的注册。在本篇文章里, 我们主要来介绍资源的数据访问层。
TA码字
2021/12/07
6980
K8s源码分析(12)-资源的数据访问层
milvus对象存储和消息中间件设计模式分析
对于这种类型一个是mq,一个是存储,相比工厂方法设计模式,使用抽象工厂设计模式更合理。
melodyshu
2024/04/23
1590
go-kit 微服务 整合Promtheus解决监控告警问题
Prometheus基于中央化的规则计算、统一分析和告警的新模型, 完美地解决了传统监控模型的痛点。所以说其对传统监控系统的测试和告警模型进行了彻底的颠覆。
Johns
2021/04/08
1.4K0
go-kit 微服务 整合Promtheus解决监控告警问题
相关推荐
S3对象存储获取预签名URL | Golang
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
查看详情【社区公告】 技术创作特训营有奖征文
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验