Personalized PageRank with Spark

I'm trying to compute personalized PageRank on a graph with 200M edges using Spark. I was able to compute it for a single source node, but I can't do it for multiple source nodes.
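For reference, the quantity computed for a single source node can be sketched locally by power iteration. This is a plain-Scala, single-machine illustration, not GraphX's implementation. The `AdjList` type, the `personalizedPageRank` helper, the `maxIter` cap and the rule that vertices without out-edges send their mass back to the source are all assumptions; `tol` and `resetProb` play the roles of the `0.00001` and `0.2` arguments in the question's `personalizedPageRank(vertex._1, 0.00001, 0.2)` call.

```scala
import scala.collection.mutable

object PersonalizedPageRankSketch {
  // Hypothetical local adjacency list: vertex id -> ids of its out-neighbours.
  type AdjList = Map[Long, Seq[Long]]

  // Add `x` to the score stored for `k`, treating a missing entry as 0.
  private def add(m: mutable.Map[Long, Double], k: Long, x: Double): Unit =
    m(k) = m.getOrElse(k, 0.0) + x

  /** Power iteration for personalized PageRank seeded at `source`.
    * resetProb: probability of teleporting back to `source` at each step.
    * tol: stop when the L1 change between two iterations drops below it.
    * Assumption: vertices without out-edges send their mass back to `source`.
    */
  def personalizedPageRank(g: AdjList, source: Long, tol: Double, resetProb: Double,
                           maxIter: Int = 1000): Map[Long, Double] = {
    val vertices = (g.keySet ++ g.values.flatten + source).toSeq
    var rank: Map[Long, Double] = Map(source -> 1.0) // all mass starts on the source
    var delta = Double.MaxValue
    var iter = 0
    while (delta > tol && iter < maxIter) {
      val next = mutable.Map.empty[Long, Double]
      add(next, source, resetProb) // teleport share (total mass stays 1)
      for ((v, r) <- rank) {
        val out = g.getOrElse(v, Seq.empty)
        if (out.isEmpty) add(next, source, (1 - resetProb) * r)
        else out.foreach(u => add(next, u, (1 - resetProb) * r / out.size))
      }
      // L1 distance between the previous and the new score vector
      delta = vertices.map(v => math.abs(next.getOrElse(v, 0.0) - rank.getOrElse(v, 0.0))).sum
      rank = next.toMap
      iter += 1
    }
    rank
  }

  def main(args: Array[String]): Unit = {
    // Directed 3-cycle 1 -> 2 -> 3 -> 1, seeded at vertex 1.
    val g: AdjList = Map(1L -> Seq(2L), 2L -> Seq(3L), 3L -> Seq(1L))
    val ranks = personalizedPageRank(g, source = 1L, tol = 0.00001, resetProb = 0.2)
    ranks.toSeq.sortBy(-_._2).foreach { case (v, s) => println(f"$v%d $s%.6f") }
  }
}
```

On the 3-cycle, vertex 1 should come out on top with a score close to 0.2 / 0.488 (about 0.41), since at the fixed point p1 = 0.2 + 0.8 · 0.64 · p1.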

This is the code I have written so far:

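```scala
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.graphx.GraphOps

// Broadcast a GraphOps wrapper around the whole graph
val ops: Broadcast[GraphOps[Int, Int]] = sc.broadcast(new GraphOps(graph))

vertices
  // one personalized PageRank run per source vertex (tol = 0.00001, resetProb = 0.2)
  .map(vertex => (vertex._1, ops.value.personalizedPageRank(vertex._1, 0.00001, 0.2)))
  .mapValues(_.vertices.filter(_._2 > 0))          // keep vertices with a positive score
  .mapValues(_.sortBy(_._2, false))                // highest score first
  .mapValues(_.mapValues(d => "%.12f".format(d)))  // 12 decimal places
  .mapValues(_.take(1000))                         // top 1000 per source
  .mapValues(_.mkString("\t"))
  .saveAsTextFile("hdfs://localhost:9000/user/spark/out/vertices-ppr")
```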
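To make the intended output concrete, here is the same post-processing chain applied to a plain Scala `Map` of scores for one source. This is a local sketch only: `FormatTopScores` and `formatTop` are hypothetical names, and using `Locale.ROOT` (so the decimal separator is always a dot) is an assumption, whereas the original code uses the default locale.

```scala
import java.util.Locale

object FormatTopScores {
  /** Local analogue of the per-source chain: keep positive scores, sort descending,
    * format with 12 decimals, keep the top `k`, and join the pairs with tabs.
    * Assumption: Locale.ROOT is used so the output never contains a decimal comma.
    */
  def formatTop(scores: Map[Long, Double], k: Int = 1000): String =
    scores.toSeq
      .filter(_._2 > 0)                                                  // drop zero scores
      .sortBy(-_._2)                                                     // highest score first
      .map { case (id, s) => (id, "%.12f".formatLocal(Locale.ROOT, s)) } // 12 decimal places
      .take(k)                                                           // top k entries
      .mkString("\t")                                                    // one tab-separated line

  def main(args: Array[String]): Unit =
    println(formatTop(Map(1L -> 0.25, 2L -> 0.0, 3L -> 0.5)))
}
```

For the sample map this prints `(3,0.500000000000)` and `(1,0.250000000000)` separated by a tab, because `mkString` uses each tuple's `toString`. In the Spark version, `saveAsTextFile` likewise writes each `(sourceId, string)` pair with its `toString`.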

Here, `vertices` is a `VertexRDD[Int]` containing a subset of the graph's vertices. When it is small (1, 2, or 10 elements) the code works fine, but when it is larger (100 elements) the code just freezes on job 2 after the first job completes. The last lines of the console output are:

```
INFO Got job 13 (reduce at VertexRDDImpl.scala:88) with 22 output partitions
INFO Final stage: ResultStage 63 (reduce at VertexRDDImpl.scala:88)
INFO Parents of final stage: List(ShuffleMapStage 1, ShuffleMapStage 3, ShuffleMapStage 62)
INFO Missing parents: List(ShuffleMapStage 3, ShuffleMapStage 62)
INFO Removed broadcast_4_piece0 on localhost:33231 in memory (size: 2.7 KB, free: 22.7 GB)
```

Here is a screenshot of the Spark jobs console: [screenshot: Jobs console]
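For the multi-source case this question is about, one way to picture the computation is to carry one score per source on every vertex, so all sources share each pass over the edges instead of running a separate PageRank per source. The block below is a plain-Scala, single-machine sketch of that idea, not GraphX code: `multiSourcePpr`, the adjacency-list representation, the fixed iteration count and the dangling-vertex rule are all assumptions made for illustration.

```scala
object MultiSourcePprSketch {
  // Hypothetical local adjacency list: vertex id -> ids of its out-neighbours.
  type AdjList = Map[Long, Seq[Long]]

  /** Fixed-iteration personalized PageRank for several sources at once.
    * Every vertex carries an array with one score per source (column i <-> sources(i)).
    * Assumption: vertices without out-edges send their mass back to the source
    * of the corresponding column.
    */
  def multiSourcePpr(g: AdjList, sources: IndexedSeq[Long], numIter: Int,
                     resetProb: Double): Map[Long, Array[Double]] = {
    val k = sources.size
    val vertices = (g.keySet ++ g.values.flatten ++ sources).toSeq
    // Column i starts with all of its mass on sources(i).
    var rank: Map[Long, Array[Double]] = vertices.map { v =>
      v -> Array.tabulate(k)(i => if (sources(i) == v) 1.0 else 0.0)
    }.toMap
    for (_ <- 1 to numIter) {
      val next: Map[Long, Array[Double]] = vertices.map(v => v -> new Array[Double](k)).toMap
      for (i <- 0 until k) {
        val s = next(sources(i))
        s(i) += resetProb // teleport back to this column's own source
      }
      for ((v, vec) <- rank; i <- 0 until k if vec(i) > 0) {
        val out = g.getOrElse(v, Seq.empty)
        val mass = (1 - resetProb) * vec(i)
        if (out.isEmpty) { val s = next(sources(i)); s(i) += mass }
        else out.foreach { u => val t = next(u); t(i) += mass / out.size }
      }
      rank = next
    }
    rank
  }

  def main(args: Array[String]): Unit = {
    val g: AdjList = Map(1L -> Seq(2L), 2L -> Seq(3L), 3L -> Seq(1L))
    val sources = IndexedSeq(1L, 2L)
    val ranks = multiSourcePpr(g, sources, numIter = 50, resetProb = 0.2)
    for ((src, i) <- sources.zipWithIndex)
      println(s"source $src: " +
        ranks.toSeq.sortBy(_._1).map { case (v, a) => f"$v%d=${a(i)}%.4f" }.mkString(" "))
  }
}
```

Each column is an ordinary personalized PageRank vector, so on the 3-cycle the source of each column ends up with roughly 0.41, matching the single-source fixed point.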
