使用 ADB(Android Debug Bridge)实现双向 IPC(进程间通信)通信可以通过多种方式实现。以下是一个示例,展示了如何使用 ADB 和 Python 实现双向 IPC 通信。
在这个示例中,我们将使用 ADB Shell 命令和 Python 脚本来实现双向通信。我们将通过 ADB 向 Android 设备发送命令,并从设备接收响应。
首先,确保 ADB Server 已经启动。你可以在命令行中运行以下命令来启动 ADB Server:
adb start-server
以下是一个 Python 脚本示例,展示了如何通过 ADB 实现双向通信:
import subprocess
import time
def adb_command(command):
"""通过 ADB 发送命令并返回输出"""
result = subprocess.run(['adb', 'shell', command], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
return result.stdout.strip()
def main():
# 检查设备连接
devices = adb_command('devices')
if 'device' not in devices:
print("No devices connected.")
return
# 发送命令并接收响应
while True:
command = input("Enter ADB command (or 'exit' to quit): ")
if command.lower() == 'exit':
break
response = adb_command(command)
print(f"Response: {response}")
# 模拟双向通信
if command == 'date':
print("Sending date command to device...")
response = adb_command('date')
print(f"Device date: {response}")
time.sleep(1)
if __name__ == "__main__":
main()
adb_command
函数:这个函数通过 ADB 发送命令并返回输出。它使用 subprocess.run
来执行 ADB 命令,并捕获标准输出和标准错误。main
函数:这个函数是脚本的主入口。它首先检查是否有设备连接,然后进入一个循环,等待用户输入 ADB 命令。输入的命令通过 ADB 发送到设备,并返回响应。如果用户输入 exit
,循环将终止。你还可以在 Android 设备上运行一个脚本,通过 ADB 与 Python 脚本进行通信。以下是一个简单的示例,展示了如何在 Android 设备上运行一个 Shell 脚本,并通过 ADB 与 Python 脚本进行通信。
在 Android 设备上创建一个 Shell 脚本,例如 device_script.sh
:
#!/bin/sh
while true; do
echo "Waiting for command..."
read command
if [ "$command" = "exit" ]; then
echo "Exiting..."
break
fi
echo "Received command: $command"
response=$(eval $command)
echo "Response: $response"
done
将这个脚本推送到 Android 设备:
adb push device_script.sh /data/local/tmp/
adb shell chmod +x /data/local/tmp/device_script.sh
修改 Python 脚本以与 Android 设备上的 Shell 脚本进行通信:
import subprocess
import time
def adb_command(command):
"""通过 ADB 发送命令并返回输出"""
result = subprocess.run(['adb', 'shell', command], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
return result.stdout.strip()
def main():
# 检查设备连接
devices = adb_command('devices')
if 'device' not in devices:
print("No devices connected.")
return
# 启动设备上的脚本
adb_command('/data/local/tmp/device_script.sh &')
# 发送命令并接收响应
while True:
command = input("Enter command for device (or 'exit' to quit): ")
if command.lower() == 'exit':
adb_command('echo exit > /data/local/tmp/device_script.sh')
break
adb_command(f'echo {command} > /data/local/tmp/device_script.sh')
time.sleep(1)
response = adb_command('cat /data/local/tmp/device_script.sh')
print(f"Response: {response}")
if __name__ == "__main__":
main()使用 ADB(Android Debug Bridge)实现双向 IPC(进程间通信)通信可以通过以下几种方式实现:
下面是一个示例,展示了如何使用 ADB Shell 命令和 ADB Forwarding 实现双向 IPC 通信。
首先,在 Android 设备上创建一个简单的服务,该服务可以通过 ADB Shell 命令进行交互。
// MainActivity.java
package com.example.adbipc;
import android.app.Service;
import android.content.Intent;
import android.os.IBinder;
import android.os.Binder;
import android.util.Log;
public class IPCService extends Service {
private final IBinder binder = new LocalBinder();
public class LocalBinder extends Binder {
IPCService getService() {
return IPCService.this;
}
}
@Override
public IBinder onBind(Intent intent) {
return binder;
}
public String executeCommand(String command) {
Log.d("IPCService", "Received command: " + command);
// 处理命令并返回结果
return "Executed: " + command;
}
}
在主机上编写一个脚本,通过 ADB Shell 命令与 Android 设备上的服务进行通信。
import subprocess
def adb_shell(command):
result = subprocess.run(['adb', 'shell', command], stdout=subprocess.PIPE)
return result.stdout.decode('utf-8')
def main():
# 启动服务
adb_shell('am startservice -n com.example.adbipc/.IPCService')
# 发送命令并获取结果
command = 'your_command_here'
result = adb_shell(f'echo "{command}" | nc -U /data/local/tmp/ipc_socket')
print(f'Result: {result}')
if __name__ == '__main__':
main()
在 Android 设备上创建一个简单的服务器,该服务器可以通过网络进行通信。
// MainActivity.java
package com.example.adbipc;
import android.app.Service;
import android.content.Intent;
import android.os.IBinder;
import android.os.Binder;
import android.util.Log;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.io.InputStream;
import java.io.OutputStream;
public class IPCService extends Service {
private final IBinder binder = new LocalBinder();
private ServerSocket serverSocket;
public class LocalBinder extends Binder {
IPCService getService() {
return IPCService.this;
}
}
@Override
public IBinder onBind(Intent intent) {
return binder;
}
@Override
public void onCreate() {
super.onCreate();
new Thread(new Runnable() {
@Override
public void run() {
try {
serverSocket = new ServerSocket(12345);
while (true) {
Socket clientSocket = serverSocket.accept();
InputStream input = clientSocket.getInputStream();
OutputStream output = clientSocket.getOutputStream();
byte[] buffer = new byte[1024];
int bytesRead = input.read(buffer);
String command = new String(buffer, 0, bytesRead);
Log.d("IPCService", "Received command: " + command);
String response = "Executed: " + command;
output.write(response.getBytes());
clientSocket.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();
}
@Override
public void onDestroy() {
super.onDestroy();
try {
if (serverSocket != null) {
serverSocket.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
在主机上编写一个脚本,通过 ADB Forwarding 与 Android 设备上的服务器进行通信。
import socket
import subprocess
def adb_forward(local_port, remote_port):
subprocess.run(['adb', 'forward', f'tcp:{local_port}', f'tcp:{remote_port}'])
def send_command(command):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect(('127.0.0.1', 12345))
s.sendall(command.encode())
response = s.recv(1024)
return response.decode('utf-8')
def main():
# 设置端口转发
adb_forward(12345, 12345)
# 发送命令并获取结果
command = 'your_command_here'
result = send_command(command)
print(f'Result: {result}')
if __name__ == '__main__':
main()
领取专属 10元无门槛券
手把手带您无忧上云