Commit b9c30c55 authored by Seung-Hwan Lim's avatar Seung-Hwan Lim
Browse files

query

parent 07b5409e
Loading
Loading
Loading
Loading
+18 −3
Original line number Diff line number Diff line
@@ -8,8 +8,25 @@ def reader(spark,path):
	# coalesce to prevent from generating too small files.
	return spark.read.parquet(path)

def analysis(spark,df,viewname):
	"""Run a few example analysis queries over *df*.

	Registers *df* as a temporary SQL view named *viewname* and prints
	(via .show()) several aggregate queries against it.

	Parameters
	----------
	spark : SparkSession
	    Active Spark session used to execute the SQL queries.
	df : DataFrame
	    File-system metadata records; expected to expose at least the
	    PROJECT, ITEMSIZE, MTIME and ATIME columns (see the parser that
	    produces the parquet data).
	viewname : str
	    Name under which *df* is registered as a temp view.  NOTE: the
	    original code registered the view under *viewname* but hardcoded
	    "FSDATA" inside every query, so any other view name silently
	    queried the wrong (or a nonexistent) view; the queries now use
	    *viewname* consistently.
	"""
	df.createOrReplaceTempView(viewname)
# count the number of entries per project
	spark.sql("SELECT PROJECT, COUNT(*) FROM {0} GROUP BY PROJECT ORDER BY PROJECT".format(viewname)).show()
# count the number of files per project (ITEMSIZE=0 — presumably marks file
# entries vs. directory entries in this snapshot format; TODO confirm)
	spark.sql("SELECT PROJECT, COUNT(*) FROM {0} WHERE ITEMSIZE=0 GROUP BY PROJECT ORDER BY PROJECT".format(viewname)).show()

# burstiness of file operations (write): spread of modification times
	spark.sql("SELECT AVG(MTIME) AS AVG_MTIME, STDDEV(MTIME) AS STDDEV_MTIME FROM {0} WHERE ITEMSIZE=0".format(viewname)).show()

# burstiness of file operations (read): spread of access times
	spark.sql("SELECT AVG(ATIME) AS AVG_ATIME, STDDEV(ATIME) AS STDDEV_ATIME FROM {0} WHERE ITEMSIZE=0".format(viewname)).show()
	
def main():
"""
This program shows a few example analysis queries.
"""
	appName="lustreDU"
	viewName="FSDATA"
	path="file:///<path>/lustredu-analysis-code/data/example.parquet"

# spark 2.x
@@ -27,9 +44,7 @@ def main():
		.getOrCreate()
		
	df=reader(spark,path)
	df.createOrReplaceTempView("FSDATA")
	spark.sql("SELECT PROJECT, COUNT(*) FROM FSDATA GROUP BY PROJECT ORDER BY PROJECT").show()

	analysis(spark,df,viewname)

if __name__ == "__main__":
    main()
+6 −2
Original line number Diff line number Diff line
@@ -27,6 +27,7 @@ def parser(spark,infileName,outputPath):
# Remove files generated by roots or system daemons
	fsdataDF=spark.sql("SELECT CAST(Atime AS TIMESTAMP), CAST(Ctime AS TIMESTAMP), CAST(Mtime AS TIMESTAMP), CAST(Uid AS BIGINT), CAST(Gid AS BIGINT), CAST(Permission AS BIGINT), CAST(ItemSize AS BIGINT), Unknown, OST, PATH, SPLIT(PATH,'/')[2] AS PROJECT,SPLIT(PATH,'/')[3] AS AREA \
				FROM SNAPSHOT WHERE PATH IS NOT NULL AND CAST(Uid AS BIGINT) != 0 AND CAST(Gid AS BIGINT) != 0 AND PATH LIKE '/ROOT/%'  AND SPLIT(PATH,'/')[2] IS NOT NULL") 
	fsdataDF.withColumn("SNAPSHOT_DATE",lit("20170130"))
	fsdataDF.createOrReplaceTempView("FSDATA")

# Removed unwanted projects
@@ -43,9 +44,12 @@ def parser(spark,infileName,outputPath):
	writer(outDF,outputPath)

def main():
"""
this program does the warehousing tasks.
"""
	appName="lustreDU"
	infileName="file:///<path>/lustredu-analysis-code/data/example.csv"
	outputPath="file:///<path>/lustredu-analysis-code/data/example.parquet"
#	infileName="file:///<path>/lustredu-analysis-code/data/example.csv"
#	outputPath="file:///<path>/lustredu-analysis-code/data/example.parquet"

# spark 2.x: change configuration according to the system. This is the configuration that we used throughout the project.
	spark=SparkSession.builder.appName(appName)\