add 1d, 2d, hybrid partitioners; driver
atrostan committed Nov 4, 2021
1 parent f55d963 commit 6fe8351
Showing 8 changed files with 311 additions and 0 deletions.
1 change: 1 addition & 0 deletions build.sbt
@@ -19,6 +19,7 @@ ThisBuild / assemblyMergeStrategy := {
case PathList("org", "apache", "hadoop", "util", "provider", "package-info.class") => MergeStrategy.discard
case PathList("org", "apache", "spark", "unused", "UnusedStubClass.class") => MergeStrategy.first
case PathList("org", "aopalliance", "intercept", "MethodInvocation.class") => MergeStrategy.first
case PathList("META-INF", "MANIFEST.MF") => MergeStrategy.discard
case x =>
val oldStrategy = (ThisBuild / assemblyMergeStrategy).value
oldStrategy(x)
2 changes: 2 additions & 0 deletions src/main/scala/com/preprocessing/edgeList/Compressor.scala
@@ -18,6 +18,8 @@ runMain com.preprocessing.edgeList.Compressor --nNodes 5 --nEdges 7 --inputFilen
runMain com.preprocessing.edgeList.Compressor --nNodes 1005 --nEdges 25571 --inputFilename "src/main/resources/graphs/email-Eu-core/orig.net" --outputFilename "src/main/resources/graphs/email-Eu-core/reset" --sep " "
runMain com.preprocessing.partitioning.Driver --nNodes 1005 --nEdges 24929 --inputFilename "src/main/resources/graphs/email-Eu-core/reset/part-00000" --outputFilename "src/main/resources/graphs/email-Eu-core/tmp/" --sep " " --threshold 100
Sort an input edge list by ascending source id. For each source id, the destination ids are also sorted in
ascending order.
Self-directed edges (self-loops) are removed.
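A minimal sketch of that cleanup step, assuming the edge list has already been read into an RDD[(Int, Int)]; the helper name is illustrative, not the Compressor's actual code:

import org.apache.spark.rdd.RDD

// Illustrative only: drop self-loops, then sort by (source, destination) ascending.
def sortAndClean(edges: RDD[(Int, Int)]): RDD[(Int, Int)] =
  edges
    .filter { case (u, v) => u != v } // remove self-directed edges
    .sortBy(identity)                 // ascending by source, then destination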
80 changes: 80 additions & 0 deletions src/main/scala/com/preprocessing/partitioning/Driver.scala
@@ -0,0 +1,80 @@
package com.preprocessing.partitioning

//import akka.protobufv3.internal.UInt32Value
//import jdk.nashorn.internal.ir.debug.ObjectSizeCalculator.getObjectSize
import com.preprocessing.partitioning.Util.{createPartitionDir, hybridPartitioningPreprocess, parseArgs, readEdgeList}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import java.lang.instrument.Instrumentation

// runMain com.preprocessing.partitioning.Driver --nNodes 1005 --nEdges 24929 --inputFilename "src/main/resources/graphs/email-Eu-core/reset/part-00000" --outputDirectoryName "src/main/resources/graphs/email-Eu-core/partitioned/hybrid" --sep " " --partitioner 3 --threshold 100 --numPartitions 4 --partitionBy 0

object Driver {

def main(args: Array[String]): Unit = {

val appName: String = "edgeList.partitioning.Driver"

val conf = new SparkConf()
.setAppName(appName)
.setMaster("local[*]")
val sc = new SparkContext(conf)

val pArgs = parseArgs(args)
val threshold = pArgs.threshold
val infile = pArgs.inputFile
val sep = pArgs.separator
val partitioner = pArgs.partitioner
val numPartitions = pArgs.numPartitions
val outdir = pArgs.outputDirectory
val partitionBy = pArgs.partitionBy

println("reading edge list...")
val edgeList = readEdgeList(sc, infile, sep)

partitioner match {
case 1 =>
println("1D Partitioning")
val partitionDir = outdir + "/1d/"
createPartitionDir(partitionDir, false) // assumption: keep any existing contents; pass true to flush the directory
val partitioner = new OneDimPartitioner(numPartitions, partitionBy)


case 2 =>
println("2D Partitioning")
val partitionDir = outdir + "/2d/"
val partitioner = new TwoDimPartitioner(numPartitions, partitionBy)

case 3 =>
println("Hybrid Partitioning")
val partitionDir = outdir + "/hybrid/"
val partitioner = new HybridCutPartitioner(numPartitions, partitionBy)

try {
val flaggedEdgeList = hybridPartitioningPreprocess(edgeList, threshold)
flaggedEdgeList
.partitionBy(partitioner)
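// tag each record with the index of the partition it was assigned to, so the assignment survives in the saved text files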
.mapPartitionsWithIndex {
(index, itr) => itr.toList.map(x => x + "#" + index).iterator
}.saveAsTextFile(partitionDir)
} catch {
case e: org.apache.hadoop.mapred.FileAlreadyExistsException => println("File already exists, please delete the existing file")
}
}
sc.stop()

// val inEdgeFile = "src/main/resources/graphs/email-Eu-core/reset/inedge"
// val degreeMap = "src/main/resources/graphs/email-Eu-core/reset/degreeMap"
//
// inEdgeList.map(e => s"${e._1} ${e._2}").coalesce(1, false).saveAsTextFile(inEdgeFile)
// sc.parallelize(inDegrees.toSeq).map(e => s"${e._1} ${e._2}").coalesce(1, false).saveAsTextFile(degreeMap)


// flaggedEdgeList
// .partitionBy(new HybridCutPartitioner(numPartitions))
// .map(el => el._1)
// .saveAsTextFile(outdir)

}
}
21 changes: 21 additions & 0 deletions src/main/scala/com/preprocessing/partitioning/HybridCutPartitioner.scala
@@ -0,0 +1,21 @@
package com.preprocessing.partitioning

import org.apache.spark.Partitioner

class HybridCutPartitioner(nPartitions: Int, partitionBySource: Boolean) extends Partitioner {
val numPartitions = nPartitions

override def getPartition(key: Any): Int = key match {
case ((u: Int, v: Int), vertexIsHighDegree: Boolean) =>
if (partitionBySource) {
val uIsHighDegree = vertexIsHighDegree
if (uIsHighDegree) v % numPartitions
else u % numPartitions
} else {
val vIsHighDegree = vertexIsHighDegree
if (vIsHighDegree) u % numPartitions
else v % numPartitions
}

}
}
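A rough usage check of the hybrid-cut rule above, with illustrative values: edges whose flag is unset are hashed on the endpoint selected by partitionBySource, while flagged edges (those touching a high-degree vertex) are spread by the opposite endpoint, approximating a vertex cut.

val hybrid = new HybridCutPartitioner(4, true) // 4 partitions, partition by source

hybrid.getPartition(((10, 3), false)) // flag unset: hash on the source, 10 % 4 == 2
hybrid.getPartition(((10, 7), true))  // flag set: spread by the destination, 7 % 4 == 3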
13 changes: 13 additions & 0 deletions src/main/scala/com/preprocessing/partitioning/OneDimPartitioner.scala
@@ -0,0 +1,13 @@
package com.preprocessing.partitioning

import org.apache.spark.Partitioner

class OneDimPartitioner(nPartitions: Int, partitionBySource: Boolean) extends Partitioner {
val numPartitions = nPartitions

override def getPartition(key: Any): Int = key match {
case (u: Int, v: Int) =>
if (partitionBySource) u % numPartitions
else v % numPartitions
}
}
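A quick check of the 1D rule, with illustrative values; partitioning by source keeps every out-edge of a vertex on the same partition.

val oneD = new OneDimPartitioner(4, true) // partition by source

oneD.getPartition((7, 2)) // 7 % 4 == 3
oneD.getPartition((7, 9)) // 7 % 4 == 3: all out-edges of vertex 7 land on partition 3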
37 changes: 37 additions & 0 deletions src/main/scala/com/preprocessing/partitioning/PartitionerArgs.scala
@@ -0,0 +1,37 @@
package com.preprocessing.partitioning

class PartitionerArgs(
nNodes: Int,
nEdges: Int,
infile: String,
outdir: String,
sep: String,
part: Int,
thresh: Int,
nPartitions: Int,
pb: Boolean
) {
val numNodes = nNodes
val numEdges = nEdges
val inputFile = infile
val outputDirectory = outdir
val separator = sep
val partitioner = part
val threshold = thresh
val numPartitions = nPartitions
val partitionBy = pb

override def toString() : String = {
var argStr = ""
argStr += s"$numNodes\t"
argStr += s"$numEdges\t"
argStr += s"$inputFile\t"
argStr += s"$outputDirectory\t"
argStr += s"$separator\t"
argStr += s"$partitioner\t"
argStr += s"$threshold\t"
argStr += s"$numPartitions\t"
argStr += s"$partitionBy\t"
argStr
}
}
16 changes: 16 additions & 0 deletions src/main/scala/com/preprocessing/partitioning/TwoDimPartitioner.scala
@@ -0,0 +1,16 @@
package com.preprocessing.partitioning

import org.apache.spark.Partitioner

class TwoDimPartitioner(nPartitions: Int, partitionBySource: Boolean) extends Partitioner {
val numPartitions = nPartitions

override def getPartition(key: Any): Int = key match {
case (x: Int, y: Int) =>
val partitioningMatrixSideLength: Int = math.sqrt(numPartitions).toInt
val sx: Int = x % partitioningMatrixSideLength
val sy: Int = y % partitioningMatrixSideLength
if (partitionBySource) sx * partitioningMatrixSideLength + sy
else sx + partitioningMatrixSideLength * sy
}
}
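The 2D scheme lays partitions out on a sqrt(numPartitions) x sqrt(numPartitions) grid, so it implicitly assumes numPartitions is a perfect square; otherwise the truncated square root leaves some partitions unused. A small worked example with 4 partitions (a 2 x 2 grid):

val twoD = new TwoDimPartitioner(4, true) // row-major by source

twoD.getPartition((3, 5)) // 3 % 2 == 1, 5 % 2 == 1 -> 1 * 2 + 1 == 3
twoD.getPartition((2, 5)) // 2 % 2 == 0, 5 % 2 == 1 -> 0 * 2 + 1 == 1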
141 changes: 141 additions & 0 deletions src/main/scala/com/preprocessing/partitioning/Util.scala
@@ -0,0 +1,141 @@
package com.preprocessing.partitioning

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

import java.nio.file.Files.createDirectory
import java.io.{File, IOException}
import java.nio.file.{Path, Paths}
import org.apache.commons.io.FileUtils.cleanDirectory

object Util {
/**
* Parse the command-line arguments passed to the partitioning driver.
* @param args the raw command-line arguments
* @return a PartitionerArgs object: a wrapper for the arguments the partitioner driver expects
*/
def parseArgs(args: Array[String]): PartitionerArgs = {

var nNodes = 0
var nEdges = 0
var infile = ""
var outdir = ""
var sep = ""
var partitioner = 0 // 1 = 1d, 2 = 2d, 3 = hybrid-cut
var threshold = 100 // default; overridden by --threshold if provided
var numPartitions = 0
var partitionBy = false // 0: "source", 1: "destination"

// TODO: add an additional argument and functionality to handle weighted graphs
args.sliding(2, 2).toList.collect {
case Array("--nNodes", argNNodes: String) => nNodes = argNNodes.toInt
case Array("--nEdges", argNEdges: String) => nEdges = argNEdges.toInt
case Array("--graphDir", argInFile: String) => infile = argInFile
case Array("--outputDirectoryName", argOutDir: String) => outdir = argOutDir
case Array("--sep", argSep: String) => sep = argSep
case Array("--partitioner", argPartitioner: String) => partitioner = argPartitioner.toInt
case Array("--numPartitions", argNumPartitions: String) => numPartitions = argNumPartitions.toInt
case Array("--partitionBy", argPartitionBy: String) => partitionBy = argPartitionBy.toBoolean
}
val partitionerArgs = new PartitionerArgs(
nNodes,
nEdges,
infile,
outdir,
sep,
partitioner,
threshold,
numPartitions,
partitionBy
)
partitionerArgs
}

/**
* Flag each edge according to whether its destination vertex has an in-degree above the threshold,
* and pair every flagged edge with a unique id.
* @param edgeList edge list as (source, destination) pairs
* @param threshold in-degree above which a destination vertex is treated as high-degree
* @return an RDD of (((source, destination), destinationIsHighDegree), uniqueEdgeId)
*/
def hybridPartitioningPreprocess(
edgeList: RDD[(Int, Int)],
threshold: Int
): RDD[(((Int, Int), Boolean), Long)] = {
val inEdgeList = edgeList.map(e => (e._2, e._1))
val inDegrees: scala.collection.Map[Int, Long] = inEdgeList.countByKey()
val highInDegreeVertices: Set[Int] = inDegrees.filter(p => p._2 > threshold).map(p => p._1).toSet
edgeList.map(e => (e, highInDegreeVertices.contains(e._2))).zipWithUniqueId()
}
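// Illustrative example of the preprocessing above (values chosen only for illustration):
// with threshold = 1 and edges (0,2), (1,2), (3,4), vertex 2 has in-degree 2 > 1, so the
// result contains (((0,2),true), id0), (((1,2),true), id1), and (((3,4),false), id2),
// where the ids come from zipWithUniqueId and are unique but not necessarily consecutive.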


/**
* Read a (source, destination) edge list from a text file, skipping comment lines that contain "#".
* @param sc the active SparkContext
* @param infile path to the input edge list
* @param sep separator between the source and destination ids on each line
* @return an RDD of (source, destination) pairs
*/
def readEdgeList(sc: SparkContext, infile: String, sep: String): RDD[(Int, Int)] = {
val distFile: RDD[String] = sc.textFile(infile)

val edgeList: RDD[(Int, Int)] = distFile
.filter(s => !s.contains("#")) // ignore comments
.map(s => {
val split = s.split(sep)
(split(0).toInt, split(1).toInt)
})
edgeList
}
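// Input expected by readEdgeList above (illustrative): one edge per line, e.g. "0 4" with
// sep = " " parses to (0, 4); lines containing "#" are treated as comments and skipped.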

/**
* Try to create a directory that will store the partitions of a graph.
* If the directory already exists, optionally erase all contents.
*
* @param dirPath path of the partition directory to create
* @param flush if true and the directory already exists, erase its contents
*/
def createPartitionDir(dirPath: String, flush: Boolean) = {
val directoryPath: Path = Paths.get(dirPath)
try {
createDirectory(directoryPath)
} catch {
case e: IOException =>
println(s"$dirPath already Exists.")
if (flush) {
println(s"Cleaning $dirPath")
val dirPathFile: File = new File(dirPath)
cleanDirectory(dirPathFile)
}
}
}

}