Thursday, February 27, 2014

Amazon Simple Storage Service (Amazon S3)

Amazon S3 is storage for the Internet. It is designed to make web-scale computing easier for developers.

Amazon S3 provides a simple web-services interface that can be used to store and retrieve any amount of data, at any time, from anywhere on the web. It gives any developer access to the same highly scalable, reliable, secure, fast, inexpensive infrastructure that Amazon uses to run its own global network of web sites. The service aims to maximize benefits of scale and to pass those benefits on to developers.

Use Cases

Content Storage and Distribution
Amazon S3 provides a highly durable and available store for a variety of content, ranging from web applications to media files. It allows you to offload your entire storage infrastructure onto the cloud, where you can take advantage of Amazon S3’s scalability and pay-as-you-go pricing to handle your growing storage needs. You can distribute your content directly from Amazon S3 or use Amazon S3 as an origin store for pushing content to your Amazon CloudFront edge locations.
For sharing content that is either easily reproduced or where you’re storing an original copy elsewhere, Amazon S3’s Reduced Redundancy Storage (RRS) feature provides a compelling solution. For example, if you’re storing media content in-house but you need to provide accessibility to your customers, channel partners, or employees, RRS is a low-cost solution for storing and sharing this content.

Storage for Data Analysis
Whether you’re storing pharmaceutical data for analysis, financial data for computation and pricing, or photo images for resizing, Amazon S3 is an ideal location to store your original content. You can then send this content to Amazon EC2 for computation, resizing, or other large scale analytics – without incurring any data transfer charges for moving the data between the services. You can then choose to store the resulting, reproducible content using Amazon S3’s Reduced Redundancy Storage feature (or, of course, you can store it using Amazon S3’s standard storage as well).

Backup, Archiving and Disaster Recovery
Amazon S3 offers a highly durable, scalable, and secure solution for backing up and archiving your critical data. You can use Amazon S3’s Versioning capability to provide even further protection for your stored data. If you have data sets of significant size, you can use AWS Import/Export to move large amounts of data into and out of AWS with physical storage devices. This is ideal for moving large quantities of data for periodic backups, or quickly retrieving data for disaster recovery scenarios. You can also define rules to archive sets of Amazon S3 objects to Amazon Glacier’s extremely low-cost storage service based on object lifetimes. As your data ages, these rules enable you to ensure that it’s automatically stored on the storage option that is most cost-effective for your needs.

Static Website Hosting
You can host your entire static website on Amazon S3 for an inexpensive, highly available hosting solution that scales automatically to meet traffic demands. Self-hosting a highly available website that can handle peak traffic loads can be challenging and costly. With Amazon S3, you can reliably serve your traffic and handle unexpected peaks without worrying about scaling your infrastructure. Amazon S3 is designed for 99.99% availability and 99.999999999% durability, and it gives you access to the same highly scalable, reliable, and fast infrastructure that Amazon uses to run its own global network of web sites. You also benefit from pay-as-you-go pricing: you pay only for the capacity you use. Amazon S3’s website hosting solution is ideal for websites with static content, including HTML files, images, videos, and client-side scripts such as JavaScript. (Amazon EC2 is recommended for websites with server-side scripting and database interaction.)


Amazon S3 Design Requirements
Amazon S3 is based on the idea that quality Internet-based storage should be taken for granted. It helps free developers from worrying about how they will store their data, whether it will be safe and secure, or whether they will have enough storage available. It frees them from the upfront costs of setting up their own storage solution as well as the ongoing costs of maintaining and scaling their storage servers. The functionality of Amazon S3 is simple and robust: Store any amount of data inexpensively and securely, while ensuring that the data will always be available when you need it. Amazon S3 enables developers to focus on innovating with data, rather than figuring out how to store it.

Amazon S3 was built to fulfill the following design requirements:

Secure
Built to provide infrastructure that allows the customer to maintain full control over who has access to their data. Customers must also be able to easily secure their data in transit and at rest.

Reliable
Store data with up to 99.999999999% durability, with 99.99% availability. There can be no single points of failure. All failures must be tolerated or repaired by the system without any downtime.

Scalable
Amazon S3 can scale in terms of storage, request rate, and users to support an unlimited number of web-scale applications. It uses scale as an advantage: Adding nodes to the system increases, not decreases, its availability, speed, throughput, capacity, and robustness.

Fast
Amazon S3 must be fast enough to support high-performance applications. Server-side latency must be insignificant relative to Internet latency. Any performance bottlenecks can be fixed by simply adding nodes to the system.

Inexpensive
Amazon S3 is built from inexpensive commodity hardware components. All hardware will eventually fail and this must not affect the overall system. It must be hardware-agnostic, so that savings can be captured as Amazon continues to drive down infrastructure costs.

Simple
Building highly scalable, reliable, fast, and inexpensive storage is difficult. Doing so in a way that makes it easy to use for any application anywhere is more difficult. Amazon S3 must do both.

A forcing-function for the design was that a single Amazon S3 distributed system must support the needs of both internal Amazon applications and external developers of any application. This means that it must be fast and reliable enough to run Amazon.com’s websites, while flexible enough that any developer can use it for any data storage need.
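
To make the durability and availability targets above more concrete, here is a small arithmetic sketch. It assumes that both percentages are annual figures and that object losses are independent. The object count of 10,000 and the function names are illustrative choices, not part of the design requirements.

```python
from fractions import Fraction

# Design targets quoted above, read as annual percentages (an assumption).
DURABILITY = Fraction("99.999999999") / 100
AVAILABILITY = Fraction("99.99") / 100

MINUTES_PER_YEAR = 365 * 24 * 60


def expected_objects_lost_per_year(object_count):
    """Expected number of objects lost in a year at the target durability."""
    return object_count * (1 - DURABILITY)


def allowed_downtime_minutes_per_year():
    """Minutes per year the service may be unavailable at the target availability."""
    return (1 - AVAILABILITY) * MINUTES_PER_YEAR


if __name__ == "__main__":
    lost = expected_objects_lost_per_year(10000)  # illustrative object count
    print("Expected objects lost per year: %s" % float(lost))
    print("Years per lost object: %s" % (1 / lost))
    print("Allowed downtime (minutes/year): %s" % float(allowed_downtime_minutes_per_year()))
```

Under these assumptions, storing 10,000 objects corresponds to an expected loss of one object roughly every 10,000,000 years, and 99.99% availability leaves a budget of about 52.56 minutes of downtime per year.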

Wednesday, February 26, 2014

Difference between shallow copy, deep copy and normal assignment operation

Normal assignment operations will simply point the new variable towards the existing object.

The difference between shallow and deep copying is only relevant for compound objects (objects that contain other objects, like lists or class instances):

    •    A shallow copy constructs a new compound object and then (to the extent possible) inserts references into it to the objects found in the original.
    •    A deep copy constructs a new compound object and then, recursively, inserts copies into it of the objects found in the original.

Here's a little demonstration:
import copy

a = [1, 2, 3]
b = [4, 5, 6]
c = [a, b]

Using normal assignment to copy:
d = c

print id(c) == id(d)          # True - d is the same object as c
print id(c[0]) == id(d[0])    # True - d[0] is the same object as c[0]

Using a shallow copy:
d = copy.copy(c)

print id(c) == id(d)          # False - d is now a new object
print id(c[0]) == id(d[0])    # True - d[0] is the same object as c[0]

Using a deep copy:
d = copy.deepcopy(c)

print id(c) == id(d)          # False - d is now a new object
print id(c[0]) == id(d[0])    # False - d[0] is now a new object


Suppose e is the list [[1, 2],[4, 5, 6],[7, 8, 9]]. If you run a "shallow copy" on e, copying it to e1, you will find that the id of the list changes, but each copy of the list contains references to the same three lists -- the lists with integers inside. That means that if you were to do e[0].append(3), then e would be [[1, 2, 3],[4, 5, 6],[7, 8, 9]]. But e1 would also be [[1, 2, 3],[4, 5, 6],[7, 8, 9]]. On the other hand, if you subsequently did e.append([10, 11, 12]), e would be [[1, 2, 3],[4, 5, 6],[7, 8, 9],[10, 11, 12]]. But e1 would still be [[1, 2, 3],[4, 5, 6],[7, 8, 9]]. That's because the outer lists are separate objects that initially each contain three references to the same three inner lists. If you modify the inner lists, you can see those changes no matter which copy you view them through. But if you modify one of the outer lists as above, then e contains three references to the original three lists plus one more reference to a new list, while e1 still contains only the original three references.

A 'deep copy' would not only duplicate the outer list, but it would also go inside the lists and duplicate the inner lists, so that the two resulting objects do not contain any of the same references (as far as mutable objects are concerned). If the inner lists had further lists (or other objects such as dictionaries) inside of them, they too would be duplicated. That's the 'deep' part of the 'deep copy'.
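
The walk-through above can be reproduced with a few lines. This is a minimal sketch that assumes e starts out as [[1, 2], [4, 5, 6], [7, 8, 9]]; the name e2 for the deep copy is introduced here only for illustration.

```python
import copy

e = [[1, 2], [4, 5, 6], [7, 8, 9]]
e1 = copy.copy(e)        # shallow: new outer list, shared inner lists
e2 = copy.deepcopy(e)    # deep: new outer list and new inner lists

e[0].append(3)           # mutates an inner list that e and e1 share
e.append([10, 11, 12])   # mutates only the outer list of e

print(e)    # [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]]
print(e1)   # [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
print(e2)   # [[1, 2], [4, 5, 6], [7, 8, 9]]
```

Only the deep copy is unaffected by both changes, because it shares no mutable objects with e.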

Write an algorithm to print all ways of arranging eight queens on a chess board so that none of them share the same row, column or diagonal.

__author__ = 'nitin'

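def print_queens():
    # column_for_row[r] is the column of the queen placed in row r
    print("Result Queen Board: %s" % column_for_row)

def place_queen(row):
    # All eight rows are filled: print this arrangement, then backtrack
    # so that the search continues with the remaining arrangements.
    if row==8:
        print_queens()
        return
    for i in range(8):
        column_for_row[row]=i
        if check(row):
            place_queen(row+1)

def check(row):
    # The queen in this row must not share a column or a diagonal
    # with any queen placed in an earlier row.
    for i in range(row):
        diff=abs(column_for_row[i] - column_for_row[row])
        if diff ==0 or diff == row -i:
            return False
    return True

if __name__=='__main__':
    column_for_row=[-1]*8
    place_queen(0)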

Tuesday, February 25, 2014

Why use Binary Search Trees if we have Linked Lists ?

It mostly depends on the scenario. If the tail of the linked list is maintained, insertion at the end is fast. Deletion is also quite fast in a linked list once the node has been found, but searching is better in trees: O(log n) for a height-balanced tree versus O(n) for a linked list.

A linked list is often unsorted, so adding a new node is simply an O(1) operation, normally done by appending to the tail of the list.

On the other hand, a binary search tree must store nodes in a specific order (and possibly enforce balancing) to provide more efficient searching and retrieval.

If your algorithm does NOT need to retrieve items efficiently or keep them in sorted order, a linked list is probably all you need.

Queues and stacks are examples of data structures that can be happily implemented using a linked list.

Note: Insertion into a linked list is a different (slower) operation than a basic append. Insertion often requires traversal along the list until the correct position is found, which is O(n) where n is the list length. An append simply adds to the tail of the list (hence O(1)).
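
The trade-off can be sketched in a few lines of Python. The class and function names below are illustrative, and the tree is a plain, unbalanced binary search tree, so its O(height) operations are only O(log n) when the insertion order happens to keep it balanced.

```python
class ListNode(object):
    def __init__(self, value):
        self.value = value
        self.next = None


class LinkedList(object):
    """Singly linked list that keeps a tail pointer."""

    def __init__(self):
        self.head = None
        self.tail = None

    def append(self, value):
        # O(1): no traversal is needed because the tail is tracked.
        node = ListNode(value)
        if self.tail is None:
            self.head = node
        else:
            self.tail.next = node
        self.tail = node

    def contains(self, value):
        # O(n): every node may have to be visited.
        node = self.head
        while node is not None:
            if node.value == value:
                return True
            node = node.next
        return False


class TreeNode(object):
    def __init__(self, value):
        self.value = value
        self.left = None
        self.right = None


def bst_insert(root, value):
    # O(height): walk down to the spot that keeps the ordering intact.
    if root is None:
        return TreeNode(value)
    node = root
    while True:
        if value < node.value:
            if node.left is None:
                node.left = TreeNode(value)
                return root
            node = node.left
        else:
            if node.right is None:
                node.right = TreeNode(value)
                return root
            node = node.right


def bst_contains(root, value):
    # O(height): each comparison discards one whole subtree.
    node = root
    while node is not None:
        if value == node.value:
            return True
        node = node.left if value < node.value else node.right
    return False


if __name__ == "__main__":
    items, root = LinkedList(), None
    for v in [5, 2, 8, 1, 9]:
        items.append(v)
        root = bst_insert(root, v)
    print(items.contains(8))      # True, after scanning three nodes
    print(bst_contains(root, 8))  # True, after two comparisons
```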

Implement random function 7 if you are given with random function 5

__author__ = 'nitin'

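""" Implement rand_7() if you are given rand_5() """

import random

def rand_five():
    # Stand-in for the given rand_5(): uniform integer in 1..5
    return random.randint(1,5)

def rand_seven():
    # Two rand_five() calls give a uniform value in 0..24. Values 21..24 are
    # rejected so that the remaining 21 values map evenly onto 1..7.
    while True:
        num=5*(rand_five()-1) + (rand_five()-1)
        if num<21:
            return num%7 + 1

print(rand_seven())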

Find Longest Palindrome in a string

__author__ = 'nitin'

given_word='anitins'

def isPalindrome(word):
    i = 0
    j = len(word) - 1
    while i < j:
        if word[i] != word[j]:
            return False
        i+=1
        j-=1
    return True

def longestPalindrome(string):
    length = len(string)
    maxLength = len(string)
    while maxLength > 1:
        start = 0
        while start <= length - maxLength:
            end = start + maxLength
            if isPalindrome(string[start:end]):
                return string[start:end]
            start += 1
        maxLength -= 1
    return False

if __name__=="__main__":
    print longestPalindrome(given_word)

Write a program to find the longest word made of other words in a list of words.

__author__ = 'nitin'

   
EXAMPLE 
Input: test, tester, testertest, testing, testingtester
Output: testingtester

import sys

DEFAULT_FILE = 'words.txt'

def recsol(word, first=False):
    if not word or (not first and word in wordset): return True
    for i in range(1, len(word)):
        start = word[0:i]
        end = word[i:]
        if start in wordset and recsol(end):
            return True
    return False

filename = sys.argv[1] if len(sys.argv) > 1 else DEFAULT_FILE

header = '# Largest to Smallest Composable Strings #'
print '\n' + '#'*len(header) + '\n' + header + '\n' + '#'*len(header)
with open(filename, 'r') as f:
    wordstr = f.read()
wordlst = wordstr.split()
wordlst.sort(key=len, reverse=True)
wordset = set(wordlst)
cnt = 1
for s in wordlst:
    if recsol(s, True):
        print str(cnt) + '. ' + s
        cnt += 1

You are given an array of integers (both positive and negative). Find the continuous sequence with the largest sum. Return the sum.

__author__ = 'nitin'

def get_max_sum(a):
    # Kadane's algorithm: keep the best sum of a run ending at each index.
    # Assumes a is non-empty.
    max_sum=a[0]
    best_start,best_end=0,1
    sum_int=0
    start=0
    for i in range(len(a)):
        if sum_int<=0:
            # A non-positive running sum cannot help a later run; restart here
            sum_int=a[i]
            start=i
        else:
            sum_int=sum_int + a[i]
        if sum_int>max_sum:
            max_sum=sum_int
            best_start,best_end=start,i+1
    return max_sum, a[best_start:best_end]

if __name__=="__main__":
    a_list=[-2,-3,-4,3,4]
    result,result_list=get_max_sum(a_list)
    print("Result : %s and Result List : %s" % (result, result_list))

Given a list of 'N' coins, their values being in an array A[], return the minimum number of coins required to sum to 'S'

__author__ = 'nitin'

import sys

sys.setrecursionlimit(100000)

coins_list=[1,5,10]

def get_coins_sum(given_num,iter_list):
    result_coins=0
    if iter_list>0:
        if given_num>=coins_list[iter_list-1]:
            given_num=given_num- coins_list[iter_list-1]
            if given_num>=coins_list[iter_list-1]:
                result_coins=get_coins_sum(given_num,iter_list)
                result_coins=result_coins+1
            else:
                iter_list=iter_list-1
                result_coins=get_coins_sum(given_num,iter_list)
                result_coins=result_coins+1
        else:
            iter_list=iter_list-1
            result_coins=get_coins_sum(given_num,iter_list)
    return result_coins

if __name__=='__main__':
    iter_list=len(coins_list)
    print get_coins_sum(16,iter_list)
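
The function above always takes the largest coin that still fits, which is optimal for the coin set [1, 5, 10] but not for every coin set. As a hedged alternative, here is a dynamic-programming sketch that works for arbitrary positive coin values. The name min_coins and the choice to return None for an unreachable sum are assumptions made for this example.

```python
def min_coins(coins, target):
    """Return the minimum number of coins summing to target, or None if impossible."""
    INF = float("inf")
    # best[s] is the fewest coins needed to make the sum s.
    best = [0] + [INF] * target
    for s in range(1, target + 1):
        for coin in coins:
            if coin <= s and best[s - coin] + 1 < best[s]:
                best[s] = best[s - coin] + 1
    return None if best[target] == INF else best[target]


if __name__ == "__main__":
    print(min_coins([1, 5, 10], 16))  # 3  (10 + 5 + 1)
    print(min_coins([1, 3, 4], 6))    # 2  (3 + 3)
```

For the coins [1, 3, 4] and a target of 6, a largest-coin-first strategy uses three coins (4 + 1 + 1), while the table finds two (3 + 3). The running time is O(S * N) for a target S and N coin values.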

Given an integer between 0 and 99,999, print an English phrase that describes the integer (eg, “One Thousand, Two Hundred and Thirty Four”).

__author__ = 'nitin'

import math

def num_to_string(given_num):
    result_list=[]
    list1=["","One","Two","Three","Four","Five","Six","Seven","Eight","Nine"]
    list11=["","Eleven","Twelve","Thirteen","Fourteen","Fifteen","Sixteen","Seventeen","Eighteen","Nineteen"]
    list10=["","Ten","Twenty","Thirty","Forty","Fifty","Sixty","Seventy","Eighty","Ninety"]
    list100=["","Hundred","Thousand"]
    len=1
    while int(math.pow(10,len))<given_num:
        len=len+1
    if given_num==0:
        result_list.append("Zero")
    else:
        if len>3 and len%2==0:
            len=len+1
        while len>0:
            if len>3:
                tmp=given_num/int(math.pow(10,len-2))
                if tmp/10==1 and tmp%10 !=0:
                    result_list.append(list11[tmp%10])
                else:
                    result_list.append(list10[tmp/10])
                    result_list.append(list1[tmp%10])
                if tmp>0:
                    result_list.append(list100[len/2])
                given_num=given_num % int(math.pow(10,len-2))
                len=len-2
            elif given_num==1000 and len==3:
                result_list.append(list1[len-2])
                result_list.append(list100[len-1])
                len=0
            else:
                tmp=given_num/100
                if tmp!=0:
                    result_list.append(list1[tmp])
                    result_list.append(list100[len/2])
                tmp=given_num%100
                if tmp/10==1 and tmp%10 !=0:
                    result_list.append(list11[tmp%10])
                else:
                    result_list.append(list10[tmp/10])
                    result_list.append(list1[tmp%10])
                len=0
    return " ".join(result_list)

if __name__=="__main__":
    given_num=1000
    print num_to_string(given_num)

Given two words of equal length that are in a dictionary, write a method to transform one word into another word by changing only one letter at a time. The new word you get in each step must be in the dictionary.

__author__ = 'nitin'

import copy

string_1='damp'
string_2='like'
strings_list=['damp','lamp','limp','lime','like','nitin','nitzz']

def convert_word():
    list_1=list(string_1)
    list_2=list(string_2)
    print "\nBefore Conversion list_1 :", "".join(list_1)
    print "Before Conversion list_2 :", "".join(list_2), "\n"
    iter_list=len(list_2)
    i=0
    record_list=[]
    while iter_list>0:
        temp_list=copy.deepcopy(list_1)
        iter_temp=len(temp_list)
        while iter_temp>0:
            temp_list=copy.deepcopy(list_1)
            if i not in record_list:
                temp_list[i]=list_2[i]
                res_str="".join(temp_list)
                if res_str in strings_list:
                    list_1=temp_list
                    record_list.append(i)
                    i=0
                    break
                else:
                    i=i+1
                    iter_temp=iter_temp -1
            else:
                i=i+1
                iter_temp=iter_temp -1
            if i==len(list_1):
                i=0
        iter_list=iter_list -1
    print "After Conversion list_1 :", "".join(list_1)
    print "After Conversion list_2 :", "".join(list_2), "\n"
    if list_1==list_2:
        print "Successfully Converted"
    else:
        print "One of the combination was not present in dictionary"

if __name__=='__main__':
    convert_word()
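
The program above changes letters greedily toward the target word, so it can miss a transformation that needs an intermediate letter not present in the target. A breadth-first search over the dictionary is one common way to find a shortest chain when one exists. The sketch below assumes lowercase ASCII words; the function name word_ladder is hypothetical.

```python
from collections import deque


def word_ladder(start, goal, dictionary):
    """Return a shortest list of words from start to goal, or None."""
    words = set(dictionary)
    if start not in words or goal not in words or len(start) != len(goal):
        return None
    letters = "abcdefghijklmnopqrstuvwxyz"
    previous = {start: None}   # word -> the word it was reached from
    queue = deque([start])
    while queue:
        word = queue.popleft()
        if word == goal:
            # Walk the predecessor links back to the start.
            path = []
            while word is not None:
                path.append(word)
                word = previous[word]
            return path[::-1]
        # Try every single-letter change and keep the ones in the dictionary.
        for i in range(len(word)):
            for letter in letters:
                candidate = word[:i] + letter + word[i + 1:]
                if candidate in words and candidate not in previous:
                    previous[candidate] = word
                    queue.append(candidate)
    return None


if __name__ == "__main__":
    strings_list = ['damp', 'lamp', 'limp', 'lime', 'like', 'nitin', 'nitzz']
    print(word_ladder('damp', 'like', strings_list))
    # ['damp', 'lamp', 'limp', 'lime', 'like']
```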

Given a BST, replace each node with the sum of the values of all the nodes that are greater than that node.

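// Call as chSum(root, 0). greaterSum is the sum of all values visited so far,
// i.e. every value greater than the values in this subtree (distinct values assumed).
public static int chSum(TNode node, int greaterSum){
    if(node == null){
        return greaterSum;
    }
    // Reverse in-order traversal: right subtree, then the node, then left subtree
    greaterSum = chSum(node.right, greaterSum);
    int tmp = node.value;
    node.value = greaterSum;   // sum of all values greater than tmp
    return chSum(node.left, greaterSum + tmp);
}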

Flatten an iterator of iterators in Python. If the input is [ [1,2], [3,[4,5]], 6], it should return [1,2,3,4,5,6].

__author__ = 'nitin'

def return_list(a_list):
    i=0
    result_list=[]
    tmp_len=len(a_list)
    while tmp_len>0:
        if type(a_list[i]) is list:
            tmp_list=return_list(a_list[i])
            for j in tmp_list:
                result_list.append(j)
        else:
            result_list.append(a_list[i])
        tmp_len=tmp_len -1
        i=i+1
    return result_list

def iter_of_iterators():
    len_of_list=len(given_list)
    result_list=[]
    i=0
    while len_of_list>0:
        tmp=given_list[i]
        if type(tmp) is list:
            tmp_list=return_list(tmp)
            for j in tmp_list:
                result_list.append(j)
        else:
            result_list.append(given_list[i])
        i=i+1
        len_of_list=len_of_list -1
    return result_list

if __name__=="__main__":
    given_list=[[1,2],[3,[4,5]],6]
    print iter_of_iterators()
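
The solution above handles nested lists only. Since the question mentions iterators, a recursive generator is one way to accept any nested iterable (lists, tuples, iterators, generators). This is a sketch, not the only approach; the name flatten and the decision to treat strings as leaves are assumptions made here.

```python
def flatten(items):
    """Yield the leaves of an arbitrarily nested iterable, left to right."""
    for item in items:
        # Strings are iterable too, but are treated as leaves here.
        if hasattr(item, "__iter__") and not isinstance(item, (str, bytes)):
            for leaf in flatten(item):
                yield leaf
        else:
            yield item


if __name__ == "__main__":
    print(list(flatten([[1, 2], [3, [4, 5]], 6])))  # [1, 2, 3, 4, 5, 6]
```

Because the function is a generator, the values are produced lazily and no intermediate lists are built for the nested levels.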

Monday, February 24, 2014

High Availability for the Hadoop Distributed File System (HDFS)

Background
Apache Hadoop consists of two primary components: HDFS and MapReduce. HDFS, the Hadoop Distributed File System, is the primary storage system of Hadoop, and is responsible for storing and serving all data stored in Hadoop. MapReduce is a distributed processing framework designed to operate on data stored in HDFS.
HDFS has long been considered a highly reliable file system.  An empirical study done at Yahoo! concluded that across Yahoo!’s 20,000 nodes running Apache Hadoop in 10 different clusters in 2009, HDFS lost only 650 blocks out of 329 million total blocks. The vast majority of these lost blocks were due to a handful of bugs which have long since been fixed.
Despite this very high level of reliability, HDFS has always had a well-known single point of failure which impacts HDFS’s availability: the system relies on a single Name Node to coordinate access to the file system data. In clusters which are used exclusively for ETL or batch-processing workflows, a brief HDFS outage may not have immediate business impact on an organization; however, in the past few years we have seen HDFS begin to be used for more interactive workloads or, in the case of HBase, used to directly serve customer requests in real time. In cases such as this, an HDFS outage will immediately impact the productivity of internal users, and perhaps result in downtime visible to external users. For these reasons, adding high availability (HA) to the HDFS Name Node became one of the top priorities for the HDFS community.

High-level Architecture
The goal of the HA Name Node project is to add support for deploying two Name Nodes in an active/passive configuration. This is a common configuration for highly-available distributed systems, and HDFS’s architecture lends itself well to this design. Even in a non-HA configuration, HDFS already requires both a Name Node and another node with similar hardware specs which performs checkpointing operations for the Name Node. The design of the HA Name Node is such that the passive Name Node is capable of performing this checkpointing role, thus requiring no additional Hadoop server machines beyond what HDFS already requires.



The HDFS Name Node is primarily responsible for serving two types of file system metadata: file system namespace information and block locations. Because of the architecture of HDFS, these must be handled separately.

Namespace Information

All mutations to the file system namespace, such as file renames, permission changes, file creations, block allocations, etc, are written to a persistent write-ahead log by the Name Node before returning success to a client call. In addition to this edit log, periodic checkpoints of the file system, called the fsimage, are also created and stored on-disk on the Name Node. Block locations, on the other hand, are stored only in memory. The locations of all blocks are received via “block reports” sent from the Data Nodes when the Name Node is started.
The goal of the HA Name Node is to provide a hot standby Name Node that can take over serving the role of the active Name Node with no downtime. To provide this capability, it is critical that the standby Name Node has the most complete and up-to-date file system state possible in memory. Empirically, starting a Name Node from cold state can take tens of minutes to load the namespace information (fsimage and edit log) from disk, and up to an hour to receive the necessary block reports from all Data Nodes in a large cluster.
The Name Node has long supported the ability to write its edit logs to multiple, redundant local directories. To address the issue of sharing state between the active and standby Name Nodes, the HA Name Node feature allows for the configuration of a special shared edits directory. This directory should be available via a network file system, and should be read/write accessible from both Name Nodes. This directory is treated as being required by the active Name Node, meaning that success will not be returned to a client call unless the file system change has been written to the edit log in this directory. The standby Name Node polls the shared edits directory frequently, looking for new edits written by the active Name Node, and reads these edits into its own in-memory view of the file system state.
Note that requiring a single shared edits directory does not necessarily imply a new single point of failure. It does, however, mean that the filer providing this shared directory must itself be HA, and that multiple network routes should be configured between the Name Nodes and the service providing this shared directory. Plans to improve this situation are discussed further below.

Block Locations
The other part of keeping the standby Name Node hot is making sure that it has up-to-date block location information. Since block locations aren’t written to the Name Node edit log, reading from the shared edits directory is not sufficient to share this file system metadata between the two Name Nodes. To address this issue, when HA is enabled, all Data Nodes in the cluster are configured with the network addresses of both Name Nodes. Data Nodes send all block reports, block location updates, and heartbeats to both Name Nodes, but Data Nodes will only act on block commands issued by the currently-active Name Node.
With both up-to-date namespace information and block locations in the standby Name Node, the system is able to perform a failover from the active Name Node to the standby with no delay.

Client Failover
Since multiple distinct daemons are now capable of serving as the active Name Node for a single cluster, the HDFS client must be able to determine which Name Node to communicate with at any given time. The HA Name Node feature does not support an active-active configuration, and thus all client calls must go to the active Name Node in order to be served.
To implement this feature, the HDFS client was extended to support the configuration of multiple network addresses, one for each Name Node, which collectively represent the HA name service. The name service is identified by a single logical URI, which is mapped to the two network addresses of the HA Name Nodes via client-side configuration. If a client makes a call to the standby Name Node, a special result is returned to the client, indicating that it should retry elsewhere. The configured addresses are tried in order by the client until an active Name Node is found.
In the event that the active Name Node crashes while in the middle of processing a request, the client will be unable to determine whether or not the request was processed. For many operations such as reads (or idempotent writes such as setting permissions, setting modification time, etc), this is not a problem — the client may simply retry after the failover has completed. For others, the error must be bubbled up to the caller to be correctly handled. In the course of the HA project, we extended the Hadoop IPC system to be able to classify each operation’s idempotence using special annotations.
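
The client-side behavior described in this section can be illustrated with a toy simulation. This is not Hadoop code: ToyNameNode, RetryElsewhere, LostReply, and client_call are hypothetical names, and the idempotent flag stands in for the per-operation annotations mentioned above.

```python
class RetryElsewhere(Exception):
    """Stand-in for the special result a standby returns to a client."""


class LostReply(Exception):
    """The node went away mid-request, so the outcome is unknown."""


class ToyNameNode(object):
    def __init__(self, address, state):
        self.address = address
        self.state = state  # "active", "standby" or "crashing"

    def call(self, operation):
        if self.state == "standby":
            raise RetryElsewhere(self.address)
        if self.state == "crashing":
            raise LostReply(self.address)
        return "%s served by %s" % (operation, self.address)


def client_call(name_nodes, operation, idempotent):
    """Try the configured Name Nodes in order until one serves the call."""
    for node in name_nodes:
        try:
            return node.call(operation)
        except RetryElsewhere:
            continue        # not the active node: try the next address
        except LostReply:
            if not idempotent:
                raise       # outcome unknown: let the caller decide
            continue        # safe to repeat: try the next address
    raise IOError("no active Name Node found")


if __name__ == "__main__":
    nodes = [ToyNameNode("nn1", "standby"), ToyNameNode("nn2", "active")]
    print(client_call(nodes, "read", idempotent=True))  # read served by nn2
```

In the sketch, a reply from a standby is always safe to retry elsewhere, while a lost reply is retried only for operations the caller marks as idempotent; otherwise the error is passed up to the caller.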

How does Hadoop detect that the NameNode has failed or is not working?

It depends on which version of Hadoop you are talking about. Before Hadoop 2, the NameNode was a single point of failure, so if it failed your cluster became unusable. Even the SecondaryNameNode doesn't help in that case, since it is only used for checkpoints, not as a backup for the NameNode. When the NameNode failed, someone such as an administrator had to restart it manually.

Since Hadoop 2, there is a better way to handle NameNode failures. You can run two redundant NameNodes alongside one another, so that if one of them fails, the cluster quickly fails over to the other.

The way it works is pretty transparent: the DataNodes send reports to both NameNodes, so that if one fails, the other is ready to be used in active mode. The client simply contacts each configured NameNode until it finds the active one. If it gets a reply saying to try elsewhere, or if the NameNode doesn't reply, it knows that it needs to use a different NameNode.

Sunday, February 23, 2014

Azkaban

A batch job scheduler can be seen as a combination of the cron and make Unix utilities combined with a friendly UI. Batch jobs need to be scheduled to run periodically. They also typically have intricate dependency chains—for example, dependencies on various data extraction processes or previous steps. Larger processes might have 50 or 60 steps, of which some might run in parallel and others must wait for the output of earlier steps. Combining all these processes into a single program allows you to control the dependency management, but can lead to sprawling monolithic programs that are difficult to test or maintain. Simply scheduling the individual pieces to run at different times avoids the monolithic problem, but introduces many timing assumptions that are inevitably broken. Azkaban is a workflow scheduler that allows the independent pieces to be declaratively assembled into a single workflow, and for that workflow to be scheduled to run periodically.
A good batch workflow system allows a program to be built out of small reusable pieces that need not know about one another. By declaring dependencies, you can control sequencing. Other functionality available from Azkaban can then be declaratively layered on top of the job without having to add any code. This includes things like email notifications of success or failure, resource locking, retry on failure, log collection, historical job run time information, and so on.

Azkaban consists of 3 key components:
    •    Relational Database (MySQL)
    •    AzkabanWebServer
    •    AzkabanExecutorServer

Relational Database (MySQL)
Azkaban uses MySQL to store much of its state. Both the AzkabanWebServer and the AzkabanExecutorServer access the DB.
How does AzkabanWebServer use the DB?

The web server uses the DB for the following reasons:
    •    Project Management - Stores the projects, the permissions on the projects, and the uploaded files.
    •    Executing Flow State - Keeps track of executing flows and which Executor is running them.
    •    Previous Flows/Jobs - Searches through previous executions of jobs and flows and accesses their log files.
    •    Scheduler - Keeps the state of the scheduled jobs.
    •    SLA - Keeps all the SLA rules.


How does the AzkabanExecutorServer use the DB?
The executor server uses the DB for the following reasons:
    •    Access the project - Retrieves project files from the DB.
    •    Executing Flows/Jobs - Retrieves and updates data for flows and jobs that are executing.
    •    Logs - Stores the output logs for jobs and flows in the DB.
    •    Interflow dependency - If a flow is running on a different executor, it will take state from the DB.
There is no particular reason why MySQL was chosen except that it is a widely used DB. We are looking to implement compatibility with other databases, although the search requirement on historically running jobs benefits from a relational data store.

AzkabanWebServer
The AzkabanWebServer is the main manager of all of Azkaban. It handles project management, authentication, scheduling, and monitoring of executions. It also serves as the web user interface.
Using Azkaban is easy. Azkaban uses *.job key-value property files to define individual tasks in a workflow, and the dependencies property to define the dependency chain of the jobs. These job files and associated code can be archived into a *.zip and uploaded to the web server through the Azkaban UI or through curl.
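
To illustrate how a dependencies property turns a set of independent job files into an ordered workflow, here is a small Python sketch. It is not Azkaban's own code: parse_job and run_order are hypothetical helpers, the job names and contents are made up, and the sketch assumes that a job's name is its file name and that dependencies holds a comma-separated list of job names.

```python
def parse_job(text):
    """Parse the key=value lines of a .job file into a dict."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def run_order(jobs):
    """Order job names so that every job comes after its dependencies."""
    deps = {}
    for name, text in jobs.items():
        raw = parse_job(text).get("dependencies", "")
        deps[name] = [d.strip() for d in raw.split(",") if d.strip()]
    order, state = [], {}

    def visit(name):
        # Depth-first walk: schedule a job only after all its dependencies.
        if state.get(name) == "done":
            return
        if state.get(name) == "visiting":
            raise ValueError("dependency cycle at %s" % name)
        state[name] = "visiting"
        for dep in deps[name]:
            visit(dep)
        state[name] = "done"
        order.append(name)

    for name in sorted(deps):
        visit(name)
    return order


if __name__ == "__main__":
    # Illustrative job file contents keyed by job name.
    jobs = {
        "extract": "type=command\ncommand=echo extract",
        "transform": "type=command\ncommand=echo transform\ndependencies=extract",
        "load": "type=command\ncommand=echo load\ndependencies=transform",
        "report": "type=command\ncommand=echo report\ndependencies=load,extract",
    }
    print(run_order(jobs))  # ['extract', 'transform', 'load', 'report']
```

Each job only names the jobs it depends on, and the overall sequence falls out of the declarations, which is the property that lets the pieces stay small and unaware of one another.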

AzkabanExecutorServer
Previous versions of Azkaban had both the AzkabanWebServer and the AzkabanExecutorServer features in a single server. The Executor has since been separated into its own server. There were several reasons for splitting these services: we will soon be able to scale the number of executions and fall back on operating Executors if one fails. Also, we are able to roll out upgrades of Azkaban with minimal impact on the users. As Azkaban’s usage grew, we found that upgrading Azkaban became increasingly difficult as all times of the day became ‘peak’.

By declaring dependencies you can control sequencing. Other functionality available from Azkaban can then be layered on top of the job--email notifications of success or failure, resource locking, retry on failure, log collection, historical job runtime information, and so on.

Friday, February 21, 2014

About Recursion Limit in Python

Python lacks the tail recursion optimizations common in functional languages like Lisp. In Python, recursion depth is capped by a limit that defaults to 1000 frames (see sys.getrecursionlimit()), so in practice a function can nest slightly fewer than 1000 calls deep.
I dare to say that in Python, pure recursive algorithm implementations are not correct/safe. A fib() implementation that fails past roughly a thousand levels is not really correct. It is always possible to convert a recursive algorithm into an iterative one, and doing so is often straightforward (sometimes with the help of an explicit stack).
If you expect to recurse deeper than the limit, my advice is to convert the algorithm from recursive to iterative. If not, check for a runaway bug (the implementation lacks a condition that stops recursion, or this condition is wrong).
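
As a small illustration of that advice, the sketch below computes Fibonacci numbers twice: once with a linear recursion that uses one stack frame per step, and once with a loop. The function names are made up for this example.

```python
import sys


def fib_recursive(n, a=0, b=1):
    """Linear recursion: one stack frame per step, so depth grows with n."""
    if n == 0:
        return a
    return fib_recursive(n - 1, b, a + b)


def fib_iterative(n):
    """Same computation as a loop: constant stack depth for any n."""
    a, b = 0, 1
    for _ in range(n):
        a, b = b, a + b
    return a


# Both versions agree for small inputs.
print(fib_recursive(30), fib_iterative(30))

# The recursive version fails once n exceeds the recursion limit.
try:
    fib_recursive(sys.getrecursionlimit() + 100)
except RecursionError as exc:
    print("recursive version failed:", exc)

# The iterative version is not affected by the limit.
print(len(str(fib_iterative(5000))), "digits from the iterative version")
```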

How to set Recursion Limit
You can raise the allowed stack depth so that deeper recursive calls become possible, like this:
import sys
sys.setrecursionlimit(10000) # 10000 is an example, try with different values
But I'd advise you to first try to optimize your code, for instance by using iteration instead of recursion. Setting the limit too high can crash the interpreter instead of raising a clean error.

How to decide Recursion Limit
It is based on the TOTAL stack depth and not really the depth of any particular single function. You are probably already at a stack depth of 5 or so when you make the first call to rec().
Take for example 5 recursive functions, each making 98 recursive calls to itself and then a final call to the next recursive function. If a recursion limit of 100 applied to each function separately, the total depth could reach ~500 calls. Do you really want that? No, that might crash the interpreter at those depths.
Therefore the recursion limit is the maximum depth of all functions globally, not any single named function.
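
A quick way to see that the limit counts every frame on the stack is to measure how deep one function can recurse when it is called directly, and again when other calls are already on the stack. This is only a sketch, and the helper names are invented for it.

```python
def depth_reached(n=0):
    """Recurse until the interpreter refuses, then report how deep we got."""
    try:
        return depth_reached(n + 1)
    except RecursionError:
        return n


def via_other_frames(extra):
    """Put some unrelated frames on the stack before measuring."""
    if extra == 0:
        return depth_reached()
    return via_other_frames(extra - 1)


direct = depth_reached()
nested = via_other_frames(50)

# The second measurement is smaller: the frames of via_other_frames
# are charged against the same global limit.
print("called directly:      ", direct)
print("called 50 frames deep:", nested)
```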

Thursday, February 20, 2014

The Hadoop Distributed File System

Introduction
HDFS, the Hadoop Distributed File System, is a distributed file system designed to hold very large amounts of data (terabytes or even petabytes), and provide high-throughput access to this information. Files are stored in a redundant fashion across multiple machines to ensure their durability to failure and high availability to very parallel applications. This module introduces the design of this distributed file system and instructions on how to operate it.

Distributed File System Basics
A distributed file system is designed to hold a large amount of data and provide access to this data to many clients distributed across a network. There are a number of distributed file systems that solve this problem in different ways.

NFS, the Network File System, is the most ubiquitous distributed file system. It is one of the oldest still in use. While its design is straightforward, it is also very constrained. NFS provides remote access to a single logical volume stored on a single machine. An NFS server makes a portion of its local file system visible to external clients. The clients can then mount this remote file system directly into their own Linux file system, and interact with it as though it were part of the local drive.

One of the primary advantages of this model is its transparency. Clients do not need to be particularly aware that they are working on files stored remotely. The existing standard library methods like open(), close(), fread(), etc. will work on files hosted over NFS.

But as a distributed file system, it is limited in its power. The files in an NFS volume all reside on a single machine. This means that it will only store as much information as can be stored in one machine, and does not provide any reliability guarantees if that machine goes down (e.g., by replicating the files to other servers). Finally, as all the data is stored on a single machine, all the clients must go to this machine to retrieve their data. This can overload the server if a large number of clients must be handled. Clients must also always copy the data to their local machines before they can operate on it.

HDFS is designed to be robust to a number of the problems that other DFSs such as NFS are vulnerable to. In particular:

    •    HDFS is designed to store a very large amount of information (terabytes or petabytes). This requires spreading the data across a large number of machines. It also supports much larger file sizes than NFS.
    •    HDFS should store data reliably. If individual machines in the cluster malfunction, data should still be available.
    •    HDFS should provide fast, scalable access to this information. It should be possible to serve a larger number of clients by simply adding more machines to the cluster.
    •    HDFS should integrate well with Hadoop MapReduce, allowing data to be read and computed upon locally when possible.

But while HDFS is very scalable, its high performance design also restricts it to a particular class of applications; it is not as general-purpose as NFS. There are a large number of additional decisions and trade-offs that were made with HDFS. In particular:

    •    Applications that use HDFS are assumed to perform long sequential streaming reads from files. HDFS is optimized to provide streaming read performance; this comes at the expense of random seek times to arbitrary positions in files.
    •    Data will be written to the HDFS once and then read several times; updates to files after they have already been closed are not supported. (An extension to Hadoop will provide support for appending new data to the ends of files; it is scheduled to be included in Hadoop 0.19 but is not available yet.)
    •    Due to the large size of files, and the sequential nature of reads, the system does not provide a mechanism for local caching of data. The overhead of caching is great enough that data should simply be re-read from HDFS source.
    •    Individual machines are assumed to fail on a frequent basis, both permanently and intermittently. The cluster must be able to withstand the complete failure of several machines, possibly many happening at the same time (e.g., if a rack fails all together). While performance may degrade proportional to the number of machines lost, the system as a whole should not become overly slow, nor should information be lost. Data replication strategies combat this problem.

The design of HDFS is based on the design of GFS, the Google File System. Its design was described in a paper published by Google.

HDFS is a block-structured file system: individual files are broken into blocks of a fixed size. These blocks are stored across a cluster of one or more machines with data storage capacity. Individual machines in the cluster are referred to as DataNodes. A file can be made of several blocks, and they are not necessarily stored on the same machine; the target machines which hold each block are chosen randomly on a block-by-block basis. Thus access to a file may require the cooperation of multiple machines, but supports file sizes far larger than a single-machine DFS; individual files can require more space than a single hard drive could hold.

If several machines must be involved in the serving of a file, then a file could be rendered unavailable by the loss of any one of those machines. HDFS combats this problem by replicating each block across a number of machines (3, by default).

Most block-structured file systems use a block size on the order of 4 or 8 KB. By contrast, the default block size in HDFS is 64MB -- orders of magnitude larger. This allows HDFS to decrease the amount of metadata storage required per file (the list of blocks per file will be smaller as the size of individual blocks increases). Furthermore, it allows for fast streaming reads of data, by keeping large amounts of data sequentially laid out on the disk. The consequence of this decision is that HDFS expects to have very large files, and expects them to be read sequentially. Unlike a file system such as NTFS or EXT, which see many very small files, HDFS expects to store a modest number of very large files: hundreds of megabytes, or gigabytes each. After all, a 100 MB file is not even two full blocks. Files on your computer may also frequently be accessed "randomly," with applications cherry-picking small amounts of information from several different locations in a file which are not sequentially laid out. By contrast, HDFS expects to read a block start-to-finish for a program. This makes it particularly useful to the MapReduce style of programming described in Module 4. That having been said, attempting to use HDFS as a general-purpose distributed file system for a diverse set of applications will be suboptimal.
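
The block arithmetic above is easy to check. The following sketch uses the 64 MB default quoted in this post and a 4 KB block as the comparison point; the helper names are arbitrary.

```python
MB = 1024 * 1024
GB = 1024 * MB

HDFS_BLOCK = 64 * MB     # default HDFS block size quoted above
SMALL_BLOCK = 4 * 1024   # a typical 4 KB local file system block


def block_count(file_size, block_size):
    """Number of blocks needed to hold file_size bytes (rounded up)."""
    return (file_size + block_size - 1) // block_size


def last_block_bytes(file_size, block_size):
    """How many bytes of the final block are actually used."""
    remainder = file_size % block_size
    return remainder if remainder else block_size


# A 100 MB file: one full block plus a partly filled second block.
print(block_count(100 * MB, HDFS_BLOCK))
print(last_block_bytes(100 * MB, HDFS_BLOCK) // MB, "MB used in the last block")

# Entries the block list of a 1 GB file would need at each block size.
print(block_count(1 * GB, HDFS_BLOCK), "blocks at 64 MB")
print(block_count(1 * GB, SMALL_BLOCK), "blocks at 4 KB")
```

The last two lines show why large blocks keep the per-file metadata small.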

Because HDFS stores files as a set of large blocks across several machines, these files are not part of the ordinary file system. Typing ls on a machine running a DataNode daemon will display the contents of the ordinary Linux file system being used to host the Hadoop services -- but it will not include any of the files stored inside the HDFS. This is because HDFS runs in a separate namespace, isolated from the contents of your local files. The files inside HDFS (or more accurately: the blocks that make them up) are stored in a particular directory managed by the DataNode service, but the files will be named only with block ids. You cannot interact with HDFS-stored files using ordinary Linux file modification tools (e.g., ls, cp, mv, etc). However, HDFS does come with its own utilities for file management, which act very similarly to these familiar tools. A later section in this tutorial will introduce you to these commands and their operation.

It is important for this file system to store its metadata reliably. Furthermore, while the file data is accessed in a write once and read many model, the metadata structures (e.g., the names of files and directories) can be modified by a large number of clients concurrently. It is important that this information is never desynchronized. Therefore, it is all handled by a single machine, called the NameNode. The NameNode stores all the metadata for the file system. Because of the relatively low amount of metadata per file (it only tracks file names, permissions, and the locations of each block of each file), all of this information can be stored in the main memory of the NameNode machine, allowing fast access to the metadata.

To open a file, a client contacts the NameNode and retrieves a list of locations for the blocks that comprise the file. These locations identify the DataNodes which hold each block. Clients then read file data directly from the DataNode servers, possibly in parallel. The NameNode is not directly involved in this bulk data transfer, keeping its overhead to a minimum.
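
The division of labour between NameNode and DataNodes can be mimicked in a few lines of Python. This is a toy model, not Hadoop code: the class names mirror the roles described above, but the methods, the tiny 8-byte block size, and the file path are all invented for illustration.

```python
import random

BLOCK_SIZE = 8    # bytes; tiny on purpose so the file splits into many blocks
REPLICATION = 3   # default replication factor mentioned above


class DataNode:
    """Holds block contents; knows nothing about file names."""
    def __init__(self, name):
        self.name = name
        self.blocks = {}   # block id -> bytes
        self.alive = True


class NameNode:
    """Holds metadata only: which blocks form a file and where they live."""
    def __init__(self, datanodes):
        self.datanodes = datanodes
        self.files = {}    # file name -> list of (block id, replica nodes)
        self._next_id = 0

    def allocate(self, name, n_blocks):
        entries = []
        for _ in range(n_blocks):
            block_id = f"blk_{self._next_id}"
            self._next_id += 1
            # Replica targets are picked at random, block by block.
            entries.append((block_id, random.sample(self.datanodes, REPLICATION)))
        self.files[name] = entries
        return entries

    def locate(self, name):
        return self.files[name]


def write(namenode, name, data):
    """Client: ask for placements, then send bytes straight to DataNodes."""
    chunks = [data[i:i + BLOCK_SIZE] for i in range(0, len(data), BLOCK_SIZE)]
    for chunk, (block_id, replicas) in zip(chunks, namenode.allocate(name, len(chunks))):
        for node in replicas:
            node.blocks[block_id] = chunk


def read(namenode, name):
    """Client: fetch block locations, then read each block from a live replica."""
    parts = []
    for block_id, replicas in namenode.locate(name):
        node = next(n for n in replicas if n.alive)
        parts.append(node.blocks[block_id])
    return b"".join(parts)


nodes = [DataNode(f"dn{i}") for i in range(5)]
nn = NameNode(nodes)
write(nn, "/logs/a.txt", b"write once, read many times")
nodes[0].alive = False   # two machines fail...
nodes[1].alive = False
print(read(nn, "/logs/a.txt"))   # ...and every block still has a live replica
```

With three replicas per block, any two failed machines leave at least one copy of every block readable, which is why the read above still succeeds.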

Of course, NameNode information must be preserved even if the NameNode machine fails; there are multiple redundant systems that allow the NameNode to preserve the file system's metadata even if the NameNode itself crashes irrecoverably. NameNode failure is more severe for the cluster than DataNode failure. While individual DataNodes may crash and the entire cluster will continue to operate, the loss of the NameNode will render the cluster inaccessible until it is manually restored. Fortunately, as the NameNode's involvement is relatively minimal, the odds of it failing are considerably lower than the odds of an arbitrary DataNode failing at any given point in time.

Tuesday, February 18, 2014

What is ActiveMQ used for?

It's used to reliably communicate between two distributed processes. Yes, you could store messages in a database to communicate between two processes, but as soon as the message is received you'd have to delete the message. That means a row insert and delete for each message. When you try to scale that up to communicating thousands of messages per second, databases tend to fall over.

Message-oriented middleware like ActiveMQ, on the other hand, is built to handle those use cases. It assumes that messages in a healthy system will be deleted very quickly and can do optimizations to avoid the overhead. It can also push messages to consumers instead of a consumer having to poll for new messages by doing a SQL query. This further reduces the latency involved in processing new messages being sent into the system.

Definition of STOMP :
Simple (or Streaming) Text Oriented Message Protocol (STOMP), formerly known as TTMP, is a simple text-based protocol, designed for working with message-oriented middleware. It provides an interoperable wire format that allows STOMP clients to talk with any message broker supporting the protocol. It is thus language-agnostic, meaning a broker developed for one language or platform can receive communications from client software developed in another language.
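
To show how simple the text format is, here is a hedged sketch that builds and parses a STOMP frame by hand: a command line, header lines of the form key:value, a blank line, the body, and a terminating NUL byte. It deliberately ignores header escaping and the content-length header, the helper names are invented, and /queue/orders is a made-up destination.

```python
def build_frame(command, headers, body=""):
    """Serialize one frame: command, headers, blank line, body, NUL byte."""
    lines = [command] + [f"{key}:{value}" for key, value in headers.items()]
    return ("\n".join(lines) + "\n\n" + body).encode("utf-8") + b"\x00"


def parse_frame(data):
    """Inverse of build_frame for simple text frames (no escaping handled)."""
    text = data.rstrip(b"\x00").decode("utf-8")
    head, _, body = text.partition("\n\n")
    command, *header_lines = head.split("\n")
    headers = dict(line.split(":", 1) for line in header_lines)
    return command, headers, body


frame = build_frame(
    "SEND",
    {"destination": "/queue/orders", "content-type": "text/plain"},
    "order 42 created",
)
print(frame)
print(parse_frame(frame))
```

Because a frame is just readable text, a client in any language can produce it with plain string handling, which is the interoperability point made above.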

Why is Apache ZooKeeper used along with Hadoop ?

ZooKeeper will help you with coordination between Hadoop nodes.

For example, it makes it easier to:
    •    Manage configuration across nodes. If you have dozens or hundreds of nodes, it becomes hard to keep configuration in sync across nodes and quickly make changes. ZooKeeper helps you quickly push configuration changes.
    •    Implement reliable messaging. With ZooKeeper, you can easily implement a producer/consumer queue that guarantees delivery, even if some consumers or even one of the ZooKeeper servers fails.
    •    Implement redundant services. With ZooKeeper, a group of identical nodes (e.g. database servers) can elect a leader/master and let ZooKeeper refer all clients to that master server. If the master fails, ZooKeeper will assign a new leader and notify all clients.
    •    Synchronize process execution. With ZooKeeper, multiple nodes can coordinate the start and end of a process or calculation. This ensures that any follow-up processing is done only after all nodes have finished their calculations.

The interface provided by ZooKeeper is quite low-level. For example, in the configuration management example, the actual processing of the configuration changes must be developed as part of the application. However, ZooKeeper will ensure all clients are notified reliably and the order of configuration messages is maintained.

The functionality provided by ZooKeeper is often developed as part of Hadoop applications. However, these are tricky matters to get right, and it is easy to get errors in the implementation. ZooKeeper provides a solid foundation that helps build higher-level services. It also performs well in high-load situations, and it was used in several Yahoo! products, including the main crawler.

The purpose of ZooKeeper is cluster management. This fits with the general philosophy of *nix of using smaller specialized components - so components of Hadoop that want clustering capabilities rely on ZooKeeper for that rather than develop their own.

ZooKeeper is a distributed store that provides the following guarantees:
    •    Sequential Consistency - Updates from a client will be applied in the order that they were sent.
    •    Atomicity - Updates either succeed or fail. No partial results.
    •    Single System Image - A client will see the same view of the service regardless of the server that it connects to.
    •    Reliability - Once an update has been applied, it will persist from that time forward until a client overwrites the update.
    •    Timeliness - The client's view of the system is guaranteed to be up-to-date within a certain time bound.
You can use these to implement different "recipes" that are required for cluster management like locks, leader election etc.
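
As an illustration of one such recipe, leader election is commonly built on nodes that receive an increasing sequence number and vanish when their owner disconnects: the member holding the lowest number leads. The sketch below imitates that idea entirely in memory. ToyCoordinator is a hypothetical stand-in, not a ZooKeeper client, and none of its methods are part of any real API.

```python
import itertools


class ToyCoordinator:
    """In-memory stand-in for the coordination service (illustration only)."""

    def __init__(self):
        self._seq = itertools.count()
        self.members = {}   # sequence number -> member name

    def join(self, name):
        """Register a member and hand back its unique, increasing number."""
        seq = next(self._seq)
        self.members[seq] = name
        return seq

    def leave(self, seq):
        """Called when a member fails or disconnects; its entry disappears."""
        self.members.pop(seq, None)

    def leader(self):
        """The member with the lowest surviving sequence number leads."""
        if not self.members:
            return None
        return self.members[min(self.members)]


coord = ToyCoordinator()
tickets = {name: coord.join(name) for name in ("db1", "db2", "db3")}
print("leader:", coord.leader())

coord.leave(tickets["db1"])   # the current leader fails
print("leader after failure:", coord.leader())
```

The ordering and single-system-image guarantees listed above are what let every client agree on which number is lowest in the real service.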

Monday, February 17, 2014

Oozie: Workflow Engine for Hadoop

Outline
1. What is Oozie
2. Do you need Oozie
3. How to use Oozie
4. Use case sharing

What Is Oozie ?
- Originally designed at Yahoo!
- Apache incubator project since 2011
- A web service that launches your jobs based on:
 - Time dependency
 - Data dependency
- Ability to rerun from last point of failure
- Monitoring

Do You Need Oozie ?
Q1: Having multiple jobs with dependency ?
Q2: Need to run jobs regularly ?
Q3: Need to check data availability ?
Q4: Need monitoring and operational support ?

If any of your answers is YES,
then you should consider Oozie!

How To Use Oozie
1. Deploy your workflow on HDFS. This includes:
 - Oozie job definitions (workflow.xml)
 - your code: MR/Pig/streaming/Java etc.
 - libraries (.so & .jar)

2. Submit your job
 $ oozie job -run -config job.properties
 Workflow ID: 0123-123456-oozie-wrkf-W

3. Check job status
 $ oozie job -info 0123-123456-oozie-wrkf-W
 $ oozie job -log 0123-123456-oozie-wrkf-W

(A coordinator job is submitted the same way.)

Use Case Sharing
- Was using crontab + python scripts

- After porting to oozie:
 - Reduced code size (4906 -> 1708 lines)
 - Smoother processing (1 week delay -> 3 days)
 - More stable

What is Oozie?

About Oozie
Oozie is an open source project that simplifies workflow and coordination between jobs. It provides users with the ability to define actions and dependencies between actions. Oozie will then schedule actions to execute when the required dependencies have been met.

A workflow in Oozie is defined as what is called a Directed Acyclic Graph (DAG). Acyclic means there are no loops in the graph (in other words, there’s a starting point and an ending point to the graph), and all tasks and dependencies point from start to end without going back. A DAG is made up of action nodes and dependency nodes. An action node can be a MapReduce job, a Pig application, a file system task, or a Java application. Flow control in the graph is represented by node elements that provide logic based on the input from the preceding task in the graph. Examples of flow control nodes are decision, fork, and join nodes.
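
The "no loops" requirement can be checked mechanically. The sketch below models a workflow as a mapping from each node to the nodes that must finish before it, with a fork into two parallel actions and a join afterwards. It uses Python's graphlib rather than anything from Oozie, and the node names and the execution_order helper are hypothetical.

```python
from graphlib import CycleError, TopologicalSorter  # Python 3.9+

# node -> set of nodes that must complete first
WORKFLOW = {
    "start": set(),
    "fork": {"start"},
    "pig-clean": {"fork"},     # these two actions may run in parallel
    "mr-count": {"fork"},
    "join": {"pig-clean", "mr-count"},
    "end": {"join"},
}


def execution_order(predecessors):
    """Return one valid execution order, or None if the graph has a loop."""
    try:
        return list(TopologicalSorter(predecessors).static_order())
    except CycleError:
        return None


print(execution_order(WORKFLOW))

# Adding an edge from "end" back to "start" creates a loop, so the
# graph is no longer a DAG and no valid order exists.
looped = dict(WORKFLOW, start={"end"})
print(execution_order(looped))
```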

What is Oozie?
•  Oozie allows a user to create Directed Acyclic Graphs of workflows, which can be run in parallel or sequentially in Hadoop.
•  Oozie can also run plain Java classes and Pig workflows, and interact with HDFS
– Nice if you need to delete or move files before a job runs
•  Oozie can run jobs sequentially (one after the other) and in parallel (multiple at a time)

Why use Oozie instead of just cascading jobs one after another?
•  Major flexibility
– Start, stop, suspend, and re-run jobs
•  Oozie allows you to restart from a failure
– You can tell Oozie to restart a job from a specific node in the graph or to skip specific failed nodes

Other Features
•  Java Client API / Command Line Interface
– Launch, control, and monitor jobs from your Java apps
•  Web Service API
– You can control jobs from anywhere
•  Run periodic jobs
– Have jobs that you need to run every hour, day, week? Have Oozie run the jobs for you
•  Receive an email when a job is complete

How do you make a workflow?
•  First make a Hadoop job and make sure that it works using the jar command in Hadoop
– This ensures that the configuration is correct for your job
•  Make a jar out of your classes
•  Then make a workflow.xml file and copy all of the job configuration properties into the xml file. These include:
– Input files
– Output files
– Input readers and writers
– Mappers and reducers
– Job-specific arguments

How do you make a workflow?
•  You also need a job.properties file. This file defines the NameNode, JobTracker, etc.
•  It also gives the location of the shared jars and other files
•  When you have these files ready, you need to copy them into HDFS, and then you can run them from the command line

Concepts about usage of HBASE

NoSQL?
HBase is a type of "NoSQL" database. "NoSQL" is a general term meaning that the database isn't an RDBMS which supports SQL as its primary access language, but there are many types of NoSQL databases: BerkeleyDB is an example of a local NoSQL database, whereas HBase is very much a distributed database. Technically speaking, HBase is really more a "Data Store" than "Data Base" because it lacks many of the features you find in an RDBMS, such as typed columns, secondary indexes, triggers, and advanced query languages, etc.
However, HBase has many features which support both linear and modular scaling. HBase clusters expand by adding RegionServers that are hosted on commodity class servers. If a cluster expands from 10 to 20 RegionServers, for example, it doubles in terms of both storage and processing capacity. An RDBMS can scale well, but only up to a point - specifically, the size of a single database server - and for the best performance requires specialized hardware and storage devices. HBase features of note are:
    •    Strongly consistent reads/writes: HBase is not an "eventually consistent" DataStore. This makes it very suitable for tasks such as high-speed counter aggregation.
    •    Automatic sharding: HBase tables are distributed on the cluster via regions, and regions are automatically split and re-distributed as your data grows.
    •    Automatic RegionServer failover
    •    Hadoop/HDFS Integration: HBase supports HDFS out of the box as its distributed file system.
    •    MapReduce: HBase supports massively parallelized processing via MapReduce for using HBase as both source and sink.
    •    Java Client API: HBase supports an easy to use Java API for programmatic access.
    •    Thrift/REST API: HBase also supports Thrift and REST for non-Java front-ends.
    •    Block Cache and Bloom Filters: HBase supports a Block Cache and Bloom Filters for high volume query optimization.
    •    Operational Management: HBase provides built-in web pages for operational insight as well as JMX metrics.

When Should I Use HBase?
HBase isn't suitable for every problem.
First, make sure you have enough data. If you have hundreds of millions or billions of rows, then HBase is a good candidate. If you only have a few thousand/million rows, then using a traditional RDBMS might be a better choice due to the fact that all of your data might wind up on a single node (or two) and the rest of the cluster may be sitting idle.
Second, make sure you can live without all the extra features that an RDBMS provides (e.g., typed columns, secondary indexes, transactions, advanced query languages, etc.) An application built against an RDBMS cannot be "ported" to HBase by simply changing a JDBC driver, for example. Consider moving from an RDBMS to HBase as a complete redesign as opposed to a port.
Third, make sure you have enough hardware. Even HDFS doesn't do well with anything less than 5 DataNodes (due to things such as HDFS block replication which has a default of 3), plus a NameNode.
HBase can run quite well stand-alone on a laptop - but this should be considered a development configuration only.

What Is The Difference Between HBase and Hadoop/HDFS?
HDFS is a distributed file system that is well suited for the storage of large files. Its documentation states that it is not, however, a general purpose file system, and does not provide fast individual record lookups in files. HBase, on the other hand, is built on top of HDFS and provides fast record lookups (and updates) for large tables. This can sometimes be a point of conceptual confusion. HBase internally puts your data in indexed "StoreFiles" that exist on HDFS for high-speed lookups. See Chapter 5, Data Model and the rest of this chapter for more information on how HBase achieves its goals.

Thursday, February 13, 2014

Some Important Definitions Used in Hadoop

Distributed Processing :
Distributed processing is a phrase used to refer to a variety of computer systems that use more than one computer (or processor) to run an application. This includes parallel processing in which a single computer uses more than one CPU to execute programs.
More often, however, distributed processing refers to local-area networks (LANs) designed so that a single program can run simultaneously at various sites. Most distributed processing systems contain sophisticated software that detects idle CPUs on the network and parcels out programs to utilize them.
Another form of distributed processing involves distributed databases. These are databases in which the data is stored across two or more computer systems. The database system keeps track of where the data is so that the distributed nature of the database is not apparent to users.

Parallel Processing :
The simultaneous use of more than one CPU to execute a program. Ideally, parallel processing makes a program run faster because there are more engines (CPUs) running it. In practice, it is often difficult to divide a program in such a way that separate CPUs can execute different portions without interfering with each other.
Most computers have just one CPU, but some models have several. There are even computers with thousands of CPUs. With single-CPU computers, it is possible to perform parallel processing by connecting the computers in a network. However, this type of parallel processing requires very sophisticated software called distributed processing software.
Note that parallel processing differs from multitasking, in which a single CPU executes several programs at once.
Parallel processing is also called parallel computing.

MPP :
Short for massively parallel processing, a type of computing that uses many separate CPUs running in parallel to execute a single program. MPP is similar to symmetric multiprocessing (SMP), with the main difference being that in SMP systems all the CPUs share the same memory, whereas in MPP systems, each CPU has its own memory. MPP systems are therefore more difficult to program because the application must be divided in such a way that all the executing segments can communicate with each other. On the other hand, MPP systems don't suffer from the bottleneck problems inherent in SMP systems when all the CPUs attempt to access the same memory at once.

Scalable Parallel Processor :
Abbreviated as SPP, a computer that utilizes parallel processing and can be upgraded by adding more CPUs to it, effectively increasing its computing power.

Scalability :
Basically, scalability is determined by the ability to add to (or subtract from) an environment without having any adverse (mainly performance based) problems.

One more definition of Distributed Processing :
The distribution of applications and business logic across multiple processing platforms.  Distributed processing implies that processing will occur on more than one processor in order for a transaction to be completed. In other words, processing is distributed across two or more machines and the processes are most likely not running at the same time, i.e. each process performs part of an application in a sequence. Often the data used in a distributed processing environment is also distributed across platforms.


Data Warehouse Definition

A data warehouse is a subject-oriented, integrated, time-variant and non-volatile collection of data in support of management's decision making process.

Subject-Oriented: A data warehouse can be used to analyze a particular subject area. For example, "sales" can be a particular subject.

Integrated: A data warehouse integrates data from multiple data sources. For example, source A and source B may have different ways of identifying a product, but in a data warehouse, there will be only a single way of identifying a product.

Time-Variant: Historical data is kept in a data warehouse. For example, one can retrieve data from 3 months, 6 months, 12 months, or even older data from a data warehouse. This contrasts with a transactions system, where often only the most recent data is kept. For example, a transaction system may hold the most recent address of a customer, where a data warehouse can hold all addresses associated with a customer.

Non-volatile: Once data is in the data warehouse, it will not change. So, historical data in a data warehouse should never be altered.

More concise definition of a data warehouse:
A data warehouse is a copy of transaction data specifically structured for query and analysis.

In computing, a data warehouse (DW, DWH), or an enterprise data warehouse (EDW), is a database used for reporting and data analysis. Integrating data from one or more disparate sources creates a central repository of data, a data warehouse (DW). Data warehouses store current and historical data and are used for creating trending reports for senior management reporting such as annual and quarterly comparisons.

One more definition:
Abbreviated DW, a collection of data designed to support management decision making. Data warehouses contain a wide variety of data that present a coherent picture of business conditions at a single point in time.

Development of a data warehouse includes development of systems to extract data from operational systems plus installation of a warehouse database system that provides managers flexible access to the data.

The term data warehousing generally refers to the combination of many different databases across an entire enterprise. Contrast with data mart.
 

Hadoop Vs Cassandra and HBase

Vanilla Hadoop consists of a Distributed File System (DFS) at the core and libraries that support the MapReduce model for writing analysis programs. The DFS is what enables Hadoop to be scalable. It takes care of chunking data across the nodes of a multi-node cluster so that MapReduce can work on the individual chunks of data available on each node, thus enabling parallelism.

Cassandra is a highly scalable, eventually consistent, distributed, structured key-value store. It is not a conventional database but is more like a Hashtable or HashMap which stores key/value pairs. Both Cassandra and HBase build on the data model of Google's BigTable (HBase follows it closely, while Cassandra also borrows from Amazon's Dynamo). The design is described in Google's BigTable paper.

BigTable makes use of a Sorted String Table (SSTable) to store key/value pairs. An SSTable is just a file in the underlying distributed file system (GFS for BigTable, HDFS for HBase) which stores each key followed by its value. Furthermore, BigTable maintains an index which holds each key and its offset in the file, which enables reading the value for that key using only a seek to the offset location. An SSTable is effectively immutable, which means that after the file is created no modifications can be made to existing key/value pairs. New key/value pairs are appended to the file. Updates and deletes of records are also appended to the file: an update as a newer key/value pair, and a deletion as the key with a tombstone value. Duplicate keys are therefore allowed in the file. The index is also modified whenever an update or delete takes place, so that the offset for that key points to the latest value or tombstone value.
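
The append-plus-index behaviour described above can be sketched in a few lines of Python. This is an illustration of the idea only, not BigTable's or HBase's actual on-disk format: the class name, the record layout (two length fields, then key, then value), and the tombstone marker are all assumptions made for the example.

```python
import struct

TOMBSTONE = b"\x00<deleted>"   # assumed marker; real stores use their own encoding
HEADER = struct.Struct(">II")  # key length, value length


class AppendOnlyStore:
    def __init__(self):
        self.log = bytearray()  # stands in for the append-only file
        self.index = {}         # key -> offset of that key's latest record

    def _append(self, key, value):
        offset = len(self.log)
        self.log += HEADER.pack(len(key), len(value)) + key + value
        self.index[key] = offset   # index now points at the newest record

    def put(self, key, value):
        self._append(key, value)   # an update is just a newer record

    def delete(self, key):
        self._append(key, TOMBSTONE)

    def get(self, key):
        offset = self.index.get(key)
        if offset is None:
            return None
        # One "seek" to the offset, then read the record found there.
        key_len, value_len = HEADER.unpack_from(self.log, offset)
        start = offset + HEADER.size + key_len
        value = bytes(self.log[start:start + value_len])
        return None if value == TOMBSTONE else value


store = AppendOnlyStore()
store.put(b"user:1", b"alice")
store.put(b"user:1", b"alice-updated")   # duplicate key, appended
store.put(b"user:2", b"bob")
store.delete(b"user:2")                  # tombstone appended
print(store.get(b"user:1"), store.get(b"user:2"), len(store.log))
```

Nothing is ever overwritten: the old records stay in the log, and only the index decides which one a read sees.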

Thus you can see that the internals of Cassandra and HBase allow fast reads and writes, which is crucial for real-time data handling, whereas vanilla Hadoop with MapReduce is suited to batch-oriented processing of passive data.

When to use HBase and when to use Hive

MapReduce is just a computing framework. HBase has nothing to do with it. That said, you can efficiently put data into or fetch data from HBase by writing MapReduce jobs. Alternatively you can write sequential programs using other HBase APIs, such as the Java API, to put or fetch the data. But we use Hadoop, HBase etc. to deal with gigantic amounts of data, so that doesn't make much sense: normal sequential programs would be highly inefficient when your data is that large.

Coming back to the first part of your question, Hadoop is basically 2 things: a distributed file system (HDFS) plus a computation or processing framework (MapReduce). Like every other FS, HDFS provides storage, but in a fault-tolerant manner with high throughput and a lower risk of data loss (because of the replication). But, being a FS, HDFS lacks random read and write access. This is where HBase comes into the picture. It's a distributed, scalable, big data store, modelled after Google's BigTable. It stores data as key/value pairs.

Coming to Hive. It provides us data warehousing facilities on top of an existing Hadoop cluster. Along with that it provides an SQL like interface which makes your work easier, in case you are coming from an SQL background. You can create tables in Hive and store data there. Along with that you can even map your existing HBase tables to Hive and operate on them.

Suppose you work with an RDBMS and have to choose between full table scans and index access, but can pick only one of them.
If you would pick full table scans, use Hive. If you would pick index access, use HBase.

Wednesday, February 12, 2014

Content Delivery Network (CDN)

A content delivery network (CDN) is an interconnected system of computers on the Internet that provides Web content rapidly to numerous users by duplicating the content on multiple servers and directing the content to users based on proximity. CDNs are used by Internet service providers (ISPs) to deliver static or dynamic Web pages but the technology is especially well suited to streaming audio, video, and Internet television (IPTV) programming.

In a CDN, content exists in multiple copies on strategically dispersed servers. This is known as content replication. A large CDN can have thousands of servers, making it possible to provide identical content to many users efficiently and reliably even at times of maximum Internet traffic or during sudden demand "spikes." When a specific page, file, or program is requested by a user, the server closest to that user (in terms of the minimum number of nodes between the server and the user) is dynamically determined. This optimizes the speed with which the content is delivered to that user.

The use of CDN technology has obvious economic advantages to enterprises who expect, or experience, large numbers of hits on their Web sites from locations all over the world. If dozens or hundreds of other users happen to select the same Web page or content simultaneously, the CDN sends the content to each of them without delay or time-out. Problems with excessive latency, as well as large variations in latency from moment to moment (which can cause annoying "jitter" in streaming audio and video), are minimized. The bandwidth each user "sees" is maximized. The difference is noticed most by users with high-speed Internet connections who often demand streaming content or large files.
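The "closest server by number of nodes" rule can be sketched in Python as a breadth-first search over a network graph. This is only an illustration of the hop-count criterion named in the text; the graph, the node names and the function names are hypothetical, and real CDNs typically also weigh measured latency and server load.

```python
from collections import deque


def hop_counts(graph, start):
    """Breadth-first search: number of hops from start to every reachable node."""
    dist = {start: 0}
    queue = deque([start])
    while queue:
        node = queue.popleft()
        for neighbour in graph.get(node, ()):
            if neighbour not in dist:
                dist[neighbour] = dist[node] + 1
                queue.append(neighbour)
    return dist


def closest_replica(graph, user, replicas):
    """Return the reachable replica with the fewest hops from the user, or None."""
    dist = hop_counts(graph, user)
    reachable = [r for r in replicas if r in dist]
    if not reachable:
        return None
    # Ties are broken by name so the result is deterministic.
    return min(reachable, key=lambda r: (dist[r], r))


# Hypothetical topology: edgeA is 3 hops from the user, edgeB is 4 hops away.
network = {
    "user": ["r1", "r3"],
    "r1": ["user", "r2"],
    "r2": ["r1", "edgeA"],
    "edgeA": ["r2"],
    "r3": ["user", "r4"],
    "r4": ["r3", "r5"],
    "r5": ["r4", "edgeB"],
    "edgeB": ["r5"],
}
```

With both replicas available the user is sent to `edgeA`; if `edgeA` is dropped from the replica list, the same call falls back to `edgeB`, which is the redundancy property of content replication.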

Another advantage of CDN technology is content redundancy that provides a fail-safe feature and allows for graceful degradation in the event of damage to, or malfunction of, a part of the Internet. Even during a large-scale attack that disables many servers, content on a CDN will remain available to at least some users. Still another advantage of CDN technology is the fact that it inherently offers enhanced data backup, archiving, and storage capacity. This can benefit individuals and enterprises who rely on online data backup services.

Definition of Node :

In a network, a node is a connection point, either a redistribution point or an end point for data transmissions. In general, a node has programmed or engineered capability to recognize and process or forward transmissions to other nodes.

Large-Scale Data Sets Clustering Based on MapReduce and Hadoop


Abstract
MapReduce is a simplified programming model for distributed parallel computing. It is an important technology from
Google and is commonly used for data-intensive distributed parallel computing. Cluster analysis is one of the most important
data mining methods. Efficient parallel algorithms and frameworks are the key to meeting the scalability and performance
requirements entailed in such scientific data analyses. To overcome the long running time of clustering large-scale data
sets on a single computer, we design and implement a parallel K-Means algorithm based on the MapReduce
framework. The algorithm runs on a Hadoop cluster. The results show that the MapReduce-based K-Means clustering
algorithm achieves higher performance when handling large-scale automatic document classification, and they demonstrate
the effectiveness and accuracy of the algorithm.

Keywords: MapReduce; Hadoop; Parallel K-Means Clustering; Large-Scale data Sets

1. Introduction
Cluster analysis is one of the most important data mining methods. Document clustering is the act of
collecting similar documents into classes, where similarity is some function on a document. Document
clustering needs neither a separate training process nor manual tagging of groups in advance. Documents in
the same cluster are more similar to each other, while documents in different clusters are more dissimilar.
MapReduce is a parallel programming technique derived from functional programming concepts and
was proposed by Google for large-scale data processing in a distributed computing environment. The Hadoop
project [1] provides a distributed file system (HDFS). Hadoop is a Java-based software framework that
enables data-intensive applications in a distributed environment. Hadoop enables applications to work with
thousands of nodes and terabytes of data, without burdening the user with too much detail on the allocation
and distribution of data and calculation. Together, Hadoop and MapReduce [2] provide the technology for storing,
analysing and exchanging massive data.
In this paper, we introduce the MapReduce parallel programming model; propose a distributed clustering
algorithm based on MapReduce and Hadoop, together with the design and implementation of a large-scale data
processing model; give some experimental results of the distributed clustering; and discuss the results.


2. Background
2.1. Hadoop Overview
When a data set exceeds the storage capacity of a single machine, it has to be distributed across multiple
independent computers. A file system that manages storage across the computers of a network is called a distributed file
system. A typical Hadoop distributed file system contains thousands of servers, each of which stores part of the
file system's data. HDFS cluster configuration is simple: adding more servers and some simple
configuration is enough to increase the computing power, storage capacity and IO bandwidth of the Hadoop cluster. In
addition, HDFS provides reliable data replication, fast fault detection and automatic recovery.

2.2. MapReduce Overview
When data in distributed storage is processed in parallel, we have to consider many things, such as
synchronization, concurrency, load balancing and other details of the underlying system. This makes even a
simple calculation very complex. The MapReduce programming model [3] was proposed by Google in 2004
for processing and generating large data sets. The framework
solves many of these problems, such as data distribution, job scheduling, fault tolerance and machine-to-machine
communication.
MapReduce is applied in Google's Web search. Programmers there had to write many special-purpose programs
to process the massive data distributed and stored in the server cluster, such as crawled
documents and web request logs, in order to compute derived data such as inverted indices, different views of web
documents, summaries of the number of pages crawled per host, and the set of most common queries
on a given date.

3. MapReduce Programming Model
In the MapReduce programming model, the user's map and reduce functions implement the Mapper and Reducer interfaces.
They form the core of a task.
3.1. Mapper
The map function requires the user to handle an input <key, value> pair and produces a group of
intermediate <key, value> pairs. A <key, value> pair consists of two parts: value stands for the data related to the
task, and key stands for the "group number" of the value. MapReduce combines the intermediate values with the
same key and then sends them to the reduce function.
The Map algorithm process is described as follows:


Step1. The Hadoop MapReduce framework produces a map task for each InputSplit, and each InputSplit
is generated by the InputFormat of the job. Each InputSplit corresponds to one map task.

Step2. Execute the map task: process the input <key, value> pairs to form new <key, value> pairs. This process is
called "divide into groups"; that is, correlated values are made to correspond to the same key. The output
key/value pairs are not required to be of the same type as the input key/value pairs. A given input pair can
be mapped to 0 or more output pairs.


Step3. The Mapper's output is sorted and partitioned for the Reducers. The total number of partitions is the
same as the number of reduce tasks of the job. Users can implement the Partitioner interface to control which key is
assigned to which Reducer.
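The three map-side steps can be sketched in plain Python, outside Hadoop. This is a single-process simulation under stated assumptions, not the Hadoop API: `map_words`, `partition` and `run_map_phase` are hypothetical names, the word-count mapper is just an example, and the CRC32-based partition function is an assumed stand-in for a hash-style partitioner.

```python
import zlib


def map_words(doc_id, text):
    """Map: one input (key, value) pair produces zero or more (word, 1) pairs."""
    for word in text.lower().split():
        yield word, 1


def partition(key, num_reducers):
    """Decide which reducer receives a key.

    zlib.crc32 is used because Python's built-in hash() for strings changes
    between processes, and every mapper must agree on the assignment.
    """
    return zlib.crc32(key.encode("utf-8")) % num_reducers


def run_map_phase(splits, num_reducers):
    """Run the mapper over every input split and bucket its output per reducer."""
    partitions = [[] for _ in range(num_reducers)]
    for doc_id, text in splits:  # one "map task" per input split
        for key, value in map_words(doc_id, text):
            partitions[partition(key, num_reducers)].append((key, value))
    for bucket in partitions:
        bucket.sort()  # the mapper output is sorted by key within each partition
    return partitions
```

For example, `run_map_phase([(1, "a b a"), (2, "b c")], 2)` yields two sorted buckets holding five pairs in total, and every occurrence of a given word lands in the same bucket, which is what lets a single reducer see all values for a key.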

3.2. Reducer
The reduce function is also provided by the user. It handles an intermediate key together with the set of values
associated with that key. The reduce function merges these values to get a smaller set of values;
this process is called "merge". It is not necessarily simple accumulation, however; the process may involve
complex operations. The Reducer makes the group of intermediate values associated with the same key smaller.
In the MapReduce framework, the programmer does not need to care about the details of data
communication, so <key, value> is the communication interface for the programmer in the MapReduce model.
A <key, value> pair can be seen as a "letter": the key is the letter's posting address and the value is the letter's content.
Letters with the same address are delivered to the same place. Programmers only need to set up <key, value>
correctly, and the MapReduce framework automatically and accurately clusters the values with the same key
together.
The Reducer algorithm process is described as follows:


Step1. Shuffle. The input of the Reducer is the sorted output of the Mappers. In this stage, MapReduce assigns
the related block to each Reducer.

Step2. Sort. In this stage, the input of the reducer is grouped by key (because the outputs of
different mappers may contain the same key). The Shuffle and Sort stages run simultaneously.

Step3. Secondary Sort. If the rule for grouping keys in the intermediate process differs from the rule used
before reduce, we can define a Comparator. The comparator is used to group the intermediate keys a
second time.
Map tasks and Reduce tasks form a whole and cannot be separated; they should be used together in the
program. We call one MapReduce run an MR process. In an MR process, Map tasks run in parallel with each other,
Reduce tasks run in parallel with each other, and the Map and Reduce phases run serially. One MR process and the next
run serially. Synchronization between these operations is guaranteed by the MR system, without the
programmer's involvement.
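The reducer-side steps can be simulated in a few lines of Python. Again this is a single-process sketch rather than Hadoop code: `shuffle_and_sort`, `reduce_phase` and `sum_reducer` are hypothetical names, and the secondary-sort step is left out.

```python
from itertools import groupby
from operator import itemgetter


def shuffle_and_sort(mapper_outputs):
    """Shuffle: merge the outputs of all mappers. Sort: order the pairs by key."""
    merged = [pair for output in mapper_outputs for pair in output]
    merged.sort(key=itemgetter(0))
    return merged


def reduce_phase(sorted_pairs, reduce_fn):
    """Group consecutive pairs with the same key and hand each group to reduce_fn."""
    results = {}
    for key, group in groupby(sorted_pairs, key=itemgetter(0)):
        results[key] = reduce_fn(key, [value for _, value in group])
    return results


def sum_reducer(key, values):
    """Example reduce function: merge the values of one key by adding them up."""
    return sum(values)
```

Given the outputs of two mappers, `[("a", 1), ("b", 1)]` and `[("a", 1), ("c", 1)]`, the key `a` appears in both, so sorting before grouping is what brings its two values together for one call of the reduce function.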


4. MapReduce Parallel K-Means Based Clustering Algorithm
k-means clustering is a method of vector quantization, originally from signal processing, that is popular for cluster analysis in data mining. k-means clustering aims to partition n observations into k clusters in which each observation belongs to the cluster with the nearest mean, serving as a prototype of the cluster.

4.1. Document Vector Representation
Vector space model is the most widely used method in information retrieval. This model uses feature
entries and their weights to express the document information. The vector $d = (w_1, w_2, w_3, \dots, w_m)$ stands for
the feature entries and their weights in document $d$, where $m$ is the number of all feature entries and
$w_i$ ($i = 1, \dots, m$) is the weight of the entry $t_i$ in document $d$.
The document vector set is the pattern or data object of the document
clustering.
The similarity between Web document vectors is measured with the cosine function, whose value lies in [0, 1]
for non-negative weights. The greater the returned value is, the greater the similarity is.
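As a concrete illustration, the cosine of two weight vectors is their dot product divided by the product of their lengths. The Python sketch below assumes both documents are given as equal-length lists of weights; the function name and the choice to return 0.0 for an all-zero vector are assumptions of the example.

```python
import math


def cosine_similarity(d1, d2):
    """Cosine of the angle between two document weight vectors of equal length."""
    if len(d1) != len(d2):
        raise ValueError("document vectors must have the same number of entries")
    dot = sum(a * b for a, b in zip(d1, d2))
    norm1 = math.sqrt(sum(a * a for a in d1))
    norm2 = math.sqrt(sum(b * b for b in d2))
    if norm1 == 0 or norm2 == 0:
        return 0.0  # assumption: an empty document is similar to nothing
    return dot / (norm1 * norm2)
```

Two documents with no feature entries in common, such as `[1, 0]` and `[0, 1]`, score 0, while vectors pointing in the same direction score 1 (up to floating-point rounding) regardless of their length, so long and short documents on the same topic still compare as similar.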

4.2. Document Preprocessing Based on MapReduce
We introduce the calculation of word frequency with MapReduce. When calculating TF, the map function reads
the text. Each line of the text represents a document and its type. The key is the document's type and the value is the
document's content. In the intermediate output data, the keys are the document's type and a word in the
document, and the values are the frequency of that word in the document. The reduce function accumulates all the
values with the same key to get the frequency of the word t in all documents. The calculation of DF, the cluster
number, the document corpus and the total number of words is similar to that of TF.
First, we need to preprocess the document. That is, extract every word of the document and calculate its
frequency. In the Map stage, when we meet a word, a pair with key <doc_i, w_ij> and value n_ij is output. The Hadoop
platform ensures that all the values with the same key are assigned to the same reducer.
 Here doc_i denotes the document, w_ij the word, and n_ij the word frequency.
 In the Map phase, each input of Map is a document id n and its content d. First, the map function creates
a set H, then calculates the frequency of each word t in H. Finally, it outputs all elements of H. The keys of the output are
<document, word> pairs, and the values are the frequencies of t.
 In the Reduce stage, every reducer receives key/value pairs. The INITIALIZE function creates a list PostingList,
then each input is processed. If a new word is met, the existing list is output; otherwise, the document id
and document frequency are added to PostingList. Finally, all the words and their corresponding
PostingList are output.
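The preprocessing job can be approximated by the following single-process Python sketch. It is a simplification under stated assumptions rather than the paper's code: the mapper keeps a per-document counter (playing the role of the set H) and emits the word as key with a (document id, frequency) posting as value, and the reducer collects the postings of each word into its posting list. All function names are hypothetical.

```python
from collections import Counter, defaultdict


def map_term_frequencies(doc_id, text):
    """Map: count every word of one document, then emit (word, (doc_id, tf))."""
    counts = Counter(text.lower().split())  # per-document word counts (the set H)
    for term, tf in counts.items():
        yield term, (doc_id, tf)


def reduce_postings(term, postings):
    """Reduce: gather all postings of one word into a sorted posting list."""
    return term, sorted(postings)


def build_index(documents):
    """Simulate map, group-by-key and reduce in a single process."""
    grouped = defaultdict(list)
    for doc_id, text in documents:
        for term, posting in map_term_frequencies(doc_id, text):
            grouped[term].append(posting)
    return dict(reduce_postings(term, postings)
                for term, postings in sorted(grouped.items()))
```

For two documents `("d1", "a b a")` and `("d2", "b")`, the word `a` ends up with the single posting `("d1", 2)` and `b` with one posting per document. The length of each posting list is the document frequency of the word, which is why DF can be computed the same way as TF.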


4.3. K-Means Clustering Methods
The K-Means algorithm [4,5,6] is a widely used clustering algorithm. First, the algorithm randomly selects k
initial objects, each of which represents a cluster center. The remaining objects are assigned to the nearest
cluster according to their distances to the different centers. Then every center is calculated again. This operation is
repeated until the criterion function converges.
 The algorithm is described as follows:
 Input: The number of clusters k and n documents
 Output: k clusters
 Step1. Randomly select k documents from the n documents as the initial cluster centers.
 Step2. Calculate the distances of the remaining documents to every cluster center, and assign each
of the remaining documents to the nearest cluster.
 Step3. Calculate and adjust each cluster center.
 Step4. Iterate Step2 ~ Step3 until the criterion function converges. The program ends.
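The four steps translate into a short serial Python sketch. It makes several assumptions that the text does not specify: documents are tuples of numbers, distance is squared Euclidean distance, an empty cluster keeps its previous center, and "the criterion function converges" is taken to mean that the centers stop changing. The function names and the `max_iter` and `seed` parameters are hypothetical.

```python
import random


def squared_distance(p, q):
    return sum((a - b) ** 2 for a, b in zip(p, q))


def mean(points):
    """Coordinate-wise average of a non-empty list of points."""
    n = len(points)
    return tuple(sum(coords) / n for coords in zip(*points))


def kmeans(points, k, max_iter=100, seed=0):
    rng = random.Random(seed)
    centers = rng.sample(points, k)  # Step1: k random documents as centers
    clusters = [[] for _ in range(k)]
    for _ in range(max_iter):
        # Step2: assign every point to the cluster with the nearest center.
        clusters = [[] for _ in range(k)]
        for p in points:
            nearest = min(range(k), key=lambda c: squared_distance(p, centers[c]))
            clusters[nearest].append(p)
        # Step3: recompute each center; an empty cluster keeps its old center.
        new_centers = [mean(cluster) if cluster else centers[i]
                       for i, cluster in enumerate(clusters)]
        # Step4: stop once the centers no longer move.
        if new_centers == centers:
            break
        centers = new_centers
    return centers, clusters
```

On a small data set with two well separated groups, for instance three points near (0, 0) and three near (10, 10), calling `kmeans(points, 2)` returns the two groups as clusters. On less clean data the result depends on the random initial centers.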

4.4. Parallel K-Means Clustering Algorithm Based on MapReduce and Hadoop
 Step1. The first stage is the document preprocessing. We divide a data set D into m subsets. There are
two MapReduce jobs in this stage. The first job calculates the parameters that are required in the next
step. The following MapReduce job calculates the DFR and TFIDF of each term, and then extracts terms
and generates the VSM. We create a file which contains: iteration number, cluster id, cluster coordinates, and
number of documents assigned to the cluster.

 Step2. The second stage is the map function. It reads the input data and calculates the distance between
each document and each cluster center. For each document,
it produces an output pair whose key is the id of the nearest cluster and whose value is the document's coordinates. A lot of
data is produced in this stage, so we can use a combine function to reduce its size before sending it to Reduce.
 The Combine function calculates the average of the coordinates for each cluster id, along with the
number of documents. All data of the same current cluster are sent to a single reducer.


 Step3. The third stage is the reduce function. The reduce function computes the new cluster center coordinates.
Its output is written to the cluster file and contains: iteration number, cluster id, the cluster center
coordinates, and the size of the cluster.

Step4. Finally, the new cluster coordinates are compared to the original ones. If the criterion function
converges, the program ends, and we have found the clusters. If not, use the newly generated cluster centers
and iterate Step 2 to 4.
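One iteration of Steps 2 and 3 can be simulated in Python as follows. This is a sketch under stated assumptions, not the paper's implementation: each subset of the data stands for the input of one map task, distance is squared Euclidean distance, and the combiner passes on coordinate sums with counts rather than averages (the two are equivalent once the count is known, and sums are simpler to merge). All function names are hypothetical.

```python
def squared_distance(p, q):
    return sum((a - b) ** 2 for a, b in zip(p, q))


def kmeans_map(point, centers):
    """Map: emit (id of nearest cluster, (coordinates, 1)) for one document."""
    nearest = min(range(len(centers)),
                  key=lambda c: squared_distance(point, centers[c]))
    return nearest, (tuple(point), 1)


def kmeans_combine(pairs):
    """Combine: per cluster id, add up coordinates and counts within one map task."""
    partial = {}
    for cid, (vec, n) in pairs:
        if cid in partial:
            total, count = partial[cid]
            partial[cid] = (tuple(a + b for a, b in zip(total, vec)), count + n)
        else:
            partial[cid] = (vec, n)
    return list(partial.items())


def kmeans_reduce(partials):
    """Reduce: merge the partial sums of one cluster into (new center, size)."""
    total, count = partials[0]
    for vec, n in partials[1:]:
        total = tuple(a + b for a, b in zip(total, vec))
        count += n
    return tuple(x / count for x in total), count


def one_iteration(subsets, centers):
    """Run map and combine per subset, then one reduce per cluster id."""
    by_cluster = {}
    for subset in subsets:
        mapped = [kmeans_map(p, centers) for p in subset]
        for cid, partial in kmeans_combine(mapped):
            by_cluster.setdefault(cid, []).append(partial)
    new_centers, sizes = list(centers), [0] * len(centers)
    for cid, partials in by_cluster.items():
        new_centers[cid], sizes[cid] = kmeans_reduce(partials)
    return new_centers, sizes
```

A driver would call `one_iteration` repeatedly, feeding the returned centers back in, until they stop changing, which corresponds to Step 4. Because the combiner emits at most one record per cluster per map task, the data sent to the reducers no longer grows with the number of documents.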

6. Conclusions and Future Work
This work represents only a small first step in using the MapReduce programming technique to process
large-scale data sets.
We take advantage of the parallelism of MapReduce to design a parallel K-Means clustering
algorithm based on MapReduce. The algorithm can automatically cluster massive data, making full use
of the Hadoop cluster's performance, and can finish text clustering in a relatively short period of time.
Experiments show that it achieves high accuracy.
The main deficiencies: we have to set the number k (the number of clusters to be generated) in advance,
and the algorithm is sensitive to initial values, so different initial values may lead to different results. It
is also sensitive to "noise" and outlier data.
The following three aspects can be optimized to reduce the running time on the Hadoop cluster platform:

(1) Run the Reduce program across multiple nodes instead of on a single node.
(2) Platform optimization. We have not optimized the platform yet. The platform spends extra management time in each iteration, and this management time can be reduced.
(3) Optimization of the algorithm itself. We need to customize the outputs of the mapper and reducer functions.
There are more efficient data formats in which the intermediate results can be stored, which would enable
faster reads and writes.




 
