Skip to content

Commit

Permalink
[scala] Add Scalastyle, use scalastyle-config.xml from Spark
Browse files Browse the repository at this point in the history
  • Loading branch information
aljoscha committed Sep 22, 2014
1 parent fd28098 commit 0385651
Show file tree
Hide file tree
Showing 32 changed files with 627 additions and 386 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
.cache
scalastyle-output.xml
.classpath
.idea
.metadata
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ object ConnectedComponents {
// assign the initial components (equal to the vertex id)
val vertices = getVerticesDataSet(env).map { id => (id, id) }

// undirected edges by emitting for each input edge the input edges itself and an inverted version
// undirected edges by emitting for each input edge the input edges itself and an inverted
// version
val edges = getEdgesDataSet(env).flatMap { edge => Seq(edge, (edge._2, edge._1)) }

// open a delta iteration
Expand Down Expand Up @@ -106,20 +107,22 @@ object ConnectedComponents {

private def parseParameters(args: Array[String]): Boolean = {
if (args.length > 0) {
fileOutput = true
fileOutput = true
if (args.length == 4) {
verticesPath = args(0)
edgesPath = args(1)
outputPath = args(2)
maxIterations = args(3).toInt
} else {
System.err.println("Usage: ConnectedComponents <vertices path> <edges path> <result path> <max number of iterations>")
System.err.println("Usage: ConnectedComponents <vertices path> <edges path> <result path>" +
" <max number of iterations>")
false
}
} else {
System.out.println("Executing Connected Components example with built-in default data.")
System.out.println(" Provide parameters to read input data from a file.")
System.out.println(" Usage: ConnectedComponents <vertices path> <edges path> <result path> <max number of iterations>")
System.out.println(" Usage: ConnectedComponents <vertices path> <edges path> <result path>" +
" <max number of iterations>")
}
true
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,111 +65,113 @@ import scala.collection.mutable
*
*/
object EnumTrianglesBasic {

def main(args: Array[String]) {
if (!parseParameters(args)) {
return
}

// set up execution environment
val env = ExecutionEnvironment.getExecutionEnvironment

// read input data
val edges = getEdgeDataSet(env)

// project edges by vertex id
val edgesById = edges map(e => if (e.v1 < e.v2) e else Edge(e.v2, e.v1) )

val triangles = edgesById
// build triads
.groupBy("v1").sortGroup("v2", Order.ASCENDING).reduceGroup(new TriadBuilder())
// filter triads
.join(edgesById).where("v2", "v3").equalTo("v1", "v2") { (t, _) => t }

// emit result
if (fileOutput) {
triangles.writeAsCsv(outputPath, "\n", ",")
} else {
triangles.print()
}

// execute program
env.execute("TriangleEnumeration Example")
}

// *************************************************************************
// USER DATA TYPES
// *************************************************************************

case class Edge(v1: Int, v2: Int) extends Serializable
case class Triad(v1: Int, v2: Int, v3: Int) extends Serializable


// *************************************************************************
// USER FUNCTIONS
// *************************************************************************

/**
* Builds triads (triples of vertices) from pairs of edges that share a vertex.
* The first vertex of a triad is the shared vertex, the second and third vertex are ordered by vertexId.
* Assumes that input edges share the first vertex and are in ascending order of the second vertex.
*/
class TriadBuilder extends GroupReduceFunction[Edge, Triad] {

val vertices = mutable.MutableList[Integer]()

override def reduce(edges: java.lang.Iterable[Edge], out: Collector[Triad]) = {

// clear vertex list
vertices.clear()

// build and emit triads
for(e <- edges.asScala) {

// combine vertex with all previously read vertices
for(v <- vertices) {
out.collect(Triad(e.v1, v, e.v2))
}
vertices += e.v2
}
}
}

// *************************************************************************
// UTIL METHODS
// *************************************************************************

private def parseParameters(args: Array[String]): Boolean = {
if (args.length > 0) {
fileOutput = true
if (args.length == 2) {
edgePath = args(0)
outputPath = args(1)
} else {
System.err.println("Usage: EnumTriangleBasic <edge path> <result path>")
false
}
} else {
System.out.println("Executing Enum Triangles Basic example with built-in default data.")
System.out.println(" Provide parameters to read input data from files.")
System.out.println(" See the documentation for the correct format of input files.")
System.out.println(" Usage: EnumTriangleBasic <edge path> <result path>")
}
true
}

private def getEdgeDataSet(env: ExecutionEnvironment): DataSet[Edge] = {
if (fileOutput) {
env.readCsvFile[Edge](edgePath, fieldDelimiter = ' ', includedFields = Array(0, 1))
} else {
val edges = EnumTrianglesData.EDGES.map{ case Array(v1, v2) => new Edge(v1.asInstanceOf[Int], v2.asInstanceOf[Int]) }
env.fromCollection(edges)
}
}


private var fileOutput: Boolean = false
private var edgePath: String = null
private var outputPath: String = null

def main(args: Array[String]) {
if (!parseParameters(args)) {
return
}

// set up execution environment
val env = ExecutionEnvironment.getExecutionEnvironment

// read input data
val edges = getEdgeDataSet(env)

// project edges by vertex id
val edgesById = edges map(e => if (e.v1 < e.v2) e else Edge(e.v2, e.v1) )

val triangles = edgesById
// build triads
.groupBy("v1").sortGroup("v2", Order.ASCENDING).reduceGroup(new TriadBuilder())
// filter triads
.join(edgesById).where("v2", "v3").equalTo("v1", "v2") { (t, _) => t }

// emit result
if (fileOutput) {
triangles.writeAsCsv(outputPath, "\n", ",")
} else {
triangles.print()
}

// execute program
env.execute("TriangleEnumeration Example")
}

// *************************************************************************
// USER DATA TYPES
// *************************************************************************

case class Edge(v1: Int, v2: Int) extends Serializable
case class Triad(v1: Int, v2: Int, v3: Int) extends Serializable


// *************************************************************************
// USER FUNCTIONS
// *************************************************************************

/**
* Builds triads (triples of vertices) from pairs of edges that share a vertex. The first vertex
* of a triad is the shared vertex, the second and third vertex are ordered by vertexId. Assumes
* that input edges share the first vertex and are in ascending order of the second vertex.
*/
class TriadBuilder extends GroupReduceFunction[Edge, Triad] {

val vertices = mutable.MutableList[Integer]()

override def reduce(edges: java.lang.Iterable[Edge], out: Collector[Triad]) = {

// clear vertex list
vertices.clear()

// build and emit triads
for(e <- edges.asScala) {

// combine vertex with all previously read vertices
for(v <- vertices) {
out.collect(Triad(e.v1, v, e.v2))
}
vertices += e.v2
}
}
}

// *************************************************************************
// UTIL METHODS
// *************************************************************************

private def parseParameters(args: Array[String]): Boolean = {
if (args.length > 0) {
fileOutput = true
if (args.length == 2) {
edgePath = args(0)
outputPath = args(1)
} else {
System.err.println("Usage: EnumTriangleBasic <edge path> <result path>")
false
}
} else {
System.out.println("Executing Enum Triangles Basic example with built-in default data.")
System.out.println(" Provide parameters to read input data from files.")
System.out.println(" See the documentation for the correct format of input files.")
System.out.println(" Usage: EnumTriangleBasic <edge path> <result path>")
}
true
}

private def getEdgeDataSet(env: ExecutionEnvironment): DataSet[Edge] = {
if (fileOutput) {
env.readCsvFile[Edge](edgePath, fieldDelimiter = ' ', includedFields = Array(0, 1))
} else {
val edges = EnumTrianglesData.EDGES.map {
case Array(v1, v2) => new Edge(v1.asInstanceOf[Int], v2.asInstanceOf[Int])
}
env.fromCollection(edges)
}
}


private var fileOutput: Boolean = false
private var edgePath: String = null
private var outputPath: String = null

}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ object PageRankBasic {
// initialize lists
.map(e => AdjacencyList(e.sourceId, Array(e.targetId)))
// concatenate lists
.groupBy("sourceId").reduce((l1, l2) => AdjacencyList(l1.sourceId, l1.targetIds ++ l2.targetIds))
.groupBy("sourceId").reduce {
(l1, l2) => AdjacencyList(l1.sourceId, l1.targetIds ++ l2.targetIds)
}

// start iteration
val finalRanks = pagesWithRanks.iterateWithTermination(maxIterations) {
Expand Down
Loading

0 comments on commit 0385651

Please sign in to comment.