基于Python实现Harbor镜像垃圾清理的代码是什么
Admin 2022-08-18 群英技术资讯 276 次浏览
[root@lidabai ~]# vim harbor_clearimage.py # -*- coding:utf-8 -*- import requests from requests.auth import HTTPBasicAuth from requests.adapters import HTTPAdapter from requests.packages.urllib3.util.retry import Retry import os import time import logging from logging.handlers import RotatingFileHandler class Harbor(object): def __init__(self, api_url, api_user, api_passwd, tag_num, proj_exclude): self.api_url = api_url self.api_user = api_user self.api_passwd = api_passwd self.api_auth = HTTPBasicAuth(self.api_user, self.api_passwd) self.tag_num = int(tag_num) self.proj_exclude = proj_exclude self.proj_url = self.api_url + "/projects" self.repos_url = self.api_url + "/repositories" self.header_dict = { 'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.80 Safari/537.36', 'Content-Type': 'application/x-www-form-urlencoded' } self.deldata = [] self.session = requests.Session() self.session.auth = self.api_auth retry = Retry(connect=3, backoff_factor=1) adapter = HTTPAdapter(max_retries=retry) self.session.mount('http://', adapter) self.session.keep_alive = False def soft_del_repos(self): try: projresp = self.session.get(self.proj_url, headers=self.header_dict) if projresp.status_code == 200: projdata = projresp.json() for proj in projdata: if proj['name'] not in self.proj_exclude # and proj['name'] == "gxjxhwebtest-28003" : try: reporesp = self.session.get(self.repos_url, params={"project_id": proj['project_id']} , headers=self.header_dict) if reporesp.status_code == 200: repodata = reporesp.json() for repo in repodata: if repo["tags_count"] > self.tag_num: tag_url = self.repos_url + "/" + repo['name'] + "/tags" tags = self.session.get(tag_url).json() tagdata = sorted(tags, key=lambda a: a["created"]) del_tags = tagdata[0:len(tagdata) - self.tag_num] for tag in del_tags: del_repo_tag_url = tag_url + "/" + tag['name'] cmd = 'curl -v -X DELETE -u "' + self.api_user + ":" + self.api_passwd + '" -H "accept: application/json" ' + del_repo_tag_url try: #del_resp = self.session.delete(del_repo_tag_url,headers=self.header_dict) ok = os.system(cmd) if ok == 0 : logging.info("httpdel:" + del_repo_tag_url ) _deldata = {"project_id":proj['project_id'],"project_name":proj['name'],"repo_name":repo['name'],"tag_name":tag['name']} self.deldata.append(_deldata) logging.info("httpdel project_id=" + str(proj['project_id']) + ",project_name=" + proj['name'] + ",repo_name=" + repo['name'] + ",tag_name=" + tag['name']) else: logging.error("exec_cmd fail:" + cmd ) except: logging.error("exec_cmd fail:" + cmd ) except: logging.error("httpget fail:" + self.repos_url) else: logging.error("httpget fail:" + self.proj_url ) except: logging.error("apilogin fail:" + self.api_url ) return self.deldata def hard_del_repo(self): pwd_cmd = "cd /dcos/app/harbor/ " #进入到Harbor安装目录 stop_cmd = "docker-compose stop" #停止Harbor服务 del_cmd = "docker run -it --name gc --rm --volumes-from registry goharbor/registry-photon:v2.7.1-patch-2819-v1.8.6 garbage-collect /etc/registryctl/config.yml" start_cmd = "docker-compose start" #启动Harbor服务 os.system(pwd_cmd) ok1 = os.system(stop_cmd) if ok1 == 0 : time.sleep(10) ok2 = os.system(del_cmd) ok3 = os.system(start_cmd) if ok3 == 0 : logging.info("hard_del_repo ok:") else: logging.error("hard_del_repo fail:") if __name__ == "__main__": Rthandler = RotatingFileHandler('harbor_repo_clear.log', maxBytes=10*1024*1024,backupCount=5) logging.basicConfig(level=logging.INFO) formatter = logging.Formatter('%(levelname)s %(asctime)s %(process)d %(thread)d %(pathname)s %(filename)s %(funcName)s[line:%(lineno)d] %(message)s') Rthandler.setFormatter(formatter) logging.getLogger('').addHandler(Rthandler) api_url = "http://192.168,2,66:443/api" #Harbor服务的API URL api_user = "admin" #超级管理员 api_passwd = "Harbor12345" #超级管理员的用户密码 tag_num = 20 #保留的tag数量 proj_exclude = ["library"] harborClient = Harbor(api_url,api_user,api_passwd,tag_num,proj_exclude) data = harborClient.soft_del_repos() if len(data) > 0 : #harborClient.hard_del_repos() logging.info("hard_del_repo:")
[root@lidabai ~]# python harbor_clearimage.py
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:mmqy2019@163.com进行举报,并提供相关证据,查实之后,将立刻删除涉嫌侵权内容。
猜你喜欢
python怎样实现遍历磁盘目录?一些Python新手对于如何便利磁盘不是很清楚,因此这篇文章就给大家介绍一下使用深度遍历,用栈遍历和广度遍历磁盘,需要的朋友可以参考学习。
本文主要介绍了pandas实现数据合并的示例代码,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
这篇文章主要介绍numpy怎么创建空数组,下文有具体的实例和代码,对新手理解numpy创建空数组有一定参考价值,感兴趣的朋友可以了解一下,希望大家阅读完这篇文章能有所收获。
这篇文章主要介绍了Python format()格式化输出方法, Python 2.6以后,Python 中的就提供了字符串类型(str)提供了 format() 方法对字符串进行格式化,夏敏我们就来了解这个方法吧,需要的小伙伴也可以参考一下</P><P>
这篇文章主要介绍了基于Python实现新版正方系统滑动验证码识别算法和方案,文中示例代码对我们的学习和工作有一定的帮助,感兴趣的可以了解一下
成为群英会员,开启智能安全云计算之旅
立即注册Copyright © QY Network Company Ltd. All Rights Reserved. 2003-2020 群英 版权所有
增值电信经营许可证 : B1.B2-20140078 粤ICP备09006778号 域名注册商资质 粤 D3.1-20240008