4Manuals

  • PDF Cloud HOME

使用ThreadPool的Phaser无法正确到达并等待。我该如何解决? Download

    春季启动-Couchbase AbstractCouchbaseConfiguration-如何避免覆盖不必要的方法 我不知道(未解决的编译问题:) 加载数据并推送到未显示的RecyclerView中 调用Model类的函数时出现java.lang.NullPointerException 字符串数组在for循环中初始化时忽略索引0 Spring MVC @ModelAttribute未填充AJAX发布请求 需要建议在Java中使用哪个数据库/存储来存储RaftLogs(实现Raft Consensus Algorithm) Flutter:Java使用或覆盖已弃用的API @ EnableOAuth2Sso和@EnableResourceServer(同一应用程序中的客户端和资源行为) 中断可运行线程的问题

可在以下位置找到有关如何使用移相器以及遇到的问题的基本信息的仓库:https://github.com/hipy/phaser/tree/master/src

我一直在努力使使用Phaser的ThreadPools更加有效地实现Dijkstra算法。我使用循环进行了很多迭代,每次迭代都需要一个Phaser,以等待ThreadPool中的线程完成后再继续当前的迭代。

我遇到了问题,移相器无法正确等待。当我使用ArriveAndDeregister()时,移相器会在每个线程完成后进入终止状态。当我调用Arrive()时,未到达的参与方的数量不会减少,因此迭代会陷入困境。

下面的所有代码都在一次调用的apply()方法中运行。

下面的这段代码为线程创建了要执行的任务。

        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool((numberOfThreads));
        Phaser phaser = new Phaser();

        //Thread class
        class ClosestNodeTask implements Runnable {

            private int start;
            private int end;
            private Phaser phaser;

            public ClosestNodeTask(int start, int end, Phaser phaser) {
                this.start = start;
                this.end = end;
                this.phaser = phaser;
            }

            @Override
            public void run() {
                getNodeShortestDistanced(start, end, phaser); //method calls phaser.arrive() when done
            }
        }

        for (int t = 0; t < numberOfThreads; t++) {
            if (nodesModulo > 0 && numberOfThreads == (t + 1)) {
                start = nodesPerThread * (t);
                end = nodesPerThread * (t + 1) + nodesModulo;
              tasks[t] = new ClosestNodeTask(start, end, phaser);
            } else {
                start = nodesPerThread * t;
                end = nodesPerThread * (t + 1);
                tasks[t] = new ClosestNodeTask(start, end, phaser);
            }
        }

下面的代码在for循环中的每次迭代中执行。在这种情况下,有30.000次迭代:

     phaser.register(); //register main thread
     for(int t = 0; t < tasks.length; t++) {
        phaser.register();
     }
    System.out.println("Phaser unarrived party size is now: " + phaser.getUnarrivedParties());    

跳过一些算法代码,以下代码在for循环中执行,启动线程并等待其完成:

  for(int t = 0; t < tasks.length; t++) {
      executor.execute(tasks[t]);
  }

 phaser.arriveAndAwaitAdvance();

输出如下:

Phasecount: 0
Phaser unarrived party size is now: 3
Task size: 2
Adding: 613 //Next closest node in a sub-group, result of work done in a thread
Adding: 2870
all tasks done
-----------------done-------------
Phasecount: 1
Phaser unarrived party size is now: 6
Task size: 2
Adding: 1
Adding: 2870

第一阶段执行,进行完整的迭代。第二阶段陷入困境。未到达的参与方的数量为6。即使我调用了phaser.arrive(),也有3个新的参与方,显然还有3个未注册的未参加方。另外ArriveAndAwaitAdvance()没有等待,因为在下一次迭代中,有6个未到达的参与方,而不是3个。

我尝试使用ArrivalAndDeregister(),但这会导致阶段终止(phasecount具有较大的负值)。

我该如何解决?我不想终止一个阶段,但我确实希望在每次迭代中都将各方注册。

谢谢!

0 个答案:

没有答案



Similar searches
    如果单元格不为空且条件格式为红色,则删除或隐藏行 使用外部设备获取指纹时,应用程序崩溃 防止在AIX Unix中创建空的Tar文件 如何将计数结果插入表 从mysql表中选择最后n行而不使用order by