我有以下要求
现在,我有一个执行上述所有任务的参与者,如下所示
package akka.first.java;
import akka.actor.UntypedActor;
public class MySingleActor extends UntypedActor {
public void onReceive(Object msg) {
if( msg instanceof sendRequest ) {
//Connect to a webserver with a username and password and get an authetication token
String token = getToken();
// Read file to get different parameters
Param param = readFile();
// Use the auth token fro step 1 and parameters from step 2 to send an http request to the web server
Response response = sendRequest (server, token, param);
}
}
private Param readFile() {
// reads file
}
private String getToken() {
//gets token
}
}readFile操作包含各种子任务,我认为它们应该是单独的参与者。但是,由于readFile()操作的返回是参与者执行其发送请求的主要任务所必需的,因此这可能是阻塞,根据docs的建议,这样做的最佳方法是什么?期货?
发布于 2016-09-19 14:52:33
正式文件提供以下解决方案:
使用未来是官方建议的方法之一,不过要格外小心。
让我们考虑第一种方法,因为它更一致。
首先,将所有阻塞IO操作提取到只执行一个阻塞IO操作的新参与者中。假设只有一个这样的操作是为了简洁:
public class MyBlockingIOActor extends UntypedActor {
public void onReceive(Object msg) {
// do blocking IO call here and send the result back to sender
}
}添加调度员配置,在参与者系统配置文件 (通常是application.conf)中处理阻塞参与者:
#Configuring a dispatcher with fixed thread pool size, e.g. for actors that perform blocking IO
blocking-io-dispatcher {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
fixed-pool-size = 32
}
throughput = 1
}请确保在创建参与者系统时使用配置文件(特别是如果您决定为配置使用非标准文件名):
ActorSystem actorSystem = ActorSystem.create("my-actor-system", ConfigFactory.load("application.conf"));在此之后,您希望将执行阻塞IO的参与者分配给专用调度程序。您可以在这里描述的配置中或在创建参与者时这样做:
ActorRef blockingActor = context().actorOf(Props.create(MyBlockingIOActor.class).withDispatcher("blocking-io-dispatcher"));为了获得更大的吞吐量,请考虑将阻塞参与者封装到池中:
SupervisorStrategy strategy = new OneForOneStrategy(
5,
Duration.create(1, TimeUnit.MINUTES),
Collections.singletonList(Exception.class)
);
ActorRef blockingActor = context().actorOf(new SmallestMailboxPool(5).withSupervisorStrategy(strategy).props(Props.create(MyBlockingIOActor.class).withDispatcher("blocking-io-dispatcher")));您可以确保参与者以下列方式使用正确的dispatcher:
public class MyBlockingIOActor extends UntypedActor {
public void preStart() {
LOGGER.debug("using dispatcher: {}", ((Dispatcher)context().dispatcher()).id());
}
}发布于 2016-09-14 13:35:50
你可以用“期货”,也可以用“RxJava”来观察和观察。或不同的角色并将最终响应转发给orginial发送者
public class MySingleActor extends UntypedActor{
private ActorRef tokenActor;
private ActorRef readFileActor;
public MySingleActor(){
tokenActor = context().actorOf(Props.create(TokenActor.class),"tokenActor");
readFileActor = context().actorOf(Props.create(ReadFileActor.class),"readFileActor");
}
public void onReceive(Object msg) {
if( msg instanceof sendRequest ) {
Future<String> f= Futures.future(new Callable<String>() {
@Override public String call() throws Exception {
return getToken();
} },context().dispatcher());Patterns.pipe(f,context().dispatcher()).to(tokenActor).pipeTo(readFileActor,self());
}
}}或者代替管道
f.onComplete(new OnComplete<String>(){ public void onComplete(Throwable t, String result){ readFileActor.tell(result,self()); } }, context().system().dispatcher());
https://stackoverflow.com/questions/39477447
复制相似问题