Playing with Oryx2 (Part 1)

Preparation

Environment

  • CDH 5.5.2, Parcel
    • HDFS
    • YARN
    • Zookeeper
    • Kafka
    • Spark (on YARN)

Download the latest release

  • Go to the download page and download each of the following:

    • compute-classpath.sh
    • oryx-batch-2.1.2.jar
    • oryx-run.sh
    • oryx-serving-2.1.2.jar
    • oryx-speed-2.1.2.jar
  • Get the conf file

    • Taking ALS as an example, use als-example.conf from the oryx/app/conf directory of the source tree

    • Rename it to oryx.conf (note: the file must be in the same directory as each layer's JAR file)

    • Edit the conf file

# For now, only these few fields need to be changed
kafka-brokers = "data-mining-46.slave:9092,data-mining-47.slave:9092,data-mining-48.slave:9092,data-mining-49.master:9092"
zk-servers = "data-mining-46.slave:2181,data-mining-47.slave:2181,data-mining-48.slave:2181/kafka"
hdfs-base = "hdfs:///Oryx"

oryx {
id = "ALSExample"
als {
  rescorer-provider-class = null
}
input-topic {
  broker = ${kafka-brokers}
  lock = {
    master = ${zk-servers}
  }
...
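The zk-servers value is a standard ZooKeeper connect string: comma-separated host:port pairs, optionally followed by a chroot path (here /kafka). A minimal Python sketch that splits such a string and derives the znode under which Kafka brokers register themselves; the function names are hypothetical helpers, not part of Oryx:

```python
from __future__ import annotations

DEFAULT_ZK_PORT = 2181  # ZooKeeper's conventional client port


def parse_zk_connect(connect: str) -> tuple[list[tuple[str, int]], str]:
    """Split 'h1:2181,h2:2181/kafka' into [(host, port), ...] and a chroot ('' if none)."""
    hosts_part, slash, rest = connect.partition("/")
    # Everything after the first '/' is the chroot; a bare trailing '/' means no chroot
    chroot = ("/" + rest).rstrip("/") if slash else ""
    hosts = []
    for entry in hosts_part.split(","):
        host, sep, port = entry.strip().rpartition(":")
        if not sep:  # no explicit port given
            host, port = port, str(DEFAULT_ZK_PORT)
        hosts.append((host, int(port)))
    return hosts, chroot


def broker_ids_path(connect: str) -> str:
    """Znode under which live Kafka brokers register, honoring the chroot."""
    _, chroot = parse_zk_connect(connect)
    return f"{chroot}/brokers/ids"


if __name__ == "__main__":
    zk = "data-mining-46.slave:2181,data-mining-47.slave:2181,data-mining-48.slave:2181/kafka"
    print(parse_zk_connect(zk))
    print(broker_ids_path(zk))  # /kafka/brokers/ids
```

So with this config, the "/brokers/ids" path in Kafka's error messages lives at /kafka/brokers/ids in ZooKeeper.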

Getting started

[root@data-mining-49 oryx]# ./oryx-run.sh kafka-setup
Input   ZK      data-mining-46.slave:2181,data-mining-47.slave:2181,data-mining-48.slave:2181/kafka
        Kafka   data-mining-46.slave:9092,data-mining-47.slave:9092,data-mining-48.slave:9092,data-mining-49.master:9092
        topic   OryxInput
Update  ZK      data-mining-46.slave:2181,data-mining-47.slave:2181,data-mining-48.slave:2181/kafka
        Kafka   data-mining-46.slave:9092,data-mining-47.slave:9092,data-mining-48.slave:9092,data-mining-49.master:9092
        topic   OryxUpdate

All available topics:


Input topic OryxInput does not exist. Create it? y
Creating topic OryxInput
Error while executing topic command : org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /brokers/ids
[2016-04-08 16:53:09,383] ERROR org.I0Itec.zkclient.exception.ZkNoNodeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /brokers/ids
        at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47)
        at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:995)
        at org.I0Itec.zkclient.ZkClient.getChildren(ZkClient.java:675)
        at org.I0Itec.zkclient.ZkClient.getChildren(ZkClient.java:671)
        at kafka.utils.ZkUtils.getChildren(ZkUtils.scala:537)
        at kafka.utils.ZkUtils.getSortedBrokerList(ZkUtils.scala:172)
        at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:243)
        at kafka.admin.TopicCommand$.createTopic(TopicCommand.scala:107)
        at kafka.admin.TopicCommand$.main(TopicCommand.scala:60)
        at kafka.admin.TopicCommand.main(TopicCommand.scala)
Caused by: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /brokers/ids
        at org.apache.zookeeper.KeeperException.create(KeeperException.java:111)
        at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
        at org.apache.zookeeper.ZooKeeper.getChildren(ZooKeeper.java:1468)
        at org.apache.zookeeper.ZooKeeper.getChildren(ZooKeeper.java:1496)
        at org.I0Itec.zkclient.ZkConnection.getChildren(ZkConnection.java:114)
        at org.I0Itec.zkclient.ZkClient$4.call(ZkClient.java:678)
        at org.I0Itec.zkclient.ZkClient$4.call(ZkClient.java:675)
        at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:985)
        ... 8 more
 (kafka.admin.TopicCommand$)
Status of topic OryxInput:
y
Update topic OryxUpdate does not exist. Create it?
Creating topic OryxUpdate
Error while executing topic command : org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /brokers/ids
[2016-04-08 16:53:11,094] ERROR org.I0Itec.zkclient.exception.ZkNoNodeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /brokers/ids
        at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47)
        at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:995)
        at org.I0Itec.zkclient.ZkClient.getChildren(ZkClient.java:675)
        at org.I0Itec.zkclient.ZkClient.getChildren(ZkClient.java:671)
        at kafka.utils.ZkUtils.getChildren(ZkUtils.scala:537)
        at kafka.utils.ZkUtils.getSortedBrokerList(ZkUtils.scala:172)
        at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:243)
        at kafka.admin.TopicCommand$.createTopic(TopicCommand.scala:107)
        at kafka.admin.TopicCommand$.main(TopicCommand.scala:60)
        at kafka.admin.TopicCommand.main(TopicCommand.scala)
Caused by: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /brokers/ids
        at org.apache.zookeeper.KeeperException.create(KeeperException.java:111)
        at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
        at org.apache.zookeeper.ZooKeeper.getChildren(ZooKeeper.java:1468)
        at org.apache.zookeeper.ZooKeeper.getChildren(ZooKeeper.java:1496)
        at org.I0Itec.zkclient.ZkConnection.getChildren(ZkConnection.java:114)
        at org.I0Itec.zkclient.ZkClient$4.call(ZkClient.java:678)
        at org.I0Itec.zkclient.ZkClient$4.call(ZkClient.java:675)
        at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:985)
        ... 8 more
 (kafka.admin.TopicCommand$)
Error while executing topic command : Topic OryxUpdate does not exist on ZK path data-mining-46.slave:2181,data-mining-47.slave:2181,data-mining-48.slave:2181/kafka
[2016-04-08 16:53:11,833] ERROR java.lang.IllegalArgumentException: Topic OryxUpdate does not exist on ZK path data-mining-46.slave:2181,data-mining-47.slave:2181,data-mining-48.slave:2181/kafka
        at kafka.admin.TopicCommand$.alterTopic(TopicCommand.scala:119)
        at kafka.admin.TopicCommand$.main(TopicCommand.scala:62)
        at kafka.admin.TopicCommand.main(TopicCommand.scala)
 (kafka.admin.TopicCommand$)
Status of topic OryxUpdate:

[root@data-mining-49 oryx]#

At this point we hit a problem:

Error while executing topic command : org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /brokers/ids

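The cause is most likely that the Kafka service had not been started in CDH. Kafka brokers register themselves in ZooKeeper under /brokers/ids (relative to the /kafka chroot in zk-servers), so if no broker has ever registered there, that node does not exist. Start the Kafka service from the CDH admin console (Cloudera Manager); if the Kafka service has not been added yet, add it first.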

![Kafka Service](/assets/images/posts/oryx/add-kafka-service.png)
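Before rerunning the setup, a quick sanity check is whether anything is listening on the addresses in kafka-brokers. A minimal Python sketch, assuming plain TCP reachability is a good enough first check (an open port does not prove that the broker has registered in ZooKeeper); the helper names are hypothetical:

```python
from __future__ import annotations

import socket


def parse_brokers(brokers: str) -> list[tuple[str, int]]:
    """Split a broker list 'h1:9092,h2:9092' into (host, port) pairs."""
    pairs = []
    for entry in brokers.split(","):
        host, _, port = entry.strip().rpartition(":")
        pairs.append((host, int(port)))
    return pairs


def reachable(host: str, port: int, timeout: float = 2.0) -> bool:
    """True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # connection refused, timed out, or host name not resolvable
        return False


def check_brokers(brokers: str, timeout: float = 2.0) -> dict[str, bool]:
    """Map each 'host:port' in the broker list to whether it accepts connections."""
    return {f"{h}:{p}": reachable(h, p, timeout) for h, p in parse_brokers(brokers)}
```

Passing the kafka-brokers string from oryx.conf to check_brokers would show at a glance which broker addresses are down.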

Now rerun the setup command:

[root@data-mining-49 oryx]# ./oryx-run.sh kafka-setup
Input   ZK      data-mining-46.slave:2181,data-mining-47.slave:2181,data-mining-48.slave:2181/kafka
        Kafka   data-mining-47.slave:9092,data-mining-48.slave:9092
        topic   OryxInput
Update  ZK      data-mining-46.slave:2181,data-mining-47.slave:2181,data-mining-48.slave:2181/kafka
        Kafka   data-mining-47.slave:9092,data-mining-48.slave:9092
        topic   OryxUpdate

All available topics:


Input topic OryxInput does not exist. Create it? y
Creating topic OryxInput
Created topic "OryxInput".
Status of topic OryxInput:
Topic:OryxInput PartitionCount:4        ReplicationFactor:1     Configs:
        Topic: OryxInput        Partition: 0    Leader: 140     Replicas: 140   Isr: 140
        Topic: OryxInput        Partition: 1    Leader: 141     Replicas: 141   Isr: 141
        Topic: OryxInput        Partition: 2    Leader: 140     Replicas: 140   Isr: 140
        Topic: OryxInput        Partition: 3    Leader: 141     Replicas: 141   Isr: 141

Update topic OryxUpdate does not exist. Create it? y
Creating topic OryxUpdate
Created topic "OryxUpdate".
Updated config for topic "OryxUpdate".
Status of topic OryxUpdate:
Topic:OryxUpdate        PartitionCount:1        ReplicationFactor:1     Configs:retention.ms=86400000,max.message.bytes=16777216
        Topic: OryxUpdate       Partition: 0    Leader: 140     Replicas: 140   Isr: 140

[root@data-mining-49 oryx]#

OK!
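The update topic was created with two non-default settings. Decoded, retention.ms=86400000 is one day and max.message.bytes=16777216 is 16 MiB, presumably so that whole model updates fit into single messages. A small sketch of the arithmetic (the function name is hypothetical):

```python
def decode_update_topic_configs(configs: str) -> dict[str, str]:
    """Turn 'retention.ms=...,max.message.bytes=...' into human-readable units."""
    parsed = dict(item.split("=", 1) for item in configs.split(","))
    hours = int(parsed["retention.ms"]) / (1000 * 60 * 60)  # ms -> hours
    mib = int(parsed["max.message.bytes"]) / (1024 * 1024)  # bytes -> MiB
    return {"retention": f"{hours:g} h", "max message size": f"{mib:g} MiB"}


if __name__ == "__main__":
    print(decode_update_topic_configs("retention.ms=86400000,max.message.bytes=16777216"))
    # {'retention': '24 h', 'max message size': '16 MiB'}
```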

Next, sync the scripts and JARs above (the whole oryx directory) to the other nodes of the cluster:

[root@data-mining-49 data]# scp -r oryx/ root@192.168.1.48:/data/
compute-classpath.sh    100% 1893     1.9KB/s   00:00
oryx-run.sh             100%   13KB  13.2KB/s   00:00
oryx-batch-2.1.2.jar    100%   26MB  26.0MB/s   00:00
oryx-serving-2.1.2.jar  100%   33MB  33.0MB/s   00:01
oryx-speed-2.1.2.jar    100%   26MB  26.0MB/s   00:00
oryx.conf               100%   1884  1.8KB/s    00:00
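With several nodes, the same copy can be scripted. A sketch assuming passwordless SSH as root and a hypothetical node list (only 192.168.1.48 actually appears above); it builds one scp -r command per host and only executes them when dry_run=False:

```python
from __future__ import annotations

import subprocess

# Hypothetical node list: only 192.168.1.48 appears in the transcript above.
OTHER_NODES = ["192.168.1.46", "192.168.1.47", "192.168.1.48"]


def scp_commands(src_dir: str, hosts: list[str], dest_dir: str, user: str = "root") -> list[list[str]]:
    """One 'scp -r' command per target host."""
    return [["scp", "-r", src_dir, f"{user}@{host}:{dest_dir}"] for host in hosts]


def sync(src_dir: str, hosts: list[str], dest_dir: str, user: str = "root",
         dry_run: bool = True) -> list[list[str]]:
    """Print each command; run it too unless dry_run is set."""
    cmds = scp_commands(src_dir, hosts, dest_dir, user)
    for cmd in cmds:
        print(" ".join(cmd))
        if not dry_run:
            subprocess.run(cmd, check=True)  # stop at the first failed copy
    return cmds


if __name__ == "__main__":
    sync("oryx/", OTHER_NODES, "/data/")  # dry run: only prints the commands
```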

Next, we need a usable dataset:

MovieLens 100K Dataset

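Convert the dataset's format: u.data is tab-separated (user id, item id, rating, timestamp), so turn the tabs into commas: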

[root@data-mining-49 ml-100k]# tr '\t' ',' < u.data > data.csv
[root@data-mining-49 ml-100k]# tail data.csv
806,421,4,882388897
676,538,4,892685437
721,262,3,877137285
913,209,2,881367150
378,78,3,880056976
880,476,3,880175444
716,204,5,879795543
276,1090,1,874795795
13,225,2,882399156
12,203,3,879959583
[root@data-mining-49 ml-100k]#
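The tr one-liner swaps every tab blindly. If you also want the conversion to check that each line has exactly the four fields (user id, item id, rating, timestamp), here is a Python sketch of the same step (function names are hypothetical):

```python
def to_csv_line(line: str, line_no: int = 0) -> str:
    """Turn one tab-separated u.data line into a comma-separated one."""
    fields = line.rstrip("\r\n").split("\t")
    if len(fields) != 4:
        raise ValueError(f"line {line_no}: expected 4 tab-separated fields, got {len(fields)}")
    return ",".join(fields)


def convert_ml100k(src_path: str = "u.data", dst_path: str = "data.csv") -> int:
    """Rewrite u.data as data.csv; returns the number of rows written."""
    count = 0
    with open(src_path, encoding="utf-8") as src, open(dst_path, "w", encoding="utf-8") as dst:
        for line_no, line in enumerate(src, start=1):
            if not line.strip():
                continue  # tolerate blank lines
            dst.write(to_csv_line(line, line_no) + "\n")
            count += 1
    return count
```

Run in the ml-100k directory, convert_ml100k() should report 100000 rows, since MovieLens 100K contains 100,000 ratings.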

With the data ready, we can load it into the system with:

[root@data-mining-49 oryx]# ./oryx-run.sh kafka-input --input-file data.csv

At the same time, open another terminal window and follow the import in real time with:

[root@data-mining-49 oryx]# ./oryx-run.sh kafka-tail
Input   ZK      data-mining-46.slave:2181,data-mining-47.slave:2181,data-mining-48.slave:2181/kafka
        Kafka   data-mining-47.slave:9092,data-mining-48.slave:9092
        topic   OryxInput
Update  ZK      data-mining-46.slave:2181,data-mining-47.slave:2181,data-mining-48.slave:2181/kafka
        Kafka   data-mining-47.slave:9092,data-mining-48.slave:9092
        topic   OryxUpdate

279,64,1,875308510
646,750,3,888528902
654,370,2,887863914
617,582,4,883789294
913,690,3,880824288
660,229,2,891406212
421,498,4,892241344
495,1091,4,888637503
806,421,4,882388897
676,538,4,892685437
721,262,3,877137285
913,209,2,881367150
378,78,3,880056976
880,476,3,880175444
716,204,5,879795543
276,1090,1,874795795
13,225,2,882399156
12,203,3,879959583

Once all the data has been sent, stop this command manually with Ctrl-C.

If all of the above succeeded, you can stop these processes. The cluster is now ready to run Oryx.

Starting the services

./oryx-run.sh batch
./oryx-run.sh speed
./oryx-run.sh serving

Blah blah blah... lots of output; ignore it for now.

Once the serving layer is confirmed to be up, ingest the data (so the earlier import was only a test of the Kafka setup?):

wget --quiet --post-file data.csv --output-document -  --header "Content-Type: text/csv" http://192.168.1.49:8080/ingest

The batch layer now kicks off a new computation.
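The wget call above simply POSTs the CSV file to the serving layer's /ingest endpoint with Content-Type: text/csv. The same request using only Python's standard library, assuming the serving layer address http://192.168.1.49:8080 used in this post (ingest_csv is a hypothetical helper name):

```python
import urllib.request

SERVING_URL = "http://192.168.1.49:8080"  # serving layer address used in this post


def ingest_csv(base_url: str, csv_path: str, timeout: float = 60.0) -> int:
    """POST a CSV file to <base_url>/ingest, like the wget command; returns the HTTP status."""
    with open(csv_path, "rb") as f:
        body = f.read()
    req = urllib.request.Request(
        base_url.rstrip("/") + "/ingest",
        data=body,
        headers={"Content-Type": "text/csv"},
        method="POST",
    )
    # urlopen raises urllib.error.HTTPError for 4xx/5xx answers
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return resp.status
```

Usage: ingest_csv(SERVING_URL, "data.csv").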

At this point the data in HDFS looks like this:

[root@data-mining-49 oryx]# hdfs dfs -ls /Oryx
Found 2 items
drwxr-xr-x   - hdfs supergroup          0 2016-04-08 18:40 /Oryx/data
drwxr-xr-x   - hdfs supergroup          0 2016-04-08 18:40 /Oryx/model
[root@data-mining-49 oryx]# hdfs dfs -ls /Oryx/data
Found 1 items
drwxr-xr-x   - hdfs supergroup          0 2016-04-08 18:40 /Oryx/data/oryx-1460112000000.data
[root@data-mining-49 oryx]# hdfs dfs -ls /Oryx/model
Found 3 items
drwxr-xr-x   - hdfs supergroup          0 2016-04-08 18:41 /Oryx/model/.checkpoint
drwxr-xr-x   - hdfs supergroup          0 2016-04-08 18:40 /Oryx/model/.temporary
drwxr-xr-x   - hdfs supergroup          0 2016-04-08 18:40 /Oryx/model/1460112018440
[root@data-mining-49 oryx]#
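The numeric directory names look like epoch timestamps in milliseconds (oryx-1460112000000.data matches Spark Streaming's prefix-TIME_IN_MS.suffix naming for saved batches). A sketch that decodes them, assuming the cluster's local time zone is UTC+8, which is consistent with the 18:40 times shown by ls:

```python
import re
from datetime import datetime, timedelta, timezone

# Assumption: the cluster's local time zone is UTC+8, matching the `ls` times above.
CLUSTER_TZ = timezone(timedelta(hours=8))


def decode_dir_timestamp(name: str, tz: timezone = CLUSTER_TZ) -> str:
    """Find a 13-digit epoch-milliseconds value in a directory name and format it."""
    match = re.search(r"\d{13}", name)
    if match is None:
        raise ValueError(f"no millisecond timestamp in {name!r}")
    seconds = int(match.group()) // 1000  # drop the milliseconds
    return datetime.fromtimestamp(seconds, tz=tz).strftime("%Y-%m-%d %H:%M:%S")


if __name__ == "__main__":
    for name in ("oryx-1460112000000.data", "1460112018440"):
        print(name, "->", decode_dir_timestamp(name))
    # oryx-1460112000000.data -> 2016-04-08 18:40:00
    # 1460112018440 -> 2016-04-08 18:40:18
```

So the model directory was written about 18 seconds after the start of the data batch it was built from.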

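This shows that the input data and a model are in place. Use the API to confirm whether the computation has finished; a 200 OK from /ready means the serving layer is ready to answer queries: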

[root@data-mining-49 ~]# wget --quiet --output-document - --server-response http://192.168.1.49:8080/ready
  HTTP/1.1 200 OK
  Content-Length: 0
  Date: Fri, 08 Apr 2016 10:43:27 GMT
  Server: Oryx

OK!
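Since the batch computation can take a while, it may be handier to poll /ready than to rerun wget by hand. A sketch assuming that any non-200 answer (for example a 503) or a connection error simply means "not ready yet"; the function name and the default timeout and interval are arbitrary choices:

```python
import time
import urllib.request


def wait_until_ready(base_url: str, timeout: float = 600.0, interval: float = 5.0) -> bool:
    """Poll <base_url>/ready until it answers 200 OK or `timeout` seconds pass."""
    url = base_url.rstrip("/") + "/ready"
    deadline = time.monotonic() + timeout
    while True:
        try:
            with urllib.request.urlopen(url, timeout=interval) as resp:
                if resp.status == 200:
                    return True
        except OSError:
            # urllib.error.HTTPError (e.g. a 503) and URLError are OSError subclasses,
            # as are connection-refused errors and socket timeouts
            pass
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)
```

For example, wait_until_ready("http://192.168.1.49:8080") waits up to ten minutes, checking every five seconds.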

Let's look at the recommendation results:

[root@data-mining-49 ~]# wget --quiet --output-document -  http://192.168.1.49:8080/recommend/17
50,0.8234792673029006
127,0.7889597890898585
181,0.7442612769082189
275,0.7263787714764476
258,0.7164891492575407
121,0.7006391943432391
15,0.697495789732784
288,0.6787936894688755
285,0.6696474105119705
25,0.6603603088587988
[root@data-mining-49 ~]# wget --quiet --output-document -  http://192.168.1.49:8080/recommend/806
173,1.084704713546671
64,1.0142679414711893
7,0.9939510896801949
183,0.9918235178338364
22,0.9644620651379228
69,0.945814429782331
202,0.9420167971402407
11,0.9270770070143044
135,0.9227914617804345
191,0.9215727103874087
[root@data-mining-49 ~]#
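The /recommend output above is plain text, one itemID,score pair per line, in descending score order. The scores are ranking values rather than probabilities (user 806's exceed 1.0). A sketch that fetches and parses this format, assuming the plain-text output seen above (both helper names are hypothetical):

```python
from __future__ import annotations

import urllib.parse
import urllib.request


def parse_recommendations(text: str) -> list[tuple[str, float]]:
    """Parse 'itemID,score' lines into (item, score) pairs, keeping their order."""
    recs = []
    for line in text.splitlines():
        if not line.strip():
            continue
        item, score = line.rsplit(",", 1)
        recs.append((item, float(score)))
    return recs


def fetch_recommendations(base_url: str, user_id: str, timeout: float = 30.0) -> list[tuple[str, float]]:
    """GET <base_url>/recommend/<user_id> and parse the plain-text answer."""
    url = base_url.rstrip("/") + "/recommend/" + urllib.parse.quote(str(user_id), safe="")
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return parse_recommendations(resp.read().decode("utf-8"))
```

Usage: fetch_recommendations("http://192.168.1.49:8080", "17") would return the ten pairs listed above for user 17.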

A perfect finish.

These results apparently came through the batch layer; the speed (real-time) layer died partway through:

16/04/08 18:42:38 INFO SparkContext: Successfully stopped SparkContext
16/04/08 18:42:38 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, data-mining-47.slave): java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_0_piece0 of broadcast_0
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1177)
        at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
        at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
        at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
        at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
        at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        at org.apache.spark.scheduler.Task.run(Task.scala:88)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Failed to get broadcast_0_piece0 of broadcast_0
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:138)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:138)
        at scala.Option.getOrElse(Option.scala:120)

We'll hunt down the cause in the next post.

References