下面分享了几个python脚本,在日常工作中提高效率。
环境介绍
OS:MAC PRO
脚本语言:python3.7
编辑器:vscode/sublimetext3
python使用相对简单,快速便捷,很适合作为脚本开发;作为"资深"的sub3/vscode控,使用编辑器鞋脚本再也适合不过,vscode中的调试功能太好用啦:
生成海量csv数据文件
测试同学为了压测接口,让我帮忙提供不重复的数据,正好用python写了一个简单脚本:
# -*- coding: utf-8 -*-
import requests
import sys
import re
import csv
import random
'''
从csv文件中读取数据
'''
def readCsv():
# 读取csv至字典
csvFile = open("/Users/lioswong/LiosWong/sublimetext/python/脚本/bindPhone.csv", "r")
reader = csv.reader(csvFile)
# 建立空字典
result = {}
for item in reader:
# 忽略第一行
if reader.line_num == 1:
continue
result[item[0]] = item[1]
csvFile.close()
print(result)
'''
往csv文件中写入数据
'''
def writerCsv():
fileHeader = ["customerPhone", "orderNo","driverPhone","driverNo"]
csvFile = open("/Users/lioswong/LiosWong/sublimetext/python/脚本/test6.csv", "w")
writer = csv.writer(csvFile)
d1 = [0]*4
line = 1
for i in range(0,1000001):
if line==1:
writer.writerow(fileHeader)
d1[0]=random.choice(['177','156','159','188','199','139','152','188','133','185','170','136','189','158','178','151'])+"".join(random.choice("0123456789") for i in range(8))
d1[1]="".join(random.choice("0123456789") for i in range(10))
d1[2]=random.choice(['152','139','199','188','190','185','156','136','133','158','136','151','153'])+"".join(random.choice("0123456789") for i in range(8))
d1[3]="".join(random.choice("0123456789") for i in range(8))
# 写入的内容都是以列表的形式传入函数
writer.writerow(d1)
print(line)
line+=1
csvFile.close()
writerCsv()
# readCsv()
sql查询导出csv文件脚本
由于公司内部有严格的权限控制,sql查询导出需要提工单,流程繁琐,为了方便工作,写了下面脚本,可以支持任意sql的查询导出,只限于工作导出,当然大批量的爬取数据,公司的数据中心同学可能随时查水表:
#!/usr/bin/python
# -*- coding:utf-8 -*-
import json
import logging
import math
import time
import re
import requests
import sys
import csv
from lxml import etree
def exec(csrf_token, sql_content, dbase, dbconfig):
headers = {
# cookie需要用最新的,否则会失败
"cookie": "UM_distinctid=1b396-06251-13c680-1727b86bdbca57; _ga=GA1.22421.1591582910; JSESSIONID=BaxMuUVZRUou-y4XBkUdcdNc_DMkGc4qhwSkOi59AoZhjxfHfhU3WAqr8pCrEQ1omC3IttHRV4bz2j5s1NW-l88_gC9V3A5.EeXDfg.DB2JoXb7DZUDLJcRSpQLALJtq8M",
}
url = "https://xxxxx/query/mysql"
data = {'csrf_token': csrf_token, 'sql_content': sql_content,
'dbase': dbase, 'dbconfig': dbconfig}
re = requests.post(url, data=data, headers=headers)
# 深坑,可能td节点数据为空,需要特殊处理
new_text = re.text.replace(r'<td style="white-space: pre-wrap;"></td>',
'<td style="white-space: pre-wrap;">None</td>')
html = etree.HTML(new_text)
thead = html.xpath("//div[@class='x_content sql_result']/table/thead")[0]
th = thead.xpath("//tr/th/text()")
filterStr = ['数据库信息', '名称', '内容预览', '是否开放', '备注', '操作']
fileHeader = []
for x in th:
if x not in filterStr:
fileHeader.append(x)
pass
writerToCSV(fileHeader, html)
def writerToCSV(fileHeader, html):
csvFile = open(
"/Users/lioswong/LiosWong/sublimetext/Script-languages/python/draft/cc_sql_to_file.csv", "w", encoding='utf-8-sig')
writer = csv.writer(csvFile)
writer.writerow(fileHeader)
hLen = len(fileHeader)
tdData = html.xpath(
"//div[@class='x_content sql_result']/table/tbody/tr/td/text()")
col = 0
d1 = []
for i in tdData:
if col == 0:
d1 = [0 for i in range(hLen)]
if col + 1 == hLen:
d1[col] = i
writer.writerow(d1)
col = 0
pass
else:
d1[col] = i
col += 1
csvFile.close()
exec('IjdhNjljZGI1OGTJiNWUyODIwZjEyNGI0Njg2YmYi.EeXC9g.egKcK0rKVP0aCMZCqDQvUUU0lRg',"select * from test order by id desc limit 100", 'xxxx', '从库-生产')
kafka Consumer Producer
开发环境、测试环境会遇到手动发送消息、消费消息的需要:
#!/usr/bin/python
# -*- coding:utf-8 -*-
from pykafka import KafkaClient
import json
import logging
logging.basicConfig(level=logging.INFO)
client = KafkaClient(hosts="10.0.0.1:9092") # 可接受多个Client,多个broker
def sendDevKafkaMsg(topic, message):
try:
topic = client.topics[topic] # 选择一个topic
producer = topic.get_producer(delivery_reports=True)
producer.produce(bytes(message, encoding="utf8"))
producer.get_delivery_report() # 返回之前发送失败的消息和结果
except Exception as e:
print(e)
data = {"appVersion":"34900","cityCode":"0532"}
sendDevKafkaMsg("XXX_INFO_CHANGE",json.dumps(data2))
#!/usr/bin/python
# -*- coding:utf-8 -*-
from pykafka import KafkaClient
import _thread
import threading
import json
import logging
# logging.basicConfig(level=logging.INFO)
client = KafkaClient(hosts="10.0.0.1:9092")
def receiveMsg(topics):
topic = client.topics[topics] # 选择一个topic
consumer = topic.get_simple_consumer(consumer_group='dev26-dc',
reset_offset_on_start=False)
partitions = topic.partitions
offset_list = consumer.held_offsets
print("当前消费者分区offset情况{}".format(offset_list)) # 消费者拥有的分区offset的情况
consumer.reset_offsets([(partitions[0], 0)]) # 设置offset
msg = consumer.consume()
print("消费 :{}".format(msg.value.decode()))
while True:
receiveMsg("DRIVER_SIGNING_TYPE_CHANGE")
pass
dubbo_telnet自动化测试脚本
由于平时对外提供都是RPC接口,可以用单元测试、invoke调试本地接口,有时本地、开发环境、测试环境为了批量测试、压测RPC接口,该脚本可能会有用:
#!/usr/bin/python
# -*- coding:utf-8 -*-
import json
import telnetlib
import unittest
import time
import re
import datetime
class Dubbo(telnetlib.Telnet):
prompt = 'dubbo>'
coding = 'utf-8'
def __init__(self, host=None, port=0):
super().__init__(host, port)
self.write(b'\n')
def command(self, flag, str_=""):
data = self.read_until(flag.encode())
self.write(str_.encode() + b"\n")
return data
def invoke(self, service_name, method_name, args):
command_str = "invoke {0}.{1}({2})".format(
service_name, method_name, args)
self.command(Dubbo.prompt, command_str)
data = self.command(Dubbo.prompt, "")
data = json.loads(data.decode(
Dubbo.coding, errors='ignore').split('\n')[0].strip())
return data
class qqTest(unittest.TestCase):
#setUp 用于设置初始化的部分,在测试用例执行前,这个方法中的函数将先被调用 #
def setUp(self):
'''
dev: Dubbo('120.26.98.15', 6666)
'''
self.dubbo_conn = Dubbo('127.0.0.1', 6666)
self.verificationErrors = [] # 脚本运行时,错误的信息将被打印到这个列表中#
self.accept_next_alert = True # 是否继续接受下一个警告#
# 司机开始服务
def test_start_service(self):
data = {"class": "com.xxxxx.qq.api.dto.input.DriverStartServiceParam", "driverNo": 019,"lat": 30.206907583333, "lng": 120.22090913, "orderLabel": 128, "orderNo": 541, "bizType": 1}
result = self.dubbo_conn.invoke(
"com.xxxxx.qq.api.app.response.DriverOrderResponseApi", "startService", data)
print(result)
time.sleep(2)
def tearDown(self):
'''
tearDown 方法在每个测试方法执行后调用,这个地方做所有清理工作,如退出浏览器等。
self.assertEqual([], self.verificationErrors) 是个难点,
对前面verificationErrors方法获得的列表进行比较;如查verificationErrors的列表不为空,输出列表中的报错信息。
'''
self.assertEqual([], self.verificationErrors)
class DriversssTest(unittest.TestCase):
#setUp 用于设置初始化的部分,在测试用例执行前,这个方法中的函数将先被调用 #
def setUp(self):
'''
local: Dubbo('127.0.0.1', 8951)
'''
self.dubbo_conn = Dubbo('127.0.0.1', 8951)
self.verificationErrors = [] # 脚本运行时,错误的信息将被打印到这个列表中#
self.accept_next_alert = True # 是否继续接受下一个警告#
def test_old_bind_phone(self):
data = [201689047,1221,"18058783246","18058755045"]
result = self.dubbo_conn.invoke(
"com.xxxxx.driver.sss.api.DriverVirtPhoneApi", "bindPhone", data)
print(result)
def queryVirtPhone(self):
data = [782919,'18790899859']
result = self.dubbo_conn.invoke(
"com.xxxxx.driver.sss.api.DriverVirtPhoneApi", "queryVirtPhone", data)
print(result)
def tearDown(self):
'''
tearDown 方法在每个测试方法执行后调用,这个地方做所有清理工作,如退出浏览器等。
self.assertEqual([], self.verificationErrors) 是个难点,
对前面verificationErrors方法获得的列表进行比较;如查verificationErrors的列表不为空,输出列表中的报错信息。
'''
self.assertEqual([], self.verificationErrors)
'''
local
'''
class LocalTest(unittest.TestCase):
#setUp 用于设置初始化的部分,在测试用例执行前,这个方法中的函数将先被调用 #
def setUp(self):
'''
local1: Dubbo('127.0.0.1', 20880)
'''
self.dubbo_conn = Dubbo('127.0.0.1', 20880)
self.verificationErrors = [] # 脚本运行时,错误的信息将被打印到这个列表中#
self.accept_next_alert = True # 是否继续接受下一个警告#
# 测试sayHello
def test_say_hello(self):
for x in range(0,1):
data = {"class":"com.alibaba.dubbo.demo.BeanParam","name":"lios","age":25+x}
result = self.dubbo_conn.invoke(
"com.alibaba.dubbo.demo.DemoService", "hello", data)
print(result)
pass
time.sleep(2)
def test_2(self):
print(self.dubbo_conn)
def tearDown(self):
'''
tearDown 方法在每个测试方法执行后调用,这个地方做所有清理工作,如退出浏览器等。
self.assertEqual([], self.verificationErrors) 是个难点,
对前面verificationErrors方法获得的列表进行比较;如查verificationErrors的列表不为空,输出列表中的报错信息。
'''
self.assertEqual([], self.verificationErrors)
'''
qq suite
'''
def qq_suite():
suite = unittest.TestSuite()
# suite.addTest(qqTest("test_update_driver_status"))
# suite.addTest(qqTest("test_start_service"))
suite.addTest(qqTest("test_orderConfirmV2"))
return suite
pass
'''
driver-sss suite
'''
def driver_sss_suite():
suite = unittest.TestSuite()
# suite.addTest(DriversssTest("test_pre_bind_phone"))
# suite.addTest(DriversssTest("test_bind_phone"))
# suite.addTest(DriversssTest("queryVirtPhone"))
suite.addTest(DriversssTest("test_old_bind_phone"))
return suite
pass
'''
local suite
'''
def local_suite():
suite = unittest.TestSuite()
suite.addTest(LocalTest("test_say_hello"))
return suite
pass
if __name__ == "__main__":
## 指定suite套件
unittest.main(defaultTest='driver_sss_suite')