Common API Workflows
Learn how to implement common integration patterns with the PlanOps API.
Project Management Workflows
Create a Complete Project
Set up a new project with tasks and documents:
```python
import requests

# Access token obtained from the authentication flow (see Authentication)
headers = {'Authorization': f'Bearer {token}'}
base_url = 'https://test-api.projectjump.app'

# 1. Create the project
project_data = {
    "name": "New Office Building",
    "description": "5-story commercial office construction",
    "status": "planning",
    "start_date": "2025-02-01T00:00:00Z",
    "end_date": "2025-12-31T23:59:59Z"
}

project = requests.post(
    f'{base_url}/api/v1/projects',
    headers=headers,
    json=project_data
).json()
print(f"Created project: {project['id']}")

# 2. Create initial tasks
tasks = [
    {
        "title": "Site survey and preparation",
        "description": "Complete initial site survey",
        "project_id": project['id'],
        "status": "pending",
        "priority": "high",
        "due_date": "2025-02-15T17:00:00Z"
    },
    {
        "title": "Obtain building permits",
        "description": "Submit and obtain all required permits",
        "project_id": project['id'],
        "status": "pending",
        "priority": "high",
        "due_date": "2025-02-28T17:00:00Z"
    },
    {
        "title": "Foundation work",
        "description": "Excavation and foundation construction",
        "project_id": project['id'],
        "status": "pending",
        "priority": "medium",
        "due_date": "2025-03-31T17:00:00Z"
    }
]

for task_data in tasks:
    task = requests.post(
        f'{base_url}/api/v1/tasks',
        headers=headers,
        json=task_data
    ).json()
    print(f"Created task: {task['title']}")

# 3. Upload initial documents
documents = [
    ('site_plan.pdf', 'Site Plan', 'drawing'),
    ('building_permit.pdf', 'Building Permit Application', 'permit'),
]

for filename, title, doc_type in documents:
    with open(filename, 'rb') as f:
        files = {'file': (filename, f, 'application/pdf')}
        data = {
            'project_id': project['id'],
            'document_type': doc_type,
            'title': title,
        }
        doc = requests.post(
            f'{base_url}/api/v1/documents',
            headers=headers,  # Authorization only; requests sets the multipart Content-Type
            files=files,
            data=data
        ).json()
    print(f"Uploaded: {doc['title']}")
```
Monitor Project Progress
Track task completion and project status:
```python
from datetime import datetime, timezone

def get_project_status(project_id):
    """Get comprehensive project status."""
    # Fetch project details
    project = requests.get(
        f'{base_url}/api/v1/projects/{project_id}',
        headers=headers
    ).json()

    # Fetch all tasks
    tasks = requests.get(
        f'{base_url}/api/v1/tasks',
        headers=headers,
        params={'project_id': project_id}
    ).json()

    # Calculate statistics
    # Assumes due dates are UTC ISO 8601 strings (e.g. "2025-02-15T17:00:00Z"),
    # so string comparison against a UTC timestamp in the same format is valid.
    now_iso = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
    total_tasks = len(tasks['items'])
    completed_tasks = sum(1 for t in tasks['items'] if t['status'] == 'completed')
    overdue_tasks = sum(
        1 for t in tasks['items']
        if t['status'] != 'completed' and t['due_date'] < now_iso
    )

    return {
        'project_name': project['name'],
        'status': project['status'],
        'total_tasks': total_tasks,
        'completed_tasks': completed_tasks,
        'completion_rate': (completed_tasks / total_tasks * 100) if total_tasks > 0 else 0,
        'overdue_tasks': overdue_tasks
    }

status = get_project_status('proj_abc123')
print(f"Project: {status['project_name']}")
print(f"Completion: {status['completion_rate']:.1f}%")
print(f"Overdue tasks: {status['overdue_tasks']}")
```
Document Management Workflows
Bulk Document Upload
Upload multiple documents with metadata:
```python
from pathlib import Path

def upload_documents_from_folder(folder_path, project_id):
    """Upload all PDFs from a folder to a project."""
    folder = Path(folder_path)
    uploaded_count = 0

    for file_path in folder.glob('*.pdf'):
        # Extract metadata from the filename (e.g. "DRW-001_Foundation-Plan.pdf")
        filename = file_path.stem
        parts = filename.split('_')
        drawing_number = parts[0] if len(parts) > 0 else 'Unknown'
        title = parts[1].replace('-', ' ') if len(parts) > 1 else filename

        with open(file_path, 'rb') as f:
            files = {'file': (file_path.name, f, 'application/pdf')}
            data = {
                'project_id': project_id,
                'document_type': 'drawing',
                'title': title,
                'drawing_number': drawing_number,
            }
            try:
                doc = requests.post(
                    f'{base_url}/api/v1/documents',
                    headers=headers,
                    files=files,
                    data=data,
                    timeout=60  # Longer timeout for large files
                ).json()
                print(f"✓ Uploaded: {doc['title']} ({doc['id']})")
                uploaded_count += 1
            except Exception as e:
                print(f"✗ Failed to upload {file_path.name}: {e}")

    return uploaded_count

# Upload all PDFs from a directory
count = upload_documents_from_folder('./drawings', 'proj_abc123')
print(f"\nUploaded {count} documents")
```
Document Search and Export
Search for documents and export results:
```python
import csv
from datetime import datetime, timedelta, timezone

def export_recent_documents(project_id, days=7):
    """Export documents uploaded in the last N days to CSV."""
    # Use a UTC timestamp so the filter matches the API's ISO 8601 dates
    since_date = (datetime.now(timezone.utc) - timedelta(days=days)).isoformat()

    # Search for recent documents
    params = {
        'project_id': project_id,
        'created_after': since_date,
        'limit': 100
    }
    response = requests.get(
        f'{base_url}/api/v1/documents',
        headers=headers,
        params=params
    )
    documents = response.json()['items']

    # Export to CSV
    filename = f'documents_last_{days}_days.csv'
    with open(filename, 'w', newline='') as f:
        fieldnames = ['id', 'title', 'document_type', 'created_at', 'file_size']
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        for doc in documents:
            writer.writerow({
                'id': doc['id'],
                'title': doc['title'],
                'document_type': doc.get('document_type', 'unknown'),
                'created_at': doc['created_at'],
                'file_size': doc.get('file_size', 0)
            })

    print(f"Exported {len(documents)} documents to {filename}")
    return documents

docs = export_recent_documents('proj_abc123', days=7)
```
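Note that `limit: 100` caps a single response, so projects with more documents need paging. The helper below is a sketch that assumes the list endpoints accept `limit` and `offset` query parameters and return an `items` array; check the API Reference for the actual pagination scheme and adjust accordingly:

```python
def fetch_all(path, params=None, page_size=100):
    """Collect all items from a paginated list endpoint.

    Assumption: the endpoint accepts `limit` and `offset` query parameters and
    returns an `items` array. Adapt to the pagination scheme in the API Reference.
    """
    params = dict(params or {})
    items, offset = [], 0
    while True:
        params.update({'limit': page_size, 'offset': offset})
        page = requests.get(f'{base_url}{path}', headers=headers, params=params).json()
        batch = page.get('items', [])
        items.extend(batch)
        if len(batch) < page_size:
            break
        offset += page_size
    return items

# Example (hypothetical pagination scheme):
# all_docs = fetch_all('/api/v1/documents', {'project_id': 'proj_abc123'})
```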
Task Management Workflows
Automated Task Assignment
Assign tasks based on rules:
```python
def assign_tasks_by_priority():
    """Auto-assign high-priority tasks to available team members."""
    # Get unassigned high-priority tasks.
    # Note: requests omits parameters whose value is None, so this filter is only
    # applied if the API accepts an explicit value for "unassigned"; adjust as needed.
    tasks = requests.get(
        f'{base_url}/api/v1/tasks',
        headers=headers,
        params={
            'status': 'pending',
            'priority': 'high',
            'assigned_to': None  # Unassigned
        }
    ).json()

    # Get available team members
    team = requests.get(
        f'{base_url}/api/v1/organizations/current/members',
        headers=headers
    ).json()

    if not team['items']:
        print("No team members available for assignment")
        return

    # Simple round-robin assignment
    for i, task in enumerate(tasks['items']):
        assignee = team['items'][i % len(team['items'])]

        # Update task with assignment
        updated = requests.patch(
            f'{base_url}/api/v1/tasks/{task["id"]}',
            headers=headers,
            json={'assigned_to': assignee['id']}
        ).json()
        print(f"Assigned '{task['title']}' to {assignee['name']}")

assign_tasks_by_priority()
```
Task Dependency Management
Create and manage task dependencies:
```python
def create_task_chain(project_id, task_sequence):
    """Create a sequence of dependent tasks."""
    created_tasks = []
    previous_task_id = None

    for i, task_info in enumerate(task_sequence):
        task_data = {
            'project_id': project_id,
            'title': task_info['title'],
            'description': task_info['description'],
            'status': 'pending' if i == 0 else 'blocked',
            'priority': task_info.get('priority', 'medium'),
            'dependencies': [previous_task_id] if previous_task_id else []
        }

        task = requests.post(
            f'{base_url}/api/v1/tasks',
            headers=headers,
            json=task_data
        ).json()
        created_tasks.append(task)
        previous_task_id = task['id']
        print(f"{i+1}. Created: {task['title']}")

    return created_tasks

# Example: Construction phase sequence
phases = [
    {'title': 'Site Preparation', 'description': 'Clear and level site'},
    {'title': 'Foundation Work', 'description': 'Excavate and pour foundation'},
    {'title': 'Framing', 'description': 'Structural framing'},
    {'title': 'MEP Rough-in', 'description': 'Mechanical, electrical, plumbing'},
    {'title': 'Finishes', 'description': 'Interior and exterior finishes'},
]

tasks = create_task_chain('proj_abc123', phases)
```
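Once a task in the chain is completed, the next one can be unblocked by patching its status. The sketch below assumes the same `dependencies` list and the `blocked`/`pending` status values used above, and does not check for other unfinished dependencies; adapt it to your workflow:

```python
def unblock_dependents(completed_task_id, project_id):
    """Move tasks that were waiting on `completed_task_id` from 'blocked' to 'pending'.

    Minimal sketch: assumes tasks expose the same `dependencies` list used when the
    chain was created, and that no other unfinished dependencies need to be checked.
    """
    tasks = requests.get(
        f'{base_url}/api/v1/tasks',
        headers=headers,
        params={'project_id': project_id, 'status': 'blocked'}
    ).json()

    for task in tasks['items']:
        if completed_task_id in (task.get('dependencies') or []):
            requests.patch(
                f'{base_url}/api/v1/tasks/{task["id"]}',
                headers=headers,
                json={'status': 'pending'}
            )
            print(f"Unblocked: {task['title']}")
```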
Reporting Workflows
Generate Project Report
Create comprehensive project reports:
```python
import json
from datetime import datetime

def generate_project_report(project_id):
    """Generate a comprehensive project status report."""
    # Fetch project data
    project = requests.get(
        f'{base_url}/api/v1/projects/{project_id}',
        headers=headers
    ).json()

    # Fetch tasks
    tasks = requests.get(
        f'{base_url}/api/v1/tasks',
        headers=headers,
        params={'project_id': project_id}
    ).json()

    # Fetch documents
    documents = requests.get(
        f'{base_url}/api/v1/documents',
        headers=headers,
        params={'project_id': project_id}
    ).json()

    # Fetch recent activities
    activities = requests.get(
        f'{base_url}/api/v1/activities',
        headers=headers,
        params={'project_id': project_id, 'limit': 20}
    ).json()

    # Generate report
    report = {
        'generated_at': datetime.now().isoformat(),
        'project': {
            'name': project['name'],
            'status': project['status'],
            'start_date': project['start_date'],
            'end_date': project['end_date']
        },
        'statistics': {
            'total_tasks': len(tasks['items']),
            'completed_tasks': sum(1 for t in tasks['items'] if t['status'] == 'completed'),
            'total_documents': len(documents['items']),
            'recent_activities': len(activities['items'])
        },
        'tasks_by_status': {},
        'tasks_by_priority': {},
        'recent_activities': activities['items'][:10]
    }

    # Count tasks by status and priority
    for task in tasks['items']:
        status = task['status']
        priority = task.get('priority', 'medium')
        report['tasks_by_status'][status] = report['tasks_by_status'].get(status, 0) + 1
        report['tasks_by_priority'][priority] = report['tasks_by_priority'].get(priority, 0) + 1

    # Save to file
    filename = f"project_report_{project_id}_{datetime.now().strftime('%Y%m%d')}.json"
    with open(filename, 'w') as f:
        json.dump(report, f, indent=2)

    print(f"Report generated: {filename}")
    return report

report = generate_project_report('proj_abc123')
stats = report['statistics']
print(f"Total tasks: {stats['total_tasks']}")
if stats['total_tasks']:
    print(f"Completion rate: {stats['completed_tasks'] / stats['total_tasks'] * 100:.1f}%")
```
Export Tasks to Gantt Chart
Export task data for visualization:
```python
import csv
from datetime import datetime

def parse_iso(value):
    """Parse ISO 8601 timestamps; fromisoformat() in Python < 3.11 rejects a 'Z' suffix."""
    return datetime.fromisoformat(value.replace('Z', '+00:00'))

def export_gantt_data(project_id):
    """Export tasks in Gantt chart format."""
    tasks = requests.get(
        f'{base_url}/api/v1/tasks',
        headers=headers,
        params={'project_id': project_id}
    ).json()

    filename = f'gantt_data_{project_id}.csv'
    with open(filename, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow([
            'Task ID', 'Task Name', 'Start Date', 'End Date',
            'Duration (days)', 'Status', 'Assignee', 'Dependencies'
        ])

        for task in tasks['items']:
            # Fall back to the creation date when start or due dates are missing
            start = parse_iso(task.get('start_date') or task['created_at'])
            end = parse_iso(task.get('due_date') or task['created_at'])
            duration = (end - start).days

            writer.writerow([
                task['id'],
                task['title'],
                start.strftime('%Y-%m-%d'),
                end.strftime('%Y-%m-%d'),
                duration,
                task['status'],
                task.get('assigned_to') or 'Unassigned',
                ','.join(task.get('dependencies') or [])
            ])

    print(f"Gantt data exported to {filename}")

export_gantt_data('proj_abc123')
```
Integration Workflows
Sync with External CDE
Synchronize documents with an external Common Data Environment (CDE):
```python
def sync_documents_to_cde(project_id, cde_connection_id):
    """Sync project documents to an external CDE."""
    # Get documents to sync
    docs = requests.get(
        f'{base_url}/api/v1/documents',
        headers=headers,
        params={
            'project_id': project_id,
            'synced': False  # Only unsynced documents
        }
    ).json()

    synced_count = 0
    for doc in docs['items']:
        # Download document from PlanOps
        download = requests.get(
            f'{base_url}/api/v1/documents/{doc["id"]}/download',
            headers=headers
        )

        # Upload to external CDE (pseudo-code; adjust for your CDE)
        cde_response = upload_to_cde(
            connection_id=cde_connection_id,
            file_content=download.content,
            metadata={
                'title': doc['title'],
                'document_type': doc.get('document_type'),
                'project_id': project_id
            }
        )

        if cde_response['success']:
            # Mark as synced in PlanOps
            requests.patch(
                f'{base_url}/api/v1/documents/{doc["id"]}',
                headers=headers,
                json={
                    'synced': True,
                    'external_id': cde_response['cde_document_id']
                }
            )
            synced_count += 1
            print(f"✓ Synced: {doc['title']}")
        else:
            print(f"✗ Failed: {doc['title']}")

    return synced_count
```
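The `upload_to_cde` call above is a placeholder for your CDE's API. As a purely hypothetical sketch, an adapter might look like the following; the URL, credential, and response fields are assumptions to be replaced with your CDE's actual upload interface:

```python
CDE_API_TOKEN = 'your_cde_api_token'  # placeholder credential

def upload_to_cde(connection_id, file_content, metadata):
    """Hypothetical CDE adapter used by sync_documents_to_cde above.

    The URL, headers, and response fields below are placeholders; replace them
    with your CDE's actual upload API.
    """
    response = requests.post(
        f'https://cde.example.com/api/connections/{connection_id}/files',  # placeholder URL
        headers={'Authorization': f'Bearer {CDE_API_TOKEN}'},
        files={'file': (metadata['title'], file_content)},
        data=metadata,
        timeout=120
    )
    if response.ok:
        return {'success': True, 'cde_document_id': response.json().get('id')}
    return {'success': False}
```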
Webhook Event Handler
Process webhook events from PlanOps:
```python
from flask import Flask, request, jsonify
import hmac
import hashlib

app = Flask(__name__)
WEBHOOK_SECRET = 'your_webhook_secret'

@app.route('/webhooks/planops', methods=['POST'])
def handle_planops_webhook():
    """Handle incoming webhooks from PlanOps."""
    # Verify the webhook signature (a missing header simply fails verification)
    signature = request.headers.get('X-PlanOps-Signature', '')
    computed_signature = hmac.new(
        WEBHOOK_SECRET.encode(),
        request.data,
        hashlib.sha256
    ).hexdigest()

    if not hmac.compare_digest(signature, computed_signature):
        return jsonify({'error': 'Invalid signature'}), 401

    # Process event
    event = request.json
    event_type = event['type']

    if event_type == 'task.created':
        handle_task_created(event['data'])
    elif event_type == 'task.updated':
        handle_task_updated(event['data'])
    elif event_type == 'document.uploaded':
        handle_document_uploaded(event['data'])

    return jsonify({'status': 'processed'}), 200

def handle_task_created(task_data):
    """Handle task creation event."""
    print(f"New task created: {task_data['title']}")
    # Send notification, update external system, etc.

def handle_task_updated(task_data):
    """Handle task update event."""
    print(f"Task updated: {task_data['id']}")

def handle_document_uploaded(doc_data):
    """Handle document upload event."""
    print(f"Document uploaded: {doc_data['title']}")

if __name__ == '__main__':
    app.run(port=5000)
```
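To exercise the handler locally before pointing PlanOps at it, you can post a signed sample payload yourself. The payload below only mirrors the fields the handler reads and is illustrative, not a real PlanOps event:

```python
import hashlib
import hmac
import json
import requests

WEBHOOK_SECRET = 'your_webhook_secret'  # must match the secret configured in the handler

# Sample payload mirroring the fields the handler reads; for local testing only
payload = json.dumps({'type': 'task.created', 'data': {'title': 'Test task'}}).encode()
signature = hmac.new(WEBHOOK_SECRET.encode(), payload, hashlib.sha256).hexdigest()

resp = requests.post(
    'http://localhost:5000/webhooks/planops',
    data=payload,
    headers={
        'Content-Type': 'application/json',
        'X-PlanOps-Signature': signature,
    },
)
print(resp.status_code, resp.json())
```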
Next Steps
- Explore the API Reference for complete endpoint documentation
- Review Authentication for advanced auth scenarios
- Check Getting Started for basic examples
Last updated: 2025-12-12