"""Celery task that sends the items of one Dwgc record to the scoring service."""
import json
import os
import time

import requests
from bson.objectid import ObjectId
from pymongo import MongoClient
# NOTE: inside this module this name shadows the builtin TimeoutError.
from redis.exceptions import TimeoutError

from celery_app2 import celery_app2

# SECURITY: the fallback URI embeds database credentials in source control.
# Set BAOJIA_MONGO_URI in the environment instead, and rotate the password
# once no deployment relies on the literal below.
uri = os.environ.get(
    "BAOJIA_MONGO_URI",
    "mongodb://owner:Pheecian1@47.101.198.30:27017/baojia",
)
mongoclient = MongoClient(uri)
db = mongoclient['baojia']
dbcollection = db['Dwgc']
qdcollection = db['qdxm']
biaocollection = db['jingjibiao']
djcollection = db['Djcs']

# Endpoints of the job service (submit a job, then poll it by id).
_SUBMIT_URL = 'http://0.0.0.0:8000/submit/'
_CHECK_URL = 'http://0.0.0.0:8000/check/'
# Endpoints that receive the finished result of each item kind.
_QD_SAVE_URL = 'http://www.xiaozaotongxue.com/api/save2/'
_DJ_SAVE_URL = 'http://www.xiaozaotongxue.com/api/savedjcs2/'

_POLL_INTERVAL_SECONDS = 10
_POLL_BUDGET_SECONDS = 300
# Without a timeout `requests` waits forever on a stalled server, which would
# block the Celery worker indefinitely.
_REQUEST_TIMEOUT_SECONDS = 60

# Per-item states stored in the `qd_detail` / `dj_detail` fields.
_PENDING = 0
_SUCCEEDED = 1
_FAILED = -1


def _poll_job(job_id: str) -> tuple:
    """Poll the check endpoint until the job ends or the poll budget runs out.

    Returns ``(status, result)`` taken from the last response; ``result`` is
    ``None`` when the response carries no ``'result'`` key.
    """
    waited = 0
    payload = {}
    while waited < _POLL_BUDGET_SECONDS:
        time.sleep(_POLL_INTERVAL_SECONDS)
        waited += _POLL_INTERVAL_SECONDS
        response = requests.get(
            _CHECK_URL + job_id, timeout=_REQUEST_TIMEOUT_SECONDS
        )
        payload = json.loads(response.text)
        if payload['status'] in ('SUCCESS', 'FAILURE'):
            break
    return payload['status'], payload.get('result')


def _process_item(item: dict, code: str, position: int, data: dict,
                  label: str, city: str, save_url: str,
                  copied_fields: tuple) -> int:
    """Submit one item, wait for its job and forward a successful result.

    Returns ``_SUCCEEDED`` or ``_FAILED``.
    """
    submitted = requests.post(
        _SUBMIT_URL,
        json={
            'bianma': code,
            'mc': item['名称'],
            'tz': item['项目特征'],
            'dw': item['单位'],
            'sl': item['数量'],
            'n': position,
            'label': label,
            'name': data['name'],
            'bh': data['detail'],
            'city': city,
        },
        timeout=_REQUEST_TIMEOUT_SECONDS,
    )
    job_id = json.loads(submitted.text)['id']

    status, result = _poll_job(job_id)
    if result is None or status != 'SUCCESS':
        # Covers FAILURE as well as a job still unfinished after the budget.
        return _FAILED

    # Tag the first result row with the identifying fields of its source item.
    first_row = result['result'][0]
    for field in copied_fields:
        first_row[field] = item[field]
    saved = requests.post(
        save_url,
        json={'name': json.dumps(result['result'])},
        timeout=_REQUEST_TIMEOUT_SECONDS,
    )
    print(saved.text)
    return _SUCCEEDED


def _process_items(collection, states: dict, data: dict, label: str,
                   city: str, save_url: str, detail_field: str,
                   copied_fields: tuple) -> None:
    """Process the items keyed in *states* and persist their states.

    *states* maps item code to state and is updated in place; after an item is
    handled the whole mapping is written to *detail_field* of the Dwgc record.
    """
    scope = {'biao_id': data['name'], 'Dwgcbh': data['detail']}
    for position, code in enumerate(list(states), start=1):
        item = collection.find_one({**scope, '清单编码': code})
        if item is None:
            # The item vanished after the codes were listed; record a failure
            # instead of crashing on a None subscript.
            states[code] = _FAILED
        else:
            states[code] = _process_item(
                item, code, position, data, label, city, save_url,
                copied_fields,
            )
        dbcollection.update_one(scope, {'$set': {detail_field: states}})
        # NOTE(review): processing stops after the first item. This keeps the
        # behaviour read from the original code, whose indentation was lost;
        # confirm whether this break is a leftover debugging limit.
        break


@celery_app2.task(autoretry_for=(TimeoutError,))
def process_data(data: dict) -> dict:
    """Send the qdxm and Djcs items of one Dwgc record to the job service.

    Args:
        data: ``data['name']`` is the hex ``_id`` of the jingjibiao document
            (also used as ``biao_id`` in the other collections);
            ``data['detail']`` is the ``Dwgcbh`` of the record to process.

    Returns:
        Always ``{"result": []}``; progress is reported through the
        ``status``, ``qd_detail`` and ``dj_detail`` fields of the Dwgc record.

    Raises:
        ValueError: if no jingjibiao document has the given ``_id``.
    """
    print(data['name'])
    print(data['detail'])  # Dwgcbh

    biao = biaocollection.find_one({'_id': ObjectId(data['name'])})
    if biao is None:
        raise ValueError(
            f"no jingjibiao document with _id {data['name']!r}"
        )
    city = biao['city']

    # Name of the matching Dwgc entry; the last match wins, "" if none match.
    label = ""
    for dxgc in biao['Dxgcxx']:
        for child in dxgc['Dwgc']:
            if child['Dwgcbh'] == data['detail']:
                label = child['Dwgcmc']

    scope = {'biao_id': data['name'], 'Dwgcbh': data['detail']}
    qd_states = {
        entry['清单编码']: _PENDING for entry in qdcollection.find(scope)
    }
    dj_states = {
        entry['清单编码']: _PENDING for entry in djcollection.find(scope)
    }
    dbcollection.update_one(
        scope,
        {'$set': {
            'status': '开始',
            'qd_detail': qd_states,
            'dj_detail': dj_states,
        }},
    )

    _process_items(
        qdcollection, qd_states, data, label, city,
        save_url=_QD_SAVE_URL,
        detail_field='qd_detail',
        copied_fields=('biao_id', 'Dwgcbh', 'bt'),
    )
    _process_items(
        djcollection, dj_states, data, label, city,
        save_url=_DJ_SAVE_URL,
        detail_field='dj_detail',
        copied_fields=('biao_id', 'Dwgcbh'),
    )
    return {"result": []}