Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

247

248

249

250

251

252

253

254

255

256

257

258

259

260

261

262

263

264

265

266

267

268

269

270

271

272

273

274

275

276

277

278

279

280

281

282

283

284

285

286

287

288

289

290

291

292

293

294

295

296

297

298

299

300

301

302

303

304

305

306

307

308

309

310

311

312

313

314

315

316

317

318

319

320

321

322

323

324

325

326

327

328

329

330

331

332

333

334

335

336

337

338

339

340

341

342

343

344

345

346

347

348

349

350

351

352

353

354

355

356

357

358

359

360

361

362

363

364

365

366

367

368

369

370

371

372

373

374

375

376

377

378

379

380

381

382

383

384

385

386

387

388

389

390

391

392

393

394

395

396

397

398

399

400

401

402

403

404

405

406

407

408

409

410

411

412

413

414

415

416

417

418

419

420

421

422

423

424

425

426

427

428

429

430

431

432

433

434

435

436

437

438

439

440

441

442

443

444

445

446

447

448

449

450

451

452

453

454

455

456

457

458

459

460

461

462

463

464

465

466

467

468

469

470

471

472

473

474

475

476

477

478

479

480

481

482

483

484

485

486

487

488

489

490

491

492

493

494

495

496

497

498

499

500

501

502

503

504

505

506

507

508

509

510

511

512

513

514

515

516

517

518

519

520

521

522

523

524

525

526

527

528

529

530

531

532

533

534

535

536

537

538

539

540

541

542

543

544

545

546

547

548

549

550

551

552

553

554

555

556

557

558

559

560

561

562

563

564

565

566

567

568

569

570

571

572

573

574

575

576

577

578

579

580

581

582

583

584

585

586

587

588

589

590

591

592

593

594

595

596

597

598

599

600

601

602

603

604

605

606

607

608

609

610

611

612

613

614

615

616

617

618

619

620

621

622

623

624

625

626

627

628

629

630

631

632

633

634

635

636

637

638

639

640

641

642

643

644

645

646

647

648

649

650

651

652

653

654

655

656

657

658

659

660

661

662

663

664

665

666

667

668

669

670

671

672

673

674

675

676

677

678

679

680

681

682

683

684

685

686

687

688

689

690

691

692

693

694

695

696

697

698

699

700

701

702

703

704

705

706

707

708

709

710

711

712

713

714

715

716

717

718

719

720

721

722

723

724

725

726

727

728

729

730

731

732

733

734

735

736

737

738

739

740

741

742

743

744

745

746

747

748

749

750

751

752

753

754

755

756

757

758

759

760

761

762

763

764

765

766

767

768

769

770

771

772

773

774

775

776

777

778

779

780

781

782

783

784

785

786

787

788

789

790

791

792

793

794

795

796

797

798

799

800

801

802

803

804

805

806

807

808

809

810

811

812

813

814

815

816

817

818

819

820

821

822

823

824

825

826

827

828

829

830

831

832

833

834

835

836

837

838

839

840

841

842

843

844

845

846

847

848

849

850

851

852

853

854

855

856

857

858

859

860

861

862

863

864

865

866

867

868

869

870

871

872

873

874

875

876

877

878

879

880

881

882

883

884

885

886

887

888

889

890

891

892

893

894

895

896

897

898

899

900

901

902

903

904

905

906

907

908

909

910

911

912

913

914

915

916

917

918

919

920

921

922

923

924

925

926

927

928

929

930

931

932

933

934

935

936

937

938

939

940

941

942

943

944

945

946

947

948

949

950

951

952

953

954

955

956

957

958

959

960

961

962

963

964

965

966

967

968

969

970

971

972

973

974

975

976

977

978

979

980

981

982

983

984

985

986

987

988

989

990

991

992

993

994

995

996

997

998

999

1000

1001

1002

1003

1004

1005

1006

1007

1008

1009

1010

1011

1012

1013

1014

1015

1016

1017

1018

1019

1020

1021

1022

1023

1024

1025

1026

1027

1028

1029

1030

1031

1032

1033

1034

1035

1036

1037

1038

1039

1040

1041

1042

1043

1044

1045

1046

1047

1048

1049

1050

1051

1052

1053

1054

1055

1056

1057

1058

1059

1060

1061

1062

1063

1064

1065

1066

1067

1068

1069

1070

1071

1072

1073

1074

1075

1076

1077

1078

1079

1080

1081

1082

1083

1084

1085

1086

1087

1088

1089

1090

1091

1092

1093

1094

1095

1096

1097

1098

1099

1100

1101

1102

1103

1104

1105

1106

1107

1108

1109

1110

1111

1112

1113

1114

1115

1116

1117

1118

1119

1120

1121

1122

1123

1124

1125

1126

1127

1128

1129

1130

1131

1132

1133

1134

1135

1136

1137

1138

1139

1140

1141

1142

1143

1144

1145

1146

1147

1148

1149

1150

1151

1152

1153

1154

1155

1156

1157

1158

1159

1160

1161

1162

1163

1164

1165

1166

1167

1168

1169

1170

1171

1172

1173

1174

1175

1176

1177

1178

1179

1180

1181

1182

1183

1184

1185

1186

1187

1188

1189

1190

1191

1192

1193

1194

1195

1196

1197

1198

1199

1200

1201

1202

1203

1204

1205

1206

1207

1208

1209

1210

1211

1212

1213

1214

1215

1216

1217

1218

1219

1220

1221

1222

1223

1224

1225

1226

1227

1228

1229

1230

1231

1232

1233

1234

1235

1236

1237

1238

1239

1240

1241

1242

1243

1244

1245

1246

1247

1248

1249

1250

1251

1252

1253

1254

1255

1256

1257

1258

1259

1260

1261

1262

1263

1264

1265

1266

1267

1268

1269

1270

1271

1272

1273

1274

1275

1276

1277

1278

1279

1280

1281

1282

1283

1284

1285

1286

1287

1288

1289

1290

1291

1292

1293

1294

1295

1296

1297

1298

1299

1300

1301

1302

1303

1304

1305

1306

1307

1308

1309

1310

1311

1312

1313

1314

1315

1316

1317

1318

1319

1320

1321

1322

1323

1324

1325

1326

1327

1328

1329

1330

1331

1332

1333

1334

1335

1336

1337

1338

1339

1340

1341

1342

1343

1344

1345

1346

1347

1348

1349

1350

1351

1352

1353

1354

1355

1356

1357

1358

1359

1360

1361

1362

1363

1364

1365

1366

1367

1368

1369

1370

1371

1372

1373

1374

1375

1376

1377

1378

1379

1380

1381

1382

1383

1384

1385

1386

1387

1388

1389

1390

1391

1392

1393

1394

1395

1396

1397

1398

1399

1400

1401

1402

1403

1404

1405

1406

1407

1408

1409

1410

1411

1412

1413

1414

1415

1416

1417

1418

1419

1420

1421

1422

1423

1424

1425

1426

1427

1428

1429

1430

1431

1432

1433

1434

1435

1436

1437

1438

1439

1440

1441

1442

1443

1444

1445

1446

1447

1448

1449

1450

1451

1452

1453

1454

1455

1456

1457

1458

1459

1460

1461

1462

1463

1464

1465

1466

1467

1468

1469

1470

1471

1472

1473

1474

1475

1476

1477

1478

1479

1480

1481

1482

1483

1484

1485

1486

1487

1488

1489

1490

1491

1492

1493

1494

1495

1496

1497

1498

1499

1500

1501

1502

1503

1504

1505

1506

1507

1508

1509

1510

1511

1512

1513

1514

1515

1516

1517

1518

1519

1520

1521

1522

1523

1524

1525

1526

1527

1528

1529

1530

1531

1532

1533

1534

1535

1536

1537

1538

1539

1540

1541

1542

1543

1544

1545

1546

1547

1548

1549

1550

1551

1552

1553

1554

1555

1556

1557

1558

1559

1560

1561

1562

1563

1564

1565

1566

1567

1568

1569

1570

1571

1572

1573

1574

1575

1576

1577

1578

1579

1580

1581

1582

1583

1584

1585

1586

1587

1588

1589

1590

1591

1592

1593

1594

1595

1596

1597

1598

1599

1600

1601

1602

1603

1604

1605

1606

1607

1608

1609

1610

1611

1612

1613

1614

1615

1616

1617

1618

1619

1620

1621

1622

1623

1624

1625

1626

1627

1628

1629

1630

1631

1632

1633

1634

1635

1636

1637

1638

1639

1640

1641

1642

1643

1644

1645

1646

1647

1648

1649

1650

1651

1652

1653

1654

1655

1656

1657

1658

1659

1660

1661

1662

1663

1664

1665

1666

1667

1668

1669

1670

1671

1672

1673

1674

1675

1676

1677

1678

1679

1680

1681

1682

1683

1684

1685

1686

1687

1688

1689

1690

1691

1692

1693

1694

1695

1696

1697

1698

1699

1700

1701

1702

1703

1704

1705

1706

1707

1708

1709

1710

1711

1712

1713

1714

1715

1716

1717

1718

1719

1720

1721

1722

1723

1724

1725

1726

1727

1728

1729

1730

1731

1732

1733

1734

1735

1736

1737

1738

1739

1740

1741

1742

1743

1744

1745

1746

1747

1748

1749

1750

1751

1752

1753

1754

1755

1756

1757

1758

1759

1760

1761

1762

1763

1764

1765

1766

1767

1768

1769

1770

1771

1772

1773

1774

1775

1776

1777

1778

1779

1780

1781

1782

1783

1784

1785

1786

1787

1788

1789

1790

1791

1792

1793

1794

1795

1796

1797

1798

1799

1800

1801

1802

1803

1804

1805

1806

1807

1808

1809

1810

1811

1812

1813

1814

1815

1816

1817

1818

1819

1820

1821

1822

1823

1824

1825

1826

1827

1828

1829

1830

1831

1832

1833

1834

1835

1836

1837

1838

1839

1840

1841

1842

1843

1844

1845

1846

1847

1848

1849

1850

1851

1852

1853

1854

1855

1856

1857

1858

1859

1860

1861

1862

1863

1864

1865

1866

1867

1868

1869

1870

1871

1872

1873

1874

1875

1876

1877

1878

1879

1880

1881

1882

1883

1884

1885

1886

1887

1888

1889

1890

1891

1892

1893

1894

1895

1896

1897

1898

1899

1900

1901

1902

1903

1904

1905

1906

1907

1908

1909

1910

1911

1912

1913

1914

1915

1916

1917

1918

1919

1920

1921

1922

1923

1924

1925

1926

1927

1928

1929

1930

1931

1932

1933

1934

1935

1936

1937

1938

1939

1940

1941

1942

1943

1944

1945

1946

1947

1948

1949

1950

1951

1952

1953

1954

1955

1956

1957

1958

1959

1960

1961

1962

1963

1964

1965

1966

1967

1968

1969

1970

1971

1972

1973

1974

1975

1976

1977

1978

1979

1980

1981

1982

1983

1984

1985

1986

1987

1988

1989

1990

1991

1992

1993

1994

1995

1996

1997

1998

1999

2000

2001

2002

2003

2004

2005

2006

2007

2008

2009

2010

2011

2012

2013

2014

2015

2016

2017

2018

2019

2020

2021

2022

2023

2024

2025

2026

2027

2028

2029

2030

2031

2032

2033

2034

2035

2036

2037

2038

2039

2040

2041

2042

2043

2044

2045

2046

2047

2048

2049

2050

2051

2052

2053

2054

2055

2056

2057

2058

2059

2060

2061

2062

2063

2064

2065

2066

2067

2068

2069

2070

2071

2072

2073

2074

2075

2076

2077

2078

2079

2080

2081

2082

2083

2084

2085

2086

2087

2088

2089

2090

2091

2092

2093

2094

2095

2096

2097

2098

2099

2100

2101

2102

2103

2104

2105

2106

2107

2108

2109

2110

2111

2112

2113

2114

2115

2116

2117

2118

2119

2120

2121

2122

2123

2124

2125

2126

2127

2128

2129

2130

2131

2132

2133

2134

2135

2136

2137

2138

2139

2140

2141

2142

2143

2144

2145

2146

2147

2148

2149

2150

2151

2152

2153

2154

2155

2156

2157

2158

2159

2160

2161

2162

2163

2164

2165

2166

2167

2168

2169

2170

2171

2172

2173

2174

2175

2176

2177

2178

2179

2180

2181

2182

2183

2184

2185

2186

2187

2188

2189

2190

2191

2192

2193

2194

2195

2196

2197

2198

2199

2200

2201

2202

2203

2204

2205

2206

2207

2208

2209

2210

2211

2212

2213

2214

2215

2216

2217

2218

2219

2220

2221

2222

2223

2224

2225

2226

2227

2228

2229

2230

2231

2232

2233

2234

2235

2236

2237

2238

2239

2240

2241

2242

2243

2244

2245

2246

2247

2248

2249

2250

2251

2252

2253

2254

2255

2256

2257

2258

2259

2260

2261

2262

2263

2264

2265

2266

2267

2268

2269

2270

2271

2272

2273

2274

2275

2276

2277

2278

2279

2280

2281

2282

2283

2284

2285

2286

2287

2288

2289

2290

2291

2292

2293

2294

2295

2296

2297

2298

2299

2300

2301

2302

2303

2304

2305

2306

2307

2308

2309

2310

2311

2312

2313

2314

2315

2316

2317

2318

2319

2320

2321

2322

2323

2324

2325

2326

2327

2328

2329

2330

2331

2332

2333

2334

2335

2336

2337

2338

2339

2340

2341

2342

2343

2344

2345

2346

2347

2348

2349

2350

2351

2352

2353

2354

2355

2356

2357

2358

2359

2360

2361

2362

2363

2364

2365

2366

2367

2368

2369

2370

2371

2372

2373

2374

2375

2376

2377

2378

2379

2380

2381

2382

2383

2384

2385

2386

2387

2388

2389

2390

2391

2392

2393

2394

2395

2396

2397

2398

2399

2400

2401

2402

2403

2404

2405

2406

2407

2408

2409

2410

2411

2412

2413

2414

2415

2416

2417

2418

2419

2420

2421

2422

2423

2424

2425

2426

2427

2428

2429

2430

2431

2432

2433

2434

2435

2436

2437

2438

2439

2440

2441

2442

2443

2444

2445

2446

2447

2448

2449

2450

2451

2452

2453

2454

2455

2456

2457

2458

2459

2460

2461

2462

2463

2464

2465

2466

2467

2468

2469

2470

2471

2472

2473

2474

2475

2476

2477

2478

2479

2480

2481

2482

2483

2484

2485

2486

2487

2488

2489

2490

2491

2492

2493

2494

2495

2496

2497

2498

2499

2500

2501

2502

2503

2504

2505

2506

2507

2508

2509

2510

2511

2512

2513

2514

2515

2516

2517

2518

2519

2520

2521

2522

2523

2524

2525

2526

2527

2528

2529

2530

2531

2532

2533

2534

2535

2536

2537

2538

2539

2540

2541

2542

2543

2544

2545

2546

2547

2548

2549

2550

2551

2552

2553

2554

2555

2556

2557

2558

2559

2560

2561

2562

2563

2564

2565

2566

2567

2568

2569

2570

2571

2572

2573

2574

2575

2576

2577

2578

2579

2580

2581

2582

2583

2584

2585

2586

2587

2588

2589

2590

2591

2592

2593

2594

2595

2596

2597

2598

2599

2600

2601

2602

2603

2604

2605

2606

2607

2608

2609

2610

2611

2612

2613

2614

2615

2616

2617

2618

2619

2620

2621

2622

2623

2624

2625

2626

2627

2628

2629

2630

2631

2632

2633

2634

2635

2636

2637

2638

2639

2640

2641

2642

2643

2644

2645

2646

2647

2648

2649

2650

2651

2652

2653

2654

2655

2656

2657

2658

2659

2660

2661

2662

2663

2664

2665

2666

2667

2668

2669

2670

2671

2672

2673

2674

2675

2676

2677

2678

2679

2680

2681

2682

2683

2684

2685

2686

2687

2688

2689

2690

2691

2692

2693

2694

2695

2696

2697

2698

2699

2700

2701

2702

2703

2704

2705

2706

2707

2708

2709

2710

2711

2712

2713

2714

2715

2716

2717

2718

2719

2720

2721

2722

2723

2724

2725

2726

2727

2728

2729

2730

2731

2732

2733

2734

2735

2736

2737

2738

2739

2740

2741

2742

2743

from __future__ import absolute_import 

from __future__ import division 

from __future__ import print_function 

from __future__ import unicode_literals 

 

from future.standard_library import install_aliases 

install_aliases() 

from builtins import str 

from builtins import object, bytes 

import copy 

from datetime import datetime, timedelta 

import functools 

import getpass 

import imp 

import jinja2 

import json 

import logging 

import os 

import dill 

import re 

import signal 

import socket 

import sys 

from urllib.parse import urlparse 

 

from sqlalchemy import ( 

    Column, Integer, String, DateTime, Text, Boolean, ForeignKey, PickleType, 

    Index, Float) 

from sqlalchemy import case, func, or_, and_ 

from sqlalchemy.ext.declarative import declarative_base, declared_attr 

from sqlalchemy.dialects.mysql import LONGTEXT 

from sqlalchemy.orm import relationship, synonym 

 

from croniter import croniter 

import six 

 

from airflow import settings, utils 

from airflow.executors import DEFAULT_EXECUTOR, LocalExecutor 

from airflow import configuration 

from airflow.utils import ( 

    AirflowException, State, apply_defaults, provide_session, 

    is_container, as_tuple, TriggerRule) 

 

Base = declarative_base()

# Length used for id-like string columns (dag_id, task_id, conn_id, ...).
ID_LEN = 250
SQL_ALCHEMY_CONN = configuration.get('core', 'SQL_ALCHEMY_CONN')
DAGS_FOLDER = os.path.expanduser(configuration.get('core', 'DAGS_FOLDER'))
XCOM_RETURN_KEY = 'return_value'

# Fernet encryption of connection passwords is optional: it needs the
# ``cryptography`` package and a usable ``core.FERNET_KEY``. When anything
# goes wrong while setting it up, FERNET stays undefined and ENCRYPTION_ON
# stays False.
ENCRYPTION_ON = False
try:
    from cryptography.fernet import Fernet
    FERNET = Fernet(configuration.get('core', 'FERNET_KEY').encode('utf-8'))
    ENCRYPTION_ON = True
except Exception as fernet_error:
    # Best effort: run without encryption, but leave a trace of why instead
    # of silently swallowing everything (including KeyboardInterrupt).
    logging.debug(
        "Fernet encryption disabled: {}".format(fernet_error))

# Use MySQL's LONGTEXT type when the backend is MySQL, plain Text otherwise.
if 'mysql' in SQL_ALCHEMY_CONN:
    LongText = LONGTEXT
else:
    LongText = Text

 

 

def clear_task_instances(tis, session, activate_dag_runs=True):
    '''
    Clears a set of task instances, but makes sure the running ones
    get killed.

    Running task instances that have a ``job_id`` are switched to
    ``State.SHUTDOWN``, and so are their owning jobs. Running task instances
    without a ``job_id`` are left untouched. Every non-running task instance
    is deleted through ``session``.

    :param tis: the task instances to clear. Any iterable is accepted
        (list, generator, SQLAlchemy query); it is consumed exactly once.
    :param session: SQLAlchemy session used for deletes and state updates
    :param activate_dag_runs: when True, every DagRun whose dag_id is among
        the cleared task instances' dag_ids and whose execution_date is among
        their execution_dates is set back to ``State.RUNNING``
    '''
    # Materialize once. The input is walked several times below:
    # - a generator would be empty after the first pass;
    # - a query would be re-executed after the deletes were autoflushed, and
    #   would then miss exactly the task instances that were just cleared.
    tis = list(tis)

    job_ids = []
    for ti in tis:
        if ti.state == State.RUNNING:
            if ti.job_id:
                ti.state = State.SHUTDOWN
                job_ids.append(ti.job_id)
        else:
            session.delete(ti)

    if job_ids:
        from airflow.jobs import BaseJob as BJ
        for job in session.query(BJ).filter(BJ.id.in_(job_ids)).all():
            job.state = State.SHUTDOWN

    # Nothing was cleared -> no DagRun to re-activate (and no empty IN()).
    if activate_dag_runs and tis:
        execution_dates = {ti.execution_date for ti in tis}
        dag_ids = {ti.dag_id for ti in tis}
        drs = session.query(DagRun).filter(
            DagRun.dag_id.in_(dag_ids),
            DagRun.execution_date.in_(execution_dates),
        ).all()
        for dr in drs:
            dr.state = State.RUNNING

 

 

class DagBag(object):
    """
    A dagbag is a collection of dags, parsed out of a folder tree, that also
    carries higher level settings such as the executor used to fire off
    tasks. This makes it easier to run distinct environments (say production
    and development, tests, or different teams or security profiles): what
    would have been system level settings are dagbag level settings, so one
    system can run multiple, independent settings sets.

    :param dag_folder: the folder to scan to find DAGs; defaults to the
        configured ``core.DAGS_FOLDER``
    :type dag_folder: str
    :param executor: the executor to use when executing task instances
        in this DagBag
    :param include_examples: whether to include the examples that ship
        with airflow or not
    :type include_examples: bool
    :param sync_to_db: whether to sync the properties of the DAGs to
        the metadata DB while finding them, typically should be done
        by the scheduler job only
    :type sync_to_db: bool
    """
    def __init__(
            self,
            dag_folder=None,
            executor=DEFAULT_EXECUTOR,
            include_examples=configuration.getboolean('core', 'LOAD_EXAMPLES'),
            sync_to_db=False):
        # NOTE: the include_examples default is read from the configuration
        # once, when this class is defined, not on every instantiation.
        dag_folder = dag_folder or DAGS_FOLDER
        logging.info("Filling up the DagBag from " + dag_folder)
        self.dag_folder = dag_folder
        # dag_id -> DAG, including subdags
        self.dags = {}
        self.sync_to_db = sync_to_db
        # filepath -> modification time seen when the file was last processed
        self.file_last_changed = {}
        self.executor = executor
        # filepath -> error message of the last failed import of that file
        self.import_errors = {}
        if include_examples:
            example_dag_folder = os.path.join(
                os.path.dirname(__file__),
                'example_dags')
            self.collect_dags(example_dag_folder)
        self.collect_dags(dag_folder)
        if sync_to_db:
            self.deactivate_inactive_dags()

    def get_dag(self, dag_id):
        """
        Gets the DAG out of the dictionary, and refreshes it if expired.

        A bagged DAG's file is re-processed when the metadata DB entry of the
        DAG (or, for a subdag, of its parent) says it expired after the DAG
        was loaded. A DAG not in the bag yet is looked up in the metadata DB
        and its file is processed.

        :param dag_id: id of the DAG to return
        :return: the DAG, or None when it is neither bagged nor known to the
            metadata DB, or when its file does not define it
        """
        dag = self.dags.get(dag_id)
        if dag is not None:
            # Subdags are tracked in the DB through their parent DAG.
            if dag.is_subdag:
                orm_dag = DagModel.get_current(dag.parent_dag.dag_id)
            else:
                orm_dag = DagModel.get_current(dag_id)
            # NOTE(review): a DAG that was never expired (last_expired is
            # None) is compared against year 2100, so it is re-processed on
            # every call -- confirm this is intended.
            if orm_dag and dag.last_loaded < (
                    orm_dag.last_expired or datetime(2100, 1, 1)):
                self.process_file(
                    filepath=orm_dag.fileloc, only_if_updated=False)
                dag = self.dags.get(dag_id)
        else:
            orm_dag = DagModel.get_current(dag_id)
            # Unknown to the metadata DB: there is no file to look into.
            if orm_dag is not None:
                self.process_file(
                    filepath=orm_dag.fileloc, only_if_updated=False)
            dag = self.dags.get(dag_id)
        return dag

    def process_file(self, filepath, only_if_updated=True, safe_mode=True):
        """
        Given a path to a python module, this method imports the module and
        looks for DAG objects within it, bagging each one found.

        A failed import is logged and recorded in ``self.import_errors``;
        that record is dropped once the file imports successfully again.

        :param filepath: path to the python file to import
        :param only_if_updated: when True, skip the import if the file's
            modification time is unchanged since it was last processed
        :param safe_mode: when True, skip files whose source does not contain
            both the strings ``DAG`` and ``airflow``
        """
        try:
            # This failed before in what may have been a git sync
            # race condition
            dttm = datetime.fromtimestamp(os.path.getmtime(filepath))
            mod_name, file_ext = os.path.splitext(os.path.split(filepath)[-1])
            # presumably keeps DAG files from colliding with real modules of
            # the same name in sys.modules
            mod_name = 'unusual_prefix_' + mod_name
        except Exception:
            return

        if safe_mode and os.path.isfile(filepath):
            # Skip file if no obvious references to airflow or DAG are found.
            with open(filepath, 'r') as f:
                content = f.read()
            if not all(s in content for s in ('DAG', 'airflow')):
                return

        if (
                not only_if_updated or
                filepath not in self.file_last_changed or
                dttm != self.file_last_changed[filepath]):
            try:
                logging.info("Importing " + filepath)
                # Load into a fresh module object rather than a previously
                # imported one.
                if mod_name in sys.modules:
                    del sys.modules[mod_name]
                with utils.timeout(30):
                    m = imp.load_source(mod_name, filepath)
            except Exception as e:
                logging.error("Failed to import: " + filepath)
                self.import_errors[filepath] = str(e)
                logging.exception(e)
                self.file_last_changed[filepath] = dttm
                return

            # The file imports cleanly now: forget any earlier failure.
            self.import_errors.pop(filepath, None)
            for dag in list(m.__dict__.values()):
                if isinstance(dag, DAG):
                    dag.full_filepath = filepath
                    dag.is_subdag = False
                    self.bag_dag(dag, parent_dag=dag, root_dag=dag)

            self.file_last_changed[filepath] = dttm

    @provide_session
    def kill_zombies(self, session):
        """
        Fails tasks that haven't had a heartbeat in too long.

        A running LocalTaskJob is a zombie when its latest heartbeat is older
        than ``3 * scheduler.job_heartbeat_sec + 120`` seconds.
        - If its running task instance can be resolved to a task of a bagged
          DAG, that task instance is handed to ``handle_failure``.
        - Otherwise the job itself is marked FAILED.
        """
        from airflow.jobs import LocalTaskJob as LJ
        logging.info("Finding 'running' jobs without a recent heartbeat")
        secs = (configuration.getint('scheduler', 'job_heartbeat_sec') * 3) + 120
        limit_dttm = datetime.now() - timedelta(seconds=secs)
        logging.info(
            "Failing jobs without heartbeat after {}".format(limit_dttm))
        jobs = (
            session
            .query(LJ)
            .filter(
                LJ.state == State.RUNNING,
                LJ.latest_heartbeat < limit_dttm)
            .all()
        )
        for job in jobs:
            ti = session.query(TaskInstance).filter_by(
                job_id=job.id, state=State.RUNNING).first()
            logging.info("Failing job_id '{}'".format(job.id))
            dag = self.dags.get(ti.dag_id) if ti else None
            if dag is not None and ti.task_id in dag.task_ids:
                ti.task = dag.get_task(ti.task_id)
                ti.handle_failure("{} killed as zombie".format(ti))
                logging.info('Marked zombie job {} as failed'.format(ti))
            else:
                # Also covers a task that no longer exists in its DAG; such a
                # job used to stay RUNNING and be re-found on every call.
                job.state = State.FAILED
        session.commit()

    def bag_dag(self, dag, parent_dag, root_dag):
        """
        Adds the DAG into the bag, recurses into sub dags.

        Each DAG's tasks are passed through ``settings.policy``. When
        ``sync_to_db`` is set, the DAG's metadata row is created or updated
        and marked active.

        :param dag: the DAG to bag
        :param parent_dag: the DAG containing ``dag`` (``dag`` itself for a
            top level DAG)
        :param root_dag: the top level DAG defined in the parsed file
        """
        self.dags[dag.dag_id] = dag
        dag.resolve_template_files()
        dag.last_loaded = datetime.now()

        for task in dag.tasks:
            settings.policy(task)

        if self.sync_to_db:
            session = settings.Session()
            try:
                orm_dag = session.query(
                    DagModel).filter(DagModel.dag_id == dag.dag_id).first()
                if not orm_dag:
                    orm_dag = DagModel(dag_id=dag.dag_id)
                orm_dag.fileloc = root_dag.full_filepath
                orm_dag.is_subdag = dag.is_subdag
                orm_dag.owners = root_dag.owner
                orm_dag.is_active = True
                session.merge(orm_dag)
                session.commit()
            finally:
                session.close()

        for subdag in dag.subdags:
            subdag.full_filepath = dag.full_filepath
            subdag.parent_dag = dag
            subdag.fileloc = root_dag.full_filepath
            subdag.is_subdag = True
            self.bag_dag(subdag, parent_dag=dag, root_dag=root_dag)
        logging.info('Loaded DAG {}'.format(dag))

    def collect_dags(
            self,
            dag_folder=None,
            only_if_updated=True):
        """
        Given a file path or a folder, this method looks for python modules,
        imports them and adds them to the dagbag collection.

        Note that if a .airflowignore file is found while processing
        the directory, it behaves much like a .gitignore does: files whose
        path matches any of the regex patterns listed in it are ignored.
        Patterns accumulate as the walk proceeds and apply to every
        directory visited afterwards.

        A failure while processing a single file is logged and the walk
        continues with the next file.
        """
        dag_folder = dag_folder or self.dag_folder
        if os.path.isfile(dag_folder):
            self.process_file(dag_folder, only_if_updated=only_if_updated)
        elif os.path.isdir(dag_folder):
            patterns = []
            for root, dirs, files in os.walk(dag_folder, followlinks=True):
                if '.airflowignore' in files:
                    ignore_path = os.path.join(root, '.airflowignore')
                    with open(ignore_path, 'r') as ignore_file:
                        patterns += [
                            p for p in ignore_file.read().split('\n') if p]
                for f in files:
                    try:
                        filepath = os.path.join(root, f)
                        if not os.path.isfile(filepath):
                            continue
                        mod_name, file_ext = os.path.splitext(
                            os.path.split(filepath)[-1])
                        if file_ext != '.py':
                            continue
                        if not any(
                                re.search(p, filepath) for p in patterns):
                            self.process_file(
                                filepath, only_if_updated=only_if_updated)
                    except Exception:
                        # Best effort: one bad file must not stop the walk,
                        # but the failure should not vanish silently either.
                        logging.exception(
                            "Failed to process {} in {}".format(f, root))

    def deactivate_inactive_dags(self):
        """Mark every DAG in the metadata DB that is not in this bag as inactive."""
        active_dag_ids = [dag.dag_id for dag in list(self.dags.values())]
        session = settings.Session()
        try:
            for dag in session.query(
                    DagModel).filter(~DagModel.dag_id.in_(active_dag_ids)).all():
                dag.is_active = False
                session.merge(dag)
            session.commit()
        finally:
            session.close()

    def paused_dags(self):
        """Return the ids of the DAGs flagged as paused in the metadata DB."""
        session = settings.Session()
        try:
            # `== True` (not `is True`) is needed to build the SQL expression.
            dag_ids = [dp.dag_id for dp in session.query(DagModel).filter(
                DagModel.is_paused == True)]  # noqa: E712
            session.commit()
        finally:
            session.close()
        return dag_ids

 

 

class User(Base):
    """A user record stored in the ``user`` table."""
    __tablename__ = "user"

    id = Column(Integer, primary_key=True)
    username = Column(String(ID_LEN), unique=True)
    email = Column(String(500))
    # Plain class attribute, not a mapped column: it is never persisted.
    superuser = False

    def __repr__(self):
        # __repr__ must return a string; username is None until assigned,
        # which previously made repr() raise TypeError.
        if self.username is None:
            return 'None'
        return self.username

    def get_id(self):
        """Return the primary key as a string."""
        return str(self.id)

    def is_superuser(self):
        """Return the ``superuser`` flag."""
        return self.superuser

 

 

class Connection(Base):
    """
    Stores the information needed to connect to an external system such as
    a database instance. Scripts refer to a connection by its ``conn_id``
    instead of hard coding hostnames, logins and passwords when using
    operators or hooks.

    The password is stored Fernet-encrypted when encryption is configured
    (``is_encrypted`` records which form is stored).
    """
    __tablename__ = "connection"

    id = Column(Integer(), primary_key=True)
    conn_id = Column(String(ID_LEN))
    conn_type = Column(String(500))
    host = Column(String(500))
    schema = Column(String(500))
    login = Column(String(500))
    _password = Column('password', String(500))
    port = Column(Integer())
    is_encrypted = Column(Boolean, unique=False, default=False)
    extra = Column(String(5000))

    def __init__(
            self, conn_id=None, conn_type=None,
            host=None, login=None, password=None,
            schema=None, port=None, extra=None,
            uri=None):
        """
        :param uri: when given, host, login, password, schema, port and
            (unless ``conn_type`` is given) the connection type are parsed
            from it. The host/login/password/schema/port/extra keyword
            arguments are then ignored.
        """
        self.conn_id = conn_id
        self.conn_type = conn_type
        if uri:
            self.parse_from_uri(uri)
        else:
            self.host = host
            self.login = login
            self.password = password
            self.schema = schema
            self.port = port
            self.extra = extra

    def parse_from_uri(self, uri):
        """
        Fill the connection fields from a ``scheme://login:password@host:port/schema``
        URI.

        A ``%2f`` in the hostname is decoded to ``/``. The URI scheme becomes
        ``conn_type`` only when no type was set yet, with ``postgresql``
        mapped to ``postgres`` to match ``get_hook``.
        """
        temp_uri = urlparse(uri)
        hostname = temp_uri.hostname or ''
        if '%2f' in hostname:
            hostname = hostname.replace('%2f', '/').replace('%2F', '/')
        # Without this, connections built from a URI had conn_type None and
        # get_hook() could never return a hook for them.
        if not self.conn_type and temp_uri.scheme:
            conn_type = temp_uri.scheme
            if conn_type == 'postgresql':
                conn_type = 'postgres'
            self.conn_type = conn_type
        self.host = hostname
        self.schema = temp_uri.path[1:]
        self.login = temp_uri.username
        self.password = temp_uri.password
        self.port = temp_uri.port

    def get_password(self):
        """
        Return the plain-text password, decrypting it if stored encrypted.

        :raises AirflowException: if the stored password is encrypted but
            encryption is not configured
        """
        if self._password and self.is_encrypted:
            if not ENCRYPTION_ON:
                raise AirflowException(
                    "Can't decrypt, configuration is missing")
            return FERNET.decrypt(bytes(self._password, 'utf-8')).decode()
        else:
            return self._password

    def set_password(self, value):
        """
        Store ``value``, encrypted when FERNET is available.

        Falsy values are ignored (the stored password is left unchanged).
        """
        if value:
            try:
                self._password = FERNET.encrypt(bytes(value, 'utf-8')).decode()
                self.is_encrypted = True
            except NameError:
                # FERNET is undefined when encryption could not be set up.
                self._password = value
                self.is_encrypted = False

    @declared_attr
    def password(cls):
        return synonym('_password',
                       descriptor=property(cls.get_password, cls.set_password))

    def get_hook(self):
        """
        Return a hook for this connection, chosen by ``conn_type``.

        :return: the hook, or None when ``conn_type`` is not supported or the
            hook could not be created (that error is logged)
        """
        from airflow import hooks
        # conn_type -> (hook class name in airflow.hooks, conn id kwarg).
        # Class names are resolved lazily so that a hook whose dependencies
        # are missing only matters when it is actually requested.
        hook_specs = {
            'mysql': ('MySqlHook', 'mysql_conn_id'),
            'postgres': ('PostgresHook', 'postgres_conn_id'),
            'hive_cli': ('HiveCliHook', 'hive_cli_conn_id'),
            'presto': ('PrestoHook', 'presto_conn_id'),
            'hiveserver2': ('HiveServer2Hook', 'hiveserver2_conn_id'),
            'sqlite': ('SqliteHook', 'sqlite_conn_id'),
            'jdbc': ('JdbcHook', 'jdbc_conn_id'),
            'mssql': ('MsSqlHook', 'mssql_conn_id'),
            'oracle': ('OracleHook', 'oracle_conn_id'),
            'vertica': ('VerticaHook', 'vertica_conn_id'),
        }
        spec = hook_specs.get(self.conn_type)
        if spec is None:
            return None
        hook_name, conn_id_kwarg = spec
        try:
            hook_class = getattr(hooks, hook_name)
            return hook_class(**{conn_id_kwarg: self.conn_id})
        except Exception:
            logging.exception(
                "Could not create {} for conn_id {}".format(
                    hook_name, self.conn_id))
            return None

    def __repr__(self):
        # __repr__ must return a string; conn_id defaults to None.
        if self.conn_id is None:
            return 'None'
        return self.conn_id

    @property
    def extra_dejson(self):
        """Returns the extra property by deserializing json"""
        obj = {}
        if self.extra:
            try:
                obj = json.loads(self.extra)
            except Exception as e:
                logging.exception(e)
                logging.error(
                    "Failed parsing the json for "
                    "conn_id {}".format(self.conn_id))
        return obj

 

 

class DagPickle(Base):
    """
    Dags can originate from different places (user repos, master repo, ...)
    and also get executed in different places (different executors). This
    object represents a version of a DAG and becomes a source of truth for
    a BackfillJob execution. A pickle is a native Python serialized object,
    and in this case gets stored in the database for the duration of the job.

    The executors pick up the DagPickle id and read the dag definition from
    the database.
    """
    id = Column(Integer, primary_key=True)
    # The DAG object itself, serialized with dill.
    # NOTE(review): loading this column unpickles database content, so the
    # metadata database must be trusted.
    pickle = Column(PickleType(pickler=dill))
    created_dttm = Column(DateTime, default=func.now())
    # hash(dag) computed when the DagPickle is constructed
    pickle_hash = Column(Text)

    __tablename__ = "dag_pickle"

    def __init__(self, dag):
        """
        Wrap ``dag`` for storage in the ``dag_pickle`` table.

        :param dag: the DAG to store. Side effect: if it has a
            ``template_env`` attribute, that attribute is set to None on the
            caller's object before pickling.
        """
        # dag_id is not declared as a Column on this model, so it is kept only
        # as a plain Python attribute, not a mapped column.
        self.dag_id = dag.dag_id
        # Drop the template environment before pickling.
        # NOTE(review): presumably because it does not serialize cleanly —
        # confirm.
        if hasattr(dag, 'template_env'):
            dag.template_env = None
        self.pickle_hash = hash(dag)
        self.pickle = dag

 

 

class TaskInstance(Base):
    """
    Task instances store the state of a task instance. This table is the
    authority and single source of truth around what tasks have run and the
    state they are in.

    The SQLAlchemy model deliberately doesn't have a SQLAlchemy foreign key
    to the task or dag model, to have more control over transactions.

    Database transactions on this table should guard against double triggers
    and any confusion around what task instances are or aren't ready to run
    even while multiple schedulers may be firing task instances.
    """

    __tablename__ = "task_instance"

    task_id = Column(String(ID_LEN), primary_key=True)
    dag_id = Column(String(ID_LEN), primary_key=True)
    execution_date = Column(DateTime, primary_key=True)
    start_date = Column(DateTime)
    end_date = Column(DateTime)
    duration = Column(Float)
    state = Column(String(20))
    try_number = Column(Integer)
    hostname = Column(String(1000))
    unixname = Column(String(1000))
    job_id = Column(Integer)
    pool = Column(String(50))
    queue = Column(String(50))
    priority_weight = Column(Integer)
    operator = Column(String(1000))
    queued_dttm = Column(DateTime)

    __table_args__ = (
        Index('ti_dag_state', dag_id, state),
        Index('ti_state_lkp', dag_id, task_id, execution_date, state),
        Index('ti_pool', pool, state, priority_weight),
    )

    def __init__(self, task, execution_date, state=None):
        """
        :param task: the operator this instance runs; its dag_id, task_id,
            queue, pool and priority_weight_total are copied onto the row
        :param execution_date: the schedule date this instance is for
        :type execution_date: datetime
        :param state: optional initial state, only assigned when truthy
        """
        self.dag_id = task.dag_id
        self.task_id = task.task_id
        self.execution_date = execution_date
        self.task = task
        self.queue = task.queue
        self.pool = task.pool
        self.priority_weight = task.priority_weight_total
        self.try_number = 1
        self.unixname = getpass.getuser()
        if state:
            self.state = state

    def command(
            self,
            mark_success=False,
            ignore_dependencies=False,
            force=False,
            local=False,
            pickle_id=None,
            raw=False,
            task_start_date=None,
            job_id=None,
            pool=None):
        """
        Returns a command that can be executed anywhere where airflow is
        installed. This command is part of the message sent to executors by
        the orchestrator.

        Each truthy argument appends the matching ``airflow run`` flag. When
        no ``pickle_id`` is given and the task's DAG has a file path, a
        ``-sd DAGS_FOLDER/...`` argument is appended so the worker can locate
        the DAG file.
        """
        iso = self.execution_date.isoformat()
        cmd = "airflow run {self.dag_id} {self.task_id} {iso} "
        cmd += "--mark_success " if mark_success else ""
        cmd += "--pickle {pickle_id} " if pickle_id else ""
        cmd += "--job_id {job_id} " if job_id else ""
        cmd += "-i " if ignore_dependencies else ""
        cmd += "--force " if force else ""
        cmd += "--local " if local else ""
        cmd += "--pool {pool} " if pool else ""
        cmd += "--raw " if raw else ""
        if task_start_date:
            cmd += "-s " + task_start_date.isoformat() + ' '
        if not pickle_id and self.task.dag and self.task.dag.full_filepath:
            cmd += "-sd DAGS_FOLDER/{self.task.dag.filepath} "
        return cmd.format(**locals())

    @property
    def log_filepath(self):
        """Path of this instance's log file under ``[core] BASE_LOG_FOLDER``."""
        iso = self.execution_date.isoformat()
        log = os.path.expanduser(configuration.get('core', 'BASE_LOG_FOLDER'))
        return (
            "{log}/{self.dag_id}/{self.task_id}/{iso}.log".format(**locals()))

    @property
    def log_url(self):
        """Webserver URL showing this instance's log."""
        iso = self.execution_date.isoformat()
        BASE_URL = configuration.get('webserver', 'BASE_URL')
        return BASE_URL + (
            "/admin/airflow/log"
            "?dag_id={self.dag_id}"
            "&task_id={self.task_id}"
            "&execution_date={iso}"
        ).format(**locals())

    @property
    def mark_success_url(self):
        """Webserver URL that marks only this instance as successful."""
        iso = self.execution_date.isoformat()
        BASE_URL = configuration.get('webserver', 'BASE_URL')
        return BASE_URL + (
            "/admin/airflow/action"
            "?action=success"
            "&task_id={self.task_id}"
            "&dag_id={self.dag_id}"
            "&execution_date={iso}"
            "&upstream=false"
            "&downstream=false"
        ).format(**locals())

    def current_state(self, main_session=None):
        """
        Get the very latest state from the database, if a session is passed,
        we use and looking up the state becomes part of the session, otherwise
        a new session is used.

        :return: the stored state, or None when no matching row exists
        """
        session = main_session or settings.Session()
        TI = TaskInstance
        ti = session.query(TI).filter(
            TI.dag_id == self.dag_id,
            TI.task_id == self.task_id,
            TI.execution_date == self.execution_date,
        ).first()
        state = ti.state if ti else None
        if not main_session:
            session.commit()
            session.close()
        return state

    def error(self, main_session=None):
        """
        Forces the task instance's state to FAILED in the database.

        The change is always written and committed through a dedicated
        session; ``main_session`` is accepted for signature symmetry with the
        other methods but is not used.
        """
        session = settings.Session()
        logging.error("Recording the task instance as FAILED")
        self.state = State.FAILED
        session.merge(self)
        session.commit()
        session.close()

    def refresh_from_db(self, main_session=None):
        """
        Refreshes the task instance from the database based on the primary key

        Copies state, start_date, end_date and try_number from the stored row
        onto this object; nothing changes when no row exists.
        """
        session = main_session or settings.Session()
        TI = TaskInstance
        ti = session.query(TI).filter(
            TI.dag_id == self.dag_id,
            TI.task_id == self.task_id,
            TI.execution_date == self.execution_date,
        ).first()
        if ti:
            self.state = ti.state
            self.start_date = ti.start_date
            self.end_date = ti.end_date
            self.try_number = ti.try_number

        if not main_session:
            session.commit()
            session.close()

    @property
    def key(self):
        """
        Returns a tuple that identifies the task instance uniquely
        """
        return (self.dag_id, self.task_id, self.execution_date)

    def is_queueable(self, flag_upstream_failed=False):
        """
        Returns a boolean on whether the task instance has met all dependencies
        and is ready to run. It considers the task's state, the state
        of its dependencies, depends_on_past and makes sure the execution
        isn't in the future. It doesn't take into
        account whether the pool has a slot for it to run.

        :param flag_upstream_failed: This is a hack to generate
            the upstream_failed state creation while checking to see
            whether the task instance is runnable. It was the shortest
            path to add the feature
        :type flag_upstream_failed: boolean
        """
        if self.execution_date > datetime.now():
            return False
        elif self.state == State.UP_FOR_RETRY and not self.ready_for_retry():
            return False
        elif self.task.end_date and self.execution_date > self.task.end_date:
            return False
        elif self.state in (State.SKIPPED, State.QUEUED):
            return False
        elif (
                self.state in State.runnable() and
                self.are_dependencies_met(
                    flag_upstream_failed=flag_upstream_failed)):
            return True
        else:
            return False

    def is_runnable(self, flag_upstream_failed=False):
        """
        Returns whether a task is ready to run AND there's room in the
        queue.
        """
        return self.is_queueable(flag_upstream_failed) and not self.pool_full()

    def are_dependents_done(self, main_session=None):
        """
        Checks whether the dependents of this task instance have all succeeded.
        This is meant to be used by wait_for_downstream.

        This is useful when you do not want to start processing the next
        schedule of a task until the dependents are done. For instance,
        if the task DROPs and recreates a table.
        """
        task = self.task
        if not task._downstream_list:
            # Checked before opening a session so this early exit can't leak one
            return True

        session = main_session or settings.Session()
        downstream_task_ids = [t.task_id for t in task._downstream_list]
        count = session.query(func.count(TaskInstance.task_id)).filter(
            TaskInstance.dag_id == self.dag_id,
            TaskInstance.task_id.in_(downstream_task_ids),
            TaskInstance.execution_date == self.execution_date,
            TaskInstance.state == State.SUCCESS,
        ).scalar()
        if not main_session:
            session.commit()
            session.close()
        return count == len(task._downstream_list)

    def are_dependencies_met(
            self, main_session=None, flag_upstream_failed=False):
        """
        Returns a boolean on whether the upstream tasks are in a SUCCESS state
        and considers depends_on_past and the previous run's state.

        :param main_session: session to run the queries in; when omitted a
            new session is opened and always closed before returning
        :param flag_upstream_failed: This is a hack to generate
            the upstream_failed state creation while checking to see
            whether the task instance is runnable. It was the shortest
            path to add the feature
        :type flag_upstream_failed: boolean
        """
        TI = TaskInstance
        TR = TriggerRule

        # Using the session if passed as param; otherwise this method owns the
        # session and must close it on every exit path (see finally below).
        session = main_session or settings.Session()
        try:
            task = self.task

            # Checking that the depends_on_past is fulfilled
            if (task.depends_on_past and
                    self.execution_date != task.start_date):
                previous_ti = session.query(TI).filter(
                    TI.dag_id == self.dag_id,
                    TI.task_id == task.task_id,
                    TI.execution_date ==
                    self.task.dag.previous_schedule(self.execution_date),
                    TI.state == State.SUCCESS,
                ).first()
                if not previous_ti:
                    return False

                # Applying wait_for_downstream
                previous_ti.task = self.task
                if (task.wait_for_downstream and
                        not previous_ti.are_dependents_done(session)):
                    return False

            # No upstream tasks, or a DUMMY trigger rule: nothing to wait for
            if not task._upstream_list or task.trigger_rule == TR.DUMMY:
                return True

            upstream_task_ids = [t.task_id for t in task._upstream_list]
            qry = (
                session
                .query(
                    func.coalesce(func.sum(
                        case([(TI.state == State.SUCCESS, 1)], else_=0)), 0),
                    func.coalesce(func.sum(
                        case([(TI.state == State.SKIPPED, 1)], else_=0)), 0),
                    func.coalesce(func.sum(
                        case([(TI.state == State.FAILED, 1)], else_=0)), 0),
                    func.coalesce(func.sum(
                        case([(TI.state == State.UPSTREAM_FAILED, 1)], else_=0)), 0),
                    func.count(TI.task_id),
                )
                .filter(
                    TI.dag_id == self.dag_id,
                    TI.task_id.in_(upstream_task_ids),
                    TI.execution_date == self.execution_date,
                    TI.state.in_([
                        State.SUCCESS, State.FAILED,
                        State.UPSTREAM_FAILED, State.SKIPPED]),
                )
            )
            successes, skipped, failed, upstream_failed, done = qry.first()
            num_upstream = len(task._upstream_list)

            if flag_upstream_failed:
                # NOTE(review): this flagging ignores the trigger rule, e.g.
                # an ALL_FAILED task whose upstreams all failed is flagged
                # UPSTREAM_FAILED here and then reported as runnable below.
                if skipped >= num_upstream:
                    self.state = State.SKIPPED
                    self.start_date = datetime.now()
                    self.end_date = datetime.now()
                    session.merge(self)
                elif failed + upstream_failed >= num_upstream:
                    self.state = State.UPSTREAM_FAILED
                    self.start_date = datetime.now()
                    self.end_date = datetime.now()
                    session.merge(self)

            if task.trigger_rule == TR.ONE_SUCCESS and successes > 0:
                return True
            elif (task.trigger_rule == TR.ONE_FAILED and
                  (failed + upstream_failed) > 0):
                return True
            elif (task.trigger_rule == TR.ALL_SUCCESS and
                  successes == num_upstream):
                return True
            elif (task.trigger_rule == TR.ALL_FAILED and
                  failed + upstream_failed == num_upstream):
                return True
            elif (task.trigger_rule == TR.ALL_DONE and
                  done == num_upstream):
                return True

            # Only this "not met" path commits an owned session (as before);
            # the early returns above close it without committing.
            if not main_session:
                session.commit()
            return False
        finally:
            if not main_session:
                session.close()

    def __repr__(self):
        return (
            "<TaskInstance: {ti.dag_id}.{ti.task_id} "
            "{ti.execution_date} [{ti.state}]>"
        ).format(ti=self)

    def ready_for_retry(self):
        """
        Checks on whether the task instance is in the right state and timeframe
        to be retried.
        """
        return self.state == State.UP_FOR_RETRY and \
            self.end_date + self.task.retry_delay < datetime.now()

    @provide_session
    def pool_full(self, session):
        """
        Returns a boolean as to whether the slot pool has room for this
        task to run

        :raises ValueError: when the task names a pool that doesn't exist
        """
        if not self.task.pool:
            return False

        pool = (
            session
            .query(Pool)
            .filter(Pool.pool == self.task.pool)
            .first()
        )
        if not pool:
            # Format the message, not the exception object (ValueError has no
            # .format, which used to raise AttributeError instead).
            raise ValueError(
                "Task specified a pool ({}) but the pool "
                "doesn't exist!".format(self.task.pool))
        open_slots = pool.open_slots(session=session)

        return open_slots <= 0

    def run(
            self,
            verbose=True,
            ignore_dependencies=False,  # Doesn't check for deps, just runs
            force=False,  # Disregards previous successes
            mark_success=False,  # Don't run the task, act as if it succeeded
            test_mode=False,  # Doesn't record success or failure in the DB
            job_id=None,
            pool=None,):
        """
        Runs the task instance.

        Refreshes the instance from the database and, if it is runnable,
        either queues it (pool set or DAG concurrency reached, unless
        ``force``) or executes it, recording RUNNING and then SUCCESS. On
        failure ``handle_failure`` is called and the exception re-raised.

        :param verbose: log an "Executing ..." / "Marking success for ..."
            line before running
        :param job_id: id of the job running this instance, stored on the row
        :param pool: overrides the task's pool for this run
        """
        task = self.task
        self.pool = pool or task.pool
        session = settings.Session()
        self.refresh_from_db(session)
        session.commit()
        self.job_id = job_id
        iso = datetime.now().isoformat()
        self.hostname = socket.gethostname()
        self.operator = task.__class__.__name__

        if self.state == State.RUNNING:
            logging.warning("Another instance is running, skipping.")
        elif not force and self.state == State.SUCCESS:
            logging.info(
                "Task {self} previously succeeded"
                " on {self.end_date}".format(**locals())
            )
        elif not ignore_dependencies and \
                not self.are_dependencies_met(session):
            logging.warning("Dependencies not met yet")
        elif self.state == State.UP_FOR_RETRY and \
                not self.ready_for_retry():
            next_run = (self.end_date + task.retry_delay).isoformat()
            logging.info(
                "Not ready for retry yet. " +
                "Next run after {0}".format(next_run)
            )
        elif force or self.state in State.runnable():
            msg = "\n" + ("-" * 80)
            if self.state == State.UP_FOR_RETRY:
                msg += "\nRetry run {self.try_number} out of {task.retries} "
                msg += "starting @{iso}\n"
            else:
                msg += "\nNew run starting @{iso}\n"
            msg += ("-" * 80)
            logging.info(msg.format(**locals()))

            self.start_date = datetime.now()
            if not force and (self.pool or self.task.dag.concurrency_reached):
                # If a pool is set for this task, marking the task instance
                # as QUEUED
                self.state = State.QUEUED
                self.queued_dttm = datetime.now()
                session.merge(self)
                session.commit()
                session.close()
                logging.info("Queuing into pool {}".format(self.pool))
                return
            if self.state == State.UP_FOR_RETRY:
                self.try_number += 1
            else:
                self.try_number = 1
            if not test_mode:
                session.add(Log(State.RUNNING, self))
            self.state = State.RUNNING
            self.end_date = None
            if not test_mode:
                session.merge(self)
            session.commit()
            # The outcome is recorded through a fresh session further down,
            # so release this one instead of leaking it for the task's runtime.
            session.close()

            context = {}
            try:
                # Only log when verbose: otherwise ``msg`` would still be the
                # banner template above, whose placeholders aren't supplied here.
                if verbose:
                    if mark_success:
                        msg = "Marking success for "
                    else:
                        msg = "Executing "
                    msg += "{self.task} on {self.execution_date}"
                    logging.info(msg.format(self=self))
                if not mark_success:
                    context = self.get_template_context()

                    task_copy = copy.copy(task)
                    self.task = task_copy

                    def signal_handler(signum, frame):
                        '''Setting kill signal handler'''
                        logging.error("Killing subprocess")
                        task_copy.on_kill()
                        raise AirflowException("Task received SIGTERM signal")
                    signal.signal(signal.SIGTERM, signal_handler)

                    self.render_templates()
                    task_copy.pre_execute(context=context)

                    # If a timout is specified for the task, make it fail
                    # if it goes beyond
                    result = None
                    if task_copy.execution_timeout:
                        with utils.timeout(int(
                                task_copy.execution_timeout.total_seconds())):
                            result = task_copy.execute(context=context)

                    else:
                        result = task_copy.execute(context=context)

                    # If the task returns a result, push an XCom containing it
                    if result is not None:
                        self.xcom_push(key=XCOM_RETURN_KEY, value=result)

                    task_copy.post_execute(context=context)
            except (Exception, KeyboardInterrupt) as e:
                self.handle_failure(e, test_mode, context)
                raise

            # Recording SUCCESS
            session = settings.Session()
            self.end_date = datetime.now()
            self.set_duration()
            self.state = State.SUCCESS
            if not test_mode:
                session.add(Log(State.SUCCESS, self))
                session.merge(self)

            # Success callback
            try:
                if task.on_success_callback:
                    task.on_success_callback(context)
            except Exception as e3:
                logging.error("Failed when executing success callback")
                logging.exception(e3)

        session.commit()
        session.close()

    def dry_run(self):
        """Render the templates on a copy of the task and call its dry_run."""
        task = self.task
        task_copy = copy.copy(task)
        self.task = task_copy

        self.render_templates()
        task_copy.dry_run()

    def handle_failure(self, error, test_mode=False, context=None):
        """
        Record a failed attempt: sets UP_FOR_RETRY while retries remain,
        FAILED otherwise, sends the configured email and runs the matching
        callback. Email and callback errors are logged, not raised.

        :param error: the exception raised by the task
        :param test_mode: when True, nothing is written to the database
        :param context: template context handed to the callbacks
        """
        logging.exception(error)
        task = self.task
        session = settings.Session()
        self.end_date = datetime.now()
        self.set_duration()
        if not test_mode:
            session.add(Log(State.FAILED, self))

        # Let's go deeper
        try:
            if self.try_number <= task.retries:
                self.state = State.UP_FOR_RETRY
                if task.email_on_retry and task.email:
                    self.email_alert(error, is_retry=True)
            else:
                self.state = State.FAILED
                if task.email_on_failure and task.email:
                    self.email_alert(error, is_retry=False)
        except Exception as e2:
            logging.error(
                'Failed to send email to: ' + str(task.email))
            logging.exception(e2)

        # Handling callbacks pessimistically
        try:
            if self.state == State.UP_FOR_RETRY and task.on_retry_callback:
                task.on_retry_callback(context)
            if self.state == State.FAILED and task.on_failure_callback:
                task.on_failure_callback(context)
        except Exception as e3:
            logging.error("Failed at executing callback")
            logging.exception(e3)

        if not test_mode:
            session.merge(self)
        session.commit()
        session.close()
        logging.error(str(error))

    @provide_session
    def get_template_context(self, session=None):
        """
        Build the dict of variables available to Jinja templates for this
        task instance (ds, execution_date, params, dag_run, macros, ...).
        """
        task = self.task
        from airflow import macros
        tables = None
        if 'tables' in task.params:
            tables = task.params['tables']
        ds = self.execution_date.isoformat()[:10]
        yesterday_ds = (self.execution_date - timedelta(1)).isoformat()[:10]
        tomorrow_ds = (self.execution_date + timedelta(1)).isoformat()[:10]
        ds_nodash = ds.replace('-', '')
        ti_key_str = "{task.dag_id}__{task.task_id}__{ds_nodash}"
        ti_key_str = ti_key_str.format(**locals())

        params = {}
        run_id = ''
        dag_run = None
        if hasattr(task, 'dag'):
            if task.dag.params:
                params.update(task.dag.params)
            dag_run = (
                session.query(DagRun)
                .filter_by(
                    dag_id=task.dag.dag_id,
                    execution_date=self.execution_date)
                .first()
            )
            run_id = dag_run.run_id if dag_run else None
            session.expunge_all()
            session.commit()

        # Task-level params override DAG-level params with the same key
        if task.params:
            params.update(task.params)

        return {
            'dag': task.dag,
            'ds': ds,
            'yesterday_ds': yesterday_ds,
            'tomorrow_ds': tomorrow_ds,
            'END_DATE': ds,
            'ds_nodash': ds_nodash,
            'end_date': ds,
            'dag_run': dag_run,
            'run_id': run_id,
            'execution_date': self.execution_date,
            'latest_date': ds,
            'macros': macros,
            'params': params,
            'tables': tables,
            'task': task,
            'task_instance': self,
            'ti': self,
            'task_instance_key_str': ti_key_str,
            'conf': configuration,
        }

    def render_templates(self):
        """
        Render every non-empty attribute listed in the task class's
        ``template_fields`` in place, using the template context plus the
        DAG's ``user_defined_macros``.
        """
        task = self.task
        jinja_context = self.get_template_context()
        if hasattr(task, 'dag') and task.dag.user_defined_macros:
            jinja_context.update(task.dag.user_defined_macros)

        for attr in task.__class__.template_fields:
            content = getattr(task, attr)
            if content:
                rendered_content = task.render_template(content, jinja_context)
                setattr(task, attr, rendered_content)

    def email_alert(self, exception, is_retry=False):
        """Email ``task.email`` an HTML alert describing the failed try."""
        task = self.task
        title = "Airflow alert: {self}".format(**locals())
        exception = str(exception).replace('\n', '<br>')
        try_ = task.retries + 1
        body = (
            "Try {self.try_number} out of {try_}<br>"
            "Exception:<br>{exception}<br>"
            "Log: <a href='{self.log_url}'>Link</a><br>"
            "Host: {self.hostname}<br>"
            "Log file: {self.log_filepath}<br>"
            "Mark success: <a href='{self.mark_success_url}'>Link</a><br>"
        ).format(**locals())
        utils.send_email(task.email, title, body)

    def set_duration(self):
        """Set ``duration`` in seconds, or None if either bound is missing."""
        if self.end_date and self.start_date:
            self.duration = (self.end_date - self.start_date).total_seconds()
        else:
            self.duration = None

    def xcom_push(
            self,
            key,
            value,
            execution_date=None):
        """
        Make an XCom available for tasks to pull.

        :param key: A key for the XCom
        :type key: string
        :param value: A value for the XCom. The value is pickled and stored
            in the database.
        :type value: any pickleable object
        :param execution_date: if provided, the XCom will not be visible until
            this date. This can be used, for example, to send a message to a
            task on a future date without it being immediately visible.
        :type execution_date: datetime
        :raises ValueError: if execution_date is before this instance's
            execution_date
        """

        if execution_date and execution_date < self.execution_date:
            raise ValueError(
                'execution_date can not be in the past (current '
                'execution_date is {}; received {})'.format(
                    self.execution_date, execution_date))

        XCom.set(
            key=key,
            value=value,
            task_id=self.task_id,
            dag_id=self.dag_id,
            execution_date=execution_date or self.execution_date)

    def xcom_pull(
            self,
            task_ids,
            dag_id=None,
            key=XCOM_RETURN_KEY,
            include_prior_dates=False):
        """
        Pull XComs that optionally meet certain criteria.

        The default value for `key` limits the search to XComs
        that were returned by other tasks (as opposed to those that were pushed
        manually). To remove this filter, pass key=None (or any desired value).

        If a single task_id string is provided, the result is the value of the
        most recent matching XCom from that task_id. If multiple task_ids are
        provided, a tuple of matching values is returned. None is returned
        whenever no matches are found.

        :param key: A key for the XCom. If provided, only XComs with matching
            keys will be returned. The default key is 'return_value', also
            available as a constant XCOM_RETURN_KEY. This key is automatically
            given to XComs returned by tasks (as opposed to being pushed
            manually). To remove the filter, pass key=None.
        :type key: string
        :param task_ids: Only XComs from tasks with matching ids will be
            pulled. Can pass None to remove the filter.
        :type task_ids: string or iterable of strings (representing task_ids)
        :param dag_id: If provided, only pulls XComs from this DAG.
            If None (default), the DAG of the calling task is used.
        :type dag_id: string
        :param include_prior_dates: If False, only XComs from the current
            execution_date are returned. If True, XComs from previous dates
            are returned as well.
        :type include_prior_dates: bool
        """

        if dag_id is None:
            dag_id = self.dag_id

        pull_fn = functools.partial(
            XCom.get_one,
            execution_date=self.execution_date,
            key=key,
            dag_id=dag_id,
            include_prior_dates=include_prior_dates)

        if is_container(task_ids):
            return tuple(pull_fn(task_id=t) for t in task_ids)
        else:
            return pull_fn(task_id=task_ids)

 

 

class Log(Base):
    """
    Used to actively log events to the database
    """

    __tablename__ = "log"

    id = Column(Integer, primary_key=True)
    dttm = Column(DateTime)
    dag_id = Column(String(ID_LEN))
    task_id = Column(String(ID_LEN))
    event = Column(String(30))
    execution_date = Column(DateTime)
    owner = Column(String(500))
    extra = Column(Text)

    def __init__(self, event, task_instance, owner=None, extra=None):
        """
        :param event: name of the event being recorded (e.g. a task state)
        :param task_instance: the TaskInstance the event relates to, or None;
            when given, its dag_id, task_id and execution_date are copied
        :param owner: explicit owner; when falsy, the owner of
            ``task_instance.task`` is used if a task instance is given
        :param extra: free-form text stored alongside the event
        """
        self.dttm = datetime.now()
        self.event = event
        self.extra = extra

        # task_instance may be None, so only fall back to the task's owner
        # after checking it; the task's owner is looked up only when no
        # explicit owner was given.
        if owner:
            self.owner = owner
        elif task_instance:
            self.owner = task_instance.task.owner
        else:
            self.owner = owner

        if task_instance:
            self.dag_id = task_instance.dag_id
            self.task_id = task_instance.task_id
            self.execution_date = task_instance.execution_date

 

 

@functools.total_ordering
class BaseOperator(object):
    """
    Abstract base class for all operators. Since operators create objects that
    become nodes in the dag, BaseOperator contains many recursive methods for
    dag crawling behavior. To derive this class, you are expected to override
    the constructor as well as the 'execute' method.

    Operators derived from this class should perform or trigger certain tasks
    synchronously (wait for completion). Examples of operators could be an
    operator that runs a Pig job (PigOperator), a sensor operator that
    waits for a partition to land in Hive (HiveSensorOperator), or one that
    moves data from Hive to MySQL (Hive2MySqlOperator). Instances of these
    operators (tasks) target specific operations, running specific scripts,
    functions or data transfers.

    This class is abstract and shouldn't be instantiated. Instantiating a
    class derived from this one results in the creation of a task object,
    which ultimately becomes a node in DAG objects. Task dependencies should
    be set by using the set_upstream and/or set_downstream methods.

    Note that, unlike the ORM models in this module, this class derives from
    ``object`` and not from SQLAlchemy's declarative ``Base``.

    :param task_id: a unique, meaningful id for the task
    :type task_id: string
    :param owner: the owner of the task, using the unix username is recommended
    :type owner: string
    :param retries: the number of retries that should be performed before
        failing the task
    :type retries: int
    :param retry_delay: delay between retries; a non-timedelta value is
        interpreted as a number of seconds
    :type retry_delay: timedelta
    :param start_date: The ``start_date`` for the task, determines
        the ``execution_date`` for the first task instance. The best practice
        is to have the start_date rounded
        to your DAG's ``schedule_interval``. Daily jobs have their start_date
        some day at 00:00:00, hourly jobs have their start_date at 00:00
        of a specific hour. Note that Airflow simply looks at the latest
        ``execution_date`` and adds the ``schedule_interval`` to determine
        the next ``execution_date``. It is also very important
        to note that different tasks' dependencies
        need to line up in time. If task A depends on task B and their
        start_date are offset in a way that their execution_date don't line
        up, A's dependencies will never be met. If you are looking to delay
        a task, for example running a daily task at 2AM, look into the
        ``TimeSensor`` and ``TimeDeltaSensor``.
    :type start_date: datetime
    :param end_date: if specified, the scheduler won't go beyond this date
    :type end_date: datetime
    :param schedule_interval: deprecated as a task parameter; set it on the
        DAG instead. The DAG's value always wins when the task is attached
        to a DAG.
    :param depends_on_past: when set to true, task instances will run
        sequentially while relying on the previous task's schedule to
        succeed. The task instance for the start_date is allowed to run.
    :type depends_on_past: bool
    :param wait_for_downstream: when set to true, an instance of task
        X will wait for tasks immediately downstream of the previous instance
        of task X to finish successfully before it runs. This is useful if the
        different instances of a task X alter the same asset, and this asset
        is used by tasks downstream of task X. Note that depends_on_past
        is forced to True wherever wait_for_downstream is used.
    :type wait_for_downstream: bool
    :param queue: which queue to target when running this job. Not
        all executors implement queue management, the CeleryExecutor
        does support targeting specific queues.
    :type queue: str
    :param dag: a reference to the dag the task is attached to (if any)
    :type dag: DAG
    :param params: a dictionary made available in templates
    :type params: dict
    :param priority_weight: priority weight of this task against other tasks.
        This allows the executor to trigger higher priority tasks before
        others when things get backed up.
    :type priority_weight: int
    :param pool: the slot pool this task should run in, slot pools are a
        way to limit concurrency for certain tasks
    :type pool: str
    :param sla: time by which the job is expected to succeed. Note that
        this represents the ``timedelta`` after the period is closed. For
        example if you set an SLA of 1 hour, the scheduler would send an email
        soon after 1:00AM on the ``2016-01-02`` if the ``2016-01-01`` instance
        has not succeeded yet.
        The scheduler pays special attention to jobs with an SLA and
        sends alert
        emails for sla misses. SLA misses are also recorded in the database
        for future reference. All tasks that share the same SLA time
        get bundled in a single email, sent soon after that time. SLA
        notifications are sent once and only once for each task instance.
    :type sla: datetime.timedelta
    :param execution_timeout: max time allowed for the execution of
        this task instance, if it goes beyond it will raise and fail.
    :type execution_timeout: datetime.timedelta
    :param on_failure_callback: a function to be called when a task instance
        of this task fails. a context dictionary is passed as a single
        parameter to this function. Context contains references to related
        objects to the task instance and is documented under the macros
        section of the API.
    :type on_failure_callback: callable
    :param on_retry_callback: much like the ``on_failure_callback`` except
        that it is executed when retries occur.
    :type on_retry_callback: callable
    :param on_success_callback: much like the ``on_failure_callback`` except
        that it is executed when the task succeeds.
    :type on_success_callback: callable
    :param trigger_rule: defines the rule by which dependencies are applied
        for the task to get triggered. Options are:
        ``{ all_success | all_failed | all_done | one_success |
        one_failed | dummy}``
        default is ``all_success``. Options can be set as string or
        using the constants defined in the static class
        ``airflow.utils.TriggerRule``
    :type trigger_rule: str
    """

    # For derived classes to define which fields will get jinjaified
    template_fields = []
    # Defines which file extensions to look for in the templated fields
    template_ext = []
    # Defines the color in the UI
    ui_color = '#fff'
    ui_fgcolor = '#000'

    @apply_defaults
    def __init__(
            self,
            task_id,
            owner,
            email=None,
            email_on_retry=True,
            email_on_failure=True,
            retries=0,
            retry_delay=timedelta(seconds=300),
            start_date=None,
            end_date=None,
            schedule_interval=None,  # not hooked as of now
            depends_on_past=False,
            wait_for_downstream=False,
            dag=None,
            params=None,
            default_args=None,
            adhoc=False,
            priority_weight=1,
            queue=configuration.get('celery', 'default_queue'),
            pool=None,
            sla=None,
            execution_timeout=None,
            on_failure_callback=None,
            on_success_callback=None,
            on_retry_callback=None,
            trigger_rule=TriggerRule.ALL_SUCCESS,
            *args,
            **kwargs):

        utils.validate_key(task_id)
        self.dag_id = dag.dag_id if dag else 'adhoc_' + owner
        self.task_id = task_id
        self.owner = owner
        self.email = email
        self.email_on_retry = email_on_retry
        self.email_on_failure = email_on_failure
        self.start_date = start_date
        if start_date and not isinstance(start_date, datetime):
            logging.warning(
                "start_date for {} isn't datetime.datetime".format(self))
        self.end_date = end_date
        self.trigger_rule = trigger_rule
        self.depends_on_past = depends_on_past
        self.wait_for_downstream = wait_for_downstream
        # Waiting for the previous run's downstream tasks implies waiting for
        # the previous run of this task as well.
        if wait_for_downstream:
            self.depends_on_past = True

        if schedule_interval:
            logging.warning(
                "schedule_interval is used for {}, though it has "
                "been deprecated as a task parameter, you need to "
                "specify it as a DAG parameter instead".format(self))
        self._schedule_interval = schedule_interval
        self.retries = retries
        self.queue = queue
        self.pool = pool
        self.sla = sla
        self.execution_timeout = execution_timeout
        self.on_failure_callback = on_failure_callback
        self.on_success_callback = on_success_callback
        self.on_retry_callback = on_retry_callback
        if isinstance(retry_delay, timedelta):
            self.retry_delay = retry_delay
        else:
            logging.debug("retry_delay isn't timedelta object, assuming secs")
            self.retry_delay = timedelta(seconds=retry_delay)
        self.params = params or {}  # Available in templates!
        self.adhoc = adhoc
        self.priority_weight = priority_weight
        if dag:
            dag.add_task(self)
            self.dag = dag

        # Private attributes
        self._upstream_list = []
        self._downstream_list = []

        # Attribute names used by __eq__ and __hash__.
        self._comps = {
            'task_id',
            'dag_id',
            'owner',
            'email',
            'email_on_retry',
            'retry_delay',
            'start_date',
            'schedule_interval',
            'depends_on_past',
            'wait_for_downstream',
            'adhoc',
            'priority_weight',
            'sla',
            'execution_timeout',
            'on_failure_callback',
            'on_success_callback',
            'on_retry_callback',
        }

    def __eq__(self, other):
        # Use getattr (like __hash__) rather than __dict__ lookups: e.g.
        # 'schedule_interval' is a property backed by '_schedule_interval' and
        # never appears in __dict__, so the old lookup silently ignored it and
        # let equal objects hash differently.
        return (
            type(self) == type(other) and
            all(getattr(self, c, None) == getattr(other, c, None)
                for c in self._comps))

    def __ne__(self, other):
        return not self == other

    # Backward-compatible alias for the previously misspelled (and therefore
    # never invoked by ``!=``) method name.
    __neq__ = __ne__

    def __lt__(self, other):
        return self.task_id < other.task_id

    def __hash__(self):
        hash_components = [type(self)]
        for c in self._comps:
            val = getattr(self, c, None)
            try:
                hash(val)
                hash_components.append(val)
            except TypeError:
                # Unhashable values (lists, dicts, ...) contribute their repr.
                hash_components.append(repr(val))
        return hash(tuple(hash_components))

    @property
    def schedule_interval(self):
        """
        The schedule interval of the DAG always wins over individual tasks so
        that tasks within a DAG always line up. The task still needs a
        schedule_interval as it may not be attached to a DAG.
        """
        if hasattr(self, 'dag') and self.dag:
            return self.dag._schedule_interval
        else:
            return self._schedule_interval

    @property
    def priority_weight_total(self):
        """
        This task's priority_weight plus the priority_weight of every task
        downstream of it.
        """
        return sum(
            t.priority_weight
            for t in self.get_flat_relatives(upstream=False)
        ) + self.priority_weight

    def pre_execute(self, context):
        """
        This is triggered right before self.execute, it's mostly a hook
        for people deriving operators.
        """
        pass

    def execute(self, context):
        """
        This is the main method to derive when creating an operator.
        Context is the same dictionary used as when rendering jinja templates.

        Refer to get_template_context for more context.
        """
        raise NotImplementedError()

    def post_execute(self, context):
        """
        This is triggered right after self.execute, it's mostly a hook
        for people deriving operators.
        """
        pass

    def on_kill(self):
        '''
        Override this method to cleanup subprocesses when a task instance
        gets killed. Any use of the threading, subprocess or multiprocessing
        module within an operator needs to be cleaned up or it will leave
        ghost processes behind.
        '''
        pass

    def __deepcopy__(self, memo):
        """
        Deep-copy the operator.

        Hack: the upstream/downstream lists are sorted by task_id (in place on
        the original) to avoid hitting max_depth on deepcopy operations of
        double chained task lists. ``user_defined_macros`` and ``params`` are
        not deep-copied; the copy shares them by reference.
        """
        # TODO fix this in a better way. Never lower a limit that was already
        # raised above 5000 elsewhere.
        sys.setrecursionlimit(max(sys.getrecursionlimit(), 5000))
        cls = self.__class__
        result = cls.__new__(cls)
        memo[id(self)] = result

        self._upstream_list = sorted(self._upstream_list, key=lambda x: x.task_id)
        self._downstream_list = sorted(self._downstream_list, key=lambda x: x.task_id)
        shared_by_reference = ('user_defined_macros', 'params')
        for k, v in list(self.__dict__.items()):
            if k in shared_by_reference:
                # Previously these were skipped entirely, leaving the copy
                # without a ``params`` attribute at all.
                setattr(result, k, v)
            else:
                setattr(result, k, copy.deepcopy(v, memo))
        return result

    def render_template_from_field(self, content, context, jinja_env):
        '''
        Renders a template from a field. If the field is a string, it will
        simply render the string and return the result. If it is a collection or
        nested set of collections, it will traverse the structure and render
        all strings in it. Tuples are rendered into lists.

        :raises AirflowException: if ``content`` (or a nested value) is of a
            type that is not supported for templating
        '''
        rt = self.render_template
        if isinstance(content, six.string_types):
            result = jinja_env.from_string(content).render(**context)
        elif isinstance(content, (list, tuple)):
            result = [rt(e, context) for e in content]
        elif isinstance(content, dict):
            result = {
                k: rt(v, context)
                for k, v in list(content.items())}
        else:
            # The old message referenced an undefined '{attr}' placeholder,
            # which raised KeyError instead of this AirflowException.
            param_type = type(content)
            msg = (
                "Type '{param_type}' used for parameter is "
                "not supported for templating").format(param_type=param_type)
            raise AirflowException(msg)
        return result

    def render_template(self, content, context):
        '''
        Renders a template either from a file or directly in a field, and returns
        the rendered result. Strings ending in one of ``template_ext`` are
        treated as template file names.
        '''
        jinja_env = self.dag.get_template_env() \
            if hasattr(self, 'dag') \
            else jinja2.Environment(cache_size=0)

        exts = self.__class__.template_ext
        if (
                isinstance(content, six.string_types) and
                any(content.endswith(ext) for ext in exts)):
            return jinja_env.get_template(content).render(**context)
        else:
            return self.render_template_from_field(content, context, jinja_env)

    def prepare_template(self):
        '''
        Hook that is triggered after the templated fields get replaced
        by their content. If you need your operator to alter the
        content of the file before the template is rendered,
        it should override this method to do so.
        '''
        pass

    def resolve_template_files(self):
        """
        Replace every templated field whose value is a file name ending in one
        of ``template_ext`` with that file's source (loaded through the DAG's
        template environment), then call ``prepare_template``. Load failures
        are logged and the field is left unchanged.
        """
        for attr in self.template_fields:
            content = getattr(self, attr)
            if (content and isinstance(content, six.string_types) and
                    any(content.endswith(ext) for ext in self.template_ext)):
                env = self.dag.get_template_env()
                try:
                    setattr(self, attr, env.loader.get_source(env, content)[0])
                except Exception as e:
                    # Best-effort: keep the original value and log the error.
                    logging.exception(e)
        self.prepare_template()

    @property
    def upstream_list(self):
        """@property: list of tasks directly upstream"""
        return self._upstream_list

    @property
    def downstream_list(self):
        """@property: list of tasks directly downstream"""
        return self._downstream_list

    def clear(
            self, start_date=None, end_date=None,
            upstream=False, downstream=False):
        """
        Clears the state of task instances associated with the task, following
        the parameters specified.

        :param start_date: only clear instances on or after this date
        :param end_date: only clear instances on or before this date
        :param upstream: also clear all tasks upstream of this one
        :param downstream: also clear all tasks downstream of this one
        :return: the number of task instances matched before clearing
        """
        session = settings.Session()
        try:
            TI = TaskInstance
            qry = session.query(TI).filter(TI.dag_id == self.dag_id)

            if start_date:
                qry = qry.filter(TI.execution_date >= start_date)
            if end_date:
                qry = qry.filter(TI.execution_date <= end_date)

            tasks = [self.task_id]

            if upstream:
                tasks += \
                    [t.task_id for t in self.get_flat_relatives(upstream=True)]

            if downstream:
                tasks += \
                    [t.task_id for t in self.get_flat_relatives(upstream=False)]

            qry = qry.filter(TI.task_id.in_(tasks))

            count = qry.count()
            clear_task_instances(qry, session)

            session.commit()
        finally:
            # Always release the session, even if the query or commit fails.
            session.close()
        return count

    def get_task_instances(self, session, start_date=None, end_date=None):
        """
        Get a set of task instances related to this task for a specific date
        range, ordered by execution_date.

        :param session: session used to run the query
        :param start_date: lower bound (inclusive); no lower bound when None
        :param end_date: upper bound (inclusive); defaults to now
        """
        TI = TaskInstance
        end_date = end_date or datetime.now()
        qry = session.query(TI).filter(
            TI.dag_id == self.dag_id,
            TI.task_id == self.task_id,
            TI.execution_date <= end_date,
        )
        # Comparing against None would render ``execution_date >= NULL``,
        # which matches no rows, so only filter when a bound was given.
        if start_date:
            qry = qry.filter(TI.execution_date >= start_date)
        return qry.order_by(TI.execution_date).all()

    def get_flat_relatives(self, upstream=False, l=None):
        """
        Get a flat list of relatives, either upstream or downstream, in
        depth-first pre-order. Each task appears at most once (checked by
        identity via ``utils.is_in``).

        :param upstream: walk upstream instead of downstream
        :param l: accumulator list used by the recursion
        """
        if l is None:
            l = []
        for t in self.get_direct_relatives(upstream):
            if not utils.is_in(t, l):
                l.append(t)
                t.get_flat_relatives(upstream, l)
        return l

    def detect_downstream_cycle(self, task=None):
        """
        When invoked, this routine will raise an exception if a cycle is
        detected downstream from self. It is invoked when tasks are added to
        the DAG to detect cycles.

        :raises AirflowException: if ``task`` (default: self) is reachable
            from its own downstream relatives
        """
        if not task:
            task = self
        for t in self.get_direct_relatives():
            if task is t:
                msg = "Cycle detect in DAG. Faulty task: {0}".format(task)
                raise AirflowException(msg)
            else:
                t.detect_downstream_cycle(task=task)
        return False

    def run(
            self, start_date=None, end_date=None, ignore_dependencies=False,
            force=False, mark_success=False):
        """
        Run a set of task instances for a date range.

        ``start_date`` defaults to the task's start_date; ``end_date`` defaults
        to the task's end_date, then to now.
        """
        start_date = start_date or self.start_date
        end_date = end_date or self.end_date or datetime.now()

        for dt in self.dag.date_range(start_date, end_date=end_date):
            TaskInstance(self, dt).run(
                mark_success=mark_success,
                ignore_dependencies=ignore_dependencies,
                force=force,)

    def dry_run(self):
        """Log the raw content of every non-empty string templated field."""
        logging.info('Dry run')
        for attr in self.template_fields:
            content = getattr(self, attr)
            if content and isinstance(content, six.string_types):
                logging.info('Rendering template for {0}'.format(attr))
                logging.info(content)

    def get_direct_relatives(self, upstream=False):
        """
        Get the direct relatives to the current task, upstream or
        downstream.
        """
        if upstream:
            return self.upstream_list
        else:
            return self.downstream_list

    def __repr__(self):
        return "<Task({self.__class__.__name__}): {self.task_id}>".format(self=self)

    @property
    def task_type(self):
        return self.__class__.__name__

    def append_only_new(self, l, item):
        """
        Append ``item`` to ``l``.

        :raises AirflowException: if ``item`` (compared by identity) is
            already in ``l``
        """
        if any(item is t for t in l):
            raise AirflowException(
                'Dependency {self}, {item} already registered'
                ''.format(**locals()))
        else:
            l.append(item)

    def _set_relatives(self, task_or_task_list, upstream=False):
        """
        Link ``task_or_task_list`` to this task in both directions, then check
        that no cycle was introduced.
        """
        try:
            task_list = list(task_or_task_list)
        except TypeError:
            # A single (non-iterable) task was passed.
            task_list = [task_or_task_list]
        for task in task_list:
            if not isinstance(task, BaseOperator):
                raise AirflowException('Expecting a task')
            if upstream:
                self.append_only_new(task._downstream_list, self)
                self.append_only_new(self._upstream_list, task)
            else:
                self.append_only_new(task._upstream_list, self)
                self.append_only_new(self._downstream_list, task)
        self.detect_downstream_cycle()

    def set_downstream(self, task_or_task_list):
        """
        Set a task, or a list of tasks, to be directly downstream from the
        current task.
        """
        self._set_relatives(task_or_task_list, upstream=False)

    def set_upstream(self, task_or_task_list):
        """
        Set a task, or a list of tasks, to be directly upstream from the
        current task.
        """
        self._set_relatives(task_or_task_list, upstream=True)

    def xcom_push(
            self,
            context,
            key,
            value,
            execution_date=None):
        """
        See TaskInstance.xcom_push()
        """
        context['ti'].xcom_push(
            key=key,
            value=value,
            execution_date=execution_date)

    def xcom_pull(
            self,
            context,
            task_ids,
            dag_id=None,
            key=XCOM_RETURN_KEY,
            include_prior_dates=None):
        """
        See TaskInstance.xcom_pull()
        """
        return context['ti'].xcom_pull(
            key=key,
            task_ids=task_ids,
            dag_id=dag_id,
            include_prior_dates=include_prior_dates)

 

 

class DagModel(Base):
    """
    These items are stored in the database for state related information
    """

    __tablename__ = "dag"

    dag_id = Column(String(ID_LEN), primary_key=True)
    # A DAG can be paused from the UI / DB
    is_paused = Column(Boolean, default=False)
    # Whether the DAG is a subdag
    is_subdag = Column(Boolean, default=False)
    # Whether that DAG was seen on the last DagBag load
    is_active = Column(Boolean, default=False)
    # Last time the scheduler started
    last_scheduler_run = Column(DateTime)
    # Last time this DAG was pickled
    last_pickled = Column(DateTime)
    # When the DAG last received a refresh signal, used to know when
    # we need to force refresh
    last_expired = Column(DateTime)
    # Whether one of the schedulers is scheduling this DAG at the moment
    scheduler_lock = Column(Boolean)
    # Foreign key to the latest pickle_id
    pickle_id = Column(Integer)
    # The location of the file containing the DAG object
    fileloc = Column(String(2000))
    # String representing the owners
    owners = Column(String(2000))

    def __repr__(self):
        return "<DAG: {self.dag_id}>".format(self=self)

    @classmethod
    def get_current(cls, dag_id):
        """
        Fetch the row for ``dag_id`` using a fresh session.

        All objects are expunged from the session before it is committed and
        closed, so the returned object is detached.

        :param dag_id: id of the DAG to look up
        :return: the matching instance, or None if no row exists
        """
        session = settings.Session()
        try:
            obj = session.query(cls).filter(cls.dag_id == dag_id).first()
            session.expunge_all()
            session.commit()
        finally:
            # Release the session even if the query or commit fails.
            session.close()
        return obj

 

 

@functools.total_ordering
class DAG(object):
    """
    A dag (directed acyclic graph) is a collection of tasks with directional
    dependencies. A dag also has a schedule, a start date and an end date
    (optional). For each schedule, (say daily or hourly), the DAG needs to run
    each individual task as its dependencies are met. Certain tasks have
    the property of depending on their own past, meaning that they can't run
    until their previous schedule (and upstream tasks) are completed.

    DAGs essentially act as namespaces for tasks. A task_id can only be
    added once to a DAG.

    :param dag_id: The id of the DAG
    :type dag_id: string
    :param schedule_interval: Defines how often that DAG runs, this
        timedelta object gets added to your latest task instance's
        execution_date to figure out the next schedule
    :type schedule_interval: datetime.timedelta or
        dateutil.relativedelta.relativedelta or str that acts as a cron
        expression
    :param start_date: The timestamp from which the scheduler will
        attempt to backfill
    :type start_date: datetime.datetime
    :param end_date: A date beyond which your DAG won't run, leave to None
        for open ended scheduling. Note that when it is None, the
        ``end_date`` attribute is set to the time this object is created.
    :type end_date: datetime.datetime
    :param full_filepath: path of the file defining the DAG; ``filepath``
        and ``folder`` are derived from it
    :type full_filepath: string
    :param template_searchpath: This list of folders (non relative)
        defines where jinja will look for your templates. Order matters.
        Note that jinja/airflow includes the path of your DAG file by
        default
    :type template_searchpath: string or list of strings
    :param user_defined_macros: a dictionary of macros that will be exposed
        in your jinja templates. For example, passing ``dict(foo='bar')``
        to this argument allows you to ``{{ foo }}`` in all jinja
        templates related to this DAG. Note that you can pass any
        type of object here.
    :type user_defined_macros: dict
    :param default_args: A dictionary of default parameters to be used
        as constructor keyword parameters when initialising operators.
        Note that operators have the same hook, and precede those defined
        here, meaning that if your dict contains `'depends_on_past': True`
        here and `'depends_on_past': False` in the operator's call
        `default_args`, the actual value will be `False`.
    :type default_args: dict
    :param params: a dictionary of DAG level parameters that are made
        accessible in templates, namespaced under `params`. These
        params can be overridden at the task level.
    :type params: dict
    :param concurrency: the number of task instances allowed to run
        concurrently (defaults to the ``[core] dag_concurrency`` setting)
    :type concurrency: int

    NOTE(review): following_schedule/previous_schedule only handle cron
    strings and timedelta objects; for any other schedule type (e.g.
    relativedelta) they return None.
    """

    def __init__(
            self, dag_id,
            schedule_interval=timedelta(days=1),
            start_date=None, end_date=None,
            full_filepath=None,
            template_searchpath=None,
            user_defined_macros=None,
            default_args=None,
            concurrency=configuration.getint('core', 'dag_concurrency'),
            params=None):

        self.user_defined_macros = user_defined_macros
        self.default_args = default_args or {}
        self.params = params
        utils.validate_key(dag_id)
        self.tasks = []
        self.dag_id = dag_id
        self.start_date = start_date
        # A missing end_date is replaced by the creation time of this object.
        self.end_date = end_date or datetime.now()
        # ``schedule_interval`` keeps the user's value; ``_schedule_interval``
        # is the normalised form used for date arithmetic: cron presets are
        # expanded and '@once' (when not a preset) becomes None.
        self.schedule_interval = schedule_interval
        if schedule_interval in utils.cron_presets:
            self._schedule_interval = utils.cron_presets.get(schedule_interval)
        elif schedule_interval == '@once':
            self._schedule_interval = None
        else:
            self._schedule_interval = schedule_interval
        self.full_filepath = full_filepath if full_filepath else ''
        if isinstance(template_searchpath, six.string_types):
            template_searchpath = [template_searchpath]
        self.template_searchpath = template_searchpath
        self.parent_dag = None  # Gets set when DAGs are loaded
        self.last_loaded = datetime.now()
        self.safe_dag_id = dag_id.replace('.', '__dot__')
        self.concurrency = concurrency

        # Attribute names compared by __eq__ and combined by __hash__.
        self._comps = {
            'dag_id',
            'tasks',
            'parent_dag',
            'start_date',
            'schedule_interval',
            'full_filepath',
            'template_searchpath',
            'last_loaded',
        }

    def __repr__(self):
        return "<DAG: {self.dag_id}>".format(self=self)

    def __eq__(self, other):
        return (
            type(self) == type(other) and
            all(self.__dict__.get(c, None) == other.__dict__.get(c, None)
                for c in self._comps))

    def __ne__(self, other):
        # Python 2 does not derive ``!=`` from __eq__; without __ne__,
        # ``!=`` would fall back to identity comparison.
        return not self == other

    # Backwards-compatible alias for the previously misspelled method name.
    __neq__ = __ne__

    def __lt__(self, other):
        return self.dag_id < other.dag_id

    def __hash__(self):
        """Hash the type plus every ``_comps`` attribute (via repr() if
        the attribute value is unhashable)."""
        hash_components = [type(self)]
        for c in self._comps:
            val = getattr(self, c, None)
            try:
                hash(val)
                hash_components.append(val)
            except TypeError:
                hash_components.append(repr(val))
        return hash(tuple(hash_components))

    def date_range(self, start_date, num=None, end_date=None):
        """
        Return the schedule dates from ``start_date`` using this DAG's
        normalised schedule interval.

        :param start_date: first date of the range
        :param num: number of dates to produce; when set, ``end_date`` is
            ignored
        :param end_date: last date of the range; defaults to the current
            time at call time (not at import time)
        """
        if num:
            end_date = None
        elif end_date is None:
            end_date = datetime.now()
        return utils.date_range(
            start_date=start_date, end_date=end_date,
            num=num, delta=self._schedule_interval)

    def following_schedule(self, dttm):
        """
        Return the schedule date after ``dttm``: the next cron match for a
        cron string, or ``dttm + interval`` for a timedelta. Returns None
        for any other schedule type.
        """
        if isinstance(self._schedule_interval, six.string_types):
            cron = croniter(self._schedule_interval, dttm)
            return cron.get_next(datetime)
        elif isinstance(self._schedule_interval, timedelta):
            return dttm + self._schedule_interval

    def previous_schedule(self, dttm):
        """
        Return the schedule date before ``dttm``: the previous cron match for
        a cron string, or ``dttm - interval`` for a timedelta. Returns None
        for any other schedule type.
        """
        if isinstance(self._schedule_interval, six.string_types):
            cron = croniter(self._schedule_interval, dttm)
            return cron.get_prev(datetime)
        elif isinstance(self._schedule_interval, timedelta):
            return dttm - self._schedule_interval

    @property
    def task_ids(self):
        """List of the task_ids of this DAG's tasks, in insertion order."""
        return [t.task_id for t in self.tasks]

    @property
    def filepath(self):
        """
        File location of where the dag object is instantiated, relative to
        DAGS_FOLDER (or to this module's directory) when under it.
        """
        fn = self.full_filepath.replace(DAGS_FOLDER + '/', '')
        fn = fn.replace(os.path.dirname(__file__) + '/', '')
        return fn

    @property
    def folder(self):
        """
        Folder location of where the dag object is instantiated
        """
        return os.path.dirname(self.full_filepath)

    @property
    def owner(self):
        """Comma-separated, de-duplicated task owners (order not
        guaranteed)."""
        return ", ".join(list(set([t.owner for t in self.tasks])))

    @property
    @provide_session
    def concurrency_reached(self, session=None):
        """
        Returns a boolean as to whether the concurrency limit for this DAG
        has been reached, i.e. whether the number of RUNNING task instances
        of this DAG's tasks is at least ``self.concurrency``.
        """
        TI = TaskInstance
        qry = session.query(func.count(TI)).filter(
            TI.dag_id == self.dag_id,
            TI.task_id.in_(self.task_ids),
            TI.state == State.RUNNING,
        )
        return qry.scalar() >= self.concurrency

    @property
    def latest_execution_date(self):
        """
        Returns the latest date for which at least one task instance exists
        (None if there is none).
        """
        TI = TaskInstance
        session = settings.Session()
        try:
            execution_date = session.query(func.max(TI.execution_date)).filter(
                TI.dag_id == self.dag_id,
                TI.task_id.in_(self.task_ids)
            ).scalar()
            session.commit()
        finally:
            session.close()
        return execution_date

    @property
    def subdags(self):
        """
        Returns a list of the subdag objects associated to this DAG,
        including nested subdags (collected recursively).
        """
        # Late import to prevent circular imports
        from airflow.operators import SubDagOperator
        l = []
        for task in self.tasks:
            if isinstance(task, SubDagOperator):
                l.append(task.subdag)
                l += task.subdag.subdags
        return l

    def get_active_runs(self):
        """
        Maintains and returns the currently active runs as a list of dates.

        For every RUNNING DagRun of this DAG that has a task instance for
        each task, the run is marked FAILED if any task instance failed, or
        SUCCESS if every task instance is SUCCESS or SKIPPED. The execution
        dates of runs left running are returned. State changes are committed.
        """
        TI = TaskInstance
        done_states = {State.SUCCESS, State.SKIPPED}
        session = settings.Session()
        try:
            task_ids = self.task_ids
            active_dates = []
            active_runs = (
                session.query(DagRun)
                .filter(
                    DagRun.dag_id == self.dag_id,
                    DagRun.state == State.RUNNING)
                .all()
            )
            for run in active_runs:
                logging.info("Checking state for {}".format(run))
                task_instances = session.query(TI).filter(
                    TI.dag_id == run.dag_id,
                    TI.task_id.in_(task_ids),
                    TI.execution_date == run.execution_date,
                ).all()
                if len(task_instances) != len(self.tasks):
                    # Not every task has a task instance for this run yet.
                    active_dates.append(run.execution_date)
                    continue
                task_states = set(ti.state for ti in task_instances)
                if State.FAILED in task_states:
                    logging.info('Marking run {} failed'.format(run))
                    run.state = State.FAILED
                elif task_states <= done_states:
                    logging.info('Marking run {} successful'.format(run))
                    run.state = State.SUCCESS
                else:
                    active_dates.append(run.execution_date)
            session.commit()
        finally:
            session.close()
        return active_dates

    def resolve_template_files(self):
        """Call resolve_template_files() on every task of this DAG."""
        for t in self.tasks:
            t.resolve_template_files()

    def crawl_for_tasks(self, objects):
        """
        Typically called at the end of a script by passing globals() as a
        parameter. This allows to not explicitly add every single task to the
        dag explicitly. Not implemented.

        :raises NotImplementedError: always
        """
        raise NotImplementedError("")

    def override_start_date(self, start_date):
        """
        Sets start_date of all tasks and of the DAG itself to a certain date.
        This is used by BackfillJob.
        """
        for t in self.tasks:
            t.start_date = start_date
        self.start_date = start_date

    def get_template_env(self):
        '''
        Returns a jinja2 Environment while taking into account the DAGs
        template_searchpath and user_defined_macros.

        The DAG's folder is searched first, then template_searchpath in
        order. cache_size=0 so templates are not cached between loads.
        '''
        searchpath = [self.folder]
        if self.template_searchpath:
            searchpath += self.template_searchpath

        env = jinja2.Environment(
            loader=jinja2.FileSystemLoader(searchpath),
            extensions=["jinja2.ext.do"],
            cache_size=0)
        if self.user_defined_macros:
            env.globals.update(self.user_defined_macros)

        return env

    def set_dependency(self, upstream_task_id, downstream_task_id):
        """
        Simple utility method to set dependency between two tasks that
        already have been added to the DAG using add_task()
        """
        self.get_task(upstream_task_id).set_downstream(
            self.get_task(downstream_task_id))

    def get_task_instances(
            self, session, start_date=None, end_date=None, state=None):
        """
        Return this DAG's task instances with an execution_date within
        [start_date, end_date], optionally restricted to ``state``.

        start_date defaults to midnight 30 days ago; end_date to now.
        """
        TI = TaskInstance
        if not start_date:
            start_date = (datetime.today()-timedelta(30)).date()
            start_date = datetime.combine(start_date, datetime.min.time())
        if not end_date:
            end_date = datetime.now()
        tis = session.query(TI).filter(
            TI.dag_id == self.dag_id,
            TI.execution_date >= start_date,
            TI.execution_date <= end_date,
            TI.task_id.in_([t.task_id for t in self.tasks]),
        )
        if state:
            tis = tis.filter(TI.state == state)
        tis = tis.all()
        return tis

    @property
    def roots(self):
        """Tasks that have no downstream task."""
        return [t for t in self.tasks if not t.downstream_list]

    @provide_session
    def set_dag_runs_state(
            self, start_date, end_date, state=State.RUNNING, session=None):
        """
        Set ``state`` on this DAG's DagRuns whose execution_date lies within
        [start_date, end_date]. A bound that is None leaves that side of the
        range open. Changes are not committed here.
        """
        qry = session.query(DagRun).filter(DagRun.dag_id == self.dag_id)
        if start_date:
            qry = qry.filter(DagRun.execution_date >= start_date)
        if end_date:
            qry = qry.filter(DagRun.execution_date <= end_date)
        for dr in qry.all():
            dr.state = state

    def clear(
            self, start_date=None, end_date=None,
            only_failed=False,
            only_running=False,
            confirm_prompt=False,
            include_subdags=True,
            reset_dag_runs=True,
            dry_run=False):
        """
        Clears a set of task instances associated with the current dag for
        a specified date range.

        :param start_date: lower bound on execution_date (None: unbounded)
        :param end_date: upper bound on execution_date (None: unbounded)
        :param only_failed: only clear FAILED task instances
        :param only_running: only clear RUNNING task instances
        :param confirm_prompt: list the task instances and ask for yes/no
            confirmation before clearing
        :param include_subdags: also clear the task instances of subdags
        :param reset_dag_runs: set the DagRuns in the range back to RUNNING
        :param dry_run: return the matching (detached) task instances
            instead of clearing them
        :return: the list of task instances when ``dry_run``, otherwise the
            number of cleared task instances
        """
        TI = TaskInstance
        session = settings.Session()
        try:
            if include_subdags:
                # Crafting the right filter for dag_id and task_ids combo.
                # dag_id is matched exactly: LIKE would treat "_" and "%"
                # in a dag_id as wildcards and could hit other DAGs.
                conditions = [
                    and_(TI.dag_id == dag.dag_id, TI.task_id.in_(dag.task_ids))
                    for dag in self.subdags + [self]]
                tis = session.query(TI).filter(or_(*conditions))
            else:
                tis = session.query(TI).filter(
                    TI.dag_id == self.dag_id,
                    TI.task_id.in_(self.task_ids))

            if start_date:
                tis = tis.filter(TI.execution_date >= start_date)
            if end_date:
                tis = tis.filter(TI.execution_date <= end_date)
            if only_failed:
                tis = tis.filter(TI.state == State.FAILED)
            if only_running:
                tis = tis.filter(TI.state == State.RUNNING)

            if dry_run:
                tis = tis.all()
                session.expunge_all()
                return tis

            count = tis.count()
            if count == 0:
                print("Nothing to clear.")
                return 0

            do_it = True
            if confirm_prompt:
                ti_list = "\n".join([str(t) for t in tis])
                question = (
                    "You are about to delete these {count} tasks:\n"
                    "{ti_list}\n\n"
                    "Are you sure? (yes/no): ").format(
                        count=count, ti_list=ti_list)
                do_it = utils.ask_yesno(question)

            if do_it:
                clear_task_instances(tis, session)
                if reset_dag_runs:
                    self.set_dag_runs_state(
                        start_date, end_date, session=session)
            else:
                count = 0
                print("Bail. Nothing was cleared.")

            session.commit()
        finally:
            # Released on every path, including dry_run and "nothing to clear".
            session.close()
        return count

    def __deepcopy__(self, memo):
        # Swiwtcharoo to go around deepcopying objects coming through the
        # backdoor: user_defined_macros and params are shared by reference
        # with the copy, everything else is deep-copied.
        cls = self.__class__
        result = cls.__new__(cls)
        memo[id(self)] = result
        for k, v in list(self.__dict__.items()):
            if k not in ('user_defined_macros', 'params'):
                setattr(result, k, copy.deepcopy(v, memo))

        result.user_defined_macros = self.user_defined_macros
        result.params = self.params
        return result

    def sub_dag(
            self, task_regex,
            include_downstream=False, include_upstream=True):
        """
        Returns a subset of the current dag as a deep copy of the current dag
        based on a regex that should match one or many tasks, and includes
        upstream and downstream neighbours based on the flag passed.
        """

        dag = copy.deepcopy(self)

        regex_match = [
            t for t in dag.tasks if re.findall(task_regex, t.task_id)]
        also_include = []
        for t in regex_match:
            if include_downstream:
                also_include += t.get_flat_relatives(upstream=False)
            if include_upstream:
                also_include += t.get_flat_relatives(upstream=True)
        # Compiling the unique list of tasks that made the cut
        tasks = list(set(regex_match + also_include))
        dag.tasks = tasks
        for t in dag.tasks:
            # Removing upstream/downstream references to tasks that did not
            # make the cut
            t._upstream_list = [
                ut for ut in t._upstream_list if utils.is_in(ut, tasks)]
            t._downstream_list = [
                ut for ut in t._downstream_list if utils.is_in(ut, tasks)]

        return dag

    def has_task(self, task_id):
        """Whether a task with ``task_id`` has been added to this DAG."""
        return task_id in (t.task_id for t in self.tasks)

    def get_task(self, task_id):
        """
        Return the task with ``task_id``.

        :raises AirflowException: if no such task exists
        """
        for task in self.tasks:
            if task.task_id == task_id:
                return task
        raise AirflowException("Task {task_id} not found".format(**locals()))

    def pickle(self, main_session=None):
        """
        Store this DAG as a DagPickle unless the pickle referenced by its
        DagModel row already compares equal to it, and return the pickle.

        Sets ``last_pickled`` and ``pickle_id`` on this DAG object when a new
        pickle is stored. A session is created (and closed) when
        ``main_session`` is not given.
        """
        session = main_session or settings.Session()
        dag = session.query(
            DagModel).filter(DagModel.dag_id == self.dag_id).first()
        dp = None
        if dag and dag.pickle_id:
            dp = session.query(DagPickle).filter(
                DagPickle.id == dag.pickle_id).first()
        if not dp or dp.pickle != self:
            dp = DagPickle(dag=self)
            session.add(dp)
            self.last_pickled = datetime.now()
            session.commit()
            self.pickle_id = dp.id

        if not main_session:
            session.close()
        return dp

    def tree_view(self):
        """
        Shows an ascii tree representation of the DAG: starting from each of
        ``roots`` (tasks with no downstream task), prints its upstream tasks
        recursively, indented four spaces per level.
        """
        def get_downstream(task, level=0):
            print((" " * level * 4) + str(task))
            level += 1
            for t in task.upstream_list:
                get_downstream(t, level)

        for t in self.roots:
            get_downstream(t)

    def add_task(self, task):
        '''
        Add a task to the DAG

        The task inherits the DAG's start_date when it has none, and gets
        its ``dag`` / ``dag_id`` set to this DAG.

        :param task: the task you want to add
        :type task: task
        :raises AirflowException: if neither the DAG nor the task has a
            start_date, or a task with the same task_id was already added
        '''
        if not self.start_date and not task.start_date:
            raise AirflowException("Task is missing the start_date parameter")
        if not task.start_date:
            task.start_date = self.start_date

        if self.has_task(task.task_id):
            raise AirflowException(
                "Task id '{0}' has already been added "
                "to the DAG ".format(task.task_id))
        else:
            self.tasks.append(task)
            task.dag_id = self.dag_id
            task.dag = self
        self.task_count = len(self.tasks)

    def add_tasks(self, tasks):
        '''
        Add a list of tasks to the DAG

        :param tasks: a list of tasks you want to add
        :type tasks: list of tasks
        '''
        for task in tasks:
            self.add_task(task)

    def db_merge(self):
        """
        Delete the BaseOperator rows stored for this dag_id, then merge this
        DAG into the database.
        """
        BO = BaseOperator
        session = settings.Session()
        try:
            tasks = session.query(BO).filter(BO.dag_id == self.dag_id).all()
            for t in tasks:
                session.delete(t)
            session.commit()
            session.merge(self)
            session.commit()
        finally:
            session.close()

    def run(
            self, start_date=None, end_date=None, mark_success=False,
            include_adhoc=False, local=False, executor=None,
            donot_pickle=configuration.getboolean('core', 'donot_pickle'),
            ignore_dependencies=False,
            pool=None):
        """
        Run this DAG over a date range by creating and running a BackfillJob.

        A LocalExecutor is used when ``local`` is set and no executor is
        given; otherwise DEFAULT_EXECUTOR is the fallback.
        """
        from airflow.jobs import BackfillJob
        if not executor and local:
            executor = LocalExecutor()
        elif not executor:
            executor = DEFAULT_EXECUTOR
        job = BackfillJob(
            self,
            start_date=start_date,
            end_date=end_date,
            mark_success=mark_success,
            include_adhoc=include_adhoc,
            executor=executor,
            donot_pickle=donot_pickle,
            ignore_dependencies=ignore_dependencies,
            pool=pool)
        job.run()

 

 

class Chart(Base):
    """
    Persisted chart definition (``chart`` table).
    """
    __tablename__ = "chart"

    id = Column(Integer, primary_key=True)
    label = Column(String(200))
    conn_id = Column(String(ID_LEN), nullable=False)
    user_id = Column(Integer(), ForeignKey('user.id'), nullable=True)
    chart_type = Column(String(100), default="line")
    sql_layout = Column(String(50), default="series")
    sql = Column(Text, default="SELECT series, x, y FROM table")
    y_log_scale = Column(Boolean)
    show_datatable = Column(Boolean)
    show_sql = Column(Boolean, default=True)
    height = Column(Integer, default=600)
    default_params = Column(String(5000), default="{}")
    owner = relationship(
        "User", cascade=False, cascade_backrefs=False, backref='charts')
    x_is_date = Column(Boolean, default=True)
    iteration_no = Column(Integer, default=0)
    # Pass the callable, not its result: ``datetime.now()`` would be
    # evaluated once at import time, stamping every new row with the same
    # stale value.
    last_modified = Column(DateTime, default=datetime.now)

    def __repr__(self):
        return self.label

 

 

class KnownEventType(Base):
    """
    A type that KnownEvent rows can point to through their
    ``known_event_type_id`` foreign key (``known_event_type`` table).
    """
    __tablename__ = "known_event_type"

    id = Column(Integer, primary_key=True)
    # Name of the type. The attribute (and therefore the column) is spelled
    # ``know_event_type``; renaming it would change the schema.
    know_event_type = Column(String(200))

    def __repr__(self):
        # NOTE(review): returns the raw column value; if it is None,
        # repr() raises TypeError.
        return self.know_event_type

 

 

class KnownEvent(Base):
    """
    A dated event record (``known_event`` table) with a label, start and
    end dates, a description, and links to the reporting User and to its
    KnownEventType.
    """
    __tablename__ = "known_event"

    id = Column(Integer, primary_key=True)
    label = Column(String(200))
    start_date = Column(DateTime)
    end_date = Column(DateTime)
    # Foreign key backing the ``reported_by`` relationship
    user_id = Column(Integer(), ForeignKey('user.id'),)
    # Foreign key backing the ``event_type`` relationship
    known_event_type_id = Column(Integer(), ForeignKey('known_event_type.id'),)
    # The backref adds a ``known_events`` attribute on User
    reported_by = relationship(
        "User", cascade=False, cascade_backrefs=False, backref='known_events')
    # The backref adds a ``known_events`` attribute on KnownEventType
    event_type = relationship(
        "KnownEventType",
        cascade=False,
        cascade_backrefs=False, backref='known_events')
    description = Column(Text)

    def __repr__(self):
        # NOTE(review): returns the raw label; if it is None, repr() raises
        # TypeError.
        return self.label

 

 

class Variable(Base):
    """
    A named value stored in the ``variable`` table (one row per ``key``).
    """
    __tablename__ = "variable"

    id = Column(Integer, primary_key=True)
    key = Column(String(ID_LEN), unique=True)
    val = Column(Text)

    def __repr__(self):
        return '{} : {}'.format(self.key, self.val)

    @classmethod
    @provide_session
    def get(cls, key, default_var=None, deserialize_json=False, session=None):
        """
        Return the value stored under ``key``.

        :param key: name of the variable
        :param default_var: returned when ``key`` does not exist; when it is
            None a missing key raises instead
        :param deserialize_json: decode the value with ``json.loads``. Empty
            values are returned unchanged, and a default that is not a
            string (e.g. a dict) is returned as-is.
        :raises ValueError: if ``key`` does not exist and no default is given
        """
        obj = session.query(cls).filter(cls.key == key).first()
        if obj is None:
            if default_var is None:
                raise ValueError('Variable {} does not exist'.format(key))
            v = default_var
            # Only text can be JSON-decoded; an already-decoded default used
            # to make json.loads raise TypeError.
            decodable = isinstance(v, (six.string_types, bytes))
        else:
            v = obj.val
            decodable = True
        if deserialize_json and v and decodable:
            v = json.loads(v)
        return v

 

 

class XCom(Base):
    """
    Base class for XCom objects.

    Each row holds one value, pickled with ``dill``, published under ``key``
    by task ``task_id`` of DAG ``dag_id`` for an ``execution_date``.
    """
    __tablename__ = "xcom"

    id = Column(Integer, primary_key=True)
    key = Column(String(512))
    value = Column(PickleType(pickler=dill))
    timestamp = Column(
        DateTime, default=func.now(), nullable=False)
    execution_date = Column(DateTime, nullable=False)

    # source information
    task_id = Column(String(ID_LEN), nullable=False)
    dag_id = Column(String(ID_LEN), nullable=False)

    def __repr__(self):
        template = '<XCom "{key}" ({task_id} @ {execution_date})>'
        return template.format(
            key=self.key,
            task_id=self.task_id,
            execution_date=self.execution_date)

    @classmethod
    @provide_session
    def set(
            cls,
            key,
            value,
            execution_date,
            task_id,
            dag_id,
            session=None):
        """
        Store an XCom value, replacing any row with the same key,
        execution_date, task_id and dag_id. Commits the session.
        """
        session.expunge_all()

        identity = (
            cls.key == key,
            cls.execution_date == execution_date,
            cls.task_id == task_id,
            cls.dag_id == dag_id,
        )
        # Drop the previous value(s) so only the new one remains.
        session.query(cls).filter(*identity).delete()

        new_xcom = XCom(
            key=key,
            value=value,
            execution_date=execution_date,
            task_id=task_id,
            dag_id=dag_id)
        session.add(new_xcom)

        session.commit()

    @classmethod
    @provide_session
    def get_one(
            cls,
            execution_date,
            key=None,
            task_id=None,
            dag_id=None,
            include_prior_dates=False,
            session=None):
        """
        Return the value of the most recent XCom matching the criteria, or
        None if nothing matches.

        Falsy ``key`` / ``task_id`` / ``dag_id`` are not filtered on. Only
        ``execution_date`` itself is considered unless
        ``include_prior_dates`` is set, in which case earlier dates match
        too. Ordering is by execution_date, then timestamp, newest first.
        """
        optional_criteria = (
            (key, cls.key == key),
            (task_id, cls.task_id == task_id),
            (dag_id, cls.dag_id == dag_id),
        )
        filters = [clause for wanted, clause in optional_criteria if wanted]
        if include_prior_dates:
            date_clause = cls.execution_date <= execution_date
        else:
            date_clause = cls.execution_date == execution_date
        filters.append(date_clause)

        newest_first = (cls.execution_date.desc(), cls.timestamp.desc())
        row = (
            session.query(cls.value)
            .filter(and_(*filters))
            .order_by(*newest_first)
            .limit(1)
            .first())
        return row.value if row else None

    @classmethod
    @provide_session
    def get_many(
            cls,
            execution_date,
            key=None,
            task_ids=None,
            dag_ids=None,
            include_prior_dates=False,
            limit=100,
            session=None):
        """
        Return up to ``limit`` XCom objects matching the criteria, newest
        first (by execution_date, then timestamp).

        ``task_ids`` / ``dag_ids`` may be single ids or collections; falsy
        criteria are not filtered on. ``include_prior_dates`` widens the
        date match to every date up to ``execution_date``.
        """
        optional_criteria = (
            (key, lambda: cls.key == key),
            (task_ids, lambda: cls.task_id.in_(as_tuple(task_ids))),
            (dag_ids, lambda: cls.dag_id.in_(as_tuple(dag_ids))),
        )
        filters = [make() for wanted, make in optional_criteria if wanted]
        if include_prior_dates:
            filters.append(cls.execution_date <= execution_date)
        else:
            filters.append(cls.execution_date == execution_date)

        newest_first = (cls.execution_date.desc(), cls.timestamp.desc())
        query = (
            session.query(cls)
            .filter(and_(*filters))
            .order_by(*newest_first)
            .limit(limit))
        return query.all()

    @classmethod
    @provide_session
    def delete(cls, xcoms, session=None):
        """
        Delete one XCom or an iterable of XComs and commit.

        :raises TypeError: if any item is not an XCom
        """
        targets = [xcoms] if isinstance(xcoms, XCom) else xcoms
        for xcom in targets:
            if not isinstance(xcom, XCom):
                raise TypeError(
                    'Expected XCom; received {}'.format(type(xcom)))
            session.delete(xcom)
        session.commit()

 

 

class DagRun(Base):
    """
    DagRun describes an instance of a Dag. It can be created
    by the scheduler (for regular runs) or by an external trigger
    """
    __tablename__ = "dag_run"

    id = Column(Integer, primary_key=True)
    dag_id = Column(String(ID_LEN))
    # Pass the callable, not its result: ``datetime.now()`` would be
    # evaluated once at import time, so every run inserted without an
    # explicit execution_date would share the same stale timestamp.
    execution_date = Column(DateTime, default=datetime.now)
    state = Column(String(50), default=State.RUNNING)
    run_id = Column(String(ID_LEN))
    external_trigger = Column(Boolean, default=True)
    conf = Column(PickleType)

    __table_args__ = (
        # (dag_id, run_id) pairs are unique
        Index('dr_run_id', dag_id, run_id, unique=True),
    )

    def __repr__(self):
        return (
            '<DagRun {dag_id} @ {execution_date}: {run_id}, '
            'externally triggered: {external_trigger}>'
        ).format(
            dag_id=self.dag_id,
            execution_date=self.execution_date,
            run_id=self.run_id,
            external_trigger=self.external_trigger)

 

 

class Pool(Base):
    """
    A named pool of ``slots`` (``slot_pool`` table); task instances are
    matched to it through ``TaskInstance.pool``.
    """
    __tablename__ = "slot_pool"

    id = Column(Integer, primary_key=True)
    pool = Column(String(50), unique=True)
    slots = Column(Integer, default=0)
    description = Column(Text)

    def __repr__(self):
        return self.pool

    def _count_task_instances(self, session, state):
        """Count the task instances of this pool that are in ``state``."""
        in_pool = session.query(TaskInstance).filter(
            TaskInstance.pool == self.pool)
        return in_pool.filter(TaskInstance.state == state).count()

    @provide_session
    def used_slots(self, session):
        """
        Returns the number of slots used at the moment, i.e. how many of
        this pool's task instances are RUNNING.
        """
        return self._count_task_instances(session, State.RUNNING)

    @provide_session
    def queued_slots(self, session):
        """
        Returns how many of this pool's task instances are QUEUED.
        """
        return self._count_task_instances(session, State.QUEUED)

    @provide_session
    def open_slots(self, session):
        """
        Returns the number of slots open at the moment: ``slots`` minus the
        RUNNING task instances (queued ones are not subtracted).
        """
        return self.slots - self.used_slots(session=session)

 

 

class SlaMiss(Base):
    """
    Model that stores a history of the SLA that have been missed.
    It is used to keep track of SLA failures over time and to avoid double
    triggering alert emails.
    """
    __tablename__ = "sla_miss"

    # (task_id, dag_id, execution_date) form a composite primary key, so
    # there is at most one row per task instance.
    task_id = Column(String(ID_LEN), primary_key=True)
    dag_id = Column(String(ID_LEN), primary_key=True)
    execution_date = Column(DateTime, primary_key=True)
    email_sent = Column(Boolean, default=False)
    timestamp = Column(DateTime)
    description = Column(Text)

    def __repr__(self):
        # e.g. "('my_dag', 'my_task', '2016-01-01T00:00:00')"
        return str((
            self.dag_id, self.task_id, self.execution_date.isoformat()))

 

 

class ImportError(Base): 

    __tablename__ = "import_error" 

    id = Column(Integer, primary_key=True) 

    timestamp = Column(DateTime) 

    filename = Column(String(1024)) 

    stacktrace = Column(Text)