Files
hands-on/12_distributed_tensorflow.ipynb
Haesun Park cdf7810446 12장 실행
2018-04-17 16:37:02 +09:00

545 lines
16 KiB
Plaintext
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPython 3.5.5\n",
"IPython 6.3.0\n",
"\n",
"numpy 1.14.2\n",
"sklearn 0.19.1\n",
"scipy 1.0.1\n",
"matplotlib 2.2.2\n",
"tensorflow 1.7.0\n"
]
}
],
"source": [
"%load_ext watermark\n",
"%watermark -v -p numpy,sklearn,scipy,matplotlib,tensorflow"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**12장 분산 텐서플로**"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"_이 노트북은 12장에 있는 모든 샘플 코드와 연습문제 해답을 가지고 있습니다._"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# 설정"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"파이썬 2와 3을 모두 지원합니다. 공통 모듈을 임포트하고 맷플롯립 그림이 노트북 안에 포함되도록 설정하고 생성한 그림을 저장하기 위한 함수를 준비합니다:"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"# 파이썬 2와 파이썬 3 지원\n",
"from __future__ import division, print_function, unicode_literals\n",
"\n",
"# 공통\n",
"import numpy as np\n",
"import os\n",
"\n",
"# 일관된 출력을 위해 유사난수 초기화\n",
"def reset_graph(seed=42):\n",
"    \"\"\"Reset the default TensorFlow graph and reseed TensorFlow and NumPy with `seed`.\"\"\"\n",
"    np.random.seed(seed)\n",
"    # The TensorFlow seed is set after the reset so it applies to the new default graph.\n",
"    tf.reset_default_graph()\n",
"    tf.set_random_seed(seed)\n",
"\n",
"# 맷플롯립 설정\n",
"%matplotlib inline\n",
"import matplotlib\n",
"import matplotlib.pyplot as plt\n",
"plt.rcParams['axes.labelsize'] = 14\n",
"plt.rcParams['xtick.labelsize'] = 12\n",
"plt.rcParams['ytick.labelsize'] = 12\n",
"\n",
"# 그림을 저장할 폴더\n",
"PROJECT_ROOT_DIR = \".\"\n",
"CHAPTER_ID = \"distributed\"\n",
"\n",
"def save_fig(fig_id, tight_layout=True):\n",
"    \"\"\"Save the current matplotlib figure as images/<CHAPTER_ID>/<fig_id>.png.\n",
"\n",
"    fig_id: file name of the image, without the \".png\" extension.\n",
"    tight_layout: when true, plt.tight_layout() is called before saving.\n",
"    \"\"\"\n",
"    image_dir = os.path.join(PROJECT_ROOT_DIR, \"images\", CHAPTER_ID)\n",
"    # Create the output folder on first use; savefig fails if it is missing.\n",
"    # (os.makedirs(..., exist_ok=True) is avoided to keep Python 2 support.)\n",
"    if not os.path.isdir(image_dir):\n",
"        os.makedirs(image_dir)\n",
"    path = os.path.join(image_dir, fig_id + \".png\")\n",
"    if tight_layout:\n",
"        plt.tight_layout()\n",
"    plt.savefig(path, format='png', dpi=300)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# 로컬 서버"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"import tensorflow as tf"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"c = tf.constant(\"Hello distributed TensorFlow!\")\n",
"server = tf.train.Server.create_local_server()"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"b'Hello distributed TensorFlow!'\n"
]
}
],
"source": [
"with tf.Session(server.target) as sess:\n",
" print(sess.run(c))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# 클러스터"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"cluster_spec = tf.train.ClusterSpec({\n",
" \"ps\": [\n",
" \"127.0.0.1:2221\", # /job:ps/task:0\n",
" \"127.0.0.1:2222\", # /job:ps/task:1\n",
" ],\n",
" \"worker\": [\n",
" \"127.0.0.1:2223\", # /job:worker/task:0\n",
" \"127.0.0.1:2224\", # /job:worker/task:1\n",
" \"127.0.0.1:2225\", # /job:worker/task:2\n",
" ]})"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"task_ps0 = tf.train.Server(cluster_spec, job_name=\"ps\", task_index=0)\n",
"task_ps1 = tf.train.Server(cluster_spec, job_name=\"ps\", task_index=1)\n",
"task_worker0 = tf.train.Server(cluster_spec, job_name=\"worker\", task_index=0)\n",
"task_worker1 = tf.train.Server(cluster_spec, job_name=\"worker\", task_index=1)\n",
"task_worker2 = tf.train.Server(cluster_spec, job_name=\"worker\", task_index=2)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# 여러 디바이스와 서버에 연산을 할당하기"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [],
"source": [
"reset_graph()\n",
"\n",
"with tf.device(\"/job:ps\"):\n",
" a = tf.Variable(1.0, name=\"a\")\n",
"\n",
"with tf.device(\"/job:worker\"):\n",
" b = a + 2\n",
"\n",
"with tf.device(\"/job:worker/task:1\"):\n",
" c = a + b"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"4.0\n"
]
}
],
"source": [
"with tf.Session(\"grpc://127.0.0.1:2221\") as sess:\n",
" sess.run(a.initializer)\n",
" print(c.eval())"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [],
"source": [
"reset_graph()\n",
"\n",
"# replica_device_setter places variables on the ps job and all other ops on\n",
"# the worker job; inner tf.device blocks refine the worker placement.\n",
"with tf.device(tf.train.replica_device_setter(\n",
"        ps_tasks=2,\n",
"        ps_device=\"/job:ps\",\n",
"        worker_device=\"/job:worker\")):\n",
"    v1 = tf.Variable(1.0, name=\"v1\")  # placed on /job:ps/task:0 (defaults to /cpu:0)\n",
"    v2 = tf.Variable(2.0, name=\"v2\")  # placed on /job:ps/task:1 (defaults to /cpu:0)\n",
"    v3 = tf.Variable(3.0, name=\"v3\")  # placed on /job:ps/task:0 (defaults to /cpu:0)\n",
"    s = v1 + v2                       # placed on /job:worker (defaults to task:0/cpu:0)\n",
"    with tf.device(\"/task:1\"):\n",
"        p1 = 2 * s                    # placed on /job:worker/task:1 (defaults to /cpu:0)\n",
"        with tf.device(\"/cpu:0\"):\n",
"            p2 = 3 * s                # placed on /job:worker/task:1/cpu:0\n",
"\n",
"# Log the device chosen for each op when the session runs it.\n",
"config = tf.ConfigProto()\n",
"config.log_device_placement = True\n",
"\n",
"with tf.Session(\"grpc://127.0.0.1:2221\", config=config) as sess:\n",
"    v1.initializer.run()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# 리더"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"더 이상 읽을 파일이 없습니다\n",
"[array([[ 4.0000000e+00, 5.0000000e+00],\n",
" [ 1.0000000e+00, -3.9627734e+03]], dtype=float32), array([1, 0], dtype=int32)]\n",
"[array([[7., 8.]], dtype=float32), array([0], dtype=int32)]\n",
"더 이상 훈련 샘플이 없습니다\n"
]
}
],
"source": [
"reset_graph()\n",
"\n",
"# Write a small CSV file to read back; the first data row leaves x2 empty.\n",
"# The context manager closes the file even if a write fails.\n",
"with open(\"my_test.csv\", \"w\") as test_csv:\n",
"    test_csv.write(\"x1, x2 , target\\n\")\n",
"    test_csv.write(\"1., , 0\\n\")\n",
"    test_csv.write(\"4., 5. , 1\\n\")\n",
"    test_csv.write(\"7., 8. , 0\\n\")\n",
"\n",
"# Queue of file names to read, fed through a placeholder.\n",
"filename_queue = tf.FIFOQueue(capacity=10, dtypes=[tf.string], shapes=[()])\n",
"filename = tf.placeholder(tf.string)\n",
"enqueue_filename = filename_queue.enqueue([filename])\n",
"close_filename_queue = filename_queue.close()\n",
"\n",
"# Read one line at a time from the queued files, skipping the header line.\n",
"reader = tf.TextLineReader(skip_header_lines=1)\n",
"key, value = reader.read(filename_queue)\n",
"\n",
"# record_defaults fixes the column types (float, float, int).\n",
"x1, x2, target = tf.decode_csv(value, record_defaults=[[-1.], [-1.], [-1]])\n",
"features = tf.stack([x1, x2])\n",
"\n",
"# Shuffling queue holding the parsed (features, target) instances.\n",
"instance_queue = tf.RandomShuffleQueue(\n",
"    capacity=10, min_after_dequeue=2,\n",
"    dtypes=[tf.float32, tf.int32], shapes=[[2],[]],\n",
"    name=\"instance_q\", shared_name=\"shared_instance_q\")\n",
"enqueue_instance = instance_queue.enqueue([features, target])\n",
"close_instance_queue = instance_queue.close()\n",
"\n",
"minibatch_instances, minibatch_targets = instance_queue.dequeue_up_to(2)\n",
"\n",
"with tf.Session() as sess:\n",
"    sess.run(enqueue_filename, feed_dict={filename: \"my_test.csv\"})\n",
"    sess.run(close_filename_queue)\n",
"    # Push instances until the reader runs out of files.\n",
"    try:\n",
"        while True:\n",
"            sess.run(enqueue_instance)\n",
"    except tf.errors.OutOfRangeError:\n",
"        print(\"더 이상 읽을 파일이 없습니다\")\n",
"    sess.run(close_instance_queue)\n",
"    # Drain the instance queue in mini-batches of at most two instances.\n",
"    try:\n",
"        while True:\n",
"            print(sess.run([minibatch_instances, minibatch_targets]))\n",
"    except tf.errors.OutOfRangeError:\n",
"        print(\"더 이상 훈련 샘플이 없습니다\")"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [],
"source": [
"#coord = tf.train.Coordinator()\n",
"#threads = tf.train.start_queue_runners(coord=coord)\n",
"#filename_queue = tf.train.string_input_producer([\"test.csv\"])\n",
"#coord.request_stop()\n",
"#coord.join(threads)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# QueueRunner와 Coordinator"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[array([[ 4.0000000e+00, 5.0000000e+00],\n",
" [ 1.0000000e+00, -3.9627734e+03]], dtype=float32), array([1, 0], dtype=int32)]\n",
"[array([[7., 8.]], dtype=float32), array([0], dtype=int32)]\n",
"더 이상 훈련 샘플이 없습니다\n"
]
}
],
"source": [
"reset_graph()\n",
"\n",
"# Queue of file names to read, fed through a placeholder.\n",
"filename_queue = tf.FIFOQueue(capacity=10, dtypes=[tf.string], shapes=[()])\n",
"filename = tf.placeholder(tf.string)\n",
"enqueue_filename = filename_queue.enqueue([filename])\n",
"close_filename_queue = filename_queue.close()\n",
"\n",
"reader = tf.TextLineReader(skip_header_lines=1)\n",
"key, value = reader.read(filename_queue)\n",
"\n",
"x1, x2, target = tf.decode_csv(value, record_defaults=[[-1.], [-1.], [-1]])\n",
"features = tf.stack([x1, x2])\n",
"\n",
"instance_queue = tf.RandomShuffleQueue(\n",
"    capacity=10, min_after_dequeue=2,\n",
"    dtypes=[tf.float32, tf.int32], shapes=[[2],[]],\n",
"    name=\"instance_q\", shared_name=\"shared_instance_q\")\n",
"enqueue_instance = instance_queue.enqueue([features, target])\n",
"close_instance_queue = instance_queue.close()\n",
"\n",
"minibatch_instances, minibatch_targets = instance_queue.dequeue_up_to(2)\n",
"\n",
"# Run the same enqueue op from several threads that share one reader.\n",
"n_threads = 5\n",
"queue_runner = tf.train.QueueRunner(instance_queue, [enqueue_instance] * n_threads)\n",
"coord = tf.train.Coordinator()\n",
"\n",
"with tf.Session() as sess:\n",
"    sess.run(enqueue_filename, feed_dict={filename: \"my_test.csv\"})\n",
"    sess.run(close_filename_queue)\n",
"    enqueue_threads = queue_runner.create_threads(sess, coord=coord, start=True)\n",
"    try:\n",
"        while True:\n",
"            print(sess.run([minibatch_instances, minibatch_targets]))\n",
"    except tf.errors.OutOfRangeError:\n",
"        print(\"더 이상 훈련 샘플이 없습니다\")\n",
"    finally:\n",
"        # Stop and join the runner threads while the session is still open,\n",
"        # so no thread is left waiting on the coordinator.\n",
"        coord.request_stop()\n",
"        coord.join(enqueue_threads)"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[array([[ 4.0000000e+00, 5.0000000e+00],\n",
" [ 1.0000000e+00, -3.9627734e+03]], dtype=float32), array([1, 0], dtype=int32)]\n",
"[array([[7., 8.]], dtype=float32), array([0], dtype=int32)]\n",
"더 이상 훈련 샘플이 없습니다\n"
]
}
],
"source": [
"reset_graph()\n",
"\n",
"def read_and_push_instance(filename_queue, instance_queue):\n",
"    \"\"\"Build the ops that read one CSV line and enqueue it as an instance.\n",
"\n",
"    A new TextLineReader (skipping one header line) reads from\n",
"    `filename_queue`; each line is decoded into two float features and an\n",
"    integer target. Returns the op enqueuing `[features, target]` into\n",
"    `instance_queue`.\n",
"    \"\"\"\n",
"    line_reader = tf.TextLineReader(skip_header_lines=1)\n",
"    _, csv_line = line_reader.read(filename_queue)\n",
"    # record_defaults fixes the column types (float, float, int).\n",
"    columns = tf.decode_csv(csv_line, record_defaults=[[-1.], [-1.], [-1]])\n",
"    feature_vector = tf.stack(columns[:2])\n",
"    return instance_queue.enqueue([feature_vector, columns[2]])\n",
"\n",
"# Queue of file names to read, fed through a placeholder.\n",
"filename_queue = tf.FIFOQueue(capacity=10, dtypes=[tf.string], shapes=[()])\n",
"filename = tf.placeholder(tf.string)\n",
"enqueue_filename = filename_queue.enqueue([filename])\n",
"close_filename_queue = filename_queue.close()\n",
"\n",
"instance_queue = tf.RandomShuffleQueue(\n",
"    capacity=10, min_after_dequeue=2,\n",
"    dtypes=[tf.float32, tf.int32], shapes=[[2],[]],\n",
"    name=\"instance_q\", shared_name=\"shared_instance_q\")\n",
"\n",
"minibatch_instances, minibatch_targets = instance_queue.dequeue_up_to(2)\n",
"\n",
"# Five independent reader/enqueue pipelines, one per queue-runner thread.\n",
"read_and_enqueue_ops = [read_and_push_instance(filename_queue, instance_queue) for i in range(5)]\n",
"queue_runner = tf.train.QueueRunner(instance_queue, read_and_enqueue_ops)\n",
"\n",
"with tf.Session() as sess:\n",
"    sess.run(enqueue_filename, feed_dict={filename: \"my_test.csv\"})\n",
"    sess.run(close_filename_queue)\n",
"    coord = tf.train.Coordinator()\n",
"    enqueue_threads = queue_runner.create_threads(sess, coord=coord, start=True)\n",
"    try:\n",
"        while True:\n",
"            print(sess.run([minibatch_instances, minibatch_targets]))\n",
"    except tf.errors.OutOfRangeError:\n",
"        print(\"더 이상 훈련 샘플이 없습니다\")\n",
"    finally:\n",
"        # Stop and join the runner threads while the session is still open,\n",
"        # so no thread is left waiting on the coordinator.\n",
"        coord.request_stop()\n",
"        coord.join(enqueue_threads)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# 타임아웃 지정하기"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"2.0\n",
"6.0\n",
"3.0\n",
"4.0\n",
"dequeue 타임 아웃\n"
]
}
],
"source": [
"reset_graph()\n",
"\n",
"q = tf.FIFOQueue(capacity=10, dtypes=[tf.float32], shapes=[()])\n",
"v = tf.placeholder(tf.float32)\n",
"enqueue = q.enqueue([v])\n",
"dequeue = q.dequeue()\n",
"output = dequeue + 1\n",
"\n",
"# Give up on any single run() call after one second.\n",
"config = tf.ConfigProto()\n",
"config.operation_timeout_in_ms = 1000\n",
"\n",
"with tf.Session(config=config) as sess:\n",
"    sess.run(enqueue, feed_dict={v: 1.0})\n",
"    sess.run(enqueue, feed_dict={v: 2.0})\n",
"    sess.run(enqueue, feed_dict={v: 3.0})\n",
"    print(sess.run(output))\n",
"    # Feeding `dequeue` directly supplies its value, so nothing is taken from the queue here.\n",
"    print(sess.run(output, feed_dict={dequeue: 5}))\n",
"    print(sess.run(output))\n",
"    print(sess.run(output))\n",
"    # The queue is now empty, so this dequeue blocks until the timeout fires.\n",
"    try:\n",
"        print(sess.run(output))\n",
"    except tf.errors.DeadlineExceededError:\n",
"        print(\"dequeue 타임 아웃\")"
]
},
{
"cell_type": "markdown",
"metadata": {
"collapsed": true
},
"source": [
"# 연습문제 해답"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**Coming soon**"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.5.5"
},
"nav_menu": {},
"toc": {
"navigate_menu": true,
"number_sections": true,
"sideBar": true,
"threshold": 6,
"toc_cell": false,
"toc_section_display": "block",
"toc_window_display": false
}
},
"nbformat": 4,
"nbformat_minor": 1
}