In today’s world, the biggest challenges in processing and analyzing data are the huge volume and the high velocity of incoming data. Data is being produced at an astonishing pace. Along with volume and velocity there is a third factor: variety. We now have unstructured data that also needs to be stored and analysed, and such data sometimes goes beyond the capabilities and storage limits of traditional relational databases. To store this kind of data, we have open source technologies like Cassandra, MongoDB, Hadoop etc.
There are many tools that can be used to access and analyze the data stored in a Hadoop cluster. One such tool is Cascalog. In this article we will see how we can use Cascalog to access, query and analyze data stored in a Hadoop cluster.
Before we go ahead with data analysis using Cascalog, let’s first understand what Cascalog is. Cascalog is a fully featured, open source data processing tool. Cascalog data processing code can be written in Clojure or Java. Cascalog is mainly used for processing “Big Data” on Hadoop and for analysing data residing on a local computer. It is an alternative to tools like Pig, Hive and Cascading; the major difference is that Cascalog operates at a significantly higher level of abstraction than those tools.

Cascalog is a powerful and easy-to-use data analysis tool for Hadoop. It is a declarative query language inspired by Datalog syntax, and queries are written as regular Clojure code. Queries written in Cascalog are internally converted into a number of MapReduce jobs; this conversion is done by the underlying Cascading libraries at compilation time. A single Cascalog query is compiled into a chain of multiple MapReduce jobs, which is a big win over hand-writing and chaining multiple MapReduce tasks. With Cascalog we just write a query declaratively, and the underlying libraries take care of creating the chain of MapReduce tasks. Rather than writing a Mapper, Reducer, Combiner and job configuration for each MapReduce job, with Cascalog we can write our job in the form of a query or Clojure functions to do whatever we want with our data.

In Cascalog, queries and functions are all written in Clojure, whereas alternatives like Hive and Pig have their own languages. To create a UDF in Hive or Pig, we need to write it in another programming language (Java in most cases), which requires a processing-engine switch to compile and execute the UDF. Cascalog is faster than these two alternatives in this scenario because all Cascalog code, UDFs included, runs in the same Clojure/JVM runtime.
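This same-language point can be sketched in plain Clojure; the predicate below is a hypothetical example (not from the article’s dataset) of the kind of function that becomes a Cascalog UDF with no engine switch:

```clojure
;; A Cascalog filter UDF is just an ordinary Clojure function that
;; returns a boolean. `senior?` is a hypothetical example predicate,
;; not part of the article's dataset.
(defn senior?
  "True when the given age is 70 or above."
  [age]
  (>= age 70))

;; The same function can be called from plain Clojure code or, wrapped
;; with Cascalog's def*fn macros, used directly inside a query.
(senior? 72) ;; => true
```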

Cascalog Features:

  • Super simple – Cascalog uses Clojure for all queries and functions, so the syntax for functions, filters and aggregators is the same.
  • Expressive – Logical composition is very powerful, and arbitrary Clojure code can be run in a query with little effort.
  • Interactive – Cascalog queries are run from the Clojure REPL, an interactive shell.
  • Scalable – Cascalog queries are internally converted into and run as a series of MapReduce jobs.
  • Query anything – Cascalog can query data on Hadoop as well as data on the local machine.
  • Careful handling of null values – Cascalog has a feature called "non-nullable variables" which makes it easier to deal with nulls.
  • First-class interoperability with Cascading – Operations defined for Cascalog can be used in a Cascading flow and vice versa.
  • First-class interoperability with Clojure – Cascalog code is written entirely in Clojure, so we can make use of all the features Clojure offers.
  • Dynamic queries – In Cascalog, we can write functions that build and return queries dynamically.
  • Easy to extend with custom operations – Defining custom operations is easy because UDFs are written in the same language as the rest of the Cascalog code: Clojure.
  • Arbitrary inputs and outputs.
  • Use Cascalog side by side with other code.
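The "dynamic queries" point above can be illustrated with a plain-Clojure sketch: a function builds and returns something query-like from its arguments. The helper below is hypothetical and returns only a filter predicate; in real Cascalog the function would construct and return a full query with the <- macro:

```clojure
;; Hypothetical sketch of a dynamically built filter. In real Cascalog
;; the function body would construct and return (<- [...] ...) instead.
(defn min-age-filter
  "Returns a predicate keeping ages at or above `threshold`."
  [threshold]
  (fn [age] (>= age threshold)))

(def adult? (min-age-filter 18))
(adult? 21) ;; => true
(adult? 12) ;; => false
```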

Getting started with Cascalog:

Cascalog is implemented purely in the Clojure programming language, but it also provides a pure Java interface called JCascalog. Clojure Cascalog queries run in the Clojure REPL, so to set up Cascalog we need to install the basic software required for running the Clojure REPL. The following steps are required to run Clojure Cascalog:

  1. Install Java 1.7
  2. Install Leiningen or Maven.
  3. If you are working on Clojure with Leiningen, add the dependencies/profiles below to your project.clj file:
    1. Add a dependency on Cascalog:
      [cascalog/cascalog-core "3.0.0"]
    2. Add a development dependency on Hadoop:
      :profiles {:dev {:dependencies [[org.apache.hadoop/hadoop-core "1.2.1"]]}}
    3. Bump up the heap size for running Hadoop in local mode, also in project.clj. Make sure the heap size is set to at least 768 MB:
      :jvm-opts ["-Xms768m" "-Xmx768m"]
  4. In case of Maven, add the following repository and dependency to your pom.xml:
    <repository>
      <id>clojars.org</id>
      <url>http://clojars.org/repo</url>
    </repository>
    <dependency>
      <groupId>cascalog</groupId>
      <artifactId>cascalog-core</artifactId>
      <version>3.0.0</version>
    </dependency>

Cascalog Queries:

To demonstrate how we can query a dataset in Cascalog, I have created two datasets: SCORES and USERS. The datasets can be cloned from my github repository:
https://github.com/rishabhgupta131/cascalog-examples

Now let’s start querying the datasets. First, clone the repository to the local machine and navigate to the cascalog-examples folder. Then start the lein REPL in this folder and run the following commands:

(use 'cascalog-examples.data)
(use 'cascalog.api)
(require '[cascalog.logic.ops :as c])

Now let’s check out the sample data from our dataset.

The Cascalog data format is based on tuples. In the USERS dataset, for example, each record is a tuple with 5 fields: name, user, age, country and active, and there are 500 records in total. Now let’s start with some basic queries in Cascalog. To explain what each Cascalog query is doing, I will show the SQL query that produces the same result set, followed by the equivalent Cascalog query.
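Conceptually, the USERS dataset can be pictured as a sequence of 5-field Clojure vectors. The values below are made up for illustration; the real 500 records live in the github repository:

```clojure
;; Hypothetical sample records in [name user age country active] form;
;; the real USERS dataset contains 500 such tuples.
(def sample-users
  [["Alice" "alice01" 34 "India" true]
   ["Bob"   "bob99"   71 "USA"   false]])

(count (first sample-users)) ;; => 5 fields per record
```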

Fetching all records:

The following query shows how we can fetch all the records present in a dataset:

SQL:

SELECT name, user, age, country, active FROM USERS;

Cascalog:

(?<- (stdout)
[?name ?user ?age ?country ?active]
(USERS ?name ?user ?age ?country ?active))

Note that Cascalog adds an implicit reduce step to remove duplicates. If we really want the exact same result as the SQL query, we should add (:distinct false) to our Cascalog query.

In reality the above query is split into two parts: the actual query, and the command that executes it and tells Cascalog where to put the output.

1) query
(<- [?name ?user ?age ?country ?active]
(USERS ?name ?user ?age ?country ?active))
2) execution
(?- (stdout) query)

For this reason we encapsulate the execution in a Clojure function, run<-, which we’ll use to run our queries.

(defn run<- [query]
(?- (stdout) query))

Now we just need to pass the query to the run<- function as an argument.

(run<-
(<- [?name ?user ?age ?country ?active]
(USERS ?name ?user ?age ?country ?active)))

Adding Filters:

Now let’s try adding some filters to the query:

SQL:

SELECT name, user, age, country, active
FROM USERS
WHERE active = true;

SELECT name, user, age, country, active
FROM USERS
WHERE active = true
AND country = "India";

SELECT name, user, age, country, active
FROM USERS
WHERE active = true
OR age >= 70;

Cascalog:

(run<-
(<- [?name ?user ?age ?country ?active]
(USERS ?name ?user ?age ?country ?active)
(= ?active true)))

(run<-
(<-
[?name ?user ?age ?country ?active]
(USERS ?name ?user ?age ?country ?active)
(= ?active true)
(= ?country "India")))

(deffilterfn active-or-senior [active age]
(or active
(>= age 70)))
(run<-
(<-
[?name ?user ?age ?country ?active]
(USERS ?name ?user ?age ?country ?active)
(active-or-senior :< ?active ?age)))

Transformation:

The optional :< keyword denotes the input fields passed to a function, and the output can then be bound to a field with the :> keyword. For example, if we want to perform a simple transformation on a field, such as making the username uppercase, we can run:

SQL:

SELECT name, UPPER(user), age, country, active
FROM USERS;

Cascalog:

(run<-
(<-
[?name ?user2 ?age ?country ?active]
(USERS ?name ?user ?age ?country ?active)
(clojure.string/upper-case :< ?user :> ?user2)))

Aggregation and post-aggregation filtering (HAVING):

The following example shows how we can group/aggregate data in Cascalog and apply a filter on the grouped records (HAVING in SQL). The corresponding SQL and Cascalog queries are given below:

SQL:

SELECT count(*), avg(age)
FROM USERS;

SELECT count(*), avg(age), country
FROM USERS
GROUP BY country;

SELECT count(*), country
FROM USERS
WHERE active = true
GROUP BY country;

SELECT count(*) as count, country
FROM USERS
WHERE active = true
GROUP BY country
HAVING count >= 25;

Cascalog:

(run<-
(<- [?count ?average]
(USERS ?name ?user ?age ?country ?active)
(c/count :> ?count)
(c/avg :< ?age :> ?average)))

(run<-
(<- [?count ?average ?country]
(USERS ?name ?user ?age ?country ?active)
(c/count :> ?count)
(c/avg :< ?age :> ?average)))

(run<-
(<- [?count ?country]
(USERS ?name ?user ?age ?country ?active)
(= ?active true)
(c/count :> ?count)))

(run<-
(<- [?count ?country]
(USERS ?name ?user ?age ?country ?active)
(= ?active true)
(c/count :> ?count)
(>= ?count 25)))

For adding a column along with the aggregated data (as in the second query), we just add the required column to the projection; no other changes are needed. This works because in Cascalog GROUP BY is implicit: every non-aggregated variable in the projection becomes part of the grouping key, so no additional changes are needed for the GROUP BY clause that appears in the SQL query.
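Conceptually, Cascalog’s implicit GROUP BY behaves like Clojure’s own group-by: the non-aggregated output variables become the grouping key. A plain-Clojure sketch over made-up records:

```clojure
;; Made-up records in [name user age country active] form.
(def rows
  [["a" "u1" 30 "India" true]
   ["b" "u2" 40 "India" true]
   ["c" "u3" 50 "USA"   false]])

;; Grouping on the country field (index 3) and counting each group
;; mirrors what Cascalog does implicitly when ?country appears in the
;; projection next to (c/count :> ?count).
(defn count-by-country [records]
  (into {}
        (map (fn [[country grp]] [country (count grp)])
             (group-by #(nth % 3) records))))

(count-by-country rows) ;; => {"India" 2, "USA" 1}
```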

Joins:

In the following example, we will join the SCORES and USERS datasets with each other:

SQL:

SELECT u.name, s.game, s.score
FROM USERS as u, SCORES as s
WHERE u.user = s.user;

Cascalog:

(run<-
(<- [?name ?game ?score]
(USERS ?name ?user ?age ?country ?active)
(SCORES ?user ?game ?score)))

The above example shows an inner join between the two datasets; the shared variable ?user is what triggers the join. If we want to perform a left outer join, we need to use “ungrounding variables”. Cascalog’s outer joins are triggered by variables that begin with “!!”, which are called “ungrounding variables”. A predicate that contains an ungrounding variable is called an “unground predicate”, and a predicate that does not contain one is called a “ground predicate”. Joining together two unground predicates results in a full outer join, while joining a ground predicate to an unground predicate results in a left join. Examples of a left outer join and a full outer join, respectively, are given below:

SQL:

SELECT u.name, s.game, s.score
FROM USERS as u LEFT OUTER JOIN SCORES as s
ON u.user = s.user;

SELECT u.name, s.game, s.score
FROM USERS as u FULL OUTER JOIN SCORES as s
ON u.user = s.user;

Cascalog:

(run<-
(<- [?name ?user !!game !!score]
(USERS ?name ?user ?age ?country ?active)
(SCORES ?user !!game !!score)))

(run<-
(<- [!!name ?user !!game !!score]
(USERS !!name ?user !!age !!country !!active)
(SCORES ?user !!game !!score)))
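The left-join semantics can also be sketched in plain Clojure: every USERS row survives, and a missing SCORES match produces nil, playing the role an unmatched !! variable plays in Cascalog. The data below is made up for illustration:

```clojure
;; Made-up rows: users as [name user] pairs, scores as a {user score} map.
(def users  [["Alice" "u1"] ["Bob" "u2"]])
(def scores {"u1" 300})

;; Left join: each user survives; a missing score becomes nil,
;; just as an unmatched !!score would in a Cascalog outer join.
(defn left-join-scores [users scores]
  (map (fn [[name user]] [name user (get scores user)]) users))

(left-join-scores users scores)
;; => (["Alice" "u1" 300] ["Bob" "u2" nil])
```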

Conclusion:

From all the examples given above, we have seen that Cascalog is compact, intuitive and simple. Compared to Hive and Pig it is easier and faster because its UDFs are implemented in the same language as the queries themselves. For deeper insight into the queries and to learn more about Cascalog, you can visit: http://cascalog.org/articles/getting_started.html