Exploding an array column in a DataFrame

/** A single employee record; becomes a struct element of the nested `employees` array. */
final case class Employee(firstName: String, lastName: String, email: String)

/** A department, identified by a string `id` and a display `name`. */
final case class Department(id: String, name: String)

/** Pairs a [[Department]] with its employees; `employees` is the array column exploded later. */
final case class DepartmentWithEmployees(department: Department, employees: Seq[Employee])

// Sample data: the same two employees are attached to two different departments.
// Case classes are built through their companion `apply`, so `new` is unnecessary.
val employee1 = Employee("michael", "armbrust", "abc123@prodigy.net")
val employee2 = Employee("chris", "fregly", "def456@compuserve.net")

val department1 = Department("123456", "Engineering")
val department2 = Department("789012", "Psychology")

// Each department carries a nested Seq[Employee], so the `employees` column of the
// resulting DataFrame is an array of structs.
val departmentWithEmployees1 = DepartmentWithEmployees(department1, Seq(employee1, employee2))
val departmentWithEmployees2 = DepartmentWithEmployees(department2, Seq(employee1, employee2))

// Convert the case-class RDD to a DataFrame and persist it as Parquet.
// NOTE(review): toDF() relies on sqlContext.implicits._ being in scope (shell/notebook
// auto-import), and saveAsParquetFile does not overwrite an existing "dwe.parquet" —
// confirm whether reruns need an explicit SaveMode.
val departmentWithEmployeesRDD = sc.parallelize(Seq(departmentWithEmployees1, departmentWithEmployees2))
departmentWithEmployeesRDD.toDF().saveAsParquetFile("dwe.parquet")

// Read the nested department/employee data back from Parquet.
val departmentWithEmployeesDF = sqlContext.parquetFile("dwe.parquet")

// Emit one output row per element of the `employees` array column; the Employee
// fields produced below are joined onto the columns of the originating input row.
// This may be replaced by explodeArray() someday; on Spark 1.4+ the column function
// org.apache.spark.sql.functions.explode is an alternative.
val explodedDepartmentWithEmployeesDF = departmentWithEmployeesDF.explode(departmentWithEmployeesDF("employees")) {
  // The selected column arrives as a single-field Row holding the array. Element types
  // are erased at runtime, hence @unchecked on the type pattern.
  case Row(employees: Seq[Row @unchecked]) =>
    employees.map { employeeRow =>
      // Positions follow the Employee field order: firstName, lastName, email.
      Employee(employeeRow.getString(0), employeeRow.getString(1), employeeRow.getString(2))
    }
  // A null array produces no rows rather than failing the job with a MatchError.
  case Row(null) => Seq.empty[Employee]
}
 
 
Have more questions? Submit a request

Comments