<em id="pn7p8"><acronym id="pn7p8"><u id="pn7p8"></u></acronym></em>

    <th id="pn7p8"></th>

    <button id="pn7p8"></button>

      <dd id="pn7p8"></dd>
      <progress id="pn7p8"><track id="pn7p8"></track></progress>

      Linux培訓
      達內IT學院

      400-111-8989

      編寫一個Spark應用

      • 發布:Linux培訓
      • 來源:Linux教程
      • 時間:2016-11-01 14:18

      編寫Spark應用與通過交互式控制臺使用Spark類似。API是相同的。首先,你需要訪問

      使用Spark編寫Spark應用的一個基本模板如下:

      ## Spark Application - execute with spark-submit
      
      ## Imports
      from pyspark import SparkConf, SparkContext
      
      ## Module Constants
      APP_NAME = "My Spark Application"
      
      ## Closure Functions
      
      ## Main functionality
      
      def main(sc):
      pass
      
      if __name__ == "__main__":
      
      # Configure Spark
      conf = SparkConf().setAppName(APP_NAME)
      conf = conf.setMaster("local[*]")
      sc   = SparkContext(conf=conf)
      
      
      # Execute Main functionality
      main(sc)

      這個模板列出了一個Spark應用所需的東西:導入Python庫,模塊常量,用于調試和Spark UI的可識別的應用名稱,還有作為驅動程序運行的一些主要分析方法學。在ifmain中,我們創建了SparkContext,使用了配置好的context執行main。我們可以簡單地導入驅動代碼到pyspark而不用執行。注意這里Spark配置通過setMaster方法被硬編碼到SparkConf,一般你應該允許這個值通過命令行來設置,所以你能看到這行做了占位符注釋。

      使用<sc.stop()或<sys.exit(0)來關閉或退出程序。

      ## Spark Application - execute with spark-submit
      
      ## Imports
      import csv
      import matplotlib.pyplot as plt
      
      from StringIO import StringIO
      from datetime import datetime
      from collections import namedtuple
      from operator import add, itemgetter
      from pyspark import SparkConf, SparkContext
      
      ## Module Constants
      APP_NAME = "Flight Delay Analysis"
      DATE_FMT = "%Y-%m-%d"
      TIME_FMT = "%H%M"
      
      fields   = ('date', 'airline', 'flightnum', 'origin', 'dest', 'dep',
      'dep_delay', 'arv', 'arv_delay', 'airtime', 'distance')
      Flight   = namedtuple('Flight', fields)
      
      ## Closure Functions
      def parse(row):
      
      """
      
      Parses a row and returns a named tuple.
      
      """
      
      row[0]  = datetime.strptime(row[0], DATE_FMT).date()
      row[5]  = datetime.strptime(row[5], TIME_FMT).time()
      row[6]  = float(row[6])
      row[7]  = datetime.strptime(row[7], TIME_FMT).time()
      row[8]  = float(row[8])
      row[9]  = float(row[9])
      row[10] = float(row[10])
      return Flight(*row[:11])
      
      def split(line):
      
      """
      
      Operator function for splitting a line with csv module
      
      """
      reader = csv.reader(StringIO(line))
      return reader.next()
      
      def plot(delays):
      
      """
      
      Show a bar chart of the total delay per airline
      
      """
      airlines = [d[0] for d in delays]
      minutes  = [d[1] for d in delays]
      index    = list(xrange(len(airlines)))
      
      fig, axe = plt.subplots()
      bars = axe.barh(index, minutes)
      
      
      # Add the total minutes to the right
      for idx, air, min in zip(index, airlines, minutes):
      if min > 0:
      bars[idx].set_color('#d9230f')
      axe.annotate(" %0.0f min" % min, xy=(min+1, idx+0.5), va='center')
      else:
      bars[idx].set_color('#469408')
      axe.annotate(" %0.0f min" % min, xy=(10, idx+0.5), va='center')
      
      
      # Set the ticks
      ticks = plt.yticks([idx+ 0.5 for idx in index], airlines)
      xt = plt.xticks()[0]
      plt.xticks(xt, [' '] * len(xt))
      
      
      # minimize chart junk
      plt.grid(axis = 'x', color ='white', linestyle='-')
      
      plt.title('Total Minutes Delayed per Airline')
      plt.show()
      
      ## Main functionality
      def main(sc):
      
      
      # Load the airlines lookup dictionary
      airlines = dict(sc.textFile("ontime/airlines.csv").map(split).collect())
      
      
      # Broadcast the lookup dictionary to the cluster
      airline_lookup = sc.broadcast(airlines)
      
      
      # Read the CSV Data into an RDD
      flights = sc.textFile("ontime/flights.csv").map(split).map(parse)
      
      
      # Map the total delay to the airline (joined using the broadcast value)
      delays  = flights.map(lambda f: (airline_lookup.value[f.airline],
                        add(f.dep_delay, f.arv_delay)))
      
      
      # Reduce the total delay for the month to the airline
      delays  = delays.reduceByKey(add).collect()
      delays  = sorted(delays, key=itemgetter(1))
      
      
      # Provide output from the driver
      for d in delays:
      print "%0.0f minutes delayed\t%s" % (d[1], d[0])
      
      
      # Show a bar chart of the delays
      plot(delays)
      
      if __name__ == "__main__":
      
      # Configure Spark
      conf = SparkConf().setMaster("local[*]")
      conf = conf.setAppName(APP_NAME)
      sc   = SparkContext(conf=conf)
      
      
      # Execute Main functionality
      main(sc)

      使用<spark-submit命令來運行這段代碼(假設你已有ontime目錄,目錄中有兩個CSV文件):
      
      ~$ spark-submit app.py

      這個Spark作業使用本機作為master,并搜索app.py同目錄下的ontime目錄下的2個CSV文件。最終結果顯示,4月的總延誤時間(單位分鐘),既有早點的(如果你從美國大陸飛往夏威夷或者阿拉斯加),但對大部分大型航空公司都是延誤的。注意,我們在app.py中使用matplotlib直接將結果可視化出來了:

      這段代碼做了什么呢?我們特別注意下與Spark最直接相關的main函數。首先,我們加載CSV文件到RDD,然后把split函數映射給它。split函數使用csv模塊解析文本的每一行,并返回代表每行的元組。最后,我們將collect動作傳給RDD,這個動作把數據以Python列表的形式從RDD傳回驅動程序。本例中,airlines.csv是個小型的跳轉表(jump table),可以將航空公司代碼與全名對應起來。我們將轉移表存儲為Python字典,然后使用sc.broadcast廣播給集群上的每個節點。

      接著,main函數加載了數據量更大的flights.csv([譯者注]作者筆誤寫成fights.csv,此處更正)。拆分CSV行完成之后,我們將parse函數映射給CSV行,此函數會把日期和時間轉成Python的日期和時間,并對浮點數進行合適的類型轉換。每行作為一個NamedTuple保存,名為Flight,以便高效簡便地使用。

      有了Flight對象的RDD,我們映射一個匿名函數,這個函數將RDD轉換為一些列的鍵值對,其中鍵是航空公司的名字,值是到達和出發的延誤時間總和。使用reduceByKey動作和add操作符可以得到每個航空公司的延誤時間總和,然后RDD被傳遞給驅動程序(數據中航空公司的數目相對較少)。最終延誤時間按照升序排列,輸出打印到了控制臺,并且使用matplotlib進行了可視化。

      這個例子稍長,但是希望能演示出集群和驅動程序之間的相互作用(發送數據進行分析,結果取回給驅動程序),以及Python代碼在Spark應用中的角色。

      預約申請免費試聽課

      填寫下面表單即可預約申請免費試聽!怕錢不夠?可就業掙錢后再付學費! 怕學不會?助教全程陪讀,隨時解惑!擔心就業?一地學習,可全國推薦就業!

      上一篇:Spark的執行
      下一篇:高薪Linux運維工程師的十個基本技能點
      • 掃碼領取資料

        回復關鍵字:視頻資料

        免費領取 達內課程視頻學習資料

      • 視頻學習QQ群

        添加QQ群:1143617948

        免費領取達內課程視頻學習資料

      Copyright ? 2021 Tedu.cn All Rights Reserved 京ICP備08000853號-56 京公網安備 11010802029508號 達內時代科技集團有限公司 版權所有

      選擇城市和中心
      黑龍江省

      吉林省

      河北省

      湖南省

      貴州省

      云南省

      廣西省

      海南省

      高清特黄a大片,日本真人真做爰,特级做人爱C级,免费a级毛片 百度 好搜 搜狗
      <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <文本链> <文本链> <文本链> <文本链> <文本链> <文本链>