One of the really nice things about Spark is the ability to read input files of different formats right out of the box. Handy as this feature is, reading files in Spark is not always consistent and seems to keep changing with different Spark releases. This article will show you how to read files in CSV and JSON format to compute word counts on selected fields. This example assumes that you are using Spark 2.0+ with Python 3.0 and above. Full working code can be found in this repository.
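The snippets that follow assume a live SparkContext. Inside the PySpark shell one is already provided as sc; in a standalone script you create it yourself. A minimal setup sketch (the application name here is an arbitrary assumption):

from pyspark import SparkContext

# create the SparkContext; the PySpark shell already provides one as sc
sc = SparkContext(appName="word_count_example")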
Data files
To illustrate by example, let's make some assumptions about the data files. Let's assume that we have data files containing a title field and a corresponding text field. The toy example format in JSON is as follows:
{"title":"Data","text":"Data (/ˈdeɪtə/ DAY-tə, /ˈdætə/ DA-tə, or /ˈdɑːtə/ DAH-tə)[1] is a set of values of qualitative or quantitative variables. An example of qualitative data is an anthropologist's handwritten note about his or her interviews with indigenous people. Pieces of data are individual pieces of information. While the concept of data is commonly associated with scientific research, data is collected by a huge range of organizations and institutions, including businesses (e.g., sales data, revenue, profits, stock price), governments (e.g., crime rates, unemployment rates, literacy rates) and non-governmental organizations (e.g., censuses of the number of homeless people by non-profit organizations).Data is measured, collected and reported, and analyzed, whereupon it can be visualized using graphs, images or other analysis tools. Data as a general concept refers to the fact that some existing information or knowledge is represented or coded in some form suitable for better usage or processing."} {"title":"Big Data","text":"Big data is a term for data sets that are so large or complex that traditional data processing application software is inadequate to deal with them. Big data challenges include capturing data, data storage, data analysis, search, sharing, transfer, visualization, querying, updating and information privacy."}
And, the format in csv is as follows:
"title","text" "Data","Data (/ˈdeɪtə/ DAY-tə, /ˈdætə/ DA-tə, or /ˈdɑːtə/ DAH-tə)[1] is a set of values of qualitative or quantitative variables. An example of qualitative data is an anthropologist's handwritten note about his or her interviews with indigenous people. Pieces of data are individual pieces of information. While the concept of data is commonly associated with scientific research, data is collected by a huge range of organizations and institutions, including businesses (e.g., sales data, revenue, profits, stock price), governments (e.g., crime rates, unemployment rates, literacy rates) and non-governmental organizations (e.g., censuses of the number of homeless people by non-profit organizations).Data is measured, collected and reported, and analyzed, whereupon it can be visualized using graphs, images or other analysis tools. Data as a general concept refers to the fact that some existing information or knowledge is represented or coded in some form suitable for better usage or processing." "Big Data","Big data is a term for data sets that are so large or complex that traditional data processing application software is inadequate to deal with them. Big data challenges include capturing data, data storage, data analysis, search, sharing, transfer, visualization, querying, updating and information privacy."
Assume that we want to compute word counts based on the text field.
Reading JSON File
Reading the JSON file is actually pretty straightforward: first you create an SQLContext from the Spark context. This gives you the capability of querying the JSON file in regular SQL-type syntax.
from pyspark.sql import SQLContext

# create an SQLContext so that we can query data files in SQL-like syntax
# (sc is the SparkContext created above)
sqlContext = SQLContext(sc)
In this next step, you use the sqlContext to read the JSON file and select only the text field. Remember that we have two fields, title and text, and in this case we are only going to process the text field. This step returns a Spark DataFrame where each entry is a Row object. In order to access the text field in each row, you would have to use row.text. Note that the select here is conceptually the same as in traditional SQL, where you would write select text from ....
# read the JSON data file and select only the field labeled as "text"
# this returns a Spark DataFrame
df = sqlContext.read.json("json_datafile").select("text")
To view what you have just read, you can use df.show()
# just for the heck of it, show 2 results without truncating the fields
df.show(2, False)
You should see something like this:
+----------------------------------+
|text                              |
+----------------------------------+
|Data (/ˈdeɪtə/ DAY-tə, /ˈdætə/ DA-tə, or /ˈdɑːtə/ DAH-tə)[1] is a set of values of qualitative or quantitative variables. An example of qualitative data is an anthropologist's handwritten note about his or her interviews with indigenous people. Pieces of data are individual pieces of information. While the concept of data is commonly associated with scientific research, data is collected by a huge range of organizations and institutions, including businesses (e.g., sales data, revenue, profits, stock price), governments (e.g., crime rates, unemployment rates, literacy rates) and non-governmental organizations (e.g., censuses of the number of homeless people by non-profit organizations).Data is measured, collected and reported, and analyzed, whereupon it can be visualized using graphs, images or other analysis tools. Data as a general concept refers to the fact that some existing information or knowledge is represented or coded in some form suitable for better usage or processing.|
|Big data is a term for data sets that are so large or complex that traditional data processing application software is inadequate to deal with them. Big data challenges include capturing data, data storage, data analysis, search, sharing, transfer, visualization, querying, updating and information privacy.|
+----------------------------------+
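Besides show(), it can also help to confirm what Spark actually inferred from the file with printSchema(); a quick sketch (the output comment reflects the single selected field):

# print the schema Spark inferred for the selected field
df.printSchema()
# root
#  |-- text: string (nullable = true)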
SQL Query to Read JSON file
Note that you can achieve the same results by issuing an actual SQL query on the dataset. For this, you first register the dataset as a view, then you issue the query. This returns the same DataFrame as above.
df = sqlContext.read.json("json_datafile")

# this creates a view of the json dataset
df.createOrReplaceTempView("json_view")

# issue the SQL query to select only the 'text' field
dfNew = sqlContext.sql("select text from json_view")

# show some output
dfNew.show()
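Once the view is registered, any SQL that references it comes back as a DataFrame in the same way, so you are not limited to a simple projection. A small illustrative sketch (the where clause is an assumed example, not part of the original pipeline):

# filter on one field while selecting another, all through the same view
dfTitles = sqlContext.sql("select title from json_view where text like '%privacy%'")
dfTitles.show(2, False)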
Reading CSV File
Reading the CSV file is similar to JSON, with a small twist to it: you would use sqlContext.read.load(...) and provide a format to it as below. Note that this method of reading is also applicable to different file types including json, parquet and csv, and probably others as well.
# create an SQLContext so that we can query data files in SQL-like syntax
sqlContext = SQLContext(sc)

# read the CSV data file and select only the field labeled as "text"
# this returns a Spark DataFrame
df = sqlContext.read.load("csv_file",
                          format='com.databricks.spark.csv',
                          header='true',
                          inferSchema='true').select("text")
Since the CSV data file in this example has a header row, header='true' tells Spark to treat the first line as field names rather than data, and inferSchema='true' lets it infer the column types from the values, as seen above. In this example, we are again selecting only the text field. This method of reading a file also returns a DataFrame identical to the one in the previous example on reading a JSON file.
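As an aside, Spark 2.0+ also ships with a built-in CSV reader, so on recent releases the com.databricks.spark.csv package is not required; a minimal equivalent sketch, assuming the same file and field names:

# Spark 2.0+ built-in CSV reader, equivalent to the load(...) call above
df = sqlContext.read.csv("csv_file", header=True, inferSchema=True).select("text")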
Generating Word Counts
Now that we know that reading the CSV file or the JSON file returns identical DataFrames, we can use a single method to compute the word counts on the text field. The idea here is to break each row's text into tokens and return a count of 1 for each token (the flatMap step below). The get_keyval function returns a list of lists, where each inner list contains just the word and a count of 1 ([w, 1]). The tokenized words serve as the keys and the corresponding counts are the values. Then, when you reduce by key, you add up all counts on a per-word (key) basis to get the total count for each word (the reduceByKey step). Note that add here is a Python function from the operator module.
from operator import add

# for each text entry, get it into tokens and assign a count of 1
# we need to use flatMap because we are going from 1 entry to many
mapped_rdd = df.rdd.flatMap(lambda row: get_keyval(row))

# for each identical token (i.e. key) add the counts
# this gets the counts of each word
counts_rdd = mapped_rdd.reduceByKey(add)

# get the final output into a list
word_count = counts_rdd.collect()
As you can see below, accessing the text field is pretty simple if you are dealing with DataFrames.
def get_keyval(row):
    # get the text from the row entry
    text = row.text

    # lower case the text and split by space to get the words
    words = text.lower().split(" ")

    # for each word, send back a count of 1
    # send a list of lists
    return [[w, 1] for w in words]
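Once collected, word_count is just a plain Python list of (word, count) pairs, so ordinary Python is enough to rank the results; a small sketch (showing the top 10 is an arbitrary choice):

# sort the (word, count) pairs by count, highest first, and print the top 10
for word, count in sorted(word_count, key=lambda pair: pair[1], reverse=True)[:10]:
    print(word, count)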
And voilà, now you know how to read files with PySpark and use them for some basic processing! For the full source code, please see the links below.
Source Code