
Commit

add spark sql;
parse paths of worker/partitions nodes from yml;
add rmat, weighted email graph;
add rdd row types for mains, mirrors;
add aggregation driver that unions partitioned edgelists,
assigns mains, mirrors to partitions,
and saves to partition paths;
add rdd caching to compression script;
atrostan committed Nov 27, 2021
1 parent 2c7a1f0 commit 5e676ce
Showing 14 changed files with 718 additions and 77 deletions.
40 changes: 28 additions & 12 deletions build.sbt
@@ -10,14 +10,17 @@ lazy val sparkVersion = "3.1.2"

ThisBuild / assemblyMergeStrategy := {
// case PathList("META-INF", "MANIFEST.MF") => MergeStrategy.discard
case PathList("javax", "servlet", xs@_*) => MergeStrategy.first
case PathList(ps@_*) if ps.last endsWith ".html" => MergeStrategy.first
case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
case PathList(ps @ _*) if ps.last endsWith ".html" => MergeStrategy.first
case PathList("org", "apache", "spark", "unused", "UnusedStubClass.class") => MergeStrategy.first
case "application.conf" => MergeStrategy.concat
case "application.conf" => MergeStrategy.concat
case "unwanted.txt" => MergeStrategy.discard
case PathList("org", "apache", "hadoop", "yarn", "factories", "package-info.class") => MergeStrategy.discard
case PathList("org", "apache", "hadoop", "yarn", "provider", "package-info.class") => MergeStrategy.discard
case PathList("org", "apache", "hadoop", "util", "provider", "package-info.class") => MergeStrategy.discard
case PathList("org", "apache", "hadoop", "yarn", "factories", "package-info.class") =>
MergeStrategy.discard
case PathList("org", "apache", "hadoop", "yarn", "provider", "package-info.class") =>
MergeStrategy.discard
case PathList("org", "apache", "hadoop", "util", "provider", "package-info.class") =>
MergeStrategy.discard
case PathList("org", "apache", "spark", "unused", "UnusedStubClass.class") => MergeStrategy.first
case PathList("org", "aopalliance", "intercept", "MethodInvocation.class") => MergeStrategy.first

@@ -32,12 +35,25 @@ lazy val `akka-gps` = project
.settings(
// organization := "com.lightbend.akka.samples",
scalaVersion := "2.12.15",
Compile / scalacOptions ++= Seq("-deprecation", "-feature", "-unchecked", "-Xlog-reflective-calls", "-Xlint", "-target:jvm-1.8"),
Compile / scalacOptions ++= Seq(
"-deprecation",
"-feature",
"-unchecked",
"-Xlog-reflective-calls",
"-Xlint",
"-target:jvm-1.8"
),
Compile / javacOptions ++= Seq("-Xlint:unchecked", "-Xlint:deprecation"),
// Compile / PB.targets := Seq(
// scalapb.gen() -> (Compile / sourceManaged).value / "scalapb"
// ),
run / javaOptions ++= Seq("-Xms128m", "-Xmx8G", "-XX:+UseG1GC", "-Djava.library.path=./target/native", "-Dlog4j.configuration=src/main/resources/log4j.properties"),
run / javaOptions ++= Seq(
"-Xms128m",
"-Xmx8G",
"-XX:+UseG1GC",
"-Djava.library.path=./target/native",
"-Dlog4j.configuration=src/main/resources/log4j.properties"
),
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor-typed" % akkaVersion,
"com.typesafe.akka" %% "akka-cluster-typed" % akkaVersion,
@@ -50,15 +66,15 @@ lazy val `akka-gps` = project
"com.typesafe.akka" %% "akka-persistence-testkit" % akkaVersion % Test,
"org.apache.spark" %% "spark-core" % sparkVersion,
"org.apache.spark" %% "spark-streaming" % sparkVersion,
"org.apache.spark" %% "spark-sql" % sparkVersion,
"org.apache.spark" %% "spark-avro" % sparkVersion,
"org.yaml" % "snakeyaml" % "1.29",
"org.scala-graph" %% "graph-core" % "1.12.5"
),
run / fork := false,
Global / cancelable := false,
// disable parallel tests
Test / parallelExecution := false,
licenses := Seq(("CC0", url("https://creativecommons.org/publicdomain/zero/1.0"))),
licenses := Seq(("CC0", url("https://creativecommons.org/publicdomain/zero/1.0")))
)
.configs(MultiJvm)



14 changes: 10 additions & 4 deletions scripts/spark/preprocess.sh
@@ -49,22 +49,28 @@ akka_gps_home="/home/atrostan/Workspace/repos/akka-gps"

partitionDriverJarPath="${akka_gps_home}/out/artifacts/akka_gps_partitioner_jar/akka-gps.jar"
compressorDriverJarPath="${akka_gps_home}/out/artifacts/akka_gps_compressor_jar/akka-gps.jar"
graphName="email-Eu-core"

# directory that stores the graph

graphName="8rmat"
graphDir="${akka_gps_home}/src/main/resources/graphs/${graphName}"

# original, uncompressed edgelist
origGraphPath="\"${graphDir}/origWeighted.net\""
origGraphPath="\"${graphDir}/orig.net\""

# directory that will store the compressed edgelist
compressedDirName="compressed"
outputFilename="\"${graphDir}/${compressedDirName}\""
compressedGraphPath="\"${graphDir}/${compressedDirName}/part-00000\""

sep="\" \""
isWeighted="\"true\""

# whether the original edge list stores weights or not (true or false)
isWeighted="\"false\""

graphYaml="${graphDir}/stats.yml"
outputPartitionsPath="\"${graphDir}/partitions\""
threshold=100
threshold=7
numPartitions=4

partitioners=(
64 changes: 64 additions & 0 deletions src/main/resources/graphs/8rmat/orig.net
@@ -0,0 +1,64 @@
0 1
0 2
0 3
0 6
0 32
0 34
0 48
0 57
1 0
1 4
1 8
1 17
1 32
1 34
1 40
1 42
2 32
3 0
3 32
3 48
3 56
4 20
4 32
5 0
8 0
8 2
8 3
8 4
8 32
8 36
8 48
9 0
9 1
9 36
9 40
10 50
12 48
14 0
16 0
16 2
16 32
16 43
16 48
17 4
17 33
17 41
24 38
24 51
32 0
32 3
33 4
33 8
34 0
35 32
36 0
36 38
40 0
40 4
41 36
44 15
48 0
48 8
48 17
50 0
4 changes: 2 additions & 2 deletions src/main/resources/graphs/8rmat/stats.yml
@@ -1,2 +1,2 @@
Nodes: 8
Edges: 32
Nodes: 33
Edges: 64
@@ -0,0 +1,2 @@
Nodes: 986
Edges: 24929
5 changes: 5 additions & 0 deletions src/main/resources/paths.yaml
@@ -0,0 +1,5 @@
workers:
- src/main/resources/graphs/8rmat/partitions/hybrid/bySrc/p0
- src/main/resources/graphs/8rmat/partitions/hybrid/bySrc/p1
- src/main/resources/graphs/8rmat/partitions/hybrid/bySrc/p2
- src/main/resources/graphs/8rmat/partitions/hybrid/bySrc/p3
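A minimal sketch of how the workers list above could be read, relying only on the snakeyaml dependency already declared in build.sbt. The object and method names below (WorkerPathsSketch, readWorkerPaths) are illustrative stand-ins, not the repository's readWorkerPathsFromYaml.

import org.yaml.snakeyaml.Yaml
import java.io.FileInputStream
import scala.collection.JavaConverters._

object WorkerPathsSketch {
  // Load { workers: [path0, path1, ...] } and index the paths by partition id.
  def readWorkerPaths(yamlPath: String): Map[Int, String] = {
    val stream = new FileInputStream(yamlPath)
    try {
      val doc = new Yaml()
        .load(stream)
        .asInstanceOf[java.util.Map[String, java.util.List[String]]]
      doc.get("workers").asScala.zipWithIndex.map { case (p, i) => i -> p }.toMap
    } finally stream.close()
  }

  def main(args: Array[String]): Unit =
    readWorkerPaths("src/main/resources/paths.yaml").toSeq.sortBy(_._1).foreach {
      case (pid, path) => println(s"partition $pid -> $path")
    }
}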
15 changes: 15 additions & 0 deletions src/main/scala/com/Typedefs.scala
@@ -44,4 +44,19 @@ object Typedefs {
def empty: MemberSet = collection.mutable.Set.empty
def apply(ms: (Member)*): MemberSet = collection.mutable.Set(ms: _*)
}

// an rdd row that represents all the information we need to instantiate a main vertex in akka cluster sharding
type MainRow = (
(Int, Int), // (vid, pid)
Set[_ <: Int], // partitions that contain mirrors
List[(Int, Int)], // list of outgoing edges on partition pid
Int // indegree in partition pid
)

type MirrorRow = (
(Int, Int), // (vid, pid)
Int, // partition that contains main
List[(Int, Int)], // list of outgoing edges on partition pid
Int // indegree in partition pid
)
}
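For illustration only, hand-built values of the two aliases with made-up ids that follow the field comments above: a main for vertex 0 assigned to partition 2 with mirrors on partitions 1 and 3, and its mirror on partition 1 pointing back to partition 2.

object RowExamples {
  import com.Typedefs.{MainRow, MirrorRow}

  // ((vid, pid), partitions holding mirrors, out-edges local to pid, indegree in pid)
  val mainExample: MainRow = ((0, 2), Set(1, 3), List((0, 4), (0, 7)), 5)

  // ((vid, pid), partition holding the main, out-edges local to pid, indegree in pid)
  val mirrorExample: MirrorRow = ((0, 1), 2, List((0, 9)), 1)
}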
5 changes: 5 additions & 0 deletions src/main/scala/com/preprocessing/aggregation/Aggregator.scala
@@ -0,0 +1,5 @@
package com.preprocessing.aggregation

class Aggregator(numPartitions: Int, partitionRoot: String, isWeighted: Boolean, sep: String) {

}
66 changes: 66 additions & 0 deletions src/main/scala/com/preprocessing/aggregation/Driver.scala
@@ -0,0 +1,66 @@
package com.preprocessing.aggregation

import com.preprocessing.partitioning.Util.{getDegreesByPartition, partitionAssignment, partitionMainsDF, partitionMirrorsDF, readMainPartitionDF, readMirrorPartitionDF, readPartitionsAndJoin, readWorkerPathsFromYaml}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}


// driver program to test partition aggregation in preparation for ingestion by the akka cluster
object Driver {

def main(args: Array[String]): Unit = {


// local spark config
val appName: String = "preprocessing.aggregation.Driver"
val conf = new SparkConf()
.setAppName(appName)
.setMaster("local[*]")
val sc = new SparkContext(conf)

val spark: SparkSession = SparkSession.builder.master("local[*]").getOrCreate

val hadoopConfig = sc.hadoopConfiguration
hadoopConfig.set("fs.hdfs.impl", classOf[org.apache.hadoop.hdfs.DistributedFileSystem].getName)
hadoopConfig.set("fs.file.impl", classOf[org.apache.hadoop.fs.LocalFileSystem].getName)

val numPartitions = 4

val workerPaths = "src/main/resources/paths.yaml"

// a map from partition id to the hdfs location of the mains and mirrors for that partition
val partitionMap = readWorkerPathsFromYaml(workerPaths: String)

val path = "src/main/resources/graphs/8rmat/partitions/hybrid/bySrc"
val mainsPartitionPath = path + "/mains"
val mirrorsPartitionPath = path + "/mirrors"
val sep = " "

// (partition id, (source, destination, weight))
val edgeList: RDD[(Int, (Int, Int, Int))] = readPartitionsAndJoin(sc, path, numPartitions, sep)

val (degrees, outNeighbors, inDegreesPerPartition) = getDegreesByPartition(edgeList)

val (mains, mirrors) = partitionAssignment(degrees, outNeighbors, inDegreesPerPartition)

// save to file
partitionMainsDF(mains, spark, partitionMap)
partitionMirrorsDF(mirrors, spark, partitionMap)

// read for testing

for ((pid, path) <- partitionMap) {
println(s"Reading partition ${pid} in ${path}")
val mains = readMainPartitionDF(path+"/mains", spark)
val mirrors = readMirrorPartitionDF(path+"/mirrors", spark)
println("mains")
mains.foreach(m => println(s"\t$m"))
println("mirrors")
mirrors.foreach(m => println(s"\t$m"))
}

sc.stop()
}
}
6 changes: 5 additions & 1 deletion src/main/scala/com/preprocessing/edgeList/Compressor.scala
@@ -38,6 +38,9 @@ class Compressor(edgeList: EitherEdgeRDD) {
*/
def compressRdd(rdd: RDD[(Long, (Int, Int))]): RDD[(Long, (Int, Int))] = {
// get source ids

rdd.cache()

val sources: RDD[Int] = rdd
.map(row => row._2._1)
.distinct()
@@ -59,13 +62,14 @@ val vertexMap = sourceMap.union(unseenDestinations)
val vertexMap = sourceMap.union(unseenDestinations)
nNodes = vertexMap.count()
println(s"Number of nodes in compressed representation: ${nNodes}")

vertexMap.cache()
// debug
// persist(vertexMap, "src/main/resources/graphs/email-Eu-core/map", 1)

val sourcesFirst: RDD[(Int, (Int, Long))] = rdd
.map(row => (row._2._1, (row._2._2, row._1)))

sourcesFirst.cache()
// remap the edges of the graph using the vertexmap
val res = sourcesFirst
.join(vertexMap)
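The .cache() calls added above keep rdd, vertexMap, and sourcesFirst in memory because each feeds more than one downstream transformation or action; without caching, Spark recomputes the full lineage on every reuse. A standalone sketch of the pattern (not code from this repository):

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object CacheSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("cache-sketch").setMaster("local[*]"))
    val edges: RDD[(Int, Int)] = sc.parallelize(Seq((0, 1), (0, 2), (1, 0)))
    edges.cache() // materialized on first action, then reused by both lineages below

    val sources = edges.map(_._1).distinct() // first reuse of edges
    val dests   = edges.map(_._2).distinct() // second reuse of edges
    println(s"${sources.count()} sources, ${dests.count()} destinations")
    sc.stop()
  }
}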
24 changes: 18 additions & 6 deletions src/main/scala/com/preprocessing/edgeList/Driver.scala
@@ -1,8 +1,12 @@
package com.preprocessing.edgeList

import com.preprocessing.partitioning.Util.readEdgeList
import com.Typedefs.{UnweightedEdge, WeightedEdge}
import com.preprocessing.partitioning.Util.{readEdgeList, saveUnweightedRDDAsDF, saveWeightedRDDAsDF}
import org.apache.spark.{SparkConf, SparkContext}
import java.io.{PrintWriter, File}

import java.io.{File, PrintWriter}
import org.apache.spark.sql.{Row, SQLContext, SparkSession}
import org.apache.spark.sql.types.{LongType, IntegerType, StringType, StructField, StructType}

object Driver {
"""
@@ -13,7 +17,7 @@ runMain com.preprocessing.edgeList.Compressor
--outputFilename: output path
--sep: separator used in input edge list (e.g. " ", ",", "\t")
runMain com.preprocessing.edgeList.Driver --inputFilename "src/main/resources/graphs/email-Eu-core/origWeighted.net" --outputFilename "src/main/resources/graphs/email-Eu-core/compressed" --sep " " --isWeighted "true"
runMain com.preprocessing.edgeList.Driver --inputFilename "src/main/resources/graphs/email-Eu-core/orig.net" --outputFilename "src/main/resources/graphs/email-Eu-core/compressed" --sep " " --isWeighted "false"
Sort an input edge list by ascending source id. For each source id, the destination ids are also sorted in
ascending order.
@@ -73,6 +77,8 @@ The edgeList.Compressor will produce
.setMaster("local[*]")
val sc = new SparkContext(conf)

val spark: SparkSession = SparkSession.builder.master("local[*]").getOrCreate

val hadoopConfig = sc.hadoopConfiguration
hadoopConfig.set("fs.hdfs.impl", classOf[org.apache.hadoop.hdfs.DistributedFileSystem].getName)
hadoopConfig.set("fs.file.impl", classOf[org.apache.hadoop.fs.LocalFileSystem].getName)
@@ -90,30 +96,36 @@ The edgeList.Compressor will produce
case Array("--isWeighted", argWt: String) => isWeighted = argWt.toBoolean
}



println("reading edge list...")
val edgeList = readEdgeList(sc, infile, sep, isWeighted)
println("compressing...")
val c = new Compressor(edgeList)

val compressed = c.compress()
exportYML(infile, c)

try {
compressed match {
case Left(compressed) => // RDD[WeightedEdge]
compressed
.sortBy(r => (r._2._1, r._2._2, r._2._3))
.map(r => s"${r._2._1} ${r._2._2} ${r._2._3}")
.coalesce(1, false) // TODO; partition into multiple files/numPartitions HERE
.coalesce(1, false)
.saveAsTextFile(outfile)
val rdd = compressed
.sortBy(r => (r._2._1, r._2._2, r._2._3))
saveWeightedRDDAsDF(rdd, spark, outfile)

case Right(compressed) => // RDD[UnweightedEdge]
compressed
.sortBy(r => (r._2._1, r._2._2))
.map(r => s"${r._2._1} ${r._2._2}")
.coalesce(1, false)
.saveAsTextFile(outfile)
val rdd = compressed
.sortBy(r => (r._2._1, r._2._2))
saveUnweightedRDDAsDF(rdd, spark, outfile)

}
} catch {
case e: org.apache.hadoop.mapred.FileAlreadyExistsException =>
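The saveWeightedRDDAsDF and saveUnweightedRDDAsDF helpers imported above live in Util.scala and are not part of this diff; the sketch below is only a guess at the general shape of the weighted variant. The function name saveWeightedEdgesAsDF, the column names, the Parquet output format, and the assumption that WeightedEdge is (Long, (Int, Int, Int)) are all illustrative, not the repository's code.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object SaveEdgesSketch {
  // Hypothetical stand-in for saveWeightedRDDAsDF: write (src, dst, weight)
  // rows of a compressed, sorted edge list through Spark SQL.
  def saveWeightedEdgesAsDF(
      rdd: RDD[(Long, (Int, Int, Int))],
      spark: SparkSession,
      outPath: String
  ): Unit = {
    import spark.implicits._
    rdd
      .map { case (_, (src, dst, wt)) => (src, dst, wt) }
      .toDF("src", "dst", "weight")
      .write
      .mode("overwrite")
      .parquet(outPath)
  }
}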