Apache Spark Пример: Програма за броење зборови во Јава
Апачи искра
Apache Spark е рамка за обработка на податоци со отворен код која може да врши аналитички операции на Big Data во дистрибуирана средина. Тоа беше академски проект во UC Berkley и првично беше започнат од Matei Zaharia во AMPLab на UC Berkeley во 2009 година. Apache Spark беше создаден на врвот на алатката за управување со кластери позната како Mesos. Ова подоцна беше изменето и надградено за да може да работи во средина базирана на кластери со дистрибуирана обработка.
Apache Spark Пример за поставување проект
Ќе го користиме Maven за да создадеме примерок проект за демонстрацијата. За да го креирате проектот, извршете ја следнава команда во директориумот што ќе го користите како работен простор:
mvn archetype:generate -DgroupId=com.journaldev.sparkdemo -DartifactId=JD-Spark-WordCount -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false
Ако го користите Maven за прв пат, ќе бидат потребни неколку секунди за да се изврши командата за генерирање бидејќи maven треба да ги преземе сите потребни приклучоци и артефакти за да ја направи задачата за генерирање. Откако ќе го креирате проектот, слободно отворете го во вашиот омилен IDE. Следниот чекор е да додадете соодветни Maven Dependencies на проектот. Еве ја датотеката pom.xml
со соодветните зависности:
<dependencies>
<!-- Import Spark -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>1.4.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<classpathPrefix>lib/</classpathPrefix>
<mainClass>com.geekcap.javaworld.sparkexample.WordCount</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy</id>
<phase>install</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
Бидејќи ова е проект базиран на мавен, всушност нема потреба да инсталирате и поставувате Apache Spark на вашата машина. Кога ќе го извршиме овој проект, ќе се стартува инстанца за извршување на Apache Spark и штом програмата ќе заврши со извршувањето, таа ќе се исклучи. Конечно, за да ги разбереме сите JAR-и кои се додаваат на проектот кога ја додадовме оваа зависност, можеме да извршиме едноставна команда Maven која ни овозможува да видиме целосно стебло на зависност за проект кога додаваме некои зависности во него. Еве команда што можеме да ја користиме:
mvn dependency:tree
Кога ќе ја извршиме оваа команда, таа ќе ни го прикаже следното стебло на зависност:
shubham:JD-Spark-WordCount shubham$ mvn dependency:tree
[INFO] Scanning for projects...
[WARNING]
[WARNING] Some problems were encountered while building the effective model for com.journaldev:java-word-count:jar:1.0-SNAPSHOT
[WARNING] 'build.plugins.plugin.version' for org.apache.maven.plugins:maven-jar-plugin is missing. @ line 41, column 21
[WARNING]
[WARNING] It is highly recommended to fix these problems because they threaten the stability of your build.
[WARNING]
[WARNING] For this reason, future Maven versions might no longer support building such malformed projects.
[WARNING]
[INFO]
[INFO] -------------------< com.journaldev:java-word-count >-------------------
[INFO] Building java-word-count 1.0-SNAPSHOT
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- maven-dependency-plugin:2.8:tree (default-cli) @ java-word-count ---
[INFO] com.journaldev:java-word-count:jar:1.0-SNAPSHOT
[INFO] +- org.apache.spark:spark-core_2.11:jar:1.4.0:compile
[INFO] | +- com.twitter:chill_2.11:jar:0.5.0:compile
[INFO] | | \- com.esotericsoftware.kryo:kryo:jar:2.21:compile
[INFO] | | +- com.esotericsoftware.reflectasm:reflectasm:jar:shaded:1.07:compile
[INFO] | | +- com.esotericsoftware.minlog:minlog:jar:1.2:compile
[INFO] | | \- org.objenesis:objenesis:jar:1.2:compile
[INFO] | +- com.twitter:chill-java:jar:0.5.0:compile
[INFO] | +- org.apache.hadoop:hadoop-client:jar:2.2.0:compile
[INFO] | | +- org.apache.hadoop:hadoop-common:jar:2.2.0:compile
[INFO] | | | +- commons-cli:commons-cli:jar:1.2:compile
[INFO] | | | +- org.apache.commons:commons-math:jar:2.1:compile
[INFO] | | | +- xmlenc:xmlenc:jar:0.52:compile
[INFO] | | | +- commons-io:commons-io:jar:2.1:compile
[INFO] | | | +- commons-logging:commons-logging:jar:1.1.1:compile
[INFO] | | | +- commons-lang:commons-lang:jar:2.5:compile
[INFO] | | | +- commons-configuration:commons-configuration:jar:1.6:compile
[INFO] | | | | +- commons-collections:commons-collections:jar:3.2.1:compile
[INFO] | | | | +- commons-digester:commons-digester:jar:1.8:compile
[INFO] | | | | | \- commons-beanutils:commons-beanutils:jar:1.7.0:compile
[INFO] | | | | \- commons-beanutils:commons-beanutils-core:jar:1.8.0:compile
[INFO] | | | +- org.codehaus.jackson:jackson-core-asl:jar:1.8.8:compile
[INFO] | | | +- org.apache.avro:avro:jar:1.7.4:compile
[INFO] | | | +- com.google.protobuf:protobuf-java:jar:2.5.0:compile
[INFO] | | | +- org.apache.hadoop:hadoop-auth:jar:2.2.0:compile
[INFO] | | | \- org.apache.commons:commons-compress:jar:1.4.1:compile
[INFO] | | | \- org.tukaani:xz:jar:1.0:compile
[INFO] | | +- org.apache.hadoop:hadoop-hdfs:jar:2.2.0:compile
[INFO] | | | \- org.mortbay.jetty:jetty-util:jar:6.1.26:compile
[INFO] | | +- org.apache.hadoop:hadoop-mapreduce-client-app:jar:2.2.0:compile
[INFO] | | | +- org.apache.hadoop:hadoop-mapreduce-client-common:jar:2.2.0:compile
[INFO] | | | | +- org.apache.hadoop:hadoop-yarn-client:jar:2.2.0:compile
[INFO] | | | | | +- com.google.inject:guice:jar:3.0:compile
[INFO] | | | | | | +- javax.inject:javax.inject:jar:1:compile
[INFO] | | | | | | \- aopalliance:aopalliance:jar:1.0:compile
[INFO] | | | | | +- com.sun.jersey.jersey-test-framework:jersey-test-framework-grizzly2:jar:1.9:compile
[INFO] | | | | | | +- com.sun.jersey.jersey-test-framework:jersey-test-framework-core:jar:1.9:compile
[INFO] | | | | | | | +- javax.servlet:javax.servlet-api:jar:3.0.1:compile
[INFO] | | | | | | | \- com.sun.jersey:jersey-client:jar:1.9:compile
[INFO] | | | | | | \- com.sun.jersey:jersey-grizzly2:jar:1.9:compile
[INFO] | | | | | | +- org.glassfish.grizzly:grizzly-https:jar:2.1.2:compile
[INFO] | | | | | | | \- org.glassfish.grizzly:grizzly-framework:jar:2.1.2:compile
[INFO] | | | | | | | \- org.glassfish.gmbal:gmbal-api-only:jar:3.0.0-b023:compile
[INFO] | | | | | | | \- org.glassfish.external:management-api:jar:3.0.0-b012:compile
[INFO] | | | | | | +- org.glassfish.grizzly:grizzly-http-server:jar:2.1.2:compile
[INFO] | | | | | | | \- org.glassfish.grizzly:grizzly-rcm:jar:2.1.2:compile
[INFO] | | | | | | +- org.glassfish.grizzly:grizzly-http-servlet:jar:2.1.2:compile
[INFO] | | | | | | \- org.glassfish:javax.servlet:jar:3.1:compile
[INFO] | | | | | +- com.sun.jersey:jersey-json:jar:1.9:compile
[INFO] | | | | | | +- org.codehaus.jettison:jettison:jar:1.1:compile
[INFO] | | | | | | | \- stax:stax-api:jar:1.0.1:compile
[INFO] | | | | | | +- com.sun.xml.bind:jaxb-impl:jar:2.2.3-1:compile
[INFO] | | | | | | | \- javax.xml.bind:jaxb-api:jar:2.2.2:compile
[INFO] | | | | | | | \- javax.activation:activation:jar:1.1:compile
[INFO] | | | | | | +- org.codehaus.jackson:jackson-jaxrs:jar:1.8.3:compile
[INFO] | | | | | | \- org.codehaus.jackson:jackson-xc:jar:1.8.3:compile
[INFO] | | | | | \- com.sun.jersey.contribs:jersey-guice:jar:1.9:compile
[INFO] | | | | \- org.apache.hadoop:hadoop-yarn-server-common:jar:2.2.0:compile
[INFO] | | | \- org.apache.hadoop:hadoop-mapreduce-client-shuffle:jar:2.2.0:compile
[INFO] | | +- org.apache.hadoop:hadoop-yarn-api:jar:2.2.0:compile
[INFO] | | +- org.apache.hadoop:hadoop-mapreduce-client-core:jar:2.2.0:compile
[INFO] | | | \- org.apache.hadoop:hadoop-yarn-common:jar:2.2.0:compile
[INFO] | | +- org.apache.hadoop:hadoop-mapreduce-client-jobclient:jar:2.2.0:compile
[INFO] | | \- org.apache.hadoop:hadoop-annotations:jar:2.2.0:compile
[INFO] | +- org.apache.spark:spark-launcher_2.11:jar:1.4.0:compile
[INFO] | +- org.apache.spark:spark-network-common_2.11:jar:1.4.0:compile
[INFO] | +- org.apache.spark:spark-network-shuffle_2.11:jar:1.4.0:compile
[INFO] | +- org.apache.spark:spark-unsafe_2.11:jar:1.4.0:compile
[INFO] | +- net.java.dev.jets3t:jets3t:jar:0.7.1:compile
[INFO] | | +- commons-codec:commons-codec:jar:1.3:compile
[INFO] | | \- commons-httpclient:commons-httpclient:jar:3.1:compile
[INFO] | +- org.apache.curator:curator-recipes:jar:2.4.0:compile
[INFO] | | +- org.apache.curator:curator-framework:jar:2.4.0:compile
[INFO] | | | \- org.apache.curator:curator-client:jar:2.4.0:compile
[INFO] | | +- org.apache.zookeeper:zookeeper:jar:3.4.5:compile
[INFO] | | | \- jline:jline:jar:0.9.94:compile
[INFO] | | \- com.google.guava:guava:jar:14.0.1:compile
[INFO] | +- org.eclipse.jetty.orbit:javax.servlet:jar:3.0.0.v201112011016:compile
[INFO] | +- org.apache.commons:commons-lang3:jar:3.3.2:compile
[INFO] | +- org.apache.commons:commons-math3:jar:3.4.1:compile
[INFO] | +- com.google.code.findbugs:jsr305:jar:1.3.9:compile
[INFO] | +- org.slf4j:slf4j-api:jar:1.7.10:compile
[INFO] | +- org.slf4j:jul-to-slf4j:jar:1.7.10:compile
[INFO] | +- org.slf4j:jcl-over-slf4j:jar:1.7.10:compile
[INFO] | +- log4j:log4j:jar:1.2.17:compile
[INFO] | +- org.slf4j:slf4j-log4j12:jar:1.7.10:compile
[INFO] | +- com.ning:compress-lzf:jar:1.0.3:compile
[INFO] | +- org.xerial.snappy:snappy-java:jar:1.1.1.7:compile
[INFO] | +- net.jpountz.lz4:lz4:jar:1.2.0:compile
[INFO] | +- org.roaringbitmap:RoaringBitmap:jar:0.4.5:compile
[INFO] | +- commons-net:commons-net:jar:2.2:compile
[INFO] | +- org.spark-project.akka:akka-remote_2.11:jar:2.3.4-spark:compile
[INFO] | | +- org.spark-project.akka:akka-actor_2.11:jar:2.3.4-spark:compile
[INFO] | | | \- com.typesafe:config:jar:1.2.1:compile
[INFO] | | +- io.netty:netty:jar:3.8.0.Final:compile
[INFO] | | +- org.spark-project.protobuf:protobuf-java:jar:2.5.0-spark:compile
[INFO] | | \- org.uncommons.maths:uncommons-maths:jar:1.2.2a:compile
[INFO] | +- org.spark-project.akka:akka-slf4j_2.11:jar:2.3.4-spark:compile
[INFO] | +- org.scala-lang:scala-library:jar:2.11.6:compile
[INFO] | +- org.json4s:json4s-jackson_2.11:jar:3.2.10:compile
[INFO] | | \- org.json4s:json4s-core_2.11:jar:3.2.10:compile
[INFO] | | +- org.json4s:json4s-ast_2.11:jar:3.2.10:compile
[INFO] | | \- org.scala-lang:scalap:jar:2.11.0:compile
[INFO] | | \- org.scala-lang:scala-compiler:jar:2.11.0:compile
[INFO] | | +- org.scala-lang.modules:scala-xml_2.11:jar:1.0.1:compile
[INFO] | | \- org.scala-lang.modules:scala-parser-combinators_2.11:jar:1.0.1:compile
[INFO] | +- com.sun.jersey:jersey-server:jar:1.9:compile
[INFO] | | \- asm:asm:jar:3.1:compile
[INFO] | +- com.sun.jersey:jersey-core:jar:1.9:compile
[INFO] | +- org.apache.mesos:mesos:jar:shaded-protobuf:0.21.1:compile
[INFO] | +- io.netty:netty-all:jar:4.0.23.Final:compile
[INFO] | +- com.clearspring.analytics:stream:jar:2.7.0:compile
[INFO] | +- io.dropwizard.metrics:metrics-core:jar:3.1.0:compile
[INFO] | +- io.dropwizard.metrics:metrics-jvm:jar:3.1.0:compile
[INFO] | +- io.dropwizard.metrics:metrics-json:jar:3.1.0:compile
[INFO] | +- io.dropwizard.metrics:metrics-graphite:jar:3.1.0:compile
[INFO] | +- com.fasterxml.jackson.core:jackson-databind:jar:2.4.4:compile
[INFO] | | +- com.fasterxml.jackson.core:jackson-annotations:jar:2.4.0:compile
[INFO] | | \- com.fasterxml.jackson.core:jackson-core:jar:2.4.4:compile
[INFO] | +- com.fasterxml.jackson.module:jackson-module-scala_2.11:jar:2.4.4:compile
[INFO] | | +- org.scala-lang:scala-reflect:jar:2.11.2:compile
[INFO] | | \- com.thoughtworks.paranamer:paranamer:jar:2.6:compile
[INFO] | +- org.apache.ivy:ivy:jar:2.4.0:compile
[INFO] | +- oro:oro:jar:2.0.8:compile
[INFO] | +- org.tachyonproject:tachyon-client:jar:0.6.4:compile
[INFO] | | \- org.tachyonproject:tachyon:jar:0.6.4:compile
[INFO] | +- net.razorvine:pyrolite:jar:4.4:compile
[INFO] | +- net.sf.py4j:py4j:jar:0.8.2.1:compile
[INFO] | \- org.spark-project.spark:unused:jar:1.0.0:compile
[INFO] \- junit:junit:jar:4.11:test
[INFO] \- org.hamcrest:hamcrest-core:jar:1.3:test
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 2.987 s
[INFO] Finished at: 2018-04-07T15:50:34+05:30
[INFO] ------------------------------------------------------------------------
Со само две додадени зависности, Spark ги собра сите потребни зависности во проектот кој ги вклучува зависностите на Scala како и Apache Spark е напишан во самата Scala.
Креирање на влезна датотека
Додека ќе создадеме програма Word Counter, ќе создадеме примерок за влезна датотека за нашиот проект во root директориумот на нашиот проект со име input.txt. Ставете каква било содржина во неа, го користиме следниов текст:
Hello, my name is Shubham and I am author at JournalDev . JournalDev is a great website to ready
great lessons about Java, Big Data, Python and many more Programming languages.
Big Data lessons are difficult to find but at JournalDev , you can find some excellent
pieces of lessons written on Big Data.
Слободно можете да користите кој било текст во оваа датотека.
Структура на проектот
Креирање на WordCounter
Сега, ние сме подготвени да започнеме со пишување на нашата програма. Кога ќе почнете да работите со програмите за големи податоци, увозот може да создаде многу конфузија. За да се избегне ова, еве ги сите увози што ќе ги користиме во нашиот проект:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.util.Arrays;
Следно, еве ја структурата на нашата класа што ќе ја користиме:
package com.journaldev.sparkdemo;
...imports...
public class WordCounter {
private static void wordCount(String fileName) {
...
}
public static void main(String[] args) {
...
}
}
Целата логика ќе лежи во методот wordCount
. Ќе започнеме со дефинирање на објект за класата SparkConf
. Објектот оваа класа се користи за поставување на различни параметри на Spark како парови клуч-вредност за програмата. Ние обезбедуваме само едноставни параметри:
SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("JD Word Counter");
master
одредува локално, што значи дека оваа програма треба да се поврзе со нишката Spark што работи на localhost
. Името на апликацијата е само начин да се обезбеди Spark со метаподатоците на апликацијата. Сега, можеме да конструираме објект Spark Context со овој конфигурациски објект:
JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
Spark го смета секој ресурс што треба да го обработи како RDD (отпорни дистрибуирани збирки на податоци) што му помага да ги организира податоците во структура на податоци за наоѓање која е многу поефикасна за анализа. Сега ќе ја конвертираме влезната датотека во самиот објект JavaRDD
:
JavaRDD<String> inputFile = sparkContext.textFile(fileName);
Сега ќе користиме Java 8 API за да ја обработиме датотеката JavaRDD
и да ги поделиме зборовите што ги содржи датотеката на посебни зборови:
JavaRDD<String> wordsFromFile = inputFile.flatMap(content -> Arrays.asList(content.split(" ")));
Повторно, го користиме методот Java 8 mapToPair(...)
за да ги броиме зборовите и да обезбедиме пар збор, број
кој може да се претстави како излез:
JavaPairRDD countData = wordsFromFile.mapToPair(t -> new Tuple2(t, 1)).reduceByKey((x, y) -> (int) x + (int) y);
Сега, можеме да ја зачуваме излезната датотека како текстуална датотека:
countData.saveAsTextFile("CountData");
Конечно, можеме да ја обезбедиме влезната точка во нашата програма со методот main()
:
public static void main(String[] args) {
if (args.length == 0) {
System.out.println("No files provided.");
System.exit(0);
}
wordCount(args[0]);
}
Целосната датотека изгледа вака:
package com.journaldev.sparkdemo;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.util.Arrays;
public class WordCounter {
private static void wordCount(String fileName) {
SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("JD Word Counter");
JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
JavaRDD<String> inputFile = sparkContext.textFile(fileName);
JavaRDD<String> wordsFromFile = inputFile.flatMap(content -> Arrays.asList(content.split(" ")));
JavaPairRDD countData = wordsFromFile.mapToPair(t -> new Tuple2(t, 1)).reduceByKey((x, y) -> (int) x + (int) y);
countData.saveAsTextFile("CountData");
}
public static void main(String[] args) {
if (args.length == 0) {
System.out.println("No files provided.");
System.exit(0);
}
wordCount(args[0]);
}
}
Сега ќе продолжиме да ја извршуваме оваа програма користејќи го самиот Maven.
Вклучување на апликацијата
За да ја стартувате апликацијата, одете во root директориумот на програмата и извршете ја следнава команда:
mvn exec:java -Dexec.mainClass=com.journaldev.sparkdemo.WordCounter -Dexec.args="input.txt"
Заклучок
Во оваа лекција, видовме како можеме да користиме Apache Spark во проект базиран на Maven за да направиме едноставна, но ефикасна програма за бројачи на Word. Прочитајте повеќе Објави за големи податоци за да стекнете подлабоко знаење за достапните алатки за големи податоци и рамки за обработка.
Преземете го изворниот код
Преземете го проектот Spark WordCounter: JD-Spark-WordCount