Commit

add Spark outputs and IntelliJ artefact build output to .gitignore;
add command example to build sbt;
add partitioning, preprocessing scripts that execute Compressor and Partitioner JARs programmatically;
add README explaining how to build Compressor and Partitioner JARs using IntelliJ;
use Scala Either to differentiate compression and partitioning of weighted vs. unweighted edge lists (see the sketch below);
add compression tests; partitioning tests remain to be done;
add test resources for CompressorTest;
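
For reference, the Either-based split mentioned above could look roughly like the following sketch. Type and method names here are illustrative assumptions, not the repository's actual Compressor/Partitioner API.

// A minimal sketch: Left = unweighted edge list, Right = weighted edge list.
object EdgeListSketch {
  type UnweightedEdge = (Int, Int)
  type WeightedEdge   = (Int, Int, Double)
  type EdgeList       = Either[Seq[UnweightedEdge], Seq[WeightedEdge]]

  // Parse raw edge-list lines into one of the two shapes, driven by the same
  // kind of --sep / --isWeighted flags the scripts below pass to the JARs.
  def parse(lines: Seq[String], sep: String, isWeighted: Boolean): EdgeList =
    if (isWeighted)
      Right(lines.map { l =>
        val Array(u, v, w) = l.split(sep)
        (u.toInt, v.toInt, w.toDouble)
      })
    else
      Left(lines.map { l =>
        val Array(u, v) = l.split(sep)
        (u.toInt, v.toInt)
      })

  // Downstream stages branch once on the Either and stay type-safe afterwards.
  def describe(edges: EdgeList): String = edges match {
    case Left(es)  => s"unweighted edge list, ${es.size} edges"
    case Right(es) => s"weighted edge list, ${es.size} edges"
  }
}
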
atrostan committed Nov 23, 2021
1 parent ac838d2 commit bfb95f7
Showing 30 changed files with 26,901 additions and 449 deletions.
6 changes: 6 additions & 0 deletions .gitignore
@@ -85,3 +85,9 @@ project/**/metals.sbt
/sandbox/
/logs/*
tmp

_SUCCESS
._SUCCESS.crc
MANIFEST.MF
.part-*.crc
part-*
59 changes: 25 additions & 34 deletions build.sbt
@@ -1,6 +1,7 @@
import com.typesafe.sbt.SbtMultiJvm.multiJvmSettings
import com.typesafe.sbt.SbtMultiJvm.MultiJvmKeys.MultiJvm
import CommandExample._
import com.typesafe.sbt.SbtMultiJvm.MultiJvmKeys.MultiJvm
import com.typesafe.sbt.SbtMultiJvm.multiJvmSettings


name := "akka-gps"

@@ -10,55 +11,45 @@ lazy val akkaVersion = "2.6.16"
lazy val sparkVersion = "3.1.2"

ThisBuild / assemblyMergeStrategy := {
case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
case PathList(ps @ _*) if ps.last endsWith ".html" => MergeStrategy.first
case PathList("org", "apache", "spark", "unused", "UnusedStubClass.class") => MergeStrategy.first
case "application.conf" => MergeStrategy.concat
case "unwanted.txt" => MergeStrategy.discard
case PathList("org", "apache", "hadoop", "yarn", "factories", "package-info.class") => MergeStrategy.discard
case PathList("org", "apache", "hadoop", "yarn", "provider", "package-info.class") => MergeStrategy.discard
case PathList("org", "apache", "hadoop", "util", "provider", "package-info.class") => MergeStrategy.discard
case PathList("org", "apache", "spark", "unused", "UnusedStubClass.class") => MergeStrategy.first
// case PathList("META-INF", "MANIFEST.MF") => MergeStrategy.discard
case PathList("javax", "servlet", xs@_*) => MergeStrategy.first
case PathList(ps@_*) if ps.last endsWith ".html" => MergeStrategy.first
case PathList("org", "apache", "spark", "unused", "UnusedStubClass.class") => MergeStrategy.first
case "application.conf" => MergeStrategy.concat
case "unwanted.txt" => MergeStrategy.discard
case PathList("org", "apache", "hadoop", "yarn", "factories", "package-info.class") => MergeStrategy.discard
case PathList("org", "apache", "hadoop", "yarn", "provider", "package-info.class") => MergeStrategy.discard
case PathList("org", "apache", "hadoop", "util", "provider", "package-info.class") => MergeStrategy.discard
case PathList("org", "apache", "spark", "unused", "UnusedStubClass.class") => MergeStrategy.first
case PathList("org", "aopalliance", "intercept", "MethodInvocation.class") => MergeStrategy.first
case PathList("META-INF", "MANIFEST.MF") => MergeStrategy.discard

case x =>
val oldStrategy = (ThisBuild / assemblyMergeStrategy).value
oldStrategy(x)
}

val myRun = taskKey[Unit]("...")

myRun := Def.taskDyn {
val appName = name.value
Def.task {
(Compile / runMain)
.toTask(s" com.softwaremill.MyMain $appName")
.value
}
}.value

lazy val `akka-gps` = project
.in(file("."))
.settings(multiJvmSettings: _*)
.settings(
// organization := "com.lightbend.akka.samples",
scalaVersion := "2.12.15",
Compile / scalacOptions ++= Seq("-deprecation", "-feature", "-unchecked", "-Xlog-reflective-calls", "-Xlint", "-target:jvm-1.8"),
Compile / javacOptions ++= Seq("-Xlint:unchecked", "-Xlint:deprecation"),
// Compile / PB.targets := Seq(
// scalapb.gen() -> (Compile / sourceManaged).value / "scalapb"
// ),
run / javaOptions ++= Seq("-Xms128m", "-Xmx8G", "-XX:+UseG1GC", "-Djava.library.path=./target/native", "-Dlog4j.configuration=src/main/resources/log4j.properties"),
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor-typed" % akkaVersion,
"com.typesafe.akka" %% "akka-cluster-typed" % akkaVersion,
"com.typesafe.akka" %% "akka-serialization-jackson" % akkaVersion,
"com.typesafe.akka" %% "akka-actor-typed" % akkaVersion,
"com.typesafe.akka" %% "akka-cluster-typed" % akkaVersion,
"com.typesafe.akka" %% "akka-serialization-jackson" % akkaVersion,
"com.typesafe.akka" %% "akka-cluster-sharding-typed" % akkaVersion,
"com.typesafe.akka" %% "akka-multi-node-testkit" % akkaVersion % Test,
"org.scalatest" %% "scalatest" % "3.0.8" % Test,
"com.typesafe.akka" %% "akka-actor-testkit-typed" % akkaVersion % Test,
"com.typesafe.akka" %% "akka-persistence-typed" % akkaVersion,
"com.typesafe.akka" %% "akka-persistence-testkit" % akkaVersion % Test,
"com.typesafe.akka" %% "akka-multi-node-testkit" % akkaVersion % Test,
"org.scalatest" %% "scalatest" % "3.0.8" % Test,
"com.typesafe.akka" %% "akka-actor-testkit-typed" % akkaVersion % Test,
"com.typesafe.akka" %% "akka-persistence-typed" % akkaVersion,
"com.typesafe.akka" %% "akka-persistence-testkit" % akkaVersion % Test,
"org.apache.spark" %% "spark-core" % sparkVersion,
"org.apache.spark" %% "spark-streaming" % sparkVersion,
"org.scala-graph" %% "graph-core" % "1.12.5"
@@ -71,7 +62,7 @@ lazy val `akka-gps` = project
commands ++= Seq(hello, changeColor, partitionBySource1D)

)
.configs (MultiJvm)
.configs(MultiJvm)
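
For reference, the custom sbt commands registered above (hello, changeColor, partitionBySource1D) come from CommandExample, which is not part of this diff. A minimal sketch of what such definitions typically look like with sbt's Command API, assuming CommandExample lives under project/ (the repository's actual file may differ):

// project/CommandExample.scala (hypothetical contents)
import sbt._

object CommandExample {
  // no-argument command: `sbt hello`
  val hello = Command.command("hello") { state =>
    println("hello from akka-gps")
    state
  }

  // command taking a single argument: `sbt "changeColor red"`
  val changeColor = Command.single("changeColor") { (state, color) =>
    println(s"color set to $color")
    state
  }

  // command that queues another command, e.g. a runMain of a driver class
  // (the main class name below is made up for this example)
  val partitionBySource1D = Command.command("partitionBySource1D") { state =>
    "runMain com.example.Partition1DDriver" :: state
  }
}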



47 changes: 47 additions & 0 deletions scripts/spark/partition.sh
@@ -0,0 +1,47 @@
#!/bin/bash

# Build (and optionally eval) the Partitioner JAR invocation for every
# partitioner / partition-by-source combination for a given input graph.

# parse a simple YAML file into shell variable assignments, optionally
# prefixed with $2; e.g. a "Nodes: 8" line in stats.yml becomes Nodes="8"
function parse_yaml {
local prefix=$2
local s='[[:space:]]*' w='[a-zA-Z0-9_]*' fs=$(echo @|tr @ '\034')
sed -ne "s|^\($s\):|\1|" \
-e "s|^\($s\)\($w\)$s:$s[\"']\(.*\)[\"']$s\$|\1$fs\2$fs\3|p" \
-e "s|^\($s\)\($w\)$s:$s\(.*\)$s\$|\1$fs\2$fs\3|p" $1 |
awk -F$fs '{
indent = length($1)/2;
vname[indent] = $2;
for (i in vname) {if (i > indent) {delete vname[i]}}
if (length($3) > 0) {
vn=""; for (i=0; i<indent; i++) {vn=(vn)(vname[i])("_")}
printf("%s%s%s=\"%s\"\n", "'$prefix'",vn, $2, $3);
}
}'
}

akka_gps_home="/home/atrostan/Workspace/repos/akka-gps"
partitionDriverJarPath="${akka_gps_home}/out/artifacts/akka_gps_jar/akka-gps.jar"
graphDir="${akka_gps_home}/src/main/resources/graphs/email-Eu-core"
graphPath="\"${graphDir}/orig.net\""
graphYaml="${graphDir}/stats.yml"
outputPartitionsPath="\"${graphDir}/partitions\""
sep="\" \""
threshold=100
numPartitions=4
partitioners=(
1 # 1d
2 # 2d
3 # Hybrid
)
partitionBys=(
"\"true\"" # partition by source
"\"false\"" # partition by destination
)
eval $(parse_yaml $graphYaml)

# programmatically partition an input graph
for partitioner in "${partitioners[@]}"; do
for partitionBySource in "${partitionBys[@]}"; do
javaJarStr="java -jar ${partitionDriverJarPath} --nNodes ${Nodes} --nEdges ${Edges} --inputFilename ${graphPath} --outputDirectoryName ${outputPartitionsPath} --sep ${sep} --partitioner ${partitioner} --threshold ${threshold} --numPartitions ${numPartitions} --partitionBySource ${partitionBySource}"
echo ${javaJarStr}
# eval ${javaJarStr}
done
done
107 changes: 107 additions & 0 deletions scripts/spark/preprocess.sh
@@ -0,0 +1,107 @@
#!/bin/bash

# print usage and exit; e.g. a full run is: ./preprocess.sh -c 1 -p 1
helpFunction()
{
echo ""
echo "Usage: $0 -c compressParam -p partitionParam"
echo -e "\t-c 1 or 0. Whether to compress or not"
echo -e "\t-p 1 or 0. Whether to partition or not"
exit 1 # Exit script after printing help
}

# parse a simple YAML file into shell variable assignments, optionally
# prefixed with $2; used below to read Nodes and Edges from stats.yml
function parse_yaml
{
local prefix=$2
local s='[[:space:]]*' w='[a-zA-Z0-9_]*' fs=$(echo @|tr @ '\034')
sed -ne "s|^\($s\):|\1|" \
-e "s|^\($s\)\($w\)$s:$s[\"']\(.*\)[\"']$s\$|\1$fs\2$fs\3|p" \
-e "s|^\($s\)\($w\)$s:$s\(.*\)$s\$|\1$fs\2$fs\3|p" $1 |
awk -F$fs '{
indent = length($1)/2;
vname[indent] = $2;
for (i in vname) {if (i > indent) {delete vname[i]}}
if (length($3) > 0) {
vn=""; for (i=0; i<indent; i++) {vn=(vn)(vname[i])("_")}
printf("%s%s%s=\"%s\"\n", "'$prefix'",vn, $2, $3);
}
}'
}

while getopts "c:p:" opt
do
case "$opt" in
c ) compressParam="$OPTARG" ;;
p ) partitionParam="$OPTARG" ;;
? ) helpFunction ;; # Print helpFunction in case parameter is non-existent
esac
done

# Print helpFunction in case parameters are empty
if [ -z "$compressParam" ] || [ -z "$partitionParam" ]
then
echo "Some or all of the parameters are empty";
helpFunction
fi

# Compress a graph
# Then, partition the graph using all partition algorithms
akka_gps_home="/home/atrostan/Workspace/repos/akka-gps"

partitionDriverJarPath="${akka_gps_home}/out/artifacts/akka_gps_partitioner_jar/akka-gps.jar"
compressorDriverJarPath="${akka_gps_home}/out/artifacts/akka_gps_compressor_jar/akka-gps.jar"
graphName="email-Eu-core"
graphDir="${akka_gps_home}/src/main/resources/graphs/${graphName}"

# original, uncompressed edgelist
origGraphPath="\"${graphDir}/origWeighted.net\""

# directory that will store the compressed edgelist
compressedDirName="compressed"
outputFilename="\"${graphDir}/${compressedDirName}\""
compressedGraphPath="\"${graphDir}/${compressedDirName}/part-00000\""

sep="\" \""
isWeighted="\"true\""
graphYaml="${graphDir}/stats.yml"
outputPartitionsPath="\"${graphDir}/partitions\""
threshold=100
numPartitions=4

partitioners=(
1 # 1d
2 # 2d
3 # Hybrid
)

partitionBys=(
"\"true\"" # partition by source
"\"false\"" # partition by destination
)

# compress the graph
compressJavaJarStr="java -jar ${compressorDriverJarPath} --inputFilename ${origGraphPath} --outputFilename ${outputFilename} --sep ${sep} --isWeighted ${isWeighted}"
# echo ${compressJavaJarStr}

if [ $compressParam -eq 1 ]
then
eval ${compressJavaJarStr}
fi
# read nNodes, nEdges from stats.yml
eval $(parse_yaml $graphYaml)

if [ $partitionParam -eq 1 ]
then
# programmatically partition an input graph
for partitioner in "${partitioners[@]}"; do
for partitionBySource in "${partitionBys[@]}"; do
printf "\n"
logStr="Partitioning ${compressedGraphPath} with ${Nodes} Nodes and ${Edges} Edges.\nPartitioner: ${partitioner}, Partitioning By Source: ${partitionBySource}, Number of Partitions: ${numPartitions}, Threshold: ${threshold}, isWeighted: ${isWeighted}"
echo -e ${logStr}
printf "\n"

javaJarStr="java -jar ${partitionDriverJarPath} --nNodes ${Nodes} --nEdges ${Edges} --inputFilename ${compressedGraphPath} --outputDirectoryName ${outputPartitionsPath} --sep ${sep} --partitioner ${partitioner} --threshold ${threshold} --numPartitions ${numPartitions} --partitionBySource ${partitionBySource} --isWeighted ${isWeighted}"
# echo ${javaJarStr}
eval ${javaJarStr}
done
done
fi
32 changes: 0 additions & 32 deletions src/main/resources/graphs/8rmat/orig.net

This file was deleted.

2 changes: 2 additions & 0 deletions src/main/resources/graphs/8rmat/stats.yml
@@ -0,0 +1,2 @@
Nodes: 8
Edges: 32