首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >计算任务的Akka模型

计算任务的Akka模型
EN

Stack Overflow用户
提问于 2016-09-13 19:16:31
回答 2查看 238关注 0票数 7

我有以下要求

  • 使用用户名和密码连接到get服务器,并获得自动令牌。
  • 读取文件以获得不同的参数
  • 使用步骤1中的auth令牌和步骤2中的参数向web服务器发送http请求。

现在,我有一个执行上述所有任务的参与者,如下所示

代码语言:javascript
复制
package akka.first.java;

import akka.actor.UntypedActor;

public class MySingleActor extends UntypedActor {

    public void onReceive(Object msg) {

        if( msg instanceof sendRequest ) {

            //Connect to a webserver with a username and password and get an authetication token
            String token = getToken();
           // Read file to get different parameters
            Param param = readFile();
           // Use the auth token fro step 1 and parameters from step 2 to send an http request to the web server
            Response response = sendRequest (server, token, param);


        }

    }

    private Param readFile() {
        // reads file 
    }

    private String getToken() {
        //gets token 
    }
}

readFile操作包含各种子任务,我认为它们应该是单独的参与者。但是,由于readFile()操作的返回是参与者执行其发送请求的主要任务所必需的,因此这可能是阻塞,根据docs的建议,这样做的最佳方法是什么?期货?

EN

回答 2

Stack Overflow用户

发布于 2016-09-19 14:52:33

正式文件提供以下解决方案:

  • 在参与者(或由路由器Java、Scala管理的一组参与者)中执行阻塞调用,确保配置一个专门用于此目的或大小足够大的线程池。
  • 在未来中执行阻塞调用,确保在任何时间点都有这样的调用次数的上限(提交无限制的此类任务数量将耗尽内存或线程限制)。
  • 在未来中执行阻塞调用,为线程池提供对线程数量的上限,这对于运行应用程序的硬件是合适的。
  • 指定一个线程来管理一组阻塞资源(例如,NIO选择器驱动多个通道),并在事件作为参与者消息发生时进行分派。

使用未来是官方建议的方法之一,不过要格外小心。

让我们考虑第一种方法,因为它更一致。

首先,将所有阻塞IO操作提取到只执行一个阻塞IO操作的新参与者中。假设只有一个这样的操作是为了简洁:

代码语言:javascript
复制
public class MyBlockingIOActor extends UntypedActor {
    public void onReceive(Object msg) {
        // do blocking IO call here and send the result back to sender
    }
}

添加调度员配置,在参与者系统配置文件 (通常是application.conf)中处理阻塞参与者:

代码语言:javascript
复制
#Configuring a dispatcher with fixed thread pool size, e.g. for actors that perform blocking IO
blocking-io-dispatcher {
  type = Dispatcher
  executor = "thread-pool-executor"
  thread-pool-executor {
    fixed-pool-size = 32
  }
  throughput = 1
}

请确保在创建参与者系统时使用配置文件(特别是如果您决定为配置使用非标准文件名):

代码语言:javascript
复制
ActorSystem actorSystem = ActorSystem.create("my-actor-system", ConfigFactory.load("application.conf"));

在此之后,您希望将执行阻塞IO的参与者分配给专用调度程序。您可以在这里描述的配置中或在创建参与者时这样做:

代码语言:javascript
复制
ActorRef blockingActor = context().actorOf(Props.create(MyBlockingIOActor.class).withDispatcher("blocking-io-dispatcher"));

为了获得更大的吞吐量,请考虑将阻塞参与者封装到池中:

代码语言:javascript
复制
SupervisorStrategy strategy = new OneForOneStrategy(
        5,
        Duration.create(1, TimeUnit.MINUTES),
        Collections.singletonList(Exception.class)
);
ActorRef blockingActor = context().actorOf(new SmallestMailboxPool(5).withSupervisorStrategy(strategy).props(Props.create(MyBlockingIOActor.class).withDispatcher("blocking-io-dispatcher")));

您可以确保参与者以下列方式使用正确的dispatcher:

代码语言:javascript
复制
public class MyBlockingIOActor extends UntypedActor {
    public void preStart() {
        LOGGER.debug("using dispatcher: {}", ((Dispatcher)context().dispatcher()).id());
    }
}
票数 5
EN

Stack Overflow用户

发布于 2016-09-14 13:35:50

你可以用“期货”,也可以用“RxJava”来观察和观察。或不同的角色并将最终响应转发给orginial发送者

代码语言:javascript
复制
  public class MySingleActor extends UntypedActor{

private ActorRef tokenActor;
private ActorRef readFileActor;

public MySingleActor(){
    tokenActor = context().actorOf(Props.create(TokenActor.class),"tokenActor");
    readFileActor = context().actorOf(Props.create(ReadFileActor.class),"readFileActor");
}
public void onReceive(Object msg) {
    if( msg instanceof sendRequest ) {
        Future<String> f= Futures.future(new Callable<String>() {
            @Override public String call() throws Exception {
                return getToken();
            }            },context().dispatcher());Patterns.pipe(f,context().dispatcher()).to(tokenActor).pipeTo(readFileActor,self());
    }       
}}

或者代替管道

f.onComplete(new OnComplete<String>(){ public void onComplete(Throwable t, String result){ readFileActor.tell(result,self()); } }, context().system().dispatcher());

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/39477447

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档