- 支持 Ctrl+C 关闭程序（未开始的任务不再执行，已开始的任务会执行完）
- 相对完善的统计数据
# 后台启动
nohup python3 xxx.py &
# 统计处理量
cat cos_process_log.csv |awk '{print $1}' |awk 'BEGIN{sum=0;count=0;}{sum+=$1;count+=1;}END{print sum / 1024 / 1024 / 1024 "GB " count}'
283.74GB 5819
# 查询处理进度
tail -f nohup.out
进度:POOL: 5919 START: 5908 FINISH: 5876
# -*- coding: utf-8 -*-
# Strip the audio track from .mp4 objects under a COS prefix and re-upload
# them under a parallel output prefix. This section holds configuration,
# shared state and the COS listing/download helpers.
import datetime
import io
import os
import signal
import subprocess
import threading
from concurrent.futures.thread import ThreadPoolExecutor
from threading import Thread
from time import sleep
from typing import List, Tuple

from qcloud_cos.cos_client import CosConfig, CosS3Client
from qcloud_cos.cos_exception import CosClientError, CosServiceError

bucket = 'xxxx-xxxx-111111111'
# Credentials may be supplied through the environment so real secrets need
# not be committed; the literal placeholders remain the fallback.
cos = CosS3Client(
    CosConfig(Region='ap-xxxx',
              Secret_id=os.environ.get('COS_SECRET_ID', 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'),
              Secret_key=os.environ.get('COS_SECRET_KEY', 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx')))

dir_logs = '/data/mosaic/output/mmpeg'
dir_tmp_files = '/data/mosaic/output/tmp'
dir_target_files = '/data/mosaic/output/target'
os.makedirs(dir_logs, exist_ok=True)
os.makedirs(dir_tmp_files, exist_ok=True)
os.makedirs(dir_target_files, exist_ok=True)

# Task counters, read and updated under task_run_lock:
#   pooled = submitted to the executor, start = began running, finish = done.
start_task_num = 0
finish_task_num = 0
pooled_task_num = 0
task_run_lock = threading.RLock()
executor = ThreadPoolExecutor(max_workers=32, thread_name_prefix='object_check_')

log_lock = threading.RLock()
# Keys contain non-ASCII text; pin the encoding instead of relying on the
# process locale (nohup / cron often run with a C locale).
cos_process_log_fi = open(dir_logs + '/cos_process_log.csv', mode='a', encoding='utf-8')
cos_erro_log_fi = open(dir_logs + '/cos_erro_log.txt', mode='a', encoding='utf-8')
prefix = '视频/'
prefix_output = '视频-output/'

# Cleared by the signal handler; once False no new task is started.
running_lock = threading.RLock()
running = True


def list_cos_subobject(Prefix="", Marker="") -> Tuple[List[str], List[str], str, bool]:
    """List one page (up to 100 entries) of the direct children of ``Prefix``.

    Args:
        Prefix: key prefix ("directory") to list in ``bucket``.
        Marker: continuation marker from the previous page; "" for the first.

    Returns:
        ``(dirs, keys, next_marker, is_truncated)``: sub-prefixes, object keys
        on this page, the marker for the next page ("" when none) and whether
        more pages remain.
    """
    response = cos.list_objects(Bucket=bucket, Prefix=Prefix, Delimiter='/',
                                Marker=Marker, MaxKeys=100,
                                ContentType='text/html; charset=utf-8')
    dirs = [folder['Prefix'] for folder in response.get('CommonPrefixes', [])]
    keys = [content['Key'] for content in response.get('Contents', [])]
    is_truncated = response['IsTruncated'] == 'true'
    next_marker = response['NextMarker'] if is_truncated else ""
    return (dirs, keys, next_marker, is_truncated)


def get_obj(bucket: str, key: str, tmp_file: str, retries: int = 10) -> bool:
    """Download ``key`` from ``bucket`` to the local path ``tmp_file``.

    COS client and service errors are printed and retried, with a one-second
    pause between attempts, up to ``retries`` times.

    Returns:
        True if a download attempt succeeded, False if every attempt failed.
    """
    for attempt in range(retries):
        try:
            cos.download_file(Bucket=bucket, Key=key, DestFilePath=tmp_file)
            return True
        except (CosClientError, CosServiceError) as e:
            # A tuple is required: `except A or B` evaluates to `except A`.
            print(e)
            if attempt + 1 < retries:
                sleep(1)
    return False
def _append_log(fi, first, second):
    """Append a ``first<TAB>second`` line to log file ``fi`` under ``log_lock``."""
    with log_lock:
        fi.write("%s\t%s\n" % (first, second))
        fi.flush()


def _remove_if_exists(path: str):
    """Delete the local file ``path``; a missing file is not an error."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass


def put_obj(bucket: str, key: str, file_path: str, retries: int = 10) -> bool:
    """Upload local ``file_path`` to ``key`` in ``bucket``.

    COS client and service errors are printed and retried, with a one-second
    pause between attempts, up to ``retries`` times. If every attempt fails,
    ``file_path<TAB>key`` is appended once to the error log.

    Returns:
        True if the upload succeeded, False otherwise.
    """
    for attempt in range(retries):
        try:
            cos.upload_file(Bucket=bucket, Key=key, LocalFilePath=file_path)
            return True
        except (CosClientError, CosServiceError) as e:
            # A tuple is required: `except A or B` evaluates to `except A`.
            print(e)
            if attempt + 1 < retries:
                sleep(1)
    _append_log(cos_erro_log_fi, file_path, key)
    return False


def set_not_running():
    """Request a stop: queued tasks and the crawler will not start new work."""
    global running
    with running_lock:
        running = False


def is_running():
    """Return True while new tasks may still be started."""
    with running_lock:
        return running


def is_not_running():
    """Return True once a stop has been requested."""
    with running_lock:
        return not running


def get_pooled_task_num():
    """Return the number of tasks submitted to the executor so far."""
    with task_run_lock:
        return pooled_task_num


def inc_pooled_task_num():
    """Count one more submitted task and return the new total."""
    global pooled_task_num
    with task_run_lock:
        pooled_task_num += 1
        return pooled_task_num


def get_start_task_num():
    """Return the number of tasks that have started running."""
    with task_run_lock:
        return start_task_num


def inc_start_task_num():
    """Count one more started task and return the new total (its index)."""
    global start_task_num
    with task_run_lock:
        start_task_num += 1
        return start_task_num


def inc_finish_task_num():
    """Count one more finished task and return the new total."""
    global finish_task_num
    with task_run_lock:
        finish_task_num += 1
        return finish_task_num


def process_one_key(dir: str, key: str, cur_index: int):
    """Remove the audio track from one ``.mp4`` object and upload the result.

    Keys not ending in ``.mp4``, and keys whose output object already exists,
    are skipped. Scratch files are named after ``cur_index`` so concurrent
    tasks never collide, and they are always deleted afterwards. Download or
    ffmpeg failures go to the error log. A successful upload is recorded in
    the process log as ``<source size in bytes>\t<target key>``.

    Args:
        dir: listing prefix the key came from (not used here).
        key: source object key.
        cur_index: per-run task number used to name the scratch files.
    """
    if not key.endswith('.mp4'):
        return
    _, ext = os.path.splitext(key)
    tmp_file = "%s/%07d%s" % (dir_tmp_files, cur_index, ext)
    execd_file = "%s/%07d%s" % (dir_target_files, cur_index, ext)
    # Swap only the first (leading) occurrence; an unbounded replace would
    # also rewrite the prefix text if it appears again deeper in the key.
    target_path = key.replace(prefix, prefix_output, 1)
    os.makedirs(os.path.dirname(tmp_file), exist_ok=True)
    os.makedirs(os.path.dirname(execd_file), exist_ok=True)

    if cos.object_exists(Bucket=bucket, Key=target_path):
        return

    try:
        # Indices restart at 1 on every run, so clear leftovers from an
        # interrupted earlier run before they can be mistaken for fresh data.
        _remove_if_exists(tmp_file)
        _remove_if_exists(execd_file)

        # ---- download
        get_obj(bucket, key, tmp_file)
        if not os.path.exists(tmp_file):
            print("下载失败: %s" % key)
            _append_log(cos_erro_log_fi, tmp_file, key)
            return

        # ---- remux without audio (ffmpeg -i in.mp4 -vcodec copy -an out.mp4)
        # Argument list instead of a shell string: no quoting issues. Use
        # -nostdin so 32 concurrent ffmpeg processes never read the terminal.
        cmd = ['ffmpeg', '-nostdin', '-i', tmp_file,
               '-vcodec', 'copy', '-an', execd_file]
        result = subprocess.run(cmd)
        if result.returncode != 0 or not os.path.exists(execd_file):
            print("ffmpeg失败(%s): %s" % (result.returncode, key))
            _append_log(cos_erro_log_fi, tmp_file, key)
            return
        print("%d %s" % (os.path.getsize(execd_file), execd_file))

        # ---- upload to the output prefix (failures are logged by put_obj)
        if not put_obj(bucket=bucket, key=target_path, file_path=execd_file):
            return

        # ---- record the processed source size for the statistics
        file_size = os.stat(tmp_file).st_size
        _append_log(cos_process_log_fi, file_size, target_path)
    finally:
        # Always reclaim local disk, including after an exception.
        _remove_if_exists(tmp_file)
        _remove_if_exists(execd_file)


def get_object_meta_target(dir, key):
    """Executor task: process one key and report start and finish progress.

    A task still queued when a stop was requested returns without starting.
    Exceptions are printed so one bad object does not abort the run.
    """
    if is_not_running():
        return
    cur_index = inc_start_task_num()
    print("开始处理:%07d %s" % (cur_index, key))
    try:
        process_one_key(dir, key, cur_index)
    except Exception as e:
        print(e)
    finally:
        finish_total = inc_finish_task_num()
        print("完成处理:%07d %s 总计:%07d" % (cur_index, key, finish_total))


def check(prefix):
    """Recursively walk ``prefix`` and submit every object key to ``executor``.

    Submission is throttled so that no more than about 10 submitted tasks are
    waiting to start, with progress printed while waiting. Returns early once
    a stop is requested.
    """
    marker = ""
    while True:
        dirs, keys, next_marker, is_truncated = list_cos_subobject(Prefix=prefix, Marker=marker)
        for key in keys:
            if is_not_running():
                return
            # Back-pressure: don't enqueue the whole bucket at once.
            while get_pooled_task_num() - get_start_task_num() > 10:
                stat_log_print()
                if is_not_running():
                    return
                sleep(1)
            inc_pooled_task_num()
            executor.submit(get_object_meta_target, prefix, key)
        for sub_dir in dirs:
            if is_not_running():
                return
            check(sub_dir)
        if not is_truncated:
            break
        marker = next_marker


def onSig(signo, frame):
    """SIGINT/SIGTERM handler: stop starting new tasks; running ones finish."""
    set_not_running()


def stat_log_print():
    """Print a consistent snapshot of the pooled/started/finished counters."""
    with task_run_lock:
        snapshot = (pooled_task_num, start_task_num, finish_task_num)
    print("进度:POOL: %5d START: %5d FINISH: %5d" % snapshot)


if __name__ == "__main__":
    signal.signal(signal.SIGINT, onSig)
    signal.signal(signal.SIGTERM, onSig)
    check(prefix)
    # All work is submitted, or a stop was requested. shutdown(wait=True)
    # returns only after every queued task has run or been skipped. Run it
    # in a helper thread so the main thread can keep printing progress and
    # handling signals, without the busy-spin or racy counter comparison.
    waiter = Thread(target=executor.shutdown, kwargs={'wait': True}, daemon=True)
    waiter.start()
    while waiter.is_alive():
        stat_log_print()
        sleep(1)
    stat_log_print()
    print("finish")

# cat cos_process_log.csv |awk '{print $1}' |awk 'BEGIN{sum=0;count=0;}{sum+=$1;count+=1;}END{print sum / 1024 / 1024 / 1024 "GB " count}'